|
|
UnBounded Queue: 並行処理編 Part 2ソースはGitHubに移行しました。 CAS based Lock-Free Queue
CASを使ったLock-Free Queueの論文"Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms"で提案されたアルゴリズムを実装。
例えばLock-Free Hashでスレッド毎に2つのhashテーブルを必要とするアルゴリズムがあったりする。 上限10スレッドなら20のhashテーブルがメモリ上に予め確保されて、複雑怪奇なアルゴリズムが動くわけである。CASによるLock-FreeなアルゴリズムはABA問題がつきまとう。 この実装ではアドレスは32bitと仮定し、アドレス(32bit)とラベル(uint_t:32bit)をペアにして64bitのデータとして扱うことで ABA問題を避けている。これはCASによるLock-Freeアルゴリズム実装の常套手段の一つのようだ。 X86_64環境のインラインアセンブラはまだ不慣れなため、今のところ32-bit版Linuxディストリビューション上でのみ動作保証。 Intel64(X86_64)環境は近々に対応したいと考えている。 ソースはGitHubに移行しました。 データ構造データ構造はほぼ論文通り。 typedef struct _pointer_t { unsigned int count; struct _node_t *ptr; }__attribute__((packed)) pointer_t; typedef struct _node_t { val_t val; pointer_t next; }__attribute__((packed)) node_t; typedef struct _queue_t { pointer_t head; pointer_t tail; } queue_t; 基本関数Lock-Freeのためのプリミティブほとんど素のcmpxchg8b。 inline bool_t cas64(pointer_t * addr, const pointer_t oldp, const pointer_t newp) { char result; __asm__ __volatile__("lock; cmpxchg8b %0; setz %1":"=m"(*addr), "=q"(result) :"m"(*addr), "a"(oldp.count), "d"(oldp.ptr), "b"(newp.count), "c"(newp.ptr) :"memory"); return (((int) result != 0) ? true : false); } queueの生成queue_t *init_queue(void) { queue_t *q; node_t *node; if ((q = (queue_t *) calloc(1, sizeof(queue_t))) == NULL) { elog("calloc error"); return NULL; } if ((node = create_node((val_t)NULL)) == NULL) { elog("create_node() error"); abort(); } q->head.ptr = node; q->tail.ptr = node; return q; } void free_queue(queue_t * q) { free(q); } enqueuebool_t enq(queue_t * q, const val_t val) { node_t *newNode; pointer_t tail, next, tmp; if ((newNode = create_node(val)) == NULL) return false; while (1) { tail = q->tail; next = tail.ptr->next; if (tail.count == q->tail.count && tail.ptr == q->tail.ptr) { if (next.ptr == NULL) { tmp.ptr = newNode; tmp.count = next.count + 1; if (cas64(&tail.ptr->next, next, tmp) == true) { break; } } else { tmp.ptr = next.ptr; tmp.count = tail.count + 1; cas64(&q->tail, tail, tmp); } } } tmp.ptr = newNode; tmp.count = tail.count + 1; cas64(&q->tail, tail, tmp); return true; } static node_t *create_node(const val_t val) { node_t *node; if ((node = (node_t *) calloc(1, sizeof(node_t))) == NULL) { elog("calloc error"); return NULL; } node->val = val; node->next.ptr = NULL; node->next.count = 0; return node; } dequeuebool_t deq(queue_t * q, val_t * val) { pointer_t head, tail, next, tmp; while (1) { head = q->head; tail = q->tail; next = head.ptr->next; if (head.count == q->head.count && head.ptr == q->head.ptr) { if (head.ptr == tail.ptr) { if (next.ptr == NULL) { return false; } tmp.ptr = next.ptr; tmp.count = head.count + 1; cas64(&q->tail, tail, tmp); } else { *val = next.ptr->val; tmp.ptr = next.ptr; tmp.count = head.count + 1; if (cas64(&q->head, head, tmp) == true) { break; } } } } free_node (head.ptr); return true; } 実行ソースはGitHubに移行しました。
Last-modified: 2014-7-6
|