Redis 技術專題系列之網絡架構和線程模型

互聯網技術學堂 發佈 2024-03-14T11:26:28.620464+00:00

它採用了 I/O 多路復用技術,利用單個線程處理所有客戶端請求和網絡 I/O 操作。同時,在 ae.c 文件中,還有以下關於多路復用模型的代碼:aeMain 函數是 Redis 事件循環的核心,其中的 while 循環不斷處理事件,如果事件循環被設置為停止狀態,那麼就跳出循環。

引言

Redis 是一個高性能、內存存儲的鍵值資料庫。它支持豐富的數據結構,如字符串、哈希、列表、集合和有序集合。Redis 的性能得益於其網絡架構和線程模型。

本文將介紹 Redis 的網絡架構和線程模型,希望能夠幫助讀者更好地理解 Redis 的性能優化和運維管理。

大家好,這裡是網際網路技術學堂,留下你的點讚、關注、分享,支持一下吧,謝謝。

網絡架構

Redis 的網絡架構是基於單線程的事件循環模型。它採用了 I/O 多路復用技術,利用單個線程處理所有客戶端請求和網絡 I/O 操作。

網絡編程離不開 Socket,網絡 I/O 模型常用的無非是同步阻塞、同步非阻塞、異步阻塞、異步非阻塞,高性能網絡伺服器常見的線程模型也就是基於 EventLoop 模式的單線程模型。

當客戶端連接到 Redis 伺服器時,伺服器會為每個客戶端分配一個文件描述符,並將其加入到事件循環機制中。當有數據可讀或可寫時,Redis 會觸發相應的事件,單線程通過事件循環機制調度事件處理函數來處理客戶端請求和網絡 I/O 操作。

這種單線程的事件循環模型帶來了以下優點:

  1. 簡單:不需要線程切換和同步機制,減少了鎖競爭和上下文切換的開銷。
  2. 可擴展性:通過多個 Redis 實例和數據分片來實現橫向擴展。
  3. 高可用性:通過主從複製和哨兵機制來實現高可用性。

線程模型

Redis 採用的是單線程的事件循環模型,但它並不是單進程的。Redis 可以通過配置文件中的 daemonize 選項將自己變成守護進程,同時可以通過 fork 函數來創建子進程來處理持久化操作和複製操作。

子進程主要用於持久化和複製操作,Redis 採用了寫時複製(Copy-On-Write,簡稱 COW)技術來優化子進程的性能。當子進程需要進行寫操作時,Redis 會將需要修改的數據複製一份,修改完成後再將修改後的數據替換原來的數據。

這種寫時複製技術帶來了以下優點:

  1. 性能:減少了內存複製的開銷,提高了寫操作的性能。
  2. 數據:避免了在多個進程之間共享同一塊內存帶來的數據問題。

總體來說,Redis 的線程模型和網絡架構都是非常不錯的,可以幫助 Redis 實現高性能、高可用和可擴展的特性。因此,掌握 Redis 的網絡架構和線程模型是 Redis 開發和運維的重要知識點,也是高頻面試題的考點之一。

源碼層面分析網絡架構

Redis 的網絡架構主要包含以下兩個部分:

1、事件驅動模型

Redis 使用 epoll 等事件驅動模型來處理客戶端的連接、讀寫操作。具體實現代碼在 ae.c 文件中。

首先,Redis 會創建一個事件循環機制,不斷地監聽和處理事件。在初始化 Redis 伺服器時,通過 aeCreateEventLoop() 函數創建一個事件循環機制。

aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i;
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
eventLoop->setsize = setsize;
eventLoop->lastTime = time(NULL);
eventLoop->timeEventHead = NULL;
eventLoop->timeEventNextId = 0;
eventLoop->stop = 0;
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;
if (aeApiCreate(eventLoop) == -1) goto err;
for (i = 0; i < setsize; i++)
eventLoop->events[i].mask = AE_NONE;
return eventLoop;
err:
if (eventLoop) {
zfree(eventLoop->events);
zfree(eventLoop->fired);
zfree(eventLoop);
}
return NULL;
}

