Java 类org.apache.camel.spi.IdempotentRepository 实例源码

项目:Camel    文件:IdempotentConsumerDefinition.java   
/**
 * Builds the runtime {@link IdempotentConsumer} for this definition.
 * <p/>
 * The child processor is created first, then the message id repository is resolved
 * (it must not be null) and finally the message id expression is created.
 *
 * @param routeContext the route context used to create the child processor, resolve the
 *                     repository and create the expression
 * @return a new idempotent consumer wrapping the child processor
 * @throws Exception declared by the signature; may be raised by any of the calls made here
 */
@Override
@SuppressWarnings("unchecked")
public Processor createProcessor(RouteContext routeContext) throws Exception {
    Processor childProcessor = this.createChildProcessor(routeContext, true);

    IdempotentRepository<String> repository = (IdempotentRepository<String>) resolveMessageIdRepository(routeContext);
    ObjectHelper.notNull(repository, "idempotentRepository", this);

    Expression messageIdExpression = getExpression().createExpression(routeContext);

    // options left unset (null) fall back to true for these three ...
    boolean eager = !Boolean.FALSE.equals(getEager());
    boolean skipDuplicate = !Boolean.FALSE.equals(getSkipDuplicate());
    boolean removeOnFailure = !Boolean.FALSE.equals(getRemoveOnFailure());
    // ... and to false for this one
    boolean completionEager = Boolean.TRUE.equals(getCompletionEager());

    return new IdempotentConsumer(messageIdExpression, repository, eager, completionEager,
            skipDuplicate, removeOnFailure, childProcessor);
}
项目:Camel    文件:IdempotentConsumerEagerTest.java   
/**
 * With {@code eager(false)} the message id must not be present in the repository
 * while the exchange is still being processed inside the idempotent consumer.
 */
public void testNotEager() throws Exception {
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            final IdempotentRepository<String> repository = MemoryIdempotentRepository.memoryIdempotentRepository(200);

            // runs as a child of the idempotent consumer and inspects the repository mid-flight
            Processor verifyNotYetStored = new Processor() {
                public void process(Exchange exchange) throws Exception {
                    String messageId = exchange.getIn().getHeader("messageId", String.class);
                    assertFalse("Should not eager add to repo", repository.contains(messageId));
                }
            };

            from("direct:start")
                .idempotentConsumer(header("messageId"), repository).eager(false)
                .process(verifyNotYetStored)
                .to("mock:result");
        }
    });
    context.start();

    // three distinct ids, so every body is expected to arrive
    resultEndpoint.expectedBodiesReceived("one", "two", "three");

    sendMessage("1", "one");
    sendMessage("2", "two");
    sendMessage("3", "three");

    assertMockEndpointsSatisfied();
}
项目:Camel    文件:IdempotentConsumerEagerTest.java   
/**
 * With {@code eager(true)} the message id must already be present in the repository
 * while the exchange is being processed inside the idempotent consumer.
 */
public void testEager() throws Exception {
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            final IdempotentRepository<String> repository = MemoryIdempotentRepository.memoryIdempotentRepository(200);

            // runs as a child of the idempotent consumer and inspects the repository mid-flight
            Processor verifyAlreadyStored = new Processor() {
                public void process(Exchange exchange) throws Exception {
                    String messageId = exchange.getIn().getHeader("messageId", String.class);
                    assertTrue("Should eager add to repo", repository.contains(messageId));
                }
            };

            from("direct:start")
                .idempotentConsumer(header("messageId"), repository).eager(true)
                .process(verifyAlreadyStored)
                .to("mock:result");
        }
    });
    context.start();

    // three distinct ids, so every body is expected to arrive
    resultEndpoint.expectedBodiesReceived("one", "two", "three");

    sendMessage("1", "one");
    sendMessage("2", "two");
    sendMessage("3", "three");

    assertMockEndpointsSatisfied();
}
项目:Camel    文件:FromJmsToJdbcIdempotentConsumerToJmsTest.java   
/**
 * Creates the route under test: JMS inbox -> transacted -> idempotent consumer keyed on the
 * {@code uid} header -> transform -> JMS outbox, with mock endpoints before and after the
 * idempotent consumer.
 */
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    RouteBuilder builder = new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            // the repository is taken from the registry under the name "messageIdRepository"
            IdempotentRepository<?> messageIdRepo =
                    context.getRegistry().lookupByNameAndType("messageIdRepository", IdempotentRepository.class);

            from("activemq:queue:inbox")
                .transacted("required")
                .to("mock:a")
                .idempotentConsumer(header("uid"), messageIdRepo)
                .to("mock:b")
                .transform(simple("DONE-${body}"))
                .to("activemq:queue:outbox");
        }
    };
    return builder;
}
项目:Camel    文件:FileConsumerIdempotentLoadStoreTest.java   
/**
 * Prepares the test directory and pre-populates the idempotent file store before the
 * parent set-up runs, then looks up the {@code fileStore} repository from the registry.
 * <p/>
 * The store file is seeded with the normalized absolute path of {@code report.txt} followed
 * by {@code LS}, so that this file name is already known to the repository.
 *
 * @throws Exception if the directory or file preparation, or the parent set-up, fails
 */
@SuppressWarnings("unchecked")
@Override
protected void setUp() throws Exception {
    deleteDirectory("target/fileidempotent");
    createDirectory("target/fileidempotent");

    File file = new File("target/fileidempotent/.filestore.dat");

    // insert existing name to the file repo, so we should skip this file
    String name = FileUtil.normalizePath(new File("target/fileidempotent/report.txt").getAbsolutePath());

    FileOutputStream fos = new FileOutputStream(file);
    try {
        fos.write(name.getBytes());
        fos.write(LS.getBytes());
    } finally {
        // always release the file handle, even when a write fails
        fos.close();
    }

    super.setUp();

    // grab the repository registered as "fileStore" so the test can inspect it
    repo = context.getRegistry().lookupByNameAndType("fileStore", IdempotentRepository.class);
}
项目:camel-cookbook-examples    文件:IdempotentConsumerInTransactionSpringTest.java   
/**
 * Sends a message through {@code direct:transacted} while {@code mock:out} throws, and checks
 * that no audit entry remains while the idempotent repository still contains the message id.
 */
@Test
public void testTransactedExceptionThrown() throws InterruptedException {
    final String message = "this message will explode";
    final AuditLogDao auditLogDao = getMandatoryBean(AuditLogDao.class, "auditLogDao");
    assertEquals(0, auditLogDao.getAuditCount(message));

    // the final endpoint receives the exchange once and blows up
    MockEndpoint mockOut = getMockEndpoint("mock:out");
    mockOut.setExpectedMessageCount(1);
    mockOut.whenAnyExchangeReceived(new ExceptionThrowingProcessor());

    try {
        template.sendBodyAndHeader("direct:transacted", message, "messageId", "foo");
        fail();
    } catch (CamelExecutionException expected) {
        assertEquals("boom!", ExceptionUtils.getRootCause(expected).getMessage());
    }

    assertMockEndpointsSatisfied();
    // the audit insert was rolled back
    assertEquals(0, auditLogDao.getAuditCount(message));

    @SuppressWarnings("unchecked")
    IdempotentRepository<String> repository = getMandatoryBean(IdempotentRepository.class, "jdbcIdempotentRepository");

    // despite the rollback, the repository is expected to still hold this messageId
    assertTrue(repository.contains("foo"));
}
项目:camel-cookbook-examples    文件:IdempotentConsumerInTransactionSpringTest.java   
/**
 * Sends a message through {@code direct:transacted} without any failure, and checks that
 * one audit entry exists and that the idempotent repository contains the message id.
 */
