下面文章在這段時間內研究 select/poll/epoll的內核實現的一點心得體會:
select,poll,epoll都是多路復用IO的函數,簡單說就是在一個線程里,可以同時處理多個文件描述符的讀寫。
select/poll的實現很類似,epoll是從select/poll擴展而來,主要是為了解決select/poll天生的缺陷。
epoll在內核版本2.6以上才出現的新的函數,而他們在linux內核中的實現都是十分相似。
這三種函數都需要設備驅動提供poll回調函數,對于套接字而言,他們是 tcp_poll,udp_poll和datagram_poll;
對于自己開發的設備驅動而言,是自己實現的poll接口函數。
select實現(2.6的內核,其他版本的內核,應該都相差不多)
應用程序調用select,進入內核調用sys_select,做些簡單初始化工作,接著進入 core_sys_select,
此函數主要工作是把描述符集合從用戶空間復制到內核空間, 最終進入do_select,完成其主要的功能。
do_select里,調用 poll_initwait,主要工作是注冊poll_wait的回調函數為__pollwait,
當在設備驅動的poll回調函數里調用poll_wait,其實就是調用__pollwait,
__pollwait的主要工作是把當前進程掛載到等待隊列里,當等待的事件到來就會喚醒此進程。
接著執行for循環,循環里首先遍歷每個文件描述符,調用對應描述符的poll回調函數,檢測是否就緒,
遍歷完所有描述符之后,只要有描述符處于就緒狀態,信號中斷,出錯或者超時,就退出循環,
否則會調用schedule_xxx函數,讓當前進程睡眠,一直到超時或者有描述符就緒被喚醒。
接著又會再次遍歷每個描述符,調用poll再次檢測。
如此循環,直到符合條件才會退出。
以下是 2.6.31內核的有關select函數的部分片段:
他們調用關系:
select --> sys_select --> core_sys_select --> do_select
int do_select(int n, fd_set_bits *fds, struct timespec *end_time){ ktime_t expire, *to = NULL; struct poll_wqueues table; poll_table *wait; int retval, i, timed_out = 0; unsigned long slack = 0; ///這里為了獲得集合中的最大描述符,這樣可減少循環中遍歷的次數。 ///也就是為什么linux中select第一個參數為何如此重要了 rcu_read_lock(); retval = max_select_fd(n, fds); rcu_read_unlock(); if (retval < 0) return retval; n = retval; ////初始化 poll_table結構,其中一個重要任務是把 __pollwait函數地址賦值給它, poll_initwait(&table); wait = &table.pt; if (end_time && !end_time->tv_sec && !end_time->tv_nsec) { wait = NULL; timed_out = 1; } if (end_time && !timed_out) slack = estimate_accuracy(end_time); retval = 0; ///主循環,將會在這里完成描述符的狀態輪訓 for (;;) { unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp; inp = fds->in; outp = fds->out; exp = fds->ex; rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex; for (i = 0; i < n; ++rinp, ++routp, ++rexp) { unsigned long in, out, ex, all_bits, bit = 1, mask, j; unsigned long res_in = 0, res_out = 0, res_ex = 0; const struct file_operations *f_op = NULL; struct file *file = NULL; ///select中 fd_set 以及 do_select 中的 fd_set_bits 參數,都是按照位來保存描述符,意思是比如申請一個1024位的內存, ///如果第 28位置1,說明此集合有 描述符 28, in = *inp++; out = *outp++; ex = *exp++; all_bits = in | out | ex; // 檢測讀寫異常3個集合中有無描述符 if (all_bits == 0) { i += __NFDBITS; continue; } for (j = 0; j < __NFDBITS; ++j, ++i, bit <<= 1) { int fput_needed; if (i >= n) break; if (!(bit & all_bits)) continue; file = fget_light(i, &fput_needed); ///通過 描述符 index 獲得 struct file結構指針, if (file) { f_op = file->f_op; //通過 struct file 獲得 file_operations,這是操作文件的回調函數集合。 mask = DEFAULT_POLLMASK; if (f_op && f_op->poll) { wait_key_set(wait, in, out, bit); mask = (*f_op->poll)(file, wait); //調用我們的設備中實現的 poll函數, //因此,為了能讓select正常工作,在我們設備驅動中,必須要提供poll的實現, } fput_light(file, fput_needed); if ((mask & POLLIN_SET) && (in & bit)) { res_in |= bit; retval++; wait = NULL; /// 此處包括以下的,把wait設置為NULL,是因為檢測到mask = (*f_op->poll)(file, wait); 描述符已經就緒 /// 無需再把當前進程添加到等待隊列里,do_select 遍歷完所有描述符之后就會退出。 } if ((mask & POLLOUT_SET) && (out & bit)) { res_out |= bit; retval++; wait = NULL; } if ((mask & POLLEX_SET) && (ex & bit)) { res_ex |= bit; retval++; wait = NULL; } } } if (res_in) *rinp = res_in; if (res_out) *routp = res_out; if (res_ex) *rexp = res_ex; cond_resched(); } wait = NULL; //已經遍歷完一遍,該加到等待隊列的,都已經加了,無需再加,因此設置為NULL if (retval || timed_out || signal_pending(current)) //描述符就緒,超時,或者信號中斷就退出循環 break; if (table.error) {//出錯退出循環 retval = table.error; break; } /* * If this is the first loop and we have a timeout * given, then we convert to ktime_t and set the to * pointer to the expiry value. */ if (end_time && !to) { expire = timespec_to_ktime(*end_time); to = &expire; } /////讓進程休眠,直到超時,或者被就緒的描述符喚醒, if (!poll_schedule_timeout(&table, TASK_INTERRUPTIBLE, to, slack)) timed_out = 1; } poll_freewait(&table); return retval;}void poll_initwait(struct poll_wqueues *pwq){ init_poll_funcptr(&pwq->pt, __pollwait); //設置poll_table的回調函數為 __pollwait,這樣當我們在驅動中調用poll_wait 就會調用到 __pollwait ........}static void __pollwait(struct file *filp, wait_queue_head_t *wait_address, poll_table *p){ ................... init_waitqueue_func_entry(&entry->wait, pollwake); // 設置喚醒進程調用的回調函數,當在驅動中調用 wake_up喚醒隊列時候, // pollwake會被調用,這里其實就是調用隊列的默認函數 default_wake_function // 用來喚醒睡眠的進程。 add_wait_queue(wait_address, &entry->wait); //加入到等待隊列}int core_sys_select(int n, fd_set __user *inp, fd_set __user *outp, fd_set __user *exp, struct timespec *end_time){ ........ //把描述符集合從用戶空間復制到內核空間 if ((ret = get_fd_set(n, inp, fds.in)) || (ret = get_fd_set(n, outp, fds.out)) || (ret = get_fd_set(n, exp, fds.ex))) ......... ret = do_select(n, &fds, end_time); ............. ////把do_select返回集合,從內核空間復制到用戶空間 if (set_fd_set(n, inp, fds.res_in) || set_fd_set(n, outp, fds.res_out) || set_fd_set(n, exp, fds.res_ex)) ret = -EFAULT; ............}
poll的實現跟select基本差不多,按照
poll --> do_sys_poll --> do_poll --> do_pollfd 的調用序列
其中do_pollfd是對每個描述符調用 其回調poll狀態輪訓。
poll比select的好處就是沒有描述多少限制,select 有1024 的限制,描述符不能超過此值,poll不受限制。
我們從上面代碼分析,可以總結出select/poll天生的缺陷:
1)每次調用select/poll都需要要把描述符集合從用戶空間copy到內核空間,檢測完成之后,又要把檢測的結果集合從內核空間copy到用戶空間
當描述符很多,而且select經常被喚醒,這種開銷會比較大
2)如果說描述符集合來回復制不算什么,那么多次的全部描述符遍歷就比較恐怖了,
我們在應用程序中,每次調用select/poll 都必須首先遍歷描述符,把他們加到fd_set集合里,這是應用層的第一次遍歷,
接著進入內核空間,至少進行一次遍歷和調用每個描述符的poll回調檢測,一般可能是2次遍歷,第一次沒發現就緒描述符,
加入等待隊列,第二次是被喚醒,接著再遍歷一遍。再回到應用層,我們還必須再次遍歷所有描述符,用 FD_ISSET檢測結果集。
如果描述符很多,這種遍歷就很消耗CPU資源了。
3)描述符多少限制,當然poll沒有限制,select卻有1024的硬性限制,除了修改內核增加1024限制外沒別的辦法。
既然有這么些缺點 ,那不是 select/poll變得一無是處了,那就大錯特錯了。
他們依然是代碼移植的最好函數,因為幾乎所有平臺都有對它們的實現提供接口。
在描述符不是太多,他們依然十分出色的完成多路復用IO,
而且如果每個連接上的描述符都處于活躍狀態,他們的效率其實跟epoll也差不了多少。
曾經使用多個線程+每個線程采用poll的辦法開發TCP服務器,處理文件收發,連接達到幾千個,
當時的瓶頸已經不在網絡IO,而在磁盤IO了。
我們再來看epoll為了解決select/poll天生的缺陷,是如何實現的。
epoll只是select/poll的擴展,他不是在linux內核中另起爐灶,做顛覆性的設計的,他只是在select的基礎上來解決他們的缺陷。
他的底層依然需要設備驅動提供poll回調來作為狀態檢測基礎。
epoll分為三個函數 epoll_create,epoll_ctl, epoll_wait 。
他們的實現在 eventpoll.c代碼里。
epoll_create創建epoll設備,用來管理所有添加進去的描述符,epoll_ctl 用來添加新的描述符,修改或者刪除描述符。
epoll_wait等待描述符事件。
epoll_wait的等待已經不再是輪訓方式的等待了,epoll內部有個描述符就緒隊列,epoll_wait只檢測這個隊列即可,
他采用睡眠一會檢測一下的方式,如果發現描述符就緒隊列不為空,就把此隊列中的描述符copy到用戶空間,然后返回。
描述符就緒隊列里的數據又是從何而來的?
原來使用 epoll_ctl添加新描述符時候,epoll_ctl內核實現里會修改兩個回調函數,
一個是 poll_table結構里的qproc回調函數指針,
在 select中是 __pollwait函數,在epoll中換成 ep_ptable_queue_proc,
當在epoll_ctl中調用新添加的描述符的poll回調時候,底層驅動就會調用 poll_wait添加等待隊列,
底層驅動調用poll_wait時候,
其實就是調用ep_ptable_queue_proc,此函數會修改等待隊列的回調函數為 ep_poll_callback, 并加入到等待隊列頭里;
一旦底層驅動發現數據就緒,就會調用wake_up喚醒等待隊列,從而 ep_poll_callback將被調用,
在ep_poll_callback中 會把這個就緒的描述符添加到 epoll的描述符就緒隊列里,并同時喚醒 epoll_wait 所在的進程。
如此這般,就是epoll的內核實現的精髓。
看他是如何解決 select/poll的缺陷的, 首先他通過 epoll_ctl的EPOLL_CTL_ADD命令把描述符添加進epoll內部管理器里,
只需添加一次即可,直到用 epoll_ctl的EPOLL_CTL_DEL命令刪除此描述符為止,
而不像select/poll是每次執行都必須添加,很顯然大量減少了描述符在內核和用戶空間不斷的來回copy的開銷。
其次雖然 epoll_wait內部也是循環檢測,但是它只需檢測描述符就緒隊列是否為空即可,
比起select/poll必須輪訓每個描述符的poll,其開銷簡直可以忽略不計。
他同時也沒描述符多少的限制,只要你機器的內存夠大,就能容納非常多的描述符。
以下是 epoll相關部分內核代碼片段:
struct epitem { /* RB tree node used to link this structure to the eventpoll RB tree */ struct rb_node rbn; // 紅黑樹節點, struct epoll_filefd ffd; // 存儲此變量對應的描述符 struct epoll_event event; //用戶定義的結構 /*其他成員*/};struct eventpoll { /*其他成員*/ ....... /* Wait queue used by file->poll() */ wait_queue_head_t poll_wait; /* List of ready file descriptors */ struct list_head rdllist; ///描述符就緒隊列,掛載的是 epitem結構 /* RB tree root used to store monitored fd structs */ struct rb_root rbr; /// 存儲 新添加的 描述符的紅黑樹根, 此成員用來存儲添加進來的所有描述符。掛載的是epitem結構 .........};//epoll_createSYSCALL_DEFINE1(epoll_create1, int, flags){ int error; struct eventpoll *ep = NULL; /*其他代碼*/ ...... //分配 eventpoll結構,這個結構是epoll的靈魂,他包含了所有需要處理得數據。 error = ep_alloc(&ep); if (error < 0) return error; error = anon_inode_getfd("[eventpoll]", &eventpoll_fops, ep, flags & O_CLOEXEC); ///打開 eventpoll 的描述符,并把 ep存儲到 file->private_data變量里。 if (error < 0) ep_free(ep); return error;}SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd, struct epoll_event __user *, event){ /*其他代碼*/ ..... ep = file->private_data; ...... epi = ep_find(ep, tfile, fd); ///從 eventpoll的 rbr里查找描述符是 fd 的 epitem, error = -EINVAL; switch (op) { case EPOLL_CTL_ADD: if (!epi) { epds.events |= POLLERR | POLLHUP; error = ep_insert(ep, &epds, tfile, fd); // 在這個函數里添加新描述符,同時修改重要的回調函數。 //同時還調用描述符的poll,查看就緒狀態 } else error = -EEXIST; break; /*其他代碼*/ ........}static int ep_insert(struct eventpoll *ep, struct epoll_event *event, struct file *tfile, int fd){ ..... /*其他代碼*/ init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);//設置 poll_tabe回調函數為 ep_ptable_queue_proc //ep_ptable_queue_proc會設置等待隊列的回調指針為 ep_epoll_callback,同時添加等待隊列。 ........ /*其他代碼*/ revents = tfile->f_op->poll(tfile, &epq.pt); //調用描述符的poll回調,在此函數里 ep_ptable_queue_proc會被調用 ....... /*其他代碼*/ ep_rbtree_insert(ep, epi); //把新生成關于epitem添加到紅黑樹里 ...... /*其他代碼*/ if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) { list_add_tail(&epi->rdllink, &ep->rdllist); //如果 上邊的poll調用,檢測到描述符就緒,添加本描述符到就緒隊列里。 if (waitqueue_active(&ep->wq)) wake_up_locked(&ep->wq); if (waitqueue_active(&ep->poll_wait)) pwake++; } ...... /*其他代碼*/ /* We have to call this outside the lock */ if (pwake) ep_poll_safewake(&ep->poll_wait); // 如果描述符就緒隊列不為空,則喚醒 epoll_wait所在的進程。 ......... /*其他代碼*/}//這個函數設置等待隊列回調函數為 ep_poll_callback,//這樣到底層有數據喚醒等待隊列時候,ep_poll_callback就會被調用,從而把就緒的描述符加到就緒隊列。static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead, poll_table *pt){ struct epitem *epi = ep_item_from_epqueue(pt); struct eppoll_entry *pwq; if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) { init_waitqueue_func_entry(&pwq->wait, ep_poll_callback); pwq->whead = whead; pwq->base = epi; add_wait_queue(whead, &pwq->wait); list_add_tail(&pwq->llink, &epi->pwqlist); epi->nwait++; } else { /* We have to signal that an error occurred */ epi->nwait = -1; }}static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key){ int pwake = 0; unsigned long flags; struct epitem *epi = ep_item_from_wait(wait); struct eventpoll *ep = epi->ep; ......... /*其他代碼*/ if (!ep_is_linked(&epi->rdllink)) list_add_tail(&epi->rdllink, &ep->rdllist); // 把當前就緒的描述epitem結構添加到就緒隊列里 ......... /*其他代碼*/ if (pwake) ep_poll_safewake(&ep->poll_wait); //如果隊列不為空,喚醒 epoll_wait所在進程 ......... /*其他代碼*/}epoll_wait內核代碼里主要是調用ep_poll,列出ep_poll部分代碼片段:static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events, int maxevents, long timeout){ int res, eavail; unsigned long flags; long jtimeout; wait_queue_t wait; ......... /*其他代碼*/ if (list_empty(&ep->rdllist)) { init_waitqueue_entry(&wait, current); wait.flags |= WQ_FLAG_EXCLUSIVE; __add_wait_queue(&ep->wq, &wait); // 如果檢測到就緒隊列為空,添加當前進程到等待隊列,并執行否循環 for (;;) { set_current_state(TASK_INTERRUPTIBLE); if (!list_empty(&ep->rdllist) || !jtimeout) //如果就緒隊列不為空,或者超時則退出循環 break; if (signal_pending(current)) { //如果信號中斷,退出循環 res = -EINTR; break; } spin_unlock_irqrestore(&ep->lock, flags); jtimeout = schedule_timeout(jtimeout);//睡眠,知道被喚醒或者超時為止。 spin_lock_irqsave(&ep->lock, flags); } __remove_wait_queue(&ep->wq, &wait); set_current_state(TASK_RUNNING); } ......... /*其他代碼*/ if (!res && eavail && !(res = ep_send_events(ep, events, maxevents)) && jtimeout) goto retry; // ep_send_events主要任務是把就緒隊列的就緒描述符copy到用戶空間的 epoll_event數組里, return res;}
可以看到 ep_poll既epoll_wait的循環是相當輕松的循環,他只是簡單檢測就緒隊列而已,因此他的開銷很小。
我們最后看看描述符就緒時候,是如何通知給select/poll/epoll的,以網絡套接字的TCP協議來進行說明。
tcp協議對應的 poll回調是tcp_poll, 對應的等待隊列頭是 struct sock結構里 sk_sleep成員,
在tcp_poll中會把 sk_sleep加入到等待隊列,等待數據就緒。
當物理網卡接收到數據包,引發硬件中斷,驅動在中斷ISR例程里,構建skb包,把數據copy進skb,接著調用netif_rx
把skb掛載到CPU相關的 input_pkt_queue隊列,同時引發軟中斷,在軟中斷的net_rx_action回調函數里從input_pkt_queue里取出
skb數據包,通過分析,調用協議相關的回調函數,這樣層層傳遞,一直到struct sock,此結構里的 sk_data_ready回調指針被調用
sk_data_ready指向 sock_def_readable 函數,sock_def_readable函數其實就是 wake_up 喚醒 sock結構里 的 sk_sleep。
以上機制,對 select/poll/epoll都是一樣的,接下來喚醒 sk_sleep方式就不一樣了,因為他們指向了不同的回調函數。
在 select/poll實現中,等待隊列回調函數是 pollwake其實就是調用default_wake_function,喚醒被select阻塞住的進程。
epoll實現中,等待回調函數是 ep_poll_callback, 此回調函數只是把就緒描述符加入到epoll的就緒隊列里。
所以呢 select/poll/epoll其實他們在內核實現中,差別也不是太大,其實都差不多。
epoll雖然效率不錯,可以跟windows平臺中的完成端口比美,但是移植性太差,
目前幾乎就只有linux平臺才實現了epoll而且必須是2.6以上的內核版本。
|
新聞熱點
疑難解答
圖片精選