Skip to content

Epoll

epoll

epoll实现

  1. 引入了epoll_ctl系统调用,通过(EPOLL_CTL_ADD、EPOLL_CTL_MOD、EPOLL_CTL_DEL)三个操作来分散对需要监控的fds集合的修改,做到了有变化才变更,避免了大量的内存拷贝。
  2. 引入一个中间的wait_entry_sk(对应下文代码中的eppoll_entry),它与某个socket密切相关。wait_entry_sk睡眠在socket的睡眠队列上,其唤醒逻辑(ep_poll_callback)是将当前socket对应的epitem排入epoll的ready_list(即rdllist)中,并唤醒epoll的single_epoll_wait_list(即ep->wq)。睡眠在single_epoll_wait_list上的进程被唤醒后,遍历ready_list上的所有socket,挨个调用socket的poll函数收集事件,然后从epoll_wait返回。

监听唤醒逻辑

  1. socket监听 epoll_ctl EPOLL_CTL_ADD
     1. 构建睡眠实体epitem,将当前socket关联给epitem,并把poll_table的排队回调设置为ep_ptable_queue_proc;
     2. 调用socket的poll时,ep_ptable_queue_proc创建等待项eppoll_entry(其唤醒回调为ep_poll_callback),并将该等待项排入当前socket的睡眠队列上。
  2. socket唤醒 ep_poll_callback
     1. 将之前关联的socket(epitem)排入epoll的ready_list;
     2. 然后唤醒epoll的单独睡眠队列single_epoll_wait_list。
  3. epoll_wait
     1. 构建睡眠实体wait_entry_proc,将当前process关联给wait_entry_proc,并设置回调函数为epoll_callback_proc;
     2. 判断epoll的ready_list是否为空,如果为空,则将wait_entry_proc排入epoll的single_epoll_wait_list中,随后进入schedule循环,这会导致调用epoll_wait的process睡眠;
     3. wait_entry_proc被事件唤醒或超时醒来,wait_entry_proc将被从single_epoll_wait_list移除掉,然后wait_entry_proc执行回调函数epoll_callback_proc。
  4. epoll唤醒
     1. 遍历epoll的ready_list,挨个调用每个socket的poll逻辑收集发生的事件;
     2. 将每个socket收集到的事件,通过epoll_wait传入的events数组回传。

实现

/*
 * State of one epoll instance: the locks, the two wait queues, the list of
 * ready items and the tree of monitored file descriptors.
 */
struct eventpoll {
    /*
     * Protect the access to this structure.
     * `lock` is the spinlock that ep_poll_callback() and ep_poll() take
     * (irqsave variant) around ready-list and wait-queue manipulation.
     * NOTE(review): `mtx` is not used by the code visible here; presumably
     * it serializes the longer, sleepable operations - confirm.
     */
    spinlock_t lock;
    struct mutex mtx;

    /*
     * Wait queue used by sys_epoll_wait(): ep_poll() queues the calling
     * task here and ep_poll_callback() wakes it.
     */
    wait_queue_head_t wq;

    /*
     * Wait queue used by file->poll(); ep_poll_callback() wakes it through
     * ep_poll_safewake() after dropping `lock`.
     */
    wait_queue_head_t poll_wait;

    /* List of ready file descriptors (items are linked through epi->rdllink) */
    struct list_head rdllist;