@Test
public void testTransactedExceptionNotThrown() throws InterruptedException {
    final String message = "this message will be OK";
    final AuditLogDao auditLogDao = getMandatoryBean(AuditLogDao.class, "auditLogDao");
    assertEquals(0, auditLogDao.getAuditCount(message));

    MockEndpoint mockOut = getMockEndpoint("mock:out");
    mockOut.setExpectedMessageCount(1);

    template.sendBodyAndHeader("direct:transacted", message, "messageId", "foo");

    assertMockEndpointsSatisfied();
    // the audit insert went through
    assertEquals(1, auditLogDao.getAuditCount(message));

    @SuppressWarnings("unchecked")
    IdempotentRepository<String> repository = getMandatoryBean(IdempotentRepository.class, "jdbcIdempotentRepository");

    // the exchange completed without error, so the repository must hold this messageId
    assertTrue(repository.contains("foo"));
}
项目:Camel    文件:IdempotentConsumerDefinition.java   
/**
 * Strategy method to resolve the {@link org.apache.camel.spi.IdempotentRepository} to use.
 * <p/>
 * When a repository reference name has been configured, the repository is looked up
 * (mandatory) through the route context and replaces the currently held instance;
 * otherwise the currently held instance is returned unchanged.
 *
 * @param routeContext route context used for the lookup
 * @return the repository; may be null when neither a reference nor an instance was set
 */
protected IdempotentRepository<?> resolveMessageIdRepository(RouteContext routeContext) {
    if (messageIdRepositoryRef == null) {
        // no reference configured: use whatever instance was set directly
        return idempotentRepository;
    }
    idempotentRepository = routeContext.mandatoryLookup(messageIdRepositoryRef, IdempotentRepository.class);
    return idempotentRepository;
}
项目:Camel    文件:IdempotentConsumer.java   
/**
 * Creates the idempotent consumer from its fully resolved settings.
 *
 * @param messageIdExpression  expression stored as the message id expression
 * @param idempotentRepository repository stored for the duplicate check
 * @param eager                value stored for the eager option
 * @param completionEager      value stored for the completion-eager option
 * @param skipDuplicate        value stored for the skip-duplicate option
 * @param removeOnFailure      value stored for the remove-on-failure option
 * @param processor            the processor to wrap; converted with
 *                             {@link AsyncProcessorConverterHelper#convert} before being stored
 */
public IdempotentConsumer(Expression messageIdExpression, IdempotentRepository<String> idempotentRepository,
                          boolean eager, boolean completionEager, boolean skipDuplicate, boolean removeOnFailure, Processor processor) {
    // collaborators
    this.processor = AsyncProcessorConverterHelper.convert(processor);
    this.idempotentRepository = idempotentRepository;
    this.messageIdExpression = messageIdExpression;
    // option flags
    this.eager = eager;
    this.completionEager = completionEager;
    this.skipDuplicate = skipDuplicate;
    this.removeOnFailure = removeOnFailure;
}
项目:Camel    文件:IdempotentConsumerTest.java   
/**
 * With {@code skipDuplicate(false)} every message reaches {@code mock:result}; repeated
 * message ids are expected to carry the {@link Exchange#DUPLICATE_MESSAGE} property set to true,
 * first occurrences are expected to have no such property.
 */
