socket通常用于跨进程通信(Inter-Process Communication,IPC),由于其最早BSD Unix发行版中使用,因此也叫做Barkeley sockets 。socket封装了底层网络通信协议细节,为上层应用提供了一个统一的接口,按照使用的“域”不同,又分为用于网络通信的internet socket以及用于本地进程通信的Unix domain socket。
那么,如何利用socket实现IPC通信了?在多个客户端的情况下,服务端要如何实现同时响应多个客户请求了,即如何实现多个IO端口的监听(I/O multiplex)?这篇文章,就来看看这两个问题。首先,来看下socket编程的一些基本知识。
socket编程基础 一个socket由三个元素唯一确定:
internet地址(如果是Unix domain socket,则对应一个本地文件名)
端到端的协议类型(面向连接的TCP或者无连接的UDP)
用于确定传送数据应用的端口号(port)
在使用socket前,首先需要通过系统调用socket
创建一个socket对象,并返回该socket对应的文件描述符:
1 2 3 4 5 6 #include <sys/types.h> #include <sys/socket.h> int socket (int domain, int type, int protocol) ;
创建一个socket需要指定三个参数:
通信域,确定协议族,比如AF_UNIX/AF_LOCAL
用于本地通信;AF_INET/AF_INET6
基于TCP/IP协议族,用于网络通信;AF_NETLINK
用于进程与内核通信;AF_APPLETALK
用于AppleTalk通信
socket类型:SOCK_STREAM
提供了序列化、可靠、全双工的字节流通信;SOCK_DGRAM
提供了无连接、不可靠的数据报文通信
协议:指定使用的协议类型,比如是TCP(IPPROTO_TCP
)还是UDP(IPPROTO_UDP
),其在sys/un.h
中定义;
| 有关socket参数的具体说明,可参考http://www.man7.org/linux/man-pages/man2/socket.2.html
下图是一个基于TCP协议的socket通信流程图(包括握手与挥手流程,这里只说明三次握手流程):
对于客户端与服务端进程,都需要通过socket(int, int, int)
来创建一个socket,得到一个socket文件描述符用于后续的通信;
在服务端,首先要将创建的socket文件描述符与本的某个地址进行绑定,并监听该地址对应的文件,并通过accept
接口准备接受来自客户端的通信请求;
在客户端,通过调用connect
尝试与服务端建立通信链路,在此过程中,客户端与服务端会进行三次握手来建立一个连接;
至此,一个全双工的通信链接就建立完成了,客户端与服务端都可以同时向对方发送数据了
按照上述流程,来看看一个client/server的socket通信示例: 客户端不断发送消息给服务端,服务端接受到消息后,原封不动的将其发送给客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <errno.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> const static char * SERVER_ADDR = "127.0.0.1" ;const static int MAX_BUF_SIZE = 256 ;int socket_fd;int main (int argc, char * argv[]) { if (argc != 2 ) { printf ("Usage: %s <port>\n" , argv[0 ]); exit (0 ); } struct sockaddr_in sa; int res; socket_fd = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); if (socket_fd < 0 ) { perror ("cannot create socket" ); exit (EXIT_FAILURE); } memset (&sa, 0 , sizeof (sa)); sa.sin_family = AF_INET; sa.sin_port = htons (atoi (argv[1 ])); sa.sin_addr.s_addr = inet_addr (SERVER_ADDR); if (connect (socket_fd, (struct sockaddr*) &sa, sizeof (sa)) < 0 ) { perror ("connect failure" ); close (socket_fd); exit (EXIT_FAILURE); } printf ("Client: server is connected\n" ); char buf[MAX_BUF_SIZE]; while (1 ) { fgets (buf, MAX_BUF_SIZE, stdin); if (strncmp (buf, "quit" , 4 ) == 0 || strncmp (buf, "q" , 1 ) == 0 ) { printf ("quit\n" ); write (socket_fd, "quit" , 4 ); break ; } else { printf ("Client: %s\n" , buf); int len = strlen (buf); if (send (socket_fd, buf, len, 0 ) != len) { printf ("client: send message error\n" ); break ; } memset (buf, 0 , MAX_BUF_SIZE); if (recv (socket_fd, buf, MAX_BUF_SIZE, 0 ) < 0 ) { printf ("fail to receive message from server\n" ); break ; } printf ("Server:%s" , buf); } } close (socket_fd); return 0 ; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <errno.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> const static int SERVER_PORT = 2100 ;const static int MAX_PENDING = 10 ;const static int MAX_RCV_BUF = 256 ;int main (int argc, char * argv[]) { if (argc != 2 ) { printf ("Usage %s <port>\n" , argv[0 ]); exit (EXIT_FAILURE); } char buf[MAX_RCV_BUF]; int socket_fd; struct sockaddr_in sa; if ((socket_fd = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0 ) { printf ("fail to create socket" ); exit (EXIT_FAILURE); } sa.sin_family = AF_INET; sa.sin_addr.s_addr = htonl (INADDR_ANY); sa.sin_port = htons (atoi (argv[1 ])); if (bind (socket_fd, (struct sockaddr*) &sa, sizeof (sa)) < 0 ) { perror ("fail to bind socket address\n" ); goto errout; } if (listen (socket_fd, MAX_PENDING) < 0 ) { perror ("fail to listen port\n" ); goto errout; } printf ("server is started\n" ); for (; ;) { int connect_fd = accept (socket_fd, NULL , NULL ); if (connect_fd < 0 ) { perror ("fail to accept" ); goto errout; } while (1 ) { if (read (connect_fd, buf, MAX_RCV_BUF) < 0 ) { perror ("fail to read" ); break ; } buf[strlen (buf)] = 0 ; printf ("Client: %s\n" , buf); if (write (connect_fd, buf, strlen (buf)) < 0 ) { perror ("fail to send to client\n" ); break ; } if (strncmp (buf, "quit" , 4 ) == 0 ) { printf ("talk is done*_*\n" ); break ; } } close (connect_fd); } errout: close (socket_fd); exit (EXIT_FAILURE); exit (EXIT_SUCCESS); }
在服务端,如果等待请求队列中有client的请求,则accept
返回一个新的fd用于数据的读写;如果没有,则阻塞当前进程直到有客户请求为止。因此,上述服务端是无法同时响应多个客户请求的。如果服务端需要同时响应多个客户请求,不阻塞当前进程,则需要使用select
或者poll/epoll
来监听socket,如果有客户请求,kernel会及时通知用户进程;对每个客户请求,服务端可通过启动新的进程与线程来处理。作为演示,这里使用下面的方法来解决进程阻塞以及多个客户请求的问题:
服务端进程同时监听多个端口,客户端可通过不同的端口向服务端发送数据
分别使用select
或者poll/epoll
来监听这些端口,对于每个请求都启动一个新的线程进行处理
IO复用:服务端如何同时处理多个客户请求 目前在Linux下,常见的IO复用(I/O multiplexing)方式有:
select
: 可监听的文件描述符(fd)最大不超过1024(由_SC_OPEN_MAX
确定);
poll
:对监听的fd个数没有限制,但是随着fd数目的增加,性能也会随着下降,因为每次收到有可用fd事件时,poll都需要遍历整个监听集合;
epoll
: 对监听的fd个数无限制,不同与poll的地方时,epoll监听返回的是一个可用fd的集合,而不是整个监听fd的集合,因此效率上比poll更高;
接下来,就具体看看这种IO复用方式如何处理服务端的请求。
为了监听多个端口,需要在服务端进程启动时,创建多个socket:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 int main (int argc, char * argv[]) { if (argc < 2 ) { printf ("Usage %s <port_1> <port_2> ... \n" , argv[0 ]); exit (0 ); } fd_set sockset; struct timeval timeout; int running = 1 ; int maxFd = -1 ; int * serverSock; int portNum = 0 ; serverSock = (int *)malloc ((argc - 1 ) * sizeof (int )); for (int i = 1 ; i < argc; ++i) { int port = atoi (argv[i]); serverSock[portNum++] = createServerSocket (port); maxFd = serverSock[portNum-1 ] > maxFd ? serverSock[portNum-1 ] : maxFd; } ... }
接着,利用select
或者poll
或者epoll
来实现上述端口的监听:
select
使用select
需要做以下几个事情:
创建一个fd集合: fd_set sockset;
每次使用前都需要将其清空:FD_ZERO(&sockset);
将所要监听的fd保存到sockset
中: FD_SET(serverSock[p], &sockset);
设置监听的超时时间,监听对应的fd集合:select(maxFd + 1, &sockset, NULL, NULL, &timeout)
,如果该调用返回-1则意味着超时,否则表示有新的IO事件了,可通过FD_ISSET(serverSock[p], &sockset)
来检查对应的fd是否处于可读写的状态
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 while (running) { FD_ZERO (&sockset); FD_SET (STDIN_FILENO, &sockset); for (int p = 0 ; p < portNum; ++p) { FD_SET (serverSock[p], &sockset); } timeout.tv_sec = DEFAULT_TIMEOUT; timeout.tv_usec = 0 ; if (select (maxFd + 1 , &sockset, NULL , NULL , &timeout) < 0 ) { printf ("fail to get ready fd for %d seconds, waiting...again" , DEFAULT_TIMEOUT); } else { if (FD_ISSET (STDIN_FILENO, &sockset)) { printf ("server is gonna to shut down\n" ); running = 0 ; } for (int p = 0 ; p < portNum; ++p) { if (FD_ISSET (serverSock[p], &sockset)) { pthread_t tid; int clientFd = acceptConnection (serverSock[p]); pthread_create (&tid, NULL , (void *)handleRequest, (void *)&clientFd); } } } }
| select linux man page: http://man7.org/linux/man-pages/man2/select.2.html
poll
poll有一个专门的数据结构pollfd
来记录监听的fd: 其由三部分组成,一个是监听的fd,一个是需要监听的事件,比如POLLIN/POLLPRI
;一个监听返回的事件,比如‵POLLOUT/POLLERR/POLLHUP`,
1 2 3 4 5 6 7 struct pollfd { int fd; short events; short revents; };
利用poll来监听多个IO事件,需要:
创建一个pollfd
数组用于保存监听的fd集合: struct pollfd pollFds[MAX_PORTS];
将需要监听的fd添加到该集合:
1 2 3 4 5 6 for (i = 1 ; i < argc; ++i) { pollFds[i].fd = createServerSocket (atoi (argv[i])); pollFds[i].events = POLLIN; }
指定超时时间,监听fd集合: int ready = poll(pollFds, maxFds, MAX_POLL_TIMEOUT);
, 如果返回小于0的值,则表示出现了错误;如果返回值为0,则表示超时,返回一个大于0的值,表示当前可用的fd的个数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 int running = 1 ;while (running) { int ready = poll (pollFds, maxFds, MAX_POLL_TIMEOUT); if (ready < 0 ) { perror ("fail to poll" ); goto errout; } else if (ready == 0 ) { continue ; } else { memset (buf, 0 , MAX_BUF_SIZE); for (i = 0 ; i < maxFds; ++i) { if (pollFds[i].fd > 0 && (pollFds[i].events & POLLIN)) { if (pollFds[i].fd == STDIN_FILENO) { if (fgets (buf, MAX_BUF_SIZE, stdin) == NULL ) { perror ("fail to read stdin" ); continue ; } int len = strlen (buf); if (send (pollFds[i].fd, buf, len , 0 ) != len) { perror ("fail to send data" ); continue ; } } else { int fd = acceptConnection (pollFds[i].fd); if (fd < 0 ) { perror ("fail to accept connection" ); } else { pthread_t tid; if (pthread_create (&tid, NULL , handleRequst, (void *)&fd) != 0 ) { perror ("fail to create handle thread" ); close (fd); goto errout; } } } } } } }
| poll man page: http://man7.org/linux/man-pages/man2/poll.2.html
epoll
epoll使用一个数据结构epoll_event
来描述所监听的fd集合:
1 2 3 4 5 6 7 8 9 10 11 12 13 typedef union epoll_data { void *ptr; int fd; uint32_t u32; uint64_t u64; } epoll_data_t ; struct epoll_event { uint32_t events; epoll_data_t data; };
这里的events
表示所发生的事件类型,有EPOLLIN/EPOLLOUT/EPOLLHUP
,这里的取值跟poll中的事件基本一致。使用epoll的步骤:
声明两个epoll_event集合,一个用于监听,一个用于监听返回: struct epoll_event polledEv[MAX_PORTS], readyEv[MAX_PORTS];
在kernel创建一个epoll的fd: int epollFd = epoll_create1(0);
将需要监听的fd保存到epoll_event中:epoll_ctl(epollFd, EPOLL_CTL_ADD, sockFd, &polledEv[i])
等待可用的集合: int ready = epoll_wait(epollFd, readyEv, MAX_PORTS, 5);
, 返回小于零表示出现错误;返回0表示超时;返回大于0,表示当前有多少个可用的文件描述符。相应的集合保存到readyEv
中,因此只需要遍历这个子集即可得到对应可用的fd
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 struct epoll_event polledEv[MAX_PORTS], readyEv[MAX_PORTS]; int i; for (i = 0 ; i < argc; ++i) { polledEv[i].data.fd = -1 ; } int epollFd = epoll_create1 (0 ); if (epollFd < 0 ) { perror ("fail to create epoll for stdin" );exit (EXIT_FAILURE); } polledEv[0 ].data.fd = STDIN_FILENO; polledEv[0 ].events = EPOLLIN; if (epoll_ctl (epollFd, EPOLL_CTL_ADD, STDIN_FILENO, &polledEv[0 ]) < 0 ) { perror ("epoll_ctl: stdin" );exit (EXIT_FAILURE); } for (i = 1 ; i < argc; ++i) { int sockFd = createServerSocket (atoi (argv[i]));if (sockFd < 0 ) { perror ("fail to create socket" ); continue ; } polledEv[i].data.fd = sockFd; polledEv[i].events = EPOLLIN; if (epoll_ctl (epollFd, EPOLL_CTL_ADD, sockFd, &polledEv[i]) < 0 ) { perror ("epoll_ctl" ); close (sockFd); exit (EXIT_FAILURE); } } int running = 1 ; while (running) { int ready = epoll_wait (epollFd, readyEv, MAX_PORTS, 5 );if (ready < 0 ) { perror ("epoll_wait:" ); goto errout; } else if (ready == 0 ) { continue ; } else { for (i = 0 ; i < ready; ++i) { int fd = acceptConnecton (readyEv[i].data.fd); if (fd < 0 ) { perror ("fail to accept request" ); continue ; } pthread_t tid; if (pthread_create (&tid, NULL , handleRequest, (void *)&fd) != 0 ) { perror ("fail to create handle thread" ); close (fd); goto errout; } } } }
| epoll man page: http://man7.org/linux/man-pages/man7/epoll.7.html
最后,对于每个客户请求,都创建一个新的线程来处理请求:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 void handleRequest (void * args) { int fd = *((int *)args); if (fd < 0 ) { pthread_exit (NULL ); } printf ("handleRequest(): from fd=%d\n" , fd); char buf[MAX_BUF_SIZE]; while (1 ) { if (recv (fd, buf, MAX_BUF_SIZE, 0 ) < 0 ) { printf ("fail to receive message from client\n" ); break ; } printf ("Client(%d): %s\n" , fd, buf); int len = strlen (buf); buf[len] = 0 ; if (send (fd, buf, len, 0 ) != len) { printf ("fail to echo" ); break ; } } close (fd); pthread_exit (NULL ); }
这里分别采用select
,poll
以及epoll
在服务端监听客户IO事件,只是作为演示学习用,实际服务器端使用的是更为复杂的IO复用模型,更多相关的资料可参考C10K: http://www.kegel.com/c10k.html。
参考文献