/** * Returns a Thread pool for the RPC's to region replicas. Similar to * Connection's thread pool. */ private ExecutorService getDefaultThreadPool(Configuration conf) { int maxThreads = conf.getInt("hbase.region.replica.replication.threads.max", 256); int coreThreads = conf.getInt("hbase.region.replica.replication.threads.core", 16); if (maxThreads == 0) { maxThreads = Runtime.getRuntime().availableProcessors() * 8; } if (coreThreads == 0) { coreThreads = Runtime.getRuntime().availableProcessors() * 8; } long keepAliveTime = conf.getLong("hbase.region.replica.replication.threads.keepalivetime", 60); LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(maxThreads * conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS)); ThreadPoolExecutor tpe = new ThreadPoolExecutor( coreThreads, maxThreads, keepAliveTime, TimeUnit.SECONDS, workQueue, Threads.newDaemonThreadFactory(this.getClass().getSimpleName() + "-rpc-shared-")); tpe.allowCoreThreadTimeOut(true); return tpe; }
@Override public void run() { try { while (!master.isStopped() && master.isActiveMaster()) { Thread.sleep(timeout); if (master.isInitialized()) { LOG.debug("Initialization completed within allotted tolerance. Monitor exiting."); } else { LOG.error("Master failed to complete initialization after " + timeout + "ms. Please" + " consider submitting a bug report including a thread dump of this process."); if (haltOnTimeout) { LOG.error("Zombie Master exiting. Thread dump to stdout"); Threads.printThreadInfo(System.out, "Zombie HMaster"); System.exit(-1); } } } } catch (InterruptedException ie) { LOG.trace("InitMonitor thread interrupted. Existing."); } }
public static byte[] waitForProcedureToComplete(ProcedureExecutor<MasterProcedureEnv> procExec, final long procId) throws IOException { while (!procExec.isFinished(procId) && procExec.isRunning()) { // TODO: add a config to make it tunable // Dev Consideration: are we waiting forever, or we can set up some timeout value? Threads.sleepWithoutInterrupt(250); } ProcedureInfo result = procExec.getResult(procId); if (result != null) { if (result.isFailed()) { // If the procedure fails, we should always have an exception captured. Throw it. throw RemoteProcedureException.fromProto( result.getForeignExceptionMessage()).unwrapRemoteException(); } return result.getResult(); } else { if (procExec.isRunning()) { throw new IOException("Procedure " + procId + "not found"); } else { throw new IOException("The Master is Aborting"); } } }
/** * We initialize the roller for the wal that handles meta lazily * since we don't know if this regionserver will handle it. All calls to * this method return a reference to the that same roller. As newly referenced * meta regions are brought online, they will be offered to the roller for maintenance. * As a part of that registration process, the roller will add itself as a * listener on the wal. */ protected LogRoller ensureMetaWALRoller() { // Using a tmp log roller to ensure metaLogRoller is alive once it is not // null LogRoller roller = metawalRoller.get(); if (null == roller) { LogRoller tmpLogRoller = new LogRoller(this, this); String n = Thread.currentThread().getName(); Threads.setDaemonThreadRunning(tmpLogRoller.getThread(), n + "-MetaLogRoller", uncaughtExceptionHandler); if (metawalRoller.compareAndSet(null, tmpLogRoller)) { roller = tmpLogRoller; } else { // Another thread won starting the roller Threads.shutdown(tmpLogRoller.getThread()); roller = metawalRoller.get(); } } return roller; }
@Override public void run() { try { this.user.runAs(new PrivilegedAction<Object>(){ public Object run() { runRegionServer(); return null; } }); } catch (Throwable t) { LOG.error("Exception in run", t); } finally { // Run this on the way out. if (this.shutdownThread != null) { this.shutdownThread.start(); Threads.shutdown(this.shutdownThread, 30000); } } }
/** * Blocks until there is an active master and that master has completed * initialization. * * @return true if an active master becomes available. false if there are no * masters left. * @throws InterruptedException */ public boolean waitForActiveAndReadyMaster(long timeout) throws IOException { List<JVMClusterUtil.MasterThread> mts; long start = System.currentTimeMillis(); while (!(mts = getMasterThreads()).isEmpty() && (System.currentTimeMillis() - start) < timeout) { for (JVMClusterUtil.MasterThread mt : mts) { if (mt.getMaster().isActiveMaster() && mt.getMaster().isInitialized()) { return true; } } Threads.sleep(100); } return false; }
/** * Test that we can interrupt a node that is blocked on a wait. * @throws IOException * @throws InterruptedException */ @Test public void testInterruptible() throws IOException, InterruptedException { Abortable abortable = new StubAbortable(); ZooKeeperWatcher zk = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(), "testInterruptible", abortable); final TestTracker tracker = new TestTracker(zk, "/xyz", abortable); tracker.start(); Thread t = new Thread() { @Override public void run() { try { tracker.blockUntilAvailable(); } catch (InterruptedException e) { throw new RuntimeException("Interrupted", e); } } }; t.start(); while (!t.isAlive()) Threads.sleep(1); tracker.stop(); t.join(); // If it wasn't interruptible, we'd never get to here. }
private static void waitUntilAllRegionsAssigned() throws IOException { HTable meta = new HTable(TEST_UTIL.getConfiguration(), TableName.META_TABLE_NAME); while (true) { int rows = 0; Scan scan = new Scan(); scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER); ResultScanner s = meta.getScanner(scan); for (Result r = null; (r = s.next()) != null;) { byte [] b = r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER); if (b == null || b.length <= 0) { break; } rows++; } s.close(); // If I get to here and all rows have a Server, then all have been assigned. if (rows >= countOfRegions) { break; } LOG.info("Found=" + rows); Threads.sleep(1000); } meta.close(); }
/** * Wait until all the regions are assigned. */ private void waitForAllRegionsAssigned() throws IOException { int totalRegions = HBaseTestingUtility.KEYS.length; while (UTIL.getMiniHBaseCluster().countServedRegions() < totalRegions) { // while (!cluster.getMaster().allRegionsAssigned()) { LOG.debug("Waiting for there to be "+ totalRegions +" regions, but there are " + UTIL.getMiniHBaseCluster().countServedRegions() + " right now."); try { Thread.sleep(200); } catch (InterruptedException e) {} } RegionStates regionStates = UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates(); while (!regionStates.getRegionsInTransition().isEmpty()) { Threads.sleep(100); } }
@Override protected void handleReportForDutyResponse(RegionServerStartupResponse c) throws IOException { if (firstRS.getAndSet(false)) { InetSocketAddress address = super.getRpcServer().getListenerAddress(); if (address == null) { throw new IOException("Listener channel is closed"); } for (NameStringPair e : c.getMapEntriesList()) { String key = e.getName(); // The hostname the master sees us as. if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) { String hostnameFromMasterPOV = e.getValue(); assertEquals(address.getHostName(), hostnameFromMasterPOV); } } while (!masterActive) { Threads.sleep(100); } super.kill(); } else { super.handleReportForDutyResponse(c); } }
public static void compactAndBlockUntilDone(Admin admin, HRegionServer rs, byte[] regionName) throws IOException, InterruptedException { log("Compacting region: " + Bytes.toStringBinary(regionName)); admin.majorCompactRegion(regionName); log("blocking until compaction is complete: " + Bytes.toStringBinary(regionName)); Threads.sleepWithoutInterrupt(500); outer: for (;;) { for (Store store : rs.getOnlineRegion(regionName).getStores()) { if (store.getStorefilesCount() > 1) { Threads.sleep(50); continue outer; } } break; } }
public static void blockUntilRegionIsOpened(Configuration conf, long timeout, HRegionInfo hri) throws IOException, InterruptedException { log("blocking until region is opened for reading:" + hri.getRegionNameAsString()); long start = System.currentTimeMillis(); try (Connection conn = ConnectionFactory.createConnection(conf); Table table = conn.getTable(hri.getTable())) { byte[] row = hri.getStartKey(); // Check for null/empty row. If we find one, use a key that is likely to be in first region. if (row == null || row.length <= 0) row = new byte[] { '0' }; Get get = new Get(row); while (System.currentTimeMillis() - start < timeout) { try { table.get(get); break; } catch (IOException ex) { // wait some more } Threads.sleep(10); } } }
@BeforeClass public static void startCluster() throws Exception { metricsHelper = CompatibilityFactory.getInstance(MetricsAssertHelper.class); TEST_UTIL = new HBaseTestingUtility(); conf = TEST_UTIL.getConfiguration(); conf.getLong("hbase.splitlog.max.resubmit", 0); // Make the failure test faster conf.setInt("zookeeper.recovery.retry", 0); conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1); TEST_UTIL.startMiniCluster(1, 1); cluster = TEST_UTIL.getHBaseCluster(); cluster.waitForActiveAndReadyMaster(); while (cluster.getLiveRegionServerThreads().size() < 1) { Threads.sleep(100); } rs = cluster.getRegionServer(0); metricsRegionServer = rs.getRegionServerMetrics(); serverSource = metricsRegionServer.getMetricsSource(); }
@BeforeClass public static void startCluster() throws Exception { metricsHelper = CompatibilityFactory.getInstance(MetricsAssertHelper.class); TEST_UTIL = new HBaseTestingUtility(); conf = TEST_UTIL.getConfiguration(); conf.getLong("hbase.splitlog.max.resubmit", 0); // Make the failure test faster conf.setInt("zookeeper.recovery.retry", 0); conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1); TEST_UTIL.startMiniCluster(1, 2); cluster = TEST_UTIL.getHBaseCluster(); cluster.waitForActiveAndReadyMaster(); while (cluster.getLiveRegionServerThreads().size() < 2) { Threads.sleep(100); } }
private void compactAndWait() throws IOException, InterruptedException { LOG.debug("Compacting table " + tableName); HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0); HBaseAdmin admin = TEST_UTIL.getHBaseAdmin(); admin.majorCompact(tableName); // Waiting for the compaction to start, at least .5s. final long maxWaitime = System.currentTimeMillis() + 500; boolean cont; do { cont = rs.compactSplitThread.getCompactionQueueSize() == 0; Threads.sleep(1); } while (cont && System.currentTimeMillis() < maxWaitime); while (rs.compactSplitThread.getCompactionQueueSize() > 0) { Threads.sleep(1); } LOG.debug("Compaction queue size reached 0, continuing"); }
private boolean rollWriterOrDie() { for (int i = 0; i < rollRetries; ++i) { if (i > 0) Threads.sleepWithoutInterrupt(waitBeforeRoll * i); try { if (rollWriter()) { return true; } } catch (IOException e) { LOG.warn("Unable to roll the log, attempt=" + (i + 1), e); } } LOG.fatal("Unable to roll the log"); sendAbortProcessSignal(); throw new RuntimeException("unable to roll the log"); }
@Override public boolean waitForActiveAndReadyMaster(long timeout) throws IOException { long start = System.currentTimeMillis(); while (System.currentTimeMillis() - start < timeout) { try { getMasterAdminService(); return true; } catch (MasterNotRunningException m) { LOG.warn("Master not started yet " + m); } catch (ZooKeeperConnectionException e) { LOG.warn("Failed to connect to ZK " + e); } Threads.sleep(1000); } return false; }
@Override public void run() { // Add some jitter. int jitter = RandomUtils.nextInt((int) periodMs); LOG.info("Sleeping for " + jitter + " to add jitter"); Threads.sleep(jitter); while (!isStopped()) { long start = System.currentTimeMillis(); runOneIteration(); if (isStopped()) return; long sleepTime = periodMs - (System.currentTimeMillis() - start); if (sleepTime > 0) { LOG.info("Sleeping for: " + sleepTime); Threads.sleep(sleepTime); } } }
@InterfaceAudience.Private public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) { int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE); if (maxThreads == 0) { maxThreads = 1; // is there a better default? } long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60); // Using the "direct handoff" approach, new threads will only be created // if it is necessary and will grow unbounded. This could be bad but in HCM // we only create as many Runnables as there are region servers. It means // it also scales when new region servers are added. ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("htable")); pool.allowCoreThreadTimeOut(true); return pool; }
private static Pair<EventLoopGroup, Class<? extends Channel>> createEventLoopGroup( Configuration conf) { // Max amount of threads to use. 0 lets Netty decide based on amount of cores int maxThreads = conf.getInt(CLIENT_MAX_THREADS, 0); // Config to enable native transport. Does not seem to be stable at time of implementation // although it is not extensively tested. boolean epollEnabled = conf.getBoolean(USE_NATIVE_TRANSPORT, false); // Use the faster native epoll transport mechanism on linux if enabled if (epollEnabled && JVM.isLinux() && JVM.isAmd64()) { if (LOG.isDebugEnabled()) { LOG.debug("Create EpollEventLoopGroup with maxThreads = " + maxThreads); } return new Pair<EventLoopGroup, Class<? extends Channel>>(new EpollEventLoopGroup(maxThreads, Threads.newDaemonThreadFactory("AsyncRpcChannel")), EpollSocketChannel.class); } else { if (LOG.isDebugEnabled()) { LOG.debug("Create NioEventLoopGroup with maxThreads = " + maxThreads); } return new Pair<EventLoopGroup, Class<? extends Channel>>(new NioEventLoopGroup(maxThreads, Threads.newDaemonThreadFactory("AsyncRpcChannel")), NioSocketChannel.class); } }
public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) { int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE); if (maxThreads == 0) { maxThreads = 1; // is there a better default? } long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60); // Using the "direct handoff" approach, new threads will only be created // if it is necessary and will grow unbounded. This could be bad but in HCM // we only create as many Runnables as there are region servers. It means // it also scales when new region servers are added. ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("hbase-table")); pool.allowCoreThreadTimeOut(true); return pool; }
public static void blockUntilRegionIsOpened(Configuration conf, long timeout, HRegionInfo hri) throws IOException, InterruptedException { log("blocking until region is opened for reading:" + hri.getRegionNameAsString()); long start = System.currentTimeMillis(); HTable table = new HTable(conf, hri.getTable()); try { byte [] row = hri.getStartKey(); // Check for null/empty row. If we find one, use a key that is likely to be in first region. if (row == null || row.length <= 0) row = new byte [] {'0'}; Get get = new Get(row); while (System.currentTimeMillis() - start < timeout) { try { table.get(get); break; } catch(IOException ex) { //wait some more } Threads.sleep(10); } } finally { IOUtils.closeQuietly(table); } }
private void startCluster(int num_master, int num_rs, Configuration inConf) throws Exception { ZKSplitLog.Counters.resetCounters(); LOG.info("Starting cluster"); this.conf = inConf; conf.getLong("hbase.splitlog.max.resubmit", 0); // Make the failure test faster conf.setInt("zookeeper.recovery.retry", 0); TEST_UTIL = new HBaseTestingUtility(conf); TEST_UTIL.startMiniCluster(num_master, num_rs); cluster = TEST_UTIL.getHBaseCluster(); LOG.info("Waiting for active/ready master"); cluster.waitForActiveAndReadyMaster(); master = cluster.getMaster(); while (cluster.getLiveRegionServerThreads().size() < num_rs) { Threads.sleep(1); } }
public static void blockUntilRegionIsInMeta(Table metaTable, long timeout, HRegionInfo hri) throws IOException, InterruptedException { log("blocking until region is in META: " + hri.getRegionNameAsString()); long start = System.currentTimeMillis(); while (System.currentTimeMillis() - start < timeout) { Result result = getRegionRow(metaTable, hri.getRegionName()); if (result != null) { HRegionInfo info = HRegionInfo.getHRegionInfo(result); if (info != null && !info.isOffline()) { log("found region in META: " + hri.getRegionNameAsString()); break; } } Threads.sleep(10); } }
/** * Test we reopen a region once closed. * @throws Exception */ @Test (timeout=300000) public void testReOpenRegion() throws Exception { MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); LOG.info("Number of region servers = " + cluster.getLiveRegionServerThreads().size()); int rsIdx = 0; HRegionServer regionServer = TEST_UTIL.getHBaseCluster().getRegionServer(rsIdx); HRegionInfo hri = getNonMetaRegion(ProtobufUtil.getOnlineRegions(regionServer)); LOG.debug("Asking RS to close region " + hri.getRegionNameAsString()); LOG.info("Unassign " + hri.getRegionNameAsString()); cluster.getMaster().assignmentManager.unassign(hri); while (!cluster.getMaster().assignmentManager.wasClosedHandlerCalled(hri)) { Threads.sleep(100); } while (!cluster.getMaster().assignmentManager.wasOpenedHandlerCalled(hri)) { Threads.sleep(100); } LOG.info("Done with testReOpenRegion"); }
public static void blockUntilRegionIsOpened(Configuration conf, long timeout, HRegionInfo hri) throws IOException, InterruptedException { log("blocking until region is opened for reading:" + hri.getRegionNameAsString()); long start = System.currentTimeMillis(); Table table = new HTable(conf, hri.getTable()); try { byte [] row = hri.getStartKey(); // Check for null/empty row. If we find one, use a key that is likely to be in first region. if (row == null || row.length <= 0) row = new byte [] {'0'}; Get get = new Get(row); while (System.currentTimeMillis() - start < timeout) { try { table.get(get); break; } catch(IOException ex) { //wait some more } Threads.sleep(10); } } finally { IOUtils.closeQuietly(table); } }
public static void blockUntilRegionIsOpened(Configuration conf, long timeout, HRegionInfo hri) throws IOException, InterruptedException { log("blocking until region is opened for reading:" + hri.getRegionNameAsString()); long start = System.currentTimeMillis(); HTable table = new HTable(conf, hri.getTableName()); try { Get get = new Get(hri.getStartKey()); while (System.currentTimeMillis() - start < timeout) { try { table.get(get); break; } catch(IOException ex) { //wait some more } Threads.sleep(10); } } finally { IOUtils.closeQuietly(table); } }
@BeforeClass public static void startCluster() throws Exception { metricsHelper = CompatibilityFactory.getInstance(MetricsAssertHelper.class); TEST_UTIL = new HBaseTestingUtility(); conf = TEST_UTIL.getConfiguration(); conf.getLong("hbase.splitlog.max.resubmit", 0); // Make the failure test faster conf.setInt("zookeeper.recovery.retry", 0); conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1); TEST_UTIL.startMiniCluster(1, 1); cluster = TEST_UTIL.getHBaseCluster(); cluster.waitForActiveAndReadyMaster(); while (cluster.getLiveRegionServerThreads().size() < 1) { Threads.sleep(100); } rs = cluster.getRegionServer(0); metricsRegionServer = rs.getMetrics(); serverSource = metricsRegionServer.getMetricsSource(); }
@Before public void setUp() throws Exception { HTableDescriptor htd = new HTableDescriptor(TEST_TABLE.getTableName()); HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAMILY1); hcd.setMaxVersions(4); htd.setOwner(USER_OWNER); htd.addFamily(hcd); hcd = new HColumnDescriptor(TEST_FAMILY2); hcd.setMaxVersions(4); htd.setOwner(USER_OWNER); htd.addFamily(hcd); // Create the test table (owner added to the _acl_ table) try (Connection connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())) { try (Admin admin = connection.getAdmin()) { admin.createTable(htd, new byte[][] { Bytes.toBytes("s") }); } } TEST_UTIL.waitTableEnabled(TEST_TABLE.getTableName()); LOG.info("Sleeping a second because of HBASE-12581"); Threads.sleep(1000); }
/** * Loop until {@link ExecutorService#awaitTermination} finally does return * without an interrupted exception. If we don't do this, then we'll shut * down prematurely. We want to let the executor service clear its task * queue, closing client sockets appropriately. */ private void shutdownServer() { executorService.shutdown(); long msLeftToWait = serverOptions.stopTimeoutUnit.toMillis(serverOptions.stopTimeoutVal); long timeMillis = System.currentTimeMillis(); LOG.info("Waiting for up to " + msLeftToWait + " ms to finish processing" + " pending requests"); boolean interrupted = false; while (msLeftToWait >= 0) { try { executorService.awaitTermination(msLeftToWait, TimeUnit.MILLISECONDS); break; } catch (InterruptedException ix) { long timePassed = System.currentTimeMillis() - timeMillis; msLeftToWait -= timePassed; timeMillis += timePassed; interrupted = true; } } LOG.info("Interrupting all worker threads and waiting for " + TIME_TO_WAIT_AFTER_SHUTDOWN_MS + " ms longer"); // This will interrupt all the threads, even those running a task. executorService.shutdownNow(); Threads.sleepWithoutInterrupt(TIME_TO_WAIT_AFTER_SHUTDOWN_MS); // Preserve the interrupted status. if (interrupted) { Thread.currentThread().interrupt(); } LOG.info("Thrift server shutdown complete"); }
@SuppressWarnings("deprecation") public IncrementCoalescer(HBaseHandler hand) { this.handler = hand; LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(); pool = new ThreadPoolExecutor(CORE_POOL_SIZE, CORE_POOL_SIZE, 50, TimeUnit.MILLISECONDS, queue, Threads.newDaemonThreadFactory("IncrementCoalescer")); MBeanUtil.registerMBean("thrift", "Thrift", this); }
public static void doTestIncrements(HBaseHandler handler) throws Exception { List<Mutation> mutations = new ArrayList<Mutation>(1); mutations.add(new Mutation(false, columnAAname, valueEname, true)); mutations.add(new Mutation(false, columnAname, valueEname, true)); handler.mutateRow(tableAname, rowAname, mutations, null); handler.mutateRow(tableAname, rowBname, mutations, null); List<TIncrement> increments = new ArrayList<TIncrement>(); increments.add(new TIncrement(tableAname, rowBname, columnAAname, 7)); increments.add(new TIncrement(tableAname, rowBname, columnAAname, 7)); increments.add(new TIncrement(tableAname, rowBname, columnAAname, 7)); int numIncrements = 60000; for (int i = 0; i < numIncrements; i++) { handler.increment(new TIncrement(tableAname, rowAname, columnAname, 2)); handler.incrementRows(increments); } Thread.sleep(1000); long lv = handler.get(tableAname, rowAname, columnAname, null).get(0).value.getLong(); // Wait on all increments being flushed while (handler.coalescer.getQueueSize() != 0) Threads.sleep(10); assertEquals((100 + (2 * numIncrements)), lv ); lv = handler.get(tableAname, rowBname, columnAAname, null).get(0).value.getLong(); assertEquals((100 + (3 * 7 * numIncrements)), lv); assertTrue(handler.coalescer.getSuccessfulCoalescings() > 0); }
@Override public void startup() { String n = Thread.currentThread().getName(); Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() { @Override public void uncaughtException(final Thread t, final Throwable e) { LOG.error("Unexpected exception in ReplicationSource", e); } }; Threads .setDaemonThreadRunning(this, n + ".replicationSource," + this.peerClusterZnode, handler); }
public void startup() { String n = Thread.currentThread().getName(); Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() { @Override public void uncaughtException(final Thread t, final Throwable e) { LOG.error("Unexpected exception in ReplicationSourceWorkerThread," + " currentPath=" + getCurrentPath(), e); } }; Threads.setDaemonThreadRunning(this, n + ".replicationSource." + walGroupId + "," + peerClusterZnode, handler); workerThreads.put(walGroupId, this); }
private void terminate(String reason, Exception cause) { if (cause == null) { LOG.info("Closing worker for wal group " + this.walGroupId + " because: " + reason); } else { LOG.error("Closing worker for wal group " + this.walGroupId + " because an error occurred: " + reason, cause); } this.interrupt(); Threads.shutdown(this, sleepForRetries); LOG.info("ReplicationSourceWorker " + this.getName() + " terminated"); }
@Override public void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { if (!HttpServer.isInstrumentationAccessAllowed(getServletContext(), request, response)) { return; } response.setContentType("text/plain; charset=UTF-8"); try (PrintStream out = new PrintStream( response.getOutputStream(), false, "UTF-8")) { Threads.printThreadInfo(out, ""); out.flush(); } ReflectionUtils.logThreadInfo(LOG, "jsp requested", 1); }