/**
 * Sends a single comma-separated body to {@code direct:start} and then checks
 * which definitions the custom interceptor recorded, and in which order.
 *
 * @throws Exception if sending the message or verifying the mocks fails
 */
public void testCustomInterceptor() throws Exception {
    // Expectations on the two mock endpoints.
    getMockEndpoint("mock:child").expectedMessageCount(3);
    getMockEndpoint("mock:result").expectedMessageCount(1);

    template.sendBody("direct:start", "A,B,C");

    assertMockEndpointsSatisfied();

    // The interceptor must have recorded exactly four definitions.
    assertEquals(4, myInterceptor.getDefs().size());

    // Position 0: the log step.
    assertIsInstanceOf(LogDefinition.class, myInterceptor.getDefs().get(0));

    // Position 1: the "to" step pointing at mock:child.
    ToDefinition childTo = assertIsInstanceOf(ToDefinition.class, myInterceptor.getDefs().get(1));
    assertEquals("mock:child", childTo.getLabel());

    // Position 2: the splitter itself.
    assertIsInstanceOf(SplitDefinition.class, myInterceptor.getDefs().get(2));

    // Position 3: the "to" step pointing at mock:result.
    ToDefinition resultTo = assertIsInstanceOf(ToDefinition.class, myInterceptor.getDefs().get(3));
    assertEquals("mock:result", resultTo.getLabel());
}
public Processor createProcessor(RouteContext routeContext, ProcessorDefinition<?> definition) throws Exception { if (definition instanceof SplitDefinition) { // add additional output to the splitter SplitDefinition split = (SplitDefinition) definition; split.addOutput(new ToDefinition("mock:extra")); } if (definition instanceof SetBodyDefinition) { SetBodyDefinition set = (SetBodyDefinition) definition; set.setExpression(new ConstantExpression("body was altered")); } // return null to let the default implementation create the processor, we just wanted to alter the definition // before the processor was created return null; }
/**
 * Builds the bulk-ingest route for {@code folder}: a file endpoint feeds a
 * streaming XML splitter, whose parts are aggregated and handed to a thread
 * pool that runs the bulk ingest/index processor.
 *
 * @throws Exception if any step of the route construction fails
 */
@Override
public void configure() throws Exception {
    log.debug("Loading Bulk Ingest Process: @" + folder);

    // File consumer on the ingest folder, restricted by the bulk-ingest file filter.
    final String fileUri = "file:" + folder + "?noop=false&sortBy=file:name&move=.done&delay="
            + BULK_INGEST_POLL_INTERVAL;
    fEPoint = endpoint(fileUri, FileEndpoint.class);
    fEPoint.setFilter(new BulkIngestFileFilter());

    // The route is identified by the folder it reads from.
    RouteDefinition ingestRoute = from(fEPoint);
    ingestRoute.setId(folder);

    // Tokenize on <ingestDocument> elements, in streaming mode.
    SplitDefinition documentSplitter = ingestRoute.split().tokenizeXML("ingestDocument");
    documentSplitter.streaming();

    // Aggregate everything under one constant key; SplitPredicate decides when a group completes.
    AggregateDefinition batcher = documentSplitter.aggregate(constant(true), new BodyAggregator());
    batcher.setParallelProcessing(BULK_PROCESSOR_MULTI_THREADED);
    batcher.completionPredicate(new SplitPredicate(BULK_PROCESSOR_SPLIT_SIZE));

    // Hand each completed group to the worker threads running the ingest processor.
    ThreadsDefinition workers = batcher.threads(BULK_PROCESSOR_THREADS_MIN, BULK_PROCESSOR_THREADS_MAX);
    bulkIngestNIndexProcessor = new BulkIngestNIndexProcessor(user, action);
    workers.process(bulkIngestNIndexProcessor);
    workers.setThreadName("bulkIngest");

    ingestRoute.setErrorHandlerBuilder(DocStoreCamelContext.getInstance().getErrorHandler());

    log.info("Loaded Bulk Ingest Process: @" + folder);
}
/**
 * Looks through the context's route definitions, in order, for a splitter.
 *
 * @return the first non-null result of {@code firstSplitterType} over the
 *         routes' outputs, or {@code null} if no route yields one
 */
protected SplitDefinition getSplitter() {
    for (RouteDefinition candidate : context.getRouteDefinitions()) {
        SplitDefinition found = firstSplitterType(candidate.getOutputs());
        if (found != null) {
            // stop at the first route that contains a splitter
            return found;
        }
    }
    return null;
}
/**
 * Checks that advising the first route with {@code weaveByType} for
 * {@link SplitDefinition} is rejected with an {@link IllegalArgumentException}
 * whose message starts with the "no outputs which matches" text.
 * Presumably the first route contains no splitter; the route itself is
 * defined elsewhere in the test class.
 *
 * @throws Exception if advising the route fails in an unexpected way
 */
public void testUnknownType() throws Exception {
    try {
        context.getRouteDefinitions().get(0).adviceWith(context, new AdviceWithRouteBuilder() {
            @Override
            public void configure() throws Exception {
                weaveByType(SplitDefinition.class).replace().to("mock:xxx");
            }
        });
        // Reaching this line means the expected exception was not thrown.
        fail("Should have thrown exception");
    } catch (IllegalArgumentException e) {
        // The actual message is passed as the assertion text so a mismatch is easy to diagnose.
        assertTrue(e.getMessage(),
                e.getMessage().startsWith("There are no outputs which matches: SplitDefinition in the route"));
    }
}
@Test public void testWeaveByType() throws Exception { RouteDefinition route = context.getRouteDefinition("quotes"); route.adviceWith(context, new AdviceWithRouteBuilder() { @Override public void configure() throws Exception { // find the splitter and insert the route snippet before it weaveByType(SplitDefinition.class) .before() .filter(body().contains("Donkey")) .transform(simple("${body},Mules cannot do this")); } }); context.start(); getMockEndpoint("mock:line").expectedBodiesReceived("camel rules", "donkey is bad", "mules cannot do this"); getMockEndpoint("mock:combined").expectedMessageCount(1); getMockEndpoint("mock:combined").message(0).body().isInstanceOf(List.class); template.sendBody("seda:quotes", "Camel Rules,Donkey is Bad"); assertMockEndpointsSatisfied(); resetMocks(); // try again without the donkeys getMockEndpoint("mock:line").expectedBodiesReceived("beer is good", "whiskey is better"); getMockEndpoint("mock:combined").expectedMessageCount(1); getMockEndpoint("mock:combined").message(0).body().isInstanceOf(List.class); template.sendBody("seda:quotes", "Beer is good,Whiskey is better"); assertMockEndpointsSatisfied(); }
/**
 * Creates the managed wrapper for a splitter.
 * All three arguments are forwarded to the superclass constructor; the
 * splitter is additionally kept in this class's own {@code processor} field.
 *
 * @param context    the Camel context, passed to the superclass
 * @param processor  the splitter being managed
 * @param definition the split definition, passed to the superclass
 */
public ManagedSplitter(CamelContext context, Splitter processor, SplitDefinition definition) { super(context, processor, definition); this.processor = processor; }
/**
 * Returns the definition held by the superclass, narrowed to {@link SplitDefinition}.
 *
 * @return the superclass's definition cast to {@code SplitDefinition}
 * @throws ClassCastException if the superclass returns a non-null value that is not a {@code SplitDefinition}
 */
@Override public SplitDefinition getDefinition() { return (SplitDefinition) super.getDefinition(); }
/**
 * Sets the aggregation strategy of a split definition from a closure.
 * The closure is converted with {@code toAggregationStrategy} (defined
 * outside this block) and passed to {@code self.aggregationStrategy(...)}.
 *
 * @param self             the split definition to configure
 * @param aggregationLogic the closure supplying the aggregation logic
 * @return whatever {@code self.aggregationStrategy(...)} returns
 */
public static SplitDefinition aggregationStrategy(SplitDefinition self, Closure<Exchange> aggregationLogic) { return self.aggregationStrategy(toAggregationStrategy(aggregationLogic)); }
/**
 * Sets the on-prepare processor of a split definition from a closure.
 * The closure is converted with {@code toProcessor} (defined outside this
 * block) and passed to {@code self.onPrepare(...)}.
 *
 * @param self           the split definition to configure
 * @param processorLogic the closure supplying the processor logic
 * @return whatever {@code self.onPrepare(...)} returns
 */
public static SplitDefinition onPrepare(SplitDefinition self, Closure<Exchange> processorLogic) { return self.onPrepare(toProcessor(processorLogic)); }
/**
 * Adds a splitter to a processor definition, using a closure as the split expression.
 * The closure is converted with {@code toExpression} (defined outside this
 * block) and passed to {@code self.split(...)}.
 *
 * @param self       the processor definition to add the splitter to
 * @param expression the closure used as the split expression
 * @return whatever {@code self.split(...)} returns
 */
public static SplitDefinition split(ProcessorDefinition<?> self, Closure<?> expression) { return self.split(toExpression(expression)); }