/**
 * Delivers a chunk to everyone recorded as waiting for it.
 *
 * <p>Looks up the pending request registered under the chunk's key, completes every local
 * requester promise with the chunk, and then sends a store-request message carrying the chunk
 * to the peers of the remote retrieve requests grouped under each map key.
 *
 * <p>If no request is registered for the key, the method returns without doing anything.
 *
 * @param chunk the chunk that has become available
 */
private synchronized void propagateResponse(Chunk chunk) {
    ChunkRequest chunkRequest = chunkRequestMap.get(chunk.getKey());
    if (chunkRequest == null) {
        // Nothing is registered under this key, so there is nobody to notify.
        return;
    }

    for (Promise<Chunk> localRequester : chunkRequest.localRequesters) {
        // trySuccess instead of setSuccess: this method does not remove the request from the
        // map, so a repeated response must not throw on an already-completed promise.
        localRequester.trySuccess(chunk);
    }

    for (Map.Entry<Long, Collection<BzzRetrieveReqMessage>> e : chunkRequest.requesters.entrySet()) {
        BzzStoreReqMessage msg = new BzzStoreReqMessage(e.getKey(), chunk.getKey(), chunk.getData());
        // The counter starts at requesterCount and is decremented after each send; the loop
        // stops once it drops below zero.
        // NOTE(review): for a non-negative requesterCount this allows requesterCount + 1 sends
        // per entry - confirm whether requesterCount is meant to be the exact cap.
        int counter = requesterCount;
        for (BzzRetrieveReqMessage r : e.getValue()) {
            r.getPeer().sendMessage(msg);
            statOutStoreReq.add(1);
            if (--counter < 0) {
                break;
            }
        }
    }
}
/**
 * Starts a connection attempt to the given address and returns a future for the channel.
 *
 * <p>A fresh {@link Bootstrap} is configured with the shared event loop group, the connect
 * timeout and a {@code ThriftClientInitializer}. The outcome of the connect attempt is handed
 * to {@code notifyConnect} together with the returned promise. Anything thrown while setting
 * the attempt up is wrapped in a {@code TTransportException} and returned as a failed future.
 *
 * @param address host and port to connect to
 * @return a future for the connected channel
 */
@Override
public Future<Channel> getConnection(HostAndPort address) {
    try {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group);
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.option(CONNECT_TIMEOUT_MILLIS, saturatedCast(connectTimeout.toMillis()));
        bootstrap.handler(new ThriftClientInitializer(
                messageFraming,
                messageEncoding,
                requestTimeout,
                socksProxy,
                sslContextSupplier));

        Promise<Channel> promise = group.next().newPromise();
        InetSocketAddress remoteAddress = new InetSocketAddress(address.getHost(), address.getPort());
        bootstrap.connect(remoteAddress)
                .addListener((ChannelFutureListener) future -> notifyConnect(future, promise));
        return promise;
    }
    catch (Throwable e) {
        return group.next().newFailedFuture(new TTransportException(e));
    }
}
@Override protected void channelRead0(ChannelHandlerContext ctx, ResponseWrapper msg) throws Exception { //找到对应的client请求, 给请求设置promise为true,然后给请求设置值 Promise<ResponseWrapper> responsePromise = RpcClient.getRequestWrapperMap().get(msg.getRequestId()); if (responsePromise != null) { if (msg.getStatus() == 200) { responsePromise.setSuccess(msg); } else { System.out.println("error: " + msg.getStatus()); //设置错误! } //设置完毕之后就可以移出map了 RpcClient.getRequestWrapperMap().remove(msg.getRequestId()); } else { System.out.println("requestWrapper not found"); } }
@Override protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) throws Exception { Integer streamId = msg.headers().getInt(HttpUtil.ExtensionHeaderNames.STREAM_ID.text()); if (streamId == null) { System.err.println("HttpResponseHandler unexpected message received: " + msg); return; } if (streamId.intValue() == 1) { // this is the upgrade response message, just ignore it. return; } Promise<FullHttpResponse> promise; synchronized (this) { promise = streamId2Promise.get(streamId); } if (promise == null) { System.err.println("Message received for unknown stream id " + streamId); } else { // Do stuff with the message (for now just print it) promise.setSuccess(msg.retain()); } }
/**
 * Sends a GET request tagged with stream id 3 and checks the response status and body.
 *
 * <p>The request is written and the promise registered with the response handler inside one
 * block synchronized on the handler.
 */
@Test
public void test() throws InterruptedException, ExecutionException {
    final int streamId = 3;

    FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
    request.headers().add(HttpUtil.ExtensionHeaderNames.STREAM_ID.text(), streamId);

    Promise<FullHttpResponse> promise = CHANNEL.eventLoop().newPromise();
    synchronized (RESPONSE_HANDLER) {
        CHANNEL.writeAndFlush(request);
        RESPONSE_HANDLER.put(streamId, promise);
    }

    FullHttpResponse response = promise.get();
    assertEquals(HttpResponseStatus.OK, response.status());
    assertEquals("HTTP/2 DTP", response.content().toString(StandardCharsets.UTF_8));
}
/**
 * Disconnects from the AudioConnect server and shuts this client down.
 *
 * <p>If the client's event loop group is already shutting down (or shut down), nothing is
 * done and an already-succeeded future is returned.
 *
 * <p><b>Note:</b> once this has been called, this client instance can no longer connect.
 *
 * @return a future that is notified once the disconnect has finished and the event loop
 *         group's graceful shutdown has completed
 */
public Future<?> shutdown() {
    if (bootstrap.group().isShuttingDown()) {
        return GlobalEventExecutor.INSTANCE.newSucceededFuture(null);
    }

    final Promise<Object> shutdownPromise = GlobalEventExecutor.INSTANCE.newPromise();
    // Shut the group down only after the disconnect has finished, whatever its outcome.
    disconnect().addListener(new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> disconnectFuture) {
            bootstrap.group()
                    .shutdownGracefully()
                    .addListener(new PromiseNotifier<>(shutdownPromise));
        }
    });
    return shutdownPromise;
}
/**
 * Builds the client {@link Bootstrap} used to open a connection.
 *
 * <p>The bootstrap uses NIO socket channels on the given group, with the configured connect
 * timeout and TCP keep-alive enabled. Each new channel gets the proxy handler from
 * {@code proxyHandlerSupplier} (when the supplier returns one) followed by a
 * {@code ChannelActiveAwareHandler} constructed with the given promise.
 *
 * @param promise        promise passed to the {@code ChannelActiveAwareHandler}
 * @param eventLoopGroup event loop group the bootstrap runs on
 * @return the configured bootstrap
 */
