在多线程环境下,我们常常用Java的ConcurrentHashMap,但其实这个Map仍然是要使用锁的,只不过使用了一种被称为StripeLock的方式。这里我们试着实现一个完全无锁的HashTable。
首先我们必须弄清楚HashMap有哪些操作,哪些地方是临界区(CriticalRegion),哪里不是。一个完整的HashMap无非就以下几种操作:
1. 插入数据(PUT);
2. 取数据(GET);
3. 扩容(RESIZE);
4. 删除数据(DELETE)。
这里我们使用链表数据结构的Map。这种数据结构的特点是,解决冲突是将冲突数据持到上一个之后。
插入数据(PUT):
单线程的插入我想大家理解都不是问题,插入的过程分成三步:
1)新建一个结点node;
2)将head指针的next值赋值给node的next域;
3)将head指针指向新建的node。
在并发环境下,问题三于上述上个步骤并不是原子的。例如线程1进行到第二步时,如果另一个线程2一次性完成了以上三个步骤,这个时候线程1再进行3时就会造成线程插入的数据丢失。这种丢失在C语言中会直接导致内存泄漏(MemoryLeak)。
所以问题的关键的就于在完成第三步的时候必须确保head指针仍处于第一步操作时的状态,这样才能正确完成操作,这里我们就要用到CAS操作,Linux下glibc自带这个方法。CAS(CompareAnd Swap)方法签名如下:
T __sync_val_compare_and_swap(T* __p, T __compVal, T __exchVal, ...);
其中__p指的是指向数据类型T的指针,__compVal是要比较的值,__exchVal是如果__p与__compVal相等,则需要向__p指向的数据域赋值的数据。如果__p与__compVal不相等,则什么也不做。刚才说过了,INSERT在并发操作时的问题在于在操作head时,head指向的值已经不是INSERT开始时的值,所以我们可以利用__sync_val_compare_and_swap来进行原子操作,如果不成功,则重新开始插入操作。这样,当head发生变化时,就不会对head进行操作,取而代之的是重头开始执行插入操作,保证了整体的一致性。以C代码为例,实现的代码如下:
- void insert(Node* head, Key key, Value value)
- {
- Node* new_entry = malloc(sizeof(Entry));
- new_entry->key =key;
- new_entry->value= value;
- // use CAS to enablelock free
- while(1)
- {
- prev_head = head;
- new_entry->next = (struct Entry*)prev_head;
- if (CAS(&head,prev_head, new_entry) == prev_head)
- {
- break;
- }
- }
- }
为了方便取数据,我们需要对插入的数据进行排序(原因后面会说到),按key降序排列。这样在查找时当key>= cur->key时我们就可以停止向后搜寻了。
- Node* cur = head;
- Node* prev = cur;
- while(cur != NULL && key < cur->key)
- {
- prev =cur;
- cur =cur->next;
- }
- insert(&prev, key, value);
但如果这里不进行特殊处理,就会出现一个问题:在并发环境下,节点随时有可能被插入,但while中的语句并不是原子的,这就可能得到错误的插入点。例如有 key序列 10 9 5 2,在插入3时,我们遍历到节点5,发现5之后为2,满足while条件,于是我们认为应该在这里进行插入。但如果这个时候另一个线程在5之后插入了4,则显然正确的插入点应该变成4,而上述代码仍按5进行插入,则会导致乱序,变为10 9 5 3 4 2。同上,为了解决这个问题,每当指针发生变化时(见下),我们都必须重新查找:
- void insert(Node* head, Key key, Value value)
- {
- Entry* new_entry = malloc(sizeof(Entry));
- new_entry->key =key;
- new_entry->value= value;
- // use CAS to enablelock free
- while(1)
- {
- Node** prev = &head;
- Node* cur = head;
- while(cur != NULL && key < cur->key)
- {
- prev = &cur->next;
- cur = cur->next;
- }
- new_entry->next= *prev;
- if (CAS(prev, cur, new_entry) == cur)
- {
- break;
- }
- cur = *prev;
- }
- }
这样,当选定插入点后,如果有其它线程抢占插入,则if (CAS(prev, cur, new_entry) == cur)条件便不会成立,肯定以下三种情况之一发生了:
1、prev之后有新的结点插入;
2、cur指向的结点被删除了;
3、next被删除了。
这个时候就需要重新进行选定插入点的工作。
取数据(GET):
因为我们使用的是链接方式来解决冲突的,如果Hash对应的结点已经挂了多个数据,那么我们在取数据的时候就需要遍历所挂的节点来找到我们真正需要的数据,这里就可能出现因为数据后期插入而我们的游标已经移到其之后而返回没有对应数据这种不一致的情况。例如有key列表: 10 9 5 2,我们要搜索结点3,当我们遍历到5时,如果另一线程B在头处插入了3,由于遍历的时间复杂度为O(n),当数据量大时,就能明示察觉到这种延迟了,显然达不到设计要求。就会返回不存在该值的结果。为了解决这个问题,我们可以对链表进行排序来解决这个问题,当条件 if(key < cur->key) 满足时,这个时候要我们要查找的数据只会在cur之后,所以我们可以放地把cur向后移,直到上述条件不成立,这个时候我们再判断就可以了。为什么这样可以解决问题,因为排序后如果插入数据的key大于当前的,显示它会被插到cur之前,如果小于,至少也是在cur之后,最差的情况就是刚好在prev与cur之间,这时我们只需要重新扫描一次就可以了(其实也可以从prev处开始,但我们使用的是单链表,无法得知prev的prev了)。为了保险,我们可以再加一次判断。代码如下:
- Value find(Node* head, Key key)
- {
- Node** prev;
- Node* cur;
- Node** next;
- Value cvalue;
- Key ckey;
- while(1)
- {
- prev = &head;
- cur = head;
- while(1)
- {
- if(cur == NULL)
- {
- return NULL;
- }
- next = &cur->next;
- ckey = cur->key;
- cvalue = cvalue->value;
- if(*prev != cur)
- {
- break; // The list has been modified, start over
- }
- if(key >= ckey)
- {
- return key == ckey ? cvalue : NULL;
- }
- // else keep looking
- prev = next;
- cur = *next;
- }
- }
- }
为发增加find的利用价值,我们可以在find函数里带出prev,cur,next指针,以供其它函数使用,签名如下:
- Value find(Node* head, Key key, Node*** prev, Node** cur, Node*** next)
具体代码实现详见DELETE小节。
虽然这样做后仍然有可以出现另一线程在(prev_cur, cur]之间插入数据的可能,但这已经大大减少了其可能性,误差时间在实时操作范围之内。
代码中的条件语句
- <del>if(*prev != cur)
- {
- <span style="font-family: Arial, Helvetica, sans-serif;">break; // The list has been modified, start over</span>
- }</del>
对于这种情况,首先我们可以对key进行排序来简化此问题,当对key进行排序后,扫描后的结点key都是比搜索key大的结点,因此即便有其它线程插入了新结点,也不会是所求的结点,我们可以忽略此种插入。我们唯一需要关心的就是当 key >= cur->key时,因为这时候需要搜索的结点很可能就在 (prev_cur, cur] 区间,如果这时候另一线程向此区间插入了对应结点,则我们就可能错过。,假设我们搜索结点3,在遍历到2时,我们发现 2 < 3,即后面不可能有等于3的值了(key已经按降序排列),这个时候,如果另一个线程向5后插入了3,则我们仍然返回的是没有找到数据。这就会造成不一致的情况。为了解决这个问题,我们必须在prev_cur变更时进行重新搜索,解决方案也很简单,类似于上面的插入操作。代码如下:
删除数据(DELETE):
现在我们来谈谈数据的删除操作。要想删除数据首先我们得找到对应的数据结点,然后更新链表即可。假设当前结点前,中,后对应的指针分别为prev, cur, next,操作过程如下:
1)prev = cur->next;
2)free(cur);
在并发环境下有可能发生这样的情况
1)另一线程在prev与cur间插入了新的结点,这个时候如果再执行prev = cur->next就会导致新插入的结点丢失
2)另一线程在cur与next间插入了数据,同上,这种情况也会导致新插入的结点丢失去。
上述两种情况都会导致内存泄露,是不是接受的。我们可以使用CAS操作来完成删除操作。代码如下:
- int remove_node(Key key)
- {
- Node** prev;
- Node* cur;
- Node** next;
- while(1)
- {
- if(find(key, &prev, &cur, &next) == NULL)
- {
- return 0;
- }
- if(CAS(prev, cur, *next) == cur)
- {
- free(cur);
- }
- }
- }
但是这里有个问题,就是在并发环境下,新插入的结点如果恰好在被删结点之后,上面的算法就可能导致数据丢失。为了解决这个问题,我们需要在删除之前对其进行标记,此标记是个boolean型。当然我们可以简单地添加一个short型什么的,但由于C在编译时会进行内存对齐,所以其可能占用更多的空间。例如:
- Entry
- {
- void* key;
- void* value;
- short tag;
- }
则这个Node在32位机上实际占用的空间为12bytes,关于更多的内存对齐信息,可以访问 http://en.wikipedia.org/wiki/Data_structure_alignment。
这里有个小技巧,因为指针都是按照机器字进行对齐的,所以最低位始终为0(不然就是奇数了),我们可以利用其存储标记位,为了方便,我们将其中一些操作定义成宏:
- typedef uintptr_t marked_ptr_t;
- #define MARK_OF(x) ((x) & 1) // 对最低位进行标记
- #define PTR_MASK(x) ((x) & ~(marked_ptr_t)1)
- #define PTR_OF ((Node*)PTR_MASK(x)) // 取出除第1位的所有数据,组成Node的指针
- #define CONSTRUCT(mark, ptr) (PTR_MASK((uintptr_t)ptr) | (mark)) // 将Node指针的最低位置成mark(0或1)
注意,由于指针并不属于数值型数据,我们不能直接其进行位操作,所以我们需要把其转换成uintptr_t。
那么新的代码就应该是这样的了:
- int remove_node(marked_ptr_t* head, const char* key)
- {
- marked_ptr_t cur;
- marked_ptr_t* prev;
- while(1)
- {
- if(find(head, key, &prev, &cur) == NULL)
- {
- return 0;
- }
- if (CAS(prev, CONSTRUCT(0, cur), CONSTRUCT(1, cur)) != CONSTRUCT(0, cur)) {continue;}
- if (CAS(prev, CONSTRUCT(1, cur), PTR_OF(cur)->next) == CONSTRUCT(1, cur))
- {
- free(PTR_OF(cur)); // may be we should consider IBM freelist
- }
- else
- {
- find(head, key, NULL, NULL); // use find to remove the marked node
- }
- return 1;
- }
- }
那么由于标记的引入,我们还需要对find方法进行一些修改,也有就是如果该结点已经被标记了,我们不能使用之,如下:
- void* find(marked_ptr_t* head, const char* key, marked_ptr_t** prev, marked_ptr_t* cur)
- {
- marked_ptr_t* tp_prev;
- marked_ptr_t tp_cur;
- marked_ptr_t* tp_next;
- if(PTR_OF(*head) == NULL)
- {
- if(prev) {*prev = head;};
- if(cur){*cur = *head;};
- return NULL;
- }
- while(1)
- {
- tp_prev = head;
- tp_cur = *head;
- while(PTR_OF(tp_cur) != NULL)
- {
- tp_next = &PTR_OF(tp_cur)->next;
- if(*tp_prev != tp_cur)
- {
- break; // someone has mucked with the list, start over
- }
- if(MARK_OF(tp_cur))
- {
- if (CAS(tp_prev, CONSTRUCT(1, tp_cur), tp_next) == CONSTRUCT(1, tp_cur)) {
- free(PTR_OF(tp_cur)); // 标记1
- tp_cur = *tp_next;
- continue;
- } else {
- break; //start over
- }
- }
- if (key >= PTR_OF(tp_cur)->key)
- {
- if(prev){*prev = tp_prev;};
- if(cur){*cur = tp_cur;};
- return strcmp(key, PTR_OF(tp_cur)->key) == 0 ? PTR_OF(tp_cur)->value : NULL;
- }
- tp_prev = (marked_ptr_t*)&PTR_OF(tp_cur)->next;
- tp_cur = (marked_ptr_t)*tp_next;
- }
- return NULL;
- }
- }
这里请大家注意一下“标记1“处的代码,我们使用的是直接free的形式,这种形式的问题在于,我们在free的时候,可能另一个线程已经将其指针返回了,即有其它线程在引用。这个时候如果我们将其释放了必然会导致另一个线程段错误。这里我们可以简单地使用“引用计数”来解决这个问题,为此这个引用计数可以放在Node里或者返回的数据里,也就是说free的时候只free Node本身,为了简单我们选择后一种,即放在数据里,这样Value如下:
- typedef struct
- {
- Data data;
- volatile uint32_t ref_count;
- } Value;
这样我们就需要增加两个函数incr_ref 与 desc_ref,从概念上来说,desc_ref 可以这么写:
- void desc_ref(Value* value)
- {
- if(__sync_sub_and_fetch(&value->ref_count, 1) == 0)
- {
- free(value->data);
- free(value);
- return;
- }
- }
incr_ref 要稍稍复杂一些,因它必须保证 ref_count > 0 再在其基础上 +1 整个是个原子操作,否则就可能拿到脏数据。
- Value* incr_ref(Value* value)
- {
- uint32_t old_count;
- while(1)
- {
- old_count = value->ref_count;
- if(old_count <= 0)
- {
- return NULL; // already deleted
- }
- if(CAS(&value->ref_count, old_count, old_count + 1) == old_count)
- {
- break;
- }
- }
- if (value->tag != "0x32da2f") // 标记1
- {
- return NULL;
- }
- return value;
- }
大家注意“标记1”处的代码,为什么需要有这个tag呢? 这是个非常有趣的并发问题,在超高并发环境下,有可能出现incr_ref时,Value的内存已经被释放并分配给其它线程且存入了对应的数据,大家都知道C里同进程没有内存保护的概念,这个时候Value->ref_count仍然是可以取到数据的,只不过可能是其它的数据什么的,这时很可能就是一个大于0的值,incr_ref自然会正常返回,这里再去操作value就会有意想不到的结果,非常危险!所以我们需要指定个随机标识来确定这块内存还是不是我们认为的Value。当然要解决这个问题还其它很多方法了,这里只是起个抛砖引玉的作用了。
-------------------UPDATE 2013/05/24
!上述引用计数在高压测试时仍会出现heap corrupt错误!
原因是原引用计数逻辑没有写好。在高并发下实现引用计数是比较困难的,原因是引用+1是必须保证原引用值>0(否则已经进入删除状态),但有可能去获取引用计数值的时候可能已经无效了。尽管我使用了cas操作保证每次计数的增减是原子的,但线程在拿到针指时可能是已经被其它线程free掉的数据,由于里面的数据具有不确定性,这时如果按正常流程去访问就会取到一个大于0的数据,就会误认为该数据可以引用,从而反回错误的指针。
为此最开始我向数据块里放置了一个tag,用以标记该数据块有没有被删除,代码如下:
在要返回时判断是否是有效内存,这样做确实能够保证在不会返回free掉的数据块。这里面有个问题,tag的判断与CAS操作引用计数+1不是原子的,那么就会有这样一种情况,CAS成功了,但TAG判断失败了,这样确实不会返回错误的指针,但原数据块由于被CAS操作+1了,就会造成堆错误,引发各种奇特的问题。
01 | Value* incr_ref(Value* value) |
02 | { |
03 | uint32_t old_count; |
04 |
05 | while(1) |
06 | { |
07 | old_count = value->ref_count; |
08 |
09 | if(old_count <= 0) |
10 | { |
11 | return NULL; // already deleted |
12 | } |
13 |
14 | if(CAS(&value->ref_count, old_count, old_count + 1) == old_count) |
15 | { |
16 | break; |
17 | } |
18 | } |
19 |
20 | if (value->tag != "0x32da2f") // 标记1 |
21 | { |
22 | return NULL; |
23 | } |
24 |
25 | return value; |
26 | } |
解决的方法是将ref_count申请为64bit的uint64_t,将tag存在高32位,引用使用低32位,这样就可以利用一次CAS完成操作了。代码如下:
01 | IPListData* incr_ref(IPListData* ip_list_data) |
02 | { |
03 | uint32_t old_count; |
04 |
05 | while(1) |
06 | { |
07 | if(!IS_REFER_COUNT(ip_list_data->ref_count)) |
08 | { |
09 | return NULL; |
10 | } |
11 |
12 | old_count = READ_COUNT_VALUE(ip_list_data->ref_count); |
13 |
14 | if(old_count <= 0) |
15 | { |
16 | return NULL; // already deleted |
17 | } |
18 |
19 | if(CAS(&ip_list_data->ref_count, BUILD_REFER_COUNT(old_count), BUILD_REFER_COUNT(old_count + 1)) == BUILD_REFER_COUNT(old_count)) |
20 | { |
21 | break; |
22 | } |
23 | } |
24 |
25 | return ip_list_data; |
26 |
27 | } |
扩容(RESIZE):
在并发环境下,对hashtable进行扩容主要有两个问题:
1)由于每个key hash过后对应唯一的一个entry下标,在扩容后,对应的下标就会发生变化。例如如果我们采用对key取模来计算hash值,假设初始hashtable的大小为4,则5对应的地址为 5 % 4 = 1,当hashtable扩容到8以后,5对应的下标却成了5。这样,再次使用5作用key去取数据时便不能获取到数据了;
2)如果不能够继续分配连续的地址空间,则需要将原来的数据拷贝到新申请的地址空间,虽然我们使用的hashtable的链表实现方式,可以免去对具体元素的拷贝,但这样仍然需要复制指针,这里就很难保证一致性了。
对于这些问题,首先想到的就是一致性hash,但一致性hash也存在部分节点的迁移问题,如果采取先迁移再启用的机制,那么又会存在一个增量迁移(迁移速度能否跟上数据插入速度)的问题。为了解决这个问题我们可以预分配地址空间,事先把最大的bucket数量确定下来,一般一个整页为宜(在32位机器上一个指针的大小是4B,NTFS页文件大小为4KB,这样我们至少可以存储下1K个bucket,假设一个bucket链10个数据,一共就有10K存储空间,一般使用已足够。而且这样连初始化都省略了,非常方便~代码如下:
- typedef struct
- {
- marked_ptr_t buckets[MAX_NODE_SIZE];
- uint32_t size;
- } LF_HashTable;
C语言实现(IMPLEMENT):
好了,说了这么说,现在我们把上面的思想汇聚起来,最终的代码可以在这里找到:
https://github.com/sefler1987/lf_table
还有什么问题(PROBLEMS):
1)Node的分配释放问题,本文里对Node的创建释放都采取的是最简单的malloc与free方法, 这种方法效率低,不利于资源的回收,可以采用freelist或者pool的形式;
2)由于buckets数据是固定的,如果最开始的buckets没有确定好,在单bucket的load_factor较高时会退化成线性查找。
评论列表: