/** * Get the record corresponding to a given component identifier * * @param id Component identifier * @param future Future to provide the record * @see <a href="http://vertx.io/docs/apidocs/io/vertx/core/Future.html" target="_blank">Future</a> */ private void getClusterRecord(final String id, Future<JsonObject> future) { sharedData.getLock(RECORDS_LOCK_NAME, lockRes -> { if (lockRes.succeeded()) { Lock asyncLock = lockRes.result(); clusterRecords.get(id, getRes -> { if (getRes.succeeded()) { String record = getRes.result(); future.complete(new JsonObject(record)); } else { future.fail(getRes.cause()); } }); asyncLock.release(); } else { future.fail(lockRes.cause()); } }); }
@Override public void getLockWithTimeout(String name, long timeout, Handler<AsyncResult<Lock>> resultHandler) { vertx.executeBlocking(future -> { ZKLock lock = locks.get(name); if (lock == null) { InterProcessSemaphoreMutex mutexLock = new InterProcessSemaphoreMutex(curator, ZK_PATH_LOCKS + name); lock = new ZKLock(mutexLock); } try { if (lock.getLock().acquire(timeout, TimeUnit.MILLISECONDS)) { locks.putIfAbsent(name, lock); future.complete(lock); } else { future.fail(new VertxException("Timed out waiting to get lock " + name)); } } catch (Exception e) { future.fail(new VertxException("get lock exception", e)); } }, resultHandler); }
@Override public void getLockWithTimeout(String name, long timeout, Handler<AsyncResult<Lock>> resultHandler) { ContextImpl context = (ContextImpl) vertx.getOrCreateContext(); // Ordered on the internal blocking executor context.executeBlocking(() -> { java.util.concurrent.locks.Lock lock = lockService.getLock(name); try { if (lock.tryLock(timeout, TimeUnit.MILLISECONDS)) { return new JGroupsLock(vertx, lock); } else { throw new VertxException("Timed out waiting to get lock " + name); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new VertxException(e); } }, resultHandler); }
private static <T> void executeDefaultState( long _timeout, ThrowableFutureConsumer<T> _userOperation, VxmsShared vxmsShared, Future<T> operationResult, Lock lock) { lock.release(); if (_timeout > DEFAULT_LONG_VALUE) { addTimeoutHandler( _timeout, vxmsShared, (l) -> { if (!operationResult.isComplete()) { operationResult.fail(new TimeoutException("operation timeout")); } }); } executeAndCompleate(_userOperation, operationResult); }
private static <T> void openCircuitBreakerAndHandleError( Consumer<Throwable> errorHandler, ThrowableErrorConsumer<Throwable, T> onFailureRespond, Consumer<Throwable> errorMethodHandler, Consumer<ExecutionResult<T>> resultConsumer, AsyncResult<T> event, Lock lock, Counter counter) { counter.addAndGet( LOCK_VALUE, val -> { lock.release(); errorHandling( errorHandler, onFailureRespond, errorMethodHandler, resultConsumer, Future.failedFuture(event.cause())); }); }
private static <T, V> void executeDefaultState( long _timeout, ThrowableFutureBiConsumer<T, V> step, T inputValue, VxmsShared vxmsShared, Future<V> operationResult, Lock lock) { lock.release(); if (_timeout > DEFAULT_LONG_VALUE) { addTimeoutHandler( _timeout, vxmsShared, (l) -> { if (!operationResult.isComplete()) { operationResult.fail(new TimeoutException("operation timeout")); } }); } executeAndCompleate(step, inputValue, operationResult); }
private static <T> void openCircuitBreakerAndHandleError( Future<ExecutionResult<T>> _blockingHandler, Consumer<Throwable> _errorHandler, ThrowableFunction<Throwable, T> _onFailureRespond, Consumer<Throwable> _errorMethodHandler, VxmsShared vxmsShared, Throwable e, Lock lck, Counter counter) { counter.addAndGet( LOCK_VALUE, val -> { lck.release(); final Vertx vertx = vxmsShared.getVertx(); vertx.executeBlocking( bhandler -> { T result = handleError(_errorHandler, _onFailureRespond, _errorMethodHandler, e); if (!_blockingHandler.isComplete()) { _blockingHandler.complete(new ExecutionResult<>(result, true, true, null)); } }, false, res -> {}); }); }
private static <T> void openCircuitBreakerAndHandleError( Future<ExecutionResult<T>> _resultHandler, Consumer<Throwable> _errorHandler, ThrowableFunction<Throwable, T> _onFailureRespond, Consumer<Throwable> _errorMethodHandler, VxmsShared vxmsShared, Throwable e, Lock lck, Counter counter) { counter.addAndGet( LOCK_VALUE, val -> { lck.release(); final Vertx vertx = vxmsShared.getVertx(); vertx.executeBlocking( bhandler -> { T result = handleError(_errorHandler, _onFailureRespond, _errorMethodHandler, e); if (!_resultHandler.isComplete()) { _resultHandler.complete(new ExecutionResult<>(result, true, true, null)); } }, false, res -> {}); }); }
private static <T> void executeDefaultState( long _timeout, ThrowableFutureConsumer<T> _userOperation, VxmsShared vxmsShared, Future<T> operationResult, Lock lock) { lock.release(); if (_timeout > DEFAULT_LONG_VALUE) { addTimeoutHandler( _timeout, vxmsShared.getVertx(), (l) -> { if (!operationResult.isComplete()) { operationResult.fail(new TimeoutException("operation timeout")); } }); } executeAndCompleate(_userOperation, operationResult); }
private static <T, V> void executeDefaultState( long _timeout, ThrowableFutureBiConsumer<T, V> _step, T _inputValue, VxmsShared vxmsShared, Future<V> operationResult, Lock lock) { lock.release(); if (_timeout > DEFAULT_LONG_VALUE) { addTimeoutHandler( _timeout, vxmsShared.getVertx(), (l) -> { if (!operationResult.isComplete()) { operationResult.fail(new TimeoutException("operation timeout")); } }); } executeAndCompleate(_step, _inputValue, operationResult); }
@Override public void getLockWithTimeout(String name, long timeout, Handler<AsyncResult<Lock>> handler) { Context context = vertx.getOrCreateContext(); lockCache.getUnchecked(name).whenComplete((lock, error) -> { if (error == null) { lock.async().tryLock(Duration.ofMillis(timeout)).whenComplete((lockResult, lockError) -> { if (lockError == null) { if (lockResult.isPresent()) { context.runOnContext(v -> Future.<Lock>succeededFuture(new AtomixLock(vertx, lock)).setHandler(handler)); } else { context.runOnContext(v -> Future.<Lock>failedFuture(new VertxException("Timed out waiting to get lock " + name)).setHandler(handler)); } } else { context.runOnContext(v -> Future.<Lock>failedFuture(lockError).setHandler(handler)); } }); } else { context.runOnContext(v -> Future.<Lock>failedFuture(error).setHandler(handler)); } }); }
@Override public void getLockWithTimeout(String name, long timeout, Handler<AsyncResult<Lock>> resultHandler) { ContextImpl context = (ContextImpl) vertx.getOrCreateContext(); // Ordered on the internal blocking executor context.executeBlocking(() -> { ISemaphore iSemaphore = hazelcast.getSemaphore(LOCK_SEMAPHORE_PREFIX + name); boolean locked = false; long remaining = timeout; do { long start = System.nanoTime(); try { locked = iSemaphore.tryAcquire(remaining, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { // OK continue } remaining = remaining - MILLISECONDS.convert(System.nanoTime() - start, NANOSECONDS); } while (!locked && remaining > 0); if (locked) { return new HazelcastLock(iSemaphore); } else { throw new VertxException("Timed out waiting to get lock " + name); } }, resultHandler); }
@Override public void getLockWithTimeout(final String name, final long timeout, final Handler<AsyncResult<Lock>> resultHandler) { AsynchronousLock lock = new AsynchronousLock(this.vertx); final AsynchronousLock prev = locks.putIfAbsent(name, lock); if (prev != null) { lock = prev; } final FakeLock flock = new FakeLock(lock); flock.acquire(timeout, resultHandler); }
/** * Get the count of currently deployed components * * @param future Future to provide the count of deployed components * @see <a href="http://vertx.io/docs/apidocs/io/vertx/core/Future.html" target="_blank">Future</a> */ void count(Future<Integer> future) { if (!isInitialized()) { future.fail("DeployRecords should be initialized before using it!"); return; } if (null == localRecords) { sharedData.getLock(RECORDS_LOCK_NAME, lockRes -> { if (lockRes.succeeded()) { Lock asyncLock = lockRes.result(); clusterRecords.size(sizeRes -> { if (sizeRes.succeeded()) { future.complete(sizeRes.result()); } else { future.fail(sizeRes.cause()); } asyncLock.release(); }); } else { future.fail(lockRes.cause()); } }); return; } future.complete(Integer.valueOf(localRecords.size())); }
/** * Method to initialize the object so that it can handle deployment and undeployment of components * * @param isClustered Whether cluster mode is opted * @param future Future to provide the status of initialization * @see <a href="http://vertx.io/docs/apidocs/io/vertx/core/Future.html" target="_blank">Future</a> */ void init(Boolean isClustered, Future<Boolean> future) { if (isInitialized()) { future.complete(false); return; } if (isClustered) { sharedData.getLock(RECORDS_LOCK_NAME, lockRes -> { if (lockRes.succeeded()) { Lock asyncLock = lockRes.result(); sharedData.<String, String>getClusterWideMap(RECORDS_MAP_NAME, mapRes -> { if (mapRes.succeeded()) { clusterRecords = mapRes.result(); future.complete(true); } else { future.fail(mapRes.cause()); } asyncLock.release(); }); } else { future.fail(lockRes.cause()); } }); return; } localRecords = sharedData.getLocalMap(RECORDS_MAP_NAME); future.complete(true); }
@Override public void getLockWithTimeout(String name, long timeout, Handler<AsyncResult<Lock>> handler) { ContextImpl context = (ContextImpl) vertx.getOrCreateContext(); // Ordered on the internal blocking executor context.executeBlocking(() -> { boolean locked; try { IgniteQueue<String> queue = getQueue(name, true); pendingLocks.offer(name); locked = queue.offer(getNodeID(), timeout, TimeUnit.MILLISECONDS); if (!locked) { // EVT_NODE_LEFT/EVT_NODE_FAILED event might be already handled, so trying get lock again if // node left topology. // Use IgniteSempahore when it will be fixed. String ownerId = queue.peek(); ClusterNode ownerNode = ignite.cluster().forNodeId(UUID.fromString(ownerId)).node(); if (ownerNode == null) { queue.remove(ownerId); locked = queue.offer(getNodeID(), timeout, TimeUnit.MILLISECONDS); } } } catch (Exception e) { throw new VertxException("Error during getting lock " + name, e); } finally { pendingLocks.remove(name); } if (locked) { return new LockImpl(name); } else { throw new VertxException("Timed out waiting to get lock " + name); } }, handler); }
private static <T> void releaseLockAndHandleError( Consumer<Throwable> errorHandler, ThrowableErrorConsumer<Throwable, T> onFailureRespond, Consumer<Throwable> errorMethodHandler, Consumer<ExecutionResult<T>> resultConsumer, Lock lock, Throwable cause) { Optional.ofNullable(lock).ifPresent(Lock::release); errorHandling( errorHandler, onFailureRespond, errorMethodHandler, resultConsumer, Future.failedFuture(cause)); }
private static <T> void executeInitialState( int _retry, long _timeout, ThrowableFutureConsumer<T> _userOperation, VxmsShared vxmsShared, Future<T> operationResult, Lock lock, Counter counter) { final long initialRetryCounterValue = (long) (_retry + 1); counter.addAndGet( initialRetryCounterValue, rHandler -> executeDefaultState(_timeout, _userOperation, vxmsShared, operationResult, lock)); }
private static <T> void handleStatefulError( String _methodId, int _retry, long _timeout, long _circuitBreakerTimeout, ThrowableFutureConsumer<T> _userOperation, Consumer<Throwable> errorHandler, ThrowableErrorConsumer<Throwable, T> onFailureRespond, Consumer<Throwable> errorMethodHandler, VxmsShared vxmsShared, Consumer<ExecutionResult<T>> resultConsumer, AsyncResult<T> event, Lock lock, Counter counter, AsyncResult<Long> valHandler) { long count = valHandler.result(); if (count <= DEFAULT_LONG_VALUE) { setCircuitBreakerReleaseTimer(_retry, _circuitBreakerTimeout, vxmsShared, counter); openCircuitBreakerAndHandleError( errorHandler, onFailureRespond, errorMethodHandler, resultConsumer, event, lock, counter); } else { lock.release(); retry( _methodId, _retry, _timeout, _circuitBreakerTimeout, _userOperation, errorHandler, onFailureRespond, errorMethodHandler, vxmsShared, resultConsumer, event); } }
private static <T, V> void executeInitialState( int _retry, long _timeout, ThrowableFutureBiConsumer<T, V> step, T inputValue, VxmsShared vxmsShared, Future<V> operationResult, Lock lock, Counter counter) { final long initialRetryCounterValue = (long) (_retry + 1); counter.addAndGet( initialRetryCounterValue, rHandler -> executeDefaultState(_timeout, step, inputValue, vxmsShared, operationResult, lock)); }
private static <T, V> void handleStatefulError( String _methodId, int _retry, long _timeout, long _circuitBreakerTimeout, ThrowableFutureBiConsumer<T, V> step, T inputValue, Consumer<Throwable> errorHandler, ThrowableErrorConsumer<Throwable, V> onFailureRespond, Consumer<Throwable> errorMethodHandler, VxmsShared vxmsShared, Consumer<ExecutionResult<V>> resultConsumer, AsyncResult<V> event, Lock lock, Counter counter, AsyncResult<Long> valHandler) { long count = valHandler.result(); if (count <= DEFAULT_LONG_VALUE) { setCircuitBreakerReleaseTimer(_retry, _circuitBreakerTimeout, vxmsShared, counter); openCircuitBreakerAndHandleError( errorHandler, onFailureRespond, errorMethodHandler, resultConsumer, event, lock, counter); } else { lock.release(); retry( _methodId, _retry, _timeout, _circuitBreakerTimeout, step, inputValue, errorHandler, onFailureRespond, errorMethodHandler, vxmsShared, resultConsumer, event); } }
private static <T> void executeErrorState( Future<ExecutionResult<T>> _blockingHandler, Consumer<Throwable> _errorHandler, ThrowableFunction<Throwable, T> _onFailureRespond, Consumer<Throwable> _errorMethodHandler, Throwable failure, Lock lock) { Optional.ofNullable(lock).ifPresent(Lock::release); handleErrorExecution( _blockingHandler, _errorHandler, _onFailureRespond, _errorMethodHandler, Optional.ofNullable(failure).orElse(Future.failedFuture("circuit open").cause())); }
private static <T> void executeInitialState( String _methodId, ThrowableSupplier<T> _supplier, Future<ExecutionResult<T>> _blockingHandler, Consumer<Throwable> _errorHandler, ThrowableFunction<Throwable, T> _onFailureRespond, Consumer<Throwable> _errorMethodHandler, VxmsShared vxmsShared, Throwable _t, int _retry, long _timeout, long _circuitBreakerTimeout, long _delay, Lock lock, Counter counter) { final long initialRetryCounterValue = (long) (_retry + 1); counter.addAndGet( initialRetryCounterValue, rHandler -> executeDefault( _methodId, _supplier, _blockingHandler, _errorHandler, _onFailureRespond, _errorMethodHandler, vxmsShared, _t, _retry, _timeout, _circuitBreakerTimeout, _delay, lock)); }
private static <T> void releaseLockAndHandleError( Future<ExecutionResult<T>> _blockingHandler, Consumer<Throwable> _errorHandler, ThrowableFunction<Throwable, T> _onFailureRespond, Consumer<Throwable> _errorMethodHandler, Throwable cause, Lock lock) { Optional.ofNullable(lock).ifPresent(Lock::release); handleErrorExecution( _blockingHandler, _errorHandler, _onFailureRespond, _errorMethodHandler, cause); }
private static <T, V> void executeInitialState( String _methodId, ThrowableFunction<T, V> step, T value, Future<ExecutionResult<V>> _resultHandler, Consumer<Throwable> _errorHandler, ThrowableFunction<Throwable, V> _onFailureRespond, Consumer<Throwable> _errorMethodHandler, VxmsShared vxmsShared, Throwable _t, int _retry, long _timeout, long _circuitBreakerTimeout, long _delay, Lock lock, Counter counter) { final long initialRetryCounterValue = (long) (_retry + 1); counter.addAndGet( initialRetryCounterValue, rHandler -> executeDefault( _methodId, step, value, _resultHandler, _errorHandler, _onFailureRespond, _errorMethodHandler, vxmsShared, _t, _retry, _timeout, _circuitBreakerTimeout, _delay, lock)); }
private static <T> void releaseLockAndHandleError( Future<ExecutionResult<T>> _resultHandler, Consumer<Throwable> _errorHandler, ThrowableFunction<Throwable, T> _onFailureRespond, Consumer<Throwable> _errorMethodHandler, Throwable cause, Lock lock) { Optional.ofNullable(lock).ifPresent(Lock::release); handleErrorExecution( _resultHandler, _errorHandler, _onFailureRespond, _errorMethodHandler, cause); }
private static <T> void executeErrorState( String methodId, VxmsShared vxmsShared, Consumer<Throwable> errorMethodHandler, Message<Object> requestMessage, Encoder encoder, Consumer<Throwable> errorHandler, ThrowableErrorConsumer<Throwable, T> onFailureRespond, DeliveryOptions responseDeliveryOptions, int retryCount, long timeout, long circuitBreakerTimeout, RecursiveExecutor<T> executor, Lock lock) { final Throwable cause = Future.failedFuture("circuit open").cause(); handleError( methodId, vxmsShared, errorMethodHandler, requestMessage, encoder, errorHandler, onFailureRespond, responseDeliveryOptions, retryCount, timeout, circuitBreakerTimeout, executor, lock, cause); }
private static <T> void openCircuitAndHandleError( String methodId, VxmsShared vxmsShared, Consumer<Throwable> errorMethodHandler, Message<Object> requestMessage, Encoder encoder, Consumer<Throwable> errorHandler, ThrowableErrorConsumer<Throwable, T> onFailureRespond, DeliveryOptions responseDeliveryOptions, int retryCount, long timeout, long circuitBreakerTimeout, RecursiveExecutor<T> executor, AsyncResult<Message<Object>> event, Lock lock, Counter counter) { resetLockTimer(vxmsShared, retryCount, circuitBreakerTimeout, counter); lockAndHandle( counter, val -> { final Throwable cause = event.cause(); handleError( methodId, vxmsShared, errorMethodHandler, requestMessage, encoder, errorHandler, onFailureRespond, responseDeliveryOptions, retryCount, timeout, circuitBreakerTimeout, executor, lock, cause); }); }
private static <T> void handleError( String methodId, VxmsShared vxmsShared, Consumer<Throwable> errorMethodHandler, Message<Object> requestMessage, Encoder encoder, Consumer<Throwable> errorHandler, ThrowableErrorConsumer<Throwable, T> onFailureRespond, DeliveryOptions responseDeliveryOptions, int retryCount, long timeout, long circuitBreakerTimeout, RecursiveExecutor<T> executor, Lock lock, Throwable cause) { Optional.ofNullable(lock).ifPresent(Lock::release); final ThrowableFutureConsumer<T> failConsumer = (future) -> future.fail(cause); executor.execute( methodId, vxmsShared, cause, errorMethodHandler, requestMessage, failConsumer, encoder, errorHandler, onFailureRespond, responseDeliveryOptions, retryCount, timeout, circuitBreakerTimeout); }
private static <T> void executeErrorState( String methodId, VxmsShared vxmsShared, Consumer<Throwable> errorMethodHandler, Message<Object> requestMessage, Encoder encoder, Consumer<Throwable> errorHandler, ThrowableFunction<Throwable, T> onFailureRespond, DeliveryOptions responseDeliveryOptions, int retryCount, long timeout, long delay, long circuitBreakerTimeout, RecursiveExecutor<T> executor, Lock lock) { final Throwable cause = Future.failedFuture("circuit open").cause(); handleError( methodId, vxmsShared, errorMethodHandler, requestMessage, encoder, errorHandler, onFailureRespond, responseDeliveryOptions, retryCount, timeout, delay, circuitBreakerTimeout, executor, lock, cause); }
private static <T> void openCircuitAndHandleError( String methodId, VxmsShared vxmsShared, Consumer<Throwable> errorMethodHandler, Message<Object> requestMessage, Encoder encoder, Consumer<Throwable> errorHandler, ThrowableFunction<Throwable, T> onFailureRespond, DeliveryOptions responseDeliveryOptions, int retryCount, long timeout, long delay, long circuitBreakerTimeout, RecursiveExecutor<T> executor, AsyncResult<Message<Object>> event, Lock lock, Counter counter) { resetLockTimer(vxmsShared, retryCount, circuitBreakerTimeout, counter); lockAndHandle( counter, val -> { final Throwable cause = event.cause(); handleError( methodId, vxmsShared, errorMethodHandler, requestMessage, encoder, errorHandler, onFailureRespond, responseDeliveryOptions, retryCount, timeout, delay, circuitBreakerTimeout, executor, lock, cause); }); }
private static <T> void handleError( String methodId, VxmsShared vxmsShared, Consumer<Throwable> errorMethodHandler, Message<Object> requestMessage, Encoder encoder, Consumer<Throwable> errorHandler, ThrowableFunction<Throwable, T> onFailureRespond, DeliveryOptions responseDeliveryOptions, int retryCount, long timeout, long delay, long circuitBreakerTimeout, RecursiveExecutor<T> executor, Lock lock, Throwable cause) { Optional.ofNullable(lock).ifPresent(Lock::release); final ThrowableSupplier<T> failConsumer = () -> { assert cause != null; throw cause; }; executor.execute( methodId, vxmsShared, cause, errorMethodHandler, requestMessage, failConsumer, encoder, errorHandler, onFailureRespond, responseDeliveryOptions, retryCount, timeout, delay, circuitBreakerTimeout); }
/** * Get a local lock with the specified name with specifying a timeout. The lock will be passed to * the handler when it is available. If the lock is not obtained within the timeout a failure * will be sent to the handler * * @param name the name of the lock * @param timeout the timeout in ms * @param resultHandler the handler */ public void getLockWithTimeout(String name, long timeout, Handler<AsyncResult<Lock>> resultHandler) { Objects.requireNonNull(name, "name"); Objects.requireNonNull(resultHandler, "resultHandler"); Arguments.require(timeout >= 0L, "timeout must be >= 0"); AsynchronousLock lock = this.localLocks .computeIfAbsent(name, (n) -> new AsynchronousLock(this.vertx)); lock.acquire(timeout, resultHandler); }