多多色-多人伦交性欧美在线观看-多人伦精品一区二区三区视频-多色视频-免费黄色视屏网站-免费黄色在线

國內最全IT社區平臺 聯系我們 | 收藏本站
阿里云優惠2
您當前位置:首頁 > php框架 > 框架設計 > redis源碼分析(2)――事件循環

redis源碼分析(2)――事件循環

來源:程序員人生   發布時間:2015-01-23 08:24:36 閱讀次數:4273次
  redis作為服務器程序,網絡IO處理是關鍵。redis不像memcached使用libevent,它實現了自己的IO事件框架,并且很簡單、小巧。可以選擇select、epoll、kqueue等實現。
    作為 IO事件框架,需要抽象多種IO模型的共性,將全部進程主要抽象為:
           1)初始化
         2)添加、刪除事件
           3)等待事件產生
      下面也依照這個步驟分析代碼。

(1)初始化

回想1下redis的初始化進程中,initServer函數會調用aeCreateEventLoop創建event loop對象,對事件循環進行初始化。下面看1下aeEventLoop結構,存儲事件循環相干的屬性。
typedef struct aeEventLoop { int maxfd; /* highest file descriptor currently registered */ int setsize; /* max number of file descriptors tracked */ long long timeEventNextId; // <MM> // 寄存的是上次觸發定時器事件的時間 // </MM> time_t lastTime; /* Used to detect system clock skew */ aeFileEvent *events; /* Registered events */ aeFiredEvent *fired; /* Fired events */ // <MM> // 所有定時器事件組織成鏈表 // </MM> aeTimeEvent *timeEventHead; // <MM> // 是不是停止eventLoop // </MM> int stop; void *apidata; /* This is used for polling API specific data */ // <MM> // 事件循環每次迭代都會調用beforesleep // </MM> aeBeforeSleepProc *beforesleep; } aeEventLoop;
setsize:指定事件循環要監聽的文件描寫符集合的大小。這個值與配置文件中得maxclients有關。
      events:寄存所有注冊的讀寫事件,是大小為setsize的數組。內核會保證新建連接的fd是當前可用描寫符的最小值,所以最多監聽setsize個描寫符,那末最大的fd就是setsize - 1。這類組織方式的好處是,可以以fd為下標,索引到對應的事件,在事件觸發后根據fd快速查找到對應的事件。
    fired:寄存觸發的讀寫事件。一樣是setsize大小的數組。
      timeEventHead:redis將定時器事件組織成鏈表,這個屬性指向表頭。
      apidata:寄存epoll、select等實現相干的數據。
      beforesleep:事件循環在每次迭代前會調用beforesleep履行1些異步處理。

io模型初始化的抽象函數為aeApiCreate。aeCreateEventLoop函數創建并初始化全局事件循環結構,并調用aeApiCreate初始化具體實現依賴的數據結構。
aeEventLoop *aeCreateEventLoop(int setsize) { aeEventLoop *eventLoop; int i; // <MM> // setsize指定事件循環監聽的fd的數目 // 由于內核保證新創建的fd是最小的正整數,所以直接創建setsize大小 // 的數組,寄存對應的event // </MM> if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err; eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize); eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize); if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err; eventLoop->setsize = setsize; eventLoop->lastTime = time(NULL); eventLoop->timeEventHead = NULL; eventLoop->timeEventNextId = 0; eventLoop->stop = 0; eventLoop->maxfd = ⑴; eventLoop->beforesleep = NULL; if (aeApiCreate(eventLoop) == ⑴) goto err; /* Events with mask == AE_NONE are not set. So let's initialize the * vector with it. */ for (i = 0; i < setsize; i++) eventLoop->events[i].mask = AE_NONE; return eventLoop; err: if (eventLoop) { zfree(eventLoop->events); zfree(eventLoop->fired); zfree(eventLoop); } return NULL; }
以epoll為例,aeApiCreate主要是創建epoll的fd,和要監聽的epoll_event,這些數據定義在:
typedef struct aeApiState { int epfd; struct epoll_event *events; } aeApiState;
這里,監聽到的事件組織方式與event_loop中監聽事件1樣,一樣是setsize大小的數據,以fd為下標。
aeApiCreate會初始化這些屬性,并將aeApiState結構寄存到eventLoop->apidata。
static int aeApiCreate(aeEventLoop *eventLoop) { aeApiState *state = zmalloc(sizeof(aeApiState)); if (!state) return ⑴; state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize); if (!state->events) { zfree(state); return ⑴; } state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */ if (state->epfd == ⑴) { zfree(state->events); zfree(state); return ⑴; } eventLoop->apidata = state; return 0; }

(2)添加、刪除事件

redis支持兩類事件,網絡io事件和定時器事件。定時器事件的添加、刪除相對簡單些,主要是保護定時器事件列表。首先看1下表示定時器事件的結構:
/* Time event structure */ typedef struct aeTimeEvent { long long id; /* time event identifier. */ long when_sec; /* seconds */ long when_ms; /* milliseconds */ aeTimeProc *timeProc; aeEventFinalizerProc *finalizerProc; void *clientData; struct aeTimeEvent *next; } aeTimeEvent;
when_sec和when_ms:表示定時器觸發的事件戳,在事件循環迭代返回后,如果當前時間戳大于這個值就會回調事件處理函數。
      timeProc:事件處理函數。
      finalizerProc:清算函數,在刪除定時器時調用。
      clientData:需要傳入事件處理函數的參數。
      next:定時器事件組織成鏈表,next指向下1個事件。

aeCreateTimeEvent函數用于添加定時器事件,邏輯很簡單,根據當前時間計算下1次觸發的事件,對事件屬性賦值,并插入到定時器鏈表表頭之前。刪除通過aeDeleteTimeEvent函數,根據id找到事件并從鏈表刪除該節點,回調清算函數。具體定時器事件的處理見后文,下面看1下io事件。
io事件的添加通過aeCreateFileEvent,邏輯很簡單,根據要注冊的fd,獲得其event,設置屬性,會調用aeApiAddEvent函數添加到底層的io模型。
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData) { if (fd >= eventLoop->setsize) { errno = ERANGE; return AE_ERR; } aeFileEvent *fe = &eventLoop->events[fd]; if (aeApiAddEvent(eventLoop, fd, mask) == ⑴) return AE_ERR; fe->mask |= mask; if (mask & AE_READABLE) fe->rfileProc = proc; if (mask & AE_WRITABLE) fe->wfileProc = proc; fe->clientData = clientData; if (fd > eventLoop->maxfd) eventLoop->maxfd = fd; return AE_OK; }
mask:指定注冊的事件類型,可以是讀或寫。
      proc:事件處理函數。

