從select引起的bug聊聊多路復用一

碼農世界 發佈 2022-09-18T19:29:12.199573+00:00

一 前言首先祝大家雙節過的開心,平安喜樂!

一 前言

首先祝大家雙節過的開心,平安喜樂! 很久沒寫文章了,主要自己還在沉澱,學習類的分享總覺得為了分享而分享,多幾天可能自己都記不清細節了,所以一直沒有再去寫,這次遇到一個比較有意思的bug,多路復用的一個bug,這個領域那,雖然自己也學習過,但是一直也沒寫過代碼練習,就這個機會就一併練習下,可能對高手來說這是稀鬆平常的問題,卻耗費了我們一天左右的時間進行問題的排查。

二 問題描述和排查步驟

我們有個跑了很久的c開發的系統,在新版本測試中,發現一直會core,core的位置飄忽不定,而且core的有點莫名其妙,根本不該core的地方卻core了,開始從現象看來很像是多線程引起的問題,排查了下卻沒有發現問題所在。

由於代碼量很多,我們排查步驟是:

  1. 利用ascan庫定位core的位置,我們根據core的地方開始關閉相關的功能。
  2. 減少了core的地方後,接下來還是會core,core的位置在一個unix socket 通信線程的創建上,這個線程本該早就創建好的,但是為什麼運行5-10分鐘才開始創建,線程創建沒有做父子進程的監控,所以不存在重啟可能而且如果是這個線程掛了,引起的重新創建也是不可能的,因為線程掛了,必然會導致進程都掛了,結果整個進程的其他線程仍然是正常運行的。(這個至今無解)
  3. 由於是線程創建問題,同事注意到了此進程的由於新增寫kafka的功能,導致線程過多,遂代碼上注釋掉這些功能,繼續排查。
  4. 由於這個線程主要用來執行一些程序交互命令的,所以就用客戶端工具連著去測試,發現經常連不上,有時候連上也會core,ascan的報錯信息:
ASAN:SIGSEGV
=================================================================
==316088== ERROR: AddressSanitizer: SEGV on unknown address 0x00000366650b (pc 0x00000366650b sp 0x7f6e7db81fa0 bp 0x7f6e7db82820 T236)
AddressSanitizer can not provide additional info.
    #0 0x366650a (+0xbd650a)

從報錯信息利用add2line命令查到具體的堆棧,這個命令以前文章有聊過,執行起來是:

addr2line -a -C -e bin/可執行程序 pc對應的地址
  1. 如是經過gdb調試,發現core的時候在unix socket的處理函數的返回上,也就是說棧信息被破壞了,百思不得其解啊,甚至彙編每次跟蹤地址也沒查到誰破壞的。

正常連接的時候,客戶端進程卡死,通過strace 跟蹤客戶端的系統調用,如下:

socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0) = 3
connect(3, {sa_family=AF_UNIX, sun_path="./run/xxxx.socket"}, 33) = 0
ioctl(3, FIONBIO, [1])                  = 0
poll([{fd=3, events=POLLOUT}], 1, 10000) = 1 ([{fd=3, revents=POLLOUT}])
sendto(3, "{\"version\": \"0.2\"}", 18, 0, NULL, 0) = 18
select(4, [3], [], [], {tv_sec=600, tv_usec=0}) = 1 (in [3], left {tv_sec=599, tv_usec=999794})
poll([{fd=3, events=POLLIN}], 1, 10000) = 1 ([{fd=3, revents=POLLIN}])
recvfrom(3, "{\"return\":\"OK\"}\n", 1024, 0, NULL, NULL) = 16
poll([{fd=3, events=POLLOUT}], 1, 10000) = 1 ([{fd=3, revents=POLLOUT}])
sendto(3, "{\"command\": \"command-list\"}\n", 28, 0, NULL, 0) = 28
select(4, [3], [], [], {tv_sec=600, tv_usec=0}

通過客戶端的日誌列印信息,發送command-list命令後伺服器端沒有返回,sendto命令是成功的,返回28,來看看伺服器端怎麼說:

[23331] 9/9/2022 -- 22:01:47 - (xxx.c:403) <Info> (xx) -- Unix socket: recv MSG: {"version": "0.2"}
[23331] 9/9/2022 -- 22:01:47 - (xxx.c:449) <Info> (xx) -- unix socket: send to client:(null)
[23331] 9/9/2022 -- 22:01:47 - (xxx.c:343) <Info> (xx) -- Unix socket: send content:{"return":"OK"}
[23331] 9/9/2022 -- 22:01:47 - (xxx.c:345) <Info> (xx) -- Unix socket:sent message of size 16 to client socket 1118

通過伺服器端的日誌來看,只收到了初次的版本信息,後續的command-list命令並沒有收到。 這就很奇怪了。

交互圖

  1. 百思不得其解,是難道是內核bug?通過gdb調試並沒有發現什麼問題,接著通過lsof 查看socket文件的連接數,當我們通過客戶端去連接的時候,連接數遞增了,這沒啥問題,如下圖:
[root@localhost xx]# lsof ./run/command.socket
COMMAND     PID USER   FD   TYPE             DEVICE SIZE/OFF    NODE NAME
xxx   30894 root  101u  unix 0xffff8810e7e99800      0t0  300947 ./run/command.socket
xxx  30894  root  1172u  unix 0xffff8802b42a4000      0t0 1446065 ./run/command.socket

開始沒有注意到這個1172,這個文件描述符有什麼特別的地方,也知道select做多路復用的時候,有一定的局限,只能處理1024個連接,我在想,我們就只有一個連接沒有超過1024這個限制啊, 也許有朋友知道了原因,是1172超過了1024,也就是說select的FD的數量不能超過1024,且大小也不能超過,那麼就是這麼簡單嘛,繼續實踐吧。

The behavior of these macros is undefined if a descriptor value is less than zero or
greater than or equal to FD_SETSIZE, which is normally at least equal to the maximum num-
ber of descriptors supported by the system.

三 多路復用

在高性能的伺服器上,多採用多路復用技術,多路其實就是多個連接,復用就是復用此伺服器進程,那麼何在一起多路復用,就是用一個進程進行多個連接的處理。

對於伺服器來說,開放埠等待客戶端連接,開始多採用多進程或多線程編程的方式,即每個連接採用單獨的進程或線程進行處理,但是每台計算機因為內存等資源限制,可以開的進程或線程數有限,而且過多的線程會導致線程切換的成本過大,緩存失效等一系列問題,根本無法做到單機處理十萬、百萬連接。

如果採用非阻塞,在用戶進程裡面輪詢方式那?這樣會占用很高的cpu資源,所以後來發展出多路復用技術,即採用一個進程處理多個連接,一個引用怎麼處理多個連接那,不可能採用阻塞的方式,一旦阻塞在一個連接的IO上,其他連接有事件過來了也沒辦法處理,那只能輪詢查看各個連接上是否有可讀、可寫消息,從而達到多路復用的目的,linux內核提供select、poll、epoll三種多路復用機制。

3.1 select 機制實現多路復用

3.1.1 基本使用說明

/* According to POSIX.1-2001 */
#include <sys/select.h>

int select(int nfds, fd_set *readfds, fd_set *writefds,
  fd_set *exceptfds, struct timeval *timeout);

void fd_CLR(int fd, fd_set *set);      // 從fdset中刪除fd
int  FD_ISSET(int fd, fd_set *set);      // 判斷fd是否已存在fdset
void FD_SET(int fd, fd_set *set);      // 將fd添加到fdset
void FD_ZERO(fd_set *set);       // fdset所有位清0

1.nfds 表示監視的文件描述符中,待測的最大描述符+1. 2. readfds:監視有讀數據到達的文件描述符集合。 3. writefds:監視有寫數據到達的文件描述符集合。 4. exceptfds:監視有異常發生的文件描述符集合。 這三個集合每次都要傳入,每當要監視的事件發生時候,都會被複製出來。 5. timeout 設置為NULL,則select阻塞,直到事件發生;如果不為NULL,且值不為0,則等待固定時間,如果這個事件沒有監視事件來的話,也仍然會返回;如果不為NULL,且值為0,則不等待,立刻返回。

下面四個為宏,含義如後面的注釋,在linux內核的 中的實現如下(不同的版本實現稍微有差異):

#define __NFDBITS (8 * sizeof(unsigned long))                // 每個ulong型可以表示多少個bit,
#define __FD_SETSIZE 1024                                          // socket最大取值為1024
#define __FDSET_LONGS (__FD_SETSIZE/__NFDBITS)     // bitmap一共有1024個bit,共需要多少個ulong
 
#define __FD_ELT(d) ((d) / __NFDBITS)
#define __FD_MASK(d) ((__fd_mask) 1 << ((d) % __NFDBITS))
 
typedef struct {
    unsigned long fds_bits [__FDSET_LONGS];                 //用ulong數組來表示bitmap
} __kernel_fd_set;
 
typedef __kernel_fd_set   fd_set;

#define __FD_SET(d, set) \
  ((void) (__FDS_BITS (set)[__FD_ELT (d)] |= __FD_MASK (d)))
#define __FD_CLR(d, set) \
  ((void) (__FDS_BITS (set)[__FD_ELT (d)] &= ~__FD_MASK (d)))
#define __FD_ISSET(d, set) \
  ((__FDS_BITS (set)[__FD_ELT (d)] & __FD_MASK (d)) != 0)

fd_set是由unsigned long 的類型組構成的位圖, FD_SET 操作即找到哪個unsigned long的哪個位,通過((__fd_mask) 1 << ((d) % __NFDBITS)) 來定位具體的位信息,將那一位設置為1,取反即設置為0.

問題出在是FD_SET地方,即在__FD_ELT(d) ((d) / __NFDBITS) 如果d的值大於1024,那麼fds_bits 就越界了,就會破壞棧數據,從而導致返回異常。

簡化下我們的程序,寫的如下:

#伺服器端 
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/un.h>
#include <unistd.h>

#define CLIENT_SIZE 100
#define SOCK_FILE "command.socket"
#define TOO_MANY "Too many client."

typedef struct unix_socket_infos_ {
  int socket;
  int select_max;
  struct sockaddr_un client_addr;
  int clients[CLIENT_SIZE];
} unix_socket_infos_t;

static int create_unix_socket(unix_socket_infos_t *this) {
  struct sockaddr_un addr;
  addr.sun_family = AF_UNIX;
  strncpy(addr.sun_path, SOCK_FILE, sizeof(addr.sun_path));
  addr.sun_path[sizeof(addr.sun_path) - 1] = 0;
  int len = strlen(addr.sun_path) + sizeof(addr.sun_family) + 1;

  int listen_socket = socket(AF_UNIX, SOCK_STREAM, 0);
  if (listen_socket == -1) {
    perror("create socket error.\n");
    return -1;
  }
  // fcntl (socket, F_SETFL,SOCK_NONBLOCK) ;
  int on = 1;
  /* set reuse option */
  int ret = setsockopt(listen_socket, SOL_SOCKET, SO_REUSEADDR, (char *)&on,
                       sizeof(on));
  unlink(SOCK_FILE);
  /* bind socket */
  ret = bind(listen_socket, (struct sockaddr *)&addr, len);
  if (ret == -1) {
    perror("bind error.\n");
    return -1;
  }
  printf("start to listen\n");
  ret = listen(listen_socket, 1);
  if (ret == -1) {
    perror("listen error\n");
    return -1;
  }
  ret = chmod(SOCK_FILE, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
  if (ret == -1) {
    perror("chmod error\n");
    return -1;
  }
  this->socket = listen_socket;
  this->select_max = listen_socket;
  return 1;
}

static int set_max(unix_socket_infos_t *this) {
  for (int i = 0; i < CLIENT_SIZE; i++) {
    if (this->clients[i] >= this->select_max) {
      this->select_max = this->clients[i];
    }
    if (this->clients[i] < 0) {
      break;
    }
  }
  fprintf(stderr, "max is:%d\n", this->select_max);
  return 0;
}

static int close_client(unix_socket_infos_t *this, int index) {
  int client = this->clients[index];
  close(client);
  this->clients[index] = -1;
  set_max(this);
}

static int deal_client(unix_socket_infos_t *this, int index) {
  char buffer[1024] = {0};
  int ret = recv(this->clients[index], buffer, sizeof(buffer) - 1, 0);
  if (ret <= 0) {
    if (ret == 0) {
      printf("lost connect.\n");
    } else {
      printf("recv error:%s \n", strerror(errno));
    }
    close_client(this, index);
    return -1;
  }
  if (ret < sizeof(buffer)-2) {
    buffer[ret] = '\n';
    buffer[ret+1] = 0;
  }
  fprintf(stderr, "client[%d]:%s", this->clients[index],buffer);
  ret = send(this->clients[index], buffer, strlen(buffer), MSG_NOSIGNAL);
  if (ret < 0) {
    perror("send error:");
  } else {
    fprintf(stderr, "server:%s", buffer);
  }
  return 1;
}

static int accept_client(unix_socket_infos_t *this) {
  socklen_t len = sizeof(this->client_addr);
  char buffer[1024] = {0};
  int client = accept(this->socket, (struct sockaddr *)&(this->client_addr), &len);
  printf("client to comming:%d\n", client);
  if (client < 0) {
    perror("accept error\n");
    return -1;
  }
  memset(buffer,0x0,1024);
  int ret = recv(client, buffer, sizeof(buffer) - 1, 0);
  if (ret < 0) {
    perror("recv error\n");
    return -1;
  }
  if (ret < sizeof(buffer)-2) {
    buffer[ret] = '\n';
    buffer[ret+1] = 0;
  }
  fprintf(stderr, "client[%d][first]:%s",client,buffer);
  ret = send(client, buffer, strlen(buffer), MSG_NOSIGNAL);
  if (ret < 0) {
    perror("send error\n");
  } else {
    fprintf(stderr, "server[first]:%s", buffer);
  }
  int is_set = 0;
  for (int i = 0; i < CLIENT_SIZE; i++) {
    if (this->clients[i] < 0) {
      this->clients[i] = client;
      is_set = 1;
      break;
    }
  }
  set_max(this);
  if (is_set == 0) {
    fputs(TOO_MANY, stdout);
    close(client);
    return -1;
  }
  return 1;
}

static int run_select(unix_socket_infos_t *this) {
  struct timeval tv;
  int ret;
  fd_set select_set;
  FD_ZERO(&select_set);
  FD_SET(this->socket, &select_set);

  for (int i = 0; i < CLIENT_SIZE; i++) {
    if (this->clients[i] > 0) {
      FD_SET(this->clients[i], &select_set);
    } else {
      break;
    }
  }

  tv.tv_sec = 0;
  tv.tv_usec = 200 * 1000;
  int select_max = this->select_max + 1;
  ret = select(select_max, &select_set, NULL, NULL, &tv);
  if (ret == -1) {
    if (errno == EINTR) {
      return 1;
    }
    return -1;
  }
  if (ret == 0) {
    return 1;
  }
  if (FD_ISSET(this->socket, &select_set)) {
    accept_client(this);
  }
  for (int i = 0; i < CLIENT_SIZE; i++) {
    if (this->clients[i] <= 0) {
      break;
    }
    if (FD_ISSET(this->clients[i], &select_set)) {
      deal_client(this, i);
    }
  }
  return 1;
}

int main(int argc, char **argv) {
  unix_socket_infos_t unix_socket_infos;
  for (int i = 0; i < CLIENT_SIZE; i++) {
    unix_socket_infos.clients[i] = -1;
  }
  int ret = create_unix_socket(&unix_socket_infos);
  printf("start to loop\n");
  while (1) {
    int run_ret = run_select(&unix_socket_infos);
    if (run_ret == -1) {
      break;
    }
  }
  return 0;
}

客戶端連接代碼:

#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/un.h>
#include <unistd.h>
#include <string.h>

#define SOCK_FILE "command.socket"

int main(int argc, char **argv) {
  struct sockaddr_un un;
  int sock_fd;
  char buffer[1024] = "hello unix socket server";
  char recv_buffer[1024];

  un.sun_family = AF_UNIX;
  strcpy(un.sun_path, SOCK_FILE);
  sock_fd = socket(AF_UNIX, SOCK_STREAM, 0);
  if (sock_fd < 0) {
    perror("socket error.\n");
    return -1;
  }
  if (connect(sock_fd, (struct sockaddr *)&un, sizeof(un)) < 0) {
    perror("connect error.\n");
    return -1;
  }
  while (1) {
    memset(recv_buffer,0,1024);
    memset(buffer,0,1024);
    fprintf(stderr,"\nmy[%d]:",sock_fd);
    fgets(buffer,sizeof(buffer)-1,stdin);
    if (strncmp(buffer, "quit", 4) == 0) {
      break;
    }
    int ret = send(sock_fd, buffer, strlen(buffer) - 1, 0);
    if (ret == -1) {
      perror("send error.\n");
    } else {
      ret = recv(sock_fd, recv_buffer, sizeof(recv_buffer) - 1, 0);
      if (ret <= 0) {
        perror("recv error.\n");
      }
      recv_buffer[ret - 1] = 0;
      fprintf(stderr,"server:%s",recv_buffer);
    }
  }

  close(sock_fd);
  return 0;
}

練習代碼,寫的比較挫,客戶端通過unix socket 連接到伺服器,然後接收用戶輸入發送給伺服器,伺服器回送消息,直到用戶輸入quit退出。 示意下效果:

root@ubuntu-lab:/home/miao/c-test/select# ./a.out 
start to listen
start to loop
client to comming:4
client[4][first]:123
server[first]:123
max is:4
client[4]:456
server:456
client[4]:abc
server:abc
lost connect.
max is:4
.....

客戶端顯示:

root@ubuntu-lab:/home/miao/c-test/select# ./client 

my[3]:123
server:123
my[3]:456
server:456
my[3]:abc
server:abc
my[3]:quit

3.1.2 core模擬

在main的開始位置加上如下的代碼:

  int files[1800] = {0};
  char fileName[256] = {0};
  for (int i = 0; i < 1800; i++) {
    memset(fileName, 0x0, sizeof(fileName));
    sprintf(fileName, "test_%d", i);
    files[i] = open(fileName, O_RDWR | O_CREAT);
    if (files[i] < 0) {
      close(files[i]);
    }
  }

會發現,程序會自動退出或core,偶爾也有成功的情況,還有的情況是發送到的命令沒回復,也就是沒監聽起來。

3.2 select 缺點

雖然select也支持了IO多路復用,但是存在以下問題:

  1. 每次select返回後,監視的集合需要重新設置,比較麻煩。
  2. 限制1024個連接,如果想在應用上突破連接,採用malloc等動態申請內存方式也是可以,但是最好採用poll或epoll。
  3. 每次都要將監視的文件描述符複製到內核空間,有事件的發生的時候,需要再從內核空間複製到用戶空間,比較占用cpu資源, 幾種機制的性能比較如下

未完待續.....

關鍵字: