谈.Net委托与线程——创建无阻塞的异步调用

前言

本文大部分内容来自于mikeperetz的Asynchronous Method Invocation及本人的一些个人体会所得,希望对你有所帮助。原英文文献可以在codeproject中搜索到。

介绍

这篇文章将介绍异步调用的实现机制及如何调用异步方法。大多数.NET开发者在经过delegate、Thread、AsynchronousInvocation之后,通常都会对以上概念产生混淆及误用。实际上,以上概念是.NET2.0版本中对并行编程的核心支持,基于概念上的错误认识有可能导致在实际的编程中,无法利用异步调用的特性优化我们的程序,例如大数据量加载引起的窗体”假死”。事实上这并不是一个困难的问题,该文将以一种逐层深入、抽丝剥茧的方式逐渐深入到异步编程的学习中。

同步与异步

大多数人并不喜欢阅读大量的文字说明,而喜欢直接阅读代码,因此,我们在下文中将主要以代码的形式阐述同步与异步的调用。

同步方法调用

假设我们有一个函数,它的功能是将当前线程挂起3秒钟。

staticvoid Sleep()
{
Thread.Sleep(3000);
}

通常,当你的程序在调用Sleep后,它将等待3秒钟的时间,在这3秒钟时间内,你不能做任何其他操作。3秒之后,控制权被交回给调用线程(通常也就是你的主线程,即WinForm程序的UI线程)。这种类型的调用称为同步,本次调用顺序如下:

●  调用Sleep();

●  Sleep()执行中;

●  Sleep()执行完毕,控制权归还调用线程。

我们再次调用Sleep()函数,不同的是,我们要基于委托来完成这次调用。一般为了将函数绑定在委托中,我们要定义与函数返回类型、参数值完全一致的委托,这稍有点麻烦。但.NET内部已经为我们定义好了一些委托,例如MethodInvoker,这是一种无返回值、无参数的委托签名,这相当于你自定义了一种委托:

publicdelegatevoid SimpleHandler();

执行以下代码:

MethodInvoker invoker =new MethodInvoker(Sleep);
invoker.Invoke();

我们使用了委托,但依然是同步的方式。主线程仍然要等待3秒的挂起,然后得到响应。

注意:Delegate.Invoke是同步方式的。

异步方法调用

如何在调用Sleep()方法的同时,使主线程可以不必等待Sleep()的完成,一直能够得到相应呢?这很重要,它意味着在函数执行的同时,主线程依然是非阻塞状态。在后台服务类型的程序中,非阻塞的状态意味着该应用服务可以在等待一项任务的同时去接受另一项任务;在传统的WinForm程序中,意味着主线程(即UI线程)依然可以对用户的操作得到响应,避免了”假死”。我们继续调用Sleep()函数,但这次要引入BeginInvoke。

MethodInvoker invoker =new MethodInvoker(Sleep);
invoker.BeginInvoke(null, null);

●  注意BeginInvoke这行代码,它会执行委托所调用的函数体。同时,调用BeginInvoke方法的线程(以下简称为调用线程)会立即得到响应,而不必等待Sleep()函数    的完成。

●  以上代码是异步的,调用线程完全可以在调用函数的同时处理其他工作,但是不足的是我们仍然不知道对于Sleep()函数的调用何时会结束,这是下文将要解决的问    题。

●  BeginInvoke可以以异步的方式完全取代Invoke,我们也不必担心函数包含参数的情况,下文介绍传值问题。

注意:Delegate.BeginInvoke是异步方式的。如果你要执行一项任务,但并不关心它何时完成,我们就可以使用BeginInvoke,它不会带来调用线程的阻塞。

对于异步调用,.NET内部究竟做了什么?

一旦你使用.NET完成了一次异步调用,它都需要一个线程来处理异步工作内容(以下简称异步线程),异步线程不可能是当前的调用线程,因为那样仍然会造成调用线程的阻塞,与同步无异。事实上,.NET会将所有的异步请求队列加入线程池,以线程池内的线程处理所有的异步请求。对于线程池似乎不必了解的过于深入,但我们仍需要关注以下几点内容:

●  Sleep()的异步调用会在一个单独的线程内执行,这个线程来自于.NET线程池。

