Skip to content

Select

select

IO操作两个阶段

  1. 等待数据准备就绪
  2. 将数据从内核拷贝到进程中

IO模型

同步阻塞

img

同步非阻塞

img

IO多路复用

img

异步IO

img

事件阻塞唤醒机制

进程的睡眠和唤醒动作都是在文件对应的设备驱动程序的read和write中实现的。其核心是一个叫做“等待队列”的数据结构。

等待队列

等待队列基于struct list_head实现,包含一个头结点和若干个队列元素。

struct __wait_queue_head {
    spinlock_t lock;
    struct list_head task_list;
};

头结点包含一个指向第一个队列元素的指针和一个保护队列的自旋锁:当插入(add_wait_queue)新元素或删除(remove_wait_queue)旧元素时,均使用自旋锁保证同步。

struct __wait_queue {
    unsigned int flags;
    void *private;
    wait_queue_func_t func;
    struct list_head task_list;
};

等待队列中的每个元素包含一个回调函数func,在唤醒该元素时调用。其中,private指针指向的是进程的task_struct结构,这样在唤醒时才知道唤醒哪个进程。元素与元素间使用list_head连成双向链表。

睡眠

  1. 判断条件是否满足,若满足则无需睡眠
  2. 否则,定义一个新的wait_queue_t队列元素,插入到wq表示的等待队列中去
  3. 设置进程状态
  4. 调用schedule(),让出CPU,调度其它进程
int tcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, int nonblock,
        int flags, int *addr_len)
{
    ......  
    // copied是指向用户空间拷贝了多少字节,即读了多少
    int copied;
    // target指的是期望多少字节
    int target;
    // 等效为timo = noblock ? 0 : sk->sk_rcvtimeo;
    timeo = sock_rcvtimeo(sk, nonblock);
    ......  
    // 如果设置了MSG_WAITALL标识target=需要读的长度
    // 如果未设置,则为最低低水位值
    target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);
    ......

    do{
        // 表明读到数据
        if (copied) {
            // 注意,这边只要!timeo,即nonblock设置了就会跳出循环
            if (sk->sk_err ||
                sk->sk_state == TCP_CLOSE ||
                (sk->sk_shutdown & RCV_SHUTDOWN) ||
                !timeo ||
                signal_pending(current))
            break;
        }else{
            // 到这里,表明没有读到任何数据
            // 且nonblock设置了导致timeo=0,则返回-EAGAIN,符合我们的预期
            if (!timeo) {
                copied = -EAGAIN;
                break;
        }
        // 读到了期望的数据,继续,否则当前进程阻塞在sk_wait_data上
        if (copied >= target) {
            /* Do not sleep, just process backlog. */
            release_sock(sk);
            lock_sock(sk);
        } else
            sk_wait_data(sk, &timeo);
    } while (len > 0);      
    ......
    return copied
}

int sk_wait_data(struct sock *sk, long *timeo, const struct sk_buff *skb)
{
    DEFINE_WAIT_FUNC(wait, woken_wake_function);
    int rc;
    // 将当前进程加入到等待队列
    add_wait_queue(sk_sleep(sk), &wait);
    sk_set_bit(SOCKWQ_ASYNC_WAITDATA, sk);
    // 通过调用schedule_timeout让出CPU,然后进行睡眠
    rc = sk_wait_event(sk, timeo, skb_peek_tail(&sk->sk_receive_queue) != skb, &wait);
    // 到这里的时候,有网络事件或超时事件唤醒了此进程,继续运行
    sk_clear_bit(SOCKWQ_ASYNC_WAITDATA, sk);
    remove_wait_queue(sk_sleep(sk), &wait);
    return rc;
}

signed long __sched schedule_timeout(signed long timeout) {
    ......
    // 设定超时的回掉函数为process_timeout
    timer.task = current;
    timer_setup_on_stack(&timer.timer, process_timeout, 0);
    __mod_timer(&timer, expire);
    // 这边让出CPU
    schedule();
    del_singleshot_timer_sync(&timer);
    destroy_timer_on_stack(&timer.timer);
    timeout = expire - jiffies;
 out:
    // 返回经过了多长事件
    return timeout < 0 ? 0 : timeout;   
}

唤醒

  1. 有对应的网络数据到来
tcp_v4_rcv
    |-tcp_v4_do_rcv
        |-tcp_rcv_state_process
            |-tcp_data_queue
                |-sk->sk_data_ready=sock_def_readable
                    |-wake_up_interruptible_sync_poll
                            |-__wake_up_common

static int __wake_up_common(struct wait_queue_head *wq_head, unsigned int mode,
            int nr_exclusive, int wake_flags, void *key,
            wait_queue_entry_t *bookmark)
{
    wait_queue_entry_t *curr, *next;
    int cnt = 0;

    if (bookmark && (bookmark->flags & WQ_FLAG_BOOKMARK)) {
        curr = list_next_entry(bookmark, entry);

        list_del(&bookmark->entry);
        bookmark->flags = 0;
    } else
        // 获取等待队列的第一个节点
        curr = list_first_entry(&wq_head->head, wait_queue_entry_t, entry);

    if (&curr->entry == &wq_head->head)
        return nr_exclusive;
    // 变量等待队列
    list_for_each_entry_safe_from(curr, next, &wq_head->head, entry) {
        unsigned flags = curr->flags;
        int ret;

        if (flags & WQ_FLAG_BOOKMARK)
            continue;
        // 调用节点的回调函数woken_wake_function,唤醒进程
        ret = curr->func(curr, mode, wake_flags, key);
        if (ret < 0)
            break;
    }

    return nr_exclusive;
}
  1. 设定的超时时间到来
static void process_timeout(struct timer_list *t)
{
    struct process_timer *timeout = from_timer(timeout, t, timer);
    // 唤醒线程
    wake_up_process(timeout->task);
}

select实现

同时监听多个socket,当其中一个或多个有事件发生时,尝试去读取或接收数据;没有事件发生时,进程休眠,等待其中任意一个socket有事件发生被唤醒。

以读事件为例,设计简单的select实现

count=0
FD_ZERO(&res_rset)
for fd in read_set
    if( readable(fd) )
        count++
        FDSET(fd, &res_rset)
        break
    else
        add_to_wait_queue

if count > 0
    return count
else
    wait_any_event

return count

问题:

  1. 如何判断文件可读?即如何实现readable(fd)
  2. 如何将进程加入到文件的读等待队列而不让程序进入睡眠?

poll文件操作

在Linux中规定每一个支持select/poll监听的文件所属设备驱动必须实现struct file_operations中的poll函数。

poll函数的工作有两部分:

  1. 判断当前文件状态,并在返回值中标记。
  2. 对本驱动程序的等待队列调用poll_wait函数。
__poll_t tcp_poll(struct file *file, struct socket *sock, poll_table *wait)
{
    __poll_t mask;
    struct sock *sk = sock->sk;
    const struct tcp_sock *tp = tcp_sk(sk);
    int state;

    sock_poll_wait(file, sk_sleep(sk), wait);

    state = inet_sk_state_load(sk);
    if (state == TCP_LISTEN)
        return inet_csk_listen_poll(sk);

    mask = 0;
    if (sk->sk_shutdown == SHUTDOWN_MASK || state == TCP_CLOSE)
        mask |= EPOLLHUP;
    if (sk->sk_shutdown & RCV_SHUTDOWN)
        mask |= EPOLLIN | EPOLLRDNORM | EPOLLRDHUP;

    /* Connected or passive Fast Open socket? */
    if (state != TCP_SYN_SENT &&
        (state != TCP_SYN_RECV || tp->fastopen_rsk)) {
        int target = sock_rcvlowat(sk, 0, INT_MAX);

        if (tp->urg_seq == tp->copied_seq &&
            !sock_flag(sk, SOCK_URGINLINE) &&
            tp->urg_data)
            target++;

        if (tcp_stream_is_readable(tp, target, sk))
            mask |= EPOLLIN | EPOLLRDNORM;

        if (!(sk->sk_shutdown & SEND_SHUTDOWN)) {
            if (sk_stream_is_writeable(sk)) {
                mask |= EPOLLOUT | EPOLLWRNORM;
            } else {  /* send SIGIO later */
                sk_set_bit(SOCKWQ_ASYNC_NOSPACE, sk);
                set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
                smp_mb__after_atomic();
                if (sk_stream_is_writeable(sk))
                    mask |= EPOLLOUT | EPOLLWRNORM;
            }
        } else
            mask |= EPOLLOUT | EPOLLWRNORM;

        if (tp->urg_data & TCP_URG_VALID)
            mask |= EPOLLPRI;
    } else if (state == TCP_SYN_SENT && inet_sk(sk)->defer_connect) {
        /* Active TCP fastopen socket with defer_connect
         * Return EPOLLOUT so application can call write()
         * in order for kernel to generate SYN+data
         */
        mask |= EPOLLOUT | EPOLLWRNORM;
    }
    /* This barrier is coupled with smp_wmb() in tcp_reset() */
    smp_rmb();
    if (sk->sk_err || !skb_queue_empty(&sk->sk_error_queue))
        mask |= EPOLLERR;

    return mask;
}

