|
|
Skiplist スキップリスト: 並行処理編 Part. 1ソースはGitHubに移行しました。 Lazy Skiplist(単方向 Lazy スキップリスト)
(単方向)Lazy Skiplistを実装する: ソースはGitHubに移行しました。 データ構造データ構造list_tとnode_tを定義する。 list_tはListの本体、node_tはListに連結するnodeのデータ構造である。 以降、共通のデータ型として"common.h"に"bool_t"、"lkey_t"、"val_t"を定義する。 "bool_t"は自明、 "lkey_t"はnodeのkey、"val_t"はnodeのvalを保持する。 common.h: typedef bool bool_t; typedef uint64_t lkey_t; typedef int64_t val_t; 次に、nodeとlistのデータ構造を示す。 LazySkiplist .h: typedef struct _skiplist_node_t { lkey_t key; /* key */ val_t val; /* 値 */ int topLevel; /* このノードのレベル(高さ) */ bool_t marked; /* 論理的に削除されているか否か示す */ bool_t fullyLinked; pthread_mutex_t mtx; struct _skiplist_node_t *next[1]; /* 次のnodeへのポインタ */ } skiplist_node_t; typedef struct _skiplist_t { int maxLevel; /* このリストの最大レベル(高さ) */ skiplist_node_t *head; skiplist_node_t *tail; pthread_key_t workspace_key; /* add, delete, findのための補助メモリ領域用pthread_key */ } skiplist_t; typedef struct _workspace_t { /* add, delete, findのための補助メモリ領域。スレッド毎に保持する。 */ skiplist_node_t **preds; skiplist_node_t **succs; } workspace_t; skiplist_node_tは(key,val)のペアと次のノードへのポインタ、およびこのノードのレベル(高さ)を保持する。 また、markedとfullyLinkedの2つのマーク(bool_t)と、 ノード毎のlockのためにpthreadのmutex変数を確保する。
skiplist_tにはリストの先頭headと末尾tailが確保される。 具体的には、pthread_keyの機能を利用する。 skiplist_tにはpthread_keyのみ確保し別途、構造体workspace_tを用意してスレッド毎にメモリ領域を確保する。 基本関数スレッド毎のメモリ領域確保pthread_keyを知っていれば自明なコードである。 concurrent_skiplist.h: /* static workspace_t *init_workspace(const int maxLevel) * * (スレッド毎に)skiplist *slの補助メモリ領域:predsとsuccsを確保する。 * * 成功すると確保したメモリへのポインタを返す。失敗するとNULLを返す。 */ static workspace_t *init_workspace(const int maxLevel) { workspace_t *ws; if ((ws = (workspace_t *) calloc(1, sizeof(workspace_t))) == NULL) { elog("calloc error"); return NULL; } if ((ws->preds = (skiplist_node_t **) calloc(1, sizeof(skiplist_node_t *) * maxLevel)) == NULL) { elog("calloc error"); goto end; } if ((ws->succs = (skiplist_node_t **) calloc(1, sizeof(skiplist_node_t *) * maxLevel)) == NULL) { elog("calloc error"); goto end; } return ws; end: free(ws->preds); free(ws); return (workspace_t *) NULL; } /* * void free_workspace(workspace_t * ws) * * wsを開放する。 */ static void free_workspace(workspace_t * ws) { free(ws->preds); free(ws->succs); free(ws); } /* * workspace_t *get_workspace(skiplist_t * sl) * * (スレッド毎に)初めて呼ばれた時はメモリ領域へ確保し、pthread_keyに対応させる。 * 必ずスレッドに対応したメモリ領域へのポインタを返す。 * */ static workspace_t *get_workspace(skiplist_t * sl) { /* スレッド毎のメモリ領域へのポインタを得る */ workspace_t *workspace = pthread_getspecific(sl->workspace_key); /* まだメモリ領域が確保されていない場合: */ if (workspace == NULL) { /* メモリ領域の確保 */ if ((workspace = init_workspace(sl->maxLevel)) != NULL) { /* メモリ領域をpthread_keyに対応させる */ if (pthread_setspecific(sl->workspace_key, (void *) workspace) != 0) { elog("pthread_setspecific() error"); abort(); } } else { elog("init_workspace() error"); abort(); } } assert(workspace != NULL); return workspace; } 少し先行して、使い方を示す。 bool_t add(skiplist_t * sl, lkey_t key, val_t val) { workspace_t *ws = get_workspace(sl); /* スレッド毎に確保された補助メモリ領域wsへのポインタを返す */ assert(ws != NULL); return _add(sl, ws->preds, ws->succs, key, val); /* 実際にノードの追加を行う */ } listの生成/* * skiplist_t *init_list(const int maxLevel, const lkey_t min, const lkey_t max) * * skiplistを生成する。maxLevelは最大の高さ(レベル)を指定。 * minはスキップリストのkeyの最小値(skiplist->head->keyに保持)、 * maxはスキップリストのkeyの最大値(skiplist->tail->keyに保持)、 * * 成功すればskiplistへのポインタを返す。失敗ならNULLを返す。 * */ skiplist_t *init_list(const int maxLevel, const lkey_t min, const lkey_t max) { int i; skiplist_t *sl; skiplist_node_t *head, *tail; if ((sl = (skiplist_t *) calloc(1, sizeof(skiplist_t))) == NULL) { elog("calloc error"); return NULL; } sl->maxLevel = maxLevel; if ((head = create_node(maxLevel, min, min)) == NULL) { elog("create_node() error"); goto end; } head->fullyLinked = true; if ((tail = create_node(maxLevel, max, max)) == NULL) { elog("create_node() error"); goto end; } tail->fullyLinked = true; for (i = 0; i < maxLevel; i++) { head->next[i] = tail; tail->next[i] = NULL; } sl->head = head; sl->tail = tail; /* pthread_key_t workspace_keyの初期化 * 確保したメモリ領域の開放はfree_workspace()@concurrent_skiplist.hで行う。 */ if (pthread_key_create(&sl->workspace_key, (void *) free_workspace) != 0) { elog("pthread_key_create() error"); abort(); } return sl; end: free(sl->head); free(sl); return NULL; } ノードの検索各種操作(add,delete,find)が使うノードの検索関数を示す。 /* *static int search(skiplist_t * sl, const lkey_t key, skiplist_node_t ** preds, * skiplist_node_t ** succs) * * keyを持つノードを探す。 * ノードが見つかった場合: * そのノードのレベル(高さ)を返す。またsuccsには目的のノードが保持するポインタ、 * predsにはsuccsを指すノードへのポインタを返す。 * 例) 以下のスキップリストでkey[4]のノードを探す場合、 * *level: head tail * 3 [-inf]------>[2]--------------------->[inf] * 2 [-inf]------>[2]------>[4]------>[7]->[inf] * 1 [-inf]->[1]->[2]------>[4]->[5]->[7]->[inf] * 0 [-inf]->[1]->[2]->[3]->[4]->[5]->[7]->[inf] * * predsとsuccsはそれぞれ次のようになり、レベル2を返す。 * level: preds succs * 2 [2] [7] * 1 [2] [5] * 0 [3] [5] * preds[2]=2は"レベル2では、ノード(key=2)がノード(key=4)を指している"、 * preds[1]=2は"レベル1では、ノード(key=2)がノード(key=4)を指している"、 * preds[0]=3は"レベル0では、ノード(key=3)がノード(key=4)を指している"、 * を意味し、 * succs[2]=7は"レベル2では、ノード(key=4)がノード(key=7)を指している"、 * succs[1]=5は"レベル1では、ノード(key=4)がノード(key=5)を指している"、 * succs[0]=5は"レベル0では、ノード(key=4)がノード(key=5)を指している"、 * を意味する。 */ static int search(const skiplist_t * sl, const lkey_t key, skiplist_node_t ** preds, skiplist_node_t ** succs) { int lFound, level; skiplist_node_t *pred, *curr; pred = sl->head; lFound = -1; for (level = sl->maxLevel - 1; level >= 0; level--) { curr = pred->next[level]; while (key > curr->key) { pred = curr; curr = pred->next[level]; } if (lFound == -1 && key == curr->key) lFound = level; preds[level] = pred; succs[level] = curr; } return lFound; } ノードの追加/* * bool_t add(list_t * list, const lkey_t key, const val_t val) * * listにnode(key,val)を追加する。 * 追加位置はkeyの昇順に従う。すでに同じkeyを持つnodeがある場合は失敗。 * * 成功すればtrue、失敗すればfalseを返す。 */ bool_t add(skiplist_t * sl, lkey_t key, val_t val) { workspace_t *ws = get_workspace(sl); /* スレッド毎に確保された補助メモリ領域wsへのポインタを返す */ assert(ws != NULL); return _add(sl, ws->preds, ws->succs, key, val); } static bool_t _add(skiplist_t * sl, skiplist_node_t ** preds, skiplist_node_t ** succs, const lkey_t key, const val_t val) { int topLevel, highestLocked, lFound, level; skiplist_node_t *newNode, *pred, *succ, *nodeFound; bool_t valid; int r = rand(); topLevel = (r % sl->maxLevel); assert(0 <= topLevel && topLevel < sl->maxLevel); /* * step 1: ノードの検索 */ while (1) { if ((lFound = search(sl, key, preds, succs)) != -1) { nodeFound = succs[lFound]; if (nodeFound->marked != true) { while (nodeFound->fullyLinked != true) {}; return false; } continue; } /* * step 2: preds[]に含まれるノードのロック */ highestLocked = -1; valid = true; for (level = 0; (valid == true) && (level <= topLevel); level++) { pred = preds[level]; succ = succs[level]; lock(pred->mtx); /* levelによって複数回lockされる可能性がある -> RECURSIVE_MUTEX */ highestLocked = level; valid = (pred->marked != true) && (succ->marked != true) && (pred->next[level] == succ); } if (valid != true) { for (level = 0; level <= highestLocked; level++) unlock(preds[level]->mtx); continue; } assert(valid == true); for (level = 0; (level <= topLevel); level++) { pred = preds[level]; succ = succs[level]; assert(pred->marked != true); assert(succ->marked != true); assert(pred->next[level] == succ); } /* * step 3: ノード追加 */ newNode = create_node(topLevel, key, val); for (level = 0; level <= topLevel; level++) { newNode->next[level] = succs[level]; preds[level]->next[level] = newNode; } newNode->fullyLinked = true; break; } /* * step 4: ロックの解除 */ assert(highestLocked == topLevel); for (level = 0; level <= topLevel; level++) unlock(preds[level]->mtx); return true; }
関数add()内部で呼ぶnode生成関数create_node()を示す。 /* * static skiplist_node_t *create_node(const int topLevel, const lkey_t key, const val_t val) * * レベル(高さ)topLevelで(key, val)を持つnodeを生成する。 * * 成功すればnodeへのポインタを返す。失敗すればNULLを返す。 */ #define node_size(level) (sizeof(skiplist_node_t) + ((level + 1) * sizeof(skiplist_node_t *))) static skiplist_node_t *create_node(const int topLevel, const lkey_t key, const val_t val) { skiplist_node_t *node; #ifndef PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP pthread_mutexattr_t mtx_attr; #endif if ((node = (skiplist_node_t *) calloc(1, node_size(topLevel))) == NULL) { elog("calloc error"); return NULL; } node->key = key; node->val = val; node->topLevel = topLevel; #ifdef PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP node->mtx = (pthread_mutex_t) PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP; #else pthread_mutexattr_init(&mtx_attr); pthread_mutexattr_settype(&mtx_attr, PTHREAD_MUTEX_RECURSIVE_NP); pthread_mutex_init(&node->mtx, &mtx_attr); #endif node->marked = false; node->fullyLinked = false; return node; } nodeの削除/* * bool_t delete(skiplist_t * sl, lkey_t key, val_t * val) * * listからkeyを持つnodeを削除し、そのnodeのvalを*valに書き込む。 * keyが存在しない場合は失敗。 * * 成功すればtrue、失敗すればfalseを返す。 */ bool_t delete(skiplist_t * sl, const lkey_t key, val_t * val) { workspace_t *ws = get_workspace(sl); assert(ws != NULL); return _delete(sl, ws->preds, ws->succs, key, val); } static bool_t _delete(skiplist_t * sl, skiplist_node_t ** preds, skiplist_node_t ** succs, const lkey_t key, val_t * val) { int topLevel = -1; int i, lFound, level, highestLocked; skiplist_node_t *pred, *victim; bool_t isMarked = false; bool_t valid, flag; while (1) { /* * step 1: 削除するノードの検索 */ victim = NULL; if ((lFound = search(sl, key, preds, succs)) == -1) /* 削除するノードが存在しない場合 */ return false; victim = succs[lFound]; flag = ((victim->fullyLinked == true) && (victim->topLevel == lFound) && (victim->marked != true)); /* 初回、もしくはのみ実行 * => victim->marked = true; isMarked = true; topLevel = victim->topLevel; */ if (isMarked == true || ((lFound != -1) && flag)) { if (isMarked != true) { topLevel = victim->topLevel; lock(victim->mtx); if (victim->marked == true) { unlock(victim->mtx); return false; } victim->marked = true; isMarked = true; } /* * step 2: preds[]のロック獲得 */ highestLocked = -1; valid = true; for (level = 0; (valid == true) && (level <= topLevel); level++) { pred = preds[level]; lock(pred->mtx); highestLocked = level; valid = (pred->marked != true) && (pred->next[level] == victim); } if (valid != true) { /* preds[]のロック獲得に失敗した場合、やりなおし */ for (i = 0; i <= highestLocked; i++) unlock(preds[i]->mtx); continue; } /* * step 3: ノードの削除 */ for (level = victim->topLevel; level >= 0; level--) preds[level]->next[level] = victim->next[level]; unlock(victim->mtx); for (i = 0; i <= lFound; i++) /* preds[]のロック開放 */ unlock(preds[i]->mtx); break; } else return false; } *val = victim->val; free_node(victim); return true; } nodeの検索/* * val_t find(skiplist_t * sl, lkey_t key) * * skiplistからkeyを持つノードを検索する。ノードが存在する場合はそのノードのvalを返す。 * keyが存在しない場合はNULLを返す。 * */ val_t find(skiplist_t * sl, lkey_t key) { workspace_t *ws = get_workspace(sl); assert(ws != NULL); return _find(sl, ws->preds, ws->succs, key); } static bool_t _find(skiplist_t * sl, skiplist_node_t ** preds, skiplist_node_t ** succs, const lkey_t key) { int lFound; lFound = search(sl, key, preds, succs); if (lFound == -1) return (val_t) NULL; if (succs[lFound]->fullyLinked != true || succs[lFound]->marked == true) return (val_t) NULL; return preds[0]->next[0]->val; } 実行ソースはGitHubに移行しました。
Last-modified: 2014-7-6
|