ACE:Reactor框架--处理事件及多个I/O流

ACE Reactor框架:

只要做三件事:

1.从ACE_Event_Handler派生一个或多个类,并给各个虚回调方法增加应用特有的事件处理行为

2.向ACE_Reactor类登记应用的事件处理对象,把每个事件处理对象与它感兴趣的事件关联起来

3.运行ACE_Reactor事件循环

一个接受连接的例子:

  1. #include<iostream>
  2. #include"ace/auto_ptr.h"
  3. #include"ace/log_msg.h"
  4. #include"ace/inet_addr.h"
  5. #include"ace/sock_acceptor.h"
  6. #include"ace/reactor.h"
  7. #include"ace/Message_Block.h"
  8. #include"ace/Message_Queue.h"
  9. #include"ace/SOCK_Stream.h"
  10. #include"ace/Null_Mutex.h"
  11. #include"ace/Null_Condition.h"
  12. usingnamespacestd;
  13. //服务客户
  14. classClientService:publicACE_Event_Handler
  15. {
  16. public:
  17. ACE_SOCK_Stream&peer(void)
  18. {
  19. returnthis->sock_;
  20. }
  21. intopen(void);
  22. virtualACE_HANDLEget_handle(void)const
  23. {
  24. returnthis->sock_.get_handle();
  25. }
  26. virtualinthandle_input(ACE_HANDLEfd=ACE_INVALID_HANDLE);
  27. virtualinthandle_output(ACE_HANDLEfd=ACE_INVALID_HANDLE);
  28. virtualinthandle_close(ACE_HANDLEhandle,ACE_Reactor_Maskclose_mask);
  29. protected:
  30. ACE_SOCK_Streamsock_;
  31. ACE_Message_Queue<ACE_NULL_SYNCH>output_queue_;
  32. };
  33. intClientService::open(void)
  34. {
  35. ACE_TCHARpeer_name[512];
  36. ACE_INET_Addrpeer_addr;
  37. if(this->sock_.get_remote_addr(peer_addr)==0&&peer_addr.addr_to_string(peer_name,512)==0)
  38. cout<<"connectionfrom"<<peer_name<<endl;
  39. returnthis->reactor()->register_handler(this,ACE_Event_Handler::READ_MASK);
  40. }
  41. intClientService::handle_input(ACE_HANDLE)
  42. {
  43. constsize_tINPUT_SIZE=4096;
  44. charbuffer[INPUT_SIZE];
  45. ssize_trecv_cnt,send_cnt;
  46. if((recv_cnt=this->sock_.recv(buffer,sizeof(buffer)))<=0)
  47. {
  48. ACE_DEBUG((LM_DEBUG,ACE_TEXT("(%P|%t)connectionclosed/n")));
  49. return-1;
  50. }
  51. send_cnt=this->sock_.send(buffer,ACE_static_cast(size_t,recv_cnt));
  52. if(send_cnt==recv_cnt)
  53. return0;
  54. if(send_cnt==-1&&ACE_OS::last_error()!=EWOULDBLOCK)
  55. ACE_ERROR_RETURN((LM_ERROR,ACE_TEXT("%P|%t)%p/n"),ACE_TEXT("send")),0);
  56. if(send_cnt==-1)
  57. send_cnt=0;
  58. ACE_Message_Block*mb;
  59. size_tremaining=ACE_static_cast(size_t,(recv_cnt-send_cnt));
  60. ACE_NEW_RETURN(mb,ACE_Message_Block(&buffer[send_cnt],remaining),-1);
  61. intoutput_off=this->output_queue_.is_empty();
  62. ACE_Time_Valuenowait(ACE_OS::gettimeofday());
  63. if(this->output_queue_.enqueue_tail(mb,&nowait)==-1)
  64. {
  65. ACE_ERROR((LM_ERROR,ACE_TEXT("(%P|%t)%P;discardingdata/n"),ACE_TEXT("enqueuefailed")));
  66. mb->release();
  67. return0;
  68. }
  69. if(output_off)
  70. returnthis->reactor()->register_handler(this,ACE_Event_Handler::WRITE_MASK);
  71. return0;
  72. }
  73. intClientService::handle_output(ACE_HANDLE)
  74. {
  75. ACE_Message_Block*mb;
  76. ACE_Time_Valuenowait(ACE_OS::gettimeofday());
  77. while(0==this->output_queue_.dequeue_head(mb,&nowait))
  78. {
  79. ssize_tsend_cnt=this->sock_.send(mb->rd_ptr(),mb->length());
  80. if(send_cnt==-1)
  81. ACE_ERROR((LM_ERROR,ACE_TEXT("(%P|%t)%p/n"),ACE_TEXT("send")));
  82. else
  83. mb->rd_ptr(ACE_static_cast(size_t,send_cnt));
  84. if(mb->length()>0)
  85. {
  86. this->output_queue_.enqueue_head(mb);
  87. break;
  88. }
  89. mb->release();
  90. }
  91. return(this->output_queue_.is_empty())?-1:0;
  92. }
  93. intClientService::handle_close(ACE_HANDLE,ACE_Reactor_Maskmask)
  94. {
  95. if(mask==ACE_Event_Handler::WRITE_MASK)
  96. return0;
  97. mask=ACE_Event_Handler::ALL_EVENTS_MASK|ACE_Event_Handler::DONT_CALL;
  98. this->reactor()->remove_handler(this,mask);
  99. this->sock_.close();
  100. this->output_queue_.flush();
  101. deletethis;
  102. return0;
  103. }
  104. //接受客户
  105. classClientAccept:publicACE_Event_Handler
  106. {
  107. public:
  108. virtual~ClientAccept()
  109. {
  110. this->handle_close(ACE_INVALID_HANDLE,0);
  111. }
  112. intopen(constACE_INET_Addr&listen_addr);
  113. virtualACE_HANDLEget_handle(void)const
  114. {
  115. returnthis->acceptor_.get_handle();
  116. }
  117. virtualinthandle_input(ACE_HANDLEfd=ACE_INVALID_HANDLE);
  118. virtualinthandle_close(ACE_HANDLEhandle,ACE_Reactor_Maskclose_mask);
  119. protected:
  120. ACE_SOCK_Acceptoracceptor_;
  121. };
  122. intClientAccept::open(constACE_INET_Addr&listen_addr)
  123. {
  124. if(this->acceptor_.open(listen_addr,1)==-1)
  125. {
  126. ACE_ERROR_RETURN((LM_ERROR,ACE_TEXT("%p/n"),ACE_TEXT("acceptor.open")),-1);
  127. }
  128. returnthis->reactor()->register_handler(this,ACE_Event_Handler::ACCEPT_MASK);
  129. }
  130. intClientAccept::handle_input(ACE_HANDLE)
  131. {
  132. ClientService*client;
  133. ACE_NEW_RETURN(client,ClientService,-1);
  134. auto_ptr<ClientService>p(client);
  135. if(this->acceptor_.accept(client->peer())==-1)
  136. ACE_ERROR_RETURN((LM_ERROR,ACE_TEXT("%P|%T)%p/N"),ACE_TEXT("Failedtoaccept")ACE_TEXT("clientconnection")),-1);
  137. p.release();
  138. client->reactor(this->reactor());
  139. if(client->open()==-1)
  140. client->handle_close(ACE_INVALID_HANDLE,0);
  141. return0;
  142. }
  143. intClientAccept::handle_close(ACE_HANDLE,ACE_Reactor_Mask)
  144. {
  145. if(this->acceptor_.get_handle()!=ACE_INVALID_HANDLE)
  146. {
  147. ACE_Reactor_Maskm=ACE_Event_Handler::ACCEPT_MASK|ACE_Event_Handler::DONT_CALL;
  148. this->reactor()->remove_handler(this,m);
  149. }
  150. return0;
  151. }
  152. intmain(intargc,char*argv[])
  153. {
  154. ACE_INET_Addrport_to_listen(50000,ACE_LOCALHOST);
  155. ClientAcceptacceptor;
  156. acceptor.reactor(ACE_Reactor::instance());
  157. if(acceptor.open(port_to_listen)==-1)
  158. return1;
  159. ACE_Reactor::instance()->run_reactor_event_loop();
  160. return0;
  161. }

每个类要处理任何类型的Reactor事件的类,必须从ACE_Event_Handler派生,虽然可以用一个类控制接受和所有客户的连接,但还是创建“连接接受”和“连接服务”不同的类比较好!

1.这样可以更好的封装数据和行为,这个类接受来自客户的连接,而这是他所做的全部事情

2.代表客户的类将为客户连接提供服务

在针对一些I/O事件向反应器登记某个事件处理器时,反应器会把一个ACE_Event_Handler指针与一个句柄以及处理器感兴趣的I/O事件类型关联在一起!

当I/O事件触发时,会回调特定的句柄传给handle_input()方法的ACE_HANDLE参数

而在上面程序例子中,创建了一个clientservice实例,为每个连接使用单独的服务处理对象,所以每次接受新的连接都会得到一个新的CLIENTSERVICE实例

为了对要发送的数据进行排队,CLientService用一个ACE_Message_Queue,当要对稍后发送的数据进行排队时,分配一个ACE_Message_Block保存这些数据,并把它放入队列中,以备后用,如果我们无法把数据放入队列,我们就会放弃,抛弃那些数据。如果在我们尝试把余下的数据放入队列之前,输出队列是空的,我们就会再向反应器登记这个处理,这一次针对的是 WRITE事件

ACE_Message_Queue

