Skiplist スキップリスト: 並行処理編 Part. 1

ソースはGitHubに移行しました。

Lazy Skiplist(単方向 Lazy スキップリスト)

(単方向)Lazy Skiplistを実装する:
"A Simple Optimistic skip-list Algorithm" M.Herlihy, Y.Lev, V.Luchangco, N.Shavit

ソースはGitHubに移行しました。

データ構造

データ構造list_tとnode_tを定義する。 list_tはListの本体、node_tはListに連結するnodeのデータ構造である。

以降、共通のデータ型として"common.h"に"bool_t"、"lkey_t"、"val_t"を定義する。 "bool_t"は自明、 "lkey_t"はnodeのkey、"val_t"はnodeのvalを保持する。

common.h:

typedef bool bool_t;
typedef uint64_t lkey_t;
typedef int64_t  val_t;

次に、nodeとlistのデータ構造を示す。

LazySkiplist .h:

typedef struct _skiplist_node_t {
  lkey_t key;                       /* key */
  val_t val;                         /* 値  */

  int topLevel;                      /* このノードのレベル(高さ) */

  bool_t marked;		     /* 論理的に削除されているか否か示す */
  bool_t fullyLinked;

  pthread_mutex_t mtx;
  struct _skiplist_node_t *next[1];  /* 次のnodeへのポインタ */
} skiplist_node_t;

typedef struct _skiplist_t {
  int	maxLevel;                    /* このリストの最大レベル(高さ) */

  skiplist_node_t *head;
  skiplist_node_t *tail;

  pthread_key_t workspace_key;       /* add, delete, findのための補助メモリ領域用pthread_key */
} skiplist_t;

typedef struct _workspace_t {        /* add, delete, findのための補助メモリ領域。スレッド毎に保持する。 */
  skiplist_node_t **preds;
  skiplist_node_t **succs;
} workspace_t;

skiplist_node_tは(key,val)のペアと次のノードへのポインタ、およびこのノードのレベル(高さ)を保持する。 また、markedとfullyLinkedの2つのマーク(bool_t)と、 ノード毎のlockのためにpthreadのmutex変数を確保する。

skiplist_tにはリストの先頭headと末尾tailが確保される。
また、各種操作(add,delete,find)のためにあらかじめ補助メモリ領域を確保しなければならないが、 複数のスレッドによる並行処理に対処するには、スキップリストそのものに付随するのでなく、 スレッド毎に補助メモリ領域を確保しなければならない。
そこで、スレッド毎に補助メモリ領域を確保する。

具体的には、pthread_keyの機能を利用する。 skiplist_tにはpthread_keyのみ確保し別途、構造体workspace_tを用意してスレッド毎にメモリ領域を確保する。

基本関数

スレッド毎のメモリ領域確保

pthread_keyを知っていれば自明なコードである。

concurrent_skiplist.h:

/* static workspace_t *init_workspace(const int maxLevel)
 *
 * (スレッド毎に)skiplist *slの補助メモリ領域:predsとsuccsを確保する。
 *
 * 成功すると確保したメモリへのポインタを返す。失敗するとNULLを返す。
 */
static workspace_t *init_workspace(const int maxLevel)
{
    workspace_t *ws;

    if ((ws = (workspace_t *) calloc(1, sizeof(workspace_t))) == NULL) {
	elog("calloc error");
	return NULL;
    }

    if ((ws->preds = (skiplist_node_t **) calloc(1, sizeof(skiplist_node_t *) * maxLevel)) == NULL) {
	elog("calloc error");
	goto end;
    }
    if ((ws->succs = (skiplist_node_t **) calloc(1, sizeof(skiplist_node_t *) * maxLevel)) == NULL) {
	elog("calloc error");
	goto end;
    }

    return ws;

  end:
    free(ws->preds);
    free(ws);
    return (workspace_t *) NULL;
}

/*
 * void free_workspace(workspace_t * ws)
 *
 * wsを開放する。
 */
static void free_workspace(workspace_t * ws)
{
    free(ws->preds);
    free(ws->succs);
    free(ws);
}

/*
 * workspace_t *get_workspace(skiplist_t * sl)
 *
 * (スレッド毎に)初めて呼ばれた時はメモリ領域へ確保し、pthread_keyに対応させる。
 * 必ずスレッドに対応したメモリ領域へのポインタを返す。
 *
 */