static inline void sock_poll_wait(struct file *filp,
        wait_queue_head_t *wait_address, poll_table *p)
{
    if (!poll_does_not_wait(p) && wait_address) {
        poll_wait(filp, wait_address, p);
        /* We need to be sure we are in sync with the
         * socket flags modification.
         *
         * This memory barrier is paired in the wq_has_sleeper.
         */
        smp_mb();
    }
}

// 调用了poll_table的一个函数指针,poll_table是在调用文件的poll函数时传入的
static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
    if (p && p->_qproc && wait_address)
        p->_qproc(filp, wait_address, p);
}

select函数中的poll_wait回调函数实现

static void __pollwait(struct file *filp, wait_queue_head_t *wait_address,
                poll_table *p)
{
    struct poll_wqueues *pwq = container_of(p, struct poll_wqueues, pt);
    struct poll_table_entry *entry = poll_get_entry(pwq);
    if (!entry)
        return;
    entry->filp = get_file(filp);
    entry->wait_address = wait_address;
    entry->key = p->_key;
    // 设置唤醒回调函数
    init_waitqueue_func_entry(&entry->wait, pollwake);
    entry->wait.private = pwq;
    // 将当前进程信息加入到文件等待队列
    add_wait_queue(wait_address, &entry->wait);
}

select实现

select为管理所有监听的文件,为每个文件分配了一个poll_table_entry结构。poll_table_entry包含文件信息、等待队列头以及等待队列元素。

struct poll_wqueues {
    poll_table pt;
    struct poll_table_page *table;
    struct task_struct *polling_task;/* 指向睡眠进程的task_struct */
    int triggered;
    int error;
    int inline_index; 
    struct poll_table_entry inline_entries[N_INLINE_POLL_ENTRIES];
};
//包含文件信息、等待队列头以及等待队列元素
struct poll_table_entry {
    struct file *filp;
    __poll_t key;
    wait_queue_entry_t wait;
    wait_queue_head_t *wait_address;
};

跟踪kern_select的内核实现,其大致过程如下:

  1. 拷贝用户空间的timeout对象到内核空间end_time,并重新设定时间值
  2. 调用core_sys_select。过程如下:
  3. 根据传入的maxfd值,计算保存所有fd需要多少字节(每fd占1bit),然后判断是在栈上分配内存还是在堆中分配内存。共需要分配6个fdset:用户传入的in, out, exception以及要返回给用户的res_in,res_out和res_exception;
  4. 将3个输入fdset从用户空间拷贝到内核空间,并初始化输出的fdset为0;
  5. 调用do_select,获得返回值ret。do_select的工作就是初始化poll_wqueues对象,并调用驱动程序的poll函数。类似于我们写的简单的select。过程如下所示:
    1. 调用poll_initwait初始化poll_wqueues对象table,包括其成员poll_table;
    2. 如果用户传入的timeout不为NULL,但是设定的时间为0,那么设置poll_table指针wait(即 &table.pt)为NULL;
    3. 将in,out和exception进行或运算,得到all_bits,然后遍历all_bits中bit为1的fd,根据进程的fd_table查找到file指针filp,然后设置wait的key值(POLLEX_SET, POLLIN_SET,POLLIN_SET三者的或运算,取决于用户输入),并调用filp->poll(filp, wait),获得返回值mask。 再根据mask值检查该文件是否立即满足条件,如果满足,设置res_in/res_out/res_exception的值,执行retval++, 并设置wait为NULL。
    4. 在每遍历32(取决于long型整数的位数)个文件后,调用1次cond_resched(),主动寻求调度,可以等待已经遍历过的文件是否有唤醒的;
    5. 在遍历完所有文件之后,设置wait为NULL,并检查是否有满足条件的文件(retval值是否为0),或者是否超时,或者是否有未决信号,如果有那么直接跳出循环,进入步骤7;
    6. 否则调用poll_schedule_timeout,使进程进入睡眠,直到超时(如果未设置超时,那么是直接调用的schedule())。如果是超时后进程继续执行,那么设置pwq->triggered为0;如果是被文件对应的驱动程序唤醒的,那么pwq->triggered被设置为1;
    7. 最终,函数调用poll_freewait,将本进程从所有文件的等待队列中删掉,并删除分配的poll_table_page对象,回收内存,并返回retval值。
  6. 拷贝res_in, res_out和res_exception到传入的in, out, exception,并返回ret。
  7. 调用poll_select_copy_remaining,将剩余的timeout时间拷贝回用户空间。

select的问题

  1. 可以同时监听的文件数量有限,最多1024个;
  2. 每次调用select,都需要从0bit一直遍历到最大的fd。
  3. 内存复制开销。需要在用户空间和内核空间来回拷贝fd_set,并且要为每个fd分配一个poll_table_entry对象。