/**
 * Reads the job settings, derives the short task id, looks up the counters and opens the
 * HBase connection together with its buffered mutator.
 *
 * <p>The task attempt id is validated <em>before</em> the connection is created: the
 * validation can throw, and this method has no code that would close an already opened
 * connection or mutator on that path.
 *
 * @param context task context supplying the configuration and the counters
 * @throws IOException if the connection or the buffered mutator cannot be created
 * @throws RuntimeException if {@code mapreduce.task.attempt.id} is missing or does not match
 *     {@code .+_m_(\d+_\d+)}
 */
@Override
public void setup(Context context) throws IOException {
  conf = context.getConfiguration();
  recordsToWrite = conf.getLong(NUM_TO_WRITE_KEY, NUM_TO_WRITE_DEFAULT);
  String tableName = conf.get(TABLE_NAME_KEY, TABLE_NAME_DEFAULT);
  numBackReferencesPerRow = conf.getInt(NUM_BACKREFS_KEY, NUM_BACKREFS_DEFAULT);

  // Fail fast on an unusable attempt id while no external resource is held yet.
  // A missing id is reported with the same message instead of a bare NullPointerException.
  String taskId = conf.get("mapreduce.task.attempt.id");
  Matcher matcher = taskId == null ? null : Pattern.compile(".+_m_(\\d+_\\d+)").matcher(taskId);
  if (matcher == null || !matcher.matches()) {
    throw new RuntimeException("Strange task ID: " + taskId);
  }
  shortTaskId = matcher.group(1);

  rowsWritten = context.getCounter(Counters.ROWS_WRITTEN);
  refsWritten = context.getCounter(Counters.REFERENCES_WRITTEN);

  // Resources are acquired last, once nothing else in this method can reject the task.
  this.connection = ConnectionFactory.createConnection(conf);
  mutator = connection.getBufferedMutator(
      new BufferedMutatorParams(TableName.valueOf(tableName))
          .writeBufferSize(4 * 1024 * 1024));
}
/**
 * Runs the parent setup, reads the skip-WAL flag and opens a buffered mutator on the
 * configured output table.
 *
 * <p>The mutator's exception listener only logs each failed row at warn level; it does not
 * rethrow.
 *
 * @param context task context supplying the job configuration
 * @throws IOException if the parent setup or the mutator creation fails
 * @throws InterruptedException if the parent setup is interrupted
 */
@Override
protected void setup(final Context context) throws IOException, InterruptedException {
    super.setup(context);

    final Configuration jobConf = context.getConfiguration();
    skipWAL = jobConf.getBoolean(Constants.MAPREDUCE_INDEX_SKIP_WAL, false);

    final String outputTableName = jobConf.get(TableOutputFormat.OUTPUT_TABLE);
    final TableName target = TableName.valueOf(outputTableName);

    // Report every row the mutator gave up on, one log line per row.
    final BufferedMutator.ExceptionListener logFailedPuts = (failure, failedMutator) -> {
        int idx = 0;
        while (idx < failure.getNumExceptions()) {
            LOG.warn("Failed to send put: " + failure.getRow(idx));
            idx++;
        }
    };

    mutator = getGraph().connection().getBufferedMutator(
            new BufferedMutatorParams(target).listener(logFailedPuts));
}
/**
 * Creates a buffered mutator for this table that reports failures to the given listener.
 *
 * @param exceptionListener listener registered on the mutator parameters
 * @return the buffered mutator obtained from the underlying connection
 * @throws HBaseException wrapping any exception raised while obtaining the mutator
 */
public BufferedMutator getBufferedMutator(final ExceptionListener exceptionListener) {
    // Built outside the try block so only the connection call is wrapped on failure.
    final BufferedMutatorParams mutatorParams = new BufferedMutatorParams(getName());
    mutatorParams.listener(exceptionListener);

    try {
        return hBaseConnection.getConnection().getBufferedMutator(mutatorParams);
    } catch (final Exception cause) {
        throw new HBaseException("Unable to create buffered mutator for table " + getDisplayName(), cause);
    }
}
/**
 * Opens one buffered mutator per table index in {@code [0, DEFAULT_TABLES_COUNT)} and stores
 * it in the matching slot of {@code tables}. Each mutator is configured with a write buffer
 * size of {@code 4 * 1024 * 1024}.
 *
 * @throws IOException if any mutator cannot be created
 */
@Override
protected void instantiateHTable() throws IOException {
    int slot = 0;
    while (slot < DEFAULT_TABLES_COUNT) {
        final BufferedMutatorParams mutatorParams =
                new BufferedMutatorParams(getTableName(slot)).writeBufferSize(4 * 1024 * 1024);
        final BufferedMutator opened = connection.getBufferedMutator(mutatorParams);
        this.tables[slot] = opened;
        slot++;
    }
}
/**
 * Opens a buffered mutator on the graph's connection for the named table, with the shared
 * {@code LISTENER} attached.
 *
 * @param graph graph supplying both the configuration and the connection
 * @param tableName logical table name, resolved through {@code HBaseGraphUtils.getTableName}
 * @return the buffered mutator for the resolved table
 * @throws HBaseGraphException wrapping an {@link IOException} raised while opening the mutator
 */