下面是io事件的結構,包括注冊的事件類型mask,讀寫事件處理函數,和對應的參數。
/* File event structure */ typedef struct aeFileEvent { int mask; /* one of AE_(READABLE|WRITABLE) */ aeFileProc *rfileProc; aeFileProc *wfileProc; void *clientData; } aeFileEvent;
下面看1下epoll添加事件的實現,主要是調用epoll_ctl
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) { aeApiState *state = eventLoop->apidata; struct epoll_event ee; /* If the fd was already monitored for some event, we need a MOD * operation. Otherwise we need an ADD operation. */ int op = eventLoop->events[fd].mask == AE_NONE ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; ee.events = 0; mask |= eventLoop->events[fd].mask; /* Merge old events */ if (mask & AE_READABLE) ee.events |= EPOLLIN; if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; ee.data.u64 = 0; /* avoid valgrind warning */ ee.data.fd = fd; if (epoll_ctl(state->epfd,op,fd,&ee) == ⑴) return ⑴; return 0; }
struct epll_event用于指定要監聽的事件,和該文件描寫符綁定的data,在事件觸發時可以返回。這里將data直接存為fd,通過這個數據,即可以找到對應的事件,然后調用其處理函數。
      epoll的刪除與添加類似,不再贅述。

(3)等待事件觸發

通過調用aeMain函數進入事件循環:
void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; while (!eventLoop->stop) { if (eventLoop->beforesleep != NULL) eventLoop->beforesleep(eventLoop); aeProcessEvents(eventLoop, AE_ALL_EVENTS); } }
函數內部就是1個while循環,不斷的調用aeProcessEvents函數,等待事件產生。在每次迭代前會調用會調用beforesleep函數,處理異步任務,后續會和serverCron1起介紹。
      aeProcessEvents函數首先會處理定時器事件,然后是io事件,下面介紹這個函數的實現。
      首先,聲明變量記錄處理的事件個數,和觸發的事件。flags表示此輪需要處理的事件類型,如果不需要處理定時器事件和io事件直接返回。
int processed = 0, numevents; /* Nothing to do? return ASAP */ if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
redis中的定時器事件是通過epoll實現的。大體思路是,在每次事件迭代調用epoll_wait時需要指定此輪sleep的時間。如果沒有io事件產生,則在sleep時間到了以后會返回。通過算出下1次最早產生的事件,到當前時間的間隔,用這個值設為sleep,這樣就能夠保證在事件到達后回調其處理函數。但是,由于每次返回后,還有處理io事件,所以定時器的觸發事件是不精確的,1定是比預定的觸發時間晚的。下面看下具體實現。
      首先是,查找下1次最早產生的定時器事件,以肯定sleep的事件。如果沒有定時器事件,則根據傳入的flags,選擇是1直阻塞指點io事件產生,或是不阻塞,檢查完立即返回。通過調用aeSearchNearestTimer函數查找最早產生的事件,采取的是線性查找的方式,復雜度是O(n),可以將定時器事件組織成堆,加快查找。不過,redis中只有1個serverCron定時器事件,所以暫時不需優化。
