/**
 * Builds an {@link Observable} that, on each subscription, starts a new thread which calls
 * {@code ThreadDeathWatcher.awaitInactivity(3, TimeUnit.SECONDS)} and stores the outcome in
 * {@code isReallyShutdown}.
 *
 * <p>If the subscriber is still subscribed, the stored value is emitted followed by completion.
 * Any {@link Throwable} raised along the way is forwarded through {@code onError}, again only
 * while the subscriber is still subscribed.
 *
 * @return an observable emitting the value returned by {@code awaitInactivity}.
 */
@Override
public Observable<Boolean> shutdown() {
    final Observable.OnSubscribe<Boolean> onSubscribe = new Observable.OnSubscribe<Boolean>() {
        @Override
        public void call(final Subscriber<? super Boolean> subscriber) {
            final Runnable awaitTask = new Runnable() {
                @Override
                public void run() {
                    try {
                        isReallyShutdown = ThreadDeathWatcher.awaitInactivity(3, TimeUnit.SECONDS);
                        if (subscriber.isUnsubscribed()) {
                            // Nobody is listening anymore, so there is nothing to emit.
                            return;
                        }
                        subscriber.onNext(isReallyShutdown);
                        subscriber.onCompleted();
                    } catch (Throwable e) {
                        if (subscriber.isUnsubscribed()) {
                            return;
                        }
                        subscriber.onError(e);
                    }
                }
            };
            // The wait blocks, so it runs on its own thread rather than on the subscribing thread.
            new Thread(awaitTask).start();
        }
    };
    return Observable.create(onSubscribe);
}
/**
 * Registers {@code thread} with {@code ThreadDeathWatcher}, supplying a task that counts down
 * {@code latch} when it is run.
 *
 * @param thread the thread handed to {@code ThreadDeathWatcher.watch}.
 * @param latch  the latch decremented by the registered task.
 * @return the same {@code thread} instance that was passed in.
 */
private static Thread watchForDefaultEventLoopThread(final Thread thread, final CountDownLatch latch) {
    final Runnable releaseLatch = new Runnable() {
        @Override
        public void run() {
            latch.countDown();
        }
    };
    ThreadDeathWatcher.watch(thread, releaseLatch);
    return thread;
}
/**
 * Creates a thread cache backed by the given heap and/or direct arenas and registers
 * {@code freeTask} with {@code ThreadDeathWatcher} for {@code thread}.
 *
 * <p>For each arena that is non-null, the tiny, small and normal caches are created; for an
 * arena that is {@code null}, the corresponding caches are set to {@code null} and the
 * matching shift value is set to {@code -1}.
 *
 * @param heapArena                    arena for heap allocations, or {@code null} for none.
 * @param directArena                  arena for direct allocations, or {@code null} for none.
 * @param tinyCacheSize                size passed to {@code createSubPageCaches} for the tiny caches.
 * @param smallCacheSize               size passed to {@code createSubPageCaches} for the small caches.
 * @param normalCacheSize              size passed to {@code createNormalCaches}.
 * @param maxCachedBufferCapacity      passed to {@code createNormalCaches}; must be {@code >= 0}.
 * @param freeSweepAllocationThreshold stored on this instance; must be {@code > 0}.
 * @throws IllegalArgumentException if {@code maxCachedBufferCapacity} is negative or
 *                                  {@code freeSweepAllocationThreshold} is less than 1.
 */
PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
                int tinyCacheSize, int smallCacheSize, int normalCacheSize,
                int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
    if (maxCachedBufferCapacity < 0) {
        throw new IllegalArgumentException("maxCachedBufferCapacity: "
                + maxCachedBufferCapacity + " (expected: >= 0)");
    }
    if (freeSweepAllocationThreshold < 1) {
        // Report the offending argument itself (previously printed maxCachedBufferCapacity).
        throw new IllegalArgumentException("freeSweepAllocationThreshold: "
                + freeSweepAllocationThreshold + " (expected: > 0)");
    }
    this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
    this.heapArena = heapArena;
    this.directArena = directArena;
    if (directArena != null) {
        // Create the caches for the direct allocations
        tinySubPageDirectCaches = createSubPageCaches(tinyCacheSize, PoolArena.numTinySubpagePools);
        smallSubPageDirectCaches = createSubPageCaches(smallCacheSize, directArena.numSmallSubpagePools);

        numShiftsNormalDirect = log2(directArena.pageSize);
        normalDirectCaches = createNormalCaches(
                normalCacheSize, maxCachedBufferCapacity, directArena);
    } else {
        // No directArena is configured so just null out all caches
        tinySubPageDirectCaches = null;
        smallSubPageDirectCaches = null;
        normalDirectCaches = null;
        numShiftsNormalDirect = -1;
    }
    if (heapArena != null) {
        // Create the caches for the heap allocations
        tinySubPageHeapCaches = createSubPageCaches(tinyCacheSize, PoolArena.numTinySubpagePools);
        smallSubPageHeapCaches = createSubPageCaches(smallCacheSize, heapArena.numSmallSubpagePools);

        numShiftsNormalHeap = log2(heapArena.pageSize);
        normalHeapCaches = createNormalCaches(
                normalCacheSize, maxCachedBufferCapacity, heapArena);
    } else {
        // No heapArena is configured so just null out all caches
        tinySubPageHeapCaches = null;
        smallSubPageHeapCaches = null;
        normalHeapCaches = null;
        numShiftsNormalHeap = -1;
    }

    // The thread-local cache will keep a list of pooled buffers which must be returned to
    // the pool when the thread is not alive anymore.
    ThreadDeathWatcher.watch(thread, freeTask);
}
/**
 * Releases the resources held by this cache. Should be called when the thread that uses this
 * cache is about to exit.
 *
 * <p>The {@code ThreadDeathWatcher} registration for {@code thread} and {@code freeTask} is
 * removed first, and then {@code free0()} is invoked.
 */
void free() {
    // Unregister before freeing; the order of these two calls is kept as-is on purpose.
    ThreadDeathWatcher.unwatch(thread, freeTask);
    free0();
}