|
|
UnBounded Queue: 並行処理編 Part 3ソースはGitHubに移行しました。 LL/SC based Lock-Free Queue
LL/SCを利用したLock-Free Queueの実装。 LL/SCといってもIntelのCPUはサポートしていないので、CAS(cmpxchg)でエミュレートしている。 汎用的なエミュレートでなく、あくまでQueueの実装でのみ有効な方式に見える(まだしっかり把握できていないので要調査)。 LL/SCはABA問題が生じないので、 素直にアドレス値に対してCAS命令が使えるため、この実装はintel64(X86_64)環境でも動作する。 ちなみに、 lucille development blogでも同じ実装が公開されている。 「見たら負けかな」と思ったので実装を終えるまでソースをよく見なかったが、改めて眺めてみるとほとんど同じである。 これは論文の疑似コードが 限りなくC言語に近いため。しかし、関数の並びまで同じなのは笑った。 ソースはGitHubに移行しました。 データ構造データ構造は完全に論文通り。唯一、スレッド毎のデータ領域確保のためpthread_keyを定義している。
LLSCLockFreeQueue.h: typedef struct _ExitTag { int count; int transfersLeft; bool_t nlP; bool_t toBeFreed; } ExitTag; typedef struct _node_t { val_t val; struct _node_t *next; struct _node_t *pred; ExitTag exit; } node_t; typedef struct _EntryTag { int ver; int count; } EntryTag; typedef struct _LLSCvar { node_t *ptr0; node_t *ptr1; EntryTag entry; } LLSCvar; typedef struct _queue_t { LLSCvar head; LLSCvar tail; pthread_key_t workspace_key; } queue_t; typedef struct _workspace_t { int myver; node_t *mynode __attribute__((aligned(16))); } workspace_t __attribute__((aligned(16))); 基本関数Lock-FreeのためのプリミティブCAS(cmpxchg系)でLL/SCをエミュレートする。詳細は論文参照。 今はソースを書き散らかしているだけだが、いずれ内容を噛み砕いて解説したいと思っている。 #ifdef __X86_64__ static inline bool_t cas(void *ptr, uint64_t oldv, uint64_t newv) { uint64_t result; __asm__ __volatile__("lock\n cmpxchgq %1,%2" : "=a" (result) : "q" (newv), "m" (*(uint64_t *)ptr),"0" (oldv) : "memory"); return ((result == oldv) ? true : false); } #define CAST(value) (*((uint64_t *)&(value))) #else static inline bool_t cas(void *ptr, uint32_t oldv, uint32_t newv) { uint32_t result; __asm__ __volatile__("lock\n cmpxchgl %1,%2" : "=a" (result) : "q" (newv), "m" (*(uint32_t *)ptr),"0" (oldv) : "memory"); return ((result == oldv) ? true : false); } #define CAST(value) (*((uint32_t *)&(value))) #endif #define CURRENT(loc, ver) (ver % 2 == 0 ? loc->ptr0 : loc->ptr1) #define NONCURADDR(loc, ver) (ver % 2 == 0 ? (void *)&(loc->ptr1) : (void *)&(loc->ptr0)) #define CLEAN(exit) ((exit.count == 0) && (exit.transfersLeft == 0)) #define FREEABLE(exit) (CLEAN(exit) && exit.nlP && exit.toBeFreed) static node_t *LL(LLSCvar *loc, int *myver, node_t **mynode) { EntryTag e, new; do { e = loc->entry; *myver = e.ver; *mynode = CURRENT(loc, e.ver); { new.ver = e.ver; new.count = e.count + 1; } } while (!cas(&loc->entry, CAST(e), CAST(new))); return *mynode; } static bool_t SC(LLSCvar *loc, node_t *nd, int myver, node_t *mynode) { EntryTag e, new; node_t *pred_nd = mynode->pred; bool_t success = cas(NONCURADDR(loc, myver), CAST(pred_nd), CAST(nd)); /**** if (!success) free(new_nd); ***/ e = loc->entry; while (e.ver == myver) { { new.ver = e.ver + 1; new.count = 0; } if (cas(&loc->entry, CAST(e), CAST(new))) transfer(mynode, e.count); e = loc->entry; } release(mynode); return success; } static void transfer(node_t *nd, int count) { ExitTag pre, post; do { pre = nd->exit; { post.count = pre.count + count; post.transfersLeft = pre.transfersLeft - 1; post.nlP = pre.nlP; post.toBeFreed = pre.toBeFreed; } } while (!cas(&nd->exit, CAST(pre), CAST(post))); } static void release(node_t *nd) { ExitTag pre, post; node_t *pred_nd = nd->pred; do { pre = nd->exit; { post.count = pre.count - 1; post.transfersLeft = pre.transfersLeft; post.nlP = pre.nlP; post.toBeFreed = pre.toBeFreed; } } while (!cas(&nd->exit, CAST(pre), CAST(post))); if (CLEAN(post)) setNLPred(pred_nd); if (FREEABLE(post)) free(nd); } static void unlink(LLSCvar *loc, int myver, node_t *mynode) { EntryTag e, new; do { e = loc->entry; } while (e.ver == myver); { new.ver = e.ver; new.count = e.count - 1; } if (!cas(&loc->entry, CAST(e), CAST(new))) release(mynode); } static void setNLPred(node_t *pred_nd) { ExitTag pre, post; do { pre = pred_nd->exit; { post.count = pre.count; post.transfersLeft = pre.transfersLeft; post.nlP = true; post.toBeFreed = pre.toBeFreed; } } while (!cas(&pred_nd->exit, CAST(pre), CAST(post))); if (FREEABLE(post)) free(pred_nd); } static void setToBeFreed(node_t *pred_nd) { ExitTag pre, post; do { pre = pred_nd->exit; { post.count = pre.count; post.transfersLeft = pre.transfersLeft; post.nlP = pre.nlP; post.toBeFreed = true; } } while (!cas(&pred_nd->exit, CAST(pre), CAST(post))); if (FREEABLE(post)) free(pred_nd); } queueの生成queue_t *init_queue (void) { queue_t *q; if ((q = (queue_t *) calloc(1, sizeof(queue_t))) == NULL) { elog("calloc error"); return NULL; } q->tail.entry.ver = 0; q->tail.entry.count = 0; if ((q->tail.ptr0 = (node_t *)calloc(1, sizeof(node_t))) == NULL) { elog("calloc error"); goto end; } if ((q->tail.ptr1 = (node_t *)calloc(1, sizeof(node_t))) == NULL) { elog("calloc error"); goto end; } q->tail.ptr0->pred = q->tail.ptr1; q->tail.ptr0->exit.count = 0; q->tail.ptr0->exit.transfersLeft = 2; q->tail.ptr0->exit.nlP = 0; q->tail.ptr0->exit.toBeFreed = 0; q->tail.ptr0->next = NULL; q->tail.ptr1->exit.count = 0; q->tail.ptr1->exit.transfersLeft = 0; q->tail.ptr1->exit.nlP = 0; q->tail.ptr1->exit.toBeFreed = 0; q->head = q->tail; if (pthread_key_create(&q->workspace_key, (void *) free_workspace) != 0) { elog("pthread_key_create() error"); abort(); } return q; end: free(q->tail.ptr0); free(q); return NULL; } void free_queue(queue_t *q) { free(q); } enqueuebool_t enq(queue_t *q, val_t val) { bool_t ret = true; node_t *nd, *tail; workspace_t *ws = get_workspace(q); assert(ws != NULL); if ((nd = create_node(val)) == NULL) { return false; } while (1) { tail = LL(&q->tail, &ws->myver, &ws->mynode); nd->pred = tail; if (cas(&tail->next, (uintptr_t)NULL, CAST(nd))) { SC(&q->tail, nd, ws->myver, ws->mynode); break; } else { SC(&q->tail, tail->next, ws->myver, ws->mynode); } } return ret; } static node_t *create_node (val_t val) { node_t *node; if ((node = (node_t*)calloc(1, sizeof(node_t))) == NULL) { elog("calloc error"); return NULL; } node->val = val; node->next = NULL; node->exit.count = 0; node->exit.transfersLeft = 2; node->exit.nlP = false; node->exit.toBeFreed = false; return node; } dequeuebool_t deq(queue_t *q, val_t *val) { bool_t ret = true; node_t *head, *next; workspace_t *ws = get_workspace(q); assert(ws != NULL); while (1) { head = LL(&q->head, &ws->myver, &ws->mynode); next = head->next; if (next == NULL) { unlink(&q->head, ws->myver, ws->mynode); *val = (val_t)NULL; ret = false; break; } if (SC(&q->head, next, ws->myver, ws->mynode)) { *val = next->val; setToBeFreed(next); free_node(next); break; } } return ret; } 実行ソースはGitHubに移行しました。
Last-modified: 2014-7-6
|