/**
 * Subscribes an SQS queue to an SNS topic.
 *
 * <p>The method looks up the queue's ARN, sets the queue's {@code Policy} attribute to a
 * single-statement policy that allows any principal to send messages to the queue on the
 * condition that the source ARN is the given topic, and finally creates an {@code "sqs"}
 * subscription on the topic with the queue ARN as endpoint.
 *
 * @param snsTopicArn ARN of the topic the queue is subscribed to
 * @param sqsQueueUrl URL of the queue that is subscribed
 * @return the subscription ARN taken from the subscribe result
 */
public String subscribeQueueToTopic(String snsTopicArn, String sqsQueueUrl) {
    // Resolve the queue ARN; both the policy and the subscription need it.
    final String arnAttributeName = QueueAttributeName.QueueArn.toString();
    GetQueueAttributesRequest arnLookup =
            new GetQueueAttributesRequest(sqsQueueUrl).withAttributeNames(arnAttributeName);
    Map<String, String> currentAttributes = sqsClient.getQueueAttributes(arnLookup).getAttributes();
    String sqsQueueArn = currentAttributes.get(arnAttributeName);

    // Grant: anyone may SendMessage to this queue, but only when the source is the topic.
    Statement allowTopicDelivery = new Statement(Effect.Allow)
            .withId("topic-subscription-" + snsTopicArn)
            .withPrincipals(Principal.AllUsers)
            .withActions(SQSActions.SendMessage)
            .withResources(new Resource(sqsQueueArn))
            .withConditions(ConditionFactory.newSourceArnCondition(snsTopicArn));
    Policy policy = new Policy().withStatements(allowTopicDelivery);
    logger.debug("Policy: " + policy.toJson());

    // Install the policy on the queue.
    Map<String, String> policyAttribute = new HashMap<String, String>();
    policyAttribute.put(QueueAttributeName.Policy.toString(), policy.toJson());
    sqsClient.setQueueAttributes(new SetQueueAttributesRequest(sqsQueueUrl, policyAttribute));

    // Create the subscription and hand back its ARN.
    SubscribeRequest subscription = new SubscribeRequest()
            .withEndpoint(sqsQueueArn)
            .withProtocol("sqs")
            .withTopicArn(snsTopicArn);
    return snsClient.subscribe(subscription).getSubscriptionArn();
}
/**
 * Sets the {@code Policy} attribute of a queue so that the given SNS topic may send
 * messages to it.
 *
 * <p>The policy consists of one Allow statement for {@code SQSActions.SendMessage} on the
 * queue ARN, open to all principals but conditioned on the source ARN being the topic.
 *
 * @param topicSnsArn ARN of the topic that is allowed to send
 * @param queueArn    ARN of the queue used as the statement's resource
 * @param queueURL    URL of the queue whose attributes are updated
 */
