/**
 * Recreates {@code tableName} with a single column family "f" and writes one row
 * ("a1") whose columns c1, c2 and c4 hold empty values and whose column c3 holds "5".
 *
 * @param conn          connection used to obtain the buffered mutator
 * @param admin         admin used to drop and create the table
 * @param tableName     table to (re)create and populate
 * @param numberRegions when greater than 1, the table is pre-split with the first
 *                      {@code numberRegions - 1} entries of {@code SPLIT_KEYS}
 * @throws Exception if any admin or write operation fails
 */
public static void generateHBaseDatasetNullStr(Connection conn, Admin admin, TableName tableName, int numberRegions) throws Exception {
  // Start from a clean table so reruns do not see stale rows.
  if (admin.tableExists(tableName)) {
    admin.disableTable(tableName);
    admin.deleteTable(tableName);
  }

  HTableDescriptor desc = new HTableDescriptor(tableName);
  desc.addFamily(new HColumnDescriptor("f"));
  if (numberRegions > 1) {
    admin.createTable(desc, Arrays.copyOfRange(SPLIT_KEYS, 0, numberRegions-1));
  } else {
    admin.createTable(desc);
  }

  // try-with-resources guarantees the mutator is closed even when mutate() throws.
  try (BufferedMutator table = conn.getBufferedMutator(tableName)) {
    Put p = new Put("a1".getBytes());
    p.addColumn("f".getBytes(), "c1".getBytes(), "".getBytes());
    p.addColumn("f".getBytes(), "c2".getBytes(), "".getBytes());
    p.addColumn("f".getBytes(), "c3".getBytes(), "5".getBytes());
    p.addColumn("f".getBytes(), "c4".getBytes(), "".getBytes());
    table.mutate(p);
  }
}
/** * Writes an action (Put or Delete) to the specified table. * * @param tableName * the table being updated. * @param action * the update, either a put or a delete. * @throws IllegalArgumentException * if the action is not a put or a delete. */ @Override public void write(ImmutableBytesWritable tableName, Mutation action) throws IOException { BufferedMutator mutator = getBufferedMutator(tableName); // The actions are not immutable, so we defensively copy them if (action instanceof Put) { Put put = new Put((Put) action); put.setDurability(useWriteAheadLogging ? Durability.SYNC_WAL : Durability.SKIP_WAL); mutator.mutate(put); } else if (action instanceof Delete) { Delete delete = new Delete((Delete) action); mutator.mutate(delete); } else throw new IllegalArgumentException( "action must be either Delete or Put"); }
/**
 * Writes the given puts to {@code table} using a freshly obtained connection and
 * buffered mutator, flushing before both are closed.
 *
 * @param table table to write to; must not be null
 * @param puts  puts to send; must not be null
 * @throws SinkConnectorException wrapping any exception raised while connecting,
 *         writing, flushing or closing
 */
public void write(final TableName table, final List<Put> puts) {
  Preconditions.checkNotNull(table);
  Preconditions.checkNotNull(puts);

  try (final Connection conn = this.connectionFactory.getConnection();
       final BufferedMutator writer = conn.getBufferedMutator(table)) {
    writer.mutate(puts);
    writer.flush();
  } catch (Exception failure) {
    throw new SinkConnectorException(
        String.format("Failed with a [%s] when writing to table [%s] ",
            failure.getMessage(), table.getNameAsString()),
        failure);
  }
}
/**
 * Reads the skip-WAL flag and the output table name from the job configuration and
 * opens a buffered mutator on that table whose exception listener logs every failed row.
 *
 * @param context the task context supplying the configuration
 * @throws IOException          if the parent setup or opening the mutator fails
 * @throws InterruptedException if the parent setup is interrupted
 */
@Override
protected void setup(final Context context) throws IOException, InterruptedException {
  super.setup(context);

  final Configuration conf = context.getConfiguration();
  skipWAL = conf.getBoolean(Constants.MAPREDUCE_INDEX_SKIP_WAL, false);

  final TableName target = TableName.valueOf(conf.get(TableOutputFormat.OUTPUT_TABLE));
  // Failures are only logged here; the listener does not rethrow.
  final BufferedMutator.ExceptionListener logFailures = (failures, source) -> {
    int idx = 0;
    while (idx < failures.getNumExceptions()) {
      LOG.warn("Failed to send put: " + failures.getRow(idx));
      idx++;
    }
  };
  mutator = getGraph().connection()
      .getBufferedMutator(new BufferedMutatorParams(target).listener(logFailures));
}
/** * 利用BufferedMutator批量导入 * * @param connection * @throws IOException */ private static void bmImport(Connection connection) throws IOException { BufferedMutator bufferedMutator = connection.getBufferedMutator(TableName.valueOf("t3")); byte[] columnFamily = "f1".getBytes(); long startTime = System.currentTimeMillis(); ArrayList<Put> puts = new ArrayList<Put>(); for (int i = 0; i < 999999; i++) { puts.add(HBaseUtil.createPut(i + "", columnFamily, "c1", i + "")); //每10000条导入一次 if (i % 10000 == 0) { bufferedMutator.mutate(puts); puts.clear(); } } //批量调用 bufferedMutator.mutate(puts); bufferedMutator.close(); System.out.println("共耗时:" + (System.currentTimeMillis() - startTime) + "ms"); }
/** * Flushes to BigTable any buffered client-side write operation. * <p> * @return A {@link Deferred}, whose callback chain will be invoked when * everything that was buffered at the time of the call has been flushed. * <p> * Note that this doesn't guarantee that <b>ALL</b> outstanding RPCs have * completed. This doesn't introduce any sort of global sync point. All * it does really is it sends any buffered RPCs to BigTable. */ public Deferred<Object> flush() { LOG.info("Flushing buffered mutations"); final ArrayList<Deferred<Object>> deferreds = new ArrayList<Deferred<Object>>(mutators.size()); for (final BufferedMutator mutator : mutators.values()) { try { // TODO - run in a separate thread, breaks asynchronus behavior // right now mutator.flush(); deferreds.add(Deferred.fromResult(null)); } catch (IOException e) { LOG.error("Error occurred while flushing buffer", e); deferreds.add(Deferred.fromError(e)); } } num_flushes.increment(); @SuppressWarnings("unchecked") final Deferred<Object> flushed = (Deferred) Deferred.group(deferreds); return flushed; }
/**
 * Writes {@code fileNum * rowNumPerFile} rounds of rows to the table, one row per
 * entry of {@code KEYS} in each round, and flushes after every {@code rowNumPerFile}
 * rounds.
 *
 * @param admin         admin used to flush the table
 * @param table         mutator receiving the puts
 * @param tableName     table flushed through {@code admin}
 * @param fileNum       number of flush cycles; must be positive
 * @param rowNumPerFile number of rounds written between flushes
 * @throws IllegalArgumentException if {@code fileNum} is not positive
 */
private void loadData(Admin admin, BufferedMutator table, TableName tableName, int fileNum, int rowNumPerFile) throws IOException, InterruptedException {
  if (fileNum <= 0) {
    throw new IllegalArgumentException();
  }

  final int rounds = fileNum * rowNumPerFile;
  for (int round = 0; round < rounds; round++) {
    for (byte prefix : KEYS) {
      // Row key = one prefix byte followed by the serialized round number.
      byte[] rowKey = Bytes.add(new byte[] { prefix }, Bytes.toBytes(round));
      byte[] payload = makeDummyData(10 * (round + 1));

      Put put = new Put(rowKey);
      put.setDurability(Durability.SKIP_WAL);
      put.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), payload);
      put.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf2), payload);
      put.addColumn(Bytes.toBytes(family2), Bytes.toBytes(qf1), payload);
      table.mutate(put);
    }

    boolean batchComplete = (round + 1) % rowNumPerFile == 0;
    if (batchComplete) {
      // Push client-side buffered puts first, then flush the table through admin.
      table.flush();
      admin.flush(tableName);
    }
  }
}
/**
 * Inserts 5000 traced batches of 5 random rows (10 random columns each, 300-byte
 * values) and returns the generated row keys.
 *
 * The table is flushed through {@code admin} every 1000 batches and once more at the end.
 *
 * @return queue holding every generated row key
 * @throws IOException          if a write, flush, or closing the mutator fails
 * @throws InterruptedException declared for callers; not raised directly here
 */
private LinkedBlockingQueue<Long> insertData() throws IOException, InterruptedException {
  LinkedBlockingQueue<Long> rowKeys = new LinkedBlockingQueue<>(25000);
  byte[] value = new byte[300];

  // The mutator was previously never closed. Closing it (per the HBase API, close()
  // flushes pending mutations) sends buffered puts before the final admin.flush.
  try (BufferedMutator ht = util.getConnection().getBufferedMutator(this.tableName)) {
    TraceUtil.addSampler(Sampler.ALWAYS);
    for (int x = 0; x < 5000; x++) {
      try (TraceScope traceScope = TraceUtil.createTrace("insertData")) {
        for (int i = 0; i < 5; i++) {
          long rk = random.nextLong();
          rowKeys.add(rk);
          Put p = new Put(Bytes.toBytes(rk));
          for (int y = 0; y < 10; y++) {
            random.nextBytes(value);
            p.addColumn(familyName, Bytes.toBytes(random.nextLong()), value);
          }
          ht.mutate(p);
        }
        if ((x % 1000) == 0) {
          admin.flush(tableName);
        }
      }
    }
  }
  admin.flush(tableName);
  return rowKeys;
}
/**
 * Creates a buffered mutator for this table that reports failures to the given listener.
 *
 * @param exceptionListener listener registered on the mutator parameters
 * @return the newly created buffered mutator
 * @throws HBaseException wrapping any exception raised while creating the mutator
 */
public BufferedMutator getBufferedMutator(final ExceptionListener exceptionListener) {
  final BufferedMutatorParams mutatorParams =
      new BufferedMutatorParams(getName()).listener(exceptionListener);
  try {
    return hBaseConnection.getConnection().getBufferedMutator(mutatorParams);
  } catch (final Exception cause) {
    throw new HBaseException("Unable to create buffered mutator for table " + getDisplayName(), cause);
  }
}
/**
 * Recreates {@code tableName} with family {@code FAMILY_F} and fills it with rows whose
 * 16-byte key is a big-endian timestamp followed by a big-endian running counter.
 *
 * Timestamps start at 1408924800000 ms and advance by a third of a day for 365 days.
 * Each row stores "dummy" in {@code COLUMN_C}.
 *
 * @param conn          connection used to obtain the buffered mutator
 * @param admin         admin used to drop and create the table
 * @param tableName     table to (re)create and populate
 * @param numberRegions when greater than 1, the table is pre-split with the first
 *                      {@code numberRegions - 1} entries of {@code SPLIT_KEYS}
 * @throws Exception if any admin or write operation fails
 */
public static void generateHBaseDatasetCompositeKeyDate(Connection conn, Admin admin, TableName tableName, int numberRegions) throws Exception {
  if (admin.tableExists(tableName)) {
    admin.disableTable(tableName);
    admin.deleteTable(tableName);
  }

  HTableDescriptor desc = new HTableDescriptor(tableName);
  desc.addFamily(new HColumnDescriptor(FAMILY_F));
  if (numberRegions > 1) {
    admin.createTable(desc, Arrays.copyOfRange(SPLIT_KEYS, 0, numberRegions-1));
  } else {
    admin.createTable(desc);
  }

  final long startTime = 1408924800000L;
  final long millisecondsInADay = 1000L * 60 * 60 * 24;
  final long endTime = startTime + millisecondsInADay * 365;
  final long interval = millisecondsInADay / 3;

  // try-with-resources guarantees the mutator is closed even when mutate() throws.
  try (BufferedMutator table = conn.getBufferedMutator(tableName)) {
    long counter = 0;
    for (long ts = startTime; ts < endTime; ts += interval, counter++) {
      // ByteBuffer is big-endian by default, matching the previous manual byte shifting.
      byte[] rowKey = ByteBuffer.allocate(16).putLong(ts).putLong(counter).array();

      Put p = new Put(rowKey);
      p.addColumn(FAMILY_F, COLUMN_C, "dummy".getBytes());
      table.mutate(p);
    }
  }
}
/**
 * Recreates {@code tableName} with family {@code FAMILY_F} and fills it with rows whose
 * 12-byte key is a big-endian int followed by a big-endian running counter.
 *
 * The int runs from 0 up to (but excluding) 1000 in steps of 47.
 * Each row stores "dummy" in {@code COLUMN_C}.
 *
 * @param conn          connection used to obtain the buffered mutator
 * @param admin         admin used to drop and create the table
 * @param tableName     table to (re)create and populate
 * @param numberRegions when greater than 1, the table is pre-split with the first
 *                      {@code numberRegions - 1} entries of {@code SPLIT_KEYS}
 * @throws Exception if any admin or write operation fails
 */
public static void generateHBaseDatasetCompositeKeyInt(Connection conn, Admin admin, TableName tableName, int numberRegions) throws Exception {
  if (admin.tableExists(tableName)) {
    admin.disableTable(tableName);
    admin.deleteTable(tableName);
  }

  HTableDescriptor desc = new HTableDescriptor(tableName);
  desc.addFamily(new HColumnDescriptor(FAMILY_F));
  if (numberRegions > 1) {
    admin.createTable(desc, Arrays.copyOfRange(SPLIT_KEYS, 0, numberRegions-1));
  } else {
    admin.createTable(desc);
  }

  final int startVal = 0;
  final int stopVal = 1000;
  final int interval = 47;

  // try-with-resources guarantees the mutator is closed even when mutate() throws.
  try (BufferedMutator table = conn.getBufferedMutator(tableName)) {
    long counter = 0;
    for (int i = startVal; i < stopVal; i += interval, counter++) {
      // ByteBuffer is big-endian by default, matching the previous manual byte shifting.
      byte[] rowKey = ByteBuffer.allocate(12).putInt(i).putLong(counter).array();

      Put p = new Put(rowKey);
      p.addColumn(FAMILY_F, COLUMN_C, "dummy".getBytes());
      table.mutate(p);
    }
  }
}
/**
 * Recreates {@code tableName} with family {@code FAMILY_F} and fills it with rows keyed
 * by ascending OrderedBytes float64 encodings of 0.5, 1.25, ... up to 100.
 *
 * Each row stores the formatted value in {@code COLUMN_C}. The table is flushed
 * through {@code admin} afterwards.
 *
 * @param conn          connection used to obtain the buffered mutator
 * @param admin         admin used to drop, create and flush the table
 * @param tableName     table to (re)create and populate
 * @param numberRegions when greater than 1, the table is pre-split with the first
 *                      {@code numberRegions - 1} entries of {@code SPLIT_KEYS}
 * @throws Exception if any admin or write operation fails
 */
public static void generateHBaseDatasetDoubleOB(Connection conn, Admin admin, TableName tableName, int numberRegions) throws Exception {
  if (admin.tableExists(tableName)) {
    admin.disableTable(tableName);
    admin.deleteTable(tableName);
  }

  HTableDescriptor desc = new HTableDescriptor(tableName);
  desc.addFamily(new HColumnDescriptor(FAMILY_F));
  if (numberRegions > 1) {
    admin.createTable(desc, Arrays.copyOfRange(SPLIT_KEYS, 0, numberRegions-1));
  } else {
    admin.createTable(desc);
  }

  // try-with-resources guarantees the mutator is closed even when encoding or mutate() throws.
  try (BufferedMutator table = conn.getBufferedMutator(tableName)) {
    for (double i = 0.5; i <= 100.00; i += 0.75) {
      // 9 bytes: the size of the buffer the encoder is given for a float64 key.
      byte[] bytes = new byte[9];
      PositionedByteRange br = new SimplePositionedMutableByteRange(bytes, 0, 9);
      OrderedBytes.encodeFloat64(br, i, Order.ASCENDING);
      Put p = new Put(bytes);
      p.addColumn(FAMILY_F, COLUMN_C, String.format("value %03f", i).getBytes());
      table.mutate(p);
    }
  }

  admin.flush(tableName);
}
/**
 * Recreates {@code tableName} with family {@code FAMILY_F} and fills it with rows keyed
 * by ascending OrderedBytes float32 encodings of 0.5, 1.25, ... up to 100.
 *
 * Each row stores the formatted value in {@code COLUMN_C}. The table is flushed
 * through {@code admin} afterwards.
 *
 * @param conn          connection used to obtain the buffered mutator
 * @param admin         admin used to drop, create and flush the table
 * @param tableName     table to (re)create and populate
 * @param numberRegions when greater than 1, the table is pre-split with the first
 *                      {@code numberRegions - 1} entries of {@code SPLIT_KEYS}
 * @throws Exception if any admin or write operation fails
 */
public static void generateHBaseDatasetFloatOB(Connection conn, Admin admin, TableName tableName, int numberRegions) throws Exception {
  if (admin.tableExists(tableName)) {
    admin.disableTable(tableName);
    admin.deleteTable(tableName);
  }

  HTableDescriptor desc = new HTableDescriptor(tableName);
  desc.addFamily(new HColumnDescriptor(FAMILY_F));
  if (numberRegions > 1) {
    admin.createTable(desc, Arrays.copyOfRange(SPLIT_KEYS, 0, numberRegions-1));
  } else {
    admin.createTable(desc);
  }

  // try-with-resources guarantees the mutator is closed even when encoding or mutate() throws.
  try (BufferedMutator table = conn.getBufferedMutator(tableName)) {
    for (float i = (float)0.5; i <= 100.00; i += 0.75) {
      // 5 bytes: the size of the buffer the encoder is given for a float32 key.
      byte[] bytes = new byte[5];
      PositionedByteRange br = new SimplePositionedMutableByteRange(bytes, 0, 5);
      OrderedBytes.encodeFloat32(br, i, Order.ASCENDING);
      Put p = new Put(bytes);
      p.addColumn(FAMILY_F, COLUMN_C, String.format("value %03f", i).getBytes());
      table.mutate(p);
    }
  }

  admin.flush(tableName);
}
/**
 * Recreates {@code tableName} with family {@code FAMILY_F} and fills it with 101 rows
 * keyed by ascending OrderedBytes int64 encodings of 1438034423000 through 1438034423100.
 *
 * Each row stores the formatted value in {@code COLUMN_C}. The table is flushed
 * through {@code admin} afterwards.
 *
 * @param conn          connection used to obtain the buffered mutator
 * @param admin         admin used to drop, create and flush the table
 * @param tableName     table to (re)create and populate
 * @param numberRegions when greater than 1, the table is pre-split with the first
 *                      {@code numberRegions - 1} entries of {@code SPLIT_KEYS}
 * @throws Exception if any admin or write operation fails
 */
public static void generateHBaseDatasetBigIntOB(Connection conn, Admin admin, TableName tableName, int numberRegions) throws Exception {
  if (admin.tableExists(tableName)) {
    admin.disableTable(tableName);
    admin.deleteTable(tableName);
  }

  HTableDescriptor desc = new HTableDescriptor(tableName);
  desc.addFamily(new HColumnDescriptor(FAMILY_F));
  if (numberRegions > 1) {
    admin.createTable(desc, Arrays.copyOfRange(SPLIT_KEYS, 0, numberRegions-1));
  } else {
    admin.createTable(desc);
  }

  // try-with-resources guarantees the mutator is closed even when encoding or mutate() throws.
  try (BufferedMutator table = conn.getBufferedMutator(tableName)) {
    long startTime = (long)1438034423 * 1000;
    for (long i = startTime; i <= startTime + 100; i ++) {
      // 9 bytes: the size of the buffer the encoder is given for an int64 key.
      byte[] bytes = new byte[9];
      PositionedByteRange br = new SimplePositionedMutableByteRange(bytes, 0, 9);
      OrderedBytes.encodeInt64(br, i, Order.ASCENDING);
      Put p = new Put(bytes);
      p.addColumn(FAMILY_F, COLUMN_C, String.format("value %d", i).getBytes());
      table.mutate(p);
    }
  }

  admin.flush(tableName);
}
/**
 * Recreates {@code tableName} with family {@code FAMILY_F} and fills it with rows keyed
 * by ascending OrderedBytes int32 encodings of -49 through 100.
 *
 * Each row stores the formatted value in {@code COLUMN_C}. The table is flushed
 * through {@code admin} afterwards.
 *
 * @param conn          connection used to obtain the buffered mutator
 * @param admin         admin used to drop, create and flush the table
 * @param tableName     table to (re)create and populate
 * @param numberRegions when greater than 1, the table is pre-split with the first
 *                      {@code numberRegions - 1} entries of {@code SPLIT_KEYS}
 * @throws Exception if any admin or write operation fails
 */
public static void generateHBaseDatasetIntOB(Connection conn, Admin admin, TableName tableName, int numberRegions) throws Exception {
  if (admin.tableExists(tableName)) {
    admin.disableTable(tableName);
    admin.deleteTable(tableName);
  }

  HTableDescriptor desc = new HTableDescriptor(tableName);
  desc.addFamily(new HColumnDescriptor(FAMILY_F));
  if (numberRegions > 1) {
    admin.createTable(desc, Arrays.copyOfRange(SPLIT_KEYS, 0, numberRegions-1));
  } else {
    admin.createTable(desc);
  }

  // try-with-resources guarantees the mutator is closed even when encoding or mutate() throws.
  try (BufferedMutator table = conn.getBufferedMutator(tableName)) {
    for (int i = -49; i <= 100; i ++) {
      // 5 bytes: the size of the buffer the encoder is given for an int32 key.
      byte[] bytes = new byte[5];
      PositionedByteRange br = new SimplePositionedMutableByteRange(bytes, 0, 5);
      OrderedBytes.encodeInt32(br, i, Order.ASCENDING);
      Put p = new Put(bytes);
      p.addColumn(FAMILY_F, COLUMN_C, String.format("value %d", i).getBytes());
      table.mutate(p);
    }
  }

  admin.flush(tableName);
}
/**
 * Recreates {@code tableName} with family {@code FAMILY_F} and fills it with rows keyed
 * by descending-order OrderedBytes float64 encodings of 0.5, 1.25, ... up to 100.
 *
 * Each row stores the formatted value in {@code COLUMN_C}. The table is flushed
 * through {@code admin} afterwards.
 *
 * @param conn          connection used to obtain the buffered mutator
 * @param admin         admin used to drop, create and flush the table
 * @param tableName     table to (re)create and populate
 * @param numberRegions when greater than 1, the table is pre-split with the first
 *                      {@code numberRegions - 1} entries of {@code SPLIT_KEYS}
 * @throws Exception if any admin or write operation fails
 */
public static void generateHBaseDatasetDoubleOBDesc(Connection conn, Admin admin, TableName tableName, int numberRegions) throws Exception {
  if (admin.tableExists(tableName)) {
    admin.disableTable(tableName);
    admin.deleteTable(tableName);
  }

  HTableDescriptor desc = new HTableDescriptor(tableName);
  desc.addFamily(new HColumnDescriptor(FAMILY_F));
  if (numberRegions > 1) {
    admin.createTable(desc, Arrays.copyOfRange(SPLIT_KEYS, 0, numberRegions-1));
  } else {
    admin.createTable(desc);
  }

  // try-with-resources guarantees the mutator is closed even when encoding or mutate() throws.
  try (BufferedMutator table = conn.getBufferedMutator(tableName)) {
    for (double i = 0.5; i <= 100.00; i += 0.75) {
      // 9 bytes: the size of the buffer the encoder is given for a float64 key.
      byte[] bytes = new byte[9];
      PositionedByteRange br = new SimplePositionedMutableByteRange(bytes, 0, 9);
      OrderedBytes.encodeFloat64(br, i, Order.DESCENDING);
      Put p = new Put(bytes);
      p.addColumn(FAMILY_F, COLUMN_C, String.format("value %03f", i).getBytes());
      table.mutate(p);
    }
  }

  admin.flush(tableName);
}
/**
 * Recreates {@code tableName} with family {@code FAMILY_F} and fills it with rows keyed
 * by descending-order OrderedBytes float32 encodings of 0.5, 1.25, ... up to 100.
 *
 * Each row stores the formatted value in {@code COLUMN_C}. The table is flushed
 * through {@code admin} afterwards.
 *
 * @param conn          connection used to obtain the buffered mutator
 * @param admin         admin used to drop, create and flush the table
 * @param tableName     table to (re)create and populate
 * @param numberRegions when greater than 1, the table is pre-split with the first
 *                      {@code numberRegions - 1} entries of {@code SPLIT_KEYS}
 * @throws Exception if any admin or write operation fails
 */
public static void generateHBaseDatasetFloatOBDesc(Connection conn, Admin admin, TableName tableName, int numberRegions) throws Exception {
  if (admin.tableExists(tableName)) {
    admin.disableTable(tableName);
    admin.deleteTable(tableName);
  }

  HTableDescriptor desc = new HTableDescriptor(tableName);
  desc.addFamily(new HColumnDescriptor(FAMILY_F));
  if (numberRegions > 1) {
    admin.createTable(desc, Arrays.copyOfRange(SPLIT_KEYS, 0, numberRegions-1));
  } else {
    admin.createTable(desc);
  }

  // try-with-resources guarantees the mutator is closed even when encoding or mutate() throws.
  try (BufferedMutator table = conn.getBufferedMutator(tableName)) {
    for (float i = (float)0.5; i <= 100.00; i += 0.75) {
      // 5 bytes: the size of the buffer the encoder is given for a float32 key.
      byte[] bytes = new byte[5];
      PositionedByteRange br = new SimplePositionedMutableByteRange(bytes, 0, 5);
      OrderedBytes.encodeFloat32(br, i, Order.DESCENDING);
      Put p = new Put(bytes);
      p.addColumn(FAMILY_F, COLUMN_C, String.format("value %03f", i).getBytes());
      table.mutate(p);
    }
  }

  admin.flush(tableName);
}
/**
 * Recreates {@code tableName} with family {@code FAMILY_F} and fills it with 101 rows
 * keyed by descending-order OrderedBytes int64 encodings of 1438034423000 through
 * 1438034423100.
 *
 * Each row stores the formatted value in {@code COLUMN_C}. The table is flushed
 * through {@code admin} afterwards.
 *
 * @param conn          connection used to obtain the buffered mutator
 * @param admin         admin used to drop, create and flush the table
 * @param tableName     table to (re)create and populate
 * @param numberRegions when greater than 1, the table is pre-split with the first
 *                      {@code numberRegions - 1} entries of {@code SPLIT_KEYS}
 * @throws Exception if any admin or write operation fails
 */
public static void generateHBaseDatasetBigIntOBDesc(Connection conn, Admin admin, TableName tableName, int numberRegions) throws Exception {
  if (admin.tableExists(tableName)) {
    admin.disableTable(tableName);
    admin.deleteTable(tableName);
  }

  HTableDescriptor desc = new HTableDescriptor(tableName);
  desc.addFamily(new HColumnDescriptor(FAMILY_F));
  if (numberRegions > 1) {
    admin.createTable(desc, Arrays.copyOfRange(SPLIT_KEYS, 0, numberRegions-1));
  } else {
    admin.createTable(desc);
  }

  // try-with-resources guarantees the mutator is closed even when encoding or mutate() throws.
  try (BufferedMutator table = conn.getBufferedMutator(tableName)) {
    long startTime = (long)1438034423 * 1000;
    for (long i = startTime; i <= startTime + 100; i ++) {
      // 9 bytes: the size of the buffer the encoder is given for an int64 key.
      byte[] bytes = new byte[9];
      PositionedByteRange br = new SimplePositionedMutableByteRange(bytes, 0, 9);
      OrderedBytes.encodeInt64(br, i, Order.DESCENDING);
      Put p = new Put(bytes);
      p.addColumn(FAMILY_F, COLUMN_C, String.format("value %d", i).getBytes());
      table.mutate(p);
    }
  }

  admin.flush(tableName);
}
/**
 * Recreates {@code tableName} with family {@code FAMILY_F} and fills it with rows keyed
 * by descending-order OrderedBytes int32 encodings of -49 through 100.
 *
 * Each row stores the formatted value in {@code COLUMN_C}. The table is flushed
 * through {@code admin} afterwards.
 *
 * @param conn          connection used to obtain the buffered mutator
 * @param admin         admin used to drop, create and flush the table
 * @param tableName     table to (re)create and populate
 * @param numberRegions when greater than 1, the table is pre-split with the first
 *                      {@code numberRegions - 1} entries of {@code SPLIT_KEYS}
 * @throws Exception if any admin or write operation fails
 */
public static void generateHBaseDatasetIntOBDesc(Connection conn, Admin admin, TableName tableName, int numberRegions) throws Exception {
  if (admin.tableExists(tableName)) {
    admin.disableTable(tableName);
    admin.deleteTable(tableName);
  }

  HTableDescriptor desc = new HTableDescriptor(tableName);
  desc.addFamily(new HColumnDescriptor(FAMILY_F));
  if (numberRegions > 1) {
    admin.createTable(desc, Arrays.copyOfRange(SPLIT_KEYS, 0, numberRegions-1));
  } else {
    admin.createTable(desc);
  }

  // try-with-resources guarantees the mutator is closed even when encoding or mutate() throws.
  try (BufferedMutator table = conn.getBufferedMutator(tableName)) {
    for (int i = -49; i <= 100; i ++) {
      // 5 bytes: the size of the buffer the encoder is given for an int32 key.
      byte[] bytes = new byte[5];
      PositionedByteRange br = new SimplePositionedMutableByteRange(bytes, 0, 5);
      OrderedBytes.encodeInt32(br, i, Order.DESCENDING);
      Put p = new Put(bytes);
      p.addColumn(FAMILY_F, COLUMN_C, String.format("value %d", i).getBytes());
      table.mutate(p);
    }
  }

  admin.flush(tableName);
}
@Override public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException { // expecting exactly one path TableName tableName = TableName.valueOf(job.get(OUTPUT_TABLE)); BufferedMutator mutator = null; // Connection is not closed. Dies with JVM. No possibility for cleanup. Connection connection = ConnectionFactory.createConnection(job); mutator = connection.getBufferedMutator(tableName); // Clear write buffer on fail is true by default so no need to reset it. return new TableRecordWriter(mutator); }
/**
 * Returns the buffered mutator cached for the given table, creating the shared
 * connection and the mutator on first use.
 *
 * @param tableName
 *          the table name bytes, wrapped in an {@code ImmutableBytesWritable}
 * @return the mutator for the named table
 * @throws IOException
 *           if there is a problem opening the connection or the mutator
 */
BufferedMutator getBufferedMutator(ImmutableBytesWritable tableName) throws IOException {
  // The connection is created lazily on the first request.
  if (this.connection == null) {
    this.connection = ConnectionFactory.createConnection(conf);
  }

  if (mutatorMap.containsKey(tableName)) {
    return mutatorMap.get(tableName);
  }

  LOG.debug("Opening HTable \"" + Bytes.toString(tableName.get())+ "\" for writing");
  final BufferedMutator created =
      connection.getBufferedMutator(TableName.valueOf(tableName.get()));
  mutatorMap.put(tableName, created);
  return created;
}
/**
 * Closes every cached buffered mutator and then the shared connection.
 *
 * A failure while closing one resource no longer prevents the remaining ones from
 * being closed. The first {@link IOException} is rethrown after all close attempts,
 * with later failures attached as suppressed exceptions.
 *
 * @param context the task attempt context (unused)
 * @throws IOException if closing any mutator or the connection failed
 */
@Override
public void close(TaskAttemptContext context) throws IOException {
  IOException firstFailure = null;

  for (BufferedMutator mutator : mutatorMap.values()) {
    try {
      mutator.close();
    } catch (IOException e) {
      if (firstFailure == null) {
        firstFailure = e;
      } else {
        firstFailure.addSuppressed(e);
      }
    }
  }

  if (connection != null) {
    try {
      connection.close();
    } catch (IOException e) {
      if (firstFailure == null) {
        firstFailure = e;
      } else {
        firstFailure.addSuppressed(e);
      }
    }
  }

  if (firstFailure != null) {
    throw firstFailure;
  }
}
/**
 * Inserts 5000 traced batches of 5 random rows (10 random columns each, 300-byte
 * values) and returns the generated row keys.
 *
 * The table is flushed through {@code admin} every 1000 batches and once more at the end.
 *
 * @return queue holding every generated row key
 * @throws IOException          if a write, flush, or closing the mutator fails
 * @throws InterruptedException declared for callers; not raised directly here
 */
private LinkedBlockingQueue<Long> insertData() throws IOException, InterruptedException {
  LinkedBlockingQueue<Long> rowKeys = new LinkedBlockingQueue<Long>(25000);
  byte[] value = new byte[300];

  // The mutator was previously never closed. Closing it (per the HBase API, close()
  // flushes pending mutations) sends buffered puts before the final admin.flush.
  try (BufferedMutator ht = util.getConnection().getBufferedMutator(this.tableName)) {
    for (int x = 0; x < 5000; x++) {
      TraceScope traceScope = Trace.startSpan("insertData", Sampler.ALWAYS);
      try {
        for (int i = 0; i < 5; i++) {
          long rk = random.nextLong();
          rowKeys.add(rk);
          Put p = new Put(Bytes.toBytes(rk));
          for (int y = 0; y < 10; y++) {
            random.nextBytes(value);
            p.add(familyName, Bytes.toBytes(random.nextLong()), value);
          }
          ht.mutate(p);
        }
        if ((x % 1000) == 0) {
          admin.flush(tableName);
        }
      } finally {
        traceScope.close();
      }
    }
  }
  admin.flush(tableName);
  return rowKeys;
}
/**
 * Opens one buffered mutator per table index in {@code [0, DEFAULT_TABLES_COUNT)},
 * each with a 4 MiB write buffer, and stores it in {@code tables}.
 *
 * @throws IOException if a mutator cannot be created
 */
@Override
protected void instantiateHTable() throws IOException {
  int slot = 0;
  while (slot < DEFAULT_TABLES_COUNT) {
    final BufferedMutatorParams mutatorParams = new BufferedMutatorParams(getTableName(slot));
    mutatorParams.writeBufferSize(4 * 1024 * 1024);
    this.tables[slot] = connection.getBufferedMutator(mutatorParams);
    slot++;
  }
}
/**
 * Wraps a user-supplied buffered mutator and records, from the repository, the table
 * descriptor and whether the table takes part in repository processing.
 *
 * When the repository is not activated, the descriptor is {@code null} and the table
 * is treated as not included.
 *
 * @param userBufferedMutator the mutator being wrapped
 * @param repository          repository consulted for table metadata
 * @throws IOException if the repository lookup fails
 */
MBufferedMutator(BufferedMutator userBufferedMutator, Repository repository) throws IOException {
  wrappedBufferedMutator = userBufferedMutator;
  this.repository = repository;

  if (!this.repository.isActivated()) {
    // Inactive repository: no metadata is looked up.
    mTableDescriptor = null;
    includedInRepositoryProcessing = false;
  } else {
    mTableDescriptor
        = this.repository.getMTableDescriptor(wrappedBufferedMutator.getName());
    includedInRepositoryProcessing
        = repository.isIncludedTable(wrappedBufferedMutator.getName());
  }
}
private void persistDataUsingBufferedMutatorMethods(Connection connection, TableName tableName) throws IOException { try (BufferedMutator bufferedMutator = connection.getBufferedMutator(tableName); Table table = connection.getTable(tableName)) { // do standard Puts for subsequent Delete in BufferedMutator List<Put> putList = new LinkedList<>(); putList.add(new Put(ROW_ID_02). addColumn(CF01, COLQUALIFIER08, TABLE_PUT_WITH_LIST). addColumn(CF02, COLQUALIFIER07, TABLE_PUT_WITH_LIST)); table.put(putList); // test BufferMutator individual mutations (put and delete) bufferedMutator.mutate( new Put(ROW_ID_03).addColumn(CF02, COLQUALIFIER07, TABLE_PUT_WITH_BUFFERED_MUTATOR)); bufferedMutator.mutate( new Delete(ROW_ID_02).addColumn(CF01, COLQUALIFIER08)); bufferedMutator.flush(); // test BufferMutator with List of mutations (put and delete) List<Mutation> mutationList = new LinkedList<>(); mutationList.add(new Put(ROW_ID_04).addColumn( CF01, COLQUALIFIER07, TABLE_PUT_WITH_BUFFERED_MUTATOR_LIST)); mutationList.add(new Delete(ROW_ID_02).addColumn(CF02, COLQUALIFIER07)); bufferedMutator.mutate(mutationList); bufferedMutator.flush(); } }
/**
 * Writes every fetched row to the target table as a put, until the source is
 * exhausted or the {@code interrupted} flag is set.
 *
 * The mutator is closed with try-with-resources rather than {@code closeQuietly}, so
 * an {@link IOException} raised while closing propagates to the caller. Per the HBase
 * API, close() flushes buffered mutations, so such a failure can mean lost writes and
 * must not end in a {@code true} return.
 *
 * @param connection connection used to open the buffered mutator
 * @param fetchable  source of rows; {@code fetch()} returning {@code null} ends the loop
 * @return {@code true} once the mutator has been closed without error
 * @throws IOException if a mutation fails or the mutator cannot be closed cleanly
 */
private boolean overrideWrite(Connection connection, Fetchable fetchable) throws IOException {
  try (BufferedMutator mutator = connection.getBufferedMutator(TableName.valueOf(tableName))) {
    Row row;
    while ((row = (Row) fetchable.fetch()) != null && !interrupted) {
      mutator.mutate(toPut(row, false));
    }
  }
  return true;
}
private boolean duplicateWrite(Connection connection, Fetchable fetchable) throws IOException { Table table = connection.getTable(TableName.valueOf(tableName)); BufferedMutator mutator = connection.getBufferedMutator(TableName.valueOf(tableName)); try { List<Get> checks = Lists.newArrayListWithCapacity(BatchCount); List<Put> puts = Lists.newArrayListWithCapacity(BatchCount); List<Bee> bees; while ((bees = fetchable.fetch(BatchCount)).size() > 0 && !interrupted) { for (Bee bee : bees) { checks.add(toGet((Row) bee)); } // Check wheter those keys exists or not. boolean[] exists = table.existsAll(checks); for (int index = 0; index < bees.size(); index++) { puts.add(toPut((Row) bees.get(index), exists[index])); } mutator.mutate(puts); checks.clear(); puts.clear(); } } finally { IOUtils.closeQuietly(table); IOUtils.closeQuietly(mutator); } return true; }
/**
 * Opens a buffered mutator on the graph table identified by {@code tableName}, with
 * {@code LISTENER} registered as its exception listener.
 *
 * @param graph     graph supplying the configuration and the connection
 * @param tableName logical table name, resolved through {@code HBaseGraphUtils.getTableName}
 * @return the newly created buffered mutator
 * @throws HBaseGraphException wrapping any {@link IOException} raised while opening it
 */
private static BufferedMutator getBufferedMutator(HBaseGraph graph, String tableName) {
  try {
    final TableName resolved = HBaseGraphUtils.getTableName(graph.configuration(), tableName);
    return graph.connection()
        .getBufferedMutator(new BufferedMutatorParams(resolved).listener(LISTENER));
  } catch (IOException e) {
    throw new HBaseGraphException(e);
  }
}
/**
 * Creates a bulk loader that writes through the four supplied mutators.
 *
 * The skip-WAL setting is read from the graph's configuration.
 *
 * @param graph                graph whose configuration supplies the skip-WAL flag
 * @param edgesMutator         mutator stored for edges
 * @param edgeIndicesMutator   mutator stored for edge indices
 * @param verticesMutator      mutator stored for vertices
 * @param vertexIndicesMutator mutator stored for vertex indices
 */
public HBaseBulkLoader(HBaseGraph graph,
                       BufferedMutator edgesMutator,
                       BufferedMutator edgeIndicesMutator,
                       BufferedMutator verticesMutator,
                       BufferedMutator vertexIndicesMutator) {
  this.graph = graph;
  this.skipWAL = graph.configuration().getBulkLoaderSkipWAL();

  // Vertex-side mutators.
  this.verticesMutator = verticesMutator;
  this.vertexIndicesMutator = vertexIndicesMutator;

  // Edge-side mutators.
  this.edgesMutator = edgesMutator;
  this.edgeIndicesMutator = edgeIndicesMutator;
}
/**
 * Helper that writes {@code numRows} generated mutations to an existing table.
 *
 * The mutations come from {@code makeTableData(numRows)} and are flushed before the
 * mutator is closed. This method neither creates the table nor returns the rows.
 *
 * @param tableId name of the table to write to
 * @param numRows number of rows requested from {@code makeTableData}
 * @throws Exception if generating, writing, flushing or closing fails
 */
private static void writeData(String tableId, int numRows) throws Exception {
  Connection connection = admin.getConnection();
  TableName tableName = TableName.valueOf(tableId);
  // try-with-resources guarantees the mutator is closed even when a write or flush throws.
  try (BufferedMutator mutator = connection.getBufferedMutator(tableName)) {
    List<Mutation> mutations = makeTableData(numRows);
    mutator.mutate(mutations);
    mutator.flush();
  }
}
/**
 * Recreates {@code tableName} with a single column family "f" and writes three rows
 * ("a1", "a2", "a3"), each with columns c1, c2 and c3 in that family.
 *
 * @param conn          connection used to obtain the buffered mutator
 * @param admin         admin used to drop and create the table
 * @param tableName     table to (re)create and populate
 * @param numberRegions when greater than 1, the table is pre-split with the first
 *                      {@code numberRegions - 1} entries of {@code SPLIT_KEYS}
 * @throws Exception if any admin or write operation fails
 */
public static void generateHBaseDatasetSingleSchema(Connection conn, Admin admin, TableName tableName, int numberRegions) throws Exception {
  if (admin.tableExists(tableName)) {
    admin.disableTable(tableName);
    admin.deleteTable(tableName);
  }

  HTableDescriptor desc = new HTableDescriptor(tableName);
  desc.addFamily(new HColumnDescriptor("f"));
  if (numberRegions > 1) {
    admin.createTable(desc, Arrays.copyOfRange(SPLIT_KEYS, 0, numberRegions - 1));
  } else {
    admin.createTable(desc);
  }

  // try-with-resources guarantees the mutator is closed even when mutate() throws.
  try (BufferedMutator table = conn.getBufferedMutator(tableName)) {
    Put p = new Put("a1".getBytes());
    p.addColumn("f".getBytes(), "c1".getBytes(), "21".getBytes());
    p.addColumn("f".getBytes(), "c2".getBytes(), "22".getBytes());
    p.addColumn("f".getBytes(), "c3".getBytes(), "23".getBytes());
    table.mutate(p);

    p = new Put("a2".getBytes());
    p.addColumn("f".getBytes(), "c1".getBytes(), "11".getBytes());
    p.addColumn("f".getBytes(), "c2".getBytes(), "12".getBytes());
    p.addColumn("f".getBytes(), "c3".getBytes(), "13".getBytes());
    table.mutate(p);

    p = new Put("a3".getBytes());
    p.addColumn("f".getBytes(), "c1".getBytes(), "31".getBytes());
    p.addColumn("f".getBytes(), "c2".getBytes(), "32".getBytes());
    p.addColumn("f".getBytes(), "c3".getBytes(), "33".getBytes());
    table.mutate(p);
  }
}
/**
 * Recreates {@code tableName} with two column families, "f" and "F", and writes three
 * rows ("a1", "a2", "a3"). Columns c1 and c2 go to family "f" and column c3 goes to
 * family "F".
 *
 * @param conn          connection used to obtain the buffered mutator
 * @param admin         admin used to drop and create the table
 * @param tableName     table to (re)create and populate
 * @param numberRegions when greater than 1, the table is pre-split with the first
 *                      {@code numberRegions - 1} entries of {@code SPLIT_KEYS}
 * @throws Exception if any admin or write operation fails
 */
public static void generateHBaseDatasetMultiCF(Connection conn, Admin admin, TableName tableName, int numberRegions) throws Exception {
  if (admin.tableExists(tableName)) {
    admin.disableTable(tableName);
    admin.deleteTable(tableName);
  }

  HTableDescriptor desc = new HTableDescriptor(tableName);
  desc.addFamily(new HColumnDescriptor("f"));
  desc.addFamily(new HColumnDescriptor("F"));
  if (numberRegions > 1) {
    admin.createTable(desc, Arrays.copyOfRange(SPLIT_KEYS, 0, numberRegions - 1));
  } else {
    admin.createTable(desc);
  }

  // try-with-resources guarantees the mutator is closed even when mutate() throws.
  try (BufferedMutator table = conn.getBufferedMutator(tableName)) {
    Put p = new Put("a1".getBytes());
    p.addColumn("f".getBytes(), "c1".getBytes(), "21".getBytes());
    p.addColumn("f".getBytes(), "c2".getBytes(), "22".getBytes());
    p.addColumn("F".getBytes(), "c3".getBytes(), "23".getBytes());
    table.mutate(p);

    p = new Put("a2".getBytes());
    p.addColumn("f".getBytes(), "c1".getBytes(), "11".getBytes());
    p.addColumn("f".getBytes(), "c2".getBytes(), "12".getBytes());
    p.addColumn("F".getBytes(), "c3".getBytes(), "13".getBytes());
    table.mutate(p);

    p = new Put("a3".getBytes());
    p.addColumn("f".getBytes(), "c1".getBytes(), "31".getBytes());
    p.addColumn("f".getBytes(), "c2".getBytes(), "32".getBytes());
    p.addColumn("F".getBytes(), "c3".getBytes(), "33".getBytes());
    table.mutate(p);
  }
}