Chain型ハッシュ: 並行処理編 Part 1

ソースはGitHubに移行しました。

Striped Hash Table (Stripedハッシュテーブル)

少しロックの粒度を細かくしたStripedハッシュテーブルを実装する。 ハッシュ全般とこのアルゴリズムの位置付けについてはこちらを必読

ソースはGitHubに移行しました。

なお、通常のアルゴリズム解説ならば必須の、ハッシュ関数に関する議論は一切行わない。 ここではハッシュ関数hashCode()を"keyをテーブルサイズで割った余り"で定義する。

static unsigned int hashCode(lkey_t key, const hashtable_t * ht)
{
    return (key % ht->table_size);
}

データ構造

データ構造node_t、list_t、hashtable_tを定義する。 hashtable_tはハッシュテーブルの本体、 list_tはハッシュテーブルの各要素に付属するList、node_tはListに連結するnodeのデータ構造である。
簡単に図示する。

mutex  hashtable
[0:x]   [0|head]->[4]->[16]->[32]
[1:o]   [1|head]
        [2|head]->[2]->[6]
        [3|head]->[3]->[7]->[10]

ハッシュテーブルにはロックを管理するテーブルmutexがあり、それぞれ特定のハッシュテーブル要素のロックを管理する。
上の例ではmutex[0]はhashtableの偶数要素、mutex[1]は奇数要素のロックを管理する。mutex[0]は"x"でロックされているので、 hashtable[0]とhashtable[2]はにはアクセスできない。
一方、mutex[1]はロックされていない"o"ので、hashtable[1]とhashtable[3]はアクセスできる。

以降、共通のデータ型として"common.h"に"bool_t"、"lkey_t"、"val_t"を定義する。 "bool_t"は自明、 "lkey_t"はnodeのkey、"val_t"はnodeのvalを保持する。

common.h:

typedef bool bool_t;
typedef uint64_t lkey_t;
typedef int64_t  val_t;

次に、nodeとlist、およびhashtableのデータ構造を示す。

StripedHash.h

typedef struct _node_t
{
  lkey_t key;            /* key */
  val_t value;           /* 値 */

  struct _node_t *next;  /* 次のnodeへのポインタ*/
} node_t;

typedef struct _list_t
{
  node_t *head;
} list_t;

typedef struct _hashtable_t
{
  unsigned long int setSize;        /* 全node数 */

  list_t *bucket;                   /* ハッシュテーブル本体 */
  unsigned int table_size;          /* ハッシュテーブルの大きさ(長さ) */

  list_t *old_bucket;               /* resizeする際に、一時的に利用するハッシュテーブル */
  unsigned int old_table_size;      /* old_bucketの大きさ = resize前のテーブルの大きさ */

  pthread_mutex_t *mtx;             /* ロック変数の配列 */
  unsigned int lock_size;           /* mtxの配列長 */
} hashtable_t;

node_tは(key,val)のペアと、次のnodeへのポインタを保持する。

list_tにはリストの先頭headが確保される。

hashtable_tには全ノード数を記録するsetSize、 ハッシュテーブル本体であるbucketとその大きさを記録するtable_size、 およびresize()でテーブルサイズを変更する際に、一時的にこれまでのテーブル(へのポインタ)を保持する old_bucketとそのサイズを記録するold_table_sizeがある。
さらに、全操作(add,delete,find)毎に ハッシュテーブルの特定の要素集団をロックするためのpthread_mutex変数mtxの配列がある。この配列の長さはlock_sizeである。

基本関数

ハッシュテーブルの生成
/*
 * bool_t *init_list(list_t *l)
 *
 * list_t *lにノードheadを生成する。
 *
 * 成功すればtrue、失敗ならfalseを返す。
 *
 */
static bool_t list_init(list_t * l)
{
  if ((l->head = (node_t *) malloc(sizeof(node_t))) == NULL) {
    fprintf(stderr, "%s():%s:%u: malloc error\n", __FUNCTION__,
	    __FILE__, __LINE__);
    return false;
  }
  l->head->next = NULL;
  return true;
}

