@Override @SuppressWarnings("unchecked") public Processor createProcessor(RouteContext routeContext) throws Exception { Processor childProcessor = this.createChildProcessor(routeContext, true); IdempotentRepository<String> idempotentRepository = (IdempotentRepository<String>) resolveMessageIdRepository(routeContext); ObjectHelper.notNull(idempotentRepository, "idempotentRepository", this); Expression expression = getExpression().createExpression(routeContext); // these boolean should be true by default boolean eager = getEager() == null || getEager(); boolean duplicate = getSkipDuplicate() == null || getSkipDuplicate(); boolean remove = getRemoveOnFailure() == null || getRemoveOnFailure(); // these boolean should be false by default boolean completionEager = getCompletionEager() != null && getCompletionEager(); return new IdempotentConsumer(expression, idempotentRepository, eager, completionEager, duplicate, remove, childProcessor); }
public void testNotEager() throws Exception { context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { final IdempotentRepository<String> repo = MemoryIdempotentRepository.memoryIdempotentRepository(200); from("direct:start").idempotentConsumer(header("messageId"), repo).eager(false). process(new Processor() { public void process(Exchange exchange) throws Exception { String id = exchange.getIn().getHeader("messageId", String.class); // should not contain assertFalse("Should not eager add to repo", repo.contains(id)); } }).to("mock:result"); } }); context.start(); resultEndpoint.expectedBodiesReceived("one", "two", "three"); sendMessage("1", "one"); sendMessage("2", "two"); sendMessage("3", "three"); assertMockEndpointsSatisfied(); }
public void testEager() throws Exception { context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { final IdempotentRepository<String> repo = MemoryIdempotentRepository.memoryIdempotentRepository(200); from("direct:start").idempotentConsumer(header("messageId"), repo).eager(true). process(new Processor() { public void process(Exchange exchange) throws Exception { String id = exchange.getIn().getHeader("messageId", String.class); // should contain assertTrue("Should eager add to repo", repo.contains(id)); } }).to("mock:result"); } }); context.start(); resultEndpoint.expectedBodiesReceived("one", "two", "three"); sendMessage("1", "one"); sendMessage("2", "two"); sendMessage("3", "three"); assertMockEndpointsSatisfied(); }
/**
 * Creates the transacted route under test: messages from the inbox queue pass
 * through an idempotent consumer keyed on the {@code uid} header before being
 * transformed and sent to the outbox queue.
 */
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            // the repository is looked up from the registry by name and type
            IdempotentRepository<?> messageIds =
                    context.getRegistry().lookupByNameAndType("messageIdRepository", IdempotentRepository.class);

            from("activemq:queue:inbox")
                .transacted("required")
                .to("mock:a")
                .idempotentConsumer(header("uid"), messageIds)
                .to("mock:b")
                .transform(simple("DONE-${body}"))
                .to("activemq:queue:outbox");
        }
    };
}
@SuppressWarnings("unchecked") @Override protected void setUp() throws Exception { deleteDirectory("target/fileidempotent"); createDirectory("target/fileidempotent"); File file = new File("target/fileidempotent/.filestore.dat"); FileOutputStream fos = new FileOutputStream(file); // insert existing name to the file repo, so we should skip this file String name = FileUtil.normalizePath(new File("target/fileidempotent/report.txt").getAbsolutePath()); fos.write(name.getBytes()); fos.write(LS.getBytes()); fos.close(); super.setUp(); // add a file to the repo repo = context.getRegistry().lookupByNameAndType("fileStore", IdempotentRepository.class); }
@Test public void testTransactedExceptionThrown() throws InterruptedException { AuditLogDao auditLogDao = getMandatoryBean(AuditLogDao.class, "auditLogDao"); String message = "this message will explode"; assertEquals(0, auditLogDao.getAuditCount(message)); MockEndpoint mockCompleted = getMockEndpoint("mock:out"); mockCompleted.setExpectedMessageCount(1); mockCompleted.whenAnyExchangeReceived(new ExceptionThrowingProcessor()); try { template.sendBodyAndHeader("direct:transacted", message, "messageId", "foo"); fail(); } catch (CamelExecutionException cee) { assertEquals("boom!", ExceptionUtils.getRootCause(cee).getMessage()); } assertMockEndpointsSatisfied(); assertEquals(0, auditLogDao.getAuditCount(message)); // the insert was rolled back @SuppressWarnings("unchecked") IdempotentRepository<String> idempotentRepository = getMandatoryBean(IdempotentRepository.class, "jdbcIdempotentRepository"); // even though the transaction rolled back, the repository should still contain an entry for this messageId assertTrue(idempotentRepository.contains("foo")); }
@Test public void testTransactedExceptionNotThrown() throws InterruptedException { AuditLogDao auditLogDao = getMandatoryBean(AuditLogDao.class, "auditLogDao"); String message = "this message will be OK"; assertEquals(0, auditLogDao.getAuditCount(message)); MockEndpoint mockCompleted = getMockEndpoint("mock:out"); mockCompleted.setExpectedMessageCount(1); template.sendBodyAndHeader("direct:transacted", message, "messageId", "foo"); assertMockEndpointsSatisfied(); assertEquals(1, auditLogDao.getAuditCount(message)); // the insert was successful @SuppressWarnings("unchecked") IdempotentRepository<String> idempotentRepository = getMandatoryBean(IdempotentRepository.class, "jdbcIdempotentRepository"); // even though the transaction rolled back, the repository should still contain an entry for this messageId assertTrue(idempotentRepository.contains("foo")); }
/**
 * Strategy method to resolve the {@link org.apache.camel.spi.IdempotentRepository} to use.
 * <p/>
 * When a repository reference is configured it is looked up (mandatory) from the
 * route context and stored on this definition; otherwise the repository already
 * set on this definition is returned.
 *
 * @param routeContext route context
 * @return the repository
 */
protected IdempotentRepository<?> resolveMessageIdRepository(RouteContext routeContext) {
    if (messageIdRepositoryRef == null) {
        // no reference configured, use whatever instance was set directly
        return idempotentRepository;
    }
    idempotentRepository = routeContext.mandatoryLookup(messageIdRepositoryRef, IdempotentRepository.class);
    return idempotentRepository;
}
/**
 * Creates the idempotent consumer.
 *
 * @param messageIdExpression  expression stored as the message id expression
 * @param idempotentRepository repository stored for the message ids
 * @param eager                value of the eager option
 * @param completionEager      value of the completionEager option
 * @param skipDuplicate        value of the skipDuplicate option
 * @param removeOnFailure      value of the removeOnFailure option
 * @param processor            the processor to wrap; converted via {@link AsyncProcessorConverterHelper}
 */
public IdempotentConsumer(Expression messageIdExpression,
                          IdempotentRepository<String> idempotentRepository,
                          boolean eager,
                          boolean completionEager,
                          boolean skipDuplicate,
                          boolean removeOnFailure,
                          Processor processor) {
    // collaborators
    this.messageIdExpression = messageIdExpression;
    this.idempotentRepository = idempotentRepository;

    // option flags
    this.eager = eager;
    this.completionEager = completionEager;
    this.skipDuplicate = skipDuplicate;
    this.removeOnFailure = removeOnFailure;

    // the target processor is converted through the async converter helper
    this.processor = AsyncProcessorConverterHelper.convert(processor);
}
/**
 * With {@code skipDuplicate(false)} every message reaches the result endpoint;
 * duplicates are only flagged through the {@link Exchange#DUPLICATE_MESSAGE} property.
 */
public void testNotSkiDuplicate() throws Exception {
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            IdempotentRepository<String> repository = MemoryIdempotentRepository.memoryIdempotentRepository(200);

            from("direct:start")
                .idempotentConsumer(header("messageId"))
                    .messageIdRepository(repository)
                    .skipDuplicate(false)
                .to("mock:result");
        }
    });
    context.start();

    resultEndpoint.expectedBodiesReceived("one", "two", "one", "two", "one", "three");

    // first occurrences carry no duplicate marker
    resultEndpoint.message(0).exchangeProperty(Exchange.DUPLICATE_MESSAGE).isNull();
    resultEndpoint.message(1).exchangeProperty(Exchange.DUPLICATE_MESSAGE).isNull();
    // repeated ids are marked as duplicates
    resultEndpoint.message(2).exchangeProperty(Exchange.DUPLICATE_MESSAGE).isEqualTo(Boolean.TRUE);
    resultEndpoint.message(3).exchangeProperty(Exchange.DUPLICATE_MESSAGE).isEqualTo(Boolean.TRUE);
    resultEndpoint.message(4).exchangeProperty(Exchange.DUPLICATE_MESSAGE).isEqualTo(Boolean.TRUE);
    // a brand new id again carries no marker
    resultEndpoint.message(5).exchangeProperty(Exchange.DUPLICATE_MESSAGE).isNull();

    // id / body pairs, sent in this exact order
    String[][] messages = {
        {"1", "one"}, {"2", "two"}, {"1", "one"}, {"2", "two"}, {"1", "one"}, {"3", "three"}
    };
    for (String[] idAndBody : messages) {
        sendMessage(idAndBody[0], idAndBody[1]);
    }

    assertMockEndpointsSatisfied();
}
public void testNotSkiDuplicateWithFilter() throws Exception { context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { IdempotentRepository<String> repo = MemoryIdempotentRepository.memoryIdempotentRepository(200); // START SNIPPET: e1 from("direct:start") // instruct idempotent consumer to not skip duplicates as we will filter then our self .idempotentConsumer(header("messageId")).messageIdRepository(repo).skipDuplicate(false) .filter(property(Exchange.DUPLICATE_MESSAGE).isEqualTo(true)) // filter out duplicate messages by sending them to someplace else and then stop .to("mock:duplicate") .stop() .end() // and here we process only new messages (no duplicates) .to("mock:result"); // END SNIPPET: e1 } }); context.start(); resultEndpoint.expectedBodiesReceived("one", "two", "three"); getMockEndpoint("mock:duplicate").expectedBodiesReceived("one", "two", "one"); getMockEndpoint("mock:duplicate").allMessages().exchangeProperty(Exchange.DUPLICATE_MESSAGE).isEqualTo(Boolean.TRUE); sendMessage("1", "one"); sendMessage("2", "two"); sendMessage("1", "one"); sendMessage("2", "two"); sendMessage("1", "one"); sendMessage("3", "three"); assertMockEndpointsSatisfied(); }
private void assertStoreExists(File store) throws Exception { // Given IdempotentRepository<String> repo = fileIdempotentRepository(store); // must start repo repo.start(); // When repo.add("anyKey"); // Then assertTrue(store.exists()); repo.stop(); }
/**
 * Removes the store directory left by earlier runs, runs the parent setup and
 * then looks up the {@code fileStore} repository from the registry.
 */
@SuppressWarnings("unchecked")
@Override
protected void setUp() throws Exception {
    // start from a clean directory
    deleteDirectory("target/fileidempotent");

    super.setUp();

    // the lookup needs the context, so it has to come after super.setUp()
    repo = context.getRegistry()
            .lookupByNameAndType("fileStore", IdempotentRepository.class);
}
@Test @SuppressWarnings("unchecked") public void testIdempotent() throws Exception { IdempotentRepository repo = context.getRegistry().lookupByNameAndType("repo", IdempotentRepository.class); // the repo should not yet contain these unique keys assertFalse(repo.contains("123")); assertFalse(repo.contains("456")); assertFalse(repo.contains("789")); // we expect 3 non duplicate order messages getMockEndpoint("mock:order").expectedMessageCount(3); getMockEndpoint("mock:order").assertNoDuplicates(header("orderId")); // but there is 5 incoming messages, where as 2 should be duplicate messages getMockEndpoint("mock:inbox").expectedMessageCount(5); template.sendBodyAndHeader("seda:inbox", "Motor", "orderId", "123"); template.sendBodyAndHeader("seda:inbox", "Motor", "orderId", "123"); template.sendBodyAndHeader("seda:inbox", "Tires", "orderId", "789"); template.sendBodyAndHeader("seda:inbox", "Brake pad", "orderId", "456"); template.sendBodyAndHeader("seda:inbox", "Tires", "orderId", "789"); assertMockEndpointsSatisfied(); // the repo should contain these unique keys assertTrue(repo.contains("123")); assertTrue(repo.contains("456")); assertTrue(repo.contains("789")); }
@Test public void testWebserviceExceptionRollsBackTransactionAndIdempotentRepository() throws InterruptedException { AuditLogDao auditLogDao = getMandatoryBean(AuditLogDao.class, "auditLogDao"); String message = "this message will be OK"; assertEquals(0, auditLogDao.getAuditCount(message)); MockEndpoint mockCompleted = getMockEndpoint("mock:out"); mockCompleted.setExpectedMessageCount(0); MockEndpoint mockWs = getMockEndpoint("mock:ws"); mockWs.whenAnyExchangeReceived(new ExceptionThrowingProcessor("ws is down")); try { template.sendBodyAndHeader("direct:transacted", message, "messageId", "foo"); fail(); } catch (CamelExecutionException cee) { assertEquals("ws is down", ExceptionUtils.getRootCause(cee).getMessage()); } assertMockEndpointsSatisfied(); assertEquals(0, auditLogDao.getAuditCount(message)); // the insert was successful @SuppressWarnings("unchecked") IdempotentRepository<String> idempotentRepository = getMandatoryBean(IdempotentRepository.class, "jdbcIdempotentRepository"); // the repository has not seen this messageId assertTrue(!idempotentRepository.contains("foo")); }
/**
 * {@link IdempotentRepository} bean built on the {@code "retweets"} cache of the
 * given cache manager (backed by Redis, per the original note). Used to prevent
 * retweets being processed more than once.
 *
 * @param cacheManager the cache manager handed to the repository
 * @return the repository bean
 */
@Bean
public IdempotentRepository springCacheIdempotentRepository(CacheManager cacheManager) {
    IdempotentRepository retweetRepository =
            new SpringCacheIdempotentRepository(cacheManager, "retweets");
    return retweetRepository;
}
/**
 * Creates the route with its collaborators.
 *
 * @param properties           the Twitter properties kept by this route
 * @param idempotentRepository the idempotent repository kept by this route
 */
public TwitterSourceRoute(TwitterProperties properties,
                          IdempotentRepository idempotentRepository) {
    this.idempotentRepository = idempotentRepository;
    this.properties = properties;
}
/**
 * Creates the definition from a message id expression and a repository.
 *
 * @param messageIdExpression  expression passed on to the super constructor
 * @param idempotentRepository the repository stored on this definition
 */
public IdempotentConsumerDefinition(Expression messageIdExpression,
                                    IdempotentRepository<?> idempotentRepository) {
    super(messageIdExpression);
    this.idempotentRepository = idempotentRepository;
}
/**
 * Returns the message id repository currently set on this definition.
 *
 * @return the repository, as stored in the {@code idempotentRepository} field
 */
public IdempotentRepository<?> getMessageIdRepository() {
    return this.idempotentRepository;
}
/**
 * Sets the message id repository on this definition.
 *
 * @param idempotentRepository the repository to store
 */
public void setMessageIdRepository(IdempotentRepository<?> idempotentRepository) {
    this.idempotentRepository = idempotentRepository;
}
/**
 * Returns the idempotent repository held by this instance.
 *
 * @return the repository
 */
public IdempotentRepository<String> getIdempotentRepository() {
    return this.idempotentRepository;
}
/**
 * Creates the on-completion callback state.
 *
 * @param idempotentRepository the repository stored on this instance
 * @param messageId            the message id stored on this instance
 * @param eager                value of the eager option
 * @param removeOnFailure      value of the removeOnFailure option
 */
public IdempotentOnCompletion(IdempotentRepository<String> idempotentRepository,
                              String messageId,
                              boolean eager,
                              boolean removeOnFailure) {
    // what to operate on
    this.idempotentRepository = idempotentRepository;
    this.messageId = messageId;

    // option flags
    this.eager = eager;
    this.removeOnFailure = removeOnFailure;
}
/**
 * Returns the in-progress repository held by this instance.
 *
 * @return the repository
 */
public IdempotentRepository<String> getInProgressRepository() {
    return this.inProgressRepository;
}
/**
 * The idempotent repository to use as the store for the read locks.
 *
 * @return the repository currently set
 */
public IdempotentRepository<String> getIdempotentRepository() {
    return this.idempotentRepository;
}
/**
 * The idempotent repository to use as the store for the read locks.
 *
 * @param idempotentRepository the repository to store
 */
public void setIdempotentRepository(IdempotentRepository<String> idempotentRepository) {
    this.idempotentRepository = idempotentRepository;
}
/**
 * Adds an idempotent consumer to the given definition, using a closure as the
 * message id expression.
 *
 * @param self       the processor definition to extend
 * @param rep        the repository to use for the duplicate check
 * @param expression closure converted to an expression via {@code toExpression}
 * @return the idempotent consumer definition returned by {@code self}
 */
public static IdempotentConsumerDefinition idempotentConsumer(ProcessorDefinition<?> self,
                                                              IdempotentRepository<?> rep,
                                                              Closure<?> expression) {
    return self.idempotentConsumer(
            toExpression(expression),
            rep);
}
/**
 * Creates the route with the repository it should use.
 *
 * @param idempotentRepository the idempotent repository kept by this route
 */
public IdempotentConsumerInTransactionRoute(IdempotentRepository idempotentRepository) {
    this.idempotentRepository = idempotentRepository;
}
/**
 * <a href="http://camel.apache.org/idempotent-consumer.html">Idempotent consumer EIP:</a>
 * Creates an {@link org.apache.camel.processor.idempotent.IdempotentConsumer IdempotentConsumer}
 * to avoid duplicate messages
 *
 * @param idempotentRepository the repository to use for duplicate check
 * @return the builder used to create the expression
 * @deprecated will be removed in Camel 3.0. Instead use any of the other methods
 */
@Deprecated
public ExpressionClause<IdempotentConsumerDefinition> idempotentConsumer(IdempotentRepository<?> idempotentRepository) {
    // the definition starts without an expression; the returned clause supplies it
    IdempotentConsumerDefinition definition = new IdempotentConsumerDefinition();
    definition.setMessageIdRepository(idempotentRepository);

    addOutput(definition);

    return ExpressionClause.createAndSetExpression(definition);
}
/**
 * <a href="http://camel.apache.org/idempotent-consumer.html">Idempotent consumer EIP:</a>
 * Creates an {@link org.apache.camel.processor.idempotent.IdempotentConsumer IdempotentConsumer}
 * to avoid duplicate messages
 *
 * @param messageIdExpression  expression to test of duplicate messages
 * @param idempotentRepository the repository to use for duplicate check
 * @return the builder
 */
public IdempotentConsumerDefinition idempotentConsumer(Expression messageIdExpression,
                                                       IdempotentRepository<?> idempotentRepository) {
    IdempotentConsumerDefinition definition =
            new IdempotentConsumerDefinition(messageIdExpression, idempotentRepository);

    // register the new definition as an output of this one
    addOutput(definition);

    return definition;
}
/**
 * Sets the message id repository for the idempotent consumer.
 *
 * @param idempotentRepository the idempotent repository instance
 * @return this definition, so calls can be chained
 */
public IdempotentConsumerDefinition messageIdRepository(IdempotentRepository<?> idempotentRepository) {
    // delegate to the plain setter
    this.setMessageIdRepository(idempotentRepository);
    return this;
}
/**
 * Creates a new memory based repository using a {@link LRUCache}
 * with a default of 1000 entries in the cache.
 *
 * @return the new repository
 */
public static IdempotentRepository<String> memoryIdempotentRepository() {
    IdempotentRepository<String> repository = new MemoryIdempotentRepository();
    return repository;
}
/**
 * Creates a new memory based repository using a {@link LRUCache}.
 *
 * @param cacheSize the cache size
 * @return the new repository
 */
public static IdempotentRepository<String> memoryIdempotentRepository(int cacheSize) {
    LRUCache<String, Object> cache = new LRUCache<String, Object>(cacheSize);
    return memoryIdempotentRepository(cache);
}
/**
 * Creates a new memory based repository using the given {@link Map} to
 * use to store the processed message ids.
 * <p/>
 * Care should be taken to use a suitable underlying {@link Map} to avoid this class being a
 * memory leak.
 *
 * @param cache the cache
 * @return the new repository
 */
public static IdempotentRepository<String> memoryIdempotentRepository(Map<String, Object> cache) {
    IdempotentRepository<String> repository = new MemoryIdempotentRepository(cache);
    return repository;
}
/**
 * Creates a new file based repository using a {@link org.apache.camel.util.LRUCache}
 * as 1st level cache with a default of 1000 entries in the cache.
 *
 * @param fileStore the file store
 * @return the new repository
 */
public static IdempotentRepository<String> fileIdempotentRepository(File fileStore) {
    // 1000 is the default 1st level cache size
    return fileIdempotentRepository(
            fileStore,
            1000);
}
/**
 * Creates a new file based repository using a {@link org.apache.camel.util.LRUCache}
 * as 1st level cache.
 *
 * @param fileStore the file store
 * @param cacheSize the cache size
 * @return the new repository
 */
public static IdempotentRepository<String> fileIdempotentRepository(File fileStore, int cacheSize) {
    LRUCache<String, Object> firstLevelCache = new LRUCache<String, Object>(cacheSize);
    return fileIdempotentRepository(fileStore, firstLevelCache);
}
/**
 * Creates a new file based repository using a {@link org.apache.camel.util.LRUCache}
 * as 1st level cache.
 *
 * @param fileStore        the file store
 * @param cacheSize        the cache size
 * @param maxFileStoreSize the max size in bytes for the filestore file
 * @return the new repository
 */
public static IdempotentRepository<String> fileIdempotentRepository(File fileStore, int cacheSize, long maxFileStoreSize) {
    LRUCache<String, Object> firstLevelCache = new LRUCache<String, Object>(cacheSize);

    FileIdempotentRepository fileRepository = new FileIdempotentRepository(fileStore, firstLevelCache);
    fileRepository.setMaxFileStoreSize(maxFileStoreSize);

    return fileRepository;
}
/**
 * Creates a new file based repository using the given {@link java.util.Map}
 * as 1st level cache.
 * <p/>
 * Care should be taken to use a suitable underlying {@link java.util.Map} to avoid this class being a
 * memory leak.
 *
 * @param store the file store
 * @param cache the cache to use as 1st level cache
 * @return the new repository
 */
public static IdempotentRepository<String> fileIdempotentRepository(File store, Map<String, Object> cache) {
    IdempotentRepository<String> repository = new FileIdempotentRepository(store, cache);
    return repository;
}
/**
 * A pluggable repository org.apache.camel.spi.IdempotentRepository which by default uses
 * MemoryMessageIdRepository if none is specified and idempotent is true.
 *
 * @param idempotentRepository the repository to store on this instance
 */
public void setIdempotentRepository(IdempotentRepository<String> idempotentRepository) {
    this.idempotentRepository = idempotentRepository;
}