然後,Redis 會將客戶端的連接事件和讀寫事件加入到事件循環機制中。具體實現代碼在 networking.c 文件中。

static void acceptCommonHandler(int fd, int flags, char *ip) {
int cport, cfd, max = server.maxclients;
char cip[NET_IP_STR_LEN];
listNode *ln;
client *c;
if ((cfd = anetTcpAccept(server.neterr, fd, cip, &cport)) == ANET_ERR) {
if (errno != EWOULDBLOCK) {
serverLog(LL_WARNING, "Accepting client connection: %s", server.neterr);
}
return;
}
anetNonBlock(NULL,cfd);
anetEnableTcpNoDelay(NULL,cfd);
if (server.tcpkeepalive)
anetKeepAlive(NULL, cfd, server.tcpkeepalive);
if (max && listLength(server.clients) >= max) {
serverLog(LL_VERBOSE,"WARNING: maximum number of clients reached");
close(cfd);
return;
}
ln = listAddNodeTail(server.clients,NULL);
c = zmalloc(sizeof(*c));
c->connfd = cfd;
c->firsttime = time(NULL);
c->lastinteraction = c->lastping = c->lastoutput = server.unixtime;
c->authenticated = 0;
c->fd = cfd;
c->name = NULL;
c->querybuf = sdsempty();
c->reqtype = 0;
c->argc = 0

在上面的代碼中,anetTcpAccept() 函數用於監聽客戶端的連接事件,並返回連接客戶端的文件描述符。如果有客戶端連接,就會執行 listAddNodeTail() 函數將客戶端添加到 server.clients 鍊表中,並為客戶端分配一個 client 結構體對象。在 client 結構體對象中,保存了客戶端的連接描述符、讀寫緩衝區、近一次交互時間等信息。

然後,Redis 會將客戶端的連接描述符和讀寫事件加入到事件循環機制中,使得 Redis 能夠及時地處理客戶端的請求。具體實現代碼在 ae.c 文件中。

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
aeFileEvent *fe = &eventLoop->events[fd];
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}

在上面的代碼中,aeCreateFileEvent() 函數用於將文件描述符和事件類型加入到事件循環機制中,實現了對客戶端的讀寫事件監聽。

2、多路復用模型

Redis 的多路復用模型主要使用了 epoll 函數實現,可以同時監聽多個客戶端的讀寫事件。具體實現代碼在 ae_epoll.c 文件中。

