linux网络编程系列(十)--epoll的基本使用

火山方舟向量数据库大模型
  1. 网络编程中的四种IO模型

  • 阻塞IO模型,默认socket都是阻塞的,就是IO操作都要等待操作完成以后才能返回;
  • 非阻塞IO模型,就是IO操作时不等待,立即返回,但需要不断的去询问内核,数据是否准备好了,如果准备好了,就主动调用函数去处理数据,使用fcntl设置socket为非阻塞;
  • 多路复用模型,就是事件驱动IO,也就是说检测到描述符上发生了事件,才去处理,典型的就是select和epoll;
  • 异步IO模型,就是发起IO操作后,立即返回去做其他的事,然后内核会等待数据准备完成后,将数据拷贝到用户内存中,并给用户进程发送一个信号,告知IO操作已完成;
  1. epoll函数

2.1 epoll的两种工作模式

2.1.1 LT模式(又叫水平模式,类似于select/poll):

完全靠内核驱动,只要某个文件描述符有变化,就会一直通知我们的应用程序,直到处理完毕为止。

2.1.2 ET模式(又叫边缘触发模式,必须将socket设置为非阻塞):

此种情况下,当文件描述符有变化时,epoll只会通知应用程序一次,并将描述符从监视队列中清除,直到应用程序处理完该次变化,如果应用程序没有处理,那epoll就不会再次关注该文件描述符,这样其实可能会造成丢包的。这时应用程序需要自己维护一张fds的表格,把从epoll_wait得到的状态信息登记到这张表格,然后应用程序可以选择遍历这张表格,对处于忙碌状态的fds进行操作。

2.1.3 水平模式和边沿模式的选择

ET比LT对应用程序的要求更多,需要程序员设计的部分也更多,看上去LT好像要简单很多,但是当我们要求对fd有超时控制时,LT也同样需要对fds进行遍历,此时不如使用本来就要遍历的ET。而且由于epollwait每次返回的fds数量是有限的,在大并发的模式下,LT将非常的繁忙,所有的fds都要在它的队列中产生状态信息,而每次只有一部分fds能返回给应用程序。而ET只要epollwait返回一次fds之后,这些fds就会从队列中删除,只有当fd重新变为空闲状态时才重新加入到队列中,这就是说,随着epoll_wait的返回,队列中的fds是在减少的,这样在大并发的情况下,ET模式将会更加具有优势。

2.2 epoll函数原型

2.2.1 epoll_create


 1. `int epoll\_create(int size); //创建一个epoll的句柄,size用来告诉内核要监听的数目`

返回值:>0 返回创建成功的epoll句柄 -1 失败

2.2.2 epoll_ctl


 1. `int epoll\_ctl(int epfd, int op, int fd, struct epoll\_event *event);`

epoll的事件注册函数, 注册要监听的事件类型:参数说明:

  • epfd epoll_create返回的句柄
  • op 表示动作,用3个宏表示:EPOLLCTLADD 注册新的fd到epfd中,EPOLLCTLMOD,修改已经注册的fd的监听事件,EPOLLCTLDEL,从epfd中删除一个fd。
  • fd 表示要监听的fd(一般就是socket函数生成的文件描述符)
  • event 告诉内核要监听什么事

struct epoll_event结构如下:


 1. `struct epoll_event {`
2. `__uint32_t events; //多个宏的集合,表示对应文件描述符可读、可写、紧急可读等等`
3. `epoll_data_t data; //一个联合体,详细介绍见下面`
4. `};`
5. `typedef union epoll_data`
6. `{`
7. `void *ptr;`
8. `int fd;`
9. `uint32_t u32;`
10. `uint64_t u64;`
11. `}epoll_data_t;`

2.2.3 epoll_wait


 1. `int epoll\_wait(int epfd, struct epoll\_event* events, int maxevents, int timeout);`

