Java 类io.reactivex.netty.protocol.http.sse.ServerSentEvent 实例源码
项目:MarketData
文件:RxNettyEventServer.java
/**
 * Streams every event produced by {@code getEvents} to the client as a server-sent event.
 *
 * <p>Each event is written and flushed on its own. The resulting stream is materialized so
 * that a failed write is logged and terminates the stream instead of being propagated as an
 * error to the subscriber.
 *
 * @param request  incoming HTTP request; only its query parameters are read
 * @param response response the SSE payloads are written to
 * @return an observable of {@code null} markers (one per surviving notification) that
 *         completes once writing stops
 */
private Observable<Void> getIntervalObservable(HttpServerRequest<?> request, final HttpServerResponse<ServerSentEvent> response) {
    HttpRequest simpleRequest = new HttpRequest(request.getQueryParameters());
    return getEvents(simpleRequest)
            .flatMap(event -> {
                System.out.println("Writing SSE event: " + event);
                // Event streams are UTF-8 on the wire; do not depend on the platform default charset.
                byte[] payload = (event + "\n").getBytes(java.nio.charset.StandardCharsets.UTF_8);
                ByteBuf data = response.getAllocator().buffer().writeBytes(payload);
                ServerSentEvent sse = new ServerSentEvent(data);
                return response.writeAndFlush(sse);
            })
            .materialize()
            .takeWhile(notification -> {
                // Stop at the first failed write; the error is reported here, not rethrown.
                if (notification.isOnError()) {
                    System.out.println("Write to client failed, stopping response sending.");
                    notification.getThrowable().printStackTrace(System.err);
                }
                return !notification.isOnError();
            })
            .map((Func1<Notification<Void>, Void>) notification -> null);
}
项目:ReactiveLab
文件:BookmarksService.java
/**
 * Writes one {@code data:} line per requested {@code videoId}, each carrying the id and a
 * random position below 5000, then closes the response once the delayed stream completes.
 *
 * @param request  request whose {@code videoId} query parameters are read
 * @param response response the JSON lines are written to
 * @return the delayed write stream
 */
@Override
protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) {
    List<String> requestedIds = request.getQueryParameters().get("videoId");
    // simulate latency: 1ms normally, 10ms when the random draw exceeds 80
    final int latencyMillis = Random.randomIntFrom0to100() > 80 ? 10 : 1;
    Observable<Void> writes = Observable.from(requestedIds)
            .map(id -> {
                Map<String, Object> bookmark = new HashMap<>();
                bookmark.put("videoId", id);
                bookmark.put("position", (int) (Math.random() * 5000));
                return bookmark;
            })
            .flatMap(bookmark -> response.writeStringAndFlush("data: " + SimpleJson.mapToJson(bookmark) + "\n"));
    return writes
            .delay(latencyMillis, TimeUnit.MILLISECONDS)
            .doOnCompleted(response::close);
}
项目:ReactiveLab
文件:GeoService.java
/**
 * For each item of request content, writes a single {@code data:} line mapping every
 * requested {@code ip} to a fixed geo record, then closes the response.
 *
 * @param request  request whose content and {@code ip} query parameters are read
 * @param response response the JSON line is written to
 * @return the write stream, delayed by 10ms
 */
@Override
protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) {
    return request.getContent()
            .flatMap(ignored -> {
                Map<String, Object> geoByIp = new HashMap<>();
                List<String> addresses = request.getQueryParameters().get("ip");
                addresses.forEach(address -> {
                    // every address gets the same canned location
                    Map<String, Object> location = new HashMap<>();
                    location.put("country_code", "GB");
                    location.put("longitude", "-0.13");
                    location.put("latitude", "51.5");
                    geoByIp.put(address, location);
                });
                String line = "data: " + SimpleJson.mapToJson(geoByIp) + "\n";
                return response.writeStringAndFlush(line).doOnCompleted(response::close);
            })
            .delay(10, TimeUnit.MILLISECONDS);
}
项目:ReactiveLab
文件:UserService.java
/**
 * Writes one {@code data:} line with a canned user record per requested {@code userId} and
 * closes the response after all of them have been written.
 *
 * @param request  request whose {@code userId} query parameters are read
 * @param response response the JSON lines are written to
 * @return the write stream (delayed by 500-1000ms), or the {@code writeError} result when no
 *         {@code userId} was supplied
 */
@Override
protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) {
    List<String> userIds = request.getQueryParameters().get("userId");
    if (userIds == null || userIds.isEmpty()) {
        return writeError(request, response, "At least one parameter of 'userId' must be included.");
    }
    return Observable.from(userIds).map(userId -> {
        Map<String, Object> user = new HashMap<>();
        user.put("userId", userId);
        user.put("name", "Name Here");
        user.put("other_data", "goes_here");
        return user;
    }).flatMap(user -> response.writeStringAndFlush("data: " + SimpleJson.mapToJson(user) + "\n"))
            // Close on the merged stream, not per write: closing inside flatMap would end the
            // response after the first user while writes for the remaining ids are still pending.
            .doOnCompleted(response::close)
            .delay(((long) (Math.random() * 500) + 500), TimeUnit.MILLISECONDS); // simulate latency
}
项目:ge-export
文件:Application.java
/**
 * Opens the build export stream for builds since the given instant and re-opens it whenever
 * it fails, resuming from the most recent event id known at that point.
 *
 * @param since               lower bound for the builds to stream (sent as epoch millis in the URI)
 * @param lastStreamedBuildId id sent as {@code Last-Event-ID}, or {@code null} to send no such header
 * @return the stream of server-sent events, restarted on every error
 */
private static Observable<ServerSentEvent> buildStream(Instant since, String lastStreamedBuildId) {
    // Seed with the id we were asked to resume from: if the connection fails before any new
    // event arrives, the retry must continue from that id instead of starting over with null.
    final AtomicReference<String> lastBuildId = new AtomicReference<>(lastStreamedBuildId);

    final String buildsSinceUri = "/build-export/v1/builds/since/" + since.toEpochMilli();
    LOGGER.info("Builds uri: " + buildsSinceUri);

    HttpClientRequest<ByteBuf, ByteBuf> request = HTTP_CLIENT
            .createGet(buildsSinceUri)
            .setKeepAlive(true);
    if (BASIC_AUTH != null) {
        request = request.addHeader("Authorization", "Basic " + BASIC_AUTH);
    }
    if (lastStreamedBuildId != null) {
        request = request.addHeader("Last-Event-ID", lastStreamedBuildId);
    }

    return request
            .flatMap(HttpClientResponse::getContentAsServerSentEvents)
            .doOnNext(serverSentEvent -> lastBuildId.set(serverSentEvent.getEventIdAsString()))
            .doOnSubscribe(() -> LOGGER.info("Streaming builds..."))
            .onErrorResumeNext(t -> {
                LOGGER.info("Error streaming builds, resuming from build id: " + lastBuildId.get());
                return buildStream(since, lastBuildId.get());
            });
}
项目:MarketData
文件:RxNettyEventEventStreamClient.java
/**
 * Connects to the SSE endpoint {@code /hello} on localhost and exposes each received event
 * as its string content. The response header is printed when a response arrives.
 *
 * @return the stream of event payloads as strings
 */
private Observable<String> initializeStream() {
    HttpClient<ByteBuf, ServerSentEvent> sseClient =
            RxNetty.createHttpClient("localhost", port, PipelineConfigurators.<ByteBuf>clientSseConfigurator());
    return sseClient
            .submit(HttpClientRequest.createGet("/hello"))
            .flatMap(response -> {
                printResponseHeader(response);
                return response.getContent();
            })
            .map(ServerSentEvent::contentAsString);
}
项目:MarketData
文件:RxNettyEventEventStreamClient.java
/**
 * Prints the status line and all headers of the given response to standard output.
 *
 * @param response response whose version, status and headers are printed
 */
private static void printResponseHeader(HttpClientResponse<ServerSentEvent> response) {
    System.out.println("New response received.");
    System.out.println("========================");
    String statusLine = response.getHttpVersion().text() + ' ' + response.getStatus().code()
            + ' ' + response.getStatus().reasonPhrase();
    System.out.println(statusLine);
    // one "name: value" line per header entry
    response.getHeaders().entries()
            .forEach(entry -> System.out.println(entry.getKey() + ": " + entry.getValue()));
}
项目:MarketData
文件:RxNettyEventBroadcaster.java
/**
 * Prepares the shared event stream and then delegates server creation to the superclass.
 * In flaky mode the stream is wrapped by {@code SubscriptionLimiter.limitSubscriptions} with
 * a limit of 1.
 *
 * @return the server created by the superclass
 */
