private Observable<Void> getIntervalObservable(HttpServerRequest<?> request, final HttpServerResponse<ServerSentEvent> response) { HttpRequest simpleRequest = new HttpRequest(request.getQueryParameters()); return getEvents(simpleRequest) .flatMap(event -> { System.out.println("Writing SSE event: " + event); ByteBuf data = response.getAllocator().buffer().writeBytes(( event + "\n").getBytes()); ServerSentEvent sse = new ServerSentEvent(data); return response.writeAndFlush(sse); }).materialize() .takeWhile(notification -> { if (notification.isOnError()) { System.out.println("Write to client failed, stopping response sending."); notification.getThrowable().printStackTrace(System.err); } return !notification.isOnError(); }) .map((Func1<Notification<Void>, Void>) notification -> null); }
@Override protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) { List<String> videoIds = request.getQueryParameters().get("videoId"); int latency = 1; if (Random.randomIntFrom0to100() > 80) { latency = 10; } return Observable.from(videoIds).map(videoId -> { Map<String, Object> video = new HashMap<>(); video.put("videoId", videoId); video.put("position", (int) (Math.random() * 5000)); return video; }).flatMap(video -> response.writeStringAndFlush("data: " + SimpleJson.mapToJson(video) + "\n")) .delay(latency, TimeUnit.MILLISECONDS).doOnCompleted(response::close); // simulate latency }
@Override protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) { return request.getContent().flatMap(i -> { List<String> ips = request.getQueryParameters().get("ip"); Map<String, Object> data = new HashMap<>(); for (String ip : ips) { Map<String, Object> ip_data = new HashMap<>(); ip_data.put("country_code", "GB"); ip_data.put("longitude", "-0.13"); ip_data.put("latitude", "51.5"); data.put(ip, ip_data); } return response.writeStringAndFlush("data: " + SimpleJson.mapToJson(data) + "\n") .doOnCompleted(response::close); }).delay(10, TimeUnit.MILLISECONDS); }
@Override protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) { List<String> userIds = request.getQueryParameters().get("userId"); if (userIds == null || userIds.size() == 0) { return writeError(request, response, "At least one parameter of 'userId' must be included."); } return Observable.from(userIds).map(userId -> { Map<String, Object> user = new HashMap<>(); user.put("userId", userId); user.put("name", "Name Here"); user.put("other_data", "goes_here"); return user; }).flatMap(user -> response.writeStringAndFlush("data: " + SimpleJson.mapToJson(user) + "\n") .doOnCompleted(response::close)) .delay(((long) (Math.random() * 500) + 500), TimeUnit.MILLISECONDS); // simulate latency }
private static Observable<ServerSentEvent> buildStream(Instant since, String lastStreamedBuildId) { AtomicReference<String> _lastBuildId = new AtomicReference<>(null); final String buildsSinceUri = "/build-export/v1/builds/since/" + String.valueOf(since.toEpochMilli()); LOGGER.info("Builds uri: " + buildsSinceUri); HttpClientRequest<ByteBuf, ByteBuf> request = HTTP_CLIENT .createGet(buildsSinceUri) .setKeepAlive(true); if (BASIC_AUTH != null) { request = request.addHeader("Authorization", "Basic " + BASIC_AUTH); } if (lastStreamedBuildId != null) { request = request.addHeader("Last-Event-ID", lastStreamedBuildId); } return request .flatMap(HttpClientResponse::getContentAsServerSentEvents) .doOnNext(serverSentEvent -> _lastBuildId.set(serverSentEvent.getEventIdAsString())) .doOnSubscribe(() -> LOGGER.info("Streaming builds...")) .onErrorResumeNext(t -> { LOGGER.info("Error streaming builds, resuming from build id: " + _lastBuildId.get()); return buildStream(since, _lastBuildId.get()); }); }
private Observable<String> initializeStream() { HttpClient<ByteBuf, ServerSentEvent> client = RxNetty.createHttpClient("localhost", port, PipelineConfigurators.<ByteBuf>clientSseConfigurator()); return client.submit(HttpClientRequest.createGet("/hello")). flatMap(response -> { printResponseHeader(response); return response.getContent(); }).map(serverSentEvent -> serverSentEvent.contentAsString()); }
private static void printResponseHeader(HttpClientResponse<ServerSentEvent> response) { System.out.println("New response received."); System.out.println("========================"); System.out.println(response.getHttpVersion().text() + ' ' + response.getStatus().code() + ' ' + response.getStatus().reasonPhrase()); for (Map.Entry<String, String> header : response.getHeaders().entries()) { System.out.println(header.getKey() + ": " + header.getValue()); } }
public HttpServer<ByteBuf, ServerSentEvent> createServer() { if (flaky) { events = SubscriptionLimiter .limitSubscriptions(1,initializeEventStream()); } else { events = initializeEventStream(); } return super.createServer(); }
public HttpServer<ByteBuf, ServerSentEvent> createServer() { HttpServer<ByteBuf, ServerSentEvent> server = RxNetty.createHttpServer(port, (request, response) -> { response.getHeaders().set(ACCESS_CONTROL_ALLOW_ORIGIN, "*"); response.getHeaders().set(CACHE_CONTROL, "no-cache"); response.getHeaders().set(CONNECTION, "keep-alive"); response.getHeaders().set(CONTENT_TYPE, "text/event-stream"); return getIntervalObservable(request, response); }, PipelineConfigurators.<ByteBuf>serveSseConfigurator()); System.out.println("HTTP Server Sent Events server started..."); return server; }
/** * Creates a new hystrix reader. * * @param configuration The configuration to use. * @param cluster The cluster to read from. */ public HystrixReader(Configuration configuration, String cluster) { this.configuration = configuration; this.cluster = cluster; HttpClientBuilder<ByteBuf, ServerSentEvent> builder = RxNetty.newHttpClientBuilder(configuration.getTurbineHost(), configuration.getTurbinePort()); builder.pipelineConfigurator(PipelineConfigurators.clientSseConfigurator()); if (configuration.isSecure()) { builder.withSslEngineFactory(DefaultFactories.trustAll()); } rxNetty = builder.build(); }
@Override public Observable<String> observeJson() { if(jsonObservable != null) { return jsonObservable; } HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet(url.getPath() + "?" + url.getQuery()); int port = url.getPort() < 0 ? url.getDefaultPort() : url.getPort(); HttpClient<ByteBuf, ServerSentEvent> client = RxNetty.<ByteBuf, ServerSentEvent>newHttpClientBuilder(url.getHost(), port) .withNoConnectionPooling() .pipelineConfigurator(PipelineConfigurators.<ByteBuf>clientSseConfigurator()) .build(); jsonObservable = client.submit(request) .doOnError(t -> LOG.error("Error connecting to " + url, t)) .flatMap(response -> { if (response.getStatus().code() != 200) { return Observable.error(new RuntimeException("Failed to connect: " + response.getStatus())); } return response.getContent() .doOnSubscribe(() -> LOG.info("Turbine => Aggregate Stream from URL: " + url)) .doOnUnsubscribe(() -> LOG.info("Turbine => Unsubscribing Stream: " + url)) .map(ServerSentEvent::contentAsString); } ) .timeout(120, TimeUnit.SECONDS) .retryWhen(attempts -> attempts.zipWith(Observable.range(1, Integer.MAX_VALUE), (k, i) -> i) .flatMap(n -> { int waitTimeSeconds = Math.min(6, n) * 10; // wait in 10 second increments up to a max of 1 minute LOG.info("Turbine => Retrying connection to: " + this.url + " in {} seconds", waitTimeSeconds); return Observable.timer(waitTimeSeconds, TimeUnit.SECONDS); }) ) .repeat() .share(); return jsonObservable; }
@Bean @SuppressWarnings("deprecation") public HttpServer<ByteBuf, ServerSentEvent> aggregatorServer() { // multicast so multiple concurrent subscribers get the same stream Observable<Map<String, Object>> publishedStreams = StreamAggregator .aggregateGroupedStreams(hystrixSubject().groupBy( data -> InstanceKey.create((String) data.get("instanceId")))) .doOnUnsubscribe(() -> log.info("Unsubscribing aggregation.")) .doOnSubscribe(() -> log.info("Starting aggregation")).flatMap(o -> o) .publish().refCount(); Observable<Map<String, Object>> ping = Observable.timer(1, 10, TimeUnit.SECONDS) .map(count -> Collections.singletonMap("type", (Object) "Ping")).publish() .refCount(); Observable<Map<String, Object>> output = Observable.merge(publishedStreams, ping); this.turbinePort = this.properties.getPort(); if (this.turbinePort <= 0) { this.turbinePort = SocketUtils.findAvailableTcpPort(40000); } HttpServer<ByteBuf, ServerSentEvent> httpServer = RxNetty .createHttpServer(this.turbinePort, (request, response) -> { log.info("SSE Request Received"); response.getHeaders().setHeader("Content-Type", "text/event-stream"); return output.doOnUnsubscribe( () -> log.info("Unsubscribing RxNetty server connection")) .flatMap(data -> response.writeAndFlush(new ServerSentEvent( null, Unpooled.copiedBuffer("message", StandardCharsets.UTF_8), Unpooled.copiedBuffer(JsonUtility.mapToJson(data), StandardCharsets.UTF_8)))); }, serveSseConfigurator()); return httpServer; }
public HttpServer<ByteBuf, ServerSentEvent> createServer(int port) { System.out.println("Start " + getClass().getSimpleName() + " on port: " + port); // declare handler chain (wrapped in Hystrix) // TODO create a better way of chaining these (related https://github.com/ReactiveX/RxNetty/issues/232 and https://github.com/ReactiveX/RxNetty/issues/202) HystrixMetricsStreamHandler<ByteBuf, ServerSentEvent> handlerChain = new HystrixMetricsStreamHandler<>(metrics, "/hystrix.stream", 1000, (request, response) -> { try { long startTime = System.currentTimeMillis(); return handleRequest(request, response) .doOnCompleted(() -> System.out.println("Response => " + request.getPath() + " Time => " + (int) (System.currentTimeMillis() - startTime) + "ms")) .doOnCompleted(() -> metrics.getRollingPercentile().addValue((int) (System.currentTimeMillis() - startTime))) .doOnCompleted(() -> metrics.getRollingNumber().add(Metrics.EventType.SUCCESS, 1)) .doOnError(t -> metrics.getRollingNumber().add(Metrics.EventType.FAILURE, 1)); } catch (Throwable e) { e.printStackTrace(); System.err.println("Server => Error [" + request.getPath() + "] => " + e); response.setStatus(HttpResponseStatus.BAD_REQUEST); return response.writeStringAndFlush("data: Error 500: Bad Request\n" + e.getMessage() + "\n"); } }); return RxNetty.createHttpServer(port, (request, response) -> { // System.out.println("Server => Request: " + request.getPath()); return handlerChain.handle(request, response); }, PipelineConfigurators.<ByteBuf> serveSseConfigurator()); }
@Override protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) { return Observable.from(request.getQueryParameters().get("userId")).map(userId -> { Map<String, Object> userData = new HashMap<>(); userData.put("user_id", userId); userData.put("list_title", "Really quirky and over detailed list title!"); userData.put("other_data", "goes_here"); userData.put("videos", Arrays.asList(12345, 23456, 34567, 45678, 56789, 67890)); return userData; }).flatMap(list -> response.writeStringAndFlush("data: " + SimpleJson.mapToJson(list) + "\n")) .delay(((long) (Math.random() * 100) + 20), TimeUnit.MILLISECONDS) .doOnCompleted(response::close); // simulate latency }
@Override protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) { return Observable.from(request.getQueryParameters().get("userId")).map(userId -> { Map<String, Object> user = new HashMap<>(); user.put("userId", userId); user.put("friends", Arrays.asList(randomUser(), randomUser(), randomUser(), randomUser())); return user; }).flatMap(list -> response.writeStringAndFlush("data: " + SimpleJson.mapToJson(list) + "\n")) .delay(((long) (Math.random() * 100) + 20), TimeUnit.MILLISECONDS).doOnCompleted(response::close); // simulate latency }
@Override protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) { List<String> videoIds = request.getQueryParameters().get("videoId"); return Observable.from(videoIds).map(videoId -> { Map<String, Object> video = new HashMap<>(); video.put("videoId", videoId); video.put("title", "Video Title"); video.put("other_data", "goes_here"); return video; }).flatMap(video -> response.writeStringAndFlush("data: " + SimpleJson.mapToJson(video) + "\n") .doOnCompleted(response::close)) .delay(((long) (Math.random() * 20) + 20), TimeUnit.MILLISECONDS); // simulate latency }
@Override protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) { List<String> _id = request.getQueryParameters().get("id"); if (_id == null || _id.size() != 1) { return writeError(request, response, "Please provide a numerical 'id' value. It can be a random number (uuid). Received => " + _id); } long id = Long.parseLong(String.valueOf(_id.get(0))); int delay = getParameter(request, "delay", 50); // default to 50ms server-side delay int itemSize = getParameter(request, "itemSize", 128); // default to 128 bytes item size (assuming ascii text) int numItems = getParameter(request, "numItems", 10); // default to 10 items in a list // no more than 100 items if (numItems < 1 || numItems > 100) { return writeError(request, response, "Please choose a 'numItems' value from 1 to 100."); } // no larger than 50KB per item if (itemSize < 1 || itemSize > 1024 * 50) { return writeError(request, response, "Please choose an 'itemSize' value from 1 to 1024*50 (50KB)."); } // no larger than 60 second delay if (delay < 0 || delay > 60000) { return writeError(request, response, "Please choose a 'delay' value from 0 to 60000 (60 seconds)."); } response.setStatus(HttpResponseStatus.OK); return MockResponse.generateJson(id, delay, itemSize, numItems) .flatMap(json -> response.writeStringAndFlush("data:" + json + "\n")) .doOnCompleted(response::close); }
@Override protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) { List<String> videoIds = request.getQueryParameters().get("videoId"); return Observable.from(videoIds).map(videoId -> { Map<String, Object> video = new HashMap<>(); video.put("videoId", videoId); video.put("estimated_user_rating", 3.5); video.put("actual_user_rating", 4); video.put("average_user_rating", 3.1); return video; }).flatMap(video -> response.writeStringAndFlush("data: " + SimpleJson.mapToJson(video) + "\n")) .delay(20, TimeUnit.MILLISECONDS).doOnCompleted(response::close); // simulate latenc }
public LoadBalancer<HttpClientHolder<ByteBuf, ServerSentEvent>> forVip(String targetVip) { Observable<MembershipEvent<Host>> eurekaHostSource = membershipSource.forInterest(Interests.forVips(targetVip), instanceInfo -> { String ipAddress = instanceInfo.getDataCenterInfo() .getAddresses().stream() .filter(na -> na.getProtocolType() == ProtocolType.IPv4) .collect(Collectors.toList()).get(0).getIpAddress(); HashSet<ServicePort> servicePorts = instanceInfo.getPorts(); ServicePort portToUse = servicePorts.iterator().next(); return new Host(ipAddress, portToUse.getPort()); }); final Map<Host, HttpClientHolder<ByteBuf, ServerSentEvent>> hostVsHolders = new ConcurrentHashMap<>(); String lbName = targetVip + "-lb"; return LoadBalancers.newBuilder(eurekaHostSource.map( hostEvent -> { HttpClient<ByteBuf, ServerSentEvent> client = clientPool.getClientForHost(hostEvent.getClient()); HttpClientHolder<ByteBuf, ServerSentEvent> holder; if (hostEvent.getType() == MembershipEvent.EventType.REMOVE) { holder = hostVsHolders.remove(hostEvent.getClient()); } else { holder = new HttpClientHolder<>(client); hostVsHolders.put(hostEvent.getClient(), holder); } return new MembershipEvent<>(hostEvent.getType(), holder); })).withWeightingStrategy(new LinearWeightingStrategy<>(new RxNettyPendingRequests<>())) .withName(lbName) .withFailureDetector(new RxNettyFailureDetector<>()).build(); }
@Override protected Observable<Catalog> construct() { HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet("/catalog?" + UrlGenerator.generate("userId", users)); return loadBalancer.choose() .map(holder -> holder.getClient()) .<Catalog>flatMap(client -> client.submit(request) .flatMap(r -> r.getContent() .map((ServerSentEvent sse) -> Catalog.fromJson(sse.contentAsString())))) .retry(1); }
@Override protected Observable<Social> construct() { HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet("/social?" + UrlGenerator.generate("userId", users)); return loadBalancer.choose().map(holder -> holder.getClient()) .<Social>flatMap(client -> client.submit(request) .flatMap(r -> r.getContent().map((ServerSentEvent sse) -> { String social = sse.contentAsString(); return Social.fromJson(social); }))) .retry(1); }
@Override protected Observable<GeoIP> construct() { HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet("/geo?" + UrlGenerator.generate("ip", ips)); return loadBalancer.choose() .map(holder -> holder.getClient()) .<GeoIP>flatMap(client -> client.submit(request) .flatMap(r -> r.getContent() .map((ServerSentEvent sse) -> GeoIP.fromJson(sse.contentAsString())))) .retry(1); }
@Override protected Observable<User> construct() { HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet("/user?" + UrlGenerator.generate("userId", userIds)); return loadBalancer.choose().map(holder -> holder.getClient()) .<User>flatMap(client -> client.submit(request) .flatMap(r -> r.getContent().map( (ServerSentEvent sse) -> { String user = sse.contentAsString(); return User.fromJson(user); }))) .retry(1); }
@Override protected Observable<VideoMetadata> construct() { HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet("/metadata?" + UrlGenerator.generate("videoId", videos)); return loadBalancer.choose() .map(holder -> holder.getClient()) .<VideoMetadata>flatMap(client -> client.submit(request) .flatMap(r -> r.getContent() .map((ServerSentEvent sse) -> VideoMetadata.fromJson(sse.contentAsString())))) .retry(1); }
@Override protected Observable<Rating> construct() { HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet("/ratings?" + UrlGenerator.generate("videoId", videos)); return loadBalancer.choose() .map(holder -> holder.getClient()) .<Rating>flatMap(client -> client.submit(request) .flatMap(r -> r.getContent() .map((ServerSentEvent sse) -> Rating.fromJson(sse.contentAsString())))) .retry(1); }
public BookmarksCommand(List<Video> videos, LoadBalancer<HttpClientHolder<ByteBuf, ServerSentEvent>> loadBalancer) { super(HystrixCommandGroupKey.Factory.asKey("GetBookmarks")); this.videos = videos; this.loadBalancer = loadBalancer; StringBuilder b = new StringBuilder(); for (Video v : videos) { b.append(v.getId()).append("-"); } this.cacheKey = b.toString(); }
@Override public Observable<Bookmark> construct() { HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet("/bookmarks?" + UrlGenerator.generate("videoId", videos)); return loadBalancer.choose() .map(holder -> holder.getClient()) .<Bookmark>flatMap(client -> client.submit(request) .flatMap(r -> r.getContent().map((ServerSentEvent sse) -> Bookmark.fromJson(sse.contentAsString())))) .retry(1); }
public LoadBalancerFactory(EurekaMembershipSource membershipSource, HttpClientPool<ByteBuf, ServerSentEvent> clientPool) { this.membershipSource = membershipSource; this.clientPool = clientPool; }
protected abstract Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response);