●  .NET线程池默认包含25个线程,你可以改变这个值的上限,每次异步调用都会使用其中某个线程执行,但我们并不能控制具体使用哪一个线程。

●  线程池具备最大线程数目上限,一旦所有的线程都处于忙碌状态,那么新的异步调用将会被置于等待队列,直到线程池产生了新的可用线程,因此对于大量异步请      求,我们有必要关注请求数量,否则可能造成性能上的影响。

简单了解线程池

为了暴露线程池的上限,我们修改Sleep()函数,将线程挂起的时间延长至30s。在代码的运行输出结果中,我们需要关注以下内容:

●  线程池内的可用线程数量。

●  异步线程是否来自于线程池。

●  线程托管ID值。

上文已经提到,.NET线程池默认包含25个线程,因此我们连续调用30次异步方法,这样可以在第25次调用后,看看线程池内部究竟发生了什么。

技术分享
privatevoid Sleep()
{
int intAvailableThreads, intAvailableIoAsynThreds;

// 取得线程池内的可用线程数目,我们只关心第一个参数即可
ThreadPool.GetAvailableThreads(out intAvailableThreads,
out intAvailableIoAsynThreds);

// 线程信息
string strMessage =
String.Format("是否是线程池线程:{0},线程托管ID:{1},可用线程数:{2}",
Thread.CurrentThread.IsThreadPoolThread.ToString(),
Thread.CurrentThread.GetHashCode(),
intAvailableThreads);

Console.WriteLine(strMessage);

Thread.Sleep(30000);
}

privatevoid CallAsyncSleep30Times()
{
// 创建包含Sleep函数的委托对象
MethodInvoker invoker =new MethodInvoker(Sleep);

for (int i =0; i <30; i++)
{
// 以异步的形式,调用Sleep函数30次
invoker.BeginInvoke(null, null);
}
}
技术分享

输出结果:

技术分享

对于输出结果,我们可以总结为以下内容:

●  所有的异步线程都来自于.NET线程池。

●  每次执行一次异步调用,便产生一个新的线程;同时可用线程数目减少。

●  在执行异步调用25次后,线程池中不再有空闲线程。此时,应用程序会等待空闲线程的产生。

●  一旦线程池内产生了空闲线程,它会立即被分配给异步任务等待队列,之后线程池中仍然不具备空闲线程,应用程序主线程进入挂起状态继续等待空闲线程,这样      的调用一直持续到异步调用被执行完30次。

针对以上结果,我们对于异步调用可以总结为以下内容:

●  每次异步调用都在新的线程中执行,这个线程来自于.NET线程池。

●  线程池有自己的执行上限,如果你想要执行多次耗费时间较长的异步调用,那么线程池有可能进入一种”线程饥饿”状态,去等待可用线程的产生。

BeginInvoke和EndInvoke

我们已经知道,如何在不阻塞调用线程的情况下执行一个异步调用,但我们无法得知异步调用的执行结果,及它何时执行完毕。为了解决以上问题,我们可以使用EndInvoke。EndInvoke在异步方法执行完成前,都会造成线程的阻塞。因此,在调用BeginInvoke之后调用EndInvoke,效果几乎完全等同于以阻塞模式执行你的函数(EndInvoke会使调用线程挂起,一直到异步函数执行完毕)。但是,.NET是如何将BeginInvoke和EndInvoke进行绑定呢?答案就是IAsyncResult。每次我们使用BeginInvoke,返回值都是IAsyncResult类型,它是.NET追踪异步调用的关键值。每次异步调用之后的结果如何?如果要了解具体执行结果,IAsyncResult便可视为一个标签。通过这个标签,你可以了解异步调用何时执行完毕,更重要的是,它可以保存异步调用的参数传值,解决异步函数上下文问题。

我们现在通过几个例子来了解IAsyncResult。如果之前对它了解不多,那么就需要耐心的将它领悟,因为这种类型的调用是.NET异步调用的关键内容。

技术分享
privatevoid SleepOneSecond()
{
// 当前线程挂起1秒
Thread.Sleep(1000);
}

