用 C# 简单模拟 Google Go 语言中的 Channel 和 goroutine 机制

前段时间尝试了一点 Google 的 Go 语言,感觉其很多特性还是不错的。Go 语言旨在结合传统编译型的静态语言和解释型的动态语言的优点,在其中找到一个平衡。从而打造一个既快速(编译执行),又方便编程的语言(动态语言往往语法简单快捷)。同时,Go 语言还具备丰富的特性以支持并发编程,这在现在多核非常普及的情况下,是很重要和强大的一个功能。

Go 语言的并发特性主要有 goroutine, channel 等。

goroutine - 可以大致理解为一种轻量级的线程(或微线程),它是一种“分配在同一个地址空间内的,能够并行执行的函数”。同时,它是轻量级的,不需要像分配线程那样分配独立的栈空间。所以理论上讲,我们可以很容易的分配很多个 goroutine, 让它们并发执行,而其开销则比多线程程序要小得多,从而可以让程序支持比较大的并发性。

channel - 顾名思义,就是通道。通道的目的是用来传递数据。在一个通道上我们可以执行数据的发送(Send)和接受(Receive)操作。对于非缓冲的 channel 而言,Receive 方法执行时,会判断该通道上是否有值,如果没有就会等待(阻塞),直到有一个值为止。同样,在 channel 上有值,而尚未被一个 Receiver 接受的时候,Send 方法也会阻塞,直到 Channel 变空。这样,通过一个简单的机制就可以保证 Send 和 Receive 总是在不同的时间执行的,而且只有 Send 之后才能 Receive. 这样就避免了常规的多线程编程中数据共享的问题。正如 Go 语言的文档一句话所说:

Do not communicate by sharing memory; instead, share memory by communicating.

不要通过共享内存来沟通;而是通过沟通来共享内存。

在常规的多线程编程里,我们总是定义好一些类变量,如果这些变量有可能被多个线程同时访问,那么就需要加锁。这样带来了一定的编程复杂性,如果代码写的稍有bug,则会导致读/写到错误的值。

而通过 channel 来沟通,我们得到了一个更为清晰的沟通方式。两个线程(或者 goroutine)要读写相同的数据,则创建一个通道,双方通过对这个通道执行 Send / Receive 的操作来设值或取值即可,相对而言,比较不容易出错。

为了更好的理解这个原理,我尝试了在 C# 中实现类似的功能。

相对于 goroutine, 我没有去实现微线程,因为这需要更复杂的调度机制(打算接下来进一步研究这方面)。我们可以暂时利用 Thread 来简单的模拟之。

而 Channel, 则用 Semaphone 控制同步的 Send / Receive 就可以了。

首先让我们来实现一个简单的 Channel,思想上面已经说过了:

/// <summary>
/// 先实现简单的没有缓冲的 Channel.
/// </summary>
/// <typeparam name="T"></typeparam>
public class Channel<T>
{
	T _value;

	// 开始不能 Receive.
	Semaphore _canReceive = new Semaphore(0, 1);

	// 开始没有值,可以 Send
	Semaphore _canSend = new Semaphore(1, 1);

	public T Receive()
	{
		// 等待有值
		_canReceive.WaitOne();

		T value = _value;

		// 通知可以发送新的值了
		_canSend.Release();

		return value;
	}

	public void Send(T value)
	{
		// 如果是非缓冲的情况,则为阻塞式的,需要等待已有的值被一个 Receiver 接受完,
		// 才能发送新值,不能连续 Send
		_canSend.WaitOne();
		_value = value;

		// 通知可以接收了
		_canReceive.Release();
	}
}

接下来粗略的模拟实现 goroutine 的语法:

public static class GoLang
{
	/// <summary>
	/// 先简单的用线程来模拟 goroutine. 因为使用 channel 通信,所以
	/// 不需考虑线程之间的数据共享/同步问题
	/// </summary>
	/// <param name="action"></param>
	public static void go(Action action)
	{
		new Thread(new ThreadStart(action)).Start();
	}

}

有了这些,我们可以写一个 test case 来验证了。下面的代码简单的创建一个并发的 routine,分别做整数的 send, receive 操作,以验证是否能正确的发送和接受值:

/// <summary>
/// 测试多个 Sender 多个 Receiver 同时在一个 channel 上发送/接受消息
/// </summary>
private static void Test1()
{
	var ch = new Channel<int>();

	// 启动多个 Sender
	GoLang.go(() =>
	{
		var id = Thread.CurrentThread.ManagedThreadId;
		for (var i = 0; i < 7; i++)
		{
			Thread.Sleep(new Random((int)DateTime.Now.Ticks).Next(3000));
			Console.WriteLine("线程{0}发送值: {1}", id, i);
			ch.Send(i);
		}
	});

	GoLang.go(() =>
	{
		var id = Thread.CurrentThread.ManagedThreadId;
		for (var i = 7; i < 15; i++)
		{
			Thread.Sleep(new Random((int)DateTime.Now.Ticks).Next(3000));
			Console.WriteLine("线程{0}发送值: {1}", id, i);
			ch.Send(i);
		}
	});

	// 启动多个 Receiver
	GoLang.go(() =>
	{
		var id = Thread.CurrentThread.ManagedThreadId;
		for (var i = 0; i < 5; i++)
		{
			//Console.WriteLine("线程{0}阻塞", id);
			var value = ch.Receive();
			Console.WriteLine("线程{0}获得值: {1}", id, value);
		}
	});

	GoLang.go(() =>
	{
		var id = Thread.CurrentThread.ManagedThreadId;
		for (var i = 0; i < 5; i++)
		{
			//Console.WriteLine("线程{0}阻塞", id);
			var value = ch.Receive();
			Console.WriteLine("线程{0}获得值: {1}", id, value);
		}
	});

	GoLang.go(() =>
	{
		var id = Thread.CurrentThread.ManagedThreadId;
		for (var i = 0; i < 5; i++)
		{
			//Console.WriteLine("线程{0}阻塞", id);
			var value = ch.Receive();
			Console.WriteLine("线程{0}获得值: {1}", id, value);
		}
	});
}

再尝试实现一下 Go 语言文档里举出的一个例子 - 筛法求素数:

(见:http://golang.org/doc/go_tutorial.html, Prime numbers)

public class PrimeNumbers
{
	public void Main()
	{
		var primes = Sieve();

		// 测试:打印前100个素数
		for (var i = 0; i < 100; i++)
		{
			Console.WriteLine(primes.Receive());
		}
	}

	/// <summary>
	/// 筛法求素数
	/// </summary>
	/// <returns></returns>
	Channel<int> Sieve()
	{
		var @out = new Channel<int>();
		GoLang.go(() =>
		{
			var ch = Generate();
			for (; ; )
			{
				// 当前序列中的第一个值总是素数
				var prime = ch.Receive();

				// 将其发送到输出序列的尾部
				@out.Send(prime);

				// 用这个素数对列表进行过滤,在进入下一次循环,可以保证至少第一个数是素数
				ch = Filter(ch, prime);
			}
		});
		return @out;
	}

	/// <summary>
	/// 产生从2开始的自然数的无穷序列,这是原始数列
	/// 其开始元素 2 是一个素数。
	/// </summary>
	/// <returns></returns>
	Channel<int> Generate()
	{
		var ch = new Channel<int>();
		GoLang.go(() =>
		{
			for (var i = 2; ; i++)
			{
				ch.Send(i);
			}
		});
		return ch;
	}

	/// <summary>
	/// 从输入 channel 里逐个读取值,将不能被 prime 整除
	/// 的那些发送到输出 channel (即用 prime 对 @in 序列进行一次筛选)
	/// </summary>
	Channel<int> Filter(Channel<int> @in, int prime)
	{
		var @out = new Channel<int>();
		GoLang.go(() =>
		{
			for (; ; )
			{
				var i = @in.Receive();
				if (i % prime != 0)
				{
					@out.Send(i);
				}
			}
		});
		return @out;
	}
}

下面是整个测试工程的 Main 方法:

class Program
{
	static void Main(string[] args)
	{
		Test1();

		new PrimeNumbers().Main();

		Console.ReadLine();
	}
}

因为代码中已经详细注释了,不多做解释。可以看到,利用 Channel 的概念(好像和 Reactive Programming 有点关系?),我们可以更清晰的构建多线程或者并发的应用程序。

学习其他语言,并不是为了学习其特定的语法,而是学习一种思想。

作者:木野狐@博客园

转载请注明出处。

本文来自:博客园

感谢作者:RChen

查看原文:用 C# 简单模拟 Google Go 语言中的 Channel 和 goroutine 机制

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。