a亚洲精品_精品国产91乱码一区二区三区_亚洲精品在线免费观看视频_欧美日韩亚洲国产综合_久久久久久久久久久成人_在线区

首頁 > 編程 > C > 正文

linux內核select/poll,epoll實現與區別

2020-01-26 14:23:22
字體:
來源:轉載
供稿:網友

下面文章在這段時間內研究 select/poll/epoll的內核實現的一點心得體會:
select,poll,epoll都是多路復用IO的函數,簡單說就是在一個線程里,可以同時處理多個文件描述符的讀寫。
select/poll的實現很類似,epoll是從select/poll擴展而來,主要是為了解決select/poll天生的缺陷。
epoll在內核版本2.6以上才出現的新的函數,而他們在linux內核中的實現都是十分相似。
這三種函數都需要設備驅動提供poll回調函數,對于套接字而言,他們是 tcp_poll,udp_poll和datagram_poll;
對于自己開發的設備驅動而言,是自己實現的poll接口函數。

select實現(2.6的內核,其他版本的內核,應該都相差不多)
應用程序調用select,進入內核調用sys_select,做些簡單初始化工作,接著進入 core_sys_select,
此函數主要工作是把描述符集合從用戶空間復制到內核空間, 最終進入do_select,完成其主要的功能。
do_select里,調用 poll_initwait,主要工作是注冊poll_wait的回調函數為__pollwait,
當在設備驅動的poll回調函數里調用poll_wait,其實就是調用__pollwait,
__pollwait的主要工作是把當前進程掛載到等待隊列里,當等待的事件到來就會喚醒此進程。
接著執行for循環,循環里首先遍歷每個文件描述符,調用對應描述符的poll回調函數,檢測是否就緒,
遍歷完所有描述符之后,只要有描述符處于就緒狀態,信號中斷,出錯或者超時,就退出循環,
否則會調用schedule_xxx函數,讓當前進程睡眠,一直到超時或者有描述符就緒被喚醒。
接著又會再次遍歷每個描述符,調用poll再次檢測。
如此循環,直到符合條件才會退出。
以下是 2.6.31內核的有關select函數的部分片段:
他們調用關系:
select --> sys_select --> core_sys_select --> do_select

