ACE之Proactor模式使用实例

2023-12-06 20:58

本文主要是介绍ACE之Proactor模式使用实例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

// ACE_Proactor_Client.cpp : 定义控制台应用程序的入口点。
//#include "stdafx.h"#include "ace/Message_Queue.h"
#include "ace/Asynch_IO.h"
#include "ace/OS.h"
#include "ace/Proactor.h"
#include "ace/Asynch_Connector.h"
#include "ace/SOCK_SEQPACK_Association.h"#pragma comment(lib,"ACEd.lib")class Service_Handler : public ACE_Service_Handler
{
public:Service_Handler(){//ACE_OS::printf("Service_Handler constructed for connector \n");}~Service_Handler (){if (this->handle () != ACE_INVALID_HANDLE)ACE_OS::closesocket (this->handle ());//ACE_OS::printf("one Service_Handler for connecter destructed");}void post_send(void){do {time_t now = ACE_OS::gettimeofday().sec();ACE_Message_Block *mb = new ACE_Message_Block(128);char buff[64];ACE_INET_Addr addr;ACE_SOCK_SEQPACK_Association ass=ACE_SOCK_SEQPACK_Association(this->handle());size_t addr_size=sizeof ACE_INET_Addr;ass.get_local_addrs(&addr,addr_size);//ACE_OS::printf("fd:%d ip:%d port:%d\n",(int)this->handle(), addr.get_ip_address(), addr.get_port_number());sprintf(buff,"%d",addr.get_port_number());mb->copy(buff/*ctime(now)*/);if (this->writer_.write(*mb,mb->length()) !=0){ACE_OS::printf("Begin write fail in open\n");delete this;break;}else{ACE_OS::printf("sended:%s\n",mb->rd_ptr());}} while (0);}void post_recv(void){do {ACE_Message_Block *mb = new ACE_Message_Block(buffer,128);if (this->reader_.read (*mb, mb->space ()) != 0){ACE_OS::printf("Begin read fail\n");delete this;break;}} while (0);}virtual void open (ACE_HANDLE h, ACE_Message_Block&){do {this->handle (h);if (this->writer_.open (*this) != 0 ){ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"),ACE_TEXT ("Service_Handler open")));delete this;break;}post_send();if (this->reader_.open (*this) != 0 ){ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"),ACE_TEXT ("Service_Handler open")));delete this;break;}post_recv();} while (0);}virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result){result.message_block ().release();//ACE_OS::sleep(1);post_send();}virtual void handle_read_stream(const ACE_Asynch_Read_Stream::Result &result){do {ACE_Message_Block &mb = result.message_block ();if (!result.success () || result.bytes_transferred () == 0){mb.release ();delete this;break;}ACE_OS::printf("received:%s\n",mb.rd_ptr());mb.release();post_recv();} while (0);}
private:ACE_Asynch_Write_Stream writer_;ACE_Asynch_Read_Stream reader_;char buffer[128];
};#include <ace/OS.h>
#include <ace/Task.h>class TTcpNetThread : public ACE_Task_Base
{
public:/// 运行int open();/// 停止运行int close();
protected:/// 线程函数virtual int svc();
};int TTcpNetThread::open() { return this->activate(); }int TTcpNetThread::close()
{ACE_Proactor::instance()->proactor_end_event_loop(); // 终止ACE_Proactor循环this->wait(); // 等待清理现场return 0;
}int TTcpNetThread::svc()
{/*ACE_INET_Addr listenAddr(4567); // 默认监听地址TTcpAcceptor tcpAcceptor; // 接收器// 演出开始if (tcpAcceptor.open(listenAddr, 0, 1, 5, 1, 0, 0) != 0)ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("%p\n"), ACE_TEXT("failed to open TcpAcceptor errno=%i\n"), errno), -1);*/// Proactor的事件循环开始ACE_Proactor::instance()->proactor_run_event_loop();ACE_DEBUG((LM_DEBUG, ACE_TEXT("Network fin\n")));return 0;
}#define TCP_CLIENT_THREAD_SEND	0x777const int CLIENT_CONNECTION_NUM_OF_PER_THREAD = 1; //< 客户端每个线程的连接数#include "ace/SOCK_Connector.h"
#include "ace/SOCK_Stream.h"
/**
* @class TTcpClientThread
* @brief TCP客户端测试线程
*/
class TTcpClientThread : public ACE_Task<ACE_MT_SYNCH>
{ACE_SOCK_Connector connector[CLIENT_CONNECTION_NUM_OF_PER_THREAD]; //< 连接器ACE_SOCK_Stream peerStream[CLIENT_CONNECTION_NUM_OF_PER_THREAD]; //< 流对象public:/// ctor~TTcpClientThread();/// 运行int open();/// 停止运行int close();
private:/// 线程函数virtual int svc();
};TTcpClientThread::~TTcpClientThread()
{for(int i = 0; i < CLIENT_CONNECTION_NUM_OF_PER_THREAD; i++)peerStream[i].close();
}int TTcpClientThread::open() { return this->activate(); }int TTcpClientThread::close()
{ACE_TRACE("TTcpClientThread::close");ACE_Message_Block* termBlock;ACE_NEW_NORETURN(termBlock, ACE_Message_Block(0, ACE_Message_Block::MB_HANGUP));if (!termBlock)ACE_DEBUG((LM_ERROR, ACE_TEXT("Allocate failed %i"), errno));else{putq(termBlock);wait();}return 0;
}int TTcpClientThread::svc()
{ACE_INET_Addr srvAddr(7878, "127.0.0.1");for(int i = 0; i < CLIENT_CONNECTION_NUM_OF_PER_THREAD; i++){if (connector[i].connect(peerStream[i], srvAddr) == -1){ACE_ERROR((LM_ERROR, ACE_TEXT("%i Failed to connect server errno=%i\n"), i, errno));}Sleep(100);}struct TPack{
#pragma pack(push)
#pragma pack(1)unsigned int seq;unsigned short len;char data [128];
#pragma pack(pop)};ACE_Message_Block* msg = 0;ACE_INET_Addr localAddr;ACE_TCHAR localAddrStr[128];peerStream[0].get_local_addr(localAddr);localAddr.addr_to_string(localAddrStr, sizeof(localAddrStr) / sizeof(ACE_TCHAR));TPack data;int len = sizeof(unsigned int) + sizeof(unsigned short);data.seq = 0;data.len = strlen(localAddrStr) + 1;strcpy(data.data, localAddrStr);len += data.len;char tmp[sizeof(TPack)];char buf[256];memcpy(tmp, &data, len);while(true) // 线程循环{if (getq(msg) != -1){switch(msg->msg_type()){case ACE_Message_Block::MB_HANGUP:{msg->release();return 0;}break;default:{for(int i = 0; i < CLIENT_CONNECTION_NUM_OF_PER_THREAD; i++){peerStream[i].send(tmp, 5);Sleep(100);peerStream[i].send(tmp + 5, len - 5);Sleep(100);ACE_Time_Value timeout(2);int recvLen =  peerStream[i].recv_n(buf, sizeof(unsigned int) + sizeof(unsigned short), 0, &timeout);if (recvLen == sizeof(unsigned int) + sizeof(unsigned short)){short dataLen = *(short *)(buf + 4);if (dataLen > 256)dataLen = 256;recvLen = peerStream[i].recv_n(buf, dataLen, 0, &timeout);if (recvLen != dataLen)ACE_DEBUG((LM_INFO, ACE_TEXT("Failed to recv data, length is %i, but only get %i\n"), dataLen, recvLen));elseACE_DEBUG((LM_INFO, ACE_TEXT("Client get data: len=%i data=%s\n"), recvLen, buf));} // if recvLen} // for} // defaultbreak;} // switchmsg->release();} // if getq} // whileACE_DEBUG((LM_INFO, ACE_TEXT("Exit client thread")));return 0;
}#include <vector>
#define CLIENT_THREAD_NUM 4
int main(int argc, char *argv[]) 
{ACE_INET_Addr remote_addr(4567,ACE_LOCALHOST); std::vector<ACE_Asynch_Connector<Service_Handler> *> vtconnector;for (int i=0;i<2000;i++){ACE_INET_Addr local_addr(10000+i,ACE_LOCALHOST); ACE_Asynch_Connector<Service_Handler> *connector = new ACE_Asynch_Connector<Service_Handler>;connector->open();if (connector->connect(remote_addr,local_addr) == -1)return -1;vtconnector.push_back(connector);}TTcpNetThread netThread[CLIENT_THREAD_NUM];for(int i = 0; i < CLIENT_THREAD_NUM; i++){netThread[i].open();}while (getchar()){ACE_OS::sleep(1);}//ACE_Proactor::instance ()->proactor_run_event_loop();return 0; 
}

// ACE_Proactor_Server.cpp : 定义控制台应用程序的入口点。
//#include "stdafx.h"#include "ace/Asynch_IO.h"
#include "ace/OS_main.h"
#include "ace/Proactor.h"
#include "ace/Asynch_Acceptor.h"
#include "ace/INET_Addr.h"
#include "ace/OS.h"
#include "ace/SOCK_Connector.h"
#include "ace/SOCK_Acceptor.h"
#include "ace/SOCK_Stream.h"
#include "ace/Message_Block.h"
#include "ace/Containers.h"
#include "ace/SOCK_SEQPACK_Association.h"ACE_DLList<ACE_Asynch_Write_Stream> wList;class Service_Handler:public ACE_Service_Handler
{
public:Service_Handler(){}~Service_Handler(void){if(this->handle()!=ACE_INVALID_HANDLE)ACE_OS::closesocket(this->handle());}virtual void open(ACE_HANDLE h,ACE_Message_Block &message_block){//handle_= h;//this->handle(h);if(rs_.open(*this,h)){ACE_ERROR ((LM_ERROR,"%p/n","ACE_Asynch_Read_Stream::open"));return;}if(ws_.open(*this)){ACE_ERROR ((LM_ERROR,"%p/n","ACE_Asynch_Write_Stream::open"));return;}if (post_recv()==-1)return;//wList.insert_tail(&ws_);addresses(remote_address,local_address);remote_address.addr_to_string(peer_name,MAXHOSTNAMELEN);ACE_INET_Addr addr;ACE_SOCK_SEQPACK_Association ass=ACE_SOCK_SEQPACK_Association(h);size_t addr_size=sizeof ACE_INET_Addr;ass.get_remote_addrs(&addr,addr_size);ACE_OS::printf("fd:%d ip:%d port:%d\n",(int)h, addr.get_ip_address(), addr.get_port_number());//ACE_DEBUG((LM_DEBUG,ACE_TEXT("peer:%s\n"),peer_name));}
protected:int post_recv(void){ACE_Message_Block *mb=0;ACE_NEW_RETURN(mb,ACE_Message_Block(512),-1);if(rs_.read(*mb,mb->space())==-1){ACE_ERROR_RETURN((LM_ERROR,"%p/n","ACE_Asynch_Read_Stream::read"),-1);}return 0;}virtual void handle_read_stream(const ACE_Asynch_Read_Stream::Result &result){//ACE_HANDLE h = result.handle();ACE_Message_Block &mb = result.message_block ();if (result.success()&&result.bytes_transferred()!=0){ACE_DEBUG((LM_DEBUG,ACE_TEXT("recv:%s\n"),mb.rd_ptr()));if (ws_.write(*mb.duplicate(),result.message_block().length())==-1){ACE_ERROR ((LM_ERROR,"%p\n","ACE_Asynch_Write_Stream::write"));}/*ACE_DLList_Iterator<ACE_Asynch_Write_Stream> iter(wList);while(!iter.done()){if (iter.next()->write(*result.message_block().duplicate(),result.message_block().length())==-1){ACE_ERROR ((LM_ERROR,"%p/n","ACE_Asynch_Write_Stream::write"));}iter++;}*/mb.release();post_recv();}else{mb.release();/*ACE_DLList_Iterator<ACE_Asynch_Write_Stream> iter(wList);while (!iter.done ()){if(&ws_==iter.next()){iter.remove();break;}iter++;}*/delete this;}}virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result){//ACE_OS::printf("write complete:%d %d\n", result.success(),result.bytes_transferred());result.message_block().release();}
private:ACE_Asynch_Read_Stream rs_;ACE_Asynch_Write_Stream ws_;ACE_HANDLE handle_;ACE_TCHAR peer_name[MAXHOSTNAMELEN];ACE_INET_Addr remote_address;ACE_INET_Addr local_address;
};#include <ace/OS.h>
#include <ace/Task.h>class TTcpNetThread : public ACE_Task_Base
{
public:/// 运行int open();/// 停止运行int close();
protected:/// 线程函数virtual int svc();
};int TTcpNetThread::open() { return this->activate(); }int TTcpNetThread::close()
{ACE_Proactor::instance()->proactor_end_event_loop(); // 终止ACE_Proactor循环this->wait(); // 等待清理现场return 0;
}int TTcpNetThread::svc()
{/*ACE_INET_Addr listenAddr(4567); // 默认监听地址TTcpAcceptor tcpAcceptor; // 接收器// 演出开始if (tcpAcceptor.open(listenAddr, 0, 1, 5, 1, 0, 0) != 0)ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("%p\n"), ACE_TEXT("failed to open TcpAcceptor errno=%i\n"), errno), -1);*/// Proactor的事件循环开始ACE_Proactor::instance()->proactor_run_event_loop();ACE_DEBUG((LM_DEBUG, ACE_TEXT("Network fin\n")));return 0;
}#pragma comment(lib,"ACEd.lib")#define CLIENT_THREAD_NUM 4int main(int argc,char *argv[])
{ACE_Asynch_Acceptor<Service_Handler> acceptor;if(acceptor.open(ACE_INET_Addr(4567),0,1) == -1){return -1;}TTcpNetThread netThread[CLIENT_THREAD_NUM];for(int i = 0; i < CLIENT_THREAD_NUM; i++){netThread[i].open();}while (getchar()){ACE_OS::sleep(1);}//ACE_Proactor::instance()->proactor_run_event_loop();return 0;
};

这篇关于ACE之Proactor模式使用实例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/463303

相关文章

使用Python和OpenCV库实现实时颜色识别系统

《使用Python和OpenCV库实现实时颜色识别系统》:本文主要介绍使用Python和OpenCV库实现的实时颜色识别系统,这个系统能够通过摄像头捕捉视频流,并在视频中指定区域内识别主要颜色(红... 目录一、引言二、系统概述三、代码解析1. 导入库2. 颜色识别函数3. 主程序循环四、HSV色彩空间详解

Windows下C++使用SQLitede的操作过程

《Windows下C++使用SQLitede的操作过程》本文介绍了Windows下C++使用SQLite的安装配置、CppSQLite库封装优势、核心功能(如数据库连接、事务管理)、跨平台支持及性能优... 目录Windows下C++使用SQLite1、安装2、代码示例CppSQLite:C++轻松操作SQ

Python常用命令提示符使用方法详解

《Python常用命令提示符使用方法详解》在学习python的过程中,我们需要用到命令提示符(CMD)进行环境的配置,:本文主要介绍Python常用命令提示符使用方法的相关资料,文中通过代码介绍的... 目录一、python环境基础命令【Windows】1、检查Python是否安装2、 查看Python的安

Python并行处理实战之如何使用ProcessPoolExecutor加速计算

《Python并行处理实战之如何使用ProcessPoolExecutor加速计算》Python提供了多种并行处理的方式,其中concurrent.futures模块的ProcessPoolExecu... 目录简介完整代码示例代码解释1. 导入必要的模块2. 定义处理函数3. 主函数4. 生成数字列表5.

Redis Cluster模式配置

《RedisCluster模式配置》:本文主要介绍RedisCluster模式配置,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录分片 一、分片的本质与核心价值二、分片实现方案对比 ‌三、分片算法详解1. ‌范围分片(顺序分片)‌2. ‌哈希分片3. ‌虚

Python中help()和dir()函数的使用

《Python中help()和dir()函数的使用》我们经常需要查看某个对象(如模块、类、函数等)的属性和方法,Python提供了两个内置函数help()和dir(),它们可以帮助我们快速了解代... 目录1. 引言2. help() 函数2.1 作用2.2 使用方法2.3 示例(1) 查看内置函数的帮助(

Linux脚本(shell)的使用方式

《Linux脚本(shell)的使用方式》:本文主要介绍Linux脚本(shell)的使用方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录概述语法详解数学运算表达式Shell变量变量分类环境变量Shell内部变量自定义变量:定义、赋值自定义变量:引用、修改、删

Python实例题之pygame开发打飞机游戏实例代码

《Python实例题之pygame开发打飞机游戏实例代码》对于python的学习者,能够写出一个飞机大战的程序代码,是不是感觉到非常的开心,:本文主要介绍Python实例题之pygame开发打飞机... 目录题目pygame-aircraft-game使用 Pygame 开发的打飞机游戏脚本代码解释初始化部

Java使用HttpClient实现图片下载与本地保存功能

《Java使用HttpClient实现图片下载与本地保存功能》在当今数字化时代,网络资源的获取与处理已成为软件开发中的常见需求,其中,图片作为网络上最常见的资源之一,其下载与保存功能在许多应用场景中都... 目录引言一、Apache HttpClient简介二、技术栈与环境准备三、实现图片下载与保存功能1.

Python中使用uv创建环境及原理举例详解

《Python中使用uv创建环境及原理举例详解》uv是Astral团队开发的高性能Python工具,整合包管理、虚拟环境、Python版本控制等功能,:本文主要介绍Python中使用uv创建环境及... 目录一、uv工具简介核心特点:二、安装uv1. 通过pip安装2. 通过脚本安装验证安装:配置镜像源(可