ACE_linux:Reactor与Proactor两种模式的区别

一、概念:

Reactor与Proactor两种模式的区别。这里我们只关注read操作,因为write操作也是差不多的。下面是Reactor的做法:

  1. 某个事件处理器宣称它对某个socket上的读事件很感兴趣;

  2. 事件分离者等着这个事件的发生;

  3. 当事件发生了,事件分离器被唤醒,这负责通知先前那个事件处理器;

  4. 事件处理器收到消息,于是去那个socket上读数据了. 如果需要,它再次宣称对这个socket上的读事件感兴趣,一直重复上面的步骤;

下面再来看看真正意义的异步模式Proactor是如何做的:

  1. 事件处理器直接投递发一个读操作(当然,操作系统必须支持这个异步操作)。这个时候,事件处理器根本不关心读事件,它只管发这么个请求,它魂牵梦萦的是这个读操作的完成事件。这个事件处理器很拽,发个命令就不管具体的事情了,只等着别人(系统)帮他搞定的时候给他回个话。

  2. 事件分离器等着这个读事件的完成(比较下与Reactor的不同);

  3. 当事件分离器默默等待完成事情到来的同时,操作系统已经在一边开始干活了,它从目标读取数据,放入用户提供的缓存区中,最后通知事件分离器,这个事情我搞完了;

  4. 事件分离器通知之前的事件处理器: 你吩咐的事情搞定了;

  5. 事件处理器这时会发现想要读的数据已经乖乖地放在他提供的缓存区中,想怎么处理都行了。如果有需要,事件处理器还像之前一样发起另外一个读操作,和上面的几个步骤一样。

 

二、代码示例:

 

-----------------------------------

 Reactor:

 

 

-----------------------------------

-----------------------------------

Proactor:

技术分享
  1 #include "ace/Proactor.h"
  2 #include "ace/Asynch_Acceptor.h"
  3 
  4 class HA_Proactive_Service : public ACE_Service_Handler
  5 {
  6 public:
  7     HA_Proactive_Service()
  8     {
  9         ACE_OS::printf("Service_Handler constructed for accepter \n");
 10     }
 11     ~HA_Proactive_Service ()
 12     {
 13         if (this->handle () != ACE_INVALID_HANDLE)
 14         {
 15             ACE_OS::closesocket (this->handle ());
 16         }
 17     }
 18     
 19     virtual void open (ACE_HANDLE h, ACE_Message_Block&)
 20     {
 21         //在OPEN函数中完成读写操作
 22         this->handle (h);
 23         if (this->reader_.open (*this) != 0 ||
 24                 this->writer_.open (*this) != 0   )
 25         {
 26             ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"),
 27                         ACE_TEXT ("HA_Proactive_Service open")));
 28             
 29             return;
 30         }
 31         ACE_OS::printf("ready!\n");
 32         
 33         //异步发送
 34         send_to_remote();
 35         //异步读取
 36         reveive_from_remote();
 37         
 38         // mb is now controlled by Proactor framework.
 39         return;
 40     }
 41     
 42     //异步读完成后会调用此函数
 43     virtual void handle_read_stream
 44     (const ACE_Asynch_Read_Stream::Result &result)
 45     {
 46         ACE_Message_Block &mb = result.message_block ();
 47         if (!result.success () || result.bytes_transferred () == 0)
 48         {
 49             mb.release ();
 50             
 51             return;
 52         }
 53         //else
 54         //输出读取内容
 55         ACE_OS::printf("received:%s\n",mb.rd_ptr());
 56         mb.release();
 57         //继续下一次异步读取
 58         reveive_from_remote();
 59         
 60         return;
 61     }
 62     //异步写完成后会调用此函数
 63     virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)
 64     {
 65         result.message_block ().release();
 66         ACE_OS::sleep(1);
 67         //上次发送完毕之后再接着发送一次,这次发送完成之后又会调用
 68         //handle_write_stream,所以会一直发送
 69         send_to_remote();
 70         return;
 71     }
 72     //remote 
 73     void reveive_from_remote(void)
 74     {
 75         ACE_Message_Block *mb;
 76         ACE_NEW_NORETURN (mb, ACE_Message_Block (1024));
 77         if (this->reader_.read (*mb, mb->space ()) != 0)
 78         {
 79             ACE_OS::printf("Begin read fail\n");
 80             
 81             return;
 82         }
 83     }
 84     //把当前时间发送到远端
 85     void send_to_remote(void)
 86     {
 87         std::string book = "S: ";
 88         time_t now = ACE_OS::gettimeofday().sec();
 89         book = book+ ctime(&now);
 90         ACE_Message_Block *mb = new ACE_Message_Block(100);
 91         //获取当前时间的字符串格式
 92         mb->copy(book.c_str() );
 93         //send message to accepter
 94         if (this->writer_.write(*mb,mb->length()) !=0)
 95         {
 96             ACE_OS::printf("Begin write fail\n");
 97             
 98             return;
 99         }
100         else
101         {
102             ACE_OS::printf("sended %s\n",mb->rd_ptr());
103         }
104     }
105     
106     // Listing 3
107 private:
108     ACE_Asynch_Read_Stream reader_;
109     ACE_Asynch_Write_Stream writer_;
110 };
111 
112 
113 int main(int argc, char *argv[]) 
114 {
115     int port=3000;
116     ACE_Asynch_Acceptor<HA_Proactive_Service> acceptor;
117     
118     if (acceptor.open (ACE_INET_Addr (port)) == -1)
119         return -1;
120     
121     while(true)
122         ACE_Proactor::instance ()->handle_events ();
123     
124     return 0; 
125 }
Acceptor.cpp

 

