@Test public void testConnectionFactoryWithOverrides() { load(TestConfiguration.class, "spring.rabbitmq.host:remote-server", "spring.rabbitmq.port:9000", "spring.rabbitmq.username:alice", "spring.rabbitmq.password:secret", "spring.rabbitmq.virtual_host:/vhost", "spring.rabbitmq.connection-timeout:123"); CachingConnectionFactory connectionFactory = this.context .getBean(CachingConnectionFactory.class); assertThat(connectionFactory.getHost()).isEqualTo("remote-server"); assertThat(connectionFactory.getPort()).isEqualTo(9000); assertThat(connectionFactory.getVirtualHost()).isEqualTo("/vhost"); DirectFieldAccessor dfa = new DirectFieldAccessor(connectionFactory); com.rabbitmq.client.ConnectionFactory rcf = (com.rabbitmq.client.ConnectionFactory) dfa .getPropertyValue("rabbitConnectionFactory"); assertThat(rcf.getConnectionTimeout()).isEqualTo(123); assertThat((Address[]) dfa.getPropertyValue("addresses")).hasSize(1); }
protected void initRecv() { checkWorkable(); String[] hpArr = getParam().getHostsAndPorts().split(","); address = new Address[hpArr.length]; for (int i = 0; i < address.length; i++) { address[i] = new Address(hpArr[i].split(":")[0], Integer.parseInt(hpArr[i].split(":")[1])); } factory = new ConnectionFactory(); factory.setUsername(getParam().getUserName()); factory.setPassword(getParam().getPassward()); factory.setVirtualHost(getParam().getVhost()); try { createConnect(); } catch (IOException e) { LogUtil.getMqSyncLog().error("Create connect failure.", e); } LogUtil.getMqSyncLog().info(" Connection and Channel Create Complete. "); }
/** * {@inheritDoc} */ @Override public String toString() { String adrList = ""; Address[] list = getServerAddressList(); for ( int i = 0; i < list.length; i++ ) { if ( i > 0 ) { adrList += ","; } adrList += list[i].getHost()+":"+list[i].getPort(); } return "StandardRabbitExchangeConfiguration" + " (username : " + getUsername() + // REMOVED as it will appear in logs. ", password : " + getPassword() + ", serverAddressList : [" + adrList + "], sslConnection : " + isSSLConnection() + ", exchangeName : " + getExchangeName() + ", exchangeDurable : " + isExchangeDurable() + ", exchangeType : " + getExchangeType() + ")"; }
public Connection connection() throws IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(configuration.username()); factory.setPassword(configuration.password()); factory.setVirtualHost(configuration.virtualhost()); String[] urls = configuration.connectionUrl().split(";"); List<Address> addresses = new LinkedList<Address>(); for (String url : urls) { String[] urlInf = url.split(":"); String hostname = urlInf[0]; int port = parseInt(urlInf[1]); addresses.add(new Address(hostname, port)); } return factory.newConnection(addresses.toArray(new Address[addresses.size()])); }
/** * Returns the addresses to attempt connections to, in round-robin order. * * @see #withAddresses(Address...) * @see #withAddresses(String) * @see #withHost(String) * @see #withHosts(String...) */ public Address[] getAddresses() { if (addresses != null) return addresses; if (hosts != null) { addresses = new Address[hosts.length]; for (int i = 0; i < hosts.length; i++) addresses[i] = new Address(hosts[i], factory.getPort()); return addresses; } Address address = factory == null ? new Address("localhost", -1) : new Address( factory.getHost(), factory.getPort()); return new Address[] { address }; }
/** * Asserts that invocation failures are rethrown when a retry policy is not set. */ public void shouldThrowOnInvocationFailureWithNoRetryPolicy() throws Throwable { config = new Config().withRetryPolicy(RetryPolicies.retryNever()); connectionFactory = mock(ConnectionFactory.class); connection = mock(Connection.class); when(connectionFactory.newConnection(any(ExecutorService.class), any(Address[].class), anyString())).thenAnswer( failNTimes(3, new ConnectException("fail"), connection, connectionHandler)); try { mockConnection(); fail(); } catch (Exception expected) { } verifyCxnCreations(1); }
@Override public void connect() throws IOException { if(config.getAddresses().isEmpty()) { logger.warning("Skipping AMQP connection because no addresses are configured"); } else { logger.info("Connecting to AMQP API at " + Joiners.onCommaSpace.join(config.getAddresses())); this.connection = this.createConnectionFactory().newConnection(this.config.getAddresses().toArray(new Address[0])); this.channel = this.connection.createChannel(); } }
@Override public void init(AbstractConfiguration config, ApplicationListenerFactory factory) { try { ConnectionFactory cf = new ConnectionFactory(); cf.setUsername(config.getString("rabbitmq.userName", ConnectionFactory.DEFAULT_USER)); cf.setPassword(config.getString("rabbitmq.password", ConnectionFactory.DEFAULT_PASS)); cf.setVirtualHost(config.getString("rabbitmq.virtualHost", ConnectionFactory.DEFAULT_VHOST)); cf.setAutomaticRecoveryEnabled(true); cf.setExceptionHandler(new RabbitMQExceptionHandler()); this.conn = cf.newConnection(Address.parseAddresses(config.getString("rabbitmq.addresses"))); this.channel = conn.createChannel(); logger.trace("Initializing RabbitMQ application resources ..."); APPLICATION_TOPIC = config.getString("communicator.application.topic"); this.channel.exchangeDeclare(APPLICATION_TOPIC, "topic", true); logger.trace("Initializing RabbitMQ application consumer's workers ..."); Channel consumerChan = this.conn.createChannel(); consumerChan.queueDeclare(config.getString("rabbitmq.app.queueName"), true, false, true, null); consumerChan.queueBind(config.getString("rabbitmq.app.queueName"), APPLICATION_TOPIC, config.getString("rabbitmq.app.routingKey")); consumerChan.basicConsume(config.getString("rabbitmq.app.queueName"), true, new RabbitMQApplicationConsumer(consumerChan, factory.newListener())); } catch (IOException | TimeoutException e) { logger.error("Failed to connect to RabbitMQ servers", e); throw new IllegalStateException("Init RabbitMQ communicator failed"); } }
public synchronized void prepare() throws IOException, TimeoutException { if (rabbitMqChannelPool == null || rabbitMqChannelPool.isClosed()) { LOGGER.info("Creating RabbitMQ channel pool..."); ConnectionFactory rabbitMqConnectionFactory = createConnectionFactory(); if (rabbitMqConfig.hasAddresses()) { Address[] addresses = Address.parseAddresses(rabbitMqConfig.getAddresses()); this.rabbitMqChannelFactory = new RabbitMqChannelFactory(rabbitMqConnectionFactory, addresses); } else { this.rabbitMqChannelFactory = new RabbitMqChannelFactory(rabbitMqConnectionFactory); } this.rabbitMqChannelPool = createRabbitMqChannelPool(rabbitMqChannelFactory); LOGGER.info("RabbitMQ channel pool was created"); } }
@Test public void prepareWithAddresses() throws IOException, TimeoutException { String addresses = "10.189.21.119:8080,10.189.21.118:8181"; RabbitMqConfig rabbitMqConfig = new RabbitMqConfigBuilder() .setAddresses(addresses) .build(); RabbitMqChannelProvider rabbitMqChannelProvider = spy(new RabbitMqChannelProvider(rabbitMqConfig)); doReturn(mockConnectionFactory).when(rabbitMqChannelProvider).createConnectionFactory(); rabbitMqChannelProvider.prepare(); verify(mockConnectionFactory, times(1)).newConnection(Address.parseAddresses(addresses)); }
/** * */ private void initAddresses(String hosts,int port) { String[] servers = hosts.split(","); addresses = new Address[servers.length]; for(int i=0;i<servers.length;i++){ addresses[i] = new Address(servers[i],port); } }
static List<Address> convertAddresses(String addresses) { String[] addressStrings = addresses.split(","); Address[] addressArray = new Address[addressStrings.length]; for (int i = 0; i < addressStrings.length; i++) { String[] splitAddress = addressStrings[i].split(":"); String host = splitAddress[0]; Integer port = null; try { if (splitAddress.length == 2) port = Integer.parseInt(splitAddress[1]); } catch (NumberFormatException ignore) { } addressArray[i] = (port != null) ? new Address(host, port) : new Address(host); } return Arrays.asList(addressArray); }
@Test public void addresses() { context = new XmlBeans("" + "<bean id=\"sender\" class=\"zipkin2.reporter.beans.RabbitMQSenderFactoryBean\">\n" + " <property name=\"addresses\" value=\"localhost\"/>\n" + "</bean>" ); assertThat(context.getBean("sender", RabbitMQSender.class)) .extracting("addresses") .containsExactly(Arrays.asList(new Address("localhost"))); }
/** * If this option is set, camel-rabbitmq will try to create connection based on the setting of option addresses. * The addresses value is a string which looks like "server1:12345, server2:12345" */ public void setAddresses(String addresses) { Address[] addressArray = Address.parseAddresses(addresses); if (addressArray.length > 0) { this.addresses = addressArray; } }
@Test public void brokerEndpointAddressesSettings() throws Exception { RabbitMQEndpoint endpoint = context.getEndpoint("rabbitmq:localhost/exchange?addresses=server1:12345,server2:12345", RabbitMQEndpoint.class); assertEquals("Wrong size of endpoint addresses.", 2, endpoint.getAddresses().length); assertEquals("Get a wrong endpoint address.", new Address("server1", 12345), endpoint.getAddresses()[0]); assertEquals("Get a wrong endpoint address.", new Address("server2", 12345), endpoint.getAddresses()[1]); }
@Test public void shouldParseExchangeConfigurationCorrectly() { Properties exchangeProperties = propReader.readPropertiesFailFast("exchange"); // validate that the property is correctly read assertEquals("false", exchangeProperties.getProperty("exchangeDurable")); // Validate that the configuration matches the values defined in the property file StandardRabbitExchangeConfiguration config = new StandardRabbitExchangeConfiguration(exchangeProperties); assertEquals( "guest", config.getUsername()); assertEquals( "pwd", config.getPassword()); Address[] adrList = config.getServerAddressList(); assertEquals(3, adrList.length); assertEquals( "10.11.111.199", adrList[0].getHost()); assertEquals( 5672, adrList[0].getPort()); assertEquals( "ecosensemq01.cs.au.dk", adrList[1].getHost()); assertEquals( 2332, adrList[1].getPort()); assertEquals( "ecosensemq02.cs.au.dk", adrList[2].getHost()); assertEquals( 5671, adrList[2].getPort()); assertTrue("SSL should be true", config.isSSLConnection()); assertEquals( "ecosense-exchange", config.getExchangeName()); assertFalse("Exchange should not be durable", config.isExchangeDurable()); assertEquals( RabbitConstants.TOPIC, config.getExchangeType()); // validate nice output in toString assertTrue("toString contains server list", config.toString().contains("10.11.111.199:5672,ecosensemq01.cs.au.dk:2332,ecosensemq02.cs.au.dk:5671")); }
/** * Initializes the <code>StandardRabbitExchangeConfiguration</code> object. * * @param username - * @param password - * @param serverAddressList - * @param sslConection - * @param exchangeName - * @param exchangeDurable - * @param exchangeType - */ private void init(String username, String password, Address[] serverAddressList, boolean sslConnection, String exchangeName, boolean exchangeDurable, String exchangeType) { this.username = username; this.password = password; this.serverAddressList = serverAddressList; this.sslConnection = sslConnection; this.exchangeName = exchangeName; this.exchangeDurable = exchangeDurable; this.exchangeType = exchangeType; }
public static Connection createConnection(ConnectionFactory factory, Address[] addresses) throws IOException { Connection connection = null; try { connection = factory.newConnection(addresses); } catch (TimeoutException e) { log.warn("TimeoutException", e); } return connection; }
private void assertConnectorPropertiesMatchHosts(ConnectionFactory connector, List<String> uriStrings) throws Exception { Address[] addresses = (Address[]) ReflectionTestUtils.getField(connector, "addresses"); assertNotNull(addresses); assertEquals(uriStrings.size(), addresses.length); for (int i = 0; i < uriStrings.size(); i++) { URI uri = new URI(uriStrings.get(i)); assertEquals(uri.getHost(), addresses[i].getHost()); assertEquals(uri.getPort(), addresses[i].getPort()); } }
/** * Returns an array of Addresses for the {@code hosts} and {@code port}. */ public static Address[] addressesFor(String[] hosts, int port) { Address[] hostAddresses = new Address[hosts.length]; for (int i = 0; i < hosts.length; i++) hostAddresses[i] = new Address(hosts[i].trim(), port); return hostAddresses; }
protected void mockConnection() throws IOException, TimeoutException { if (connectionFactory == null) { mockConnectionOnly(); connectionFactory = mock(ConnectionFactory.class); when(connectionFactory.getVirtualHost()).thenReturn("/"); when(connectionFactory.newConnection(any(ExecutorService.class), any(Address[].class), anyString())) .thenReturn(connection); } if (options == null) options = new ConnectionOptions().withHost("test-host"); options.withConnectionFactory(connectionFactory); if (config == null) config = new Config().withRetryPolicy( RetryPolicies.retryAlways().withInterval(Duration.millis(10))).withRecoveryPolicy( RecoveryPolicies.recoverAlways()); if (connectionHandler == null) { connectionHandler = new ConnectionHandler(options, config, Connection.class.getClassLoader()); connectionProxy = (ConfigurableConnection) Proxy.newProxyInstance(Connection.class.getClassLoader(), new Class<?>[] {ConfigurableConnection.class}, connectionHandler); connectionHandler.createConnection(connectionProxy); channels = new HashMap<Integer, MockChannel>(); } }
/** * Asserts that a retryable connect failure results in the connection eventually succeeding. */ public void shouldHandleRetryableConnectFailure() throws Throwable { mockConnectionOnly(); connectionFactory = mock(ConnectionFactory.class); when(connectionFactory.newConnection(any(ExecutorService.class), any(Address[].class), anyString())).thenAnswer( failNTimes(3, new ConnectException("fail"), connection, connectionHandler)); mockConnection(); verifyCxnCreations(4); }
/** * Asserts that an non-retryable connect failure results in the connection being rethrown. */ public void shouldHandleNonRetryableConnectFailure() throws Throwable { connectionFactory = mock(ConnectionFactory.class); connection = mock(Connection.class); when(connectionFactory.newConnection(any(ExecutorService.class), any(Address[].class), anyString())).thenAnswer( failNTimes(3, new RuntimeException(), connection, connectionHandler)); try { mockConnection(); fail(); } catch (Exception expected) { } verifyCxnCreations(1); }
public HaConnection newConnection(ExecutorService executor, Address[] addrs) throws IOException { Connection target = null; int tries = 0; while(target == null && tries++ < maxReconnectTries) { try { if(Thread.interrupted()) { Thread.currentThread().interrupt(); throw new InterruptedException("Connection process interrupted after "+tries+" tries"); } target = newDelegateConnection(executor, addrs); } catch (Exception e) { if(e instanceof IOException && !HaUtils.shouldReconnect(e)) { throw (IOException)e; } else if (! (e instanceof IOException)) { throw new IOException("Unable to connect to RabbitMQ", e); } try { Thread.sleep(reconnectDelay); } catch (InterruptedException e1) { Thread.currentThread().interrupt(); throw new RuntimeException("Connect process was interrupted"); } log.warn("Unable to connect to RabbitMQ... trying again..."); } } if(target == null) { throw new RuntimeException("Unable to connect to RabbitMQ. Gave up after "+tries+" tries."); } return createConnectionProxyInstance(executor, addrs, target); }
@Override public List<Address> getAddresses() { return config.getStringList(ADDRESSES_PATH).stream().map(Address::new).collect(Collectors.toList()); }
public RabbitMqChannelFactory(ConnectionFactory rabbitMqConnectionFactory, Address[] addresses) throws IOException, TimeoutException { this.rabbitMqConnection = rabbitMqConnectionFactory.newConnection(addresses); }
public Address[] getAddresses() { return addresses; }
/** * Constructs a <code>StandardRabbitExchangeConfiguration</code>. * * @param exchangeProperties */ public StandardRabbitExchangeConfiguration(Properties exchangeProperties) { String username = FailFast.readProperty(exchangeProperties, USERNAME); String password = FailFast.readProperty(exchangeProperties, PASSWORD); boolean sslConnection = FailFast.readProperty(exchangeProperties, SSL_CONNECTION).equalsIgnoreCase("true"); String exchangeName = FailFast.readProperty(exchangeProperties, EXCHANGE_NAME); boolean exchangeDurable = FailFast.readProperty(exchangeProperties, EXCHANGE_DURABLE).equalsIgnoreCase("true"); String exchangeType = FailFast.readProperty(exchangeProperties, EXCHANGE_TYPE); String addressString = FailFast.readProperty(exchangeProperties, SERVER_ADDRESS_LIST); String[] addressStrings = addressString.split(","); List<Address> addresses = new ArrayList<Address>(5); for (String address : addressStrings) { String[] splitAddress = address.split(":"); String host = splitAddress[0]; int port = 5672; if (sslConnection) { port = 5671; } if (splitAddress.length > 1) { try { port = Integer.parseInt(splitAddress[1]); } catch (Exception e) { Logger log = LoggerFactory.getLogger(StandardRabbitExchangeConfiguration.class); log.error("Integer parsing error on port number from address property", e); System.out.println("Port number error in property file, review the log..."); // Fail fast, no need to carry on before the property file has been fixed. System.exit(-1); } } addresses.add(new Address(host, port)); } Address[] serverAddressList = addresses.toArray(new Address[0]); init(username, password, serverAddressList, sslConnection, exchangeName, exchangeDurable, exchangeType); }
/** * {@inheritDoc} */ @Override public Address[] getServerAddressList() { return serverAddressList; }
@Override public Address[] getServerAddressList() { // the default port is 5672, and the 'cluster' is a single machine Address[] clusterAddr = new Address[] { new Address(mqServerName, 5672) }; return clusterAddr; }
public RabbitMQDatabase(List<Address> addressList, String username, String password) { this.addressList = addressList; this.username = username; this.password = password; }
public Connection getConnection() throws IOException { return factory.newConnection(addressList.toArray(new Address[addressList.size()])); }
public void initRabbitMQDatabase(List<Address> addressList, String username, String password) { rabbitMQDatabase = new RabbitMQDatabase(addressList, username, password); rabbitMQDatabase.setupDatabase(); }
void verifyCxnCreations(int expectedCreations) throws IOException, TimeoutException { verify(connectionFactory, times(expectedCreations)).newConnection(any(ExecutorService.class), any(Address[].class), anyString()); }
public abstract HaConnection newConnection(ExecutorService executor, Address[] addrs) throws IOException;
public abstract HaConnection newConnection(Address[] addrs) throws IOException;
protected HaConnection createConnectionProxyInstance(ExecutorService executor, final Address[] addrs, final Connection targetConnection) { ReconnectionFactory factory = new ReconnectionFactory(this, executor, addrs); return new HaConnection(factory, targetConnection, reconnectDelay, maxReconnectTries); }
protected Connection newDelegateConnection(ExecutorService executor, Address[] addrs) throws IOException { return delegate.newConnection(executor, addrs); }
public HaConnection newConnection(Address[] addrs) throws IOException { return this.newConnection(null, addrs); }
/** * Constructs a <code>StandardRabbitExchangeConfiguration</code>. * * @param username - * @param password - * @param serverAddressList - * @param sslConnection - * @param exchangeName - * @param exchangeDurable - * @param exchangeType - */ public StandardRabbitExchangeConfiguration(String username, String password, Address[] serverAddressList, boolean sslConnection, String exchangeName, boolean exchangeDurable, String exchangeType) { init(username, password, serverAddressList, sslConnection, exchangeName, exchangeDurable, exchangeType); }