一致性哈希算法
Walter Lv1

云计算与分布式存储课堂上,老师提出了这么一个问题:

在一个场景中,如果有大规模的数据集(用户主机),如何将它们分配到有限的存储节点上。另外,假设一个存储节点(服务器主机)宕机了,如何能够快速地完成数据迁移。

由此引出了一致性哈希算法。

相较于传统的哈希算法,哈希算法 hash(key) % n,其中n为节点数,因此当n发生变化,在最坏的情况下可能要所有的数据都迁移。

一致性哈希的本质是通过某个 hash 函数把 key 映射到一个很大的环形空间里。

image

如图,存储某个数据,先计算出哈希值,然后沿着哈希环顺时针寻找到第一个存储节点。如k1和k2的存储节点是B。

如果B宕机了,就沿着哈希表继续顺时针迁移到C,存储节点B中的所有节点都会迁移到C。

image

存储节点的位置同样由哈希函数得到。

但是,这里有两个问题,由于哈希环的空间通常很大,存储节点不一定能均匀的分布在哈希环上,可能会出现某一个存储节点负载过多。

另外,当某一个主机宕机,很有可能的原因是它的负载过大,然而我们如果将其数据全部迁移到某个主机,很容易导致“雪崩效应”,导致大量主机宕机。

因此,引入“虚拟节点”,一个物理存储主机生成多个虚拟主机,可以提高主机在环上的分散程度。这样以来,即使某个主机宕机,它的数据集也能很均匀地迁移到其他主机的虚拟节点上,不会造成雪崩现象。

image

一致性哈希算法的代码演示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
#include <functional>
#include <iostream>
#include <map>
#include <random>
#include <string>
#include <vector>

// Hash函数,可以根据需求更改为其他函数,这里使用了std::hash
typedef std::function<size_t(const std::string &)> HashFunction;
// 示例哈希函数
size_t hashFunction(const std::string &key) { return std::hash<std::string>{}(key); }