public void testNotSkiDuplicate() throws Exception {
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            IdempotentRepository<String> repository = MemoryIdempotentRepository.memoryIdempotentRepository(200);

            from("direct:start")
                .idempotentConsumer(header("messageId"))
                    .messageIdRepository(repository)
                    .skipDuplicate(false)
                .to("mock:result");
        }
    });
    context.start();

    resultEndpoint.expectedBodiesReceived("one", "two", "one", "two", "one", "three");
    // first sightings of id 1 and 2
    resultEndpoint.message(0).exchangeProperty(Exchange.DUPLICATE_MESSAGE).isNull();
    resultEndpoint.message(1).exchangeProperty(Exchange.DUPLICATE_MESSAGE).isNull();
    // repeats of id 1 and 2
    resultEndpoint.message(2).exchangeProperty(Exchange.DUPLICATE_MESSAGE).isEqualTo(Boolean.TRUE);
    resultEndpoint.message(3).exchangeProperty(Exchange.DUPLICATE_MESSAGE).isEqualTo(Boolean.TRUE);
    resultEndpoint.message(4).exchangeProperty(Exchange.DUPLICATE_MESSAGE).isEqualTo(Boolean.TRUE);
    // first sighting of id 3
    resultEndpoint.message(5).exchangeProperty(Exchange.DUPLICATE_MESSAGE).isNull();

    // {messageId, body} pairs, sent in this order
    String[][] messages = {
        {"1", "one"}, {"2", "two"}, {"1", "one"}, {"2", "two"}, {"1", "one"}, {"3", "three"}
    };
    for (String[] message : messages) {
        sendMessage(message[0], message[1]);
    }

    assertMockEndpointsSatisfied();
}
项目:Camel    文件:IdempotentConsumerTest.java   
/**
 * With {@code skipDuplicate(false)} plus an explicit filter on the
 * {@link Exchange#DUPLICATE_MESSAGE} property, duplicates are expected at
 * {@code mock:duplicate} and only first occurrences at {@code mock:result}.
 */
public void testNotSkiDuplicateWithFilter() throws Exception {
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            IdempotentRepository<String> repository = MemoryIdempotentRepository.memoryIdempotentRepository(200);

            // START SNIPPET: e1
            from("direct:start")
                // tell the idempotent consumer not to skip duplicates; the route filters them itself
                .idempotentConsumer(header("messageId")).messageIdRepository(repository).skipDuplicate(false)
                .filter(property(Exchange.DUPLICATE_MESSAGE).isEqualTo(true))
                    // duplicates are routed elsewhere and then processing stops for them
                    .to("mock:duplicate")
                    .stop()
                .end()
                // only new (non duplicate) messages get this far
                .to("mock:result");
            // END SNIPPET: e1
        }
    });
    context.start();

    resultEndpoint.expectedBodiesReceived("one", "two", "three");

    getMockEndpoint("mock:duplicate").expectedBodiesReceived("one", "two", "one");
    getMockEndpoint("mock:duplicate").allMessages().exchangeProperty(Exchange.DUPLICATE_MESSAGE).isEqualTo(Boolean.TRUE);

    // {messageId, body} pairs, sent in this order
    String[][] messages = {
        {"1", "one"}, {"2", "two"}, {"1", "one"}, {"2", "two"}, {"1", "one"}, {"3", "three"}
    };
    for (String[] message : messages) {
        sendMessage(message[0], message[1]);
    }

    assertMockEndpointsSatisfied();
}
项目:Camel    文件:FileIdempotentConsumerCreateRepoTest.java   
/**
 * Asserts that adding a key to a file based repository causes the given store file to exist.
 * <p/>
 * The repository is started before use and is always stopped again, also when adding the
 * key or the assertion fails.
 *
 * @param store the file the repository is created on
 * @throws Exception if starting, using or stopping the repository fails
 */
private void assertStoreExists(File store) throws Exception {
    // Given
    IdempotentRepository<String> repo = fileIdempotentRepository(store);

    // must start repo
    repo.start();
    try {
        // When
        repo.add("anyKey");

        // Then
        assertTrue(store.exists());
    } finally {
        // do not leave a started repository behind when the check fails
        repo.stop();
    }
}
项目:Camel    文件:FileConsumerIdempotentTest.java   
/**
 * Removes the {@code target/fileidempotent} directory, runs the parent set-up and then
 * looks up the repository registered as {@code fileStore}.
 *
 * @throws Exception if the clean-up or the parent set-up fails
 */
