/**
 * Scans the "Movies" table on a local DynamoDB endpoint and prints every
 * item whose year lies between 1950 and 1959 (bounds as given to the
 * {@code between} filter), projecting the year, title and info.rating.
 *
 * @param args command-line arguments (not used)
 */
public static void main(String[] args) {
    AmazonDynamoDBClient client = new AmazonDynamoDBClient();
    client.setEndpoint("http://localhost:8000");

    Table table = new DynamoDB(client).getTable("Movies");

    // "#yr" is a placeholder that the name map resolves to the "year" attribute.
    NameMap attributeNames = new NameMap().with("#yr", "year");
    ValueMap yearBounds = new ValueMap()
        .withNumber(":start_yr", 1950)
        .withNumber(":end_yr", 1959);

    ScanSpec scanSpec = new ScanSpec()
        .withProjectionExpression("#yr, title, info.rating")
        .withFilterExpression("#yr between :start_yr and :end_yr")
        .withNameMap(attributeNames)
        .withValueMap(yearBounds);

    for (Item movie : table.scan(scanSpec)) {
        System.out.println(movie.toString());
    }
}
private static void findProductsForPriceLessThanZero() { Table table = dynamoDB.getTable(tableName); Map<String, Object> expressionAttributeValues = new HashMap<String, Object>(); expressionAttributeValues.put(":pr", 100); ItemCollection<ScanOutcome> items = table.scan( "Price < :pr", //FilterExpression "Id, Title, ProductCategory, Price", //ProjectionExpression null, //ExpressionAttributeNames - not used in this example expressionAttributeValues); System.out.println("Scan of " + tableName + " for items with a price less than 100."); Iterator<Item> iterator = items.iterator(); while (iterator.hasNext()) { System.out.println(iterator.next().toJSONPretty()); } }
/**
 * Converts every item of a scan outcome into an instance of {@code type}.
 * Each item is passed through {@code _encryption.decrypt} and then
 * {@code fromItem} before being collected.
 *
 * @param outcome the scan outcome whose items are converted
 * @param type    the target class handed to {@code fromItem}
 * @return an unmodifiable list holding the converted items in outcome order
 */
private <V> List<V> toList(ScanOutcome outcome, Class<V> type) {
    final List<Item> rawItems = outcome.getItems();
    final List<V> converted = new ArrayList<>(rawItems.size());
    for ( Item raw : rawItems ) {
        converted.add(fromItem(_encryption.decrypt(raw), type));
    }
    return Collections.unmodifiableList(converted);
}
/**
 * Runs a single-page scan and updates the page iterator's marker.
 *
 * The first page of the scan result (if any) is returned. The marker of
 * {@code pageIterator} is set from that page's last evaluated key, or to
 * {@code null} when there is no such key or no page at all.
 *
 * @param scanSpec     the scan specification; combined with the iterator's
 *                     state through {@code withMarker} before executing
 * @param pageIterator supplies the page size and receives the next marker
 * @return the low-level outcome of the first page, or an empty outcome when
 *         the page size is not positive or the scan produced no page
 * @throws IllegalStateException if {@code _convertMarker} has not been set
 */
private ScanOutcome scan(ScanSpec scanSpec, PageIterator pageIterator) {
    if ( _convertMarker == null ) {
        throw new IllegalStateException("Index must first be initialized with ConvertMarker");
    }
    // A non-positive page size short-circuits without touching the marker.
    if ( pageIterator.getPageSize() <= 0 ) {
        return new ScanOutcome(new ScanResult());
    }

    ItemCollection<ScanOutcome> collection =
        maybeBackoff(true, () -> _scan.scan(withMarker(scanSpec, pageIterator)));

    Iterator<Page<Item, ScanOutcome>> pages =
        ( collection == null ) ? null : collection.pages().iterator();

    if ( pages == null || ! pages.hasNext() ) {
        // Nothing came back: clear the marker and hand out an empty outcome.
        pageIterator.setMarker(null);
        return new ScanOutcome(new ScanResult());
    }

    ScanOutcome firstPage = maybeBackoff(true, () -> pages.next().getLowLevelResult());
    Map<String,AttributeValue> lastKey = firstPage.getScanResult().getLastEvaluatedKey();
    if ( lastKey == null || lastKey.isEmpty() ) {
        pageIterator.setMarker(null);
    } else {
        pageIterator.setMarker(toMarker(lastKey, false));
    }
    return firstPage;
}
/**
 * Scans one segment of {@code tableName} and prints every item found.
 *
 * The scan is configured with {@code itemLimit} as the maximum result size
 * and with this task's {@code segment} / {@code totalSegments}. Any
 * exception raised while scanning is reported on stderr (message only), and
 * the number of items retrieved is always printed at the end.
 */
@Override
public void run() {
    System.out.println("Scanning " + tableName + " segment " + segment + " out of " + totalSegments
        + " segments " + itemLimit + " items at a time...");
    int totalScannedItemCount = 0;
    Table table = dynamoDB.getTable(tableName);
    try {
        ScanSpec spec = new ScanSpec()
            .withMaxResultSize(itemLimit)
            .withTotalSegments(totalSegments)
            .withSegment(segment);

        ItemCollection<ScanOutcome> items = table.scan(spec);
        Iterator<Item> iterator = items.iterator();
        while (iterator.hasNext()) {
            Item currentItem = iterator.next();
            // Count only after next() has succeeded, so a failed fetch does
            // not inflate the total reported in the finally block.
            totalScannedItemCount++;
            System.out.println(currentItem.toString());
        }
    } catch (Exception e) {
        System.err.println(e.getMessage());
    } finally {
        System.out.println("Scanned " + totalScannedItemCount + " items from segment " + segment
            + " out of " + totalSegments + " of " + tableName);
    }
}
/**
 * Extracts the item count from a scan outcome.
 *
 * @param outcome the scan outcome to inspect
 * @return the count reported by the scan result, or 0 when it is {@code null}
 */
private static int toCount(ScanOutcome outcome) {
    final Integer count = outcome.getScanResult().getCount();
    if ( count == null ) {
        return 0;
    }
    return count.intValue();
}
/**
 * Converts the items of a scan outcome using this instance's {@code _clazz}
 * as the target type.
 *
 * @param outcome the scan outcome whose items are converted
 * @return the result of the two-argument {@code toList} overload
 */
private List<T> toList(ScanOutcome outcome) {
    return this.toList(outcome, this._clazz);
}
@Override public Object handleRequest(SegmentScannerInput input, Context context) { context.getLogger().log("Input: " + input.toJson() + "\n"); context.getLogger().log("Start scanning segment " + input.getSegment() + "\n"); DynamoDB dynamodb = new DynamoDB(Regions.US_WEST_2); // update tracking table in DynamoDB stating that we're in progress dynamodb.getTable(FUNCTION_TRACKER_TABLE_NAME).putItem( new Item().withPrimaryKey(SEGMENT, input.getSegment()) .withString(STATUS, STATUS_IN_PROGRESS)); ScanSpec scanSpec = new ScanSpec() .withMaxPageSize(MAX_PAGE_SIZE) .withSegment(input.getSegment()) .withTotalSegments(input.getTotalSegments()) .withConsistentRead(true) .withMaxResultSize(MAX_RESULT_SIZE) .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL); // if resuming an in-progress segment, specify the start key here if (input.getStartScore() != null) { scanSpec.withExclusiveStartKey(SCORE_ID, input.getStartScore()); } RateLimiter rateLimiter = RateLimiter.create(input.getMaxConsumedCapacity()); Map<String, AttributeValue> lastEvaluatedKey = null; Table scoresTable = dynamodb.getTable(SCORE_TABLE_NAME); for (Page<Item, ScanOutcome> scanResultPage : scoresTable.scan(scanSpec).pages()) { // process items for (Item item : scanResultPage) { DataTransformer.HIGH_SCORES_BY_DATE_TRANSFORMER.transform(item, dynamodb); } /* * After reading each page, we acquire the consumed capacity from * the RateLimiter. 
* * For more information on using RateLimiter with DynamoDB scans, * see "Rate Limited Scans in Amazon DynamoDB" * on the AWS Java Development Blog: * https://java.awsblog.com/post/Tx3VAYQIZ3Q0ZVW */ ScanResult scanResult = scanResultPage.getLowLevelResult().getScanResult(); lastEvaluatedKey = scanResult.getLastEvaluatedKey(); double consumedCapacity = scanResult.getConsumedCapacity().getCapacityUnits(); rateLimiter.acquire((int)Math.round(consumedCapacity)); // forego processing additional pages if we're running out of time if (context.getRemainingTimeInMillis() < REMAINING_TIME_CUTOFF) { break; } } if (lastEvaluatedKey != null && !lastEvaluatedKey.isEmpty()) { Entry<String, AttributeValue> entry = lastEvaluatedKey.entrySet() .iterator().next(); String lastScoreId = entry.getValue().getS(); dynamodb.getTable(FUNCTION_TRACKER_TABLE_NAME).putItem( new Item() .withPrimaryKey(SEGMENT, input.getSegment()) .withString(STATUS, STATUS_INCOMPLETE) .withString(LAST_SCORE_ID, lastScoreId)); return false; } // update tracking table in DynamoDB stating that we're done dynamodb.getTable(FUNCTION_TRACKER_TABLE_NAME).putItem( new Item().withPrimaryKey(SEGMENT, input.getSegment()) .withString(STATUS, STATUS_DONE)); context.getLogger().log("Finish scanning segment " + input.getSegment() + "\n"); return true; }