/**
 * Drains queued log events and uploads them via {@code awsLogsClient.putLogEvents}, in batches of
 * at most {@code AWS_DRAIN_LIMIT} events, until a drain returns fewer events than the limit.
 *
 * <p>For every batch this method:
 * <ol>
 *   <li>sorts the events by timestamp (ascending);</li>
 *   <li>raises any timestamp that is older than {@code lastReportedTimestamp} up to that value, so
 *       that batches never go backwards in time relative to the previous batch;</li>
 *   <li>records the newest timestamp of the batch in {@code lastReportedTimestamp};</li>
 *   <li>sends the batch with the cached sequence token and stores the token returned (or the
 *       expected token carried by a sequence-related exception) in {@code sequenceTokenCache}.</li>
 * </ol>
 *
 * <p>If the service rejects the batch with an {@link InvalidSequenceTokenException}, the batch is
 * re-sent exactly once using the expected token taken from the exception; previously the batch was
 * dropped at that point. Any other failure is logged and the batch is discarded, so this method
 * never throws because of an upload problem.
 */
private void flush() {
    int drained;
    final List<InputLogEvent> logEvents = new ArrayList<>(AWS_DRAIN_LIMIT);
    do {
        drained = queue.drainTo(logEvents, AWS_DRAIN_LIMIT);
        if (logEvents.isEmpty()) {
            break;
        }

        logEvents.sort(Comparator.comparing(InputLogEvent::getTimestamp));
        if (lastReportedTimestamp > 0) {
            // Clamp events that are older than the newest event of the previous batch.
            for (final InputLogEvent event : logEvents) {
                if (event.getTimestamp() < lastReportedTimestamp) {
                    event.setTimestamp(lastReportedTimestamp);
                }
            }
        }
        // The list is sorted (clamping keeps it sorted), so the last element is the newest.
        lastReportedTimestamp = logEvents.get(logEvents.size() - 1).getTimestamp();

        final PutLogEventsRequest putLogEventsRequest =
                new PutLogEventsRequest(logGroupName, logStreamName, logEvents);
        putLogEventsRequest.setSequenceToken(sequenceTokenCache);
        try {
            final PutLogEventsResult putLogEventsResult = awsLogsClient.putLogEvents(putLogEventsRequest);
            sequenceTokenCache = putLogEventsResult.getNextSequenceToken();
        } catch (final DataAlreadyAcceptedException daae) {
            // The batch was already accepted earlier; only the token needs refreshing.
            sequenceTokenCache = daae.getExpectedSequenceToken();
        } catch (final InvalidSequenceTokenException iste) {
            // The batch was NOT accepted: refresh the token and re-send the same request once,
            // otherwise these events would be lost when the list is cleared below.
            sequenceTokenCache = iste.getExpectedSequenceToken();
            putLogEventsRequest.setSequenceToken(sequenceTokenCache);
            try {
                final PutLogEventsResult retryResult = awsLogsClient.putLogEvents(putLogEventsRequest);
                sequenceTokenCache = retryResult.getNextSequenceToken();
            } catch (final DataAlreadyAcceptedException retryDaae) {
                sequenceTokenCache = retryDaae.getExpectedSequenceToken();
            } catch (final InvalidSequenceTokenException retryIste) {
                // Keep the newest expected token for the next batch, but give up on this one.
                sequenceTokenCache = retryIste.getExpectedSequenceToken();
                LOGGER.error(retryIste.getMessage(), retryIste);
            } catch (final Exception retryFailure) {
                LOGGER.error(retryFailure.getMessage(), retryFailure);
            }
        } catch (final Exception e) {
            LOGGER.error(e.getMessage(), e);
        }
        logEvents.clear();
    } while (drained >= AWS_DRAIN_LIMIT);
}
@Test public void testDiscardOldest() throws Exception { initialize("TestCloudWatchAppender/testDiscardOldest.properties"); // this is a dummy client: never actually run the writer thread, but // need to test the real writer MockCloudWatchClient mockClient = new MockCloudWatchClient() { @Override protected PutLogEventsResult putLogEvents(PutLogEventsRequest request) { throw new IllegalStateException("should never be called"); } }; appender.setThreadFactory(new NullThreadFactory()); appender.setWriterFactory(mockClient.newWriterFactory()); for (int ii = 0 ; ii < 20 ; ii++) { logger.debug("message " + ii); } List<LogMessage> messages = appender.getMessageQueue().toList(); assertEquals("number of messages in queue", 10, messages.size()); assertEquals("oldest message", "message 10\n", messages.get(0).getMessage()); assertEquals("newest message", "message 19\n", messages.get(9).getMessage()); }
@Test public void testDiscardNewest() throws Exception { initialize("TestCloudWatchAppender/testDiscardNewest.properties"); // this is a dummy client: never actually run the writer thread, but // need to test the real writer MockCloudWatchClient mockClient = new MockCloudWatchClient() { @Override protected PutLogEventsResult putLogEvents(PutLogEventsRequest request) { throw new IllegalStateException("should never be called"); } }; appender.setThreadFactory(new NullThreadFactory()); appender.setWriterFactory(mockClient.newWriterFactory()); for (int ii = 0 ; ii < 20 ; ii++) { logger.debug("message " + ii); } List<LogMessage> messages = appender.getMessageQueue().toList(); assertEquals("number of messages in queue", 10, messages.size()); assertEquals("oldest message", "message 0\n", messages.get(0).getMessage()); assertEquals("newest message", "message 9\n", messages.get(9).getMessage()); }
@Test public void testDiscardNone() throws Exception { initialize("TestCloudWatchAppender/testDiscardNone.properties"); // this is a dummy client: we never actually run the writer thread, but // need to test the real writer MockCloudWatchClient mockClient = new MockCloudWatchClient() { @Override protected PutLogEventsResult putLogEvents(PutLogEventsRequest request) { throw new IllegalStateException("should never be called"); } }; appender.setThreadFactory(new NullThreadFactory()); appender.setWriterFactory(mockClient.newWriterFactory()); for (int ii = 0 ; ii < 20 ; ii++) { logger.debug("message " + ii); } List<LogMessage> messages = appender.getMessageQueue().toList(); assertEquals("number of messages in queue", 20, messages.size()); assertEquals("oldest message", "message 0\n", messages.get(0).getMessage()); assertEquals("newest message", "message 19\n", messages.get(19).getMessage()); }
/**
 * Uploads a single log event, stamped with the current wall-clock time, to the configured log
 * group and stream.
 *
 * <p>The request carries {@code lastSequenceToken}; after a successful call that field is replaced
 * with the next sequence token from the result. Exceptions thrown by the client are not caught
 * here and propagate to the caller.
 *
 * @param message text of the log event to send
 */
private void sendEvent(String message) {
    final long now = System.currentTimeMillis();
    final InputLogEvent event = new InputLogEvent().withTimestamp(now).withMessage(message);

    final List<InputLogEvent> batch = new LinkedList<>();
    batch.add(event);

    final PutLogEventsRequest request = new PutLogEventsRequest(logGroupName, logStreamName, batch);
    request.setSequenceToken(lastSequenceToken);

    // Remember the token for the next call on the same stream.
    lastSequenceToken = awsLogsClient.putLogEvents(request).getNextSequenceToken();
}
/**
 * Appends the same logging event twice to an appender whose batch size is 1 and verifies, via a
 * mocked {@code AWSLogsClient}, that two requests are sent, each containing exactly one event
 * with the expected group, stream and formatted message, and that the client is shut down when
 * the appender stops.
 */
@Test(timeout = 5000)
public void testBasic() throws InterruptedException {
    final String logGroup = "pfqoejpfqe";
    final String logStream = "pffqjfqjpoqoejpfqe";
    final int expectedWrites = 2;

    CloudWatchAppender appender = new CloudWatchAppender();
    AWSLogsClient awsLogClient = createMock(AWSLogsClient.class);
    appender.setAwsLogsClient(awsLogClient);
    appender.setMaxBatchSize(1);
    appender.setRegion("region");
    appender.setLogGroup(logGroup);
    appender.setLogStream(logStream);

    PatternLayout layout = new PatternLayout();
    layout.setContext(new LoggerContext());
    layout.setPattern("[%thread] %level %logger{20} - %msg%n%xThrowable");
    layout.start();
    appender.setLayout(layout);

    String loggerName = "name";
    Level level = Level.DEBUG;
    String message = "fjpewjfpewjfpewjfepowf";

    LoggingEvent event = new LoggingEvent();
    event.setTimeStamp(System.currentTimeMillis());
    event.setLoggerName(loggerName);
    event.setLevel(level);
    event.setMessage(message);

    // Mirrors the layout pattern above for an event without a throwable.
    // NOTE(review): the pattern ends with %n while this expects "\n" -- confirm this holds on
    // platforms whose line separator is not "\n".
    String threadName = Thread.currentThread().getName();
    final String fullMessage =
            "[" + threadName + "] " + level + " " + loggerName + " - " + message + "\n";

    final PutLogEventsResult result = new PutLogEventsResult();
    String sequence = "ewopjfewfj";
    result.setNextSequenceToken(sequence);

    // Record the expectations: two single-event uploads, followed by a client shutdown.
    expect(awsLogClient.putLogEvents(isA(PutLogEventsRequest.class)))
            .andAnswer(new IAnswer<PutLogEventsResult>() {
                @Override
                public PutLogEventsResult answer() {
                    PutLogEventsRequest request = (PutLogEventsRequest) getCurrentArguments()[0];
                    assertEquals(logGroup, request.getLogGroupName());
                    assertEquals(logStream, request.getLogStreamName());
                    List<InputLogEvent> events = request.getLogEvents();
                    assertEquals(1, events.size());
                    assertEquals(fullMessage, events.get(0).getMessage());
                    return result;
                }
            })
            .times(expectedWrites);
    awsLogClient.shutdown();

    // =====================================

    replay(awsLogClient);

    appender.start();
    // second start is only here for coverage
    appender.start();

    appender.append(event);
    Thread.sleep(10);
    appender.append(event);

    // Poll until both events have been written; the test timeout bounds this loop.
    while (appender.getEventsWrittenCount() < expectedWrites) {
        Thread.sleep(10);
    }

    appender.stop();
    verify(awsLogClient);
}
@Test public void testMessageErrorHandling() throws Exception { // WARNING: this test may break if the internal implementation changes initialize("TestCloudWatchAppender/testMessageErrorHandling.properties"); // the mock client -- will throw on odd invocations MockCloudWatchClient mockClient = new MockCloudWatchClient() { @Override protected PutLogEventsResult putLogEvents(PutLogEventsRequest request) { if (putLogEventsInvocationCount % 2 == 1) { throw new TestingException("anything"); } else { return super.putLogEvents(request); } } }; appender.setThreadFactory(new DefaultThreadFactory()); appender.setWriterFactory(mockClient.newWriterFactory()); for (int ii = 0 ; ii < 10 ; ii++) { logger.debug("message " + ii); } mockClient.allowWriterThread(); assertEquals("first batch, number of events in request", 10, mockClient.mostRecentEvents.size()); List<InputLogEvent> preservedEvents = new ArrayList<InputLogEvent>(mockClient.mostRecentEvents); // the first batch should have been returned to the message queue, in order mockClient.allowWriterThread(); assertEquals("second batch, number of events in request", 10, mockClient.mostRecentEvents.size()); for (int ii = 0 ; ii < mockClient.mostRecentEvents.size() ; ii++) { assertEquals("event #" + ii, preservedEvents.get(ii), mockClient.mostRecentEvents.get(ii)); } // now assert that those messages will not be resent for (int ii = 100 ; ii < 102 ; ii++) { logger.debug("message " + ii); } mockClient.allowWriterThread(); assertEquals("third batch, number of events in request", 2, mockClient.mostRecentEvents.size()); }
/**
 * Appends {@code EVENTS_PER_THREAD} events from each of {@code NUM_THREADS} concurrent producers
 * and verifies that nothing is discarded and that both the appender and its putter report every
 * event as processed. The AWS client is a Mockito mock that answers every
 * {@code putLogEvents} call with an empty result.
 */
@Test(timeout = 15000)
public void concurrencyAndThroughput() throws InterruptedException {
    final AWSLogs awsLogs = mock(AWSLogs.class);
    when(awsLogs.putLogEvents(any(PutLogEventsRequest.class))).thenReturn(new PutLogEventsResult());

    // Swap in a putter that talks to the mocked client instead of a real one.
    final CloudwatchLogsLogbackAppender appender = new CloudwatchLogsLogbackAppender() {
        @Override
        CloudwatchLogsLogEventPutter createCloudwatchLogsLogEventPutter() {
            return new CloudwatchLogsLogEventPutter(config, eventQueue, awsLogs, true);
        }
    };
    appender.setConfig(new CloudwatchLogsConfig());
    appender.start();

    ExecutorService pool = Executors.newFixedThreadPool(NUM_THREADS);
    final LoggerContext loggerContext = new LoggerContext();

    // The task keeps no state of its own: every run picks a fresh logger name, so a single
    // instance can be submitted once per producer.
    final Runnable producer = new Runnable() {
        @Override
        public void run() {
            String fqcn = "class-" + UUID.randomUUID();
            Logger logger = loggerContext.getLogger(fqcn);
            for (int eventNo = 0; eventNo < EVENTS_PER_THREAD; eventNo++) {
                appender.append(new LoggingEvent(fqcn, logger, Level.DEBUG, "msg-" + eventNo, null, null));
            }
        }
    };
    for (int worker = 0; worker < NUM_THREADS; worker++) {
        pool.execute(producer);
    }

    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.MINUTES);

    // Wait for the putter to catch up; the test timeout bounds this loop.
    while (appender.putter.getProcessedCount() < NUM_THREADS * EVENTS_PER_THREAD) {
        Thread.sleep(100);
    }
    appender.stop();

    assertEquals(0, appender.getDiscardedCount());
    assertEquals(NUM_THREADS * EVENTS_PER_THREAD, appender.getProcessedCount());
    assertEquals(NUM_THREADS * EVENTS_PER_THREAD, appender.putter.getProcessedCount());
}