    /* RB tree root used to store monitored fd structs */
    struct rb_root_cached rbr;
    /*
     * Singly linked chain of items (linked through epi->next). While this
     * is not EP_UNACTIVE_PTR, ep_poll_callback() chains newly ready items
     * here instead of adding them to rdllist.
     * NOTE(review): presumably active while ready events are being
     * transferred to user space - confirm against ep_scan_ready_list().
     */
    struct epitem *ovflist;
    /* wakeup_source used when ep_scan_ready_list is running */
    struct wakeup_source *ws;
    /* The user that created the eventpoll descriptor */
    struct user_struct *user;
    /* NOTE(review): presumably the struct file backing this epoll fd - confirm */
    struct file *file;
    /*
     * NOTE(review): `visited` and `visited_list_link` are not used by the
     * code visible here; presumably bookkeeping for nested-epoll loop
     * checks - confirm.
     */
    int visited;
    struct list_head visited_list_link;
};
/*
 * poll_table queueing callback for a monitored file (e.g. a socket).
 *
 * Allocates an eppoll_entry whose wakeup function is ep_poll_callback(),
 * hooks it onto the wait queue head @whead handed in by the file's poll
 * implementation, and records it on the epitem's pwqlist.
 *
 * @file:  the monitored file (not used here)
 * @whead: wait queue head the new entry is added to
 * @pt:    poll table from which the owning epitem is recovered
 *
 * Failure is reported by setting epi->nwait to -1; once nwait is negative
 * no further entries are allocated.
 */
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
                 poll_table *pt)
{
    struct epitem *item = ep_item_from_epqueue(pt);
    struct eppoll_entry *entry;

    /* An earlier registration already failed: do not even try to allocate. */
    if (item->nwait < 0)
        goto fail;

    entry = kmem_cache_alloc(pwq_cache, GFP_KERNEL);
    if (!entry)
        goto fail;

    /* Wakeups on @whead are routed to ep_poll_callback(). */
    init_waitqueue_func_entry(&entry->wait, ep_poll_callback);
    entry->whead = whead;
    entry->base = item;

    /* Exclusive items are queued as exclusive waiters on @whead. */
    if (item->event.events & EPOLLEXCLUSIVE)
        add_wait_queue_exclusive(whead, &entry->wait);
    else
        add_wait_queue(whead, &entry->wait);

    list_add_tail(&entry->llink, &item->pwqlist);
    item->nwait++;
    return;

fail:
    /* We have to signal that an error occurred */
    item->nwait = -1;
}
/*
 * Wakeup callback for events on a monitored file (e.g. a socket).
 *
 * ep_ptable_queue_proc() installs this function on the wait entry it adds
 * to the file's wait queue. Under ep->lock it queues the owning epitem on
 * ep->ovflist or ep->rdllist and wakes tasks sleeping on ep->wq; waiters on
 * ep->poll_wait are woken after the lock has been dropped.
 *
 * @wait: the wait entry that was woken; the epitem is looked up from it
 * @mode: not used by this function
 * @sync: not used by this function
 * @key:  event mask supplied by the waker, decoded with key_to_poll();
 *        a zero mask disables the event filter below
 *
 * Returns ewake. It is always 1 for items without EPOLLEXCLUSIVE. For
 * EPOLLEXCLUSIVE items it is 1 only when the item reached the ready-list
 * path, ep->wq had waiters, POLLFREE was not reported, and the reported
 * EPOLLINOUT_BITS were either zero or exactly one of EPOLLIN / EPOLLOUT
 * that the item also requested.
 * NOTE(review): presumably the wait-queue core uses a nonzero return to
 * count this entry as a consumed exclusive wakeup - confirm.
 */