/*
 * bool_t init_bucket(hashtable_t * ht, const unsigned int table_size)
 *
 * ハッシュテーブルhtの各要素を初期化(リストの初期化)する。
 *
 * 成功すればtrue、失敗すればfalseを返す。
 */
static bool_t init_bucket(hashtable_t * ht, const unsigned int table_size)
{
    unsigned int i;

    if ((ht->bucket =
	 (list_t *) malloc(sizeof(list_t) * table_size)) == NULL) {
      fprintf(stderr, "%s():%s:%u: malloc error\n", __FUNCTION__,
	      __FILE__, __LINE__);
      return false;
    }

    for (i = 0; i < table_size; i++)
      if (list_init(&ht->bucket[i]) == false) {
	return false;
      }
    return true;
}


/*
 * hashtable_t *init_hashtable(const unsigned int table_size)
 *
 * 大きさtable_sizeのハッシュテーブルを生成する。
 *
 * 成功すれば、そのポインタを返す。失敗すればNULLを返す。
 * 
 */
hashtable_t *init_hashtable(const unsigned int table_size)
{
    hashtable_t *ht;
    int i;

    if ((ht = (hashtable_t *) malloc(sizeof(hashtable_t))) == NULL) {
      fprintf(stderr, "%s():%s:%u: malloc error\n", __FUNCTION__,
	      __FILE__, __LINE__);
      return NULL;
    }

    ht->table_size = table_size;
    ht->lock_size = table_size;
    ht->setSize = 0;

    if (init_bucket(ht, ht->table_size) != true) {
	free(ht);
	return NULL;
    }

    if ((ht->mtx =
	 (pthread_mutex_t *) malloc(sizeof(pthread_mutex_t) *
				    (ht->lock_size))) == NULL) {
	free_bucket(ht->bucket, ht->lock_size);
	free(ht);
	fprintf(stderr, "%s():%s:%u: malloc error\n", __FUNCTION__,
		__FILE__, __LINE__);
	return NULL;
    }

    for (i = 0; i < ht->lock_size; i++)
	ht->mtx[i] = (pthread_mutex_t) PTHREAD_MUTEX_INITIALIZER;

    return ht;
}
ロックのプリミティブ

このアルゴリズムの肝はロックの分散だが、それを行うマクロlockKeyと、 ロック、アンロック関数を示す。

#define lockKey(ht, hashkey)   (hashkey % ht->lock_size)

static void lock(hashtable_t * ht, const unsigned int hashkey)
{
    unsigned int key = lockKey(ht, hashkey);
    pthread_mutex_lock(&ht->mtx[key]);
}

static void unlock(hashtable_t * ht, const unsigned int hashkey)
{
    unsigned int key = lockKey(ht, hashkey);
    pthread_mutex_unlock(&ht->mtx[key]);
}
ノードの追加
/*
/*
 * bool_t add_node_op(list_t * l, node_t * newNode)
 *
 * ノードnewNodeをリストlに追加する。
 *
 * 成功すればtrue、失敗ならfalseを返す。
 *
 */
static bool_t add_node_op(list_t * l, node_t * newNode)
{
    node_t *pred, *curr;
    bool_t ret = true;

    pred = l->head;
    curr = pred->next;

    if (curr == NULL) {
	l->head->next = newNode;
	newNode->next = NULL;
    } else {

	while (curr != NULL && curr->key < newNode->key) {
	    pred = curr;
	    curr = curr->next;
	}

	if (curr == NULL || newNode->key != curr->key) {
	    newNode->next = curr;
	    pred->next = newNode;
	} else
	    ret = false;

    }
    return ret;
}


/*
 * bool_t add_node(list_t * l, const lkey_t key, const val_t val)
 *
 * リストlにノード(key,val)を生成して追加する。
 *
 * 成功するとtrue、失敗するとfalseを返す。
 *
 */
static bool_t add_node(list_t * l, const lkey_t key, const val_t val)
{
    node_t *newNode;

    if ((newNode = create_node(key, val)) == 0)
	return false;

    if (add_node_op(l, newNode) != true) {
	free_node(newNode);
	return false;
    }

    return true;
}