// 一致性哈希类
class ConsistentHash {
private:
// 虚拟节点数量,可根据需求调整
const size_t virtualNodes;
// 哈希函数
HashFunction hashFunction;
// 圆环
std::map<size_t, std::string> hashRing;

// 物理节点负载信息,存储节点上的 IP 数量
std::map<std::string, std::vector<std::string>> nodeIPs;

public:
// 需要服务的IP地址
std::vector<std::string> ipDataset; // 将ipDataset作为类成员
// 构造函数
ConsistentHash(size_t vNodes, HashFunction func) : virtualNodes(vNodes), hashFunction(func) {}

void init(std::vector<std::string> nodes) {
for (const std::string &node : nodes) {
addNode(node);
}
for (auto &ip : ipDataset) {
getNode(ip);
}
}

// 生成模拟的 IP 地址数据集
void generateIPDataset(int dataSize) {
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> dis(1, 255);

for (int i = 0; i < dataSize; ++i) {
std::string ip = std::to_string(dis(gen)) + "." + std::to_string(dis(gen)) + "." +
std::to_string(dis(gen)) + "." + std::to_string(dis(gen));
ipDataset.push_back(ip);
}
}

// 添加节点到哈希环
void addNode(const std::string &node) {
for (size_t i = 0; i < virtualNodes; ++i) {
// 为每个节点创建多个虚拟节点,用不同的哈希值
std::string virtualNodeName = node + "_" + std::to_string(i);
size_t hash = hashFunction(virtualNodeName);
hashRing[hash] = node; // 虚拟节点映射到实际节点
}
}

void resetRing() {
nodeIPs.clear(); // 清除负载记录
// 重新分配 IP 地址到虚拟节点
for (auto &ip : ipDataset) {
std::string selectedNode = getNode(ip);
}
}

// 删除节点及其对应的所有虚拟节点,并重新分配 IP 地址
void removeNode(const std::string &node) {
std::vector<size_t> virtualNodeHashesToRemove;

// 找到需要移除的节点对应的虚拟节点
for (auto it = hashRing.begin(); it != hashRing.end();) {
if (it->second == node) {
virtualNodeHashesToRemove.push_back(it->first);
it = hashRing.erase(it);
} else {
++it;
}
}

// 从节点负载信息中移除节点
nodeIPs.erase(node);

// 重新分配需要移除节点上的虚拟节点对应的 IP 地址到其他节点
for (size_t hash : virtualNodeHashesToRemove) {
for (auto &pair : nodeIPs) {
std::string newNode = pair.first;
if (newNode != node) {
size_t newHash = hashFunction(newNode + "_" + std::to_string(hash));
hashRing[newHash] = newNode;
}
}
}
}

// 根据给定的键查找对应的节点
std::string getNode(const std::string &key) {
size_t hash = hashFunction(key);
auto it = hashRing.lower_bound(hash);
// std::cout << it->first << std::endl;
if (it == hashRing.end()) {
// 如果超出了范围,返回第一个节点(环形结构)
it = hashRing.begin();
}
// 将 IP 添加到所选节点
std::string selectedNode = it->second;
nodeIPs[selectedNode].push_back(key);
return selectedNode;
}

// 获取节点负载信息,按照 IP 数量计算负载百分比
void statisticPerf() const {
std::map<std::string, double> loadPercentages;
size_t totalKeys = 0;

// Calculate total number of keys (IPs) on the entire hash ring
for (const auto &nodeIP : nodeIPs) {
totalKeys += nodeIP.second.size();
}

// Calculate load percentage for each node
for (const auto &nodeIP : nodeIPs) {
double percentage = (static_cast<double>(nodeIP.second.size()) / totalKeys) * 100.0;
loadPercentages[nodeIP.first] = percentage;
}

// Output node load percentages
std::cout << "Node Loads (%):\n";
for (const auto &load : loadPercentages) {
std::cout << load.first << ": " << load.second << "%" << std::endl;
}
}
};

int main() {
// 创建一致性哈希对象并传入MurmurHash3作为哈希函数
ConsistentHash consistentHash(1000, hashFunction); // 构造哈希环,设置虚拟节点个数
// 生成模拟的 IP 地址数据集
consistentHash.generateIPDataset(100000); // 生成模拟 IP 地址

// 初始化存储节点,分配IP到存储节点
std::vector<std::string> nodeList = {"node1", "node2", "node3", "node4"};
consistentHash.init(nodeList);
consistentHash.statisticPerf();

// 增加存储节点,重新分配所有IP地址
consistentHash.addNode("node5");
consistentHash.resetRing();
consistentHash.statisticPerf();

// 删除存储节点,重新分配该节点上的IP
consistentHash.removeNode("node3");
consistentHash.statisticPerf();

return 0;
}

代码中为了负载均衡,addNode后,会重新分配数据集到所有存储节点;

removeNode方法,只分配该待删除节点上的数据。

ConsistentHash consistentHash(1, hashFunction);
consistentHash.generateIPDataset(100000);

即不设置虚拟节点,可以发现负载情况非常不均衡

Node Loads (%):

node1: 22.478%

node2: 2.843%

node3: 49.858%

node4: 24.821%

ConsistentHash consistentHash(1000, hashFunction);
consistentHash.generateIPDataset(100000);

由于哈希函数的范围比较大,这里多设置几个虚拟节点和数据集,负载均衡。

–初始情况–
Node Loads (%):
node1: 24.96%
node2: 24.93%
node3: 25.333%
node4: 24.777%
–插入存储节点–
Node Loads (%):
node1: 19.978%
node2: 19.427%
node3: 19.634%
node4: 20.024%
node5: 20.937%
–删除存储节点–
Node Loads (%):
node1: 24.8588%
node2: 24.1732%
node4: 24.916%
node5: 26.0521%

 评论