一尘不染

StaTaskScheduler和STA线程消息泵送

c#

TL; DR: _由运行的任务中的死锁StaTaskScheduler。_长版:

我使用的是StaTaskSchedulerParallelExtensionsExtras中通过平行小组,举办一些遗留STA
COM对象由第三方提供。StaTaskScheduler实现细节的描述如下:

好消息是,TPL的实现能够在MTA或STA线程上运行,并考虑到了诸如WaitHandle.WaitAll之类的基础API的相关差异(仅在为该方法提供多个等待句柄时才支持MTA线程)。

我认为这意味着TPL的阻塞部分将使用等待API,该API会CoWaitForMultipleHandles在STA线程上调用时泵送消息(例如),以避免出现死锁情况。

在我的情况下,我相信正在发生以下情况:进程内STA COM对象A对进程外对象B进行调用,然后期望通过B作为该传出调用的一部分从B进行回调。

以简化形式:

var result = await Task.Factory.StartNew(() =>
{
    // in-proc object A
    var a = new A(); 
    // out-of-proc object B
    var b = new B(); 
    // A calls B and B calls back A during the Method call
    return a.Method(b);     
}, CancellationToken.None, TaskCreationOptions.None, staTaskScheduler);

问题是,a.Method(b)永不返回。据我所知,发生这种情况是因为内部的某个阻塞等待BlockingCollection<Task>不会泵送消息,因此我对引用语句的假设可能是错误的。

编辑
的在测试WinForms应用程序的UI线程上执行时(即提供TaskScheduler.FromCurrentSynchronizationContext()而不是staTaskSchedulerto
Task.Factory.StartNew),可以使用相同的代码。

解决这个问题的正确方法是什么?我是否应该实现一个自定义同步上下文,该上下文将显式地使用泵送消息CoWaitForMultipleHandles,并将其安装在由其启动的每个STA线程上StaTaskScheduler

如果是这样,将在底层实现BlockingCollection调用我的SynchronizationContext.Wait方法吗?我可以SynchronizationContext.WaitHelper用来实施SynchronizationContext.Wait吗?


*使用一些代码进行 *编辑 ,该代码表明在执行阻塞等待时托管的STA线程不会泵送。该代码是一个完整的控制台应用程序,可以复制/粘贴/运行:

using System;
using System.Collections.Concurrent;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleTestApp
{
    class Program
    {
        // start and run an STA thread
        static void RunStaThread(bool pump)
        {
            // test a blocking wait with BlockingCollection.Take
            var tasks = new BlockingCollection<Task>();

            var thread = new Thread(() => 
            {
                // Create a simple Win32 window 
                var hwndStatic = NativeMethods.CreateWindowEx(0, "Static", String.Empty, NativeMethods.WS_POPUP,
                    0, 0, 0, 0, IntPtr.Zero, IntPtr.Zero, IntPtr.Zero, IntPtr.Zero);

                // subclass it with a custom WndProc
                IntPtr prevWndProc = IntPtr.Zero;

                var newWndProc = new NativeMethods.WndProc((hwnd, msg, wParam, lParam) =>
                {
                    if (msg == NativeMethods.WM_TEST)
                        Console.WriteLine("WM_TEST processed");
                    return NativeMethods.CallWindowProc(prevWndProc, hwnd, msg, wParam, lParam);
                });

                prevWndProc = NativeMethods.SetWindowLong(hwndStatic, NativeMethods.GWL_WNDPROC, newWndProc);
                if (prevWndProc == IntPtr.Zero)
                    throw new ApplicationException();

                // post a test WM_TEST message to it
                NativeMethods.PostMessage(hwndStatic, NativeMethods.WM_TEST, IntPtr.Zero, IntPtr.Zero);

                // BlockingCollection blocks without pumping, NativeMethods.WM_TEST never arrives
                try { var task = tasks.Take(); }
                catch (Exception e) { Console.WriteLine(e.Message); }

                if (pump)
                {
                    // NativeMethods.WM_TEST will arrive, because Win32 MessageBox pumps
                    Console.WriteLine("Now start pumping...");
                    NativeMethods.MessageBox(IntPtr.Zero, "Pumping messages, press OK to stop...", String.Empty, 0);
                }
            });

            thread.SetApartmentState(ApartmentState.STA);
            thread.Start();

            Thread.Sleep(2000);

            // this causes the STA thread to end
            tasks.CompleteAdding();

            thread.Join();
        }

        static void Main(string[] args)
        {
            Console.WriteLine("Testing without pumping...");
            RunStaThread(false);

            Console.WriteLine("\nTest with pumping...");
            RunStaThread(true);

            Console.WriteLine("Press Enter to exit");
            Console.ReadLine();
        }
    }

    // Interop
    static class NativeMethods
    {
        [DllImport("user32")]
        public static extern IntPtr SetWindowLong(IntPtr hwnd, int nIndex, WndProc newProc);

        [DllImport("user32")]
        public static extern IntPtr CallWindowProc(IntPtr lpPrevWndFunc, IntPtr hwnd, int msg, int wParam, int lParam);

        [DllImport("user32.dll")]
        public static extern IntPtr CreateWindowEx(int dwExStyle, string lpClassName, string lpWindowName, int dwStyle, int x, int y, int nWidth, int nHeight, IntPtr hWndParent, IntPtr hMenu, IntPtr hInstance, IntPtr lpParam);

        [DllImport("user32.dll")]
        public static extern bool PostMessage(IntPtr hwnd, uint msg, IntPtr wParam, IntPtr lParam);

        [DllImport("user32.dll")]
        public static extern int MessageBox(IntPtr hwnd, string text, String caption, int options);

        public delegate IntPtr WndProc(IntPtr hwnd, int msg, int wParam, int lParam);

        public const int GWL_WNDPROC = -4;
        public const int WS_POPUP = unchecked((int)0x80000000);
        public const int WM_USER = 0x0400;

        public const int WM_TEST = WM_USER + 1;
    }
}