通过在类声明是指定锁类型就可以很方便实现进程,线程安全的消息队列
ACE_Message_Queue<ACE_MT_SYNCH> message_queue_;如果程序是单线程的话,
可以ACE_Message_Queue<ACE_NULL_SYNCH> message_queue_。

ACE_Message_Block功能简介

ACE_Message_Block在Ace中用来表示消息的存放空间,可用做网络通信中的消息缓冲区,使用非常频繁,下面将在如下方简单的介绍一下ACE_Message_Block相关功能。

  1. 创建消息块
  2. 释放消息块
  3. 从消息块中读写数据
  4. 数据的拷贝
  5. 其它常用函数

1。创建消息块

创建消息块的方式比较灵活,常用的有以下几种方式 :

1。直接给消息块分配内存空间创建。

ACE_Message_Block *mb = new ACE_Message_Block (30);

2。共享底层数据块创建。

char buffer[100];
ACE_Message_Block *mb = new ACE_Message_Block (buffer,30);

这种方式共享底层的数据块,被创建的消息块并不拷贝该数据,也不假定自己拥有它的所有权。在消息块mb被销毁时,相关联的数据缓冲区data将不会被销毁。这是有意义的:消息块没有拷贝数据,因此内存也不是它分配的,这样它也不应该负责销毁它。

3。通过duplicate()函数从已有的消息块中创建副本。

ACE_Message_Block *mb = new ACE_Message_Block (30);
ACE_Message_Block *mb2 = mb->duplicate();

这种方式下,mb2和mb共享同一数据空间,使用的是ACE_Message_Block的引用计数机制。它返回指向要被复制的消息块的指针,并在内部增加内部引用计数

4。通过clone()函数从已有的消息块中复制。

ACE_Message_Block *mb = new ACE_Message_Block (30);
ACE_Message_Block *mb2 = mb->clone();

clone()方法实际地创建整个消息块的新副本,包括它的数据块和附加部分;也就是说,这是一次"深拷贝"。

2。释放消息块

一旦使用完消息块,程序员可以调用它的release()方法来释放它。

  1. 如果消息数据内存是由该消息块分配的,调用release()方法就也会释放此内存。
  2. 如果消息块是引用计数的,release()就会减少计数,直到到达0为止;之后消息块和与它相关联的数据块才从内存中被移除。
  3. 如果消息块是通过共享已分配的底层数据块创建的,底层数据块不会被释放。

无论消息块是哪种方式创建的,只要在使用完后及时调用release()函数,就能确保相应的内存能正确的释放。

3。从消息块中读写数据

ACE_Message_Block提供了两个指针函数以供程序员进行读写操作,rd_ptr()指向可读的数据块地址,wr_ptr()指向可写的数据块地址,默认情况下都执行数据块的首地址。下面的例子简单了演示它的使用方法。

#include "ace/Message_Queue.h"
#include "ace/OS.h"

int main(int argc,char *argv[])
{
ACE_Message_Block *mb = new ACE_Message_Block (30);
ACE_OS::sprintf(mb->wr_ptr(),"%s","hello");
ACE_OS::printf("%s/n",mb->rd_ptr ());
mb->release();
return 0;
}

注意:这两个指针所指向的位置并不会自动移动,在上面的例子中,函数执行完毕后,执行的位置仍然是最开始的0,而不是最新的可写位置5,程序员需要通过wr_ptr(5)函数手动移动写指针的位置。

4。数据的拷贝

一般的数据的拷贝可以通过函数来实现数据的拷贝,copy()还会保证wr_ptr()的更新,使其指向缓冲区的新末尾处。

下面的例子演示了copy()函数的用法。