public HttpServer<ByteBuf, ServerSentEvent> createServer() {
    if (!flaky) {
        events = initializeEventStream();
    } else {
        events = SubscriptionLimiter.limitSubscriptions(1, initializeEventStream());
    }
    return super.createServer();
}
项目:MarketData
文件:RxNettyEventServer.java
/**
 * Creates an SSE-configured HTTP server on {@code port}. Every request gets the CORS,
 * cache, connection and content-type headers set and is then answered with the stream from
 * {@code getIntervalObservable}.
 *
 * @return the created server
 */
public HttpServer<ByteBuf, ServerSentEvent> createServer() {
    final HttpServer<ByteBuf, ServerSentEvent> sseServer = RxNetty.createHttpServer(
            port,
            (request, response) -> {
                // headers must be in place before the first event is written
                response.getHeaders().set(ACCESS_CONTROL_ALLOW_ORIGIN, "*");
                response.getHeaders().set(CACHE_CONTROL, "no-cache");
                response.getHeaders().set(CONNECTION, "keep-alive");
                response.getHeaders().set(CONTENT_TYPE, "text/event-stream");
                return getIntervalObservable(request, response);
            },
            PipelineConfigurators.<ByteBuf>serveSseConfigurator());

    System.out.println("HTTP Server Sent Events server started...");
    return sseServer;
}
项目:vizceral-hystrix
文件:HystrixReader.java
/**
 * Builds a reader whose HTTP client targets the turbine host and port taken from the
 * configuration and is set up for server-sent events.
 *
 * @param configuration The configuration to use.
 * @param cluster       The cluster to read from.
 */
public HystrixReader(Configuration configuration, String cluster)
{
    this.cluster = cluster;
    this.configuration = configuration;

    HttpClientBuilder<ByteBuf, ServerSentEvent> clientBuilder =
        RxNetty.newHttpClientBuilder(configuration.getTurbineHost(), configuration.getTurbinePort());
    clientBuilder.pipelineConfigurator(PipelineConfigurators.clientSseConfigurator());

    // NOTE(review): secure mode uses a trust-all SSL factory - confirm this is intended.
    if (configuration.isSecure())
    {
        clientBuilder.withSslEngineFactory(DefaultFactories.trustAll());
    }

    rxNetty = clientBuilder.build();
}
项目:argos-dashboard
文件:DefaultHystrixClusterMonitor.java
/**
 * Returns the shared stream of JSON payloads read from the SSE endpoint at {@code url},
 * creating it on first use.
 *
 * <p>The stream fails when the response status is not 200 or when nothing is emitted for
 * 120 seconds. Failures are retried after a wait that grows in 10 second steps up to one
 * minute. A completed stream is re-subscribed.
 *
 * @return the cached, shared observable of event contents
 */
@Override
public Observable<String> observeJson() {
    if (jsonObservable != null) {
        return jsonObservable;
    }

    // getQuery() returns null for a URL without a query part; appending it blindly would
    // request "<path>?null".
    String query = url.getQuery();
    String requestUri = query == null ? url.getPath() : url.getPath() + "?" + query;
    HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet(requestUri);

    int port = url.getPort() < 0 ? url.getDefaultPort() : url.getPort();
    HttpClient<ByteBuf, ServerSentEvent> client = RxNetty.<ByteBuf, ServerSentEvent>newHttpClientBuilder(url.getHost(), port)
            .withNoConnectionPooling()
            .pipelineConfigurator(PipelineConfigurators.<ByteBuf>clientSseConfigurator())
            .build();

    jsonObservable = client.submit(request)
            .doOnError(t -> LOG.error("Error connecting to " + url, t))
            .flatMap(response -> {
                        if (response.getStatus().code() != 200) {
                            return Observable.error(new RuntimeException("Failed to connect: " + response.getStatus()));
                        }
                        return response.getContent()
                                .doOnSubscribe(() -> LOG.info("Turbine => Aggregate Stream from URL: " + url))
                                .doOnUnsubscribe(() -> LOG.info("Turbine => Unsubscribing Stream: " + url))
                                .map(ServerSentEvent::contentAsString);
                    }
            )
            .timeout(120, TimeUnit.SECONDS)
            .retryWhen(attempts -> attempts.zipWith(Observable.range(1, Integer.MAX_VALUE), (k, i) -> i)
                    .flatMap(n -> {
                        int waitTimeSeconds = Math.min(6, n) * 10; // wait in 10 second increments up to a max of 1 minute
                        // both values go through placeholders so the URL text is never parsed as a format pattern
                        LOG.info("Turbine => Retrying connection to: {} in {} seconds", this.url, waitTimeSeconds);
                        return Observable.timer(waitTimeSeconds, TimeUnit.SECONDS);
                    })
            )
            .repeat()
            .share();
    return jsonObservable;
}
项目:spring-cloud-netflix
文件:TurbineStreamConfiguration.java
/**
 * Creates the SSE server that publishes the aggregated Hystrix stream merged with a
 * periodic "Ping" message. The port comes from the properties; when that value is not
 * positive, an available TCP port is looked up starting at 40000.
 *
 * @return the HTTP server writing one "message" event per emitted map
 */