@SuppressWarnings("unchecked")
@Override
protected void setUp() throws Exception {
    // remove the working directory before the parent set-up runs
    deleteDirectory("target/fileidempotent");

    super.setUp();
    // the lookup uses the raw IdempotentRepository class, hence the unchecked suppression
    repo = context.getRegistry().lookupByNameAndType("fileStore", IdempotentRepository.class);
}
项目:camelinaction2    文件:SpringIdempotentTest.java   
/**
 * Sends five order messages, two of which repeat an {@code orderId}, and checks that
 * {@code mock:inbox} sees all five, {@code mock:order} sees three without duplicates, and
 * the repository contains the three order ids afterwards (and none of them beforehand).
 */
@Test
@SuppressWarnings("unchecked")
public void testIdempotent() throws Exception {
    IdempotentRepository repo = context.getRegistry().lookupByNameAndType("repo", IdempotentRepository.class);

    String[] orderIds = {"123", "456", "789"};

    // none of the unique keys may be known up front
    for (String orderId : orderIds) {
        assertFalse(repo.contains(orderId));
    }

    // three non duplicate order messages are expected
    getMockEndpoint("mock:order").expectedMessageCount(3);
    getMockEndpoint("mock:order").assertNoDuplicates(header("orderId"));

    // five messages come in, two of them being duplicates
    getMockEndpoint("mock:inbox").expectedMessageCount(5);

    template.sendBodyAndHeader("seda:inbox", "Motor", "orderId", "123");
    template.sendBodyAndHeader("seda:inbox", "Motor", "orderId", "123");
    template.sendBodyAndHeader("seda:inbox", "Tires", "orderId", "789");
    template.sendBodyAndHeader("seda:inbox", "Brake pad", "orderId", "456");
    template.sendBodyAndHeader("seda:inbox", "Tires", "orderId", "789");

    assertMockEndpointsSatisfied();

    // all unique keys must now be known
    for (String orderId : orderIds) {
        assertTrue(repo.contains(orderId));
    }
}
项目:camel-cookbook-examples    文件:IdempotentConsumerInTransactionSpringTest.java   
/**
 * Sends a message through {@code direct:transacted} while {@code mock:ws} throws, and checks
 * that nothing reaches {@code mock:out}, no audit entry is present, and the idempotent
 * repository does not contain the message id.
 */
@Test
public void testWebserviceExceptionRollsBackTransactionAndIdempotentRepository() throws InterruptedException {
    final String message = "this message will be OK";
    final AuditLogDao auditLogDao = getMandatoryBean(AuditLogDao.class, "auditLogDao");
    assertEquals(0, auditLogDao.getAuditCount(message));

    // nothing is expected to get as far as the final endpoint
    MockEndpoint mockOut = getMockEndpoint("mock:out");
    mockOut.setExpectedMessageCount(0);

    // the web service endpoint fails for every exchange
    MockEndpoint mockWs = getMockEndpoint("mock:ws");
    mockWs.whenAnyExchangeReceived(new ExceptionThrowingProcessor("ws is down"));

    try {
        template.sendBodyAndHeader("direct:transacted", message, "messageId", "foo");
        fail();
    } catch (CamelExecutionException expected) {
        assertEquals("ws is down", ExceptionUtils.getRootCause(expected).getMessage());
    }

    assertMockEndpointsSatisfied();
    // no audit entry is present after the failure
    assertEquals(0, auditLogDao.getAuditCount(message));

    @SuppressWarnings("unchecked")
    IdempotentRepository<String> repository = getMandatoryBean(IdempotentRepository.class, "jdbcIdempotentRepository");

    // the repository has not seen this messageId
    assertTrue(!repository.contains("foo"));
}
项目:camel-twitter-stream    文件:TwitterSourceConfiguration.java   
/**
 * {@link IdempotentRepository} built on the given Spring {@code CacheManager}, using the
 * cache named {@code retweets}. Used to prevent retweets being processed more than once.
 * <p>
 * NOTE(review): the original documentation describes the backing store as Redis; that
 * depends on the injected cache manager and cannot be confirmed from this method alone.
 *
 * @param cacheManager the cache manager handed to the repository
 * @return the repository bean
 */
