TL; DR: _由运行的任务中的死锁StaTaskScheduler。_长版:
StaTaskScheduler
我使用的是StaTaskScheduler从ParallelExtensionsExtras中通过平行小组,举办一些遗留STA COM对象由第三方提供。StaTaskScheduler实现细节的描述如下:
好消息是,TPL的实现能够在MTA或STA线程上运行,并考虑到了诸如WaitHandle.WaitAll之类的基础API的相关差异(仅在为该方法提供多个等待句柄时才支持MTA线程)。
我认为这意味着TPL的阻塞部分将使用等待API,该API会CoWaitForMultipleHandles在STA线程上调用时泵送消息(例如),以避免出现死锁情况。
CoWaitForMultipleHandles
在我的情况下,我相信正在发生以下情况:进程内STA COM对象A对进程外对象B进行调用,然后期望通过B作为该传出调用的一部分从B进行回调。
以简化形式:
var result = await Task.Factory.StartNew(() => { // in-proc object A var a = new A(); // out-of-proc object B var b = new B(); // A calls B and B calls back A during the Method call return a.Method(b); }, CancellationToken.None, TaskCreationOptions.None, staTaskScheduler);
问题是,a.Method(b)永不返回。据我所知,发生这种情况是因为内部的某个阻塞等待BlockingCollection<Task>不会泵送消息,因此我对引用语句的假设可能是错误的。
a.Method(b)
BlockingCollection<Task>
编辑 的在测试WinForms应用程序的UI线程上执行时(即提供TaskScheduler.FromCurrentSynchronizationContext()而不是staTaskSchedulerto Task.Factory.StartNew),可以使用相同的代码。
TaskScheduler.FromCurrentSynchronizationContext()
staTaskScheduler
Task.Factory.StartNew
解决这个问题的正确方法是什么?我是否应该实现一个自定义同步上下文,该上下文将显式地使用泵送消息CoWaitForMultipleHandles,并将其安装在由其启动的每个STA线程上StaTaskScheduler?
如果是这样,将在底层实现BlockingCollection调用我的SynchronizationContext.Wait方法吗?我可以SynchronizationContext.WaitHelper用来实施SynchronizationContext.Wait吗?
BlockingCollection
SynchronizationContext.Wait
SynchronizationContext.WaitHelper
*使用一些代码进行 *编辑 ,该代码表明在执行阻塞等待时托管的STA线程不会泵送。该代码是一个完整的控制台应用程序,可以复制/粘贴/运行:
using System; using System.Collections.Concurrent; using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; namespace ConsoleTestApp { class Program { // start and run an STA thread static void RunStaThread(bool pump) { // test a blocking wait with BlockingCollection.Take var tasks = new BlockingCollection<Task>(); var thread = new Thread(() => { // Create a simple Win32 window var hwndStatic = NativeMethods.CreateWindowEx(0, "Static", String.Empty, NativeMethods.WS_POPUP, 0, 0, 0, 0, IntPtr.Zero, IntPtr.Zero, IntPtr.Zero, IntPtr.Zero); // subclass it with a custom WndProc IntPtr prevWndProc = IntPtr.Zero; var newWndProc = new NativeMethods.WndProc((hwnd, msg, wParam, lParam) => { if (msg == NativeMethods.WM_TEST) Console.WriteLine("WM_TEST processed"); return NativeMethods.CallWindowProc(prevWndProc, hwnd, msg, wParam, lParam); }); prevWndProc = NativeMethods.SetWindowLong(hwndStatic, NativeMethods.GWL_WNDPROC, newWndProc); if (prevWndProc == IntPtr.Zero) throw new ApplicationException(); // post a test WM_TEST message to it NativeMethods.PostMessage(hwndStatic, NativeMethods.WM_TEST, IntPtr.Zero, IntPtr.Zero); // BlockingCollection blocks without pumping, NativeMethods.WM_TEST never arrives try { var task = tasks.Take(); } catch (Exception e) { Console.WriteLine(e.Message); } if (pump) { // NativeMethods.WM_TEST will arrive, because Win32 MessageBox pumps Console.WriteLine("Now start pumping..."); NativeMethods.MessageBox(IntPtr.Zero, "Pumping messages, press OK to stop...", String.Empty, 0); } }); thread.SetApartmentState(ApartmentState.STA); thread.Start(); Thread.Sleep(2000); // this causes the STA thread to end tasks.CompleteAdding(); thread.Join(); } static void Main(string[] args) { Console.WriteLine("Testing without pumping..."); RunStaThread(false); Console.WriteLine("\nTest with pumping..."); RunStaThread(true); Console.WriteLine("Press Enter to exit"); Console.ReadLine(); } } // Interop static class NativeMethods { [DllImport("user32")] public static extern IntPtr SetWindowLong(IntPtr hwnd, int nIndex, WndProc newProc); [DllImport("user32")] public static extern IntPtr CallWindowProc(IntPtr lpPrevWndFunc, IntPtr hwnd, int msg, int wParam, int lParam); [DllImport("user32.dll")] public static extern IntPtr CreateWindowEx(int dwExStyle, string lpClassName, string lpWindowName, int dwStyle, int x, int y, int nWidth, int nHeight, IntPtr hWndParent, IntPtr hMenu, IntPtr hInstance, IntPtr lpParam); [DllImport("user32.dll")] public static extern bool PostMessage(IntPtr hwnd, uint msg, IntPtr wParam, IntPtr lParam); [DllImport("user32.dll")] public static extern int MessageBox(IntPtr hwnd, string text, String caption, int options); public delegate IntPtr WndProc(IntPtr hwnd, int msg, int wParam, int lParam); public const int GWL_WNDPROC = -4; public const int WS_POPUP = unchecked((int)0x80000000); public const int WM_USER = 0x0400; public const int WM_TEST = WM_USER + 1; } }
产生输出:
测试无需抽水... collection参数为空,并已被标记为完成添加。 抽水测试... collection参数为空,并已被标记为完成添加。 现在开始抽水... WM_TEST已处理 按Enter退出
我对您的问题的理解:您StaTaskScheduler仅用于为旧版COM对象组织经典的COM STA公寓。您 不在 的STA线程上运行WinForms或WPF核心消息循环StaTaskScheduler。也就是说,你没有使用像什么Application.Run,Application.DoEvents或者Dispatcher.PushFrame该线程内。如果这是一个错误的假设,请纠正我。
Application.Run
Application.DoEvents
Dispatcher.PushFrame
就其本身而言,StaTaskScheduler 不会 在其创建的STA线程上 安装 任何同步上下文。因此,您依赖CLR为您发送消息。我仅在克里斯·布鲁姆(Chris Brumme)的《 CLR中的公寓和泵送》中发现了一个隐式确认,即CLR在STA线程上泵送:
我一直说,在STA线程上调用时,托管阻塞将执行“一些泵送”。确切知道将要泵出什么不是很好吗?不幸的是,抽水是一种妖术,无法超越凡人的理解。在Win2000及更高版本上,我们仅委托给OLE32的 CoWaitForMultipleHandles 服务。
这表明CLR在CoWaitForMultipleHandles内部将其用于STA线程。此外,用于COWAIT_DISPATCH_WINDOW_MESSAGES标记的MSDN文档提到了这一点:
COWAIT_DISPATCH_WINDOW_MESSAGES
……在STA中,只有一小部分特殊情况的消息被分发。
我对此进行了一些研究,但无法WM_TEST从的示例代码中抽取CoWaitForMultipleHandles,我们在对您的问题的评论中进行了讨论。我的理解是,前面提到的一 小部分特殊情况的消息 实际上仅限 于某些COM编组器特定的消息,并且不包括任何常规的通用消息,例如your WM_TEST。
WM_TEST
因此,回答您的问题:
…我是否应该实现一个自定义同步上下文,该上下文将显式地使用CoWaitForMultipleHandles泵送消息,并将其安装在由StaTaskScheduler启动的每个STA线程上?
是的,我相信创建自定义同步上下文和覆盖SynchronizationContext.Wait确实是正确的解决方案。
但是,您应该避免使用CoWaitForMultipleHandles,而 改为 使用MsgWaitForMultipleObjectsEx。如果MsgWaitForMultipleObjectsEx指示队列中有待处理的消息,则应使用PeekMessage(PM_REMOVE)和手动泵送它DispatchMessage。然后,您应该继续等待所有相同SynchronizationContext.Wait调用中的句柄。
MsgWaitForMultipleObjectsEx
PeekMessage(PM_REMOVE)
DispatchMessage
请注意,和之间 存在细微但重要的区别 。如果队列中已经看到一条消息(例如,带有或),但没有删除,则后者不会返回并保持阻塞状态。这对泵送不利,因为您的COM对象可能正在使用诸如检查消息队列之类的方法。以后可能会导致阻塞,这是不可预期的。MsgWaitForMultipleObjectsExMsgWaitForMultipleObjectsPeekMessage(PM_NOREMOVE)``GetQueueStatus``PeekMessage``MsgWaitForMultipleObjects
MsgWaitForMultipleObjects
PeekMessage(PM_NOREMOVE)``GetQueueStatus``PeekMessage``MsgWaitForMultipleObjects
MsgWaitForMultipleObjectsEx带有MWMO_INPUTAVAILABLE标志的OTOH 没有这种缺点,在这种情况下会返回。
MWMO_INPUTAVAILABLE
不久前,我创建了一个自定义版本的StaTaskScheduler(可在此处获得ThreadAffinityTaskScheduler),以尝试解决另一个问题:维护一个具有线程亲和力的线程池以用于后续await继续。如果跨多个使用STA COM对象,则线程亲和性 至关重要awaits。StaTaskScheduler仅当其原始存储池限制为1个线程时,才会显示此行为。
ThreadAffinityTaskScheduler
await
awaits
因此,我继续尝试了更多有关您的WM_TEST案例的实验。最初,我SynchronizationContext在STA线程上安装了标准类的实例。该WM_TEST消息没有得到抽,这是预期。
SynchronizationContext
然后我重写SynchronizationContext.Wait以将其转发给SynchronizationContext.WaitHelper。它确实被调用了,但是仍然没有启动。
最后,我实现了功能全面的消息泵循环,这是它的核心部分:
// the core loop var msg = new NativeMethods.MSG(); while (true) { // MsgWaitForMultipleObjectsEx with MWMO_INPUTAVAILABLE returns, // even if there's a message already seen but not removed in the message queue nativeResult = NativeMethods.MsgWaitForMultipleObjectsEx( count, waitHandles, (uint)remainingTimeout, QS_MASK, NativeMethods.MWMO_INPUTAVAILABLE); if (IsNativeWaitSuccessful(count, nativeResult, out managedResult) || WaitHandle.WaitTimeout == managedResult) return managedResult; // there is a message, pump and dispatch it if (NativeMethods.PeekMessage(out msg, IntPtr.Zero, 0, 0, NativeMethods.PM_REMOVE)) { NativeMethods.TranslateMessage(ref msg); NativeMethods.DispatchMessage(ref msg); } if (hasTimedOut()) return WaitHandle.WaitTimeout; }
这确实起作用,WM_TEST被抽出。以下是测试的改编版本:
public static async Task RunAsync() { using (var staThread = new Noseratio.ThreadAffinity.ThreadWithAffinityContext(staThread: true, pumpMessages: true)) { Console.WriteLine("Initial thread #" + Thread.CurrentThread.ManagedThreadId); await staThread.Run(async () => { Console.WriteLine("On STA thread #" + Thread.CurrentThread.ManagedThreadId); // create a simple Win32 window IntPtr hwnd = CreateTestWindow(); // Post some WM_TEST messages Console.WriteLine("Post some WM_TEST messages..."); NativeMethods.PostMessage(hwnd, NativeMethods.WM_TEST, new IntPtr(1), IntPtr.Zero); NativeMethods.PostMessage(hwnd, NativeMethods.WM_TEST, new IntPtr(2), IntPtr.Zero); NativeMethods.PostMessage(hwnd, NativeMethods.WM_TEST, new IntPtr(3), IntPtr.Zero); Console.WriteLine("Press Enter to continue..."); await ReadLineAsync(); Console.WriteLine("After await, thread #" + Thread.CurrentThread.ManagedThreadId); Console.WriteLine("Pending messages in the queue: " + (NativeMethods.GetQueueStatus(0x1FF) >> 16 != 0)); Console.WriteLine("Exiting STA thread #" + Thread.CurrentThread.ManagedThreadId); }, CancellationToken.None); } Console.WriteLine("Current thread #" + Thread.CurrentThread.ManagedThreadId); }
输出 :
初始线程#9 在STA线程#10上 发布一些WM_TEST消息... 按Enter继续... WM_TEST已处理:1 WM_TEST已处理:2 WM_TEST已处理:3 等待之后,线程#10 队列中的待处理消息:False 退出STA线程#10 当前线程#12 按任何一个键退出
请注意,此实现同时支持线程亲缘关系(它位于之后的#10线程上await)和消息泵送。完整的源代码包含可重复使用的部分(ThreadAffinityTaskScheduler和ThreadWithAffinityContext),可在此处作为独立的控制台应用程序使用。它尚未经过全面测试,因此使用时需您自担风险。
ThreadWithAffinityContext