A while back, when I implemented fast forwarding of small packets on top of DPDK, I used its lock-free queue, so today let's take a look at it. (The DPDK dependency was later removed and fast forwarding was implemented directly in the kernel.)

DPDK's lock-free queue, the ring, borrows from the Linux kernel's kfifo lock-free queue. A ring is essentially a FIFO circular queue.

First-In-First-Out (FIFO)
Maximum size is fixed; the pointers are stored in a table
Lock-free implementation
Multi-consumer or single-consumer dequeue
Multi-producer or single-producer enqueue
Bulk dequeue: dequeues the specified number of objects if it succeeds, otherwise does nothing
Bulk enqueue: enqueues the specified number of objects if it succeeds, otherwise does nothing
Burst dequeue: if the requested number cannot be dequeued, dequeues as many objects as are available
Burst enqueue: if the requested number cannot be enqueued, enqueues as many objects as there is room for (a usage sketch follows this list)
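
To make the bulk/burst distinction concrete, here is a minimal usage sketch. It assumes an already-initialized DPDK environment (rte_eal_init has run); the ring name "demo_ring" and the 32-object batch are made up for illustration.

#include <rte_lcore.h>
#include <rte_ring.h>

static void ring_demo(void)
{
    /* 1024 slots; flags 0 selects the multi-producer/multi-consumer paths.
     * RING_F_SP_ENQ / RING_F_SC_DEQ would pick the single-producer /
     * single-consumer fast paths instead. */
    struct rte_ring *r = rte_ring_create("demo_ring", 1024,
                                         rte_socket_id(), 0);
    void *objs[32] = { NULL };
    unsigned int free_space, available, n;

    if (r == NULL)
        return;

    /* Bulk is all-or-nothing: returns 32 if every object fit, else 0. */
    n = rte_ring_enqueue_bulk(r, objs, 32, &free_space);

    /* Burst is best-effort: returns how many objects (0..32) went in. */
    n = rte_ring_enqueue_burst(r, objs, 32, &free_space);

    /* The dequeue side mirrors this pair. */
    n = rte_ring_dequeue_burst(r, objs, 32, &available);
    (void)n;
}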

Compared with a linked list, this data structure has the following advantages:

Faster: it needs only a single Compare-And-Swap instruction of sizeof(void *), rather than several double compare-and-swap instructions
Simpler than a fully lock-free queue
Suited to bulk enqueue/dequeue operations. Because the pointers are stored in a table, dequeuing several objects does not incur as many cache misses as it would with a linked-list queue. Moreover, a bulk dequeue of many objects costs no more than dequeuing a single object.

Disadvantages:

Fixed size
Having many rings costs more memory than linked lists: even an empty ring contains at least n pointers (quantified in the sketch below)
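
That fixed cost is easy to quantify with rte_ring_get_memsize(), which reports the number of bytes a ring of a given slot count occupies (header plus the pointer table). A small sketch:

#include <stdio.h>
#include <rte_ring.h>

/* Print the footprint of an (empty) ring: roughly
 * sizeof(struct rte_ring) + count * sizeof(void *). */
static void print_ring_footprint(unsigned int count)
{
    ssize_t sz = rte_ring_get_memsize(count); /* count: power of two */
    printf("a ring with %u slots occupies %zd bytes\n", count, sz);
}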

/* structure to hold a pair of head/tail values and other metadata */
struct rte_ring_headtail {
    /* Producer view: head is where the next enqueue is reserved, tail is
     * up to where enqueues have actually completed (head == tail when no
     * operation is in flight). Consumer view: head is where the next
     * dequeue is reserved, tail is up to where dequeues have completed. */
    volatile uint32_t head;  /**< Prod/consumer head: reservation point */
    volatile uint32_t tail;  /**< Prod/consumer tail: completion point */
    uint32_t single;         /**< True if single prod/cons */
};
struct rte_ring {
    /*
     * Note: this field kept the RTE_MEMZONE_NAMESIZE size due to ABI
     * compatibility requirements, it could be changed to RTE_RING_NAMESIZE
     * next time the ABI changes
     */
    char name[RTE_MEMZONE_NAMESIZE] __rte_cache_aligned; /**< Name of the ring. */
    int flags;               /**< Flags supplied at creation. */
    const struct rte_memzone *memzone;
            /**< Memzone, if any, containing the rte_ring */
    uint32_t size;           /**< Size of ring. */
    uint32_t mask;           /**< Mask (size-1) of ring. */
    uint32_t capacity;       /**< Usable size of ring */

    char pad0 __rte_cache_aligned; /**< empty cache line */

    /** Ring producer status. */
    struct rte_ring_headtail prod __rte_cache_aligned;
    char pad1 __rte_cache_aligned; /**< empty cache line */

    /** Ring consumer status. */
    struct rte_ring_headtail cons __rte_cache_aligned;
    char pad2 __rte_cache_aligned; /**< empty cache line */
};
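
Note that head and tail are free-running 32-bit counters: they are never reduced modulo the ring size. The physical slot is obtained by masking, which is why the size must be a power of two (and why, by default, capacity is size - 1, so a full ring can be told apart from an empty one). A minimal sketch of the index arithmetic, under those assumptions:

/* Map a free-running head/tail counter to a physical slot index.
 * Because size is a power of two, `& mask` is equivalent to `% size`. */
static inline uint32_t slot_index(const struct rte_ring *r, uint32_t counter)
{
    return counter & r->mask;   /* mask == size - 1 */
}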

Enqueue:

[Diagrams illustrating the enqueue process appeared here in the original post; they are not reproduced. The material above is credited to http://reader.epubee.com/books/mobile/54/54aa973816d258a932e39464018932ee/text00032.html]

static __rte_always_inline unsigned int
__rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
         unsigned int n, enum rte_ring_queue_behavior behavior,
         unsigned int is_sp, unsigned int *free_space)
{
    uint32_t prod_head, prod_next;
    uint32_t free_entries;

    n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
            &prod_head, &prod_next, &free_entries);
    if (n == 0)
        goto end;
    /* prod_head is the old r->prod.head; after __rte_ring_move_prod_head,
     * r->prod.head has already been advanced to the reserved position.
     * &r[1] is where the object table (the actual data) begins. */
    ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *);

    update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
end:
    if (free_space != NULL)
        *free_space = free_entries - n;
    return n;
}
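
ENQUEUE_PTRS copies the n object pointers into the slots starting at prod_head & mask, and update_tail then publishes them to consumers. update_tail (paraphrased from DPDK's rte_ring_generic.h) is essentially the following: in the multi-producer case a thread must wait until every earlier reservation has been published before it may advance the shared tail.

static __rte_always_inline void
update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
        uint32_t new_val, uint32_t single, uint32_t enqueue)
{
    if (enqueue)
        rte_smp_wmb();  /* objects must be visible before the new tail */
    else
        rte_smp_rmb();

    /* If other enqueues/dequeues preceded us, wait for their tails:
     * tails are published strictly in reservation order. */
    if (!single)
        while (unlikely(ht->tail != old_val))
            rte_pause();

    ht->tail = new_val;
}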



static __rte_always_inline unsigned int
__rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
        unsigned int n, enum rte_ring_queue_behavior behavior,
        uint32_t *old_head, uint32_t *new_head,
        uint32_t *free_entries)
{
    const uint32_t capacity = r->capacity;
    unsigned int max = n;
    int success;

    do {
        /* Reset n to the initial burst count */
        n = max;

        *old_head = r->prod.head;

        /* add rmb barrier to avoid load/load reorder in weak
         * memory model. It is noop on x86
         */
        rte_smp_rmb();

        /*
         * The subtraction is done between two unsigned 32-bit values
         * (the result is always modulo 2^32, even if we have
         * *old_head > cons_tail), so 'free_entries' is always between 0
         * and capacity (which is < size).
         *
         * This computes the remaining capacity: cons.tail <= *old_head,
         * so r->cons.tail - *old_head is (minus) the number of entries
         * currently in use; adding capacity to it leaves exactly the
         * number of free entries.
         */
        *free_entries = (capacity + r->cons.tail - *old_head);

        /* check that we have enough room in ring */
        if (unlikely(n > *free_entries))
            n = (behavior == RTE_RING_QUEUE_FIXED) ?
                    0 : *free_entries;

        if (n == 0)
            return 0;

        *new_head = *old_head + n; /* position of the new head */
        if (is_sp) {
            /* Single producer: update r->prod.head directly; no
             * synchronization is needed. */
            r->prod.head = *new_head, success = 1;
        } else {
            /* Multiple producers: compare-and-set. If r->prod.head still
             * equals *old_head, move it to *new_head; otherwise another
             * producer won the race, so loop again, reload
             * *old_head = r->prod.head, and retry until the CAS succeeds. */
            success = rte_atomic32_cmpset(&r->prod.head, *old_head, *new_head);
        }
    } while (unlikely(success == 0));
    return n;
}
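
A quick worked example of that free_entries arithmetic, with made-up numbers: take size = 8, capacity = 7, five entries in use, and counters sitting right at the 32-bit wrap point. Unsigned modulo-2^32 subtraction still produces the correct count:

#include <assert.h>
#include <stdint.h>

int main(void)
{
    uint32_t capacity = 7;
    uint32_t cons_tail = 0xFFFFFFFEu; /* consumer counter, about to wrap */
    uint32_t prod_head = 3;           /* producer counter, already wrapped:
                                         3 - 0xFFFFFFFE == 5 entries in use */

    /* capacity + cons.tail - prod.head, all modulo 2^32 */
    uint32_t free_entries = capacity + cons_tail - prod_head;
    assert(free_entries == 2);        /* 7 - 5 == 2 slots free */
    return 0;
}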

Dequeue:

The logic is the same as for enqueue and the code is closely parallel, so it is not analyzed step by step; a sketch of the pointer-copy step shared by both paths follows the function below.

static __rte_always_inline unsigned int
__rte_ring_do_dequeue(struct rte_ring *r, void **obj_table,
         unsigned int n, enum rte_ring_queue_behavior behavior,
         unsigned int is_sc, unsigned int *available)
{
    uint32_t cons_head, cons_next;
    uint32_t entries;

    n = __rte_ring_move_cons_head(r, (int)is_sc, n, behavior,
            &cons_head, &cons_next, &entries);
    if (n == 0)
        goto end;

    DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *);

    update_tail(&r->cons, cons_head, cons_next, is_sc, 0);

end:
    if (available != NULL)
        *available = entries - n;
    return n;
}
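
DEQUEUE_PTRS is the mirror image of ENQUEUE_PTRS: both copy n pointers between obj_table and the slot array at &r[1], splitting the copy in two when it wraps past the end of the ring. A simplified sketch of the idea (the real macros also unroll the loop four ways for speed):

/* Simplified equivalent of DEQUEUE_PTRS: copy n object pointers out of
 * the ring starting at cons_head, wrapping at the end of the array. */
static inline void
dequeue_ptrs_sketch(struct rte_ring *r, void **obj_table,
        uint32_t cons_head, unsigned int n)
{
    void **ring = (void **)&r[1];        /* slot array follows the header */
    uint32_t idx = cons_head & r->mask;  /* physical slot of first object */
    unsigned int i;

    if (idx + n <= r->size) {            /* contiguous: a single copy */
        for (i = 0; i < n; i++)
            obj_table[i] = ring[idx + i];
    } else {                             /* wraps: copy in two pieces */
        unsigned int first = r->size - idx;
        for (i = 0; i < first; i++)
            obj_table[i] = ring[idx + i];
        for (i = 0; i < n - first; i++)
            obj_table[first + i] = ring[i];
    }
}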

static __rte_always_inline unsigned int
__rte_ring_move_cons_head(struct rte_ring *r, unsigned int is_sc,
        unsigned int n, enum rte_ring_queue_behavior behavior,
        uint32_t *old_head, uint32_t *new_head,
        uint32_t *entries)
{
    unsigned int max = n;
    int success;

    /* move cons.head atomically */
    do {
        /* Restore n as it may change every loop */
        n = max;

        *old_head = r->cons.head;

        /* add rmb barrier to avoid load/load reorder in weak
         * memory model. It is noop on x86
         */
        rte_smp_rmb();

        /* The subtraction is done between two unsigned 32bits value
         * (the result is always modulo 32 bits even if we have
         * cons_head > prod_tail). So 'entries' is always between 0
         * and size(ring)-1.
         */
        *entries = (r->prod.tail - *old_head);

        /* Set the actual entries for dequeue */
        if (n > *entries)
            n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;

        if (unlikely(n == 0))
            return 0;

        *new_head = *old_head + n;
        if (is_sc)
            r->cons.head = *new_head, success = 1;
        else
            success = rte_atomic32_cmpset(&r->cons.head, *old_head,
                    *new_head);
    } while (unlikely(success == 0));
    return n;
}