mb->copy("hello");
mb->copy("123",4);

注意:由于c++是以'/0'作为字符串结束标志的,对于上面的例子,底层数据块中保存的是"hello/0123/0",而用ACE_OS::printf("%s/n",mb->rd_ptr ());打印出来的结果是"hello",使用copy函数进行字符串连接的时候需要注意。

5。其它常用函数

  1. length()返回当前的数据长度
  2. next()获取和设置下一个ACE_Message_Block的链接。(用来建立消息队列非常有用)
  3. space()获取剩余可用空间大小
  4. size()获取和设置数据存储空间大小。

注:ACE_NEW_RETURN的意思用new动态生成一个参数2类型的空间,并将空间的首地址副给第一个参数。如果有错误产生则将第一个参数的值设为空,并返回值RET_VAL。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


react 中的高阶组件主要是对于 hooks 之前的类组件来说的,如果组件之中有复用的代码,需要重新创建一个父类,父类中存储公共代码,返回子类,同时把公用属性...
我们上一节了解了组件的更新机制,但是只是停留在表层上,例如我们的 setState 函数式同步执行的,我们的事件处理直接绑定在了 dom 元素上,这些都跟 re...
我们上一节了解了 react 的虚拟 dom 的格式,如何把虚拟 dom 转为真实 dom 进行挂载。其实函数是组件和类组件也是在这个基础上包裹了一层,一个是调...
react 本身提供了克隆组件的方法,但是平时开发中可能很少使用,可能是不了解。我公司的项目就没有使用,但是在很多三方库中都有使用。本小节我们来学习下如果使用该...
mobx 是一个简单可扩展的状态管理库,中文官网链接。小编在接触 react 就一直使用 mobx 库,上手简单不复杂。
我们在平常的开发中不可避免的会有很多列表渲染逻辑,在 pc 端可以使用分页进行渲染数限制,在移动端可以使用下拉加载更多。但是对于大量的列表渲染,特别像有实时数据...
本小节开始前,我们先答复下一个同学的问题。上一小节发布后,有小伙伴后台来信问到:‘小编你只讲了类组件中怎么使用 ref,那在函数式组件中怎么使用呢?’。确实我们...
上一小节我们了解了固定高度的滚动列表实现,因为是固定高度所以容器总高度和每个元素的 size、offset 很容易得到,这种场景也适合我们常见的大部分场景,例如...
上一小节我们处理了 setState 的批量更新机制,但是我们有两个遗漏点,一个是源码中的 setState 可以传入函数,同时 setState 可以传入第二...
我们知道 react 进行页面渲染或者刷新的时候,会从根节点到子节点全部执行一遍,即使子组件中没有状态的改变,也会执行。这就造成了性能不必要的浪费。之前我们了解...
在平时工作中的某些场景下,你可能想在整个组件树中传递数据,但却不想手动地通过 props 属性在每一层传递属性,contextAPI 应用而生。
楼主最近入职新单位了,恰好新单位使用的技术栈是 react,因为之前一直进行的是 vue2/vue3 和小程序开发,对于这些技术栈实现机制也有一些了解,最少面试...
我们上一节了了解了函数式组件和类组件的处理方式,本质就是处理基于 babel 处理后的 type 类型,最后还是要处理虚拟 dom。本小节我们学习下组件的更新机...
前面几节我们学习了解了 react 的渲染机制和生命周期,本节我们正式进入基本面试必考的核心地带 -- diff 算法,了解如何优化和复用 dom 操作的,还有...
我们在之前已经学习过 react 生命周期,但是在 16 版本中 will 类的生命周期进行了废除,虽然依然可以用,但是需要加上 UNSAFE 开头,表示是不安...
上一小节我们学习了 react 中类组件的优化方式,对于 hooks 为主流的函数式编程,react 也提供了优化方式 memo 方法,本小节我们来了解下它的用...
开源不易,感谢你的支持,❤ star me if you like concent ^_^
hel-micro,模块联邦sdk化,免构建、热更新、工具链无关的微模块方案 ,欢迎关注与了解
本文主题围绕concent的setup和react的五把钩子来展开,既然提到了setup就离不开composition api这个关键词,准确的说setup是由...
ReactsetState的执行是异步还是同步官方文档是这么说的setState()doesnotalwaysimmediatelyupdatethecomponent.Itmaybatchordefertheupdateuntillater.Thismakesreadingthis.staterightaftercallingsetState()apotentialpitfall.Instead,usecom