単方向リスト: 並行処理編 Part 3ソースはGitHubに移行しました。 Non-Blocking Singly-linked List(Non-Blocking単方向リスト)Non-Blocking Slingly-Linked Listについて、ここではCASベースの比較的単純なアルゴリズムを実装した。 "A Pragmatic Implementation of Non-Blocking Linked-Lists" Timothy L. Harris .pdf アルゴリズムを簡単に図示する: ![]() 図の下部は今回実装したアルゴリズムで、削除するノードにマーキングして上記の状況を避ける。 ソースはGitHubに移行しました。 データ構造データ構造list_tとnode_tを定義する。 list_tはListの本体、node_tはListに連結するnodeのデータ構造である。
nodeとlistのデータ構造を示す。 NonBlockingList.h: typedef enum {UNMARKED = 0, MARKED = 1} node_stat; typedef struct _next_ref { node_stat mark; void *node_ptr; }__attribute__((packed)) next_ref; typedef struct _node_t { lkey_t key; /* key */ val_t val; /* 値 */ next_ref next; /* 次のnodeへのリファレンス */ }__attribute__((packed)) node_t; typedef struct _list_t { node_t *head; node_t *tail; } list_t; list_tには、リストの先頭headと末尾tailが確保される。 基本関数listの生成/* * list_t *init_list(void) * * listを生成する。 * * 成功すればlistへのポインタを返す。失敗ならNULLを返す。 * */ list_t *init_list(void) { list_t *list; if ((list = (list_t *) calloc(1, sizeof(list_t))) == NULL) { elog("calloc error"); return NULL; } if ((list->head = (node_t *) calloc(1, sizeof(node_t))) == NULL) { elog("calloc error"); goto end; } if ((list->tail = (node_t *) calloc(1, sizeof(node_t))) == NULL) { elog("calloc error"); goto end; } list->head->key = INT_MIN; list->head->next = make_ref(list->tail, UNMARKED); list->tail->key = INT_MAX; list->tail->next = make_ref(NULL, UNMARKED); return list; end: free (list->head); free (list); return NULL; } ノードの検索
static inline bool_t cas64(volatile next_ref * addr, const next_ref oldp, const next_ref newp) { char result; __asm__ __volatile__("lock; cmpxchg8b %0; setz %1":"=m"(*(volatile next_ref *)addr), "=q"(result) :"m"(*(volatile next_ref *)addr), "a"(oldp.mark), "d"(oldp.node_ptr), "b"(newp.mark), "c"(newp.node_ptr) :"memory"); return ((int)result == true ? true : false); } #define is_marked_ref(ref) (ref.mark == MARKED ? true:false) #define is_unmarked_ref(ref) (ref.mark == UNMARKED ? true:false) #define get_ptr(ref) (ref.node_ptr) static next_ref make_ref(node_t * node_ptr, const node_stat mark) { next_ref ref; ref.mark = mark; ref.node_ptr = (node_t *) node_ptr; return ref; } #define get_unmarked_ref(ptr) make_ref(ptr, UNMARKED) #define get_marked_ref(ptr) make_ref(ptr, MARKED) static node_t *search(list_t * list, const lkey_t key, node_t **pred) { node_t *pred_next = NULL; node_t *curr; search_again: do { node_t *t = list->head; node_t *t_next = (node_t *)get_ptr(list->head->next); /* step 1: find pred and curr */ do { if (!is_marked_ref(t_next->next)) { (*pred) = t; pred_next = t_next; } t = t_next; t->next.mark = (int)UNMARKED; if (t == list->tail) break; t_next = (node_t *)get_ptr(t->next); } while (t->key < key || is_marked_ref(t_next->next)); assert(key <= t->key || is_unmarked_ref(t_next->next)); curr = t; /* step 2: check nodes are adjacent */ if (pred_next == curr) { if ((curr != list->tail) && (is_marked_ref(curr->next))) { goto search_again; } else { assert (pred_next == curr); return curr; } } /* step 3: remove one or more marked nodes */ if (cas64(&(*pred)->next, make_ref(pred_next, MARKED), make_ref(curr, UNMARKED)) == true) { if (curr != list->tail && is_marked_ref(curr->next)) { goto search_again; } else { assert (get_ptr((*pred)->next) == curr); return curr; goto end; // dummy } } } while (1); end: assert (get_ptr((*pred)->next) == curr); return curr; } ノードの追加
/* * bool_t add(list_t * list, const lkey_t key, const val_t val) * * listにnode(key,val)を追加する。 * 追加位置はkeyの昇順に従う。すでに同じkeyを持つnodeがある場合は失敗。 * * 成功すればtrue、失敗すればfalseを返す。 */ bool add(list_t * list, const lkey_t key, const val_t val) { node_t *pred = NULL; node_t *curr; node_t *newNode; if ((newNode = create_node(key, val)) == NULL) return false; do { curr = search(list, key, &pred); assert(pred->key < key && key <= curr->key); if ((curr != list->tail) && (curr->key == key)) { free_node(newNode); return false; } newNode->next = make_ref(curr, UNMARKED); if (cas64(&pred->next, make_ref(curr, UNMARKED), make_ref(newNode, UNMARKED)) == true) { break; } } while (1); return true; }関数add()内部で呼ぶnode生成関数create_node()を示す。 /* * node_t *create_node(const lkey_t key, const val_t val) * * (key, val)を持つnodeを生成する。 * * 成功すればnodeへのポインタを返す。失敗すればNULLを返す。 */ static node_t *create_node(const lkey_t key, const val_t val) { node_t *node; if ((node = (node_t *) calloc(1, sizeof(node_t))) == NULL) { elog("calloc error"); return NULL; } node->next = make_ref(NULL, UNMARKED); node->key = key; node->val = val; return node; } ノードの削除ノードの削除も基本戦略は追加と同じである。 /* * bool_t delete(list_t * list, const lkey_t key, val_t *val) * * listからkeyを持つnodeを削除し、そのnodeのvalを*valに書き込む。 * keyが存在しない場合は失敗。 * * 成功すればtrue、失敗すればfalseを返す。 */ bool_t delete(list_t * list, const lkey_t key, val_t *val) { node_t *pred = NULL; node_t *curr, *curr_next; bool_t ret = true; do { curr = search(list, key, &pred); assert(pred->key < key && key <= curr->key); if ((curr == list->tail) || (curr->key != key)) { #ifdef _DEBUG_ if (curr->key != key) printf ("delete ERROR: key = %ld curr->key %ld\n", (long int)key, (long int)curr->key); #endif return false; } curr_next = (node_t *)get_ptr(curr->next); if (is_unmarked_ref(curr_next->next)) { node_t *node_ptr = (node_t *)get_ptr(curr->next); if (cas64(&curr->next, make_ref(node_ptr, curr->next.mark), get_marked_ref(node_ptr)) == true) break; } } while (1); if (cas64(&pred->next, get_unmarked_ref(curr), get_unmarked_ref(curr_next)) != true) { curr = search(list, curr->key, &pred); } *val = curr->val; free_node(curr); return ret; } ノードの検索/* * bool_t find(list_t * list, const lkey_t key, val_t *val) * * listからkeyを持つnodeを検索し、そのnodeのvalを*valに書き込む。 * keyが存在しない場合は失敗。 * * 成功すればtrue、失敗すればfalseを返す。 */ bool find(list_t * l, const lkey_t key) { node_t *curr; node_t *pred = NULL; curr = search(l, key, &pred); if ((curr == l->tail) || (curr->key != key)) return false; return true; } 実行ソースはGitHubに移行しました。
Last-modified: 2014-7-6