参数说明如下:

  • events 根据events中事件,决定是调用accept回应一个连接,还是调用read或者write读写文件
  • maxevents 告诉内核这个events多大,并且不能大于epoll_create中的size

功能说明:等侍注册在epfd(epoll生成的文件描述符)上的socket fd的事件的发生,如果发生则将发生的sokct fd和事件类型放入到events数组中。

并且将注册在epfd上的socket fd的事件类型给清空,所以如果下一个循环你还要关注这个socket fd的话,则需要用epollctl(epfd,EPOLLCTLMOD,listenfd,&ev)来重新设置socket fd的事件类型。这时不用EPOLLCTLADD,因为socket fd并未清空,只是事件类型清空。这一步非常重要。当epollwait返回时根据返回值(大于0)调用accept。

2.3 epoll的实现

2.3.1 epoll函数调用过程

socket/bind/listen/epollcreate/epollctl/epoll_wait/accept/read/write/close

2.3.2 代码实现

首先对CTCP类做一下补充,将socket设置为非阻塞:


 1. `int CTcp::SetNoblock (int nSock)`
2. `{`
3. `assert (m_nSock != -1);`
4. `int nFlags;`
5. 
6. `if ( nSock == -1 )`
7. `{`
8. `nSock = m_nSock;`
9. `}`
10. 
11. `if ((nFlags = fcntl (nSock, F_GETFL, 0)) < 0)`
12. `return 0;`
13. 
14. `nFlags = nFlags | O_NONBLOCK;`
15. 
16. `if (fcntl (nSock, F_SETFL, nFlags) < 0)`
17. `return 0;`
18. 
19. `return 1;`
20. `}`

然后基于CTCP类,实现CEpollServer类,代码如下:


 1. `//EpollServer.h`
2. `#ifndef __EPOLL_SERVER_H__`
3. `#define __EPOLL_SERVER_H__`
4. 
5. `#include "SxTcp.h"`
6. 
7. `//Tcp类`
8. `class CEpollServer`
9. `{`
10. 
11. `//构造函数`
12. `public:`
13. `CEpollServer ();`
14. `virtual ~CEpollServer ();`
15. 
16. `//公有成员函数`
17. `public:`
18. `int CreateEpoll(const char* szIp, int nPort, int nSize);`
19. `int ProcessEpoll();`
20. `int CloseEpoll();`
21. 
22. `//私有成员变量`
23. `private:`
24. `CTcp m_cTcp;`
25. `int m_nEpollFd;`
26. `};`
27. 
28. `#endif`


 1. `//EpollServer.cpp`
2. `#include "EpollServer.h"`
3. `#include <sys/epoll.h>`
4. `#include "TypeError.h"`
5. `#include <assert.h>`
6. `#include <string.h>`
7. `#include <unistd.h>`
8. `#include <errno.h>`
9. `#include <sys/types.h>`
10. `#include <sys/socket.h>`
11. `#include <netinet/in.h>`
12. `#include <stdio.h>`
13. 
14. `CEpollServer::CEpollServer ()`
15. `{`
16. `m_nEpollFd = -1;`
17. `}`
18. 
19. `CEpollServer::~CEpollServer ()`
20. `{`
21. `CloseEpoll();`
22. `m_cTcp.Close();`
23. `}`
24. 
25. 
26. `/*创建epoll句柄`
27. `入参:`
28. `szIp 服务器ip地址`
29. `nPort 要绑定的端口`
30. `nSize 要监听的文件描述符数量`
31. `出参:1: 成功 ; 0: 失败`
32. `*/`
33. `int CEpollServer::CreateEpoll(const char* szIp, int nPort, int nSize)`
34. `{`
35. `assert(szIp != nullptr);`
36. `int iRet = 0;`
37. `int size = (nSize > 0 ? nSize : DEFAULT_EPOLL_FD_NUM);`
38. 
39. `iRet = m_cTcp.Open();`
40. `if ( iRet == 0 )`
41. `{`
42. `return SOCKET_ERROR;`
43. `}`
44. 
45. `iRet = m_cTcp.Bind(szIp, nPort);`
46. `if ( iRet == 0 )`
47. `{`
48. `return BIND_ERROR;`
49. `}`
50. 
51. `iRet = m_cTcp.SetNoblock();`
52. `if ( iRet == 0 )`
53. `{`
54. `return SETSOCKOPT_ERROR;`
55. `}`
56. 
57. `iRet = m_cTcp.Listen(nSize+1);//监听描述符数量要比epoll的多?`
58. `if ( iRet == 0)`
59. `{`
60. `return LISTEN_ERROR;`
61. `}`
62. 
63. `if ( m_nEpollFd != -1 )`
64. `{`
65. `CloseEpoll();`
66. `}`
67. 
68. `m_nEpollFd = epoll_create(size);`
69. `if ( m_nEpollFd == -1)`
70. `{`
71. `return EPOLL_CREATE_ERROR;`
72. `}`
73. 
74. `return 1;`
75. `}`
76. 
77. `/*处理epoll事件`
78. `出参:1: 成功 ; 0: 失败`
79. `*/`
80. `int CEpollServer::ProcessEpoll()`
81. `{`
82. `assert(m_nEpollFd != -1);`
83. `int nFds = 0;`
84. `int connFd = -1, readFd = -1, writeFd = -1;`
85. `int n = 0, nSize = 0;`
86. `int nListenFd = -1;`
87. `char buf[MAX_READ_SIZE] = {0};`
88. `struct sockaddr_in clientAddr;`
89. `socklen_t clilen;`
90. `struct epoll_event ev, events[20];`
91. `memset((void*)&ev, 0, sizeof(ev));`
92. `nListenFd = m_cTcp.GetHandle();`
93. `ev.data.fd = nListenFd;`
94. `ev.events = EPOLLIN|EPOLLET;`
95. `if ( epoll_ctl(m_nEpollFd, EPOLL_CTL_ADD, nListenFd, &ev) == -1 )`
96. `{`
97. `return EPOLL_CTL_ERROR;`
98. `}`
99. `while(1)`
100. `{`
101. `n = 0;`
102. `nSize = 0;`
103. `nFds = epoll_wait(m_nEpollFd, events, 20, 500);`
104. `for (int i = 0; i< nFds; ++i)`
105. `{`
106. `memset(buf, 0, MAX_READ_SIZE);`
107. `if (events[i].data.fd == nListenFd )`
108. `{`
109. `while ( (connFd = accept(nListenFd, (sockaddr*)&clientAddr, &clilen)) > 0 )`
110. `{`
111. `m_cTcp.SetNoblock(connFd); //ET模式需设置为非阻塞的`
112. `ev.data.fd = connFd;`
113. `ev.events = EPOLLIN|EPOLLET;`
114. `if ( epoll_ctl(m_nEpollFd, EPOLL_CTL_ADD, connFd, &ev) == -1 )`
115. `{`
116. `return EPOLL_CTL_ERROR;`
117. `}`
118. `}`
119. `if ( connFd == -1 && errno != EAGAIN && errno != ECONNABORTED && errno != EPROTO && errno != EINTR )`
120. `{`
121. `return ACCEPT_ERROR;`
122. `}`
123. `continue;`
124. `}`
125. `else if(events[i].events & EPOLLIN)`
126. `{`
127. `readFd = events[i].data.fd;`
128. `if (readFd < 0)`
129. `{`
130. `continue;`
131. `}`
132. `//读取数据`
133. `while ( (nSize = read(readFd, buf+n, MAX_READ_SIZE - 1)) > 0 )`
134. `{`
135. `n += nSize;`
136. `}`
137. `//EAGAIN说明读到结尾了`
138. `if (nSize == -1 && errno != EAGAIN )`
139. `{`
140. `fprintf(stderr, "epoll read failed\n");`
141. `//ngleLog::WriteLog(ERROR, "%s", "epoll read fialed");`
142. `}`
143. 
144. `fprintf(stdout, "read data is:%s\n", buf);`
145. 
146. `ev.data.fd = readFd;`
147. `ev.events = EPOLLOUT|EPOLLET;//边沿模式(ET)`
148. `epoll_ctl(m_nEpollFd, EPOLL_CTL_MOD, readFd, &ev);`
149. `}`
150. `else if(events[i].events & EPOLLOUT)`
151. `{`
152. `writeFd = events[i].data.fd;`
153. `//写数据`
154. `strncpy(buf, "hello client", sizeof(buf)-1);`
155. `int dataSize = strlen(buf);`
156. `n = dataSize;`
157. `while(n > 0)`
158. `{`
159. `nSize = write(writeFd, buf + dataSize - n, n);`
160. `if (nSize < n)`
161. `{`
162. `if (nSize == -1 && errno != EAGAIN)`
163. `{`
164. `break;`
165. `}`
166. `}`
167. `n -= nSize;`
168. `}`
169. 
170. `ev.data.fd = writeFd;`
171. `ev.events = EPOLLIN|EPOLLET;`
172. `epoll_ctl(m_nEpollFd, EPOLL_CTL_MOD, writeFd, &ev);`
173. `}`
174. `}`
175. `}`
176. `}`
177. 
178. `/*`
179. `关闭epoll文件描述符`
180. `*/`
181. `int CEpollServer::CloseEpoll()`
182. `{`
183. `if (m_nEpollFd != -1)`
184. `{`
185. `close (m_nEpollFd);`
186. `m_nEpollFd = -1;`
187. `}`
188. `return 1;`
189. 
190. `}`

将上面CEpollServer类和TCP类结合起来编译成一个动态库,makefile如下:


 1. `LIB_DIR=./lib`
2. `src=$(wildcard *.cpp)`
3. `obj=$(patsubst %.cpp,%.o,$(src))`
4. `PIC=-fPIC`
5. `LIBSO=-shared`
6. `#CC=g++ -gdwarf-2 -gstrict-dwarf`
7. `CC=g++ -g`
8. 
9. `%.o:%.cpp`
10. `$(CC) -c $< $(PIC)`
11. 
12. `network:$(obj)`
13. `$(CC) -o libnetwork.so $^ $(LIBSO)`
14. `cp -f libnetwork.so ../test/lib`
15. 
16. `clean:`
17. `rm -f *.o *.so`

然后实现TestEpollServer.cpp,如下:

注意:下面ConfigIni和SingleLog都是我本人测试时候写的库,如需使用下面代码,需要修改!


 1. `#include "../../readini/ConfigIni.h"`
2. `#include <string>`
3. `#include "../../network/EpollServer.h"`
4. `#include "../../log/SingleLog.h"`
5. 
6. `CEpollServer g_clEpollServer;`
7. `#define FILEDIR "./socket.ini"`
8. 
9. `//epoll server`
10. `int epoll_server_init()`
11. `{`
12. `int iRet = -1;`
13. `string strIp;`
14. `int nPort = 0, nEpollNum = 0, nTimeout = 0;`
15. `ConfigIni::Init(string(FILEDIR));`
16. `strIp = ConfigIni::ReadStr(string("SERVER"), string("Addr"));`
17. `if (strIp == "")`
18. `{`
19. `SingleLog::WriteLog(ERROR,"read server addr failed");`
20. `return iRet;`
21. `}`
22. 
23. `nPort = ConfigIni::ReadInt(string("SERVER"), string("Port"));`
24. `if ( nPort == -1 )`
25. `{`
26. `SingleLog::WriteLog(ERROR,"read server port failed");`
27. `return iRet;`
28. `}`
29. 
30. `nEpollNum = ConfigIni::ReadInt(string("SERVER"), string("MaxEpollNum"));`
31. `if ( nEpollNum == -1 )`
32. `{`
33. `SingleLog::WriteLog(ERROR,"read server epoll num failed");`
34. `return iRet;`
35. `}`
36. 
37. `nTimeout = ConfigIni::ReadInt(string("SERVER"), string("Timeout"));`
38. `if ( nTimeout == -1 )`
39. `{`
40. `SingleLog::WriteLog(ERROR,"read server timeout failed");`
41. `return iRet;`
42. `}`
43. 
44. `iRet = g_clEpollServer.CreateEpoll(strIp.c_str(), nPort, nEpollNum);`
45. `if ( iRet == 0 )`
46. `{`
47. `SingleLog::WriteLog(ERROR, "epoll create failed");`
48. `return -1;`
49. `}`
50. 
51. `return 0;`
52. `}`
53. 
54. `void epoll_server_run()`
55. `{`
56. `g_clEpollServer.ProcessEpoll();`
57. `}`
58. 
59. `int main()`
60. `{`
61. `SingleLog::Init();`
62. `if (epoll_server_init() == -1)`
63. `{`
64. `return -1;`
65. `}`
66. `epoll_server_run();`
67. `return 0;`
68. `}`


 1. `//TestClient.cpp`
