/** * Calls the async version of the processor's process method and waits * for it to complete before returning. This can be used by {@link AsyncProcessor} * objects to implement their sync version of the process method. * <p/> * <b>Important:</b> This method is discouraged to be used, as its better to invoke the asynchronous * {@link AsyncProcessor#process(org.apache.camel.Exchange, org.apache.camel.AsyncCallback)} method, whenever possible. * * @param processor the processor * @param exchange the exchange * @throws Exception can be thrown if waiting is interrupted */ public static void process(final AsyncProcessor processor, final Exchange exchange) throws Exception { final AsyncProcessorAwaitManager awaitManager = exchange.getContext().getAsyncProcessorAwaitManager(); final CountDownLatch latch = new CountDownLatch(1); boolean sync = processor.process(exchange, new AsyncCallback() { public void done(boolean doneSync) { if (!doneSync) { awaitManager.countDown(exchange, latch); } } @Override public String toString() { return "Done " + processor; } }); if (!sync) { awaitManager.await(exchange, latch); } }
@Override public boolean process(Exchange exchange, AsyncCallback callback) { if (delegate == null) { exchange.setException(new IllegalStateException("Not started")); callback.done(true); return true; } if (delegate instanceof AsyncProcessor) { return ((AsyncProcessor) delegate).process(exchange, callback); } // fallback to sync mode try { process(exchange); } catch (Exception e) { exchange.setException(e); } callback.done(true); return true; }
public boolean process(Exchange exchange, final AsyncCallback callback) { boolean flag = true; if ((((RouteboxDirectEndpoint)getRouteboxEndpoint()).getConsumer() == null) && ((getRouteboxEndpoint()).getConfig().isSendToConsumer())) { exchange.setException(new CamelExchangeException("No consumers available on endpoint: " + getRouteboxEndpoint(), exchange)); callback.done(true); flag = true; } else { try { LOG.debug("Dispatching to Inner Route {}", exchange); RouteboxDispatcher dispatcher = new RouteboxDispatcher(producer); exchange = dispatcher.dispatchAsync(getRouteboxEndpoint(), exchange); if (getRouteboxEndpoint().getConfig().isSendToConsumer()) { AsyncProcessor processor = ((RouteboxDirectEndpoint)getRouteboxEndpoint()).getConsumer().getAsyncProcessor(); flag = processor.process(exchange, callback); } } catch (Exception e) { getExceptionHandler().handleException("Error processing exchange", exchange, e); } } return flag; }
private boolean processExchange(final Exchange exchange) throws Exception { taskProcessor.process(exchange); final Processor currentProcessor = getProcessor(); if (currentProcessor instanceof AsyncProcessor) { ((AsyncProcessor) currentProcessor).process(exchange, new AsyncCallback() { @Override public void done(boolean doneSync) { // we are not interested in this event } }); } else { currentProcessor.process(exchange); } return true; }
public boolean process(Exchange exchange, AsyncCallback callback) { Iterator<Processor> processors = next().iterator(); Object lastHandled = exchange.getProperty(Exchange.EXCEPTION_HANDLED); exchange.setProperty(Exchange.EXCEPTION_HANDLED, null); while (continueRouting(processors, exchange)) { exchange.setProperty(Exchange.TRY_ROUTE_BLOCK, true); ExchangeHelper.prepareOutToIn(exchange); // process the next processor Processor processor = processors.next(); AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor); boolean sync = process(exchange, callback, processors, async, lastHandled); // continue as long its being processed synchronously if (!sync) { LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId()); // the remainder of the try .. catch .. finally will be completed async // so we break out now, then the callback will be invoked which then continue routing from where we left here return false; } LOG.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId()); } ExchangeHelper.prepareOutToIn(exchange); exchange.removeProperty(Exchange.TRY_ROUTE_BLOCK); exchange.setProperty(Exchange.EXCEPTION_HANDLED, lastHandled); LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange); callback.done(true); return true; }
private RestBindingMarshalOnCompletion(String routeId, AsyncProcessor jsonMarshal, AsyncProcessor xmlMarshal, boolean wasXml, String accept) { this.routeId = routeId; this.jsonMarshal = jsonMarshal; this.xmlMarshal = xmlMarshal; this.wasXml = wasXml; this.accept = accept; }
private boolean processExchange(Processor processor, Exchange exchange, Exchange copy, AtomicInteger attempts, AtomicInteger index, AsyncCallback callback, List<Processor> processors) { if (processor == null) { throw new IllegalStateException("No processors could be chosen to process " + copy); } log.debug("Processing failover at attempt {} for {}", attempts, copy); AsyncProcessor albp = AsyncProcessorConverterHelper.convert(processor); return albp.process(copy, new FailOverAsyncCallback(exchange, copy, attempts, index, callback, processors)); }
private boolean executeProcessor(final Exchange exchange, final AsyncCallback callback) { Processor processor = getProcessors().get(0); if (processor == null) { throw new IllegalStateException("No processors could be chosen to process CircuitBreaker"); } // store state as exchange property exchange.setProperty(Exchange.CIRCUIT_BREAKER_STATE, stateAsString(state.get())); AsyncProcessor albp = AsyncProcessorConverterHelper.convert(processor); // Added a callback for processing the exchange in the callback boolean sync = albp.process(exchange, new CircuitBreakerCallback(exchange, callback)); // We need to check the exception here as albp is use sync call if (sync) { boolean failed = hasFailed(exchange); if (!failed) { failures.set(0); } else { failures.incrementAndGet(); lastFailure = System.currentTimeMillis(); } } else { // CircuitBreakerCallback can take care of failure check of the // exchange log.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId()); return false; } log.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId()); callback.done(true); return true; }
/** * Calls the async version of the processor's process method. * <p/> * This implementation supports transacted {@link Exchange}s which ensure those are run in a synchronous fashion. * See more details at {@link org.apache.camel.AsyncProcessor}. * * @param processor the processor * @param exchange the exchange * @param callback the callback * @return <tt>true</tt> to continue execute synchronously, <tt>false</tt> to continue being executed asynchronously * @deprecated should no longer be needed, instead invoke the process method on the {@link AsyncProcessor} directly, * instead of using this method. */ @Deprecated public static boolean process(final AsyncProcessor processor, final Exchange exchange, final AsyncCallback callback) { boolean sync; if (exchange.isTransacted()) { // must be synchronized for transacted exchanges LOG.trace("Transacted Exchange must be routed synchronously for exchangeId: {} -> {}", exchange.getExchangeId(), exchange); try { process(processor, exchange); } catch (Throwable e) { exchange.setException(e); } callback.done(true); sync = true; } else { final UnitOfWork uow = exchange.getUnitOfWork(); // allow unit of work to wrap callback in case it need to do some special work // for example the MDCUnitOfWork AsyncCallback async = callback; if (uow != null) { async = uow.beforeProcess(processor, exchange, callback); } // we support asynchronous routing so invoke it sync = processor.process(exchange, async); // execute any after processor work (in current thread, not in the callback) if (uow != null) { uow.afterProcess(processor, exchange, callback, sync); } } if (LOG.isTraceEnabled()) { LOG.trace("Exchange processed and is continued routed {} for exchangeId: {} -> {}", new Object[]{sync ? "synchronously" : "asynchronously", exchange.getExchangeId(), exchange}); } return sync; }
@Override public <T> T convertTo(Class<T> type, Exchange exchange, Object value) { if (type.equals(AsyncProcessor.class)) { if (value instanceof Processor) { return type.cast(AsyncProcessorConverterHelper.convert((Processor) value)); } } return null; }
/** * Provides an {@link org.apache.camel.AsyncProcessor} interface to the configured * processor on the consumer. If the processor does not implement the interface, * it will be adapted so that it does. */ public synchronized AsyncProcessor getAsyncProcessor() { if (asyncProcessor == null) { asyncProcessor = AsyncProcessorConverterHelper.convert(processor); } return asyncProcessor; }
public boolean process(final Exchange exchange, final AsyncCallback callback) { Iterator<Processor> processors = next().iterator(); // callback to restore existing FILTER_MATCHED property on the Exchange final Object existing = exchange.getProperty(Exchange.FILTER_MATCHED); final AsyncCallback choiceCallback = new AsyncCallback() { @Override public void done(boolean doneSync) { if (existing != null) { exchange.setProperty(Exchange.FILTER_MATCHED, existing); } else { exchange.removeProperty(Exchange.FILTER_MATCHED); } callback.done(doneSync); } }; // as we only pick one processor to process, then no need to have async callback that has a while loop as well // as this should not happen, eg we pick the first filter processor that matches, or the otherwise (if present) // and if not, we just continue without using any processor while (processors.hasNext()) { // get the next processor Processor processor = processors.next(); // evaluate the predicate on filter predicate early to be faster // and avoid issues when having nested choices // as we should only pick one processor boolean matches = false; if (processor instanceof FilterProcessor) { FilterProcessor filter = (FilterProcessor) processor; try { matches = filter.matches(exchange); // as we have pre evaluated the predicate then use its processor directly when routing processor = filter.getProcessor(); } catch (Throwable e) { exchange.setException(e); } } else { // its the otherwise processor, so its a match notFiltered++; matches = true; } // check for error if so we should break out if (!continueProcessing(exchange, "so breaking out of choice", LOG)) { break; } // if we did not match then continue to next filter if (!matches) { continue; } // okay we found a filter or its the otherwise we are processing AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor); return async.process(exchange, choiceCallback); } // when no filter matches and there is no otherwise, then just continue choiceCallback.done(true); return true; }
public DelegateAsyncProcessor(AsyncProcessor processor) { if (processor == this) { throw new IllegalArgumentException("Recursive DelegateAsyncProcessor!"); } this.processor = processor; }
public AsyncProcessor getProcessor() { return processor; }
public void setProcessor(AsyncProcessor processor) { this.processor = processor; }
public boolean process(Exchange exchange, AsyncCallback callback) { Iterator<Processor> processors = getProcessors().iterator(); Exchange nextExchange = exchange; boolean first = true; while (continueRouting(processors, nextExchange)) { if (first) { first = false; } else { // prepare for next run nextExchange = createNextExchange(nextExchange); } // get the next processor Processor processor = processors.next(); AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor); boolean sync = process(exchange, nextExchange, callback, processors, async); // continue as long its being processed synchronously if (!sync) { LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId()); // the remainder of the pipeline will be completed async // so we break out now, then the callback will be invoked which then continue routing from where we left here return false; } LOG.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId()); // check for error if so we should break out if (!continueProcessing(nextExchange, "so breaking out of pipeline", LOG)) { break; } } // logging nextExchange as it contains the exchange that might have altered the payload and since // we are logging the completion if will be confusing if we log the original instead // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), nextExchange); // copy results back to the original exchange ExchangeHelper.copyResults(exchange, nextExchange); callback.done(true); return true; }
private boolean process(final Exchange original, final Exchange exchange, final AsyncCallback callback, final Iterator<Processor> processors, final AsyncProcessor asyncProcessor) { // this does the actual processing so log at trace level LOG.trace("Processing exchangeId: {} >>> {}", exchange.getExchangeId(), exchange); // implement asynchronous routing logic in callback so we can have the callback being // triggered and then continue routing where we left //boolean sync = AsyncProcessorHelper.process(asyncProcessor, exchange, boolean sync = asyncProcessor.process(exchange, new AsyncCallback() { public void done(boolean doneSync) { // we only have to handle async completion of the pipeline if (doneSync) { return; } // continue processing the pipeline asynchronously Exchange nextExchange = exchange; while (continueRouting(processors, nextExchange)) { AsyncProcessor processor = AsyncProcessorConverterHelper.convert(processors.next()); // check for error if so we should break out if (!continueProcessing(nextExchange, "so breaking out of pipeline", LOG)) { break; } nextExchange = createNextExchange(nextExchange); doneSync = process(original, nextExchange, callback, processors, processor); if (!doneSync) { LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId()); return; } } ExchangeHelper.copyResults(original, nextExchange); LOG.trace("Processing complete for exchangeId: {} >>> {}", original.getExchangeId(), original); callback.done(false); } }); return sync; }
public static AsyncProcessor convert(Processor value) { if (value instanceof AsyncProcessor) { return (AsyncProcessor)value; } return new ProcessorToAsyncProcessorBridge(value); }
/** * @deprecated use {@link AsyncProcessorConverterHelper#convert(org.apache.camel.Processor)} instead */ @Deprecated public static AsyncProcessor convert(Processor value) { return AsyncProcessorConverterHelper.convert(value); }
private void doProcessParallel(final ProcessorExchangePair pair) throws Exception { final Exchange exchange = pair.getExchange(); Processor processor = pair.getProcessor(); Producer producer = pair.getProducer(); TracedRouteNodes traced = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getTracedRouteNodes() : null; // compute time taken if sending to another endpoint StopWatch watch = null; if (producer != null) { watch = new StopWatch(); } try { // prepare tracing starting from a new block if (traced != null) { traced.pushBlock(); } if (producer != null) { EventHelper.notifyExchangeSending(exchange.getContext(), exchange, producer.getEndpoint()); } // let the prepared process it, remember to begin the exchange pair AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor); pair.begin(); // we invoke it synchronously as parallel async routing is too hard AsyncProcessorHelper.process(async, exchange); } finally { pair.done(); // pop the block so by next round we have the same staring point and thus the tracing looks accurate if (traced != null) { traced.popBlock(); } if (producer != null) { long timeTaken = watch.stop(); Endpoint endpoint = producer.getEndpoint(); // emit event that the exchange was sent to the endpoint // this is okay to do here in the finally block, as the processing is not using the async routing engine //( we invoke it synchronously as parallel async routing is too hard) EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken); } } }
/** * Constructs the bridge * * @param interceptor the interceptor to bridge * @param target the target */ public InterceptorToAsyncProcessorBridge(Processor interceptor, AsyncProcessor target) { this.interceptor = AsyncProcessorConverterHelper.convert(interceptor); this.target = target; }