privatevoid UsingEndInvoke()
{
// 创建一个指向SleepOneSecond的委托
MethodInvoker invoker =new MethodInvoker(SleepOneSecond);

// 开始执行SleepOneSecond,但这次异步调用我们传递一些参数
// 观察Delegate.BeginInvoke()的第二个参数
IAsyncResult tag = invoker.BeginInvoke(null, "passing some state");

// 应用程序在此处会造成阻塞,直到SleepOneSecond执行完成
invoker.EndInvoke(tag);

// EndInvoke执行完毕,取得之前传递的参数内容
string strState = (string)tag.AsyncState;

Console.WriteLine("EndInvoke的传递参数"+ tag.AsyncState.ToString());
}
技术分享

输出结果:

 技术分享

回到文章初始提到的”窗体动态更新”问题,如果你将上述代码运行在一个WinForm程序中,会发现窗体依然陷入”假死”。对于这种情况,你可能会陷入疑惑:之前说异步函数都执行在线程池中,因此可以肯定异步函数的执行不会引起UI线程的忙碌,但为什么窗体依然陷入了”假死”?问题就在于EndInvoke。EndInvoke此时扮演的角色就是”线程锁”,它充当了一个调用线程与异步线程之间的调度器,有时调用线程需要使用异步函数的执行结果,那么调度线程就需要在异步执行完之前一直等待,直到得到结果方可继续运行。EndInvoke一方面负责监听异步函数的执行状况,一方面将调用线程挂起。

因此在Win Form环境下,UI线程的”假死”并不是因为线程忙碌造成,而是被EndInvoke”善意的”暂时封锁,它只是为了等待异步函数的完成。

我们可以对EndInvoke总结如下:

●  在执行EndInvoke时,调用线程会进入挂起状态,一直到异步函数执行完成。

●  使用EndInvoke可以使应用程序得知异步函数何时执行完毕。

●  如果将上述写法称为”异步”,你一定觉得这种”异步”徒具其名,虽然知道异步函数何时执行完毕,也得到了异步函数的传值,但我们的调用线程仍然会等待函数执行完毕,在等待过程中线程阻塞,实际上与同步调用无异。

如何捕捉异常?

现在我们把问题稍微复杂化,考虑异步函数抛出异常的一种情形。我们需要了解在何处捕捉到异常,是BeginInvoke,还是EndInvoke?甚至是有没有可能无法捕捉异常?答案是EndInvoke。BeginInvoke的工作只是开始线程池对于异步函数的执行工作,EndInvoke则需要处理函数执行完成的所有信息,包括其中产生的异常。

技术分享
privatevoid SleepOneSecond()
{
Thread.Sleep(3000);

thrownew Exception("Here Is An Async Function Exception");
}

privatevoid UsingEndInvoke()
{
// 创建一个指向SleepOneSecond的委托
MethodInvoker invoker =new MethodInvoker(SleepOneSecond);

// 开始执行SleepOneSecond,但这次异步调用我们传递一些参数
// 观察Delegate.BeginInvoke()的第二个参数
IAsyncResult tag = invoker.BeginInvoke(null, "passing some state");

try
{
// 应用程序在此处会造成阻塞,直到SleepOneSecond执行完成
invoker.EndInvoke(tag);
}
catch (Exception ex)
{
// 此处可以捕捉异常
MessageBox.Show(ex.Message);
}

// EndInvoke执行完毕,取得之前传递的参数内容
string strState = (string)tag.AsyncState;

Console.WriteLine("EndInvoke的传递参数"+ tag.AsyncState.ToString());
}
技术分享

执行以上代码后,你将发现只有在使用EndInvoke时,才会捕捉到异常,否则异常将丢失。需要注意的是,直接在编译器中运行程序是无法产生捕获异常的,只有在Debug、Release环境下运行,异常才会以对话框的形式直接弹出。

向函数中传递参数

现在我们来改变一下异步函数,让它接收一些参数。

技术分享
privatestring FuncWithParameters(int param1, string param2, ArrayList param3)
{
// 我们在这里改变参数值
param1 =100;
param2 ="hello";
param3 =new ArrayList();

return"thank you for reading me";
}
技术分享

下面我们使用BeginInvoke与EndInvoke来调用这个函数,首先,我们创建一个匹配该函数的委托签名。

publicdelegatestring DelegateWithParameters(int param1, string param2, ArrayList param3);

