|
|
|
UnBounded Queue: 並行処理編 Part 3ソースはGitHubに移行しました。 LL/SC based Lock-Free Queue
LL/SCを利用したLock-Free Queueの実装。 LL/SCといってもIntelのCPUはサポートしていないので、CAS(cmpxchg)でエミュレートしている。 汎用的なエミュレートでなく、あくまでQueueの実装でのみ有効な方式に見える(まだしっかり把握できていないので要調査)。 LL/SCはABA問題が生じないので、 素直にアドレス値に対してCAS命令が使えるため、この実装はintel64(X86_64)環境でも動作する。 ちなみに、 lucille development blogでも同じ実装が公開されている。 「見たら負けかな」と思ったので実装を終えるまでソースをよく見なかったが、改めて眺めてみるとほとんど同じである。 これは論文の疑似コードが 限りなくC言語に近いため。しかし、関数の並びまで同じなのは笑った。 ソースはGitHubに移行しました。 データ構造データ構造は完全に論文通り。唯一、スレッド毎のデータ領域確保のためpthread_keyを定義している。
LLSCLockFreeQueue.h:
typedef struct _ExitTag {
int count;
int transfersLeft;
bool_t nlP;
bool_t toBeFreed;
} ExitTag;
typedef struct _node_t {
val_t val;
struct _node_t *next;
struct _node_t *pred;
ExitTag exit;
} node_t;
typedef struct _EntryTag {
int ver;
int count;
} EntryTag;
typedef struct _LLSCvar {
node_t *ptr0;
node_t *ptr1;
EntryTag entry;
} LLSCvar;
typedef struct _queue_t {
LLSCvar head;
LLSCvar tail;
pthread_key_t workspace_key;
} queue_t;
typedef struct _workspace_t {
int myver;
node_t *mynode __attribute__((aligned(16)));
} workspace_t __attribute__((aligned(16)));
基本関数Lock-FreeのためのプリミティブCAS(cmpxchg系)でLL/SCをエミュレートする。詳細は論文参照。 今はソースを書き散らかしているだけだが、いずれ内容を噛み砕いて解説したいと思っている。
#ifdef __X86_64__
static inline bool_t cas(void *ptr, uint64_t oldv, uint64_t newv)
{
uint64_t result;
__asm__ __volatile__("lock\n cmpxchgq %1,%2"
: "=a" (result)
: "q" (newv), "m" (*(uint64_t *)ptr),"0" (oldv)
: "memory");
return ((result == oldv) ? true : false);
}
#define CAST(value) (*((uint64_t *)&(value)))
#else
static inline bool_t cas(void *ptr, uint32_t oldv, uint32_t newv)
{
uint32_t result;
__asm__ __volatile__("lock\n cmpxchgl %1,%2"
: "=a" (result)
: "q" (newv), "m" (*(uint32_t *)ptr),"0" (oldv)
: "memory");
return ((result == oldv) ? true : false);
}
#define CAST(value) (*((uint32_t *)&(value)))
#endif
#define CURRENT(loc, ver) (ver % 2 == 0 ? loc->ptr0 : loc->ptr1)
#define NONCURADDR(loc, ver) (ver % 2 == 0 ? (void *)&(loc->ptr1) : (void *)&(loc->ptr0))
#define CLEAN(exit) ((exit.count == 0) && (exit.transfersLeft == 0))
#define FREEABLE(exit) (CLEAN(exit) && exit.nlP && exit.toBeFreed)
static node_t
*LL(LLSCvar *loc, int *myver, node_t **mynode)
{
EntryTag e, new;
do {
e = loc->entry;
*myver = e.ver;
*mynode = CURRENT(loc, e.ver);
{
new.ver = e.ver;
new.count = e.count + 1;
}
} while (!cas(&loc->entry, CAST(e), CAST(new)));
return *mynode;
}
static bool_t
SC(LLSCvar *loc, node_t *nd, int myver, node_t *mynode)
{
EntryTag e, new;
node_t *pred_nd = mynode->pred;
bool_t success = cas(NONCURADDR(loc, myver), CAST(pred_nd), CAST(nd));
/****
if (!success)
free(new_nd);
***/
e = loc->entry;
while (e.ver == myver) {
{
new.ver = e.ver + 1;
new.count = 0;
}
if (cas(&loc->entry, CAST(e), CAST(new)))
transfer(mynode, e.count);
e = loc->entry;
}
release(mynode);
return success;
}
static void
transfer(node_t *nd, int count)
{
ExitTag pre, post;
do {
pre = nd->exit;
{
post.count = pre.count + count;
post.transfersLeft = pre.transfersLeft - 1;
post.nlP = pre.nlP;
post.toBeFreed = pre.toBeFreed;
}
} while (!cas(&nd->exit, CAST(pre), CAST(post)));
}
static void
release(node_t *nd)
{
ExitTag pre, post;
node_t *pred_nd = nd->pred;
do {
pre = nd->exit;
{
post.count = pre.count - 1;
post.transfersLeft = pre.transfersLeft;
post.nlP = pre.nlP;
post.toBeFreed = pre.toBeFreed;
}
} while (!cas(&nd->exit, CAST(pre), CAST(post)));
if (CLEAN(post))
setNLPred(pred_nd);
if (FREEABLE(post))
free(nd);
}
static void
unlink(LLSCvar *loc, int myver, node_t *mynode)
{
EntryTag e, new;
do {
e = loc->entry;
} while (e.ver == myver);
{
new.ver = e.ver;
new.count = e.count - 1;
}
if (!cas(&loc->entry, CAST(e), CAST(new)))
release(mynode);
}
static void
setNLPred(node_t *pred_nd)
{
ExitTag pre, post;
do {
pre = pred_nd->exit;
{
post.count = pre.count;
post.transfersLeft = pre.transfersLeft;
post.nlP = true;
post.toBeFreed = pre.toBeFreed;
}
} while (!cas(&pred_nd->exit, CAST(pre), CAST(post)));
if (FREEABLE(post))
free(pred_nd);
}
static void
setToBeFreed(node_t *pred_nd)
{
ExitTag pre, post;
do {
pre = pred_nd->exit;
{
post.count = pre.count;
post.transfersLeft = pre.transfersLeft;
post.nlP = pre.nlP;
post.toBeFreed = true;
}
} while (!cas(&pred_nd->exit, CAST(pre), CAST(post)));
if (FREEABLE(post))
free(pred_nd);
}
queueの生成
queue_t *init_queue (void)
{
queue_t *q;
if ((q = (queue_t *) calloc(1, sizeof(queue_t))) == NULL) {
elog("calloc error");
return NULL;
}
q->tail.entry.ver = 0;
q->tail.entry.count = 0;
if ((q->tail.ptr0 = (node_t *)calloc(1, sizeof(node_t))) == NULL) {
elog("calloc error");
goto end;
}
if ((q->tail.ptr1 = (node_t *)calloc(1, sizeof(node_t))) == NULL) {
elog("calloc error");
goto end;
}
q->tail.ptr0->pred = q->tail.ptr1;
q->tail.ptr0->exit.count = 0;
q->tail.ptr0->exit.transfersLeft = 2;
q->tail.ptr0->exit.nlP = 0;
q->tail.ptr0->exit.toBeFreed = 0;
q->tail.ptr0->next = NULL;
q->tail.ptr1->exit.count = 0;
q->tail.ptr1->exit.transfersLeft = 0;
q->tail.ptr1->exit.nlP = 0;
q->tail.ptr1->exit.toBeFreed = 0;
q->head = q->tail;
if (pthread_key_create(&q->workspace_key, (void *) free_workspace) != 0) {
elog("pthread_key_create() error");
abort();
}
return q;
end:
free(q->tail.ptr0);
free(q);
return NULL;
}
void
free_queue(queue_t *q)
{
free(q);
}
enqueue
bool_t enq(queue_t *q, val_t val)
{
bool_t ret = true;
node_t *nd, *tail;
workspace_t *ws = get_workspace(q);
assert(ws != NULL);
if ((nd = create_node(val)) == NULL) {
return false;
}
while (1) {
tail = LL(&q->tail, &ws->myver, &ws->mynode);
nd->pred = tail;
if (cas(&tail->next, (uintptr_t)NULL, CAST(nd))) {
SC(&q->tail, nd, ws->myver, ws->mynode);
break;
}
else {
SC(&q->tail, tail->next, ws->myver, ws->mynode);
}
}
return ret;
}
static node_t *create_node (val_t val)
{
node_t *node;
if ((node = (node_t*)calloc(1, sizeof(node_t))) == NULL) {
elog("calloc error");
return NULL;
}
node->val = val;
node->next = NULL;
node->exit.count = 0;
node->exit.transfersLeft = 2;
node->exit.nlP = false;
node->exit.toBeFreed = false;
return node;
}
dequeue
bool_t deq(queue_t *q, val_t *val)
{
bool_t ret = true;
node_t *head, *next;
workspace_t *ws = get_workspace(q);
assert(ws != NULL);
while (1) {
head = LL(&q->head, &ws->myver, &ws->mynode);
next = head->next;
if (next == NULL) {
unlink(&q->head, ws->myver, ws->mynode);
*val = (val_t)NULL;
ret = false;
break;
}
if (SC(&q->head, next, ws->myver, ws->mynode)) {
*val = next->val;
setToBeFreed(next);
free_node(next);
break;
}
}
return ret;
}
実行ソースはGitHubに移行しました。
Last-modified: 2014-7-6
|