产生输出:

测试无需抽水...
collection参数为空,并已被标记为完成添加。

抽水测试...
collection参数为空,并已被标记为完成添加。
现在开始抽水...
WM_TEST已处理
按Enter退出

阅读 288

收藏
2020-05-19

共1个答案

一尘不染

我对您的问题的理解:您StaTaskScheduler仅用于为旧版COM对象组织经典的COM
STA公寓。您 不在
的STA线程上运行WinForms或WPF核心消息循环StaTaskScheduler。也就是说,你没有使用像什么Application.RunApplication.DoEvents或者Dispatcher.PushFrame该线程内。如果这是一个错误的假设,请纠正我。

就其本身而言,StaTaskScheduler 不会 在其创建的STA线程上 安装
任何同步上下文。因此,您依赖CLR为您发送消息。我仅克里斯·布鲁姆(Chris
Brumme)的《
CLR
中的公寓和泵送》中发现了一个隐式确认,即CLR在STA线程上泵送

我一直说,在STA线程上调用时,托管阻塞将执行“一些泵送”。确切知道将要泵出什么不是很好吗?不幸的是,抽水是一种妖术,无法超越凡人的理解。在Win2000及更高版本上,我们仅委托给OLE32的
CoWaitForMultipleHandles 服务。

这表明CLR在CoWaitForMultipleHandles内部将其用于STA线程。此外,用于COWAIT_DISPATCH_WINDOW_MESSAGES标记的MSDN文档提到了这一点

……在STA中,只有一小部分特殊情况的消息被分发。

我对此进行了一些研究,但无法WM_TEST从的示例代码中抽取CoWaitForMultipleHandles,我们在对您的问题的评论中进行了讨论。我的理解是,前面提到的一
小部分特殊情况的消息 实际上仅限 于某些COM编组器特定的消息,并且不包括任何常规的通用消息,例如your WM_TEST

因此,回答您的问题:

…我是否应该实现一个自定义同步上下文,该上下文将显式地使用CoWaitForMultipleHandles泵送消息,并将其安装在由StaTaskScheduler启动的每个STA线程上?

是的,我相信创建自定义同步上下文和覆盖SynchronizationContext.Wait确实是正确的解决方案。

但是,您应该避免使用CoWaitForMultipleHandles,而 改为
使用MsgWaitForMultipleObjectsEx。如果MsgWaitForMultipleObjectsEx指示队列中有待处理的消息,则应使用PeekMessage(PM_REMOVE)和手动泵送它DispatchMessage。然后,您应该继续等待所有相同SynchronizationContext.Wait调用中的句柄。

请注意,和之间 存在细微但重要的区别
。如果队列中已经看到一条消息(例如,带有或),但没有删除,则后者不会返回并保持阻塞状态。这对泵送不利,因为您的COM对象可能正在使用诸如检查消息队列之类的方法。以后可能会导致阻塞,这是不可预期的。MsgWaitForMultipleObjectsExMsgWaitForMultipleObjectsPeekMessage(PM_NOREMOVE)``GetQueueStatus``PeekMessage``MsgWaitForMultipleObjects

