socket编程与IO multiplexing

socket通常用于跨进程通信(Inter-Process Communication,IPC),由于其最早BSD Unix发行版中使用,因此也叫做Barkeley sockets。socket封装了底层网络通信协议细节,为上层应用提供了一个统一的接口,按照使用的“域”不同,又分为用于网络通信的internet socket以及用于本地进程通信的Unix domain socket

那么,如何利用socket实现IPC通信了?在多个客户端的情况下,服务端要如何实现同时响应多个客户请求了,即如何实现多个IO端口的监听(I/O multiplex)?这篇文章,就来看看这两个问题。首先,来看下socket编程的一些基本知识。

socket编程基础

一个socket由三个元素唯一确定:

  • internet地址(如果是Unix domain socket,则对应一个本地文件名)
  • 端到端的协议类型(面向连接的TCP或者无连接的UDP)
  • 用于确定传送数据应用的端口号(port)

在使用socket前,首先需要通过系统调用socket创建一个socket对象,并返回该socket对应的文件描述符:

1
2
3
4
5

#include <sys/types.h>
#include <sys/socket.h>

int socket(int domain, int type, int protocol);

创建一个socket需要指定三个参数:

  • 通信域,确定协议族,比如AF_UNIX/AF_LOCAL用于本地通信;AF_INET/AF_INET6基于TCP/IP协议族,用于网络通信;AF_NETLINK用于进程与内核通信;AF_APPLETALK用于AppleTalk通信
  • socket类型:SOCK_STREAM提供了序列化、可靠、全双工的字节流通信;SOCK_DGRAM提供了无连接、不可靠的数据报文通信
  • 协议:指定使用的协议类型,比如是TCP(IPPROTO_TCP)还是UDP(IPPROTO_UDP),其在sys/un.h中定义;

| 有关socket参数的具体说明,可参考http://www.man7.org/linux/man-pages/man2/socket.2.html

下图是一个基于TCP协议的socket通信流程图(包括握手与挥手流程,这里只说明三次握手流程):

  • 对于客户端与服务端进程,都需要通过socket(int, int, int)来创建一个socket,得到一个socket文件描述符用于后续的通信;
  • 在服务端,首先要将创建的socket文件描述符与本的某个地址进行绑定,并监听该地址对应的文件,并通过accept接口准备接受来自客户端的通信请求;
  • 在客户端,通过调用connect尝试与服务端建立通信链路,在此过程中,客户端与服务端会进行三次握手来建立一个连接;
  • 至此,一个全双工的通信链接就建立完成了,客户端与服务端都可以同时向对方发送数据了

SocketFlow

按照上述流程,来看看一个client/server的socket通信示例: 客户端不断发送消息给服务端,服务端接受到消息后,原封不动的将其发送给客户端

  • 客户端代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77


#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>

#include<errno.h>
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<unistd.h>

const static char* SERVER_ADDR = "127.0.0.1";
const static int MAX_BUF_SIZE = 256;

int socket_fd;

int main(int argc, char* argv[])
{
if (argc != 2) {
printf("Usage: %s <port>\n", argv[0]);
exit(0);
}

struct sockaddr_in sa;
int res;
/** 1. create a TCP based socket */
socket_fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (socket_fd < 0) {
perror("cannot create socket");
exit(EXIT_FAILURE);
}

memset(&sa, 0, sizeof(sa));
/** 2. set network address of the server */
sa.sin_family = AF_INET; /* address familiy*/
sa.sin_port = htons(atoi(argv[1])); /* address port */
sa.sin_addr.s_addr = inet_addr(SERVER_ADDR); /* internet address */
/** 3. try to connect server */
if (connect(socket_fd, (struct sockaddr*) &sa, sizeof(sa)) < 0) {
perror("connect failure");
close(socket_fd);
exit(EXIT_FAILURE);
}

printf("Client: server is connected\n");

char buf[MAX_BUF_SIZE];
/* keep sending message to server */
while(1) {
fgets(buf, MAX_BUF_SIZE, stdin);
if (strncmp(buf, "quit", 4) == 0 || strncmp(buf, "q", 1) == 0) {
printf("quit\n");
write(socket_fd, "quit", 4);
break;
} else {
printf("Client: %s\n", buf);
int len = strlen(buf);
if (send(socket_fd, buf, len, 0) != len) {
printf("client: send message error\n");
break;
}

memset(buf, 0, MAX_BUF_SIZE);
if (recv(socket_fd, buf, MAX_BUF_SIZE, 0) < 0) {
printf("fail to receive message from server\n");
break;
}
printf("Server:%s", buf);
}
}

close(socket_fd);

return 0;
}
  • 服务端代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91

#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>

#include<errno.h>
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<unistd.h>

const static int SERVER_PORT = 2100;
const static int MAX_PENDING = 10;
const static int MAX_RCV_BUF = 256;

int main(int argc, char* argv[])
{
if (argc != 2) {
printf("Usage %s <port>\n", argv[0]);
exit(EXIT_FAILURE);
}

char buf[MAX_RCV_BUF];

int socket_fd;
struct sockaddr_in sa;

if ((socket_fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
printf("fail to create socket");
exit(EXIT_FAILURE);
}

sa.sin_family = AF_INET;
/** htonl/htons 将host字节序转为network字节序 */
sa.sin_addr.s_addr = htonl(INADDR_ANY); /* INADDR_ANY表示该端口可接受任何入境消息 */
sa.sin_port = htons(atoi(argv[1]));

/* 将socket与地址进行绑定*/
if (bind(socket_fd, (struct sockaddr*) &sa, sizeof(sa)) < 0) {
perror("fail to bind socket address\n");
goto errout;
}

/* 监听socket */
if (listen(socket_fd, MAX_PENDING) < 0) {
perror("fail to listen port\n");
goto errout;
}

printf("server is started\n");

for(; ;)
{
/* accept any requests from clients */
int connect_fd = accept(socket_fd, NULL, NULL);
if (connect_fd < 0) {
perror("fail to accept");
goto errout;
}

/* keep receving message from client */
while(1) {
if (read(connect_fd, buf, MAX_RCV_BUF) < 0) {
perror("fail to read");
break;
}

buf[strlen(buf)] = 0;
printf("Client: %s\n", buf);

if (write(connect_fd, buf, strlen(buf)) < 0) {
perror("fail to send to client\n");
break;
}

if (strncmp(buf, "quit", 4) == 0) {
printf("talk is done*_*\n");
break;
}
}

close(connect_fd);
}

errout:
close(socket_fd);
exit(EXIT_FAILURE);

exit(EXIT_SUCCESS);
}

在服务端,如果等待请求队列中有client的请求,则accept返回一个新的fd用于数据的读写;如果没有,则阻塞当前进程直到有客户请求为止。因此,上述服务端是无法同时响应多个客户请求的。如果服务端需要同时响应多个客户请求,不阻塞当前进程,则需要使用select或者poll/epoll来监听socket,如果有客户请求,kernel会及时通知用户进程;对每个客户请求,服务端可通过启动新的进程与线程来处理。作为演示,这里使用下面的方法来解决进程阻塞以及多个客户请求的问题:

  • 服务端进程同时监听多个端口,客户端可通过不同的端口向服务端发送数据
  • 分别使用select或者poll/epoll来监听这些端口,对于每个请求都启动一个新的线程进行处理

IO复用:服务端如何同时处理多个客户请求

目前在Linux下,常见的IO复用(I/O multiplexing)方式有:

  • select: 可监听的文件描述符(fd)最大不超过1024(由_SC_OPEN_MAX确定);
  • poll:对监听的fd个数没有限制,但是随着fd数目的增加,性能也会随着下降,因为每次收到有可用fd事件时,poll都需要遍历整个监听集合;
  • epoll: 对监听的fd个数无限制,不同与poll的地方时,epoll监听返回的是一个可用fd的集合,而不是整个监听fd的集合,因此效率上比poll更高;

接下来,就具体看看这种IO复用方式如何处理服务端的请求。

为了监听多个端口,需要在服务端进程启动时,创建多个socket:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

int main(int argc, char* argv[])
{
if (argc < 2) {
printf("Usage %s <port_1> <port_2> ... \n", argv[0]);
exit(0);
}

/* io multiplex */
fd_set sockset;
struct timeval timeout;
/* is server running */
int running = 1;
/* max file descriptors of socket */
int maxFd = -1;
int* serverSock;
int portNum = 0;

serverSock = (int*)malloc((argc - 1) * sizeof(int));

for (int i = 1; i < argc; ++i) {
int port = atoi(argv[i]);
serverSock[portNum++] = createServerSocket(port);
maxFd = serverSock[portNum-1] > maxFd ? serverSock[portNum-1] : maxFd;
}
...
}

接着,利用select或者poll或者epoll来实现上述端口的监听:

select

使用select需要做以下几个事情:

  • 创建一个fd集合: fd_set sockset;
  • 每次使用前都需要将其清空:FD_ZERO(&sockset);
  • 将所要监听的fd保存到sockset中: FD_SET(serverSock[p], &sockset);
  • 设置监听的超时时间,监听对应的fd集合:select(maxFd + 1, &sockset, NULL, NULL, &timeout),如果该调用返回-1则意味着超时,否则表示有新的IO事件了,可通过FD_ISSET(serverSock[p], &sockset)来检查对应的fd是否处于可读写的状态
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

while(running) {
/* this must be reset for every time select() is called */
FD_ZERO(&sockset);
FD_SET(STDIN_FILENO, &sockset);

for (int p = 0; p < portNum; ++p) {
FD_SET(serverSock[p], &sockset);
}

/* timeout for select IO */
/* must be called every time select() is called */
timeout.tv_sec = DEFAULT_TIMEOUT;
timeout.tv_usec = 0;

if (select(maxFd + 1, &sockset, NULL, NULL, &timeout) < 0) {
printf("fail to get ready fd for %d seconds, waiting...again", DEFAULT_TIMEOUT);
} else {
if (FD_ISSET(STDIN_FILENO, &sockset)) {
printf("server is gonna to shut down\n");
running = 0;
}

for (int p = 0; p < portNum; ++p) {
if (FD_ISSET(serverSock[p], &sockset)) {
pthread_t tid;
int clientFd = acceptConnection(serverSock[p]);
/* create pthread to handle client request */
pthread_create(&tid, NULL, (void*)handleRequest, (void*)&clientFd);
}
}
}
}

| select linux man page: http://man7.org/linux/man-pages/man2/select.2.html

poll

poll有一个专门的数据结构pollfd来记录监听的fd: 其由三部分组成,一个是监听的fd,一个是需要监听的事件,比如POLLIN/POLLPRI;一个监听返回的事件,比如‵POLLOUT/POLLERR/POLLHUP`,

1
2
3
4
5
6

struct pollfd {
int fd; /* file descriptor */
short events; /* requested events */
short revents; /* returned events */
};

利用poll来监听多个IO事件,需要:

  • 创建一个pollfd数组用于保存监听的fd集合: struct pollfd pollFds[MAX_PORTS];
  • 将需要监听的fd添加到该集合:

    1
    2
    3
    4
    5

    for (i = 1; i < argc; ++i) {
    pollFds[i].fd = createServerSocket(atoi(argv[i]));
    pollFds[i].events = POLLIN;
    }
  • 指定超时时间,监听fd集合: int ready = poll(pollFds, maxFds, MAX_POLL_TIMEOUT);, 如果返回小于0的值,则表示出现了错误;如果返回值为0,则表示超时,返回一个大于0的值,表示当前可用的fd的个数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

int running = 1;
while (running) {
int ready = poll(pollFds, maxFds, MAX_POLL_TIMEOUT);

if (ready < 0) {
perror("fail to poll");
goto errout;
} else if (ready == 0) {
//printf("timeout\n");
continue;
} else {
memset(buf, 0, MAX_BUF_SIZE);

for (i = 0; i < maxFds; ++i) {
if (pollFds[i].fd > 0 && (pollFds[i].events & POLLIN)) {
if (pollFds[i].fd == STDIN_FILENO) {
// read from stdin stream
if (fgets(buf, MAX_BUF_SIZE, stdin) == NULL) {
perror("fail to read stdin");
continue;
}

int len = strlen(buf);
if (send(pollFds[i].fd, buf, len , 0) != len) {
perror("fail to send data");
continue;
}
} else {
int fd = acceptConnection(pollFds[i].fd);
if (fd < 0) {
perror("fail to accept connection");
} else {
pthread_t tid;
if (pthread_create(&tid, NULL, handleRequst, (void*)&fd) != 0) {
perror("fail to create handle thread");
close(fd);
goto errout;
}
}
}
}
}
}
}

| poll man page: http://man7.org/linux/man-pages/man2/poll.2.html

epoll

epoll使用一个数据结构epoll_event来描述所监听的fd集合:

1
2
3
4
5
6
7
8
9
10
11
12

typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;

struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};

这里的events表示所发生的事件类型,有EPOLLIN/EPOLLOUT/EPOLLHUP,这里的取值跟poll中的事件基本一致。使用epoll的步骤:

  • 声明两个epoll_event集合,一个用于监听,一个用于监听返回: struct epoll_event polledEv[MAX_PORTS], readyEv[MAX_PORTS];
  • 在kernel创建一个epoll的fd: int epollFd = epoll_create1(0);
  • 将需要监听的fd保存到epoll_event中:epoll_ctl(epollFd, EPOLL_CTL_ADD, sockFd, &polledEv[i])
  • 等待可用的集合: int ready = epoll_wait(epollFd, readyEv, MAX_PORTS, 5);, 返回小于零表示出现错误;返回0表示超时;返回大于0,表示当前有多少个可用的文件描述符。相应的集合保存到readyEv中,因此只需要遍历这个子集即可得到对应可用的fd
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64

struct epoll_event polledEv[MAX_PORTS], readyEv[MAX_PORTS];

int i;
for (i = 0; i < argc; ++i) {
polledEv[i].data.fd = -1;
}

int epollFd = epoll_create1(0);
if (epollFd < 0) {
perror("fail to create epoll for stdin");
exit(EXIT_FAILURE);
}

polledEv[0].data.fd = STDIN_FILENO;
polledEv[0].events = EPOLLIN;

if (epoll_ctl(epollFd, EPOLL_CTL_ADD, STDIN_FILENO, &polledEv[0]) < 0) {
perror("epoll_ctl: stdin");
exit(EXIT_FAILURE);
}

for (i = 1; i < argc; ++i) {
int sockFd = createServerSocket(atoi(argv[i]));
if (sockFd < 0) {
perror("fail to create socket");
continue;
}

polledEv[i].data.fd = sockFd;
polledEv[i].events = EPOLLIN;
if (epoll_ctl(epollFd, EPOLL_CTL_ADD, sockFd, &polledEv[i]) < 0) {
perror("epoll_ctl");
close(sockFd);
exit(EXIT_FAILURE);
}
}

int running = 1;
while (running) {
int ready = epoll_wait(epollFd, readyEv, MAX_PORTS, 5);
if (ready < 0) {
perror("epoll_wait:");
goto errout;
} else if (ready == 0) {
//printf("timeout\n")
continue;
} else {
for (i = 0; i < ready; ++i) {
int fd = acceptConnecton(readyEv[i].data.fd);
if (fd < 0) {
perror("fail to accept request");
continue;
}

pthread_t tid;
if (pthread_create(&tid, NULL, handleRequest, (void*)&fd) != 0) {
perror("fail to create handle thread");
close(fd);
goto errout;
}
}
}
}

| epoll man page: http://man7.org/linux/man-pages/man7/epoll.7.html

最后,对于每个客户请求,都创建一个新的线程来处理请求:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

void handleRequest(void* args)
{
int fd = *((int*)args);
if (fd < 0) {
pthread_exit(NULL);
}

printf("handleRequest(): from fd=%d\n", fd);

char buf[MAX_BUF_SIZE];

while(1) {
if (recv(fd, buf, MAX_BUF_SIZE, 0) < 0) {
printf("fail to receive message from client\n");
break;
}

printf("Client(%d): %s\n", fd, buf);

int len = strlen(buf);
buf[len] = 0;
if (send(fd, buf, len, 0) != len) {
printf("fail to echo");
break;
}
}

close(fd);
pthread_exit(NULL);
}

这里分别采用selectpoll以及epoll在服务端监听客户IO事件,只是作为演示学习用,实际服务器端使用的是更为复杂的IO复用模型,更多相关的资料可参考C10K: http://www.kegel.com/c10k.html。

参考文献