/**
 * Registers {@code arg} with the listener collection selected by {@code methodName}.
 *
 * <p>Recognised names are {@code "addConfirmListener"}, {@code "addFlowListener"} and
 * {@code "addReturnListener"}; any other name (including {@code null}) is ignored.
 *
 * @param methodName name of the listener-registration method being handled
 * @param arg the listener to register; it is cast to the listener type matching
 *     {@code methodName}
 */
private void handleAdd(String methodName, Object arg) {
  if ("addConfirmListener".equals(methodName)) {
    confirmListeners.add((ConfirmListener) arg);
    return;
  }
  if ("addFlowListener".equals(methodName)) {
    flowListeners.add((FlowListener) arg);
    return;
  }
  if ("addReturnListener".equals(methodName)) {
    returnListeners.add((ReturnListener) arg);
  }
}
/**
 * Unregisters {@code arg} from the listener collection selected by {@code methodName}.
 *
 * <p>Recognised names are {@code "removeConfirmListener"}, {@code "removeFlowListener"} and
 * {@code "removeReturnListener"}; any other name (including {@code null}) is ignored.
 * The result of the underlying {@code remove} call is not reported to the caller.
 *
 * @param methodName name of the listener-removal method being handled
 * @param arg the listener to remove; it is cast to the listener type matching
 *     {@code methodName}
 */
private void handleRemove(String methodName, Object arg) {
  if ("removeConfirmListener".equals(methodName)) {
    confirmListeners.remove((ConfirmListener) arg);
    return;
  }
  if ("removeFlowListener".equals(methodName)) {
    flowListeners.remove((FlowListener) arg);
    return;
  }
  if ("removeReturnListener".equals(methodName)) {
    returnListeners.remove((ReturnListener) arg);
  }
}
/**
 * Forwards the registration of {@code listener} to {@code channel}.
 *
 * @param listener the return listener to add
 */
@Override
public void addReturnListener(ReturnListener listener) {
  channel.addReturnListener(listener);
}
/**
 * Forwards the registration of {@code returnCallback} to {@code channel}.
 *
 * @param returnCallback the callback to register
 * @return the {@link ReturnListener} handed back by {@code channel}
 */
@Override
public ReturnListener addReturnListener(ReturnCallback returnCallback) {
  final ReturnListener registered = channel.addReturnListener(returnCallback);
  return registered;
}
/**
 * Forwards the removal of {@code listener} to {@code channel}.
 *
 * @param listener the return listener to remove
 * @return whatever {@code channel} reports for the removal
 */
@Override
public boolean removeReturnListener(ReturnListener listener) {
  final boolean removed = channel.removeReturnListener(listener);
  return removed;
}
@Override public List<ConfigIssue> init() { List<ConfigIssue> issues = super.init(); //Validate AMQP Properties only if it is set. if (conf.basicPropertiesConfig.setAMQPMessageProperties) { RabbitUtil.buildBasicProperties( conf.basicPropertiesConfig, getContext(), builder, issues ); } //Initialize Rabbit Channel, Queue And Exchange //Also initialize dataFormatConfig RabbitUtil.initRabbitStage( getContext(), conf, conf.dataFormat, conf.dataFormatConfig, rabbitCxnManager, issues ); if (issues.isEmpty()) { //Set a return listener for mandatory flag failure. this.rabbitCxnManager.getChannel().addReturnListener( new ReturnListener() { @Override public void handleReturn( int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body ) throws IOException { LOG.error( Errors.RABBITMQ_08.getMessage(), replyCode, replyText, exchange, routingKey ); getContext().reportError(Errors.RABBITMQ_08, replyCode, replyText, exchange, routingKey); } } ); generatorFactory = conf.dataFormatConfig.getDataGeneratorFactory(); errorRecordHandler = new DefaultErrorRecordHandler(getContext()); } return issues; }
/**
 * Add a {@link ReturnListener}.
 *
 * <p>NOTE(review): how a listener that is already registered is treated is not
 * specified by this declaration - confirm against the implementation.
 *
 * @param listener the listener to add
 */
void addReturnListener(ReturnListener listener);
/**
 * Remove a {@link ReturnListener}.
 *
 * @param listener the listener to remove
 * @return {@code true} if the listener was found and removed,
 *     {@code false} otherwise
 */
boolean removeReturnListener(ReturnListener listener);