static int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = zmalloc(sizeof(aeApiState));
if (!state) return -1;
state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
if (!state->events) {
zfree(state);
return -1;
}
state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
if (state->epfd == -1) {
zfree(state->events);
zfree(state);
return -1;
}
eventLoop->apidata = state;
return 0;
}
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0}; /* avoid valgrind warning */
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;
ee.events = 0;
mask |= eventLoop->events[fd].mask; /* Merge old events */
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.fd = fd;
if (epoll_ctl(state->epfd,op,

在上面的代碼中,aeApiCreate() 函數用於創建 epoll 實例,同時初始化事件數組和事件循環機制。aeApiAddEvent() 函數用於將文件描述符和事件類型加入到 epoll 實例中,實現了對客戶端的讀寫事件監聽。

同時,在 ae.c 文件中,還有以下關於多路復用模型的代碼:

/* Process events in the main loop */
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
}
}
/* Wait for milliseconds for events to happen.
* If milliseconds is -1, wait indefinitely.
* Returns the number of events processed. */
int aeWait(int fd, int mask, long long milliseconds) {
struct epoll_event events[1];
int retval, numevents = 0;
if (milliseconds != -1) {
milliseconds /= 1000;
milliseconds++; /* rounding up */
}
retval = epoll_wait(fd, events, 1, milliseconds);
if (retval > 0) {
if (events[0].events & EPOLLIN) numevents |= AE_READABLE;
if (events[0].events & EPOLLOUT) numevents |= AE_WRITABLE;
if (events[0].events & EPOLLERR) numevents |= AE_WRITABLE;
if (events[0].events & EPOLLHUP) numevents |= AE_WRITABLE;
return numevents;
} else {
return retval;
}
}
/* Process every pending time event, then every pending file event
* (that may be registered by time event callbacks just processed).
* Without special flags the function sleeps until some file event
* fires, or when the next time event occurs (if any).
* If flags is 0, the function does nothing and returns.
* if flags has AE_ALL_EVENTS set, all the kind of events are processed.
* if flags has AE_FILE_EVENTS set, file events are processed.
* if flags has AE_TIME_EVENTS set, time events are processed.
* if flags has AE_DONT_WAIT set the function returns ASAP until all
* the events that's possible to process without to wait are processed. */
void aeProcessEvents(aeEventLoop *eventLoop, int flags) {
int processed = 0, numevents;
/* Nothing to do, return ASAP */
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return;
/* Note that we want to sleep if we have no events to process. */
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp;
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
shortest = aeSearchNearestTimer(eventLoop);
if (shortest) {
long now_sec, now_ms;
aeGetTime(&now_sec, &now_ms);
tvp = &tv;
tvp->tv_sec = shortest->when_sec - now_sec;
if (shortest->when_ms < now_ms) {
tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
tv } else {
tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
}
if (tvp->tv_sec < 0) tvp->tv_sec = 0;
if (tvp->tv_usec < 0) tvp->tv_usec = 0;
} else {
/* If we have to check for events but need to return
* ASAP because of AE_DONT_WAIT we need to set the timeout
* to zero */
if (flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
/* Otherwise we can block */
tvp = NULL; /* wait forever */
}
}
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int fired = 0; /* Number of events fired for current fd. */
/* Note the fe->mask & mask & ... code: maybe an already
* processed event removed an element that fired and we
* still didn't processed, so we check if the event is still
* valid. */
if (fe->mask & mask & AE_READABLE) {
fe->rfileProc(eventLoop, fd, fe->clientData, mask);
fired++;
}
if (fe->mask & mask & AE_WRITABLE) {
fe->wfileProc(eventLoop, fd, fe->clientData, mask);
fired++;
}
if (fe->mask & mask & AE_ERROR) {
fe->efileProc(eventLoop, fd, fe->clientData, mask);
fired++;
}
if (fe->mask & mask & AE_HUP) {
fe->hupfileProc(eventLoop, fd, fe->clientData, mask);
fired++;
}
processed++;
}
}

`aeMain()` 函數是 Redis 事件循環的核心,其中的 `while` 循環不斷處理事件,如果事件循環被設置為停止狀態,那麼就跳出循環。

`aeWait()` 函數主要用於等待事件的發生,如果等待的時間為 -1,表示等待無限長的時間,否則就等待指定的時間後返回。

`aeProcessEvents()` 函數用於處理文件事件和時間事件,其中 `aeSearchNearestTimer()` 函數用於尋找近的時間事件,並將其加入到時間事件的鍊表中,然後計算出時間事件距離當前時間的時間差,用於設置 epoll_wait() 函數的超時時間。

在 `ae.c` 文件中,還有以下與 Redis 事件循環相關的代碼:

/* Create a new event loop */
aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i;
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
if (eventLoop->events ==)

總結

在 Redis 的事件循環中,主要採用了多路復用技術來實現,即使用 epoll 等系統調用來監控文件描述符,以及使用時間事件來處理一些需要在指定時間後執行的任務。

Redis 的事件循環採用單線程的方式實現,但是在處理文件事件時,會使用 I/O 多路復用技術來提高性能,同時還會使用時間事件來實現一些定時任務。

在 Redis 的事件循環中,時間事件和文件事件是分別處理的,時間事件使用鍊表來保存,文件事件使用 epoll 或 select 等系統調用來實現。在每次循環中,Redis 事件循環都會處理所有已就緒的文件事件,並且會尋找並處理近的時間事件,從而實現了良好的事件處理和定時任務的執行。

了解 Redis 事件循環的實現原理,有助於我們更好地理解 Redis 的工作原理,並且能夠更好地調試和優化 Redis 伺服器,也是面試中常被問及的高頻知識點。

關鍵字: