之前用基于dpdk 实现小包快速转发的时候有用到无锁队列!今天就来看看吧!(后续完成了去dpdk化,直接在内核完成快速转发功能)
dpdk的无锁队列ring是借鉴了linux内核kfifo无锁队列。ring的实质是FIFO的环形队列。
先进先出(FIFO)
最大大小固定,指针存储在表中
无锁实现
多消费者或单消费者出队操作
多生产者或单生产者入队操作
批量出队 – 如果成功,将指定数量的元素出队,否则什么也不做
批量入队 – 如果成功,将指定数量的元素入队,否则什么也不做
突发出队 – 如果指定的数目出队失败,则将最大可用数目对象出队
突发入队 – 如果指定的数目入队失败,则将最大可入队数目对象入队
相比于链表,这个数据结构的优点如下:
更快;只需要一个sizeof(void *)的Compare-And-Swap指令,而不是多个双重比较和交换指令
与完全无锁队列像是
适应批量入队/出队操作。 因为指针是存储在表中的,应i多个对象的出队将不会产生于链表队列中一样多的cache miss。 此外,批量出队成本并不比单个对象出队高。
缺点:
大小固定
大量ring相比于链表,消耗更多的内存,空ring至少包含n个指针。
/* structure to hold a pair of head/tail values and other metadata */ struct rte_ring_headtail { // 生产者头尾指针,生产完成后都指向队尾 // 消费者头尾指针,生产完成后都指向队头 volatile uint32_t head; /**< Prod/consumer head.预生产到地方/预出队的地方 */ volatile uint32_t tail; /**< Prod/consumer tail. 实际生产了的数量 /实际出队的地方 */ uint32_t single; /**< True if single prod/cons */ }; struct rte_ring { /* * Note: this field kept the RTE_MEMZONE_NAMESIZE size due to ABI * compatibility requirements, it could be changed to RTE_RING_NAMESIZE * next time the ABI changes */ char name[RTE_MEMZONE_NAMESIZE] __rte_cache_aligned; /**< Name of the ring. */ int flags; /**< Flags supplied at creation. */ const struct rte_memzone *memzone; /**< Memzone, if any, containing the rte_ring */ uint32_t size; /**< Size of ring. */ uint32_t mask; /**< Mask (size-1) of ring. */ uint32_t capacity; /**< Usable size of ring */ char pad0 __rte_cache_aligned; /**< empty cache line */ /** Ring producer status. */ struct rte_ring_headtail prod __rte_cache_aligned; char pad1 __rte_cache_aligned; /**< empty cache line */ /** Ring consumer status. */ struct rte_ring_headtail cons __rte_cache_aligned; char pad2 __rte_cache_aligned; /**< empty cache line */ };
入队列:
http://reader.epubee.com/books/mobile/54/54aa973816d258a932e39464018932ee/text00032.html 以上来自~~~~~~~~~~~~~~
static __rte_always_inline unsigned int __rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table, unsigned int n, enum rte_ring_queue_behavior behavior, unsigned int is_sp, unsigned int *free_space) { uint32_t prod_head, prod_next; uint32_t free_entries; n = __rte_ring_move_prod_head(r, is_sp, n, behavior, &prod_head, &prod_next, &free_entries); if (n == 0) goto end; //prod_head是旧的r->prod.head //r经过__rte_ring_move_prod_head处理后,r->prod.head已经移动到想要的位置&r[1]是数据的位置 ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *); update_tail(&r->prod, prod_head, prod_next, is_sp, 1); end: if (free_space != NULL) *free_space = free_entries - n; return n; } static __rte_always_inline unsigned int __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp, unsigned int n, enum rte_ring_queue_behavior behavior, uint32_t *old_head, uint32_t *new_head, uint32_t *free_entries) { const uint32_t capacity = r->capacity; unsigned int max = n; int success; do { /* Reset n to the initial burst count */ n = max; *old_head = r->prod.head; /* add rmb barrier to avoid load/load reorder in weak * memory model. It is noop on x86 */ rte_smp_rmb(); /* * The subtraction is done between two unsigned 32bits value * (the result is always modulo 32 bits even if we have * *old_head > cons_tail). So 'free_entries' is always between 0 * and capacity (which is < size). 计算当前可用容量, cons.tail是小于等于prod.head, 所以r->cons.tail - *old_head得到一个 负数,capacity减这个差值就得到剩余的容量 */ *free_entries = (capacity + r->cons.tail - *old_head); /* check that we have enough room in ring */ if (unlikely(n > *free_entries)) n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *free_entries; if (n == 0) return 0; *new_head = *old_head + n; /* 新头的位置 */ if (is_sp) {/* 如果是单生产者,直接更新r->prod.head即可,不需要加锁 */ r->prod.head = *new_head, success = 1; }else{ /* 如果是多生产者,需要使用cmpset比较,如果&r->prod.head == *old_head 则&r->prod.head = *new_head 否则重新循环,获取新的*old_head = r->prod.head,知道成功位置*/ success = rte_atomic32_cmpset(&r->prod.head, *old_head, *new_head); } } while (unlikely(success == 0)); return n; }
出队:
原理逻辑和入队一样 代码也比较相似,不具体分析
static __rte_always_inline unsigned int __rte_ring_do_dequeue(struct rte_ring *r, void **obj_table, unsigned int n, enum rte_ring_queue_behavior behavior, unsigned int is_sc, unsigned int *available) { uint32_t cons_head, cons_next; uint32_t entries; n = __rte_ring_move_cons_head(r, (int)is_sc, n, behavior, &cons_head, &cons_next, &entries); if (n == 0) goto end; DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *); update_tail(&r->cons, cons_head, cons_next, is_sc, 0); end: if (available != NULL) *available = entries - n; return n; } static __rte_always_inline unsigned int __rte_ring_move_cons_head(struct rte_ring *r, unsigned int is_sc, unsigned int n, enum rte_ring_queue_behavior behavior, uint32_t *old_head, uint32_t *new_head, uint32_t *entries) { unsigned int max = n; int success; /* move cons.head atomically */ do { /* Restore n as it may change every loop */ n = max; *old_head = r->cons.head; /* add rmb barrier to avoid load/load reorder in weak * memory model. It is noop on x86 */ rte_smp_rmb(); /* The subtraction is done between two unsigned 32bits value * (the result is always modulo 32 bits even if we have * cons_head > prod_tail). So 'entries' is always between 0 * and size(ring)-1. */ *entries = (r->prod.tail - *old_head); /* Set the actual entries for dequeue */ if (n > *entries) n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries; if (unlikely(n == 0)) return 0; *new_head = *old_head + n; if (is_sc) r->cons.head = *new_head, success = 1; else success = rte_atomic32_cmpset(&r->cons.head, *old_head, *new_head); } while (unlikely(success == 0)); return n; }
最新评论