private static BufferedMutator getBufferedMutator(HBaseGraph graph, String tableName) {
    try {
        TableName resolved = HBaseGraphUtils.getTableName(graph.configuration(), tableName);
        BufferedMutatorParams mutatorParams = new BufferedMutatorParams(resolved);
        mutatorParams.listener(LISTENER);
        return graph.connection().getBufferedMutator(mutatorParams);
    } catch (IOException ioFailure) {
        throw new HBaseGraphException(ioFailure);
    }
}
@Test @Ignore(value="We need a better test now that BigtableBufferedMutator has different logic") public void testBufferSizeFlush() throws Exception { int maxSize = 1024; BufferedMutatorParams params = new BufferedMutatorParams(TABLE_NAME) .writeBufferSize(maxSize); try (BufferedMutator mutator = getConnection().getBufferedMutator(params)) { // HBase 1.0.0 has a bug in it. It returns maxSize instead of the buffer size for // getWriteBufferSize. https://issues.apache.org/jira/browse/HBASE-13113 Assert.assertTrue( 0 == mutator.getWriteBufferSize() || maxSize == mutator.getWriteBufferSize()); Put put = getPut(); mutator.mutate(put); Assert.assertTrue(mutator.getWriteBufferSize() > 0); Put largePut = new Put(dataHelper.randomData("testrow-")); largePut.addColumn(COLUMN_FAMILY, qualifier, Bytes.toBytes(RandomStringUtils.randomAlphanumeric(maxSize * 2))); long heapSize = largePut.heapSize(); Assert.assertTrue("largePut heapsize is : " + heapSize, heapSize > maxSize); mutator.mutate(largePut); // HBase 1.0.0 has a bug in it. It returns maxSize instead of the buffer size for // getWriteBufferSize. https://issues.apache.org/jira/browse/HBASE-13113 Assert.assertTrue( 0 == mutator.getWriteBufferSize() || maxSize == mutator.getWriteBufferSize()); } }
/** * msg 包括: * * @param tablename * @param entity: * rowkey->cf:column->value 其中增加对_timestamp字段的处理 */ @SuppressWarnings({ "unchecked", "rawtypes" }) @Override protected boolean insert(DataStoreMsg msg) { // 根据TABLE名进行合法验证 Map[] maps = (Map[]) adaptor.prepareInsertObj(msg, datasource.getDataStoreConnection()); Map<byte[], Map> entity = maps[0]; Map<byte[], Long> entityStamp = maps[1]; String tableName = (String) msg.get(DataStoreProtocol.HBASE_TABLE_NAME); // add write buffer BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(tableName)); params.writeBufferSize(1024 * 1024 * 2); try (BufferedMutator table = datasource.getSourceConnect().getBufferedMutator(params);) { // 取得所有cf List<Put> puts = Lists.newArrayList(); Put put = null; for (byte[] rowkey : entity.keySet()) { // 定制时间戳 put = entityStamp.containsKey(rowkey) ? new Put(rowkey, entityStamp.get(rowkey)) : new Put(rowkey); // 取得column和value for (Object entry : entity.get(rowkey).keySet()) { String[] column = ((String) entry).split(":"); put.addColumn(Bytes.toBytes(column[0]), Bytes.toBytes(column[1]), Bytes.toBytes((String) entity.get(rowkey).get(entry))); } puts.add(put); } // 批量提交 Object[] results = new Object[puts.size()]; // table.batch(puts, results); table.mutate(puts); // flush table.flush(); // 根据插入信息操作并返回结果 return adaptor.handleInsertResult(results, msg, datasource.getDataStoreConnection()); } catch (IOException e) { log.err(this, "INSERT HBASE TABLE[" + tableName + "] FAIL:" + msg.toJSONString(), e); return false; } }
/**
 * Opens the buffered mutator for the table named by the connection's configuration, using a
 * write buffer size of {@code 4 * 1024 * 1024}.
 *
 * @throws IOException if the mutator cannot be created
 */
protected void instantiateHTable() throws IOException {
    final BufferedMutatorParams mutatorParams =
            new BufferedMutatorParams(getTableName(connection.getConfiguration()));
    mutatorParams.writeBufferSize(4 * 1024 * 1024);
    mutator = connection.getBufferedMutator(mutatorParams);
}
/**
 * Obtains a buffered mutator from {@code STANDARD_HBASE_CONNECTION} and wraps it in an
 * {@code MBufferedMutator} bound to {@code REPOSITORY}.
 *
 * @param bmp parameters forwarded unchanged to the standard connection
 * @return the wrapping mutator
 * @throws IOException if the standard connection cannot create the mutator
 */
@Override
public BufferedMutator getBufferedMutator(BufferedMutatorParams bmp) throws IOException {
    final BufferedMutator standardMutator = STANDARD_HBASE_CONNECTION.getBufferedMutator(bmp);
    return new MBufferedMutator(standardMutator, REPOSITORY);
}
/**
 * Opens a buffered mutator for {@code tableId} and resets the written-record count to zero.
 *
 * @param c start-bundle context (not used by this method)
 * @throws IOException if the mutator cannot be created
 */
@StartBundle
public void startBundle(StartBundleContext c) throws IOException {
    mutator = connection.getBufferedMutator(
            new BufferedMutatorParams(TableName.valueOf(tableId)));
    // Reset only after the mutator was obtained successfully.
    recordsWritten = 0;
}
/**
 * Delegates to the wrapped connection.
 *
 * @param params mutator parameters, passed through unchanged
 * @return whatever the wrapped connection returns
 * @throws IOException if the wrapped connection fails to create the mutator
 */
@Override
public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
    final BufferedMutator delegateResult = conn.getBufferedMutator(params);
    return delegateResult;
}
/**
 * Always returns {@code null}; the parameters are ignored.
 *
 * <p>NOTE(review): this looks like a placeholder or test stub. Confirm that no caller
 * dereferences the result.
 *
 * @param params ignored
 * @return {@code null}
 * @throws IOException never thrown by this implementation; declared by the overridden method
 */
@Override
public BufferedMutator getBufferedMutator(final BufferedMutatorParams params) throws IOException {
    // No mutator is provided by this implementation.
    return null;
}