linux下有兩種aio,1種是glibc實(shí)現(xiàn)的aio,這個(gè)比較爛,它是直接在用戶空間用pthread進(jìn)行摹擬的。還有1種就是內(nèi)核實(shí)現(xiàn)的aio,這些系統(tǒng)調(diào)用是以io_xxx開始的。下面將針對(duì) 同步和異步模型,和阻塞和非阻塞的模型進(jìn)行介紹。而native aio的優(yōu)點(diǎn)就是能夠同時(shí)提交多個(gè)io要求給內(nèi)核,然后直接由內(nèi)核的io調(diào)度算法去處理這些要求(direct io),這樣的話,內(nèi)核就有可能履行1些合并,優(yōu)化。native aio包括下面幾個(gè)系統(tǒng)調(diào)用:
io_setup(2)
io_cancle(2)
io_destroy(2)
io_getevents(2)
io_submit(2)
要使用他們必須安裝libaio這個(gè)庫(kù),這個(gè)庫(kù)也就是簡(jiǎn)單的封裝了上面的幾個(gè)系統(tǒng)調(diào)用,而nginx中沒(méi)有使用libaio這個(gè)庫(kù),而是直接使用syscall來(lái)調(diào)用系統(tǒng)調(diào)用,這是由于Linux內(nèi)核實(shí)現(xiàn)提供的native AIO機(jī)制,要使用這1套機(jī)制,可以利用libaio庫(kù),也能夠手動(dòng)利用syscall做1層自己的封裝,不過(guò)這并沒(méi)有大礙,libaio庫(kù)本身也很簡(jiǎn)單。
io_setup用于建立1個(gè)aio的環(huán)境,io_cancel用于刪除1個(gè)提交 句柄的任務(wù),io_getevents用于任務(wù)履行 終了以后的事情信息。io_submit用于提交任務(wù)
nginx是這樣處理的,利用了系統(tǒng)調(diào)用eventfd,用eventfd建立1個(gè)句柄,然后將這個(gè)句柄加入epoll監(jiān)聽,然后在io_submit提交任務(wù)的時(shí)候,將aio_flags設(shè)置為IOCB_FLAG_RESFD,并且aio_resfd設(shè)置為eventfd建立的那個(gè)句柄,這樣io要求完成后,會(huì)向aio_resfd中寫入完成要求數(shù)量,然后此時(shí)epoll就接到可讀通知,從而進(jìn)行后續(xù)操作。
在這個(gè)函數(shù)中建立了aio環(huán)境
// 文件 ngx_epoll_module.c
// 下面是文件異步I/o機(jī)制中定義的全局變量
// 用于通知異步I/O事件的描寫符
int ngx_eventfd=-1;
//異步I/O的上下文,全局唯1,必須經(jīng)過(guò)io_setup初始化才可使用
aio_context_t ngx_aio_ctx = 0;
/*異步I/O事件完成落后行通知的描寫符,也就是ngx_eventfd所對(duì)應(yīng) 的ngx_event_t事件*/
static ngx_event_t ngx_eventfd_event;
/*異步I/O事件完成落后行通知描寫符,也就是ngx_eventfd所對(duì)應(yīng)的ngx_connection事件*/
static void
ngx_epoll_aio_init(ngx_cycle_t *cycle, ngx_epoll_conf_t *epcf)
{
int n;
struct epoll_event ee;
#if (NGX_HAVE_SYS_EVENTFD_H)
ngx_eventfd = eventfd(0, 0);
#else
// 調(diào)用eventfd
ngx_eventfd = syscall(SYS_eventfd, 0);
#endif
if (ngx_eventfd == -1) {
ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
"eventfd() failed");
ngx_file_aio = 0;
return;
}
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"eventfd: %d", ngx_eventfd);
n = 1;
// 設(shè)置ngx_eventfd為無(wú)阻塞
if (ioctl(ngx_eventfd, FIONBIO, &n) == -1) {
ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
"ioctl(eventfd, FIONBIO) failed");
goto failed;
}
// 安裝 aio環(huán)境,初始化文件異步I/O上下文
if (io_setup(epcf->aio_requests, &ngx_aio_ctx) == -1) {
ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
"io_setup() failed");
goto failed;
}
/*創(chuàng)建aio上下文環(huán)境ngx_aio_ctx(全局變量),初始化ngx_eventfd_event和ngx_eventfd_conn(二者都是全局變量,利用conn和event來(lái)進(jìn)行統(tǒng)1描寫,便于將eventfd、aio融會(huì)并適用到nginx的整體邏輯里*/
// 設(shè)置將要傳遞給epoll的數(shù)據(jù),可以看到都是和eventfd關(guān)聯(lián)的,設(shè)置用于異步I/O完成 通知的ngx_eventfd_event事件,它與ngx_eventfd_conn連接是對(duì)應(yīng)的
ngx_eventfd_event.data = &ngx_eventfd_conn;
// 這個(gè)就是當(dāng)eventfd可讀被通知時(shí),epoll所將要履行的 讀方法
ngx_eventfd_event.handler = ngx_epoll_eventfd_handler;
ngx_eventfd_event.log = cycle->log;
ngx_eventfd_event.active = 1;
// 初始化 ngx_eventfd_conn連接
ngx_eventfd_conn.fd = ngx_eventfd;
// ngx_eventfd_conn連接的讀事件就是上面的ngx_eventfd_event
ngx_eventfd_conn.read = &ngx_eventfd_event;
ngx_eventfd_conn.log = cycle->log;
// 檢測(cè)可讀事件 邊沿觸發(fā)
ee.events = EPOLLIN|EPOLLET;
ee.data.ptr = &ngx_eventfd_conn;
// 加入到epoll中 ,最后將代表aio的文件描寫符ngx_eventfd加入到epoll機(jī)制里,即完成eventfd與epoll的關(guān)聯(lián)
if (epoll_ctl(ep, EPOLL_CTL_ADD, ngx_eventfd, &ee) != -1) {
return;
}
ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
"epoll_ctl(EPOLL_CTL_ADD, eventfd) failed");
if (io_destroy(ngx_aio_ctx) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"io_destroy() failed");
}
failed:
if (close(ngx_eventfd) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"eventfd close() failed");
}
ngx_eventfd = -1;
ngx_aio_ctx = 0;
ngx_file_aio = 0;
}
#endif
要是設(shè)置io_submit所需要的參數(shù),然后傳遞io要求給io_submit.這里有個(gè)關(guān)鍵的就是aio_flags的設(shè)置,這個(gè)標(biāo)記說(shuō)明我們要使用aio_resfd來(lái)接收aio履行的結(jié)果。其實(shí)也就是履行的io任務(wù)的個(gè)數(shù)
// 文件名:ngx_linux_aio_read.c
ssize_t
ngx_file_aio_read(ngx_file_t *file, u_char *buf, size_t size, off_t offset,
ngx_pool_t *pool)
{
ngx_err_t err;
struct iocb *piocb[1];
ngx_event_t *ev;
ngx_event_aio_t *aio;
if (!ngx_file_aio) {
return ngx_read_file(file, buf, size, offset);
}
if (file->aio == NULL && ngx_file_aio_init(file, pool) != NGX_OK) {
return NGX_ERROR;
}
aio = file->aio;
ev = &aio->event;
if (!ev->ready) {
ngx_log_error(NGX_LOG_ALERT, file->log, 0,
"second aio post for \"%V\"", &file->name);
return NGX_AGAIN;
}
ngx_log_debug4(NGX_LOG_DEBUG_CORE, file->log, 0,
"aio complete:%d @%O:%uz %V",
ev->complete, offset, size, &file->name);
if (ev->complete) {
ev->active = 0;
ev->complete = 0;
if (aio->res >= 0) {
ngx_set_errno(0);
return aio->res;
}
ngx_set_errno(-aio->res);
ngx_log_error(NGX_LOG_CRIT, file->log, ngx_errno,
"aio read \"%s\" failed", file->name.data);
return NGX_ERROR;
}
ngx_memzero(&aio->aiocb, sizeof(struct iocb));
aio->aiocb.aio_data = (uint64_t) (uintptr_t) ev;
// 這里設(shè)置讀,nginx只使用到了異步讀取,其中 1個(gè)很重要的緣由就是文件的異步I/O沒(méi)法利用緩存,而寫操作通常是落入緩存。
aio->aiocb.aio_lio_opcode = IOCB_CMD_PREAD;
aio->aiocb.aio_fildes = file->fd;
aio->aiocb.aio_buf = (uint64_t) (uintptr_t) buf;
aio->aiocb.aio_nbytes = size;
aio->aiocb.aio_offset = offset;
// 設(shè)置 了aio_flags
aio->aiocb.aio_flags = IOCB_FLAG_RESFD;
aio->aiocb.aio_resfd = ngx_eventfd;
ev->handler = ngx_file_aio_event_handler;
piocb[0] = &aio->aiocb;
// 提交要求
if (io_submit(ngx_aio_ctx, 1, piocb) == 1) {
ev->active = 1;
ev->ready = 0;
ev->complete = 0;
return NGX_AGAIN;
}
err = ngx_errno;
if (err == NGX_EAGAIN) {
return ngx_read_file(file, buf, size, offset);
}
ngx_log_error(NGX_LOG_CRIT, file->log, err,
"io_submit(\"%V\") failed", &file->name);
if (err == NGX_ENOSYS) {
ngx_file_aio = 0;
return ngx_read_file(file, buf, size, offset);
}
return NGX_ERROR;
}
當(dāng)有aio要求完成時(shí),文件描寫符ngx_eventfd將變得可讀,阻塞點(diǎn)epoll_wait()函數(shù)返回
static ngx_int_t
ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags)
{
int events;
uint32_t revents;
ngx_int_t instance, i;
ngx_uint_t level;
ngx_err_t err;
ngx_event_t *rev, *wev;
ngx_queue_t *queue;
ngx_connection_t *c;
/* NGX_TIMER_INFINITE == INFTIM */
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"epoll timer: %M", timer);
/*調(diào)用epoll_wait() 獲得事件*/
events = epoll_wait(ep, event_list, (int) nevents, timer);
err = (events == -1) ? ngx_errno : 0;
/*Nginx 對(duì) 事件緩存和管理,當(dāng) flags標(biāo)志位唆使要 更新時(shí)間,就是在這里更新*/
if (flags & NGX_UPDATE_TIME || ngx_event_timer_alarm) {
// 更新時(shí)間
ngx_time_update();
}
if (err) {
if (err == NGX_EINTR) {
if (ngx_event_timer_alarm) {
ngx_event_timer_alarm = 0;
return NGX_OK;
}
level = NGX_LOG_INFO;
} else {
level = NGX_LOG_ALERT;
}
ngx_log_error(level, cycle->log, err, "epoll_wait() failed");
return NGX_ERROR;
}
if (events == 0) {
if (timer != NGX_TIMER_INFINITE) {
return NGX_OK;
}
ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
"epoll_wait() returned no events without timeout");
return NGX_ERROR;
}
// 遍歷 本次epoll_wait返回所有的 事件
for (i = 0; i < events; i++) {
/*這個(gè)對(duì)比ngx_epoll_add_event方法,成員ptr就是ngx_connection_t連接 地址,但 最后1位有特殊含義,需要將它屏蔽掉*/
c = event_list[i].data.ptr;
// 將最后1位取出來(lái),用instance變量標(biāo)識(shí)
instance = (uintptr_t) c & 1;
/*不論是32位還是64位機(jī)器,其地址的最后1位肯定是0,使用下面 的方法把ngx_connection_t的地址還原到真實(shí)的地址值*/
c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1);
// 取出讀事件
rev = c->read;
// 判斷事件 是不是過(guò)期
if (c->fd == -1 || rev->instance != instance) {
/*套接字 描寫符為⑴或instance標(biāo)示位不相等時(shí),表示這個(gè) 事件 已 過(guò)期了,不用途理了*/
/*
* the stale event from a file descriptor
* that was just closed in this iteration
*/
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"epoll: stale event %p", c);
continue;
}
// 取失事件的類型
revents = event_list[i].events;
ngx_log_debug3(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"epoll: fd:%d ev:%04XD d:%p",
c->fd, revents, event_list[i].data.ptr);
if (revents & (EPOLLERR|EPOLLHUP)) {
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"epoll_wait() error on fd:%d ev:%04XD",
c->fd, revents);
}
#if 0
if (revents & ~(EPOLLIN|EPOLLOUT|EPOLLERR|EPOLLHUP)) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
"strange epoll_wait() events fd:%d ev:%04XD",
c->fd, revents);
}
#endif
if ((revents & (EPOLLERR|EPOLLHUP))
&& (revents & (EPOLLIN|EPOLLOUT)) == 0)
{
/*
* if the error events were returned without EPOLLIN or EPOLLOUT,
* then add these flags to handle the events at least in one
* active handler
*/
revents |= EPOLLIN|EPOLLOUT;
}
// 如果是讀事件且該事件是活躍的
if ((revents & EPOLLIN) && rev->active) {
#if (NGX_HAVE_EPOLLRDHUP)
if (revents & EPOLLRDHUP) {
rev->pending_eof = 1;
}
rev->available = 1;
#endif
rev->ready = 1;
// flags參數(shù)中含有NGX_POST_EVENTS表示這批事件要延后處理
if (flags & NGX_POST_EVENTS) {
/*如果要在post隊(duì)列處理該事件,首先要判斷它是 新連接事件還是普通事件,決定把它加入到ngx_posted_acept_events隊(duì)列或ngx_posted_events隊(duì)列中*/
queue = rev->accept ? &ngx_posted_accept_events
: &ngx_posted_events;
ngx_post_event(rev, queue);
} else {
// 立即調(diào)用讀事件的回調(diào)方法處理這個(gè)事件
rev->handler(rev);
}
}
// 取出寫事件
wev = c->write;
if ((revents & EPOLLOUT) && wev->active) {
if (c->fd == -1 || wev->instance != instance) {
//判斷這個(gè)讀事件是不是過(guò)期
/*
* the stale event from a file descriptor
* that was just closed in this iteration
*/
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"epoll: stale event %p", c);
continue;
}
wev->ready = 1;
#if (NGX_THREADS)
wev->complete = 1;
#endif
if (flags & NGX_POST_EVENTS) {
// 這個(gè)事件添加到post隊(duì)列中延后處理
ngx_post_event(wev, &ngx_posted_events);
} else {
// 立即調(diào)用這個(gè)寫事件的回調(diào)方法來(lái)處理這個(gè)事件
wev->handler(wev);
}
}
}
return NGX_OK;
}
io要求完成后,nginx的回調(diào)函數(shù)如何處理的。前面的代碼分析我們知道回調(diào)函數(shù)是ngx_epoll_eventfd_handler,因此我們就來(lái)看這個(gè)函數(shù),它的流程比較簡(jiǎn)單,首先從eventfd中讀取返回的事件個(gè)數(shù),然后調(diào)用io_getevents來(lái)取得所完成的io要求事件。
static void
ngx_epoll_eventfd_handler(ngx_event_t *ev)
{
int n, events;
long i;
uint64_t ready;
ngx_err_t err;
ngx_event_t *e;
ngx_event_aio_t *aio;
//1次性最多處理64個(gè)事件
struct io_event event[64];
struct timespec ts;
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ev->log, 0, "eventfd handler");
// 開始讀取完成的事件,并將完成的 數(shù)目 設(shè)置到ready 中,這個(gè)ready可以大于64
n = read(ngx_eventfd, &ready, 8);
err = ngx_errno;
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, ev->log, 0, "eventfd: %d", n);
if (n != 8) {
if (n == -1) {
if (err == NGX_EAGAIN) {
return;
}
ngx_log_error(NGX_LOG_ALERT, ev->log, err, "read(eventfd) failed");
return;
}
ngx_log_error(NGX_LOG_ALERT, ev->log, 0,
"read(eventfd) returned only %d bytes", n);
return;
}
ts.tv_sec = 0;
ts.tv_nsec = 0;
// ready 表示還有未處理的事件,當(dāng)ready大于0時(shí)繼續(xù)處理
while (ready) {
//用來(lái)得到所完成event,events就是事件的個(gè)數(shù),而event則是1個(gè)事件數(shù)組
events = io_getevents(ngx_aio_ctx, 1, 64, event, &ts);
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, ev->log, 0,
"io_getevents: %d", events);
if (events > 0) {
// 將ready減去已取出的事件
ready -= events;
//遍歷event,然后處理事件
for (i = 0; i < events; i++) {
ngx_log_debug4(NGX_LOG_DEBUG_EVENT, ev->log, 0,
"io_event: %XL %XL %L %L",
event[i].data, event[i].obj,
event[i].res, event[i].res2);
// data 成員指向這個(gè)異步I/O事件對(duì)應(yīng)著的實(shí)際事件
e = (ngx_event_t *) (uintptr_t) event[i].data;
e->complete = 1;
e->active = 0;
e->ready = 1;
aio = e->data;
aio->res = event[i].res;
//post事件,將該事件放到ngx_posted_events隊(duì)列 中等待處理
ngx_post_event(e, &ngx_posted_events);
}
continue;
}
if (events == 0) {
return;
}
/* events == ⑴ */
ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno,
"io_getevents() failed");
return;
}
}
在linux下,nginx把a(bǔ)io結(jié)合到epoll里使用。ngx_epoll_init() -> ngx_epoll_aio_init()
下一篇 UI控件之菜單(Menu)