/**
 * Builds the HBase query used to read the data described by the given filter row.
 *
 * <p>The row is checked against {@code filtersEntireRowKey} first and, only if that
 * does not hold, against {@code filtersRowKeyPrefix}; the matching conversion
 * ({@code convertToGet} or {@code convertToScan}) produces the result.
 *
 * @param row the filter row to convert
 * @return the query produced by {@code convertToGet} or {@code convertToScan}
 * @throws RuntimeException if the row filters neither the entire row key nor a
 *         row key prefix
 */
@Override
public Query convertToQuery(Row row) {
  // Full row key filter: handled by the get conversion.
  if (filtersEntireRowKey(row)) {
    return convertToGet(row);
  }

  // Row key prefix filter: handled by the scan conversion.
  if (filtersRowKeyPrefix(row)) {
    return convertToScan(row);
  }

  // Anything else is not supported by this serde.
  throw new RuntimeException("Default HBase serde only supports full row key or prefix row key reads.");
}
@Override public Iterable<Row> getExistingForFilters(Iterable<Row> filters) throws Exception { LOG.debug("Fetching filter rows from table: {}", tableName.toString()); List<Row> filterResults = Lists.newArrayList(); try (Table table = getConnection(config).getTable(tableName)) { List<Get> gets = Lists.newArrayList(); List<Scan> scans = Lists.newArrayList(); for (Row filter : filters) { // Construct row key from key columns Query query = getSerde(config).convertToQuery(filter); LOG.debug("Adding filter: {}", query); if (query instanceof Get) { gets.add((Get)query); } else if (query instanceof Scan) { scans.add((Scan)query); } else { throw new RuntimeException("Unsupported HBase query class: " + query.getClass().getName()); } } List<Result> results = Lists.newArrayList(); if (gets.size() > 0) { results.addAll(Lists.newArrayList(table.get(gets))); } if (scans.size() > 0) { Scan mergedScan = HBaseUtils.mergeRangeScans(scans); results.addAll(Lists.newArrayList(table.getScanner(mergedScan))); } filterResults.addAll(getSerde(config).convertFromResults(results)); } return filterResults; }
/**
 * Converts a single row into a query by delegating to the serde.
 *
 * @param row the row to convert
 * @return the query returned by {@code serde.convertToQuery(row)}
 * @throws Exception declared by the overridden method's signature
 */
@Override
public Query call(Row row) throws Exception {
  Query converted = serde.convertToQuery(row);
  return converted;
}
@Test public void testGetTasksWithFilter() throws Exception { TaskMonitor tm = new TaskMonitor(new Configuration()); assertTrue("Task monitor should start empty", tm.getTasks().isEmpty()); // Create 5 general tasks tm.createStatus("General task1"); tm.createStatus("General task2"); tm.createStatus("General task3"); tm.createStatus("General task4"); tm.createStatus("General task5"); // Create 5 rpc tasks, and mark 1 completed int length = 5; ArrayList<MonitoredRPCHandler> rpcHandlers = new ArrayList<>(length); for (int i = 0; i < length; i++) { MonitoredRPCHandler rpcHandler = tm.createRPCStatus("Rpc task" + i); rpcHandlers.add(rpcHandler); } // Create rpc opertions byte[] row = new byte[] { 0x01 }; Mutation m = new Put(row); Query q = new Scan(); String notOperation = "for test"; rpcHandlers.get(0).setRPC("operations", new Object[]{ m, q }, 3000); rpcHandlers.get(1).setRPC("operations", new Object[]{ m, q }, 3000); rpcHandlers.get(2).setRPC("operations", new Object[]{ m, q }, 3000); rpcHandlers.get(3).setRPC("operations", new Object[]{ notOperation }, 3000); rpcHandlers.get(4).setRPC("operations", new Object[]{ m, q }, 3000); MonitoredRPCHandler completed = rpcHandlers.get(4); completed.markComplete("Completed!"); // Test get tasks with filter List<MonitoredTask> generalTasks = tm.getTasks("general"); assertEquals(5, generalTasks.size()); List<MonitoredTask> handlerTasks = tm.getTasks("handler"); assertEquals(5, handlerTasks.size()); List<MonitoredTask> rpcTasks = tm.getTasks("rpc"); // The last rpc handler is stopped assertEquals(4, rpcTasks.size()); List<MonitoredTask> operationTasks = tm.getTasks("operation"); // Handler 3 doesn't handle Operation. assertEquals(3, operationTasks.size()); tm.shutdown(); }
/**
 * Convert the given {@link Row} to a {@link Query}.
 *
 * @param row the row to convert
 * @return a {@link Query} built from the given row
 */
Query convertToQuery(Row row);