MsgWaitForMultipleObjectsEx带有MWMO_INPUTAVAILABLE标志的OTOH 没有这种缺点,在这种情况下会返回。

不久前,我创建了一个自定义版本的StaTaskScheduler可在此处获得ThreadAffinityTaskScheduler),以尝试解决另一个问题:维护一个具有线程亲和力的线程池以用于后续await继续。如果跨多个使用STA COM对象,则线程亲和性
至关重要awaitsStaTaskScheduler仅当其原始存储池限制为1个线程时,才会显示此行为。

因此,我继续尝试了更多有关您的WM_TEST案例的实验。最初,我SynchronizationContext在STA线程上安装了标准类的实例。该WM_TEST消息没有得到抽,这是预期。

然后我重写SynchronizationContext.Wait以将其转发给SynchronizationContext.WaitHelper。它确实被调用了,但是仍然没有启动。

最后,我实现了功能全面的消息泵循环,这是它的核心部分:

// the core loop
var msg = new NativeMethods.MSG();
while (true)
{
    // MsgWaitForMultipleObjectsEx with MWMO_INPUTAVAILABLE returns,
    // even if there's a message already seen but not removed in the message queue
    nativeResult = NativeMethods.MsgWaitForMultipleObjectsEx(
        count, waitHandles,
        (uint)remainingTimeout,
        QS_MASK,
        NativeMethods.MWMO_INPUTAVAILABLE);

    if (IsNativeWaitSuccessful(count, nativeResult, out managedResult) || WaitHandle.WaitTimeout == managedResult)
        return managedResult;

    // there is a message, pump and dispatch it
    if (NativeMethods.PeekMessage(out msg, IntPtr.Zero, 0, 0, NativeMethods.PM_REMOVE))
    {
        NativeMethods.TranslateMessage(ref msg);
        NativeMethods.DispatchMessage(ref msg);
    }
    if (hasTimedOut())
        return WaitHandle.WaitTimeout;
}

这确实起作用,WM_TEST被抽出。以下是测试的改编版本:

public static async Task RunAsync()
{
    using (var staThread = new Noseratio.ThreadAffinity.ThreadWithAffinityContext(staThread: true, pumpMessages: true))
    {
        Console.WriteLine("Initial thread #" + Thread.CurrentThread.ManagedThreadId);
        await staThread.Run(async () =>
        {
            Console.WriteLine("On STA thread #" + Thread.CurrentThread.ManagedThreadId);
            // create a simple Win32 window
            IntPtr hwnd = CreateTestWindow();

            // Post some WM_TEST messages
            Console.WriteLine("Post some WM_TEST messages...");
            NativeMethods.PostMessage(hwnd, NativeMethods.WM_TEST, new IntPtr(1), IntPtr.Zero);
            NativeMethods.PostMessage(hwnd, NativeMethods.WM_TEST, new IntPtr(2), IntPtr.Zero);
            NativeMethods.PostMessage(hwnd, NativeMethods.WM_TEST, new IntPtr(3), IntPtr.Zero);
            Console.WriteLine("Press Enter to continue...");
            await ReadLineAsync();

            Console.WriteLine("After await, thread #" + Thread.CurrentThread.ManagedThreadId);
            Console.WriteLine("Pending messages in the queue: " + (NativeMethods.GetQueueStatus(0x1FF) >> 16 != 0));

            Console.WriteLine("Exiting STA thread #" + Thread.CurrentThread.ManagedThreadId);
        }, CancellationToken.None);
    }
    Console.WriteLine("Current thread #" + Thread.CurrentThread.ManagedThreadId);
}

输出

初始线程#9
在STA线程#10上
发布一些WM_TEST消息...
按Enter继续...
WM_TEST已处理:1
WM_TEST已处理:2
WM_TEST已处理:3

等待之后,线程#10
队列中的待处理消息:False
退出STA线程#10
当前线程#12
按任何一个键退出

请注意,此实现同时支持线程亲缘关系(它位于之后的#10线程上await)和消息泵送。完整的源代码包含可重复使用的部分(ThreadAffinityTaskSchedulerThreadWithAffinityContext),可在此处作为独立的控制台应用程序使用。它尚未经过全面测试,因此使用时需您自担风险。

2020-05-19