/*
 * bool_t add(hashtable_t * ht, const lkey_t key, const val_t val)
 *
 * ハッシュテーブルhtにnode(key,val)を追加する。
 *
 * 成功すればtrue、失敗すればfalseを返す。
 */
bool_t add(hashtable_t * ht, const lkey_t key, const val_t val)
{
    unsigned int myBucket, table_size;
    bool_t ret = false;
    int retry = 2;


    do {
	table_size = ht->table_size;
	myBucket = hashCode(key, ht);	/* T1: */

	lock(ht, myBucket);	/* T2: */

	if (table_size == ht->table_size)	/* T1とT2の間でresize()が実行された場合の対処 */
	{
	    if (add_node(&ht->bucket[myBucket], key, val) == true) {
		ht->setSize++;
		ret = true;
		unlock(ht, myBucket);
		break;
	    }
	}

	unlock(ht, myBucket);
    }
    while (retry-- > 0);

    if (policy(ht)) {
      resize(ht);
      fprintf (stderr, "Resized\n");
    }
    return ret;
}
関数add_node()内部で呼ぶnode生成関数create_node()を示す。
/*
 * node_t *create_node(const lkey_t key, const val_t val)
 *
 * (key, val)を持つnodeを生成する。
 *
 * 成功すればnodeへのポインタを返す。失敗すればNULLを返す。
 */
static node_t *create_node(const lkey_t key, const val_t val)
{
    node_t *newNode;
    if ((newNode = (node_t *) calloc(1, sizeof(node_t))) == NULL) {
      elog("calloc error")
      return NULL;
    }

    newNode->key = key;
    newNode->value = val;
    return newNode;
}
nodeの削除
/*
 * bool_t delete_node(list_t * l, const lkey_t key, val_t * getval)
 *
 * リストlからノード(key,val)を削除する。
 *
 * 成功するとgetvalにvalを代入してtrueを返す。失敗するとfalseを返す。
 *
 */
static bool_t delete_node(list_t * l, const lkey_t key, val_t * getval)
{
    node_t *pred, *curr;

    pred = l->head;
    curr = pred->next;

    if (curr == NULL)
	return false;

    while (curr != NULL && curr->key < key) {
	pred = curr;
	curr = curr->next;
    }

    if (key == curr->key && curr != NULL) {
	*getval = curr->value;
	pred->next = curr->next;
	free_node(curr);
    } else
	return false;

    return true;
}


/*
 * bool_t delete(hashtable_t * ht, const lkey_t key, val_t * getval)
 *
 * ハッシュテーブルhtからkeyを持つnodeを削除し、そのnodeのvalを*valに書き込む。
 * keyが存在しない場合は失敗。
 *
 * 成功すればtrue、失敗すればfalseを返す。
 */
bool_t delete(hashtable_t * ht, const lkey_t key, val_t * getval)
{
    unsigned int myBucket, table_size;
    bool_t ret = false;
    int retry = 2;

    do {
	table_size = ht->table_size;
	myBucket = hashCode(key, ht);	/* T1: */

	lock(ht, myBucket);	/* T2: */

	if (table_size == ht->table_size)	/* T1とT2の間でresize()が実行された場合の対処 */
	{
	    if (delete_node(&ht->bucket[myBucket], key, getval) == true) {
		ht->setSize--;
		ret = true;
		unlock(ht, myBucket);
		break;
	    }
	}
	unlock(ht, myBucket);
    }
    while (retry-- > 0);

    return ret;
}
ノードの検索
/*
 * bool_t find_node(list_t * l, lkey_t key)
 *
 * リストlにkeyを持つノードがあるか否か調べる。
 *
 * 成功すればtrue、失敗すればfalseを返す。
 */
static bool_t find_node(list_t * l, lkey_t key)
{
    node_t *pred, *curr;

    pred = l->head;
    curr = pred->next;

    if (curr == NULL) {
	return false;
    } else {

	while (curr != NULL && curr->key < key) {
	    pred = curr;
	    curr = curr->next;
	}

	if (curr == NULL || key != curr->key)
	    return false;
    }
    return true;
}


