/**
 * Allows an SNS topic to send messages to an SQS queue and subscribes the queue to the topic.
 *
 * <p>The queue ARN is looked up from the queue URL. The queue's {@code Policy} attribute is then
 * set to a policy that contains only the single statement built here, and finally the queue is
 * subscribed to the topic using the {@code "sqs"} protocol.
 *
 * @param snsTopicArn ARN of the SNS topic that should be allowed to send to the queue
 * @param sqsQueueUrl URL of the SQS queue to subscribe
 * @return the subscription ARN reported by the subscribe call
 */
public String subscribeQueueToTopic(String snsTopicArn, String sqsQueueUrl) {
    // Resolve the queue ARN from its URL.
    final String queueArnKey = QueueAttributeName.QueueArn.toString();
    GetQueueAttributesRequest arnLookup =
            new GetQueueAttributesRequest(sqsQueueUrl).withAttributeNames(queueArnKey);
    String sqsQueueArn = sqsClient.getQueueAttributes(arnLookup).getAttributes().get(queueArnKey);

    // Anyone may send to the queue, but only when the request originates from the given topic.
    Statement allowTopicToSend = new Statement(Effect.Allow)
            .withId("topic-subscription-" + snsTopicArn)
            .withPrincipals(Principal.AllUsers)
            .withActions(SQSActions.SendMessage)
            .withResources(new Resource(sqsQueueArn))
            .withConditions(ConditionFactory.newSourceArnCondition(snsTopicArn));
    String policyJson = new Policy().withStatements(allowTopicToSend).toJson();
    logger.debug("Policy: " + policyJson);

    // Install the policy on the queue.
    Map<String, String> policyAttributes = new HashMap<String, String>();
    policyAttributes.put(QueueAttributeName.Policy.toString(), policyJson);
    sqsClient.setQueueAttributes(new SetQueueAttributesRequest(sqsQueueUrl, policyAttributes));

    // Subscribe the queue (addressed by ARN) to the topic.
    SubscribeRequest subscription = new SubscribeRequest()
            .withTopicArn(snsTopicArn)
            .withProtocol("sqs")
            .withEndpoint(sqsQueueArn);
    return snsClient.subscribe(subscription).getSubscriptionArn();
}
/**
 * Sets the queue's {@code Policy} attribute so that the given SNS topic may send messages to it.
 *
 * <p>The policy written contains exactly one statement: allow {@code SendMessage} on the queue
 * for all principals, conditioned on the source ARN being the topic.
 *
 * @param topicSnsArn ARN of the SNS topic used in the source-ARN condition
 * @param queueArn    ARN of the queue, used as the statement's resource
 * @param queueURL    URL of the queue whose attributes are updated
 */
private void setQueuePolicy(String topicSnsArn, String queueArn, String queueURL) {
    logger.info("Set up policy for queue to allow SNS to publish to it");

    Statement allowTopicToSend = new Statement(Statement.Effect.Allow)
            .withPrincipals(Principal.AllUsers)
            .withActions(SQSActions.SendMessage)
            .withResources(new Resource(queueArn))
            .withConditions(ConditionFactory.newSourceArnCondition(topicSnsArn));
    String policyJson = new Policy().withStatements(allowTopicToSend).toJson();

    Map<String, String> policyAttributes = new HashMap<String, String>();
    policyAttributes.put("Policy", policyJson);

    sqsClient.setQueueAttributes(
            new SetQueueAttributesRequest()
                    .withQueueUrl(queueURL)
                    .withAttributes(policyAttributes));
}
/**
 * Builds the JSON text of a policy named {@code AuthorizedWorkerAccessPolicy} that allows the
 * given accounts to send messages to this object's queue.
 *
 * @param accountIds account identifiers; one principal is added to the statement per entry
 * @return the policy serialized as JSON
 */
private String getPolicy(List<String> accountIds) {
    Statement sendStatement = new Statement(Effect.Allow).withActions(SQSActions.SendMessage);

    // One principal per authorized account.
    for (String accountId : accountIds) {
        sendStatement.getPrincipals().add(new Principal(accountId));
    }

    // The queue ARN is resolved only after the principals have been built.
    sendStatement.withResources(new Resource(getQueueARN()));

    // Append to the statement collection directly rather than via withStatements/setStatements.
    Policy accessPolicy = new Policy("AuthorizedWorkerAccessPolicy");
    accessPolicy.getStatements().add(sendStatement);
    return accessPolicy.toJson();
}
private boolean allowQueuePublish(Statement statement) { if (statement.getEffect().equals(Statement.Effect.Allow)) { List<Action> actions = statement.getActions(); for(Action action : actions) { // .equals not properly defined on actions if (action.getActionName().equals("sqs:"+SQSActions.SendMessage.toString())) { return true; } } } return false; }
private static void allowSQSQueueToReceiveMessagesFromSNSTopic( AmazonSQS amazonSQS, String queueURL, String queueARN, String topicARN ) { GetQueueAttributesResult queueAttributesResult = amazonSQS.getQueueAttributes( new GetQueueAttributesRequest().withQueueUrl(queueURL).withAttributeNames( QueueAttributeName.Policy ) ); String policyJson = queueAttributesResult.getAttributes().get(QueueAttributeName.Policy.name()); final List<Statement> statements; if (policyJson != null) { statements = new ArrayList<>(Policy.fromJson(policyJson).getStatements()); } else { // no policies yet exist statements = new ArrayList<>(); } statements.add( new Statement(Statement.Effect.Allow) .withPrincipals(Principal.AllUsers) .withResources(new Resource(queueARN)) .withActions(SQSActions.SendMessage) .withConditions(ConditionFactory.newSourceArnCondition(topicARN)) ); Policy policy = new Policy(); policy.setStatements(statements); Map<String, String> queueAttributes = new HashMap<>(); queueAttributes.put(QueueAttributeName.Policy.name(), policy.toJson()); // Note that if the queue already has this policy, this will do nothing. amazonSQS.setQueueAttributes( new SetQueueAttributesRequest() .withQueueUrl(queueURL) .withAttributes(queueAttributes) ); }