@Bean
@SuppressWarnings("deprecation")
public HttpServer<ByteBuf, ServerSentEvent> aggregatorServer() {
    // multicast so multiple concurrent subscribers get the same stream
    Observable<Map<String, Object>> aggregated = StreamAggregator
            .aggregateGroupedStreams(hystrixSubject()
                    .groupBy(data -> InstanceKey.create((String) data.get("instanceId"))))
            .doOnUnsubscribe(() -> log.info("Unsubscribing aggregation."))
            .doOnSubscribe(() -> log.info("Starting aggregation"))
            .flatMap(o -> o)
            .publish()
            .refCount();

    // first tick after 1 second, then every 10 seconds
    Observable<Map<String, Object>> heartbeat = Observable.timer(1, 10, TimeUnit.SECONDS)
            .map(count -> Collections.singletonMap("type", (Object) "Ping"))
            .publish()
            .refCount();

    Observable<Map<String, Object>> merged = Observable.merge(aggregated, heartbeat);

    int configuredPort = this.properties.getPort();
    this.turbinePort = configuredPort > 0 ? configuredPort : SocketUtils.findAvailableTcpPort(40000);

    return RxNetty.createHttpServer(this.turbinePort, (request, response) -> {
        log.info("SSE Request Received");
        response.getHeaders().setHeader("Content-Type", "text/event-stream");
        return merged
                .doOnUnsubscribe(() -> log.info("Unsubscribing RxNetty server connection"))
                .flatMap(data -> {
                    ServerSentEvent event = new ServerSentEvent(
                            null,
                            Unpooled.copiedBuffer("message", StandardCharsets.UTF_8),
                            Unpooled.copiedBuffer(JsonUtility.mapToJson(data), StandardCharsets.UTF_8));
                    return response.writeAndFlush(event);
                });
    }, serveSseConfigurator());
}
项目:ReactiveLab
文件:AbstractMiddleTierService.java
/**
 * Creates the SSE HTTP server for this service on the given port.
 *
 * <p>Requests pass through a {@code HystrixMetricsStreamHandler} bound to
 * {@code /hystrix.stream}; everything else is delegated to {@link #handleRequest}. Completed
 * requests are logged and recorded as successes with their elapsed time, stream errors are
 * recorded as failures, and an exception thrown while building the response is answered
 * with status 400.
 *
 * @param port port to listen on
 * @return the configured server
 */
public HttpServer<ByteBuf, ServerSentEvent> createServer(int port) {
    System.out.println("Start " + getClass().getSimpleName() + " on port: " + port);

    // declare handler chain (wrapped in Hystrix)
    // TODO create a better way of chaining these (related https://github.com/ReactiveX/RxNetty/issues/232 and https://github.com/ReactiveX/RxNetty/issues/202)
    HystrixMetricsStreamHandler<ByteBuf, ServerSentEvent> handlerChain
            = new HystrixMetricsStreamHandler<>(metrics, "/hystrix.stream", 1000, (request, response) -> {
                try {
                    long startTime = System.currentTimeMillis();
                    return handleRequest(request, response)
                            .doOnCompleted(() -> {
                                // measure once so the logged and the recorded latency agree
                                int elapsedMillis = (int) (System.currentTimeMillis() - startTime);
                                System.out.println("Response => " + request.getPath() + " Time => " + elapsedMillis + "ms");
                                metrics.getRollingPercentile().addValue(elapsedMillis);
                                metrics.getRollingNumber().add(Metrics.EventType.SUCCESS, 1);
                            })
                            .doOnError(t -> metrics.getRollingNumber().add(Metrics.EventType.FAILURE, 1));
                } catch (Throwable e) {
                    e.printStackTrace();
                    System.err.println("Server => Error [" + request.getPath() + "] => " + e);
                    response.setStatus(HttpResponseStatus.BAD_REQUEST);
                    // The body must name the same status code that is set above (400, not 500).
                    return response.writeStringAndFlush("data: Error 400: Bad Request\n" + e.getMessage() + "\n");
                }
            });

    return RxNetty.createHttpServer(port,
            (request, response) -> handlerChain.handle(request, response),
            PipelineConfigurators.<ByteBuf> serveSseConfigurator());
}
项目:ReactiveLab
文件:PersonalizedCatalogService.java
/**
 * Writes one {@code data:} line with a canned catalog list per requested {@code userId} and
 * closes the response once the delayed stream completes.
 *
 * @param request  request whose {@code userId} query parameters are read
 * @param response response the JSON lines are written to
 * @return the write stream, delayed by 20-120ms
 */
@Override
protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) {
    List<String> requestedUsers = request.getQueryParameters().get("userId");
    Observable<Void> writes = Observable.from(requestedUsers)
            .map(id -> {
                Map<String, Object> catalog = new HashMap<>();
                catalog.put("user_id", id);
                catalog.put("list_title", "Really quirky and over detailed list title!");
                catalog.put("other_data", "goes_here");
                catalog.put("videos", Arrays.asList(12345, 23456, 34567, 45678, 56789, 67890));
                return catalog;
            })
            .flatMap(catalog -> response.writeStringAndFlush("data: " + SimpleJson.mapToJson(catalog) + "\n"));
    // simulate latency
    long latencyMillis = (long) (Math.random() * 100) + 20;
    return writes
            .delay(latencyMillis, TimeUnit.MILLISECONDS)
            .doOnCompleted(response::close);
}
项目:ReactiveLab
文件:SocialService.java
/**
 * Writes one {@code data:} line per requested {@code userId}, each listing four friends
 * obtained from {@code randomUser()}, and closes the response once the delayed stream
 * completes.
 *
 * @param request  request whose {@code userId} query parameters are read
 * @param response response the JSON lines are written to
 * @return the write stream, delayed by 20-120ms
 */
@Override
protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) {
    List<String> requestedUsers = request.getQueryParameters().get("userId");
    Observable<Void> writes = Observable.from(requestedUsers)
            .map(id -> {
                Map<String, Object> social = new HashMap<>();
                social.put("userId", id);
                social.put("friends", Arrays.asList(randomUser(), randomUser(), randomUser(), randomUser()));
                return social;
            })
            .flatMap(social -> response.writeStringAndFlush("data: " + SimpleJson.mapToJson(social) + "\n"));
    // simulate latency
    long latencyMillis = (long) (Math.random() * 100) + 20;
    return writes
            .delay(latencyMillis, TimeUnit.MILLISECONDS)
            .doOnCompleted(response::close);
}
项目:ReactiveLab
文件:VideoMetadataService.java
/**
 * Writes one {@code data:} line with canned metadata per requested {@code videoId} and
 * closes the response after all of them have been written.
 *
 * @param request  request whose {@code videoId} query parameters are read
 * @param response response the JSON lines are written to
 * @return the write stream, delayed by 20-40ms
 */