技术分享
  1 #include "ace/Proactor.h"
  2 #include "ace/Asynch_Connector.h"
  3 
  4 class HA_Proactive_Service : public ACE_Service_Handler
  5 {
  6 public:
  7     HA_Proactive_Service()
  8     {
  9         ACE_OS::printf("Service_Handler constructed for connector \n");
 10     }
 11     ~HA_Proactive_Service ()
 12     {
 13         if (this->handle () != ACE_INVALID_HANDLE)
 14         {
 15             ACE_OS::closesocket (this->handle ());
 16         }
 17     }
 18     
 19     virtual void open (ACE_HANDLE h, ACE_Message_Block&)
 20     {
 21         //在OPEN函数中完成读写操作
 22         this->handle (h);
 23         
 24         if (this->reader_.open (*this) != 0 ||
 25                 this->writer_.open (*this) != 0   )
 26         {
 27             ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"),
 28                         ACE_TEXT ("HA_Proactive_Service open")));
 29             
 30             return;
 31         }
 32         
 33         ACE_OS::printf("connceted!\n");
 34         //异步发送
 35         send_to_remote();
 36         //异步读取
 37         reveive_from_remote();
 38         
 39         
 40         return;
 41     }
 42     
 43     //异步读完成后会调用此函数
 44     virtual void handle_read_stream
 45     (const ACE_Asynch_Read_Stream::Result &result)
 46     {
 47         ACE_Message_Block &mb = result.message_block ();
 48         if (!result.success () || result.bytes_transferred () == 0)
 49         {
 50             mb.release ();
 51             
 52             return;
 53         }
 54         //else
 55         //输出读取内容
 56         ACE_OS::printf("received:%s\n",mb.rd_ptr());
 57         mb.release();
 58         //继续下一次异步读取
 59         reveive_from_remote();
 60         
 61         return;
 62     }
 63     //异步写完成后会调用此函数
 64     virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)
 65     {
 66         result.message_block ().release();
 67         ACE_OS::sleep(1);
 68         //上次发送完毕之后再接着发送一次,这次发送完成之后又会调用
 69         //handle_write_stream,所以会一直发送
 70         send_to_remote();
 71         return;
 72     }
 73     //remote 
 74     void reveive_from_remote(void)
 75     {
 76         ACE_Message_Block *mb;
 77         ACE_NEW_NORETURN (mb, ACE_Message_Block (1024));
 78         if (this->reader_.read (*mb, mb->space ()) != 0)
 79         {
 80             ACE_OS::printf("Begin read fail\n");
 81             
 82             return;
 83         }
 84     }
 85     //把当前时间发送到远端
 86     void send_to_remote(void)
 87     {
 88         std::string book = "C: ";
 89         time_t now = ACE_OS::gettimeofday().sec();
 90         book = book+ ctime(&now);
 91         ACE_Message_Block *mb = new ACE_Message_Block(100);
 92         //获取当前时间的字符串格式
 93         mb->copy(book.c_str() );
 94         //send message to accepter
 95         if (this->writer_.write(*mb,mb->length()) !=0)
 96         {
 97             ACE_OS::printf("Begin write fail\n");
 98             
 99             return;
100         }
101         else
102         {
103             ACE_OS::printf("sended %s\n",mb->rd_ptr());
104         }
105     }
106     
107     // Listing 3
108 private:
109     ACE_Asynch_Read_Stream reader_;
110     ACE_Asynch_Write_Stream writer_;
111 };
112 
113 int main(int argc, char *argv[]) 
114 {
115     
116     ACE_INET_Addr addr(3000,"127.0.0.1"); 
117     
118     ACE_Asynch_Connector<HA_Proactive_Service> connector;
119     
120     connector.open();
121     if (connector.connect(addr) == -1)
122         return -1;
123     
124     while(true)
125         ACE_Proactor::instance ()->handle_events ();
126     
127     return 0; 
128 }
Connector.cpp

 

-----------------------------------

 

GOOD LUCK!

 

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。