static int ep_poll_callback(wait_queue_entry_t *wait, unsigned mode, int sync, void *key)
{
    int pwake = 0;
    unsigned long flags;
    struct epitem *epi = ep_item_from_wait(wait);
    struct eventpoll *ep = epi->ep;
    __poll_t pollflags = key_to_poll(key);
    int ewake = 0;

    spin_lock_irqsave(&ep->lock, flags);

    ep_set_busy_poll_napi_id(epi);

    /*
     * The item requests no event bits outside EP_PRIVATE_BITS, so there is
     * nothing to report.
     * NOTE(review): presumably this is the "disabled" state left behind by
     * EPOLLONESHOT until the next EPOLL_CTL_MOD - confirm.
     */
    if (!(epi->event.events & ~EP_PRIVATE_BITS))
        goto out_unlock;

    /*
     * The waker supplied an event mask and none of those events is
     * requested by this item: ignore the wakeup. A zero mask skips this
     * filter.
     */
    if (pollflags && !(pollflags & epi->event.events))
        goto out_unlock;

    /*
     * ovflist is active: chain the item there (once; epi->next ==
     * EP_UNACTIVE_PTR means "not chained yet") and leave rdllist alone.
     * No waiter is woken on this path.
     * NOTE(review): presumably ovflist is active while ready events are
     * being copied to user space and is requeued afterwards - confirm.
     */
    if (unlikely(ep->ovflist != EP_UNACTIVE_PTR)) {
        if (epi->next == EP_UNACTIVE_PTR) {
            epi->next = ep->ovflist;
            ep->ovflist = epi;
            if (epi->ws) {
                /*
                 * Activate ep->ws since epi->ws may get
                 * deactivated at any time.
                 */
                __pm_stay_awake(ep->ws);
            }

        }
        goto out_unlock;
    }

    /* If this file is already in the ready list we exit soon */
    if (!ep_is_linked(&epi->rdllink)) {
        list_add_tail(&epi->rdllink, &ep->rdllist);
        ep_pm_stay_awake_rcu(epi);
    }

    /*
     * Wake up ( if active ) both the eventpoll wait list and the ->poll()
     * wait list.
     */
    if (waitqueue_active(&ep->wq)) {
        /*
         * For exclusive items, report a wakeup only if the reported
         * IN/OUT bits match what the item asked for (or none were
         * reported). If both EPOLLIN and EPOLLOUT are reported no case
         * matches and ewake stays 0.
         */
        if ((epi->event.events & EPOLLEXCLUSIVE) &&
                    !(pollflags & POLLFREE)) {
            switch (pollflags & EPOLLINOUT_BITS) {
            case EPOLLIN:
                if (epi->event.events & EPOLLIN)
                    ewake = 1;
                break;
            case EPOLLOUT:
                if (epi->event.events & EPOLLOUT)
                    ewake = 1;
                break;
            case 0:
                ewake = 1;
                break;
            }
        }
        /* ep->lock is already held, hence the _locked variant. */
        wake_up_locked(&ep->wq);
    }
    /* Only note that poll_wait has waiters; the wakeup happens after unlock. */
    if (waitqueue_active(&ep->poll_wait))
        pwake++;

out_unlock:
    spin_unlock_irqrestore(&ep->lock, flags);

    /* We have to call this outside the lock */
    if (pwake)
        ep_poll_safewake(&ep->poll_wait);

    /* Non-exclusive items always report the wakeup as handled. */
    if (!(epi->event.events & EPOLLEXCLUSIVE))
        ewake = 1;

    /*
     * POLLFREE: unlink this entry from the wait queue, then publish
     * whead = NULL with release ordering as the last access.
     * NOTE(review): presumably the wait queue head is about to be freed
     * and a NULL whead tells the removal path not to touch it; after the
     * store nothing here may dereference ep/epi/wait - confirm against
     * ep_remove_wait_queue().
     */
    if (pollflags & POLLFREE) {
        list_del_init(&wait->entry);
        smp_store_release(&ep_pwq_from_wait(wait)->whead, NULL);
    }

    return ewake;
}
/*
 * Core of epoll_wait(): deliver ready events to user space, sleeping on
 * ep->wq until events arrive, the timeout expires or a signal is pending.
 *
 * @ep:        the epoll instance
 * @events:    user-space buffer, passed through to ep_send_events()
 * @maxevents: passed through to ep_send_events()
 * @timeout:   > 0: converted to an absolute expiry via ep_set_mstimeout()
 *             (NOTE(review): presumably milliseconds - confirm);
 *             0: non-blocking, only check for already-available events;
 *             < 0: `to` stays NULL, so schedule_hrtimeout_range() gets no
 *             expiry (presumably waits indefinitely - confirm).
 *
 * Returns the result of ep_send_events() (presumably the number of events
 * delivered or a negative error - confirm), 0 if the wait ended without
 * events, or -EINTR if a signal was pending while waiting.
 */
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
           int maxevents, long timeout)
{
    int res = 0, eavail, timed_out = 0;
    unsigned long flags;
    u64 slack = 0;
    wait_queue_entry_t wait;
    ktime_t expires, *to = NULL;

    if (timeout > 0) {
        struct timespec64 end_time = ep_set_mstimeout(timeout);

        slack = select_estimate_accuracy(&end_time);
        to = &expires;
        *to = timespec64_to_ktime(end_time);
    } else if (timeout == 0) {
        /*
         * Avoid the unnecessary trip to the wait queue loop, if the
         * caller specified a non blocking operation.
         */
        timed_out = 1;
        /* check_events expects ep->lock to be held on entry. */
        spin_lock_irqsave(&ep->lock, flags);
        goto check_events;
    }

fetch_events:

    /* Nothing ready yet: try busy polling first, before taking the lock. */
    if (!ep_events_available(ep))
        ep_busy_loop(ep, timed_out);

    spin_lock_irqsave(&ep->lock, flags);

    if (!ep_events_available(ep)) {
        /*
         * Busy poll timed out.  Drop NAPI ID for now, we can add
         * it back in when we have moved a socket with a valid NAPI
         * ID onto the ready list.
         */
        ep_reset_busy_poll_napi_id(ep);

        /*
         * We don't have any available event to return to the caller.
         * We need to sleep here, and we will be wake up by
         * ep_poll_callback() when events will become available.
         */
        init_waitqueue_entry(&wait, current);
        __add_wait_queue_exclusive(&ep->wq, &wait);

        for (;;) {
            /*
             * We don't want to sleep if the ep_poll_callback() sends us
             * a wakeup in between. That's why we set the task state
             * to TASK_INTERRUPTIBLE before doing the checks.
             */
            set_current_state(TASK_INTERRUPTIBLE);
            /*
             * Always short-circuit for fatal signals to allow
             * threads to make a timely exit without the chance of
             * finding more events available and fetching
             * repeatedly.
             */
            if (fatal_signal_pending(current)) {
                res = -EINTR;
                break;
            }
            /* Events are checked before ordinary signals, so ready events win. */
            if (ep_events_available(ep) || timed_out)
                break;
            if (signal_pending(current)) {
                res = -EINTR;
                break;
            }

            /* Drop the lock across the sleep; retake it before re-checking. */
            spin_unlock_irqrestore(&ep->lock, flags);
            /* A zero return from the sleep is treated as timeout expiry. */
            if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS))
                timed_out = 1;

            spin_lock_irqsave(&ep->lock, flags);
        }

        /* Still under ep->lock: dequeue ourselves and restore the task state. */
        __remove_wait_queue(&ep->wq, &wait);
        __set_current_state(TASK_RUNNING);
    }
check_events:
    /* Is it worth to try to dig for events ? */
    eavail = ep_events_available(ep);

    spin_unlock_irqrestore(&ep->lock, flags);

    /*
     * Try to transfer events to user space. In case we get 0 events and
     * there's still timeout left over, we go trying again in search of
     * more luck.
     */
    if (!res && eavail &&
        !(res = ep_send_events(ep, events, maxevents)) && !timed_out)
        goto fetch_events;

    return res;
}