@Override
protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) {
    List<String> videoIds = request.getQueryParameters().get("videoId");
    return Observable.from(videoIds).map(videoId -> {
        Map<String, Object> video = new HashMap<>();
        video.put("videoId", videoId);
        video.put("title", "Video Title");
        video.put("other_data", "goes_here");
        return video;
    }).flatMap(video -> response.writeStringAndFlush("data: " + SimpleJson.mapToJson(video) + "\n"))
            // Close on the merged stream, not per write: closing inside flatMap would end the
            // response after the first video while writes for the remaining ids are still pending.
            .doOnCompleted(response::close)
            .delay(((long) (Math.random() * 20) + 20), TimeUnit.MILLISECONDS); // simulate latency
}
项目:ReactiveLab
文件:MockService.java
/**
 * Validates the mock request parameters and streams the generated JSON as {@code data:}
 * lines, closing the response when generation completes.
 *
 * <p>Exactly one numeric {@code id} is required. {@code delay} (default 50, range 0-60000),
 * {@code itemSize} (default 128, range 1-51200) and {@code numItems} (default 10, range
 * 1-100) are optional. Any violation is answered through {@code writeError}.
 *
 * @param request  request whose query parameters are validated
 * @param response response the JSON lines or the error are written to
 * @return the write stream, or the {@code writeError} result for invalid input
 */
@Override
protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) {
    List<String> idValues = request.getQueryParameters().get("id");
    if (idValues == null || idValues.size() != 1) {
        return writeError(request, response, "Please provide a numerical 'id' value. It can be a random number (uuid). Received => " + idValues);
    }

    final long id;
    try {
        id = Long.parseLong(idValues.get(0));
    } catch (NumberFormatException e) {
        // A non-numeric id is a client error like the others: report it the same way
        // instead of letting the exception escape the handler.
        return writeError(request, response, "Please provide a numerical 'id' value. It can be a random number (uuid). Received => " + idValues);
    }

    int delay = getParameter(request, "delay", 50); // default to 50ms server-side delay
    int itemSize = getParameter(request, "itemSize", 128); // default to 128 bytes item size (assuming ascii text)
    int numItems = getParameter(request, "numItems", 10); // default to 10 items in a list

    // no more than 100 items
    if (numItems < 1 || numItems > 100) {
        return writeError(request, response, "Please choose a 'numItems' value from 1 to 100.");
    }

    // no larger than 50KB per item
    if (itemSize < 1 || itemSize > 1024 * 50) {
        return writeError(request, response, "Please choose an 'itemSize' value from 1 to 1024*50 (50KB).");
    }

    // no larger than 60 second delay
    if (delay < 0 || delay > 60000) {
        return writeError(request, response, "Please choose a 'delay' value from 0 to 60000 (60 seconds).");
    }

    response.setStatus(HttpResponseStatus.OK);
    return MockResponse.generateJson(id, delay, itemSize, numItems)
            .flatMap(json -> response.writeStringAndFlush("data:" + json + "\n"))
            .doOnCompleted(response::close);
}
项目:ReactiveLab
文件:RatingsService.java
/**
 * Writes one {@code data:} line with canned rating values per requested {@code videoId} and
 * closes the response once the delayed stream completes.
 *
 * @param request  request whose {@code videoId} query parameters are read
 * @param response response the JSON lines are written to
 * @return the write stream, delayed by 20ms
 */
@Override
protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) {
    List<String> requestedIds = request.getQueryParameters().get("videoId");
    Observable<Void> writes = Observable.from(requestedIds)
            .map(id -> {
                Map<String, Object> rating = new HashMap<>();
                rating.put("videoId", id);
                rating.put("estimated_user_rating", 3.5);
                rating.put("actual_user_rating", 4);
                rating.put("average_user_rating", 3.1);
                return rating;
            })
            .flatMap(rating -> response.writeStringAndFlush("data: " + SimpleJson.mapToJson(rating) + "\n"));
    // simulate latency
    return writes
            .delay(20, TimeUnit.MILLISECONDS)
            .doOnCompleted(response::close);
}
项目:ReactiveLab
文件:LoadBalancerFactory.java
/**
 * Builds a load balancer named {@code <targetVip>-lb} over the hosts registered for the
 * given VIP.
 *
 * <p>Each instance is mapped to a host using its first IPv4 address and the first port its
 * port set yields. Membership events are translated into client-holder events: a REMOVE
 * event drops the holder stored for the host, any other event stores a new holder.
 *
 * @param targetVip VIP whose members should be balanced
 * @return the load balancer over the client holders
 */
public LoadBalancer<HttpClientHolder<ByteBuf, ServerSentEvent>> forVip(String targetVip) {
    final Map<Host, HttpClientHolder<ByteBuf, ServerSentEvent>> holdersByHost = new ConcurrentHashMap<>();

    Observable<MembershipEvent<Host>> hostEvents = membershipSource.forInterest(Interests.forVips(targetVip), info -> {
        String ipv4Address = info.getDataCenterInfo()
                .getAddresses().stream()
                .filter(address -> address.getProtocolType() == ProtocolType.IPv4)
                .collect(Collectors.toList())
                .get(0)
                .getIpAddress();
        ServicePort firstPort = info.getPorts().iterator().next();
        return new Host(ipv4Address, firstPort.getPort());
    });

    return LoadBalancers.newBuilder(hostEvents.map(
            event -> {
                // the client is looked up for every event type, including removals
                HttpClient<ByteBuf, ServerSentEvent> client = clientPool.getClientForHost(event.getClient());
                final HttpClientHolder<ByteBuf, ServerSentEvent> holder;
                if (event.getType() != MembershipEvent.EventType.REMOVE) {
                    holder = new HttpClientHolder<>(client);
                    holdersByHost.put(event.getClient(), holder);
                } else {
                    holder = holdersByHost.remove(event.getClient());
                }
                return new MembershipEvent<>(event.getType(), holder);
            }))
            .withWeightingStrategy(new LinearWeightingStrategy<>(new RxNettyPendingRequests<>()))
            .withName(targetVip + "-lb")
            .withFailureDetector(new RxNettyFailureDetector<>())
            .build();
}
项目:ReactiveLab
文件:PersonalizedCatalogCommand.java
/**
 * Requests {@code /catalog} for the configured users from a load-balanced client and
 * converts every received event into a {@code Catalog}. The call is retried once on error.
 *
 * @return the stream of parsed catalogs
 */
@Override
protected Observable<Catalog> construct() {
    HttpClientRequest<ByteBuf> catalogRequest = HttpClientRequest.createGet("/catalog?" + UrlGenerator.generate("userId", users));
    return loadBalancer.choose()
            .map(chosen -> chosen.getClient())
            .<Catalog>flatMap(httpClient -> httpClient.submit(catalogRequest)
                    .flatMap(response -> response.getContent().map((ServerSentEvent event) -> {
                        String json = event.contentAsString();
                        return Catalog.fromJson(json);
                    })))
            .retry(1);
}
项目:ReactiveLab
文件:SocialCommand.java
/**
 * Requests {@code /social} for the configured users from a load-balanced client and
 * converts every received event into a {@code Social}. The call is retried once on error.
 *
 * @return the stream of parsed social records
 */
@Override
protected Observable<Social> construct() {
    HttpClientRequest<ByteBuf> socialRequest = HttpClientRequest.createGet("/social?" + UrlGenerator.generate("userId", users));
    return loadBalancer.choose()
            .map(chosen -> chosen.getClient())
            .<Social>flatMap(httpClient -> httpClient.submit(socialRequest)
                    .flatMap(response -> response.getContent()
                            .map((ServerSentEvent event) -> Social.fromJson(event.contentAsString()))))
            .retry(1);
}
项目:ReactiveLab
文件:GeoCommand.java
/**
 * Requests {@code /geo} for the configured IPs from a load-balanced client and converts
 * every received event into a {@code GeoIP}. The call is retried once on error.
 *
 * @return the stream of parsed geo records
 */
@Override
protected Observable<GeoIP> construct() {
    HttpClientRequest<ByteBuf> geoRequest = HttpClientRequest.createGet("/geo?" + UrlGenerator.generate("ip", ips));
    return loadBalancer.choose()
            .map(chosen -> chosen.getClient())
            .<GeoIP>flatMap(httpClient -> httpClient.submit(geoRequest)
                    .flatMap(response -> response.getContent().map((ServerSentEvent event) -> {
                        String json = event.contentAsString();
                        return GeoIP.fromJson(json);
                    })))
            .retry(1);
}
项目:ReactiveLab
文件:UserCommand.java
/**
 * Requests {@code /user} for the configured user ids from a load-balanced client and
 * converts every received event into a {@code User}. The call is retried once on error.
 *
 * @return the stream of parsed users
 */
@Override
protected Observable<User> construct() {
    HttpClientRequest<ByteBuf> userRequest = HttpClientRequest.createGet("/user?" + UrlGenerator.generate("userId", userIds));
    return loadBalancer.choose()
            .map(chosen -> chosen.getClient())
            .<User>flatMap(httpClient -> httpClient.submit(userRequest)
                    .flatMap(response -> response.getContent()
                            .map((ServerSentEvent event) -> User.fromJson(event.contentAsString()))))
            .retry(1);
}
项目:ReactiveLab
文件:VideoMetadataCommand.java
/**
 * Requests {@code /metadata} for the configured videos from a load-balanced client and
 * converts every received event into a {@code VideoMetadata}. The call is retried once on
 * error.
 *
 * @return the stream of parsed video metadata
 */
@Override
protected Observable<VideoMetadata> construct() {
    String query = UrlGenerator.generate("videoId", videos);
    HttpClientRequest<ByteBuf> metadataRequest = HttpClientRequest.createGet("/metadata?" + query);
    return loadBalancer.choose()
            .map(chosen -> chosen.getClient())
            .<VideoMetadata>flatMap(httpClient -> httpClient.submit(metadataRequest)
                    .flatMap(response -> response.getContent().map((ServerSentEvent event) -> {
                        String json = event.contentAsString();
                        return VideoMetadata.fromJson(json);
                    })))
            .retry(1);
}
项目:ReactiveLab
文件:RatingsCommand.java
/**
 * Requests {@code /ratings} for the configured videos from a load-balanced client and
 * converts every received event into a {@code Rating}. The call is retried once on error.
 *
 * @return the stream of parsed ratings
 */
@Override
protected Observable<Rating> construct() {
    HttpClientRequest<ByteBuf> ratingsRequest = HttpClientRequest.createGet("/ratings?" + UrlGenerator.generate("videoId", videos));
    return loadBalancer.choose()
            .map(chosen -> chosen.getClient())
            .<Rating>flatMap(httpClient -> httpClient.submit(ratingsRequest)
                    .flatMap(response -> response.getContent().map((ServerSentEvent event) -> {
                        String json = event.contentAsString();
                        return Rating.fromJson(json);
                    })))
            .retry(1);
}
项目:ReactiveLab
文件:BookmarksCommand.java
/**
 * Creates the command in the "GetBookmarks" group and derives its cache key by joining the
 * ids of the given videos, each followed by a dash.
 *
 * @param videos       videos whose bookmarks are requested
 * @param loadBalancer load balancer used to pick the HTTP client
 */
public BookmarksCommand(List<Video> videos, LoadBalancer<HttpClientHolder<ByteBuf, ServerSentEvent>> loadBalancer) {
    super(HystrixCommandGroupKey.Factory.asKey("GetBookmarks"));
    this.loadBalancer = loadBalancer;
    this.videos = videos;

    final StringBuilder key = new StringBuilder();
    videos.forEach(video -> key.append(video.getId()).append("-"));
    this.cacheKey = key.toString();
}
项目:ReactiveLab
文件:BookmarksCommand.java
/**
 * Requests {@code /bookmarks} for the configured videos from a load-balanced client and
 * converts every received event into a {@code Bookmark}. The call is retried once on error.
 *
 * @return the stream of parsed bookmarks
 */
@Override
public Observable<Bookmark> construct() {
    HttpClientRequest<ByteBuf> bookmarksRequest = HttpClientRequest.createGet("/bookmarks?" + UrlGenerator.generate("videoId", videos));
    return loadBalancer.choose()
            .map(chosen -> chosen.getClient())
            .<Bookmark>flatMap(httpClient -> httpClient.submit(bookmarksRequest)
                    .flatMap(response -> response.getContent().map((ServerSentEvent event) -> {
                        String json = event.contentAsString();
                        return Bookmark.fromJson(json);
                    })))
            .retry(1);
}
项目:ReactiveLab
文件:LoadBalancerFactory.java
/**
 * Stores the collaborators this factory keeps as fields.
 *
 * @param membershipSource source of membership events
 * @param clientPool       pool of HTTP clients
 */
public LoadBalancerFactory(EurekaMembershipSource membershipSource,
        HttpClientPool<ByteBuf, ServerSentEvent> clientPool) {
    this.clientPool = clientPool;
    this.membershipSource = membershipSource;
}
项目:ReactiveLab
文件:AbstractMiddleTierService.java
/**
 * Service-specific request handling, implemented by each concrete service.
 *
 * @param request  the incoming HTTP request
 * @param response the response to which server-sent events are written
 * @return an observable representing the work of writing the response
 */
protected abstract Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response);