int do_select(int n, fd_set_bits *fds, struct timespec *end_time){  ktime_t expire, *to = NULL;  struct poll_wqueues table;  poll_table *wait;  int retval, i, timed_out = 0;  unsigned long slack = 0;    ///這里為了獲得集合中的最大描述符,這樣可減少循環中遍歷的次數。  ///也就是為什么linux中select第一個參數為何如此重要了  rcu_read_lock();  retval = max_select_fd(n, fds);  rcu_read_unlock();  if (retval < 0)    return retval;  n = retval;  ////初始化 poll_table結構,其中一個重要任務是把 __pollwait函數地址賦值給它,  poll_initwait(&table);  wait = &table.pt;  if (end_time && !end_time->tv_sec && !end_time->tv_nsec) {    wait = NULL;    timed_out = 1;  }  if (end_time && !timed_out)    slack = estimate_accuracy(end_time);  retval = 0;  ///主循環,將會在這里完成描述符的狀態輪訓  for (;;) {    unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp;    inp = fds->in; outp = fds->out; exp = fds->ex;    rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex;    for (i = 0; i < n; ++rinp, ++routp, ++rexp) {      unsigned long in, out, ex, all_bits, bit = 1, mask, j;      unsigned long res_in = 0, res_out = 0, res_ex = 0;      const struct file_operations *f_op = NULL;      struct file *file = NULL;      ///select中 fd_set 以及 do_select 中的 fd_set_bits 參數,都是按照位來保存描述符,意思是比如申請一個1024位的內存,      ///如果第 28位置1,說明此集合有 描述符 28,       in = *inp++; out = *outp++; ex = *exp++;      all_bits = in | out | ex; // 檢測讀寫異常3個集合中有無描述符      if (all_bits == 0) {        i += __NFDBITS;        continue;      }      for (j = 0; j < __NFDBITS; ++j, ++i, bit <<= 1) {        int fput_needed;        if (i >= n)          break;        if (!(bit & all_bits))          continue;        file = fget_light(i, &fput_needed); ///通過 描述符 index 獲得 struct file結構指針,        if (file) {          f_op = file->f_op; //通過 struct file 獲得 file_operations,這是操作文件的回調函數集合。          mask = DEFAULT_POLLMASK;          if (f_op && f_op->poll) {            wait_key_set(wait, in, out, bit);            mask = (*f_op->poll)(file, wait); //調用我們的設備中實現的 poll函數,                                     //因此,為了能讓select正常工作,在我們設備驅動中,必須要提供poll的實現,          }          fput_light(file, fput_needed);          if ((mask & POLLIN_SET) && (in & bit)) {            res_in |= bit;            retval++;            wait = NULL; /// 此處包括以下的,把wait設置為NULL,是因為檢測到mask = (*f_op->poll)(file, wait); 描述符已經就緒                       /// 無需再把當前進程添加到等待隊列里,do_select 遍歷完所有描述符之后就會退出。          }           if ((mask & POLLOUT_SET) && (out & bit)) {            res_out |= bit;            retval++;            wait = NULL;          }          if ((mask & POLLEX_SET) && (ex & bit)) {            res_ex |= bit;            retval++;            wait = NULL;          }        }      }      if (res_in)        *rinp = res_in;      if (res_out)        *routp = res_out;      if (res_ex)        *rexp = res_ex;      cond_resched();    }    wait = NULL; //已經遍歷完一遍,該加到等待隊列的,都已經加了,無需再加,因此設置為NULL    if (retval || timed_out || signal_pending(current)) //描述符就緒,超時,或者信號中斷就退出循環      break;    if (table.error) {//出錯退出循環      retval = table.error;      break;    }    /*     * If this is the first loop and we have a timeout     * given, then we convert to ktime_t and set the to     * pointer to the expiry value.     */    if (end_time && !to) {      expire = timespec_to_ktime(*end_time);      to = &expire;    }    /////讓進程休眠,直到超時,或者被就緒的描述符喚醒,    if (!poll_schedule_timeout(&table, TASK_INTERRUPTIBLE,            to, slack))      timed_out = 1;  }  poll_freewait(&table);  return retval;}void poll_initwait(struct poll_wqueues *pwq){  init_poll_funcptr(&pwq->pt, __pollwait); //設置poll_table的回調函數為 __pollwait,這樣當我們在驅動中調用poll_wait 就會調用到 __pollwait  ........}static void __pollwait(struct file *filp, wait_queue_head_t *wait_address,        poll_table *p){  ...................  init_waitqueue_func_entry(&entry->wait, pollwake); // 設置喚醒進程調用的回調函數,當在驅動中調用 wake_up喚醒隊列時候,                                          // pollwake會被調用,這里其實就是調用隊列的默認函數 default_wake_function                                          // 用來喚醒睡眠的進程。  add_wait_queue(wait_address, &entry->wait);     //加入到等待隊列}int core_sys_select(int n, fd_set __user *inp, fd_set __user *outp,        fd_set __user *exp, struct timespec *end_time){  ........  //把描述符集合從用戶空間復制到內核空間  if ((ret = get_fd_set(n, inp, fds.in)) ||    (ret = get_fd_set(n, outp, fds.out)) ||    (ret = get_fd_set(n, exp, fds.ex)))  .........  ret = do_select(n, &fds, end_time);  .............  ////把do_select返回集合,從內核空間復制到用戶空間  if (set_fd_set(n, inp, fds.res_in) ||    set_fd_set(n, outp, fds.res_out) ||    set_fd_set(n, exp, fds.res_ex))    ret = -EFAULT;   ............}

poll的實現跟select基本差不多,按照
poll --> do_sys_poll --> do_poll --> do_pollfd 的調用序列
其中do_pollfd是對每個描述符調用 其回調poll狀態輪訓。
poll比select的好處就是沒有描述多少限制,select 有1024 的限制,描述符不能超過此值,poll不受限制。
我們從上面代碼分析,可以總結出select/poll天生的缺陷:
1)每次調用select/poll都需要要把描述符集合從用戶空間copy到內核空間,檢測完成之后,又要把檢測的結果集合從內核空間copy到用戶空間
當描述符很多,而且select經常被喚醒,這種開銷會比較大
2)如果說描述符集合來回復制不算什么,那么多次的全部描述符遍歷就比較恐怖了,
我們在應用程序中,每次調用select/poll 都必須首先遍歷描述符,把他們加到fd_set集合里,這是應用層的第一次遍歷,
接著進入內核空間,至少進行一次遍歷和調用每個描述符的poll回調檢測,一般可能是2次遍歷,第一次沒發現就緒描述符,
加入等待隊列,第二次是被喚醒,接著再遍歷一遍。再回到應用層,我們還必須再次遍歷所有描述符,用 FD_ISSET檢測結果集。
如果描述符很多,這種遍歷就很消耗CPU資源了。
3)描述符多少限制,當然poll沒有限制,select卻有1024的硬性限制,除了修改內核增加1024限制外沒別的辦法。
既然有這么些缺點 ,那不是 select/poll變得一無是處了,那就大錯特錯了。
他們依然是代碼移植的最好函數,因為幾乎所有平臺都有對它們的實現提供接口。
在描述符不是太多,他們依然十分出色的完成多路復用IO,
而且如果每個連接上的描述符都處于活躍狀態,他們的效率其實跟epoll也差不了多少。
曾經使用多個線程+每個線程采用poll的辦法開發TCP服務器,處理文件收發,連接達到幾千個,
當時的瓶頸已經不在網絡IO,而在磁盤IO了。