@Bean
public IdempotentRepository springCacheIdempotentRepository(
        CacheManager cacheManager) {
    final String cacheName = "retweets";
    return new SpringCacheIdempotentRepository(cacheManager, cacheName);
}
项目:camel-twitter-stream    文件:TwitterSourceRoute.java   
/**
 * Creates the route with its two collaborators, which are stored as-is.
 *
 * @param properties           the Twitter properties to keep
 * @param idempotentRepository the idempotent repository to keep
 */
public TwitterSourceRoute(TwitterProperties properties,
                          IdempotentRepository idempotentRepository) {
    this.idempotentRepository = idempotentRepository;
    this.properties = properties;
}
项目:Camel    文件:IdempotentConsumerDefinition.java   
/**
 * Creates the definition from a message id expression and a repository instance.
 *
 * @param messageIdExpression  expression passed on to the super constructor
 * @param idempotentRepository repository instance stored on this definition
 */
public IdempotentConsumerDefinition(Expression messageIdExpression, IdempotentRepository<?> idempotentRepository) {
    super(messageIdExpression);
    this.idempotentRepository = idempotentRepository;
}
项目:Camel    文件:IdempotentConsumerDefinition.java   
/**
 * Returns the repository instance currently held by this definition.
 *
 * @return the repository, or null if none has been set
 */
public IdempotentRepository<?> getMessageIdRepository() {
    return this.idempotentRepository;
}
项目:Camel    文件:IdempotentConsumerDefinition.java   
/**
 * Sets the repository instance held by this definition.
 *
 * @param idempotentRepository the repository instance to store
 */
public void setMessageIdRepository(IdempotentRepository<?> idempotentRepository) {
    this.idempotentRepository = idempotentRepository;
}
项目:Camel    文件:IdempotentConsumer.java   
/**
 * Returns the repository this consumer was created with.
 *
 * @return the idempotent repository
 */
public IdempotentRepository<String> getIdempotentRepository() {
    return this.idempotentRepository;
}
项目:Camel    文件:IdempotentOnCompletion.java   
/**
 * Creates the completion callback state; all arguments are stored as-is.
 *
 * @param idempotentRepository the repository to keep
 * @param messageId            the message id to keep
 * @param eager                value stored for the eager option
 * @param removeOnFailure      value stored for the remove-on-failure option
 */
public IdempotentOnCompletion(IdempotentRepository<String> idempotentRepository, String messageId, boolean eager, boolean removeOnFailure) {
    // what to act on
    this.messageId = messageId;
    this.idempotentRepository = idempotentRepository;
    // how to act
    this.removeOnFailure = removeOnFailure;
    this.eager = eager;
}
项目:Camel    文件:GenericFileEndpoint.java   
/**
 * Returns the idempotent repository held by this endpoint.
 *
 * @return the idempotent repository, or null if none has been set
 */
public IdempotentRepository<String> getIdempotentRepository() {
    return this.idempotentRepository;
}
项目:Camel    文件:GenericFileEndpoint.java   
/**
 * Returns the in-progress repository held by this endpoint.
 *
 * @return the in-progress repository
 */
public IdempotentRepository<String> getInProgressRepository() {
    return this.inProgressRepository;
}
项目:Camel    文件:FileIdempotentRepositoryReadLockStrategy.java   
/**
 * The idempotent repository used as the store for the read locks.
 *
 * @return the repository currently set on this strategy
 */
public IdempotentRepository<String> getIdempotentRepository() {
    return this.idempotentRepository;
}
项目:Camel    文件:FileIdempotentRepositoryReadLockStrategy.java   
/**
 * The idempotent repository to use as the store for the read locks.
 *
 * @param idempotentRepository the repository to store on this strategy
 */
public void setIdempotentRepository(IdempotentRepository<String> idempotentRepository) {
    this.idempotentRepository = idempotentRepository;
}
项目:Camel    文件:MailEndpoint.java   
/**
 * Returns the idempotent repository held by this endpoint.
 *
 * @return the idempotent repository
 */
public IdempotentRepository<String> getIdempotentRepository() {
    return this.idempotentRepository;
}
项目:Camel    文件:CamelGroovyMethods.java   
/**
 * Adds an idempotent consumer to the given processor definition, using a closure as the
 * message id expression.
 *
 * @param self       the processor definition the call is delegated to
 * @param rep        the repository passed through to {@code self.idempotentConsumer}
 * @param expression the closure, converted with {@code toExpression} before delegating
 * @return the definition returned by {@code self.idempotentConsumer}
 */
public static IdempotentConsumerDefinition idempotentConsumer(ProcessorDefinition<?> self,
        IdempotentRepository<?> rep, Closure<?> expression) {
    return self.idempotentConsumer(toExpression(expression), rep);
}
项目:camel-cookbook-examples    文件:IdempotentConsumerInTransactionRoute.java   
/**
 * Creates the route with the idempotent repository it should use.
 *
 * @param idempotentRepository the repository stored on this route
 */
public IdempotentConsumerInTransactionRoute(IdempotentRepository idempotentRepository) {
    this.idempotentRepository = idempotentRepository;
}
项目:Camel    文件:ProcessorDefinition.java   
/**
 * <a href="http://camel.apache.org/idempotent-consumer.html">Idempotent consumer EIP:</a>
 * Creates an {@link org.apache.camel.processor.idempotent.IdempotentConsumer IdempotentConsumer}
 * to avoid duplicate messages.
 * <p/>
 * A new definition is created, given the repository and added as an output; the returned
 * clause is then used to set the message id expression on that definition.
 *
 * @param idempotentRepository the repository to use for duplicate check
 * @return the builder used to create the expression
 * @deprecated will be removed in Camel 3.0. Instead use any of the other methods
 */
@Deprecated
public ExpressionClause<IdempotentConsumerDefinition> idempotentConsumer(IdempotentRepository<?> idempotentRepository) {
    IdempotentConsumerDefinition definition = new IdempotentConsumerDefinition();
    definition.setMessageIdRepository(idempotentRepository);
    addOutput(definition);

    ExpressionClause<IdempotentConsumerDefinition> clause = ExpressionClause.createAndSetExpression(definition);
    return clause;
}
项目:Camel    文件:ProcessorDefinition.java   
/**
 * <a href="http://camel.apache.org/idempotent-consumer.html">Idempotent consumer EIP:</a>
 * Creates an {@link org.apache.camel.processor.idempotent.IdempotentConsumer IdempotentConsumer}
 * to avoid duplicate messages.
 *
 * @param messageIdExpression  expression used to test for duplicate messages
 * @param idempotentRepository the repository to use for duplicate check
 * @return the builder, i.e. the newly created definition after it was added as an output
 */
public IdempotentConsumerDefinition idempotentConsumer(Expression messageIdExpression, IdempotentRepository<?> idempotentRepository) {
    IdempotentConsumerDefinition definition =
            new IdempotentConsumerDefinition(messageIdExpression, idempotentRepository);
    addOutput(definition);
    return definition;
}
项目:Camel    文件:IdempotentConsumerDefinition.java   
/**
 * Sets the message id repository for the idempotent consumer.
 * <p/>
 * Fluent variant that delegates to {@link #setMessageIdRepository}.
 *
 * @param idempotentRepository the idempotent repository instance
 * @return builder
 */
public IdempotentConsumerDefinition messageIdRepository(IdempotentRepository<?> idempotentRepository) {
    this.setMessageIdRepository(idempotentRepository);
    return this;
}
项目:Camel    文件:MemoryIdempotentRepository.java   
/**
 * Creates a new memory based repository through the no-argument constructor.
 * <p/>
 * Per the original documentation this uses a {@link LRUCache} with a default of 1000
 * entries in the cache; that default is defined by the constructor, not by this method.
 *
 * @return the new repository
 */