private void setQueuePolicy(String topicSnsArn, String queueArn, String queueURL) {
    logger.info("Set up policy for queue to allow SNS to publish to it");

    Statement allowTopicToSend = new Statement(Statement.Effect.Allow)
            .withPrincipals(Principal.AllUsers)
            .withActions(SQSActions.SendMessage)
            .withResources(new Resource(queueArn))
            .withConditions(ConditionFactory.newSourceArnCondition(topicSnsArn));
    String policyJson = new Policy().withStatements(allowTopicToSend).toJson();

    Map<String, String> policyAttribute = new HashMap<String, String>();
    policyAttribute.put("Policy", policyJson);

    SetQueueAttributesRequest request = new SetQueueAttributesRequest();
    request.setQueueUrl(queueURL);
    request.setAttributes(policyAttribute);
    sqsClient.setQueueAttributes(request);
}
/** * Generate a policy that will allow messages published to an SNS topic * to be sent to all queues subscribed to that topic * @param topicArn the topic to create policy for * @return The policy */ private Policy makePolicy(String topicArn) { //SQSActions.SendMessage does not work!! Action sendMessageAction = new Action() { @Override public String getActionName() { return "SQS:SendMessage"; } }; return new Policy().withId("sns2sqs").withStatements( new Statement(Statement.Effect.Allow) .withPrincipals(Principal.AllUsers) .withActions(sendMessageAction) .withResources(new Resource(queueArn)) .withConditions(new ArnCondition(ArnCondition.ArnComparisonType.ArnEquals, ConditionFactory.SOURCE_ARN_CONDITION_KEY, topicArn))); }
/**
 * Round-trips a policy with two statements through {@code toJson}/{@code fromJson} and
 * checks that the policy id and, for each statement, the effect, id, principal, resource,
 * action and every condition (type, key, values) come back as they were written.
 */
@Test
public void testMultipleStatements() throws Exception {
    Policy policy = new Policy("S3PolicyId1");
    policy.withStatements(
            new Statement(Effect.Allow)
                    .withId("0")
                    .withPrincipals(Principal.AllUsers)
                    .withActions(new TestAction("action1"))
                    .withResources(new Resource("resource"))
                    .withConditions(
                            new IpAddressCondition("192.168.143.0/24"),
                            new IpAddressCondition(IpAddressComparisonType.NotIpAddress, "192.168.143.188/32")),
            new Statement(Effect.Deny)
                    .withId("1")
                    .withPrincipals(Principal.AllUsers)
                    .withActions(new TestAction("action2"))
                    .withResources(new Resource("resource"))
                    .withConditions(new IpAddressCondition("10.1.2.0/24")));

    policy = Policy.fromJson(policy.toJson());

    assertEquals(2, policy.getStatements().size());
    assertEquals("S3PolicyId1", policy.getId());
    List<Statement> statements = new LinkedList<Statement>(policy.getStatements());

    // ---- Statement "0": Allow, two IP conditions ----
    Statement allow = statements.get(0);
    assertEquals(Effect.Allow, allow.getEffect());
    assertEquals("0", allow.getId());
    assertEquals(1, allow.getPrincipals().size());
    assertEquals("*", allow.getPrincipals().get(0).getId());
    assertEquals("AWS", allow.getPrincipals().get(0).getProvider());
    assertEquals(1, allow.getResources().size());
    assertEquals("resource", allow.getResources().get(0).getId());
    assertEquals(1, allow.getActions().size());
    assertEquals("action1", allow.getActions().get(0).getActionName());
    assertEquals(2, allow.getConditions().size());

    assertEquals("IpAddress", allow.getConditions().get(0).getType());
    assertEquals(ConditionFactory.SOURCE_IP_CONDITION_KEY, allow.getConditions().get(0).getConditionKey());
    assertEquals(1, allow.getConditions().get(0).getValues().size());
    assertEquals("192.168.143.0/24", allow.getConditions().get(0).getValues().get(0));

    assertEquals("NotIpAddress", allow.getConditions().get(1).getType());
    // Previously this key check pointed at statement 1 / condition 0, leaving the key of
    // this statement's second condition unverified.
    assertEquals(ConditionFactory.SOURCE_IP_CONDITION_KEY, allow.getConditions().get(1).getConditionKey());
    assertEquals(1, allow.getConditions().get(1).getValues().size());
    assertEquals("192.168.143.188/32", allow.getConditions().get(1).getValues().get(0));

    // ---- Statement "1": Deny, one IP condition ----
    Statement deny = statements.get(1);
    assertEquals(Effect.Deny, deny.getEffect());
    assertEquals("1", deny.getId());
    assertEquals(1, deny.getPrincipals().size());
    assertEquals("*", deny.getPrincipals().get(0).getId());
    assertEquals("AWS", deny.getPrincipals().get(0).getProvider());
    assertEquals(1, deny.getResources().size());
    assertEquals("resource", deny.getResources().get(0).getId());
    assertEquals(1, deny.getActions().size());
    assertEquals("action2", deny.getActions().get(0).getActionName());
    assertEquals(1, deny.getConditions().size());

    assertEquals("IpAddress", deny.getConditions().get(0).getType());
    // Previously these two checks re-tested statement 0 / condition 0 instead of this
    // statement's condition.
    assertEquals(ConditionFactory.SOURCE_IP_CONDITION_KEY, deny.getConditions().get(0).getConditionKey());
    assertEquals(1, deny.getConditions().get(0).getValues().size());
    assertEquals("10.1.2.0/24", deny.getConditions().get(0).getValues().get(0));
}
private static void allowSQSQueueToReceiveMessagesFromSNSTopic( AmazonSQS amazonSQS, String queueURL, String queueARN, String topicARN ) { GetQueueAttributesResult queueAttributesResult = amazonSQS.getQueueAttributes( new GetQueueAttributesRequest().withQueueUrl(queueURL).withAttributeNames( QueueAttributeName.Policy ) ); String policyJson = queueAttributesResult.getAttributes().get(QueueAttributeName.Policy.name()); final List<Statement> statements; if (policyJson != null) { statements = new ArrayList<>(Policy.fromJson(policyJson).getStatements()); } else { // no policies yet exist statements = new ArrayList<>(); } statements.add( new Statement(Statement.Effect.Allow) .withPrincipals(Principal.AllUsers) .withResources(new Resource(queueARN)) .withActions(SQSActions.SendMessage) .withConditions(ConditionFactory.newSourceArnCondition(topicARN)) ); Policy policy = new Policy(); policy.setStatements(statements); Map<String, String> queueAttributes = new HashMap<>(); queueAttributes.put(QueueAttributeName.Policy.name(), policy.toJson()); // Note that if the queue already has this policy, this will do nothing. amazonSQS.setQueueAttributes( new SetQueueAttributesRequest() .withQueueUrl(queueURL) .withAttributes(queueAttributes) ); }