我們再來看epoll為了解決select/poll天生的缺陷,是如何實現的。
epoll只是select/poll的擴展,他不是在linux內核中另起爐灶,做顛覆性的設計的,他只是在select的基礎上來解決他們的缺陷。
他的底層依然需要設備驅動提供poll回調來作為狀態檢測基礎。
epoll分為三個函數 epoll_create,epoll_ctl, epoll_wait 。
他們的實現在 eventpoll.c代碼里。
epoll_create創建epoll設備,用來管理所有添加進去的描述符,epoll_ctl 用來添加新的描述符,修改或者刪除描述符。
epoll_wait等待描述符事件。
epoll_wait的等待已經不再是輪訓方式的等待了,epoll內部有個描述符就緒隊列,epoll_wait只檢測這個隊列即可,
他采用睡眠一會檢測一下的方式,如果發現描述符就緒隊列不為空,就把此隊列中的描述符copy到用戶空間,然后返回。
描述符就緒隊列里的數據又是從何而來的?
原來使用 epoll_ctl添加新描述符時候,epoll_ctl內核實現里會修改兩個回調函數,
一個是 poll_table結構里的qproc回調函數指針,
在 select中是 __pollwait函數,在epoll中換成 ep_ptable_queue_proc,
當在epoll_ctl中調用新添加的描述符的poll回調時候,底層驅動就會調用 poll_wait添加等待隊列,
底層驅動調用poll_wait時候,
其實就是調用ep_ptable_queue_proc,此函數會修改等待隊列的回調函數為 ep_poll_callback, 并加入到等待隊列頭里;
一旦底層驅動發現數據就緒,就會調用wake_up喚醒等待隊列,從而 ep_poll_callback將被調用,
在ep_poll_callback中 會把這個就緒的描述符添加到 epoll的描述符就緒隊列里,并同時喚醒 epoll_wait 所在的進程。
如此這般,就是epoll的內核實現的精髓。
看他是如何解決 select/poll的缺陷的, 首先他通過 epoll_ctl的EPOLL_CTL_ADD命令把描述符添加進epoll內部管理器里,
只需添加一次即可,直到用 epoll_ctl的EPOLL_CTL_DEL命令刪除此描述符為止,
而不像select/poll是每次執行都必須添加,很顯然大量減少了描述符在內核和用戶空間不斷的來回copy的開銷。
其次雖然 epoll_wait內部也是循環檢測,但是它只需檢測描述符就緒隊列是否為空即可,
比起select/poll必須輪訓每個描述符的poll,其開銷簡直可以忽略不計。
他同時也沒描述符多少的限制,只要你機器的內存夠大,就能容納非常多的描述符。

以下是 epoll相關部分內核代碼片段:

struct epitem {  /* RB tree node used to link this structure to the eventpoll RB tree */  struct rb_node rbn; //   紅黑樹節點,    struct epoll_filefd ffd;  // 存儲此變量對應的描述符  struct epoll_event event; //用戶定義的結構  /*其他成員*/};struct eventpoll {  /*其他成員*/  .......  /* Wait queue used by file->poll() */  wait_queue_head_t poll_wait;  /* List of ready file descriptors */  struct list_head rdllist;      ///描述符就緒隊列,掛載的是 epitem結構   /* RB tree root used to store monitored fd structs */  struct rb_root rbr; /// 存儲 新添加的 描述符的紅黑樹根, 此成員用來存儲添加進來的所有描述符。掛載的是epitem結構     .........};//epoll_createSYSCALL_DEFINE1(epoll_create1, int, flags){  int error;  struct eventpoll *ep = NULL;  /*其他代碼*/  ......  //分配 eventpoll結構,這個結構是epoll的靈魂,他包含了所有需要處理得數據。   error = ep_alloc(&ep);  if (error < 0)    return error;    error = anon_inode_getfd("[eventpoll]", &eventpoll_fops, ep,         flags & O_CLOEXEC); ///打開 eventpoll 的描述符,并把 ep存儲到 file->private_data變量里。  if (error < 0)    ep_free(ep);  return error;}SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,    struct epoll_event __user *, event){  /*其他代碼*/  .....  ep = file->private_data;  ......  epi = ep_find(ep, tfile, fd);  ///從 eventpoll的 rbr里查找描述符是 fd 的 epitem,  error = -EINVAL;  switch (op) {  case EPOLL_CTL_ADD:    if (!epi) {      epds.events |= POLLERR | POLLHUP;      error = ep_insert(ep, &epds, tfile, fd);   // 在這個函數里添加新描述符,同時修改重要的回調函數。                                      //同時還調用描述符的poll,查看就緒狀態    } else      error = -EEXIST;    break;   /*其他代碼*/   ........}static int ep_insert(struct eventpoll *ep, struct epoll_event *event,       struct file *tfile, int fd){  ..... /*其他代碼*/   init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);//設置 poll_tabe回調函數為 ep_ptable_queue_proc         //ep_ptable_queue_proc會設置等待隊列的回調指針為 ep_epoll_callback,同時添加等待隊列。    ........ /*其他代碼*/    revents = tfile->f_op->poll(tfile, &epq.pt); //調用描述符的poll回調,在此函數里 ep_ptable_queue_proc會被調用  ....... /*其他代碼*/  ep_rbtree_insert(ep, epi); //把新生成關于epitem添加到紅黑樹里  ...... /*其他代碼*/  if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {     list_add_tail(&epi->rdllink, &ep->rdllist); //如果 上邊的poll調用,檢測到描述符就緒,添加本描述符到就緒隊列里。    if (waitqueue_active(&ep->wq))      wake_up_locked(&ep->wq);    if (waitqueue_active(&ep->poll_wait))      pwake++;  }  ...... /*其他代碼*/  /* We have to call this outside the lock */  if (pwake)    ep_poll_safewake(&ep->poll_wait); // 如果描述符就緒隊列不為空,則喚醒 epoll_wait所在的進程。  ......... /*其他代碼*/}//這個函數設置等待隊列回調函數為 ep_poll_callback,//這樣到底層有數據喚醒等待隊列時候,ep_poll_callback就會被調用,從而把就緒的描述符加到就緒隊列。static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,         poll_table *pt){  struct epitem *epi = ep_item_from_epqueue(pt);  struct eppoll_entry *pwq;  if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {    init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);    pwq->whead = whead;    pwq->base = epi;    add_wait_queue(whead, &pwq->wait);    list_add_tail(&pwq->llink, &epi->pwqlist);    epi->nwait++;  } else {    /* We have to signal that an error occurred */    epi->nwait = -1;  }}static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key){  int pwake = 0;  unsigned long flags;  struct epitem *epi = ep_item_from_wait(wait);  struct eventpoll *ep = epi->ep;  ......... /*其他代碼*/  if (!ep_is_linked(&epi->rdllink))    list_add_tail(&epi->rdllink, &ep->rdllist); // 把當前就緒的描述epitem結構添加到就緒隊列里  ......... /*其他代碼*/  if (pwake)    ep_poll_safewake(&ep->poll_wait); //如果隊列不為空,喚醒 epoll_wait所在進程  ......... /*其他代碼*/}epoll_wait內核代碼里主要是調用ep_poll,列出ep_poll部分代碼片段:static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,      int maxevents, long timeout){  int res, eavail;  unsigned long flags;  long jtimeout;  wait_queue_t wait;  ......... /*其他代碼*/    if (list_empty(&ep->rdllist)) {        init_waitqueue_entry(&wait, current);    wait.flags |= WQ_FLAG_EXCLUSIVE;    __add_wait_queue(&ep->wq, &wait);      // 如果檢測到就緒隊列為空,添加當前進程到等待隊列,并執行否循環    for (;;) {            set_current_state(TASK_INTERRUPTIBLE);      if (!list_empty(&ep->rdllist) || !jtimeout)  //如果就緒隊列不為空,或者超時則退出循環        break;      if (signal_pending(current)) { //如果信號中斷,退出循環        res = -EINTR;        break;      }      spin_unlock_irqrestore(&ep->lock, flags);      jtimeout = schedule_timeout(jtimeout);//睡眠,知道被喚醒或者超時為止。      spin_lock_irqsave(&ep->lock, flags);    }    __remove_wait_queue(&ep->wq, &wait);    set_current_state(TASK_RUNNING);  }    ......... /*其他代碼*/    if (!res && eavail &&    !(res = ep_send_events(ep, events, maxevents)) && jtimeout)    goto retry;   // ep_send_events主要任務是把就緒隊列的就緒描述符copy到用戶空間的 epoll_event數組里,  return res;}

可以看到 ep_poll既epoll_wait的循環是相當輕松的循環,他只是簡單檢測就緒隊列而已,因此他的開銷很小。

我們最后看看描述符就緒時候,是如何通知給select/poll/epoll的,以網絡套接字的TCP協議來進行說明。

tcp協議對應的 poll回調是tcp_poll, 對應的等待隊列頭是 struct sock結構里 sk_sleep成員,
在tcp_poll中會把 sk_sleep加入到等待隊列,等待數據就緒。

當物理網卡接收到數據包,引發硬件中斷,驅動在中斷ISR例程里,構建skb包,把數據copy進skb,接著調用netif_rx
把skb掛載到CPU相關的 input_pkt_queue隊列,同時引發軟中斷,在軟中斷的net_rx_action回調函數里從input_pkt_queue里取出
skb數據包,通過分析,調用協議相關的回調函數,這樣層層傳遞,一直到struct sock,此結構里的 sk_data_ready回調指針被調用
sk_data_ready指向 sock_def_readable 函數,sock_def_readable函數其實就是 wake_up 喚醒 sock結構里 的 sk_sleep。
以上機制,對 select/poll/epoll都是一樣的,接下來喚醒 sk_sleep方式就不一樣了,因為他們指向了不同的回調函數。
在 select/poll實現中,等待隊列回調函數是 pollwake其實就是調用default_wake_function,喚醒被select阻塞住的進程。
epoll實現中,等待回調函數是 ep_poll_callback, 此回調函數只是把就緒描述符加入到epoll的就緒隊列里。

所以呢 select/poll/epoll其實他們在內核實現中,差別也不是太大,其實都差不多。
epoll雖然效率不錯,可以跟windows平臺中的完成端口比美,但是移植性太差,
目前幾乎就只有linux平臺才實現了epoll而且必須是2.6以上的內核版本。

發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表

圖片精選

主站蜘蛛池模板: 亚洲清色 | 国产一区二区高清视频 | 中文字幕免费在线 | 久久精品亚洲精品 | 亚洲精品国产setv | 日日摸夜夜添夜夜添特色大片 | 中文字字幕一区二区三区四区五区 | 99国产精品99久久久久久 | 亚洲不卡视频 | 国产一区二区三区四区五区 | 久久夜视频 | 欧美久久久久久久久久伊人 | 在线视频亚洲 | www.五月婷婷 | 欧美电影一区二区三区 | 色综合天天综合网国产成人网 | 国产一区二区三区四区视频 | 黄a在线观看 | 亚洲成人一区二区 | 亚洲 精品 综合 精品 自拍 | 亚洲成a| 国产精品成人在线观看 | 在线观看你懂的网站 | 亚洲国产精品视频 | 日本一级中文字幕久久久久久 | 亚洲人人舔人人 | 日韩欧美一区二区三区 | 一区二区三区精品 | 久久噜噜噜精品国产亚洲综合 | 欧美日韩一 | 久久精品国产77777蜜臀 | 国产视频一区二区在线观看 | 日本久久精品视频 | 国产免费高清 | 亚洲成人网络 | 国产精品久久久久aaaa | 91久久精品国产 | 99国内精品久久久久久久 | 国产精品成人一区二区 | 91丁香| 亚洲一区二区在线 |