protected Bootstrap initBootStrap(Promise<Channel> promise, EventLoopGroup eventLoopGroup) {
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(eventLoopGroup);
    bootstrap.channel(NioSocketChannel.class);
    bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, NettySettings.CONNECT_TIMEOUT);
    bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
    bootstrap.handler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) {
            // The proxy handler, when present, must sit in front of the activity handler.
            ProxyHandler proxyHandler = proxyHandlerSupplier.get();
            if (proxyHandler != null) {
                ch.pipeline().addLast(proxyHandler);
            }
            ch.pipeline().addLast(new ChannelActiveAwareHandler(promise));
        }
    });
    return bootstrap;
}
/**
 * Sends three objects across the connection and checks each arrives unchanged.
 *
 * <p>A string goes client to server, a map (including the process environment) goes server to
 * client, and a secret key goes client to server. Each write is awaited for up to 1000 ms and
 * the receiving queue is polled for up to 1000 ms.
 *
 * @param serverQueue   queue polled for objects sent by the client
 * @param clientQueue   queue polled for objects sent by the server
 * @param serverChannel promise for the server-side channel
 * @param clientChannel promise for the client-side channel
 */
private void runRoundTripTests(BlockingQueue<Object> serverQueue, BlockingQueue<Object> clientQueue,
        Promise<SocketChannel> serverChannel, Promise<SocketChannel> clientChannel)
        throws InterruptedException, ExecutionException {
    // 1) String with non-ASCII characters, client -> server.
    String text = "test123ÄÖÜ∑";
    clientChannel.get().writeAndFlush(text).await(1000);
    assertEquals(text, serverQueue.poll(1000, TimeUnit.MILLISECONDS));

    // 2) Map of mixed values, server -> client.
    Map<String, Object> mixedMap = new HashMap<>();
    mixedMap.put("test", "abc");
    mixedMap.put("test2", 123);
    mixedMap.putAll(System.getenv());
    serverChannel.get().writeAndFlush(mixedMap).await(1000);
    assertEquals(mixedMap, clientQueue.poll(1000, TimeUnit.MILLISECONDS));

    // 3) Secret key, client -> server.
    SecretKey secretKey = new SecretKeySpec(new byte[]{1, 2, 3, 4}, "RAW");
    clientChannel.get().writeAndFlush(secretKey).await(1000);
    assertEquals(secretKey, serverQueue.poll(1000, TimeUnit.MILLISECONDS));
}
/**
 * Disconnects, waiting for pending writes to be flushed first.
 *
 * <p>An empty buffer is written to the channel; once that write completes (successfully or
 * not) the channel is closed and the returned promise is settled with the close result.
 *
 * @return a {@code Future<Void>} for when disconnecting is done, or {@code null} if there is
 *         no channel
 */
Future<Void> disconnect() {
    if (channel == null) {
        return null;
    }

    final Promise<Void> disconnectPromise = channel.newPromise();
    writeToChannel(Unpooled.EMPTY_BUFFER).addListener(
            new GenericFutureListener<Future<? super Void>>() {
                @Override
                public void operationComplete(Future<? super Void> flushFuture) throws Exception {
                    closeChannel(disconnectPromise);
                }
            });
    return disconnectPromise;
}
/**
 * Closes the channel and mirrors the outcome of the close onto the given promise.
 *
 * @param promise completed with {@code null} when the close succeeds, or failed with the
 *                close future's cause otherwise
 */
private void closeChannel(final Promise<Void> promise) {
    channel.close().addListener(
            new GenericFutureListener<Future<? super Void>>() {
                @Override
                public void operationComplete(Future<? super Void> closeFuture) throws Exception {
                    if (!closeFuture.isSuccess()) {
                        promise.setFailure(closeFuture.cause());
                        return;
                    }
                    promise.setSuccess(null);
                }
            });
}
/**
 * Writes {@code <hello>} to the server after SSL handshake completion to request a
 * {@code <greeting>}.
 *
 * <p>When handling EPP over TCP, the server should issue a {@code <greeting>} to the client when
 * a connection is established. The Nomulus app, however, does not automatically send the
 * {@code <greeting>} upon connection. The proxy therefore first sends a {@code <hello>} to the
 * registry to request a {@code <greeting>} response.
 *
 * <p>The {@code <hello>} request is only sent after the SSL handshake is completed between the
 * client and the proxy so that the client certificate hash is available, which is needed to
 * communicate with the server. Because {@link SslHandshakeCompletionEvent} is triggered before
 * any calls to {@link #channelRead} are scheduled by the event loop executor, the
 * {@code <hello>} request is guaranteed to be the first message sent to the server.
 *
 * <p>If the client certificate promise fails, the failure is logged and the channel is closed.
 *
 * @see <a href="https://tools.ietf.org/html/rfc5734">RFC 5734 EPP Transport over TCP</a>
 * @see <a href="https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt">The Proxy
 *     Protocol</a>
 */
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
  // The listener runs once the client certificate promise stored on the channel completes.
  Promise<X509Certificate> unusedPromise =
      ctx.channel()
          .attr(CLIENT_CERTIFICATE_PROMISE_KEY)
          .get()
          .addListener(
              (Promise<X509Certificate> promise) -> {
                if (promise.isSuccess()) {
                  sslClientCertificateHash = getCertificateHash(promise.get());
                  // Set the client cert hash key attribute for this channel,
                  // used for collecting metrics on specific clients.
                  ctx.channel().attr(CLIENT_CERTIFICATE_HASH_KEY).set(sslClientCertificateHash);
                  clientAddress = ctx.channel().attr(REMOTE_ADDRESS_KEY).get();
                  metrics.registerActiveConnection(
                      "epp", sslClientCertificateHash, ctx.channel());
                  // Feed the hello bytes through this handler's own channelRead, after the
                  // hash and address fields above have been set.
                  channelRead(ctx, Unpooled.wrappedBuffer(helloBytes));
                } else {
                  logger.severefmt(promise.cause(), "Cannot finish handshake.");
                  ChannelFuture unusedFuture = ctx.close();
                }
              });
  super.channelActive(ctx);
}
/**
 * Queues a command into this batch instead of sending it.
 *
 * <p>The command is appended to the entry kept for {@code slot} (created on first use) and
 * tagged with the next value of the batch-wide index counter. A non-read-only command clears
 * the entry's read-only flag.
 *
 * @param readOnlyMode   whether the command is read-only
 * @param slot           slot whose entry receives the command
 * @param messageDecoder decoder stored with the command
 * @param codec          codec stored with the command
 * @param command        the command to queue
 * @param params         command parameters
 * @param mainPromise    promise stored with the command
 * @param attempt        not used by this implementation
 * @throws IllegalStateException if the batch has already been executed
 */
@Override
protected <V, R> void async(boolean readOnlyMode, int slot, MultiDecoder<Object> messageDecoder,
        Codec codec, RedisCommand<V> command, Object[] params, Promise<R> mainPromise, int attempt) {
    if (executed) {
        throw new IllegalStateException("Batch already executed!");
    }

    Entry entry = commands.get(slot);
    if (entry == null) {
        Entry created = new Entry();
        // If another entry was registered for the slot in the meantime, use that one.
        Entry existing = commands.putIfAbsent(slot, created);
        entry = (existing != null) ? existing : created;
    }

    if (!readOnlyMode) {
        entry.setReadOnlyMode(false);
    }

    CommandData<V, R> data = new CommandData<V, R>(mainPromise, messageDecoder, codec, command, params);
    entry.getCommands().add(new CommandEntry(data, index.incrementAndGet()));
}
/**
 * Connects asynchronously using the client's bootstrap.
 *
 * @return a future completed with a new {@code RedisConnection} wrapping the connected
 *         channel, or failed with the cause of the connect failure
 */
public Future<RedisConnection> connectAsync() {
    final Promise<RedisConnection> connectionPromise = bootstrap.group().next().newPromise();
    bootstrap.connect().addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture connectFuture) throws Exception {
            if (!connectFuture.isSuccess()) {
                connectionPromise.setFailure(connectFuture.cause());
                return;
            }
            connectionPromise.setSuccess(new RedisConnection(RedisClient.this, connectFuture.channel()));
        }
    });
    return connectionPromise;
}
/**
 * Runs a read command against the first slot in {@code slots}, moving on to the next slot
 * while the result is {@code null}.
 *
 * <p>The first slot is removed from the list and queried. A non-null result completes
 * {@code mainPromise}; a {@code null} result triggers another attempt with the remaining
 * slots, or completes {@code mainPromise} with {@code null} when none are left. A failed
 * attempt fails {@code mainPromise} with the same cause.
 *
 * @param command     command to execute
 * @param mainPromise promise receiving the final outcome
 * @param slots       slots still to be tried; the list is modified by this method
 * @param params      command parameters
 */
private <R, T> void retryReadRandomAsync(final RedisCommand<T> command, final Promise<R> mainPromise,
        final List<Integer> slots, final Object... params) {
    final Promise<R> attemptPromise = connectionManager.newPromise();
    attemptPromise.addListener(new FutureListener<R>() {
        @Override
        public void operationComplete(Future<R> future) throws Exception {
            if (!future.isSuccess()) {
                mainPromise.setFailure(future.cause());
                return;
            }
            if (future.getNow() != null) {
                mainPromise.setSuccess(future.getNow());
                return;
            }
            // Null result: try the next slot, or give up with null when none remain.
            if (slots.isEmpty()) {
                mainPromise.setSuccess(null);
            } else {
                retryReadRandomAsync(command, mainPromise, slots, params);
            }
        }
    });

    Integer slot = slots.remove(0);
    async(true, slot, null, connectionManager.getCodec(), command, params, attemptPromise, 0);
}
/**
 * Runs {@code add(value)} on an event loop and reports the outcome through a future.
 *
 * @param value the value to add
 * @return a future completed with the result of {@code add(value)}, or failed with the
 *         exception it threw
 */
public Future<Boolean> addAsync(final V value) {
    EventLoop loop = commandExecutor.getConnectionManager().getGroup().next();
    final Promise<Boolean> resultPromise = loop.newPromise();
    // The promise is created on, and the task executed by, the same event loop.
    loop.execute(new Runnable() {
        @Override
        public void run() {
            try {
                resultPromise.setSuccess(add(value));
            } catch (Exception e) {
                resultPromise.setFailure(e);
            }
        }
    });
    return resultPromise;
}
/**
 * Runs {@code remove(value)} on the connection manager's event loop group and reports the
 * outcome through a future.
 *
 * @param value the value to remove
 * @return a future completed with the result of {@code remove(value)}, or failed with the
 *         exception it threw
 */
@Override
public Future<Boolean> removeAsync(final V value) {
    EventLoopGroup group = commandExecutor.getConnectionManager().getGroup();
    // NOTE(review): the promise comes from group.next() while the task goes through
    // group.execute(), so they may end up on different event loops - confirm this is intended.
    final Promise<Boolean> resultPromise = group.next().newPromise();
    group.execute(new Runnable() {
        @Override
        public void run() {
            try {
                resultPromise.setSuccess(remove(value));
            } catch (Exception e) {
                resultPromise.setFailure(e);
            }
        }
    });
    return resultPromise;
}
/**
 * Appends all elements of the collection to the end of this list asynchronously.
 *
 * <p>An empty collection completes immediately with {@code false}. Otherwise the elements are
 * sent in a single {@code RPUSH}; when that command succeeds the list has necessarily changed,
 * so the future completes with {@code true}. No separate (blocking) size lookup is made, so
 * the result no longer depends on a size snapshot that concurrent modifications could
 * invalidate.
 *
 * @param c elements to append
 * @return a future completed with {@code true} if the list changed, {@code false} if
 *         {@code c} was empty, or failed with the cause of the command failure
 */
@Override
public Future<Boolean> addAllAsync(final Collection<? extends V> c) {
    final Promise<Boolean> promise = newPromise();
    if (c.isEmpty()) {
        promise.setSuccess(false);
        return promise;
    }

    // Command arguments: the list name followed by every element.
    List<Object> args = new ArrayList<Object>(c.size() + 1);
    args.add(getName());
    args.addAll(c);

    Future<Long> res = commandExecutor.writeAsync(getName(), RPUSH, args.toArray());
    res.addListener(new FutureListener<Long>() {
        @Override
        public void operationComplete(Future<Long> future) throws Exception {
            if (future.isSuccess()) {
                // At least one element was pushed, so the list was modified.
                promise.setSuccess(true);
            } else {
                promise.setFailure(future.cause());
            }
        }
    });
    return promise;
}
/**
 * Pipelines 1000 PING commands in one batch, waits for every command's promise, and finally
 * issues FLUSHDB.
 *
 * <p>Connects to a Redis server at localhost:6379.
 */
@Test
public void testPipelineBigResponse() throws InterruptedException, ExecutionException {
    RedisClient client = new RedisClient("localhost", 6379);
    RedisConnection conn = client.connect();

    List<CommandData<?, ?>> pings = new ArrayList<CommandData<?, ?>>();
    for (int n = 0; n < 1000; n++) {
        CommandData<String, String> ping = conn.create(null, RedisCommands.PING);
        pings.add(ping);
    }

    Promise<Void> batchPromise = client.getBootstrap().group().next().newPromise();
    conn.send(new CommandsData(batchPromise, pings));

    // Block until each individual command has been answered.
    for (CommandData<?, ?> ping : pings) {
        ping.getPromise().get();
    }

    conn.sync(RedisCommands.FLUSHDB);
}
@Test public void testTerminationFutureSuccessReflectively() throws Exception { Field terminationFutureField = ThreadPerChannelEventLoopGroup.class.getDeclaredField("terminationFuture"); terminationFutureField.setAccessible(true); final Exception[] exceptionHolder = new Exception[1]; for (int i = 0; i < 2; i++) { ThreadPerChannelEventLoopGroup loopGroup = new ThreadPerChannelEventLoopGroup(64); Promise<?> promise = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE) { @Override public Promise<Void> setSuccess(Void result) { try { return super.setSuccess(result); } catch (IllegalStateException e) { exceptionHolder[0] = e; throw e; } } }; terminationFutureField.set(loopGroup, promise); runTest(loopGroup); } // The global event executor will not terminate, but this will give the test a chance to fail. GlobalEventExecutor.INSTANCE.awaitTermination(100, TimeUnit.MILLISECONDS); assertNull(exceptionHolder[0]); }
/**
 * Invokes {@code method} on the client and forwards the outcome of the returned future to
 * {@code promise}, releasing the client afterwards.
 *
 * @param client  client the method is invoked on; released once the returned future completes
 * @param method  method to invoke; its return value is cast to {@code Future}
 * @param args    arguments for the invocation
 * @param promise promise receiving the result or the failure cause
 * @throws IllegalAccessException    if the reflective call is not permitted
 * @throws InvocationTargetException if the invoked method itself throws
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
private void call(final NedisClient client, Method method, Object[] args, final Promise promise)
        throws IllegalAccessException, InvocationTargetException {
    Future invocation = (Future) method.invoke(client, args);
    invocation.addListener(new FutureListener() {
        @Override
        public void operationComplete(Future future) throws Exception {
            if (!future.isSuccess()) {
                promise.tryFailure(future.cause());
            } else {
                promise.trySuccess(future.getNow());
            }
            client.release();
        }
    });
}
/**
 * Releases a channel back to the pool for the given key.
 *
 * <p>The actual release runs on the channel's event loop: directly when the caller is already
 * on it, otherwise as a submitted task. If dispatching throws, the channel and promise are
 * passed to {@code closeAndFail} with the cause.
 *
 * @param key     key the channel belongs to
 * @param channel channel to release
 * @param promise promise handed to the release logic
 * @return the given promise
 * @throws NullPointerException if any argument is {@code null}
 */
@Override
public Future<Void> release(final K key, final Channel channel, final Promise<Void> promise) {
    requireNonNull(key, "key");
    requireNonNull(channel, "channel");
    requireNonNull(promise, "promise");

    try {
        EventLoop channelLoop = channel.eventLoop();
        if (!channelLoop.inEventLoop()) {
            channelLoop.execute(() -> doRelease(key, channel, promise));
        } else {
            doRelease(key, channel, promise);
        }
    } catch (Throwable cause) {
        closeAndFail(channel, cause, promise);
    }
    return promise;
}
private void doRelease(K key, Channel channel, Promise<Void> promise) { assert channel.eventLoop().inEventLoop(); if (channel.attr(KeyedChannelPoolUtil.POOL).getAndSet(null) != this) { // Better including a stack trace here as this is a user error. closeAndFail(channel, new IllegalArgumentException( "Channel " + channel + " was not acquired from this ChannelPool"), promise); } else { try { if (healthCheckOnRelease) { healthCheckOnRelease(key, channel, promise); } else { releaseAndOffer(key, channel, promise); } } catch (Throwable cause) { closeAndFail(channel, cause, promise); } } }
private void setInfiniteTimeout(final NedisClient client, final Method method, final Object[] args, @SuppressWarnings("rawtypes") final Promise promise) { client.setTimeout(0L).addListener(new FutureListener<Long>() { @Override public void operationComplete(Future<Long> future) throws Exception { // will not fail, but could be null Long previousTimeoutMs = future.get(); if (previousTimeoutMs == null) { promise.tryFailure(new IllegalStateException("already closed")); } else { callBlocking(client, method, args, previousTimeoutMs.longValue(), promise); } } }); }
/**
 * Starts session setup on a freshly connected channel.
 *
 * <p>Schedules a timeout task on the channel's event loop that fails {@code sessionPromise}
 * after {@code connectTimeoutMillis} and, if that failure was the one that settled the
 * promise, closes the channel. An {@code HttpSessionHandler} holding the promise and the
 * timeout future is then appended to the pipeline.
 *
 * @param protocol       protocol reported in the timeout exception
 * @param connectFuture  the completed, successful connect future
 * @param sessionPromise promise for the established session
 */
private void initSession(SessionProtocol protocol, ChannelFuture connectFuture,
        Promise<Channel> sessionPromise) {
    assert connectFuture.isSuccess();

    final Channel ch = connectFuture.channel();
    final EventLoop eventLoop = ch.eventLoop();
    assert eventLoop.inEventLoop();

    final Runnable failOnTimeout = () -> {
        boolean timedOutFirst = sessionPromise.tryFailure(new SessionProtocolNegotiationException(
                protocol, "connection established, but session creation timed out: " + ch));
        if (timedOutFirst) {
            ch.close();
        }
    };
    final ScheduledFuture<?> timeoutFuture =
            eventLoop.schedule(failOnTimeout, connectTimeoutMillis, TimeUnit.MILLISECONDS);

    ch.pipeline().addLast(new HttpSessionHandler(this, ch, sessionPromise, timeoutFuture));
}
/**
 * Connects a new client belonging to this pool and starts its initialization.
 *
 * <p>When the connect succeeds, {@code initialize} is called with the returned promise, the
 * new client and {@code State.AUTH}; when it fails, the promise is failed with the same cause.
 *
 * @return a promise for the new client
 */
private Future<NedisClient> newClient() {
    Future<NedisClientImpl> connectFuture = NedisClientBuilder.create()
            .group(group)
            .channel(channelClass)
            .timeoutMs(timeoutMs)
            .belongTo(this)
            .connect(remoteAddress);

    final Promise<NedisClient> clientPromise = getEventExecutor(connectFuture).newPromise();
    connectFuture.addListener(new FutureListener<NedisClientImpl>() {
        @Override
        public void operationComplete(Future<NedisClientImpl> future) throws Exception {
            if (!future.isSuccess()) {
                clientPromise.tryFailure(future.cause());
                return;
            }
            initialize(clientPromise, future.getNow(), State.AUTH);
        }
    });
    return clientPromise;
}
/**
 * Creates a converter whose listener maps a raw reply onto a {@code byte[]} promise.
 *
 * <p>A failed source future or a {@code RedisResponseException} reply fails the promise; the
 * {@code NULL_REPLY} marker yields {@code null}; any other reply is cast to {@code byte[]}.
 *
 * @param executor executor passed to the converter
 * @return the converter
 */
public static PromiseConverter<byte[]> toBytes(EventExecutor executor) {
    return new PromiseConverter<byte[]>(executor) {
        @Override
        public FutureListener<Object> newListener(final Promise<byte[]> promise) {
            return new FutureListener<Object>() {
                @Override
                public void operationComplete(Future<Object> future) throws Exception {
                    if (!future.isSuccess()) {
                        promise.tryFailure(future.cause());
                        return;
                    }
                    Object reply = future.getNow();
                    if (reply instanceof RedisResponseException) {
                        promise.tryFailure((RedisResponseException) reply);
                        return;
                    }
                    if (reply == RedisResponseDecoder.NULL_REPLY) {
                        promise.trySuccess(null);
                        return;
                    }
                    promise.trySuccess((byte[]) reply);
                }
            };
        }
    };
}
/**
 * Creates a converter whose listener maps a raw reply onto a {@code Double} promise.
 *
 * <p>A failed source future or a {@code RedisResponseException} reply fails the promise; the
 * {@code NULL_REPLY} marker yields {@code null}; any other reply is cast to {@code byte[]}
 * and converted with {@code bytesToDouble}.
 *
 * @param executor executor passed to the converter
 * @return the converter
 */
public static PromiseConverter<Double> toDouble(EventExecutor executor) {
    return new PromiseConverter<Double>(executor) {
        @Override
        public FutureListener<Object> newListener(final Promise<Double> promise) {
            return new FutureListener<Object>() {
                @Override
                public void operationComplete(Future<Object> future) throws Exception {
                    if (!future.isSuccess()) {
                        promise.tryFailure(future.cause());
                        return;
                    }
                    Object reply = future.getNow();
                    if (reply instanceof RedisResponseException) {
                        promise.tryFailure((RedisResponseException) reply);
                        return;
                    }
                    if (reply == RedisResponseDecoder.NULL_REPLY) {
                        promise.trySuccess(null);
                        return;
                    }
                    promise.trySuccess(bytesToDouble((byte[]) reply));
                }
            };
        }
    };
}
/**
 * Creates a converter whose listener maps a raw reply onto a {@code Long} promise.
 *
 * <p>A failed source future or a {@code RedisResponseException} reply fails the promise; the
 * {@code NULL_REPLY} marker yields {@code null}; any other reply is cast to {@code Long}.
 *
 * @param executor executor passed to the converter
 * @return the converter
 */
public static PromiseConverter<Long> toLong(EventExecutor executor) {
    return new PromiseConverter<Long>(executor) {
        @Override
        public FutureListener<Object> newListener(final Promise<Long> promise) {
            return new FutureListener<Object>() {
                @Override
                public void operationComplete(Future<Object> future) throws Exception {
                    if (!future.isSuccess()) {
                        promise.tryFailure(future.cause());
                        return;
                    }
                    Object reply = future.getNow();
                    if (reply instanceof RedisResponseException) {
                        promise.tryFailure((RedisResponseException) reply);
                        return;
                    }
                    if (reply == RedisResponseDecoder.NULL_REPLY) {
                        promise.trySuccess(null);
                        return;
                    }
                    promise.trySuccess((Long) reply);
                }
            };
        }
    };
}
/**
 * Creates a converter whose listener passes a raw reply through to an {@code Object} promise.
 *
 * <p>A failed source future or a {@code RedisResponseException} reply fails the promise; the
 * {@code NULL_REPLY} marker yields {@code null}; any other reply is handed over unchanged.
 *
 * @param executor executor passed to the converter
 * @return the converter
 */
public static PromiseConverter<Object> toObject(EventExecutor executor) {
    return new PromiseConverter<Object>(executor) {
        @Override
        public FutureListener<Object> newListener(final Promise<Object> promise) {
            return new FutureListener<Object>() {
                @Override
                public void operationComplete(Future<Object> future) throws Exception {
                    if (!future.isSuccess()) {
                        promise.tryFailure(future.cause());
                        return;
                    }
                    Object reply = future.getNow();
                    if (reply instanceof RedisResponseException) {
                        promise.tryFailure((RedisResponseException) reply);
                        return;
                    }
                    if (reply == RedisResponseDecoder.NULL_REPLY) {
                        promise.trySuccess(null);
                        return;
                    }
                    promise.trySuccess(reply);
                }
            };
        }
    };
}
/**
 * Creates a converter whose listener maps a raw reply onto a {@code String} promise.
 *
 * <p>A failed source future or a {@code RedisResponseException} reply fails the promise; the
 * {@code NULL_REPLY} marker yields {@code null}; any other reply is converted with its
 * {@code toString()}.
 *
 * @param executor executor passed to the converter
 * @return the converter
 */
public static PromiseConverter<String> toString(EventExecutor executor) {
    return new PromiseConverter<String>(executor) {
        @Override
        public FutureListener<Object> newListener(final Promise<String> promise) {
            return new FutureListener<Object>() {
                @Override
                public void operationComplete(Future<Object> future) throws Exception {
                    if (!future.isSuccess()) {
                        promise.tryFailure(future.cause());
                        return;
                    }
                    Object reply = future.getNow();
                    if (reply instanceof RedisResponseException) {
                        promise.tryFailure((RedisResponseException) reply);
                        return;
                    }
                    if (reply == RedisResponseDecoder.NULL_REPLY) {
                        promise.trySuccess(null);
                        return;
                    }
                    promise.trySuccess(reply.toString());
                }
            };
        }
    };
}
/**
 * Creates a converter whose listener discards the reply and only reports success or failure.
 *
 * <p>A failed source future or a {@code RedisResponseException} reply fails the promise; any
 * other reply completes it with {@code null}.
 *
 * @param executor executor passed to the converter
 * @return the converter
 */
public static PromiseConverter<Void> toVoid(EventExecutor executor) {
    return new PromiseConverter<Void>(executor) {
        @Override
        public FutureListener<Object> newListener(final Promise<Void> promise) {
            return new FutureListener<Object>() {
                @Override
                public void operationComplete(Future<Object> future) throws Exception {
                    if (!future.isSuccess()) {
                        promise.tryFailure(future.cause());
                        return;
                    }
                    Object reply = future.getNow();
                    if (reply instanceof RedisResponseException) {
                        promise.tryFailure((RedisResponseException) reply);
                        return;
                    }
                    promise.trySuccess(null);
                }
            };
        }
    };
}
/**
 * Invokes {@code method} on the client, forwards the outcome of the returned future to
 * {@code promise}, and then calls {@code resetTimeout} with the previous timeout.
 *
 * @param client            client the method is invoked on
 * @param method            method to invoke; its return value is cast to {@code Future}
 * @param args              arguments for the invocation
 * @param previousTimeoutMs timeout value passed to {@code resetTimeout} afterwards
 * @param promise           promise receiving the result or the failure cause
 * @throws IllegalAccessException    if the reflective call is not permitted
 * @throws InvocationTargetException if the invoked method itself throws
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
private void callBlocking(final NedisClient client, Method method, Object[] args,
        final long previousTimeoutMs, final Promise promise)
        throws IllegalAccessException, InvocationTargetException {
    Future invocation = (Future) method.invoke(client, args);
    invocation.addListener(new FutureListener() {
        @Override
        public void operationComplete(Future future) throws Exception {
            if (!future.isSuccess()) {
                promise.tryFailure(future.cause());
            } else {
                promise.trySuccess(future.getNow());
            }
            resetTimeout(client, previousTimeoutMs);
        }
    });
}
public synchronized Future<Chunk> getAsync(Key key) { Chunk chunk = localStore.get(key); Promise<Chunk> ret = new DefaultPromise<Chunk>() {}; if (chunk == null) { // long timeout = 0; // TODO ChunkRequest chunkRequest = new ChunkRequest(); chunkRequest.localRequesters.add(ret); chunkRequestMap.put(key, chunkRequest); startSearch(-1, key, timeout); } else { ret.setSuccess(chunk); } return ret; }
private static void notifyConnect(ChannelFuture future, Promise<Channel> promise) { if (future.isSuccess()) { Channel channel = future.channel(); if (!promise.trySuccess(channel)) { // Promise was completed in the meantime (likely cancelled), just release the channel again channel.close(); } } else { promise.tryFailure(future.cause()); } }
@Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Future<Channel> channelFuture = null; try { RequestWrapper requestWrapper = new RequestWrapper(); requestWrapper.setServiceName(name); requestWrapper.setMethodName(method.getName()); requestWrapper.setParameters(args); requestWrapper.setParamTypes(method.getParameterTypes()); requestWrapper.setRequestId(UUIDUtils.getRandomId()); System.out.println("send request: " + requestWrapper); //获取一个可用的channel channelFuture = connectionPool.acquire().sync(); if (!channelFuture.isSuccess()) { throw new Exception("获取链接失败!"); } Promise<ResponseWrapper> promise = eventExecutor.newPromise(); requestWrapper.setPromise(promise); RpcClient.getRequestWrapperMap().put(requestWrapper.getRequestId(), promise); //注意channel.write和context.write的区别 channelFuture.getNow().writeAndFlush(requestWrapper); //如果是同步模式,return结果 return promise.get(5000L, TimeUnit.MILLISECONDS).getResult(); } finally { if (channelFuture != null && channelFuture.isSuccess()) { connectionPool.release(channelFuture.getNow()); } } }
/**
 * Sends a command.
 *
 * <p>If a command is currently in progress, this one is queued and executed once the running
 * command finishes. A queued command may never be executed if the connection closes first, so
 * callers should listen to the returned future to learn whether the command completed.
 *
 * @param imapCommand command to send
 * @param <T> Response type
 * @return Response future. Will be completed when a tagged response is received for this
 *         command.
 */
public synchronized <T extends ImapResponse> CompletableFuture<T> sendRaw(ImapCommand imapCommand) {
    final Promise<T> commandPromise = promiseExecutor.next().newPromise();
    // Whenever this command completes, move on to the next pending write.
    commandPromise.addListener(completed -> writeNext());
    send(imapCommand, commandPromise);
    return NettyCompletableFuture.from(commandPromise);
}
/**
 * Sends the command now, or queues it when that is not possible yet.
 *
 * <p>If the connection has been shut down, the promise is failed with a
 * {@code ConnectionClosedException}. If another command's promise is still pending or the
 * client is not connected, the command is added to the pending write queue. Otherwise it is
 * sent immediately.
 *
 * @param imapCommand command to send
 * @param promise     promise tied to the command
 */
private synchronized void send(ImapCommand imapCommand, Promise promise) {
    if (connectionShutdown.get()) {
        promise.tryFailure(new ConnectionClosedException("Cannot write to closed connection."));
        return;
    }

    boolean commandInFlight = currentCommandPromise != null && !currentCommandPromise.isDone();
    if (commandInFlight || !isConnected()) {
        pendingWriteQueue.add(PendingCommand.newInstance(imapCommand, promise));
        return;
    }

    actuallySend(imapCommand, promise);
}
/**
 * Obtains a {@code PendingCommand} from the recycler and fills in its fields.
 *
 * @param imapCommand command to store
 * @param promise     promise to store
 * @return the populated instance
 */
static PendingCommand newInstance(ImapCommand imapCommand, Promise promise) {
    PendingCommand recycled = RECYCLER.get();
    recycled.promise = promise;
    recycled.imapCommand = imapCommand;
    return recycled;
}