public static IdempotentRepository<String> memoryIdempotentRepository() {
    IdempotentRepository<String> repository = new MemoryIdempotentRepository();
    return repository;
}
项目:Camel    文件:MemoryIdempotentRepository.java   
/**
 * Creates a new memory based repository backed by a {@link LRUCache} of the given size.
 *
 * @param cacheSize  the cache size
 * @return the new repository
 */
public static IdempotentRepository<String> memoryIdempotentRepository(int cacheSize) {
    LRUCache<String, Object> cache = new LRUCache<String, Object>(cacheSize);
    return memoryIdempotentRepository(cache);
}
项目:Camel    文件:MemoryIdempotentRepository.java   
/**
 * Creates a new memory based repository that stores the processed message ids in the
 * given {@link Map}.
 * <p/>
 * Care should be taken to use a suitable underlying {@link Map} to avoid this class being a
 * memory leak.
 *
 * @param cache  the cache
 * @return the new repository
 */
public static IdempotentRepository<String> memoryIdempotentRepository(Map<String, Object> cache) {
    IdempotentRepository<String> repository = new MemoryIdempotentRepository(cache);
    return repository;
}
项目:Camel    文件:FileIdempotentRepository.java   
/**
 * Creates a new file based repository using a {@link org.apache.camel.util.LRUCache}
 * as 1st level cache with a default of 1000 entries in the cache.
 *
 * @param fileStore  the file store
 * @return the new repository
 */
public static IdempotentRepository<String> fileIdempotentRepository(File fileStore) {
    final int defaultCacheSize = 1000;
    return fileIdempotentRepository(fileStore, defaultCacheSize);
}
项目:Camel    文件:FileIdempotentRepository.java   
/**
 * Creates a new file based repository using a {@link org.apache.camel.util.LRUCache}
 * of the given size as 1st level cache.
 *
 * @param fileStore  the file store
 * @param cacheSize  the cache size
 * @return the new repository
 */
public static IdempotentRepository<String> fileIdempotentRepository(File fileStore, int cacheSize) {
    LRUCache<String, Object> cache = new LRUCache<String, Object>(cacheSize);
    return fileIdempotentRepository(fileStore, cache);
}
项目:Camel    文件:FileIdempotentRepository.java   
/**
 * Creates a new file based repository using a {@link org.apache.camel.util.LRUCache}
 * of the given size as 1st level cache, and applies the given maximum file store size.
 *
 * @param fileStore  the file store
 * @param cacheSize  the cache size
 * @param maxFileStoreSize  the max size in bytes for the filestore file
 * @return the new repository
 */
public static IdempotentRepository<String> fileIdempotentRepository(File fileStore, int cacheSize, long maxFileStoreSize) {
    LRUCache<String, Object> cache = new LRUCache<String, Object>(cacheSize);
    FileIdempotentRepository answer = new FileIdempotentRepository(fileStore, cache);
    answer.setMaxFileStoreSize(maxFileStoreSize);
    return answer;
}
项目:Camel    文件:FileIdempotentRepository.java   
/**
 * Creates a new file based repository using the given {@link java.util.Map}
 * as 1st level cache.
 * <p/>
 * Care should be taken to use a suitable underlying {@link java.util.Map} to avoid this class being a
 * memory leak.
 *
 * @param store  the file store
 * @param cache  the cache to use as 1st level cache
 * @return the new repository
 */
public static IdempotentRepository<String> fileIdempotentRepository(File store, Map<String, Object> cache) {
    IdempotentRepository<String> repository = new FileIdempotentRepository(store, cache);
    return repository;
}
项目:Camel    文件:GenericFileEndpoint.java   
/**
 * Sets the pluggable {@code org.apache.camel.spi.IdempotentRepository} for this endpoint.
 * <p/>
 * Per the original documentation, a MemoryMessageIdRepository is used by default if none
 * is specified and idempotent is true; that defaulting happens outside this setter.
 *
 * @param idempotentRepository the repository to store on this endpoint
 */
public void setIdempotentRepository(IdempotentRepository<String> idempotentRepository) {
    this.idempotentRepository = idempotentRepository;
}