static workspace_t *get_workspace(skiplist_t * sl)
{
  /* スレッド毎のメモリ領域へのポインタを得る */
  workspace_t *workspace = pthread_getspecific(sl->workspace_key);

  /* まだメモリ領域が確保されていない場合: */
  if (workspace == NULL) {
    /* メモリ領域の確保 */
    if ((workspace = init_workspace(sl->maxLevel)) != NULL) { 
      /* メモリ領域をpthread_keyに対応させる */
      if (pthread_setspecific(sl->workspace_key, (void *) workspace) != 0) {
	elog("pthread_setspecific() error");
	abort();
      }
    } else {
      elog("init_workspace() error");
      abort();
    }
  }
  assert(workspace != NULL);
  return workspace;
}

少し先行して、使い方を示す。

bool_t add(skiplist_t * sl, lkey_t key, val_t val)
{
  workspace_t *ws = get_workspace(sl); /* スレッド毎に確保された補助メモリ領域wsへのポインタを返す */
  assert(ws != NULL);
  return _add(sl, ws->preds, ws->succs, key, val); /* 実際にノードの追加を行う */
}
listの生成
/*
 * skiplist_t *init_list(const int maxLevel, const lkey_t min, const lkey_t max)
 *
 * skiplistを生成する。maxLevelは最大の高さ(レベル)を指定。
 * minはスキップリストのkeyの最小値(skiplist->head->keyに保持)、
 * maxはスキップリストのkeyの最大値(skiplist->tail->keyに保持)、
 *
 * 成功すればskiplistへのポインタを返す。失敗ならNULLを返す。
 *
 */
skiplist_t *init_list(const int maxLevel, const lkey_t min, const lkey_t max)
{
    int i;
    skiplist_t *sl;
    skiplist_node_t *head, *tail;

    if ((sl = (skiplist_t *) calloc(1, sizeof(skiplist_t))) == NULL) {
	elog("calloc error");
	return NULL;
    }

    sl->maxLevel = maxLevel;

    if ((head = create_node(maxLevel, min, min)) == NULL) {
	elog("create_node() error");
	goto end;
    }
    head->fullyLinked = true;

    if ((tail = create_node(maxLevel, max, max)) == NULL) {
	elog("create_node() error");
	goto end;
    }

    tail->fullyLinked = true;

    for (i = 0; i < maxLevel; i++) {
	head->next[i] = tail;
	tail->next[i] = NULL;
    }

    sl->head = head;
    sl->tail = tail;

    /* pthread_key_t workspace_keyの初期化 
     *  確保したメモリ領域の開放はfree_workspace()@concurrent_skiplist.hで行う。
     */
    if (pthread_key_create(&sl->workspace_key, (void *) free_workspace) != 0) {
	elog("pthread_key_create() error");
	abort();
    }

    return sl;

  end:
    free(sl->head);
    free(sl);
    return NULL;
}
ノードの検索

各種操作(add,delete,find)が使うノードの検索関数を示す。

/*
 *static int search(skiplist_t * sl, const lkey_t key, skiplist_node_t ** preds,
 *                                                     skiplist_node_t ** succs)
 *
 * keyを持つノードを探す。
 * ノードが見つかった場合:
 *  そのノードのレベル(高さ)を返す。またsuccsには目的のノードが保持するポインタ、
 *  predsにはsuccsを指すノードへのポインタを返す。
 *  例) 以下のスキップリストでkey[4]のノードを探す場合、
 *
 *level: head                                 tail
 *  3   [-inf]------>[2]--------------------->[inf]
 *  2   [-inf]------>[2]------>[4]------>[7]->[inf]
 *  1   [-inf]->[1]->[2]------>[4]->[5]->[7]->[inf]
 *  0   [-inf]->[1]->[2]->[3]->[4]->[5]->[7]->[inf]
 *   
 *  predsとsuccsはそれぞれ次のようになり、レベル2を返す。
 *                level: preds succs
 *                  2    [2]   [7]
 *                  1    [2]   [5]
 *                  0    [3]   [5]
 *   preds[2]=2は"レベル2では、ノード(key=2)がノード(key=4)を指している"、
 *   preds[1]=2は"レベル1では、ノード(key=2)がノード(key=4)を指している"、
 *   preds[0]=3は"レベル0では、ノード(key=3)がノード(key=4)を指している"、
 * を意味し、
 *   succs[2]=7は"レベル2では、ノード(key=4)がノード(key=7)を指している"、
 *   succs[1]=5は"レベル1では、ノード(key=4)がノード(key=5)を指している"、
 *   succs[0]=5は"レベル0では、ノード(key=4)がノード(key=5)を指している"、
 * を意味する。
 */