我们可以将BeginInvoke和EndInvoke视为将异步函数分割为两部分的特殊函数。BeginInvoke通过自己的两个参数值(一个AsyncCallBack委托,一个object对象)来接收传入参数,EndInvoke用于计算传出参数(标记了out或者ref的参数)和函数返回值。

现在我们回到自己的函数FuncWithParameters,param1、param2、param3是传入值,同时,它们也作为BeginInvoke的参数来处理;函数的返回值是string类型,它将作为EndInvoke的返回类型。比较酷的是,编译器可以通过委托类型,来自动为BeginInvoke和EndInvoke生成正确的参数与返回值类型。

注意我们在异步函数中为参数分配了新的值,这样可以检验这些参数在调用异步函数后,究竟会传出什么样的值……

技术分享
privatevoid CallFuncWithParameters()
{
// 创建几个参数
string strParam ="Param1";
int intValue =100;
ArrayList list =new ArrayList();
list.Add("Item1");

// 创建委托对象
DelegateWithParameters delFoo =
new DelegateWithParameters(FuncWithParameters);

// 调用异步函数
IAsyncResult tag =
delFoo.BeginInvoke(intValue, strParam, list, null, null);

// 通常调用线程会立即得到响应
// 因此你可以在这里进行一些其他处理

// 执行EndInvoke来取得返回值
string strResult = delFoo.EndInvoke(tag);

Trace.WriteLine("param1: "+ intValue);
Trace.WriteLine("param2: "+ strParam);
Trace.WriteLine("ArrayList count: "+ list.Count);
}
技术分享

技术分享

我们的异步函数对参数的改变并没有影响其传出值,现在我们把ArrayList变为ref参数,看看会给EndInvoke带来什么变化。

技术分享
publicdelegatestring DelegateWithParameters(outint param1, string param2, ref ArrayList param3);

privatestring FuncWithParameters(outint param1, string param2, ref ArrayList param3)
{
// 我们在这里改变参数值
param1 =300;
param2 ="hello";
param3 =new ArrayList();

return"thank you for reading me";
}

privatevoid CallFuncWithParameters()
{
// 创建几个参数
string strParam ="Param1";
int intValue =100;
ArrayList list =new ArrayList();
list.Add("Item1");

// 创建委托对象
DelegateWithParameters delFoo =
new DelegateWithParameters(FuncWithParameters);

// 调用异步函数
IAsyncResult tag =
delFoo.BeginInvoke(out intValue, strParam, ref list, null, null);

// 通常调用线程会立即得到响应
// 因此你可以在这里进行一些其他处理

// 调用EndInvoke,发现intValue和list可以作为参数被传出,
// 是因为他们可以被异步函数更新
string strResult = delFoo.EndInvoke(out intValue, ref list, tag);

Trace.WriteLine("param1: "+ intValue);
Trace.WriteLine("param2: "+ strParam);
Trace.WriteLine("ArrayList count: "+ list.Count);
}
技术分享

技术分享

param2没有变化,因为它是输入参数;param1作为输出参数,被更新为300;ArrayList的值已被重新分配,我们可以发现它的引用被指向了一个空元素的ArrayList对象(初始引用已丢失)。通过以上实例,我们应该能理解参数是如何在BeginInvoke与EndInvoke之间传递的。现在我们来尝试完成一个非阻塞模式下的异步调用,这是个重头戏!

了解IAsyncResult

         现在我们已经了解,EndInvoke可以给我们提供传出参数与更新后的ref参数;也可以向我们导出异步函数中的异常信息。例如,我们使用BeginInvoke调用了异步函数Sleep,它开始执行。之后调用EndInvoke,可以获取Sleep何时执行完成。但如果我们在Sleep执行完成20分钟后,才去调用EndInvoke呢?EndInvoke仍然会给我们提供传出值及异步中的异常(假如产生了异常),那么这些信息到底存储在哪里?EndInvoke如何在函数执行如此久之后仍然能够调用这些返回值?答案就在于IAsyncResult对象。EndInvoke每次在执行后,都会调用一个该对象作为参数,它包括以下信息:

●  异步函数是否已经完成

●  对调用了BeginInvoke方法的委托的引用

●  所有的传出参数及它们的值

●  所有的ref参数及它们的更新值

●  函数的返回值

●  异步函数产生的异常