2. `#include <stdio.h>`
3. `#include <iostream>`
4. `#include <string.h>`
5. `#include "../../network/SxTcp.h"`
6. `using namespace std;`
7. 
8. `int main()`
9. `{`
10. `CTcp tcp;`
11. `int iRet = 0;`
12. `int iFd = 0;`
13. `char buf[128] = {0};`
14. 
15. `iRet = tcp.Open();`
16. `if (iRet == 0)`
17. `{`
18. `perror("socket create failed");`
19. `return -1;`
20. `}`
21. 
22. `iRet = tcp.Connect("192.168.233.250", 6666);`
23. `if (iRet == 0)`
24. `{`
25. `perror("socket connect failed");`
26. `return -1;`
27. `}`
28. 
29. `while(1)`
30. `{`
31. `memset(buf, 0, sizeof(buf));`
32. `cout << "please input some string:";`
33. `cin >> buf;`
34. `iRet = tcp.Send(buf, strlen(buf));`
35. `if (iRet < -1 && errno != EAGAIN)`
36. `{`
37. `perror("send failed");`
38. `return -1;`
39. `}`
40. `else if(iRet == 0)`
41. `{`
42. `perror("connect is closed");`
43. `return -1;`
44. `}`
45. 
46. `memset(buf, 0, sizeof(buf));`
47. `iRet = tcp.Recv(buf, sizeof(buf));`
48. `if (iRet < 0 && errno != EAGAIN)`
49. `{`
50. `perror("recv failed");`
51. `return -1;`
52. `}`
53. `else if(iRet == 0)`
54. `{`
55. `perror("socket not connect");`
56. `return -1;`
57. `}`
58. 
59. `fprintf(stdout, "recv data is:%s\n", buf);`
60. `}`
61. 
62. `return 0;`
63. `}`

分别编译TestEpollServer.cpp和TestClient.cpp,生成服务端和客户端应用程序,即可实现通信。

更多c++及python系列文章,请关注我的公众号:晟夏的叶。

picture.image

0
0
0
0
关于作者
关于作者

文章

0

获赞

0

收藏

0

相关资源
如何利用云原生构建 AIGC 业务基石
AIGC即AI Generated Content,是指利用人工智能技术来生成内容,AIGC也被认为是继UGC、PGC之后的新型内容生产方式,AI绘画、AI写作等都属于AIGC的分支。而 AIGC 业务的部署也面临着异构资源管理、机器学习流程管理等问题,本次分享将和大家分享如何使用云原生技术构建 AIGC 业务。
相关产品
评论
未登录
看完啦,登录分享一下感受吧~
暂无评论