/* Note that we want call select() even if there are no * file events to process as long as we want to process time * events, in order to sleep until the next time event is ready * to fire. */ // <MM> // 在兩種情況下進入poll,阻塞等待事件產生: // 1)在有需要監聽的描寫符時(maxfd != ⑴) // 2)需要處理定時器事件,并且DONT_WAIT開關關閉的情況下 // </MM> if (eventLoop->maxfd != ⑴ || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { int j; aeTimeEvent *shortest = NULL; struct timeval tv, *tvp; // <MM> // 根據最快產生的定時器事件的產生時間,肯定此次poll阻塞的時間 // </MM> if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT)) // <MM> // 線性查找最快產生的定時器事件 // </MM> shortest = aeSearchNearestTimer(eventLoop); if (shortest) { // <MM> // 如果有定時器事件,則根據它觸發的時間,計算sleep的時間(ms單位) // </MM> long now_sec, now_ms; /* Calculate the time missing for the nearest * timer to fire. */ aeGetTime(&now_sec, &now_ms); tvp = &tv; tvp->tv_sec = shortest->when_sec - now_sec; if (shortest->when_ms < now_ms) { tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000; tvp->tv_sec --; } else { tvp->tv_usec = (shortest->when_ms - now_ms)*1000; } if (tvp->tv_sec < 0) tvp->tv_sec = 0; if (tvp->tv_usec < 0) tvp->tv_usec = 0; } else { // <MM> // 如果沒有定時器事件,則根據情況是立即返回,或永久阻塞 // </MM> /* If we have to check for events but need to return * ASAP because of AE_DONT_WAIT we need to set the timeout * to zero */ if (flags & AE_DONT_WAIT) { tv.tv_sec = tv.tv_usec = 0; tvp = &tv; } else { /* Otherwise we can block */ tvp = NULL; /* wait forever */ } }
接著,調用aeApiPoll函數,傳入前面計算的sleep時間,等待io事件放生。在函數返回后,觸發的事件已填充到eventLoop的fired數組中。epoll的實現以下,就是調用epoll_wait,函數返回后,會將觸發的事件寄存到state->events數組中的前numevents個元素。接下來,填充fired數組,設置每一個觸發事件的fd,和事件類型。
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { aeApiState *state = eventLoop->apidata; int retval, numevents = 0; // <MM> // 調用epoll_wait,state->events寄存返回的產生事件的fd // </MM> retval = epoll_wait(state->epfd,state->events,eventLoop->setsize, tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : ⑴); if (retval > 0) { int j; numevents = retval; // <MM> // 有事件產生,將產生的事件寄存于fired數組 // </MM> for (j = 0; j < numevents; j++) { int mask = 0; struct epoll_event *e = state->events+j; if (e->events & EPOLLIN) mask |= AE_READABLE; if (e->events & EPOLLOUT) mask |= AE_WRITABLE; if (e->events & EPOLLERR) mask |= AE_WRITABLE; if (e->events & EPOLLHUP) mask |= AE_WRITABLE; eventLoop->fired[j].fd = e->data.fd; eventLoop->fired[j].mask = mask; } } return numevents; }
在事件返回后,需要處理事件。遍歷fired數組,獲得fd對應的事件,并根據觸發的事件類型,回調其處理函數。
for (j = 0; j < numevents; j++) { // <MM> // poll返回后,會將所有觸發的時間寄存于fired數組 // </MM> aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; int mask = eventLoop->fired[j].mask; int fd = eventLoop->fired[j].fd; int rfired = 0; /* note the fe->mask & mask & ... code: maybe an already processed * event removed an element that fired and we still didn't * processed, so we check if the event is still valid. */ // <MM> // 回調產生事件的fd,注冊的事件處理函數 // </MM> if (fe->mask & mask & AE_READABLE) { rfired = 1; fe->rfileProc(eventLoop,fd,fe->clientData,mask); } if (fe->mask & mask & AE_WRITABLE) { if (!rfired || fe->wfileProc != fe->rfileProc) fe->wfileProc(eventLoop,fd,fe->clientData,mask); } processed++; }
以上便是,io事件的處理,下面看1下定時器事件的處理。會調用processTimeEvents函數處理定時器事件。
      首先會校驗是不是產生系統時鐘偏差(system clock skew,修改系統事件會產生?把事件調到過去),如果產生就將所有事件的產生時間置為0,立即觸發。
/* If the system clock is moved to the future, and then set back to the * right value, time events may be delayed in a random way. Often this * means that scheduled operations will not be performed soon enough. * * Here we try to detect system clock skews, and force all the time * events to be processed ASAP when this happens: the idea is that * processing events earlier is less dangerous than delaying them * indefinitely, and practice suggests it is. */ if (now < eventLoop->lastTime) { te = eventLoop->timeEventHead; while(te) { te->when_sec = 0; te = te->next; } } eventLoop->lastTime = now;
接下來遍歷所有定時器事件,查找觸發的事件,然后回調解理函數。定時器事件處理函數的返回值,決定這個事件是1次性的,還是周期性的。如果返回AE_NOMORE,則是1次性事件,在調用完后會刪除該事件。否則的話,返回值指定的是下1次觸發的時間。
te = eventLoop->timeEventHead; maxId = eventLoop->timeEventNextId⑴; while(te) { long now_sec, now_ms; long long id; if (te->id > maxId) { te = te->next; continue; } aeGetTime(&now_sec, &now_ms); if (now_sec > te->when_sec || (now_sec == te->when_sec && now_ms >= te->when_ms)) { // <MM> // 定時器事件的觸發時間已過,則回調注冊的事件處理函數 // </MM> int retval; id = te->id; retval = te->timeProc(eventLoop, id, te->clientData); processed++; /* After an event is processed our time event list may * no longer be the same, so we restart from head. * Still we make sure to don't process events registered * by event handlers itself in order to don't loop forever. * To do so we saved the max ID we want to handle. * * FUTURE OPTIMIZATIONS: * Note that this is NOT great algorithmically. Redis uses * a single time event so it's not a problem but the right * way to do this is to add the new elements on head, and * to flag deleted elements in a special way for later * deletion (putting references to the nodes to delete into * another linked list). */ // <MM> // 根據定時器事件處理函數的返回值,決定是不是將該定時器刪除。 // 如果retval不等于⑴(AE_NOMORE),則更改定時器的觸發時間為 // now + retval(ms) // </MM> if (retval != AE_NOMORE) { aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms); } else { // <MM> // 如果返回AE_NOMORE,則刪除該定時器 // </MM> aeDeleteTimeEvent(eventLoop, id); } te = eventLoop->timeEventHead; } else { te = te->next; } }
在回調解理函數時,有可能會添加新的定時器事件,如果不斷加入,存在無窮循環的風險,所以需要避免這類情況,每次循環不處理新添加的事件,這是通過下面的代碼實現的。
if (te->id > maxId) { te = te->next; continue; }

事件循環部份分析到此結束,感覺比較直觀、清晰,完全可以抽出來,作為1個獨立的庫使用。下面1節,會介紹要求的處理。



生活不易,碼農辛苦
如果您覺得本網站對您的學習有所幫助,可以手機掃描二維碼進行捐贈
程序員人生
------分隔線----------------------------
分享到:
------分隔線----------------------------
關閉
程序員人生
主站蜘蛛池模板: 国产成人毛片 | 久久精品综合国产二区 | avtt中文字幕| 国产成人精品久久 | 欧美激情一区二区亚洲专区 | 一区二区三区在线 | 日本 | 国产精品反差婊在线观看 | 开操网 | 欧美一级片网 | 尤物在线看 | 色阁在线 | vvvv99日韩精品亚洲 | 欧美性播放 | 三级做爰大爽长视频在线观看 | 中文天堂最新版在线精品 | 日本与大黑人xxxx | 久久国产成人精品国产成人亚洲 | 国产精品麻豆高清在线观看 | 久久亚洲精选 | 亚洲www视频 | 最新亚洲一区二区三区四区 | 欧美一级aⅴ毛片 | 日韩一级欧美一级一级国产 | 欧美日韩三区 | 亚洲观看视频 | www.国产精品视频 | 欧美自拍视频在线 | 欧美午夜视频在线 | 亚洲美女视频 | 国产欧美日韩综合一区二区三区 | xxx日韩| jizz在线观看18 | 国产欧美日韩精品一区二区三区 | 久久精品中文字幕不卡一二区 | 亚洲女人天堂a在线播放 | 亚洲经典一区二区三区 | 国产欧美日韩另类一区乌克兰 | 一本综合久久国产二区 | 中文天堂在线视频 | 欧美在线一区二区三区不卡 | 亚洲天堂一区二区三区 |