一尘不染

什么是ThreadPool服务器的异步/等待方式?

c#

我正在使用同步api和线程池在看起来像这样的tcp服务器上工作:

TcpListener listener;
void Serve(){
  while(true){
    var client = listener.AcceptTcpClient();
    ThreadPool.QueueUserWorkItem(this.HandleConnection, client);
    //Or alternatively new Thread(HandleConnection).Start(client)
  }
}

假设我的目标是使用最少的资源使用尽可能多的并发连接,这似乎很快就会受到可用线程数的限制。我怀疑通过使用非阻塞任务api,我将能够以更少的资源来处理更多任务。

我最初的印象是:

async Task Serve(){
  while(true){
    var client = await listener.AcceptTcpClientAsync();
    HandleConnectionAsync(client); //fire and forget?
  }
}

但是令我震惊的是,这可能会导致瓶颈。也许HandleConnectionAsync将花费很长的时间才能到达第一次等待状态,并且将阻止主接受循环继续进行。这是否只会使用一个线程,或者运行时会根据需要在多个线程上神奇地运行事物?

有没有一种方法可以将这两种方法结合起来,以便我的服务器将使用它所需数量的线程来运行大量活动任务,而不会在IO操作中不必要地阻塞线程?

在这种情况下,是否有惯用的方法来最大化吞吐量?


阅读 268

收藏
2020-05-19

共1个答案

一尘不染

我会让框架来管理线程,并且不会创建任何额外的线程,除非性能分析测试表明我可能需要这样做。特别是,如果内部的呼叫HandleConnectionAsync大部分是IO绑定的。

无论如何,如果您想在的开头释放调用线程(调度程序)HandleConnectionAsync,则有一个非常简单的解决方案。
您可以从一个新的线程跳ThreadPoolawait Yield()如果您的服务器在执行环境中运行,而该执行环境在初始线程上未安装任何同步上下文(控制台应用程序,WCF服务),则通常可以使用TCP服务器。

下面说明了这一点(代码最初是从此处开始)。注意,主while循环不会显式创建任何线程:

using System;
using System.Collections.Generic;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;

class Program
{
    object _lock = new Object(); // sync lock 
    List<Task> _connections = new List<Task>(); // pending connections

    // The core server task
    private async Task StartListener()
    {
        var tcpListener = TcpListener.Create(8000);
        tcpListener.Start();
        while (true)
        {
            var tcpClient = await tcpListener.AcceptTcpClientAsync();
            Console.WriteLine("[Server] Client has connected");
            var task = StartHandleConnectionAsync(tcpClient);
            // if already faulted, re-throw any error on the calling context
            if (task.IsFaulted)
                await task;
        }
    }

    // Register and handle the connection
    private async Task StartHandleConnectionAsync(TcpClient tcpClient)
    {
        // start the new connection task
        var connectionTask = HandleConnectionAsync(tcpClient);

        // add it to the list of pending task 
        lock (_lock)
            _connections.Add(connectionTask);

        // catch all errors of HandleConnectionAsync
        try
        {
            await connectionTask;
            // we may be on another thread after "await"
        }
        catch (Exception ex)
        {
            // log the error
            Console.WriteLine(ex.ToString());
        }
        finally
        {
            // remove pending task
            lock (_lock)
                _connections.Remove(connectionTask);
        }
    }

    // Handle new connection
    private async Task HandleConnectionAsync(TcpClient tcpClient)
    {
        await Task.Yield();
        // continue asynchronously on another threads

        using (var networkStream = tcpClient.GetStream())
        {
            var buffer = new byte[4096];
            Console.WriteLine("[Server] Reading from client");
            var byteCount = await networkStream.ReadAsync(buffer, 0, buffer.Length);
            var request = Encoding.UTF8.GetString(buffer, 0, byteCount);
            Console.WriteLine("[Server] Client wrote {0}", request);
            var serverResponseBytes = Encoding.UTF8.GetBytes("Hello from server");
            await networkStream.WriteAsync(serverResponseBytes, 0, serverResponseBytes.Length);
            Console.WriteLine("[Server] Response has been written");
        }
    }

    // The entry point of the console app
    static async Task Main(string[] args)
    {
        Console.WriteLine("Hit Ctrl-C to exit.");
        await new Program().StartListener();
    }
}

另外,代码可能如下所示,而没有await Task.Yield()。请注意,我通过
一个asynclambda来Task.Run,因为我还是想
从里面异步API的好处HandleConnectionAsync和使用await在那里:

// Handle new connection
private static Task HandleConnectionAsync(TcpClient tcpClient)
{
    return Task.Run(async () =>
    {
        using (var networkStream = tcpClient.GetStream())
        {
            var buffer = new byte[4096];
            Console.WriteLine("[Server] Reading from client");
            var byteCount = await networkStream.ReadAsync(buffer, 0, buffer.Length);
            var request = Encoding.UTF8.GetString(buffer, 0, byteCount);
            Console.WriteLine("[Server] Client wrote {0}", request);
            var serverResponseBytes = Encoding.UTF8.GetBytes("Hello from server");
            await networkStream.WriteAsync(serverResponseBytes, 0, serverResponseBytes.Length);
            Console.WriteLine("[Server] Response has been written");
        }
    });
}

*根据注释 *更新
了:如果这将是库代码,则执行环境的确是未知的,并且可能具有非默认的同步上下文。在这种情况下,我宁愿在池线程(没有任何同步上下文)上运行主服务器循环:

private static Task StartListener()
{
    return Task.Run(async () => 
    {
        var tcpListener = TcpListener.Create(8000);
        tcpListener.Start();
        while (true)
        {
            var tcpClient = await tcpListener.AcceptTcpClientAsync();
            Console.WriteLine("[Server] Client has connected");
            var task = StartHandleConnectionAsync(tcpClient);
            if (task.IsFaulted)
                await task;
        }
    });
}

这样,StartListener在客户端内部创建的所有子任务都不会受到客户端代码的同步上下文的影响。因此,我不必在Task.ConfigureAwait(false)任何地方明确调用。

*于2020年 *更新 ,有人在场外问了一个好问题:

我想知道在这里使用锁的原因是什么?这对于异常处理不是必需的。我的理解是使用锁是因为List不是线程安全的,因此,真正的问题是为什么将任务添加到列表中(并导致负载下锁的成本)。

由于Task.Run完全能够跟踪其启动的任务,因此我认为在此特定示例中,锁是无用的,但是您将其放在那儿是因为在实际程序中,将任务包含在列表中使我们能够例如,如果程序从操作系统收到终止信号,则迭代当前正在运行的任务并干净地终止任务。

确实,在现实生活中,出于以下几个原因,我们几乎总是想跟踪我们开始的任务Task.Run(或Task“运行中”的任何其他对象):

  • 跟踪任务异常,如果在其他地方无法观察到,则可能被默默地吞下
  • 为了能够异步等待所有待处理任务的完成(例如,考虑使用“启动/停止UI”按钮或处理启动/停止无头Windows服务内部的请求)。
  • 为了能够控制(和限制/限制)我们同时进行的任务数量。

有更好的机制来处理现实中的并发工作流(例如,TPL数据流库),但即使在这个简单示例中,我的确也包括了任务列表和目的锁定。使用一劳永逸的方法可能很诱人,但这绝不是一个好主意。以我自己的经验,当我确实想要一劳永逸时,我使用了一些async void方法(请检查)。

2020-05-19