IAsyncResult看起来空无一物,这是因为它仅仅是一个包含了若干属性的接口;而实际上,它是一个System.Runtime.Remoting.Messaging.AsyncResult对象。

如果我们在编译器运行期间监视tag的状态,就会发现,AsyncResult对象下包含类型为System.Runtime.Remoting.Messaging.ReturnMessage的对象。点开它,就会发现这个标签中包含的所有的异步函数的执行信息!

技术分享

使用Callback委托:好莱坞原则”不要联系我,我会联系你”

目前为止,我们需要了解如何传递参数、如何捕捉异常;了解我们的异步方法其实是执行在线程池中的某个具体线程对象中。唯一未涉及到的就是如何在异步函数执行完成后得到通知。毕竟,阻塞调用线程等待函数结束的做法始终差强人意。为了实现这个目的,我们必须为BeginInvoke函数提供一个Callback委托。观察一下两个函数:

技术分享
private void CallSleepWithoutOutAndRefParameterWithCallback()
{
// 创建几个参数
string strParam = "Param1";
int intValue = 100;
ArrayList list = new ArrayList();
list.Add("Item1");

// 创建委托对象
DelegateWithParameters delSleep =
new DelegateWithParameters(FuncWithParameters);

delSleep.BeginInvoke(out intValue, strParam, ref list, new AsyncCallback(CallBack), null);
}

private void CallBack(IAsyncResult tag)
{
// 我们的int参数标记了out,因此此处不能定义初始值
int intOutputValue;
ArrayList list = null;

// IAsyncResult实际上就是AsyncResult对象,
// 取得它也就可以从中取得用于调用函数的委托对象
AsyncResult result = (AsyncResult)tag;

// 取得委托
DelegateWithParameters del = (DelegateWithParameters)result.AsyncDelegate;

// 取得委托后,我们需要在其上执行EndInvoke。
// 这样就可以取得函数中的执行结果。
string strReturnValue = del.EndInvoke(out intOutputValue, ref list, tag);

Trace.WriteLine(strReturnValue);
}
技术分享

在这里,我们向BeginInvoke传递了Callback回调函数。这样.NET就可以在FuncWithParameters()执行完后调用Callback函数。在之前,我们已经了解到,必须使用EndInvoke来取得函数的执行结果,注意上面为了使用EndInvoke,我们使用了一些特殊操作来取得delegate对象。

// IAsyncResult实际上就是AsyncResult对象,
// 取得它也就可以从中取得用于调用函数的委托对象
AsyncResult result = (AsyncResult)tag;

// 取得委托
DelegateWithParameters del = (DelegateWithParameters)result.AsyncDelegate;

最后一个问题:回调函数执行在什么线程?

总而言之,Callback函数(回调函数)是.NET通过我们的委托对象来实现调用的。我们可能会希望得到一个更清晰的画面:回调函数究竟执行在那个线程?为了达到这个目的:我们在函数中加入线程日志。

技术分享
private string FuncWithParameters(out int param1, string param2, ref ArrayList param3)
{
// 记录线程信息
Trace.WriteLine("In FuncWithParameters: Thread Pool? "
+ Thread.CurrentThread.IsThreadPoolThread.ToString() +
" Thread Id: " + Thread.CurrentThread.GetHashCode());

// 挂起秒以模拟线程在这里执行了耗时较长的任务
Thread.Sleep(4000);

// 我们在这里改变参数值
param1 = 300;
param2 = "hello";
param3 = new ArrayList();

// 这里执行一些耗时较长的工作
Thread.Sleep(3000);

return "thank you for reading me";
}

private void CallBack(IAsyncResult tag)
{
// 回调函数在什么线程执行?
Trace.WriteLine("In Callback: Thread Pool? "
+ Thread.CurrentThread.IsThreadPoolThread.ToString() +
" Thread Id: " + Thread.CurrentThread.GetHashCode());

// 我们的int参数标记了out,因此此处不能定义初始值
int intOutputValue;
ArrayList list = null;

// IAsyncResult实际上就是AsyncResult对象,
// 取得它也就可以从中取得用于调用函数的委托对象
AsyncResult result = (AsyncResult)tag;

// 取得委托
DelegateWithParameters del = (DelegateWithParameters)result.AsyncDelegate;

// 取得委托后,我们需要在其上执行EndInvoke。
// 这样就可以取得函数中的执行结果。
string strReturnValue = del.EndInvoke(out intOutputValue, ref list, tag);

Trace.WriteLine(strReturnValue);
}
技术分享