/*
 * bool_t find(hashtable_t * ht, const lkey_t key)
 *
 * ハッシュテーブルhtからkeyを持つnodeを検索し、そのnodeのvalを*valに書き込む。
 * keyが存在しない場合は失敗。
 *
 * 成功すればtrue、失敗すればfalseを返す。
 */
bool_t find(hashtable_t * ht, const lkey_t key)
{
    unsigned int myBucket = hashCode(key, ht);
    bool_t ret;

    lock(ht, myBucket);
    ret = contains_node(&ht->bucket[myBucket], key);
    unlock(ht, myBucket);

    return ret;
}

ハッシュテーブルの再構築(resize())
static bool_t policy(hashtable_t * ht)
{
    return ((int) (ht->setSize / ht->table_size) > 4 ? true : false);
}

static void resize(hashtable_t * ht)
{
    list_t *l;
    node_t *pred, *curr;
    unsigned int i, myBucket, table_size;

    table_size = ht->table_size;
    for (i = 0; i < ht->lock_size; i++)
	lock(ht, i);

    if (table_size != ht->table_size) {
	for (i = 0; i < ht->lock_size; i++)
	    unlock(ht, i);
	return;
    }

    ht->old_table_size = ht->table_size;
    ht->old_bucket = ht->bucket;

    if (init_bucket(ht, ht->table_size * 2) == false)
	return;

    ht->table_size *= 2;

    for (i = 0; i < ht->old_table_size; i++) {
	l = &ht->old_bucket[i];

	pred = l->head;
	curr = pred->next;
	while (curr != NULL) {
	    pred->next = curr->next;	/*  <==>   next = curr->next; pred->next = next; */

	    myBucket = hashCode(curr->key, ht);
	    add_node_op(&ht->bucket[myBucket], curr);

	    curr = pred->next;
	}
    }

    for (i = 0; i < ht->lock_size; i++)
	unlock(ht, i);

    free_bucket(ht->old_bucket, ht->old_table_size);

}

実行

ソースはGitHubに移行しました。

議論

ここで、デッドロックの可能性についてSpinによるモデル化で検証する。
この程度の話ならロックの順番を決めることでデッドロックに陥らないことは自明に近いが、 複数のアルゴリズムを組み合わせたり(ハッシュのresize時にメモリ管理用のロックを獲得するとか)、 アルゴリズム自体を拡張する場合に、モデル化による仕様検証をしておくのは重要になってくる。
特に並行・並列プログラミングには必須と思うので、小さな問題から実践していく。

/* ---------------------------------------------------------------------------
 * 
 * author: suzuki hironobu (hironobu@interdb.jp) 2009.Nov.11
 * Copyright (C) 2009  suzuki hironobu
 *
 * ---------------------------------------------------------------------------
 */
mtype = {Lock, Unlock};

mtype mutex_0 = Unlock;
mtype mutex_1 = Unlock;

inline lock (mtx)
{
	if
	:: atomic {(mtx == Unlock) -> mtx = Lock}
	fi
}

inline unlock (mtx)
{
	if
	:: atomic {(mtx == Lock); mtx = Unlock}
	fi
}

active proctype thread_0 ()
{
	do
	:: 	lock(mutex_0) -> lock(mutex_1);
		skip; /* critical section */
		unlock(mutex_1) -> unlock(mutex_0);

	::	lock(mutex_0) -> skip; unlock(mutex_0);

	::	lock(mutex_1) -> skip; unlock(mutex_1);
	od
}

active proctype thread_1 ()
{
	do
	:: 	lock(mutex_0) -> lock(mutex_1);
		skip; /* critical section */
		unlock(mutex_1) -> unlock(mutex_0);

	::	lock(mutex_0);
		skip;  /* critical section */
		unlock(mutex_0);

	::	lock(mutex_1);
		skip;  /* critical section */
		unlock(mutex_1);
	od
}



Last-modified: 2014-7-6