static int search(const skiplist_t * sl, const lkey_t key, skiplist_node_t ** preds, 
		  skiplist_node_t ** succs)
{
    int lFound, level;
    skiplist_node_t *pred, *curr;

    pred = sl->head;
    lFound = -1;

    for (level = sl->maxLevel - 1; level >= 0; level--) {
      curr = pred->next[level];

      while (key > curr->key) {
	pred = curr;
	curr = pred->next[level];
      }
      
      if (lFound == -1 && key == curr->key)
	    lFound = level;
     
      preds[level] = pred;
      succs[level] = curr;
    }

    return lFound;
}
ノードの追加
/*
 * bool_t add(list_t * list, const lkey_t key, const val_t val)
 *
 * listにnode(key,val)を追加する。
 * 追加位置はkeyの昇順に従う。すでに同じkeyを持つnodeがある場合は失敗。
 *
 * 成功すればtrue、失敗すればfalseを返す。
 */
bool_t add(skiplist_t * sl, lkey_t key, val_t val)
{
  workspace_t *ws = get_workspace(sl); /* スレッド毎に確保された補助メモリ領域wsへのポインタを返す */
  assert(ws != NULL);
  return _add(sl, ws->preds, ws->succs, key, val);
}

static bool_t
_add(skiplist_t * sl, skiplist_node_t ** preds, skiplist_node_t ** succs,
     const lkey_t key, const val_t val)
{
    int topLevel, highestLocked, lFound, level;
    skiplist_node_t *newNode, *pred, *succ, *nodeFound;
    bool_t valid;
    int r = rand();

    topLevel = (r % sl->maxLevel);
    assert(0 <= topLevel && topLevel < sl->maxLevel);

    /*
     * step 1: ノードの検索
     */
    while (1) {
      if ((lFound = search(sl, key, preds, succs)) != -1) {
	nodeFound = succs[lFound];
	if (nodeFound->marked != true) {
	  while (nodeFound->fullyLinked != true) {};
	  return false;
	}
	continue;
      }
      
    /*
     * step 2: preds[]に含まれるノードのロック
     */
      highestLocked = -1;
      
      valid = true;
      for (level = 0; (valid == true) && (level <= topLevel); level++) {
	pred = preds[level];
	succ = succs[level];
	
	lock(pred->mtx);  /* levelによって複数回lockされる可能性がある -> RECURSIVE_MUTEX */
	
	highestLocked = level;
	valid = (pred->marked != true) && (succ->marked != true)
	  && (pred->next[level] == succ);
      }
      
      if (valid != true) {
	for (level = 0; level <= highestLocked; level++)
	  unlock(preds[level]->mtx);
	continue;
      }

      assert(valid == true);
      for (level = 0; (level <= topLevel); level++) {
	pred = preds[level];	succ = succs[level];
	assert(pred->marked != true);	assert(succ->marked != true);
	assert(pred->next[level] == succ);
      }

      /*
       * step 3: ノード追加
       */      
      newNode = create_node(topLevel, key, val);
      
      for (level = 0; level <= topLevel; level++) {
	newNode->next[level] = succs[level];
	preds[level]->next[level] = newNode;
      }
      
      newNode->fullyLinked = true;
      break;      
    }

    /*
     * step 4: ロックの解除
     */    
    assert(highestLocked == topLevel);
    for (level = 0; level <= topLevel; level++)
      unlock(preds[level]->mtx);
    
    return true;
}

関数add()内部で呼ぶnode生成関数create_node()を示す。
Lazy Skiplistではノードが再帰的にロックされる可能性があるので、再帰的mutexとして初期化する。

/*
 * static skiplist_node_t *create_node(const int topLevel, const lkey_t key, const val_t val)
 *
 * レベル(高さ)topLevelで(key, val)を持つnodeを生成する。
 *
 * 成功すればnodeへのポインタを返す。失敗すればNULLを返す。
 */
#define node_size(level)	  (sizeof(skiplist_node_t) + ((level + 1) * sizeof(skiplist_node_t *)))

