ACE之Proactor模式使用实例

2023-12-06 20:58

本文主要是介绍ACE之Proactor模式使用实例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

// ACE_Proactor_Client.cpp : 定义控制台应用程序的入口点。
//#include "stdafx.h"#include "ace/Message_Queue.h"
#include "ace/Asynch_IO.h"
#include "ace/OS.h"
#include "ace/Proactor.h"
#include "ace/Asynch_Connector.h"
#include "ace/SOCK_SEQPACK_Association.h"#pragma comment(lib,"ACEd.lib")class Service_Handler : public ACE_Service_Handler
{
public:Service_Handler(){//ACE_OS::printf("Service_Handler constructed for connector \n");}~Service_Handler (){if (this->handle () != ACE_INVALID_HANDLE)ACE_OS::closesocket (this->handle ());//ACE_OS::printf("one Service_Handler for connecter destructed");}void post_send(void){do {time_t now = ACE_OS::gettimeofday().sec();ACE_Message_Block *mb = new ACE_Message_Block(128);char buff[64];ACE_INET_Addr addr;ACE_SOCK_SEQPACK_Association ass=ACE_SOCK_SEQPACK_Association(this->handle());size_t addr_size=sizeof ACE_INET_Addr;ass.get_local_addrs(&addr,addr_size);//ACE_OS::printf("fd:%d ip:%d port:%d\n",(int)this->handle(), addr.get_ip_address(), addr.get_port_number());sprintf(buff,"%d",addr.get_port_number());mb->copy(buff/*ctime(now)*/);if (this->writer_.write(*mb,mb->length()) !=0){ACE_OS::printf("Begin write fail in open\n");delete this;break;}else{ACE_OS::printf("sended:%s\n",mb->rd_ptr());}} while (0);}void post_recv(void){do {ACE_Message_Block *mb = new ACE_Message_Block(buffer,128);if (this->reader_.read (*mb, mb->space ()) != 0){ACE_OS::printf("Begin read fail\n");delete this;break;}} while (0);}virtual void open (ACE_HANDLE h, ACE_Message_Block&){do {this->handle (h);if (this->writer_.open (*this) != 0 ){ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"),ACE_TEXT ("Service_Handler open")));delete this;break;}post_send();if (this->reader_.open (*this) != 0 ){ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"),ACE_TEXT ("Service_Handler open")));delete this;break;}post_recv();} while (0);}virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result){result.message_block ().release();//ACE_OS::sleep(1);post_send();}virtual void handle_read_stream(const ACE_Asynch_Read_Stream::Result &result){do {ACE_Message_Block &mb = result.message_block ();if (!result.success () || result.bytes_transferred () == 0){mb.release ();delete this;break;}ACE_OS::printf("received:%s\n",mb.rd_ptr());mb.release();post_recv();} while (0);}
private:ACE_Asynch_Write_Stream writer_;ACE_Asynch_Read_Stream reader_;char buffer[128];
};#include <ace/OS.h>
#include <ace/Task.h>class TTcpNetThread : public ACE_Task_Base
{
public:/// 运行int open();/// 停止运行int close();
protected:/// 线程函数virtual int svc();
};int TTcpNetThread::open() { return this->activate(); }int TTcpNetThread::close()
{ACE_Proactor::instance()->proactor_end_event_loop(); // 终止ACE_Proactor循环this->wait(); // 等待清理现场return 0;
}int TTcpNetThread::svc()
{/*ACE_INET_Addr listenAddr(4567); // 默认监听地址TTcpAcceptor tcpAcceptor; // 接收器// 演出开始if (tcpAcceptor.open(listenAddr, 0, 1, 5, 1, 0, 0) != 0)ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("%p\n"), ACE_TEXT("failed to open TcpAcceptor errno=%i\n"), errno), -1);*/// Proactor的事件循环开始ACE_Proactor::instance()->proactor_run_event_loop();ACE_DEBUG((LM_DEBUG, ACE_TEXT("Network fin\n")));return 0;
}#define TCP_CLIENT_THREAD_SEND	0x777const int CLIENT_CONNECTION_NUM_OF_PER_THREAD = 1; //< 客户端每个线程的连接数#include "ace/SOCK_Connector.h"
#include "ace/SOCK_Stream.h"
/**
* @class TTcpClientThread
* @brief TCP客户端测试线程
*/
class TTcpClientThread : public ACE_Task<ACE_MT_SYNCH>
{ACE_SOCK_Connector connector[CLIENT_CONNECTION_NUM_OF_PER_THREAD]; //< 连接器ACE_SOCK_Stream peerStream[CLIENT_CONNECTION_NUM_OF_PER_THREAD]; //< 流对象public:/// ctor~TTcpClientThread();/// 运行int open();/// 停止运行int close();
private:/// 线程函数virtual int svc();
};TTcpClientThread::~TTcpClientThread()
{for(int i = 0; i < CLIENT_CONNECTION_NUM_OF_PER_THREAD; i++)peerStream[i].close();
}int TTcpClientThread::open() { return this->activate(); }int TTcpClientThread::close()
{ACE_TRACE("TTcpClientThread::close");ACE_Message_Block* termBlock;ACE_NEW_NORETURN(termBlock, ACE_Message_Block(0, ACE_Message_Block::MB_HANGUP));if (!termBlock)ACE_DEBUG((LM_ERROR, ACE_TEXT("Allocate failed %i"), errno));else{putq(termBlock);wait();}return 0;
}int TTcpClientThread::svc()
{ACE_INET_Addr srvAddr(7878, "127.0.0.1");for(int i = 0; i < CLIENT_CONNECTION_NUM_OF_PER_THREAD; i++){if (connector[i].connect(peerStream[i], srvAddr) == -1){ACE_ERROR((LM_ERROR, ACE_TEXT("%i Failed to connect server errno=%i\n"), i, errno));}Sleep(100);}struct TPack{
#pragma pack(push)
#pragma pack(1)unsigned int seq;unsigned short len;char data [128];
#pragma pack(pop)};ACE_Message_Block* msg = 0;ACE_INET_Addr localAddr;ACE_TCHAR localAddrStr[128];peerStream[0].get_local_addr(localAddr);localAddr.addr_to_string(localAddrStr, sizeof(localAddrStr) / sizeof(ACE_TCHAR));TPack data;int len = sizeof(unsigned int) + sizeof(unsigned short);data.seq = 0;data.len = strlen(localAddrStr) + 1;strcpy(data.data, localAddrStr);len += data.len;char tmp[sizeof(TPack)];char buf[256];memcpy(tmp, &data, len);while(true) // 线程循环{if (getq(msg) != -1){switch(msg->msg_type()){case ACE_Message_Block::MB_HANGUP:{msg->release();return 0;}break;default:{for(int i = 0; i < CLIENT_CONNECTION_NUM_OF_PER_THREAD; i++){peerStream[i].send(tmp, 5);Sleep(100);peerStream[i].send(tmp + 5, len - 5);Sleep(100);ACE_Time_Value timeout(2);int recvLen =  peerStream[i].recv_n(buf, sizeof(unsigned int) + sizeof(unsigned short), 0, &timeout);if (recvLen == sizeof(unsigned int) + sizeof(unsigned short)){short dataLen = *(short *)(buf + 4);if (dataLen > 256)dataLen = 256;recvLen = peerStream[i].recv_n(buf, dataLen, 0, &timeout);if (recvLen != dataLen)ACE_DEBUG((LM_INFO, ACE_TEXT("Failed to recv data, length is %i, but only get %i\n"), dataLen, recvLen));elseACE_DEBUG((LM_INFO, ACE_TEXT("Client get data: len=%i data=%s\n"), recvLen, buf));} // if recvLen} // for} // defaultbreak;} // switchmsg->release();} // if getq} // whileACE_DEBUG((LM_INFO, ACE_TEXT("Exit client thread")));return 0;
}#include <vector>
#define CLIENT_THREAD_NUM 4
int main(int argc, char *argv[]) 
{ACE_INET_Addr remote_addr(4567,ACE_LOCALHOST); std::vector<ACE_Asynch_Connector<Service_Handler> *> vtconnector;for (int i=0;i<2000;i++){ACE_INET_Addr local_addr(10000+i,ACE_LOCALHOST); ACE_Asynch_Connector<Service_Handler> *connector = new ACE_Asynch_Connector<Service_Handler>;connector->open();if (connector->connect(remote_addr,local_addr) == -1)return -1;vtconnector.push_back(connector);}TTcpNetThread netThread[CLIENT_THREAD_NUM];for(int i = 0; i < CLIENT_THREAD_NUM; i++){netThread[i].open();}while (getchar()){ACE_OS::sleep(1);}//ACE_Proactor::instance ()->proactor_run_event_loop();return 0; 
}

// ACE_Proactor_Server.cpp : 定义控制台应用程序的入口点。
//#include "stdafx.h"#include "ace/Asynch_IO.h"
#include "ace/OS_main.h"
#include "ace/Proactor.h"
#include "ace/Asynch_Acceptor.h"
#include "ace/INET_Addr.h"
#include "ace/OS.h"
#include "ace/SOCK_Connector.h"
#include "ace/SOCK_Acceptor.h"
#include "ace/SOCK_Stream.h"
#include "ace/Message_Block.h"
#include "ace/Containers.h"
#include "ace/SOCK_SEQPACK_Association.h"ACE_DLList<ACE_Asynch_Write_Stream> wList;class Service_Handler:public ACE_Service_Handler
{
public:Service_Handler(){}~Service_Handler(void){if(this->handle()!=ACE_INVALID_HANDLE)ACE_OS::closesocket(this->handle());}virtual void open(ACE_HANDLE h,ACE_Message_Block &message_block){//handle_= h;//this->handle(h);if(rs_.open(*this,h)){ACE_ERROR ((LM_ERROR,"%p/n","ACE_Asynch_Read_Stream::open"));return;}if(ws_.open(*this)){ACE_ERROR ((LM_ERROR,"%p/n","ACE_Asynch_Write_Stream::open"));return;}if (post_recv()==-1)return;//wList.insert_tail(&ws_);addresses(remote_address,local_address);remote_address.addr_to_string(peer_name,MAXHOSTNAMELEN);ACE_INET_Addr addr;ACE_SOCK_SEQPACK_Association ass=ACE_SOCK_SEQPACK_Association(h);size_t addr_size=sizeof ACE_INET_Addr;ass.get_remote_addrs(&addr,addr_size);ACE_OS::printf("fd:%d ip:%d port:%d\n",(int)h, addr.get_ip_address(), addr.get_port_number());//ACE_DEBUG((LM_DEBUG,ACE_TEXT("peer:%s\n"),peer_name));}
protected:int post_recv(void){ACE_Message_Block *mb=0;ACE_NEW_RETURN(mb,ACE_Message_Block(512),-1);if(rs_.read(*mb,mb->space())==-1){ACE_ERROR_RETURN((LM_ERROR,"%p/n","ACE_Asynch_Read_Stream::read"),-1);}return 0;}virtual void handle_read_stream(const ACE_Asynch_Read_Stream::Result &result){//ACE_HANDLE h = result.handle();ACE_Message_Block &mb = result.message_block ();if (result.success()&&result.bytes_transferred()!=0){ACE_DEBUG((LM_DEBUG,ACE_TEXT("recv:%s\n"),mb.rd_ptr()));if (ws_.write(*mb.duplicate(),result.message_block().length())==-1){ACE_ERROR ((LM_ERROR,"%p\n","ACE_Asynch_Write_Stream::write"));}/*ACE_DLList_Iterator<ACE_Asynch_Write_Stream> iter(wList);while(!iter.done()){if (iter.next()->write(*result.message_block().duplicate(),result.message_block().length())==-1){ACE_ERROR ((LM_ERROR,"%p/n","ACE_Asynch_Write_Stream::write"));}iter++;}*/mb.release();post_recv();}else{mb.release();/*ACE_DLList_Iterator<ACE_Asynch_Write_Stream> iter(wList);while (!iter.done ()){if(&ws_==iter.next()){iter.remove();break;}iter++;}*/delete this;}}virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result){//ACE_OS::printf("write complete:%d %d\n", result.success(),result.bytes_transferred());result.message_block().release();}
private:ACE_Asynch_Read_Stream rs_;ACE_Asynch_Write_Stream ws_;ACE_HANDLE handle_;ACE_TCHAR peer_name[MAXHOSTNAMELEN];ACE_INET_Addr remote_address;ACE_INET_Addr local_address;
};#include <ace/OS.h>
#include <ace/Task.h>class TTcpNetThread : public ACE_Task_Base
{
public:/// 运行int open();/// 停止运行int close();
protected:/// 线程函数virtual int svc();
};int TTcpNetThread::open() { return this->activate(); }int TTcpNetThread::close()
{ACE_Proactor::instance()->proactor_end_event_loop(); // 终止ACE_Proactor循环this->wait(); // 等待清理现场return 0;
}int TTcpNetThread::svc()
{/*ACE_INET_Addr listenAddr(4567); // 默认监听地址TTcpAcceptor tcpAcceptor; // 接收器// 演出开始if (tcpAcceptor.open(listenAddr, 0, 1, 5, 1, 0, 0) != 0)ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("%p\n"), ACE_TEXT("failed to open TcpAcceptor errno=%i\n"), errno), -1);*/// Proactor的事件循环开始ACE_Proactor::instance()->proactor_run_event_loop();ACE_DEBUG((LM_DEBUG, ACE_TEXT("Network fin\n")));return 0;
}#pragma comment(lib,"ACEd.lib")#define CLIENT_THREAD_NUM 4int main(int argc,char *argv[])
{ACE_Asynch_Acceptor<Service_Handler> acceptor;if(acceptor.open(ACE_INET_Addr(4567),0,1) == -1){return -1;}TTcpNetThread netThread[CLIENT_THREAD_NUM];for(int i = 0; i < CLIENT_THREAD_NUM; i++){netThread[i].open();}while (getchar()){ACE_OS::sleep(1);}//ACE_Proactor::instance()->proactor_run_event_loop();return 0;
};

这篇关于ACE之Proactor模式使用实例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/463303

相关文章

使用Python构建智能BAT文件生成器的完美解决方案

《使用Python构建智能BAT文件生成器的完美解决方案》这篇文章主要为大家详细介绍了如何使用wxPython构建一个智能的BAT文件生成器,它不仅能够为Python脚本生成启动脚本,还提供了完整的文... 目录引言运行效果图项目背景与需求分析核心需求技术选型核心功能实现1. 数据库设计2. 界面布局设计3

使用IDEA部署Docker应用指南分享

《使用IDEA部署Docker应用指南分享》本文介绍了使用IDEA部署Docker应用的四步流程:创建Dockerfile、配置IDEADocker连接、设置运行调试环境、构建运行镜像,并强调需准备本... 目录一、创建 dockerfile 配置文件二、配置 IDEA 的 Docker 连接三、配置 Do

Android Paging 分页加载库使用实践

《AndroidPaging分页加载库使用实践》AndroidPaging库是Jetpack组件的一部分,它提供了一套完整的解决方案来处理大型数据集的分页加载,本文将深入探讨Paging库... 目录前言一、Paging 库概述二、Paging 3 核心组件1. PagingSource2. Pager3.

python使用try函数详解

《python使用try函数详解》Pythontry语句用于异常处理,支持捕获特定/多种异常、else/final子句确保资源释放,结合with语句自动清理,可自定义异常及嵌套结构,灵活应对错误场景... 目录try 函数的基本语法捕获特定异常捕获多个异常使用 else 子句使用 finally 子句捕获所

C++11右值引用与Lambda表达式的使用

《C++11右值引用与Lambda表达式的使用》C++11引入右值引用,实现移动语义提升性能,支持资源转移与完美转发;同时引入Lambda表达式,简化匿名函数定义,通过捕获列表和参数列表灵活处理变量... 目录C++11新特性右值引用和移动语义左值 / 右值常见的左值和右值移动语义移动构造函数移动复制运算符

Python对接支付宝支付之使用AliPay实现的详细操作指南

《Python对接支付宝支付之使用AliPay实现的详细操作指南》支付宝没有提供PythonSDK,但是强大的github就有提供python-alipay-sdk,封装里很多复杂操作,使用这个我们就... 目录一、引言二、准备工作2.1 支付宝开放平台入驻与应用创建2.2 密钥生成与配置2.3 安装ali

C#中lock关键字的使用小结

《C#中lock关键字的使用小结》在C#中,lock关键字用于确保当一个线程位于给定实例的代码块中时,其他线程无法访问同一实例的该代码块,下面就来介绍一下lock关键字的使用... 目录使用方式工作原理注意事项示例代码为什么不能lock值类型在C#中,lock关键字用于确保当一个线程位于给定实例的代码块中时

MySQL 强制使用特定索引的操作

《MySQL强制使用特定索引的操作》MySQL可通过FORCEINDEX、USEINDEX等语法强制查询使用特定索引,但优化器可能不采纳,需结合EXPLAIN分析执行计划,避免性能下降,注意版本差异... 目录1. 使用FORCE INDEX语法2. 使用USE INDEX语法3. 使用IGNORE IND

C# $字符串插值的使用

《C#$字符串插值的使用》本文介绍了C#中的字符串插值功能,详细介绍了使用$符号的实现方式,文中通过示例代码介绍的非常详细,需要的朋友们下面随着小编来一起学习学习吧... 目录$ 字符使用方式创建内插字符串包含不同的数据类型控制内插表达式的格式控制内插表达式的对齐方式内插表达式中使用转义序列内插表达式中使用

flask库中sessions.py的使用小结

《flask库中sessions.py的使用小结》在Flask中Session是一种用于在不同请求之间存储用户数据的机制,Session默认是基于客户端Cookie的,但数据会经过加密签名,防止篡改,... 目录1. Flask Session 的基本使用(1) 启用 Session(2) 存储和读取 Se