我将CallSleepWithoutOutAndRefParameterWithCallback()函数放在某个窗体按钮的单击事件中,并且连续点击三次,将得到这样的执行结果:

 技术分享

注意FuncWithParameter函数被连续执行了3次,它们依次被执行在相互独立的线程上,并且这些线程来自于线程池。而他们各自的回调函数也执行在与FuncWithParameter相同的线程中。线程11执行了FuncWithParameter,3秒后,它的回调函数也执行在线程11中,线程12、13也是同样。这样,我们可以认为回调函数实际上是异步函数的一种延续。

为什么要这样做?也许是因为这样我们就不必过多的耗费线程池中的线程,达到线程复用的效果;通过执行在相同的线程,也可以避免不同的线程间传递上下文环境带来的损耗问题。

到此为止,我们在Form中执行异步函数,将会得到一个完全不堵塞主线程的异步调用,这就是我们所希望的效果!

应用场景模拟

现在我们了解了BeginInvoke、EndInvoke、Callback的使用及特点,如何将他们运用到我们的Win Form程序中,使数据的获取不再阻塞UI线程,实现异步加载数据的效果?我们现在通过一个具体实例来加以说明。

场景描述:将系统的操作日志从数据库中查询出来,并且加载到前端的ListBox控件中。

要求:查询数据库的过程是个时间复杂度较高的作业,但我们的窗体在执行查询时,不允许出现”假死”的情况。

技术分享
private void button1_Click(object sender, EventArgs e)
{
GetLogDelegate getLogDel = new GetLogDelegate(GetLogs);

getLogDel.BeginInvoke(new AsyncCallback(LogTableCallBack), null);
}

public delegate DataTable GetLogDelegate();

/// <summary>
/// 从数据库中获取操作日志,该操作耗费时间较长,
/// 且返回数据量较大,日志记录可能超过万条。
/// </summary>
/// <returns></returns>
private DataTable GetLogs()
{
string sql = "select * from ***";
DataSet ds = new DataSet();

using (OracleConnection cn = new OracleConnection(connectionString))
{
cn.Open();

OracleCommand cmd = new OracleCommand(sql, cn);

OracleDataAdapter adapter = new OracleDataAdapter(cmd);
adapter.Fill(ds);
}

return ds.Tables[0];
}

/// <summary>
/// 绑定日志到ListBox控件。
/// </summary>
/// <param name="tag"></param>
private void LogTableCallBack(IAsyncResult tag)
{
AsyncResult result = (AsyncResult)tag;
GetLogDelegate del = (GetLogDelegate)result.AsyncDelegate;

DataTable logTable = del.EndInvoke(tag);

if (this.listBox1.InvokeRequired)
{
this.listBox1.Invoke(new MethodInvoker(delegate()
{
BindLog(logTable);
}));
}
else
{
BindLog(logTable);
}
}

private void BindLog(DataTable logTable)
{
this.listBox1.DataSource = logTable;
}
技术分享

以上代码在获取数据时,将不会带来任何UI线程的阻塞。

总结:

写下本文的主要目的在于总结以非阻塞模式调用函数的方法,我们应当了解以下结论;

●  Delegate会对BeginInvoke与EndInvoke的调用生成正确的参数,所有的传出参数、返回值与异常都可以在EndInvoke中取得。

●  不要忘记BeginInvoke是取自线程池中的线程,要注意防止异步任务的数量超过了线程池的线程上限值。

●  CallBack委托表示对与异步任务的回调,它将使我们从阻塞的困扰中彻底解脱。

●  截止到目前为止,UI线程在处理异步工作时将不再阻塞,而只有在更新UI具体内容时才会发生阻塞。

问题

我们将发现,一旦数据量较大,我们的UI线程在装载这些数据到控件的时候,依然会发生”假死”的情况。这是正常的,因为我们只保证了获取数据与UI线程的独立性,并没有保证更新UI带来的线程忙碌问题,”假死”正是UI线程忙碌带来的一个用户感受,如何避免这种情况,下文继续介绍。

 

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。