static skiplist_node_t *create_node(const int topLevel, const lkey_t key, const val_t val)
{
    skiplist_node_t *node;
#ifndef PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP
    pthread_mutexattr_t mtx_attr;
#endif

    if ((node = (skiplist_node_t *) calloc(1, node_size(topLevel))) == NULL) {
	elog("calloc error");
	return NULL;
    }

    node->key = key;
    node->val = val;
    node->topLevel = topLevel;

#ifdef PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP
    node->mtx = (pthread_mutex_t) PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
#else
    pthread_mutexattr_init(&mtx_attr);
    pthread_mutexattr_settype(&mtx_attr, PTHREAD_MUTEX_RECURSIVE_NP);
    pthread_mutex_init(&node->mtx, &mtx_attr);
#endif

    node->marked = false;
    node->fullyLinked = false;

    return node;
}
nodeの削除
/*
 * bool_t delete(skiplist_t * sl, lkey_t key, val_t * val)
 *
 * listからkeyを持つnodeを削除し、そのnodeのvalを*valに書き込む。
 * keyが存在しない場合は失敗。
 *
 * 成功すればtrue、失敗すればfalseを返す。
 */
bool_t delete(skiplist_t * sl, const lkey_t key, val_t * val)
{
    workspace_t *ws = get_workspace(sl);
    assert(ws != NULL);
    return _delete(sl, ws->preds, ws->succs, key, val);
}

static bool_t
_delete(skiplist_t * sl, skiplist_node_t ** preds, skiplist_node_t ** succs,
	const lkey_t key, val_t * val)
{
    int topLevel = -1;
    int i, lFound, level, highestLocked;
    skiplist_node_t *pred, *victim;
    bool_t isMarked = false;
    bool_t valid, flag;
    
    while (1) {
      /*
       * step 1: 削除するノードの検索
       */
      victim = NULL;
      if ((lFound = search(sl, key, preds, succs)) == -1) /* 削除するノードが存在しない場合 */
	return false;
      
      victim = succs[lFound];
      flag = ((victim->fullyLinked == true) && (victim->topLevel == lFound) && (victim->marked != true));

      /* 初回、もしくはのみ実行 
       * => victim->marked = true; isMarked = true; topLevel = victim->topLevel; 
       */
      if (isMarked == true || ((lFound != -1) && flag)) {
	if (isMarked != true) {
	  topLevel = victim->topLevel;
	  
	  lock(victim->mtx);
	  if (victim->marked == true) {
	    unlock(victim->mtx);
	    return false;
	  }
	  victim->marked = true;
	  isMarked = true;
	}
	
	/*
	 * step 2: preds[]のロック獲得
	 */
	highestLocked = -1;
	valid = true;
	for (level = 0; (valid == true) && (level <= topLevel); level++) {
	  pred = preds[level];
	  lock(pred->mtx);
	  highestLocked = level;
	  valid = (pred->marked != true) && (pred->next[level] == victim);
	}
	if (valid != true) { /* preds[]のロック獲得に失敗した場合、やりなおし */
	  for (i = 0; i <= highestLocked; i++)	      unlock(preds[i]->mtx);
	  continue;
	}
	
	/*
	 * step 3: ノードの削除
	 */
	for (level = victim->topLevel; level >= 0; level--)
	  preds[level]->next[level] = victim->next[level];
	
	unlock(victim->mtx);
	for (i = 0; i <= lFound; i++) /* preds[]のロック開放 */
	  unlock(preds[i]->mtx);
	break;
      }
      else
	return false;
    }
    
    *val = victim->val;
    free_node(victim);

    return true;
}
nodeの検索
/*
 * val_t find(skiplist_t * sl, lkey_t key)
 *
 * skiplistからkeyを持つノードを検索する。ノードが存在する場合はそのノードのvalを返す。
 * keyが存在しない場合はNULLを返す。
 *
 */
val_t find(skiplist_t * sl, lkey_t key)
{
    workspace_t *ws = get_workspace(sl);
    assert(ws != NULL);
    return _find(sl, ws->preds, ws->succs, key);
}

static bool_t _find(skiplist_t * sl, skiplist_node_t ** preds,
		    skiplist_node_t ** succs, const lkey_t key)
{
    int lFound;

    lFound = search(sl, key, preds, succs);

    if (lFound == -1)
	return (val_t) NULL;
    if (succs[lFound]->fullyLinked != true || succs[lFound]->marked == true)
	return (val_t) NULL;

    return preds[0]->next[0]->val;
}

実行

ソースはGitHubに移行しました。



Last-modified: 2014-7-6