/** * Setups the test environment. * * @throws Exception if any error occurs */ @Before public void setUp() throws Exception { instance = new HealthCheckTaskExecutor(); server = RxNetty.createHttpServer(PORT, new RequestHandler<ByteBuf, ByteBuf>() { @Override public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) { if ("/health".equals(request.getPath())) { return response.writeStringAndFlush("{\"status\": \"UP\", \"service\": {\"status\": \"UP\"}}"); } response.setStatus(HttpResponseStatus.NOT_FOUND); return response.close(); } }).start(); }
@Setup public void setup() { server = RxNetty.createHttpServer(SERVER_PORT, new RequestHandler<ByteBuf, ByteBuf>() { public rx.Observable handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) { return response.flush(); } }); server.start(); client = new OkHttpClient(); client.setRetryOnConnectionFailure(false); okFeign = Feign.builder() .client(new feign.okhttp.OkHttpClient(client)) .target(FeignTestInterface.class, "http://localhost:" + SERVER_PORT); queryRequest = new Request.Builder() .url("http://localhost:" + SERVER_PORT + "/?Action=GetUser&Version=2010-05-08&limit=1") .build(); }
private Observable<Void> getIntervalObservable(HttpServerRequest<?> request, final HttpServerResponse<ServerSentEvent> response) { HttpRequest simpleRequest = new HttpRequest(request.getQueryParameters()); return getEvents(simpleRequest) .flatMap(event -> { System.out.println("Writing SSE event: " + event); ByteBuf data = response.getAllocator().buffer().writeBytes(( event + "\n").getBytes()); ServerSentEvent sse = new ServerSentEvent(data); return response.writeAndFlush(sse); }).materialize() .takeWhile(notification -> { if (notification.isOnError()) { System.out.println("Write to client failed, stopping response sending."); notification.getThrowable().printStackTrace(System.err); } return !notification.isOnError(); }) .map((Func1<Notification<Void>, Void>) notification -> null); }
@Override public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) { if (alarmService == null) { response.setStatus(HttpResponseStatus.SERVICE_UNAVAILABLE); return response.close(); } if (HttpMethod.GET.equals(request.getHttpMethod())) { handleGet(response, request.getUri()); } else if (HttpMethod.PUT.equals(request.getHttpMethod())) { handlePut(response, request.getContent()); } else if (HttpMethod.DELETE.equals(request.getHttpMethod())) { handleDelete(response, request.getUri()); } else { response.setStatus(HttpResponseStatus.NOT_IMPLEMENTED); } return response.close(); }
private void handleGet(HttpServerResponse<ByteBuf> response, String uri) { String[] parts = uri.substring(1).split("/"); ObjectMapper mapper = new ObjectMapper(); try { if (parts.length == 1) { response.writeBytes(mapper.writeValueAsBytes(alarmService.getAlarms())); } else if (parts.length == 2) { String alarmName = URLDecoder.decode(parts[1], "UTF-8"); WeatherAlarm alarm = alarmService.getAlarm(alarmName); if (alarm != null) { response.writeBytes(mapper.writeValueAsBytes(alarm)); } else { logger.debug("No alarm found with name " + alarmName); response.setStatus(HttpResponseStatus.NOT_FOUND); } } else { logger.error("Unsupported resource request " + uri); response.setStatus(HttpResponseStatus.NOT_FOUND); } } catch (IOException e) { logger.error("Failed to write JSON to response", e); response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR); } }
private void handleDelete(HttpServerResponse<ByteBuf> response, String uri) { String[] parts = uri.substring(1).split("/"); try { if (parts.length == 1) { //Not allowed to delete all alarms response.setStatus(HttpResponseStatus.UNAUTHORIZED); } else if (parts.length == 2) { String alarmName = URLDecoder.decode(parts[1], "UTF-8"); boolean removed = alarmService.removeAlarm(alarmName); if (!removed) { logger.debug("No alarm found with name " + alarmName); response.setStatus(HttpResponseStatus.NOT_FOUND); } } else { logger.error("Unsupported resource request " + uri); response.setStatus(HttpResponseStatus.NOT_FOUND); } } catch (IOException e) { logger.error("Failed to write JSON to response", e); response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR); } }
@Test public void testHandleRequestForAlarm() throws Exception { IWeatherAlarmService alarmService = getMockAlarmService(); WeatherAlarmEndpoint alarmEndpoint = new WeatherAlarmEndpoint(); alarmEndpoint.setAlarmService(alarmService); WeatherAlarm alarm = alarmService.getAlarms().get(0); Capture<byte[]> written = EasyMock.newCapture(); Capture<HttpResponseStatus> status = EasyMock.newCapture(); String uri = URI + "/" + URLEncoder.encode(alarm.getName(), "UTF-8"); HttpServerRequest<ByteBuf> request = createMockHttpServerRequest(HttpMethod.GET, uri, Observable.empty()); HttpServerResponse<ByteBuf> response = createMockHttpResponse(status, written); alarmEndpoint.handle(request, response); byte[] expected = new ObjectMapper().writeValueAsBytes(alarm); Assert.assertTrue("Unexpected value written", Arrays.equals(expected, written.getValue())); }
@Test public void testHandleRequestForAddAlarm() throws Exception { IWeatherAlarmService alarmService = getEmptyAlarmService(); WeatherAlarmEndpoint alarmEndpoint = new WeatherAlarmEndpoint(); alarmEndpoint.setAlarmService(alarmService); WeatherAlarm alarm = createWeatherAlarm(); Capture<byte[]> written = EasyMock.newCapture(); Capture<HttpResponseStatus> status = EasyMock.newCapture(); HttpServerRequest<ByteBuf> request = createMockHttpServerRequest(HttpMethod.PUT, URI, createContent(new ObjectMapper().writeValueAsBytes(alarm))); HttpServerResponse<ByteBuf> response = createMockHttpResponse(status, written); alarmEndpoint.handle(request, response); Assert.assertTrue("Alarm not added from list " + alarm, alarmService.getAlarm(alarm.getName()) != null); }
@Test public void testHandleRequestForDeleteAlarm() throws Exception { IWeatherAlarmService alarmService = getMockAlarmService(); WeatherAlarmEndpoint alarmEndpoint = new WeatherAlarmEndpoint(); alarmEndpoint.setAlarmService(alarmService); WeatherAlarm alarm = alarmService.getAlarms().get(0); Capture<byte[]> written = EasyMock.newCapture(); Capture<HttpResponseStatus> status = EasyMock.newCapture(); String encodedAlarmName = URLEncoder.encode(alarm.getName(), "UTF-8"); String uri = URI + "/" + encodedAlarmName; HttpServerRequest<ByteBuf> request = createMockHttpServerRequest(HttpMethod.DELETE, uri, Observable.empty()); HttpServerResponse<ByteBuf> response = createMockHttpResponse(status, written); alarmEndpoint.handle(request, response); Assert.assertTrue("Alarm not deleted from list " + alarm, !alarmService.getAlarms().contains(alarm)); }
/** * This endpoint will forward the post data to the selected marathon server. * * TODO: Move logic from here * * @param request * @param response * @return response to be send to the caller */ @Override public Observable<Void> postApps(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) { return triathlonService.parseJson(request.getContent()) .flatMap(this::matchDataCenter) .flatMap(content -> { response.write(content); return response.close(); }) .onErrorResumeNext(throwable -> { LOGGER.info("Service ERROR"); response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR); return response.close(); }) .doOnCompleted(() -> response.close(true)); }
public HttpServer<ByteBuf, ByteBuf> createServer() { HttpServer<ByteBuf, ByteBuf> server = RxNetty.newHttpServerBuilder(port, new RequestHandler<ByteBuf, ByteBuf>() { @Override public Observable<Void> handle(HttpServerRequest<ByteBuf> request, final HttpServerResponse<ByteBuf> response) { if (request.getPath().contains("/v2/apps")) { if (request.getHttpMethod().equals(HttpMethod.POST)) { return handleTest(request, response); } } response.setStatus(HttpResponseStatus.NOT_FOUND); return response.close(); } }).pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpServerConfigurator()).enableWireLogging(LogLevel.ERROR).build(); System.out.println("RxTetstServer server started..."); return server; }
@Provides public HttpServer providesKaryonTransport() { SimpleUriRouter simpleUriRouter = new SimpleUriRouter(); simpleUriRouter.addUri("/foo", new RequestHandler() { @Override public Observable<Void> handle(HttpServerRequest request, HttpServerResponse response) { response.writeAndFlush("Hello World", StringTransformer.DEFAULT_INSTANCE); response.setStatus(HttpResponseStatus.OK); return Observable.empty(); } @Override public Observable<Void> handle(Object input, Object output) { return Observable.empty(); } }); return KaryonTransport.newHttpServer(8888, simpleUriRouter); }
private Observable<Void> handleHystrixRequest(final HttpServerResponse<O> response) { writeHeaders(response); final Subject<Void, Void> subject = PublishSubject.create(); final MultipleAssignmentSubscription subscription = new MultipleAssignmentSubscription(); Subscription actionSubscription = Observable.timer(0, interval, TimeUnit.MILLISECONDS, Schedulers.computation()) .subscribe(new Action1<Long>() { @Override public void call(Long tick) { if (!response.getChannel().isOpen()) { subscription.unsubscribe(); return; } try { writeMetric(JsonMapper.toJson(metrics), response); } catch (Exception e) { subject.onError(e); } } }); subscription.set(actionSubscription); return subject; }
@Override protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) { List<String> videoIds = request.getQueryParameters().get("videoId"); int latency = 1; if (Random.randomIntFrom0to100() > 80) { latency = 10; } return Observable.from(videoIds).map(videoId -> { Map<String, Object> video = new HashMap<>(); video.put("videoId", videoId); video.put("position", (int) (Math.random() * 5000)); return video; }).flatMap(video -> response.writeStringAndFlush("data: " + SimpleJson.mapToJson(video) + "\n")) .delay(latency, TimeUnit.MILLISECONDS).doOnCompleted(response::close); // simulate latency }
@Override protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) { return request.getContent().flatMap(i -> { List<String> ips = request.getQueryParameters().get("ip"); Map<String, Object> data = new HashMap<>(); for (String ip : ips) { Map<String, Object> ip_data = new HashMap<>(); ip_data.put("country_code", "GB"); ip_data.put("longitude", "-0.13"); ip_data.put("latitude", "51.5"); data.put(ip, ip_data); } return response.writeStringAndFlush("data: " + SimpleJson.mapToJson(data) + "\n") .doOnCompleted(response::close); }).delay(10, TimeUnit.MILLISECONDS); }
@Override protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) { List<String> userIds = request.getQueryParameters().get("userId"); if (userIds == null || userIds.size() == 0) { return writeError(request, response, "At least one parameter of 'userId' must be included."); } return Observable.from(userIds).map(userId -> { Map<String, Object> user = new HashMap<>(); user.put("userId", userId); user.put("name", "Name Here"); user.put("other_data", "goes_here"); return user; }).flatMap(user -> response.writeStringAndFlush("data: " + SimpleJson.mapToJson(user) + "\n") .doOnCompleted(response::close)) .delay(((long) (Math.random() * 500) + 500), TimeUnit.MILLISECONDS); // simulate latency }
public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) { List<String> userId = request.getQueryParameters().get("userId"); if (userId == null || userId.size() != 1) { return StartGatewayServer.writeError(request, response, "A single 'userId' is required."); } return new UserCommand(userId).observe().flatMap(user -> { Observable<Map<String, Object>> catalog = new PersonalizedCatalogCommand(user).observe() .flatMap(catalogList -> catalogList.videos().<Map<String, Object>> flatMap( video -> { Observable<Bookmark> bookmark = new BookmarkCommand(video).observe(); Observable<Rating> rating = new RatingsCommand(video).observe(); Observable<VideoMetadata> metadata = new VideoMetadataCommand(video).observe(); return Observable.zip(bookmark, rating, metadata, (b, r, m) -> combineVideoData(video, b, r, m)); })); Observable<Map<String, Object>> social = new SocialCommand(user).observe().map(s -> { return s.getDataAsMap(); }); return Observable.merge(catalog, social); }).flatMap(data -> { String json = SimpleJson.mapToJson(data); return response.writeStringAndFlush("data: " + json + "\n"); }); }
public HttpServer<ByteBuf, ByteBuf> createServer() { HttpServer<ByteBuf, ByteBuf> server = RxNetty.newHttpServerBuilder(port, new RequestHandler<ByteBuf, ByteBuf>() { @Override public Observable<Void> handle(HttpServerRequest<ByteBuf> request, final HttpServerResponse<ByteBuf> response) { if (request.getPath().contains("/users")) { if (request.getHttpMethod().equals(HttpMethod.GET)) { return handleRecommendationsByUserId(request, response); } else { return handleUpdateRecommendationsForUser(request, response); } } if (request.getPath().contains("/recommendations")) { return handleRecommendationsBy(request, response); } if (request.getPath().contains("/movies")) { return handleRegisterMovie(request, response); } response.setStatus(HttpResponseStatus.NOT_FOUND); return response.close(); } }).pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpServerConfigurator()).enableWireLogging(LogLevel.ERROR).build(); System.out.println("RxMovie server started..."); return server; }
private Observable<Void> handleRecommendationsByUserId(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) { System.out.println("HTTP request -> recommendations by user id request: " + request.getPath()); final String userId = userIdFromPath(request.getPath()); if (userId == null) { response.setStatus(HttpResponseStatus.BAD_REQUEST); return response.close(); } if (!userRecommendations.containsKey(userId)) { response.setStatus(HttpResponseStatus.NOT_FOUND); return response.close(); } StringBuilder builder = new StringBuilder(); for (String movieId : userRecommendations.get(userId)) { System.out.println(" returning: " + movies.get(movieId)); builder.append(movies.get(movieId)).append('\n'); } ByteBuf byteBuf = UnpooledByteBufAllocator.DEFAULT.buffer(); byteBuf.writeBytes(builder.toString().getBytes(Charset.defaultCharset())); response.write(byteBuf); return response.close(); }
private Observable<Void> handleRegisterMovie(HttpServerRequest<ByteBuf> request, final HttpServerResponse<ByteBuf> response) { System.out.println("Http request -> register movie: " + request.getPath()); return request.getContent().flatMap(new Func1<ByteBuf, Observable<Void>>() { @Override public Observable<Void> call(ByteBuf byteBuf) { String formatted = byteBuf.toString(Charset.defaultCharset()); System.out.println(" movie: " + formatted); try { Movie movie = Movie.from(formatted); movies.put(movie.getId(), movie); response.setStatus(HttpResponseStatus.CREATED); } catch (Exception e) { System.err.println("Invalid movie content"); e.printStackTrace(); response.setStatus(HttpResponseStatus.BAD_REQUEST); } return response.close(); } }); }
@Override public void configureNewPipeline(ChannelPipeline pipeline) { serverPipelineConfigurator.configureNewPipeline(pipeline); pipeline.addLast(SSE_ENCODER_HANDLER_NAME, SERVER_SENT_EVENT_ENCODER); pipeline.addLast(SSE_RESPONSE_HEADERS_COMPLETER, new ChannelOutboundHandlerAdapter() { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { if (HttpServerResponse.class.isAssignableFrom(msg.getClass())) { @SuppressWarnings("rawtypes") HttpServerResponse rxResponse = (HttpServerResponse) msg; String contentTypeHeader = rxResponse.getHeaders().get(CONTENT_TYPE); if (null == contentTypeHeader) { rxResponse.getHeaders().set(CONTENT_TYPE, "text/event-stream"); } } super.write(ctx, msg, promise); } }); }
public Observable<Void> simulateTimeout(HttpServerRequest<ByteBuf> httpRequest, HttpServerResponse<ByteBuf> response) { String uri = httpRequest.getUri(); QueryStringDecoder decoder = new QueryStringDecoder(uri); List<String> timeout = decoder.parameters().get("timeout"); byte[] contentBytes; HttpResponseStatus status = HttpResponseStatus.NO_CONTENT; if (null != timeout && !timeout.isEmpty()) { try { Thread.sleep(Integer.parseInt(timeout.get(0))); contentBytes = "".getBytes(); } catch (Exception e) { contentBytes = e.getMessage().getBytes(); status = HttpResponseStatus.INTERNAL_SERVER_ERROR; } } else { status = HttpResponseStatus.BAD_REQUEST; contentBytes = "Please provide a timeout parameter.".getBytes(); } response.setStatus(status); return response.writeBytesAndFlush(contentBytes); }
@Override public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) { String uri = request.getUri(); if (uri.startsWith("test/singleEntity")) { return handleSingleEntity(response); } else if (uri.startsWith("test/stream")) { return handleStream(response); } else if (uri.startsWith("test/nochunk_stream")) { return handleStreamWithoutChunking(response); } else if (uri.startsWith("test/largeStream")) { return handleLargeStream(response); } else if (uri.startsWith("test/timeout")) { return simulateTimeout(request, response); } else if (uri.startsWith("test/post")) { return handlePost(request, response); } else { response.setStatus(HttpResponseStatus.NOT_FOUND); return response.flush(); } }
public GovernatorHttpInterceptorSupport<I, O> intercept(List<Class<? extends DuplexInterceptor<HttpServerRequest<I>, HttpServerResponse<O>>>> interceptors) { ArrayList<Class<? extends InboundInterceptor<HttpServerRequest<I>, HttpServerResponse<O>>>> ins = new ArrayList<Class<? extends InboundInterceptor<HttpServerRequest<I>, HttpServerResponse<O>>>>(); ArrayList<Class<? extends OutboundInterceptor<HttpServerResponse<O>>>> outs = new ArrayList<Class<? extends OutboundInterceptor<HttpServerResponse<O>>>>(); for (Class<? extends DuplexInterceptor<HttpServerRequest<I>, HttpServerResponse<O>>> interceptor : interceptors) { ins.add(interceptor); outs.add(interceptor); } HttpInClassHolder<I, O> inHolder = new HttpInClassHolder<I, O>(key, ins); interceptorSupport.inboundInterceptorClasses.add(inHolder); HttpOutClassHolder<I, O> outHolder = new HttpOutClassHolder<I, O>(key, outs); interceptorSupport.outboundInterceptorClasses.add(outHolder); return interceptorSupport; }
ContainerResponseWriter bridgeResponse(final HttpServerResponse<ByteBuf> serverResponse) { return new ContainerResponseWriter() { private final ByteBuf contentBuffer = serverResponse.getChannel().alloc().buffer(); @Override public OutputStream writeStatusAndHeaders(long contentLength, ContainerResponse response) { int responseStatus = response.getStatus(); serverResponse.setStatus(HttpResponseStatus.valueOf(responseStatus)); HttpResponseHeaders responseHeaders = serverResponse.getHeaders(); for(Map.Entry<String, List<Object>> header : response.getHttpHeaders().entrySet()){ responseHeaders.setHeader(header.getKey(), header.getValue()); } return new ByteBufOutputStream(contentBuffer); } @Override public void finish() { serverResponse.writeAndFlush(contentBuffer); } }; }
@Override public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) { if (request.getUri().startsWith(healthCheckUri)) { return healthCheckEndpoint.handle(request, response); } else if (request.getUri().startsWith("/hello/to/")) { int prefixLength = "/hello/to".length(); String userName = request.getPath().substring(prefixLength); if (userName.isEmpty() || userName.length() == 1 /*The uri is /hello/to/ but no name */) { response.setStatus(HttpResponseStatus.BAD_REQUEST); return response.writeStringAndFlush( "{\"Error\":\"Please provide a username to say hello. The URI should be /hello/to/{username}\"}"); } else { String msg = "Hello " + userName.substring(1) /*Remove the / prefix*/ + " from Netflix OSS"; return response.writeStringAndFlush("{\"Message\":\"" + msg + "\"}"); } } else if (request.getUri().startsWith("/hello")) { return response.writeStringAndFlush("{\"Message\":\"Hello newbee from Netflix OSS\"}"); } else { response.setStatus(HttpResponseStatus.NOT_FOUND); return response.close(); } }
public Observable<Void> sayHelloToUser(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) { JSONObject content = new JSONObject(); int prefixLength = "/hello/to".length(); String userName = request.getPath().substring(prefixLength); try { if (userName.isEmpty() || userName.length() == 1 /*The uri is /hello/to/ but no name */) { response.setStatus(HttpResponseStatus.BAD_REQUEST); content.put("Error", "Please provide a username to say hello. The URI should be /hello/to/{username}"); } else { content.put("Message", "Hello " + userName.substring(1) /*Remove the / prefix*/ + " from Netflix OSS"); } } catch (JSONException e) { logger.error("Error creating json response.", e); return Observable.error(e); } response.write(content.toString(), StringTransformer.DEFAULT_INSTANCE); return response.close(); }
public static void main(String... args) throws InterruptedException { HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(8080, (HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) -> { System.out.println("Server => Request: " + request.getPath()); try { if ("/error".equals(request.getPath())) { throw new RuntimeException("forced error"); } response.setStatus(HttpResponseStatus.OK); response.writeString("Path Requested =>: " + request.getPath() + '\n'); return response.close(); } catch (Throwable e) { System.err.println("Server => Error [" + request.getPath() + "] => " + e); response.setStatus(HttpResponseStatus.BAD_REQUEST); response.writeString("Error 500: Bad Request\n"); return response.close(); } }); server.startAndWait(); RxNetty.createHttpGet("http://localhost:8080/") .flatMap(response -> response.getContent()) .map(data -> "Client => " + data.toString(Charset.defaultCharset())) .toBlocking().forEach(System.out::println); RxNetty.createHttpGet("http://localhost:8080/error") .flatMap(response -> response.getContent()) .map(data -> "Client => " + data.toString(Charset.defaultCharset())) .toBlocking().forEach(System.out::println); RxNetty.createHttpGet("http://localhost:8080/data") .flatMap(response -> response.getContent()) .map(data -> "Client => " + data.toString(Charset.defaultCharset())) .toBlocking().forEach(System.out::println); //server.shutdown(); }
private void handlePut(HttpServerResponse<ByteBuf> response, Observable<ByteBuf> content) { ObjectMapper mapper = new ObjectMapper(); content.forEach(byteBuf -> { try { WeatherAlarm alarm = mapper.readValue(byteBuf.toString(Charset.defaultCharset()), WeatherAlarm.class); alarmService.addAlarm(alarm); } catch (IOException e) { logger.error("Failed to read JSON from request", e); response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR); } }); }
@Override public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) { RequestHandler<ByteBuf, ByteBuf> handler = findRequestHandler(request.getUri()); if (handler != null) { return handler.handle(request, response); } else { response.setStatus(HttpResponseStatus.NOT_FOUND); return response.close(); } }
@Test public void testNotImplemented() { IWeatherAlarmService alarmService = getMockAlarmService(); WeatherAlarmEndpoint alarmEndpoint = new WeatherAlarmEndpoint(); alarmEndpoint.setAlarmService(alarmService); Capture<byte[]> written = EasyMock.newCapture(); Capture<HttpResponseStatus> status = EasyMock.newCapture(); HttpServerRequest<ByteBuf> request = createMockHttpServerRequest(HttpMethod.POST, URI, Observable.empty()); HttpServerResponse<ByteBuf> response = createMockHttpResponse(status, written); alarmEndpoint.handle(request, response); HttpResponseStatus expected = HttpResponseStatus.NOT_IMPLEMENTED; Assert.assertEquals("Unexpected value for status", expected, status.getValue()); }
@Test public void testHandleRequestForAlarms() throws Exception { IWeatherAlarmService alarmService = getMockAlarmService(); WeatherAlarmEndpoint alarmEndpoint = new WeatherAlarmEndpoint(); alarmEndpoint.setAlarmService(alarmService); Capture<byte[]> written = EasyMock.newCapture(); Capture<HttpResponseStatus> status = EasyMock.newCapture(); HttpServerRequest<ByteBuf> request = createMockHttpServerRequest(HttpMethod.GET, URI, Observable.empty()); HttpServerResponse<ByteBuf> response = createMockHttpResponse(status, written); alarmEndpoint.handle(request, response); byte[] expected = new ObjectMapper().writeValueAsBytes(alarmService.getAlarms()); Assert.assertTrue("Unexpected value written", Arrays.equals(expected, written.getValue())); }
@Test public void testHandleRequestForAlarmNotFound() throws Exception { IWeatherAlarmService alarmService = getMockAlarmService(); WeatherAlarmEndpoint alarmEndpoint = new WeatherAlarmEndpoint(); alarmEndpoint.setAlarmService(alarmService); WeatherAlarm alarm = alarmService.getAlarms().get(0); Capture<byte[]> written = EasyMock.newCapture(); Capture<HttpResponseStatus> status = EasyMock.newCapture(); String uri = URI + "/unknownAlarm"; HttpServerRequest<ByteBuf> request = createMockHttpServerRequest(HttpMethod.GET, uri, Observable.empty()); HttpServerResponse<ByteBuf> response = createMockHttpResponse(status, written); alarmEndpoint.handle(request, response); Assert.assertEquals("Unexpected status", HttpResponseStatus.NOT_FOUND, status.getValue()); }