皮鞋,湿而不胖!
说好的,周末写一篇关于reuseport的。凌晨一点多被正则给吵醒,索性一气之下就起床了,发周报,梳理工作,回答问题,写本文。
大家从来都没质疑过reuseport说明大家都不在乎reuseport。既然大家都不在乎,那我就来说说大家都不在乎的东西。大家都不管其实不是大家都不管,并不是大家觉得这很牛逼,实际上,真正使用这个机制的公司或者个人,早就偷偷地把它的实现机制给改掉了,只是很少有人公开罢了。
门槛永远是简单的算法,而不是复杂的工程!
Linux内核在3.9引入的reuseport的思路是好的,它第一次使得一组socket之间从热备关系变成了负载均衡关系,但是它的实现是垃圾的,不仅仅存在查找socket时的O(n)问题,而且根本就没法实现一致性哈希,这让一组相互负载均衡的进程很难被管理,一损俱损。
Linux 4.6内核对reuseport进行了重构,解决了O(n)问题,但是依然很难实现一致性哈希。虽然说它自带了bpf的支持,可以从用户态灌入哈希算法实现特殊的socket查找逻辑,但是说实话,这只是提供了另一种获取hash输入的方法,对于保持服务器端socket集群的一致性,几乎没有什么实际的用处。
关于socket的reuseport,详见我去年写的一篇文章:
关于Linux UDP/TCP reuseport 二三事: https://blog.csdn.net/dog250/article/details/80458669
本文以Linux 4.9内核为基础版本来进行实际操作。
先来看一下reuseport的一致性哈希问题。
我们实现一个简单UDP服务器程序,用于处理客户端的请求,在本例中,所谓的处理请求仅仅是打印接收到的消息。代码如下:
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>
#define PORT 8080
#define MAXLINE 1024
int main(int argc, char **argv)
{
int sockfd;
char buffer[MAXLINE];
struct sockaddr_in server, client;
int optval = 1;
int len;
int ret;
memset(&server, 0, sizeof(server));
memset(&client, 0, sizeof(client));
if ((sockfd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
perror("socket");
return -1;
}
if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval)) < 0) {
perror("bind");
return -1;
}
server.sin_family = AF_INET;
server.sin_port = htons(PORT);
server.sin_addr.s_addr = INADDR_ANY;
if (bind(sockfd, (const struct sockaddr *)&server, sizeof(server)) < 0) {
perror("bind");
return -1;;
}
while (1) {
ret = recvfrom(sockfd, (char *)buffer, MAXLINE,
MSG_WAITALL, ( struct sockaddr *) &client, &len);
buffer[ret] = '\0';
printf("recv :%s\n", buffer);
}
return 0;
}
然后在另一台直连的机器上部署一系列的客户端,简单的持续不断地发送单一的字符串:
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>
#define PORT 8080
#define MAXLINE 1024
int main(int argc, char **argv)
{
int sockfd;
char *ser = argv[1];
char *buff = argv[2];
struct sockaddr_in server;
int ret;
int len;
if ((sockfd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
perror("socket");
return -1;
}
memset(&server, 0, sizeof(server));
server.sin_family = AF_INET;
server.sin_port = htons(PORT);
server.sin_addr.s_addr = inet_addr(ser);;
while (1) {
sendto(sockfd, (const char *)buff, strlen(buff),
MSG_CONFIRM, (const struct sockaddr *) &server, sizeof(server));
printf("%s\n", buff);
sleep(1);
}
close(sockfd);
return 0;
}
现在让我们在服务器端启动两个server进程,可想而知,它们互为reuseport而负载均衡,同时在客户端启动三个client进程,不断重试变换源端口,直到三个client进程分别被负载在三个不同的server进程,其中client进程的启动方式如下:
[root@localhost TEST]# ./a.out 172.16.1.2 111 &
[root@localhost TEST]# ./a.out 172.16.1.2 222 &
[root@localhost TEST]# ./a.out 172.16.1.2 333 &
然后观测server进程:
非常OK!完美的负载均衡!
此时,我们把第一个启动的server进程重启,看看会发生什么。第一个启动的server是最左边的那个,我们把它重启:
重启后,第一个server进程和第三个server进程的处理乱掉了,我们重启的是第一个进程,为什么会影响到第三个进程的处理呢?貌似二者相互对调了处理。如果不断有socket重启,那么整个处理关系将全部乱掉。我们很多时候还是希望数据处理可以 保持连接 的!
这还得看代码,先看socket退出时的处理:
再看socket启动后的处理:
非常简单,当某个socket退出后,最后面的那个socket会接管到退出socket的位置,如果退出的socket重启,那么它将被添加到最后的位置,相当于和接管它的那个socket进行了对调,这就是问题的根源了。
起初在我没有看reuseport的代码时,我一直以为它是用链表来管理socket集群的,没想到却是使用的预分配的数组,那么如此看来,其内部结构体里面的num_socks字段无非也就是一个计数而已,它的作用仅仅局限于追踪socket集群中socket的数量是0还是非0,如果是0即释放整个数组,仅此而已。
那么用这个num_socks字段来索引队尾slot中的socket,那就不太合适了,因为它的改变意味着hash取模时模的改变。
既然Linux内核本身也是使用的数组,索引追踪丝毫起不到节省内存的作用,那我就放心了,我也在这个数组里面折腾呗,实现了一致性哈希,还不用付出空间代价。
如何来修正这个问题呢?我们希望的是,当退出的socket重启后,一切恢复原样。
起初,我是准备做一个bpf程序然后注入进去,但是非常麻烦,实际上bpf嵌入到reuseport逻辑里面,那是让你用非默认的五元组来做hash的,比如QUIC取数据报文前面的session ID来做hash等。
bpf并没有修改当你已经计算出hash值之后的socket选择算法本身,它无非只是修改hash算法的输入而已。
不那么麻烦了,简单问题不要复杂化。
只要在socket退出的时候,记住退出的位置,新创建socket的时候,不再从最后来pending,而是填充当初记住的那个位置即可!
思路就是上面这句话,但是实现方案却是多种多样。如果我不想编译内核,希望使用热补丁,那么我也只是需要重写reuseport的几个函数就可以了,值得注意的是,由于不能在结构体添加新的字段,所以可能需要时间换空间了,这意味着在处理性能上要打上一些折扣,不管怎样,实现一个简版再说吧。
以下是代码:
int hook_reuseport_add_sock(struct sock *sk, struct sock *sk2)
{
struct sock_reuseport *reuse;
int i;
if (!rcu_access_pointer(sk2->sk_reuseport_cb)) {
int err = reuseport_alloc(sk2);
if (err)
return err;
}
spin_lock_bh(lock);
reuse = rcu_dereference_protected(sk2->sk_reuseport_cb,
lockdep_is_held(&reuseport_lock)),
WARN_ONCE(rcu_dereference_protected(sk->sk_reuseport_cb,
lockdep_is_held(&reuseport_lock)),
"socket already in reuseport group");
if (reuse->num_socks == reuse->max_socks) {
reuse = reuseport_grow(reuse);
if (!reuse) {
spin_unlock_bh(lock);
return -ENOMEM;
}
}
for (i = 0; i < reuse->max_socks; i++) {
// detach的时候,会将slot设置为NULL。
if (reuse->socks[i] == NULL) {
reuse->socks[i] = sk;
break;
}
}
/* paired with smp_rmb() in reuseport_select_sock() */
smp_wmb();
reuse->num_socks++;
rcu_assign_pointer(sk->sk_reuseport_cb, reuse);
spin_unlock_bh(lock);
return 0;
}
void hook_reuseport_detach_sock(struct sock *sk)
{
struct sock_reuseport *reuse;
int i;
spin_lock_bh(lock);
reuse = rcu_dereference_protected(sk->sk_reuseport_cb,
lockdep_is_held(lock));
rcu_assign_pointer(sk->sk_reuseport_cb, NULL);
for (i = 0; i < reuse->max_socks; i++) {
// 简单地设置为NULL,后续socket add的时候,检查NULL位即可。
// 更好地方法就是从后往前补。
if (reuse->socks[i] == sk) {
reuse->socks[i] = NULL;
reuse->num_socks--;
break;
}
}
if (reuse->num_socks == 0)
call_rcu(&reuse->rcu, reuseport_free_rcu);
spin_unlock_bh(lock);
}
struct sock *hook_reuseport_select_sock(struct sock *sk, u32 hash, struct sk_buff *skb, int hdr_len)
{
struct sock_reuseport *reuse;
struct bpf_prog *prog;
struct sock *sk2 = NULL;
u16 socks;
int i = 0, j = 0, idx = 0;
rcu_read_lock();
reuse = rcu_dereference(sk->sk_reuseport_cb);
/* if memory allocation failed or add call is not yet complete */
if (!reuse)
goto out;
prog = rcu_dereference(reuse->prog);
socks = READ_ONCE(reuse->max_socks);
j = socks;
if (likely(socks)) {
/* paired with smp_wmb() in reuseport_add_sock() */
smp_rmb();
if (prog && skb)
sk2 = run_bpf(reuse, socks, prog, skb, hdr_len);
else {
// 取模时需要基于最大的socket索引,这样才能保证一致性。
// 因此,需要找到最高的socket索引。
j = reuse->max_socks;
while (!reuse->socks[j-1]) {
j = j - 1;
}
idx = reciprocal_scale(hash, j);
sk2 = reuse->socks[idx];
}
}
i = 0;
// 如果hash到了一个NULL位置,那么就取下一个非NULL的slot中的socket。
while (sk2 == NULL && i < j) {
if (idx == j) {
idx = 0;
}
sk2 = reuse->socks[idx];
i++;
idx++;
}
out:
rcu_read_unlock();
return sk2;
}
上面的代码怎么跑起来就不多说了,简单讲就是用text_poke函数将原始函数的前面5个字节替换成jmp到hook函数的指令,具体参见:
x86_64体系结构动态替换内核函数hotpatch之完结篇: https://blog.csdn.net/dog250/article/details/84572893
为了避免在select的时候进行频繁的O(n)计算,需要在数据结构中添加字段以 记住 某些变量。所以说,需改源文件才是正道!
由于select是数据通道的关键路径,绝不能执行耗时的O(n)操作,所以就把这些操作压缩到detach和add中进行,这也是我的下面这个版本和热补丁版本不同的地方。
先看detach函数:
void reuseport_detach_sock(struct sock *sk)
{
struct sock_reuseport *reuse;
int i;
spin_lock_bh(&reuseport_lock);
reuse = rcu_dereference_protected(sk->sk_reuseport_cb,
lockdep_is_held(&reuseport_lock));
rcu_assign_pointer(sk->sk_reuseport_cb, NULL);
// 首先把所有此socket占据的slot清空。
for (i = 0; i < reuse->high_sock; i++) {
if (reuse->socks[i] == sk) {
reuse->socks[i] = NULL;
}
}
// 将所有在第一步清空的slot进行补充,即将其后面第一个不为NULL的socket补充到该slot
// 这个是一致性哈希的关键。
// 控制通道的O(n)并不是什么大问题,毕竟socket重启,断开,新建这种事不是什么频繁的操作。
for (i = 0; i < reuse->high_sock; i++) {
if (reuse->socks[i] == NULL) {
int j = i + 1, k = 0;
while (reuse->socks[j] == NULL && k++ < reuse->high_sock) {
j++;
if (j == reuse->high_sock)
j = 0;
}
reuse->socks[i] = reuse->socks[j];
}
}
reuse->num_socks--;
if (reuse->num_socks == 0)
call_rcu(&reuse->rcu, reuseport_free_rcu);
spin_unlock_bh(&reuseport_lock);
}
再看一下add操作:
int reuseport_add_sock(struct sock *sk, struct sock *sk2)
{
struct sock_reuseport *reuse;
if (!rcu_access_pointer(sk2->sk_reuseport_cb)) {
int err = reuseport_alloc(sk2);
if (err)
return err;
}
spin_lock_bh(&reuseport_lock);
reuse = rcu_dereference_protected(sk2->sk_reuseport_cb,
lockdep_is_held(&reuseport_lock)),
WARN_ONCE(rcu_dereference_protected(sk->sk_reuseport_cb,
lockdep_is_held(&reuseport_lock)),
"socket already in reuseport group");
if (reuse->num_socks == reuse->max_socks) {
reuse = reuseport_grow(reuse);
if (!reuse) {
spin_unlock_bh(&reuseport_lock);
return -ENOMEM;
}
}
// O(n)!!!
if (reuse->socks[0] && reuse->socks[0] == reuse->socks[reuse->high_sock-1]) {
reuse->socks[reuse->high_sock-1] = sk;
goto setting;
}
// 将detach中的补充进行复位。不影响原始的socket。
for (i = 1; i < reuse->max_socks; i++) {
// 旧socket复位
if (reuse->socks[i] && reuse->socks[i] == reuse->socks[i-1]) {
reuse->socks[i-1] = sk;
break;
}
// 新socket添加,队尾pending。
if (reuse->socks[i] == NULL) {
reuse->socks[i] = sk;
reuse->high_sock++;
break;
}
}
setting:
/* paired with smp_rmb() in reuseport_select_sock() */
smp_wmb();
reuse->num_socks++;
rcu_assign_pointer(sk->sk_reuseport_cb, reuse);
spin_unlock_bh(&reuseport_lock);
return 0;
}
最后我们看一下select操作,这是一个关键的操作,所以一定要简单,我已经将那些for循环等耗时的查找分担进detach和add里面了,所以留下了一个精简的select:
struct sock *reuseport_select_sock(struct sock *sk,
u32 hash,
struct sk_buff *skb,
int hdr_len)
{
struct sock_reuseport *reuse;
struct bpf_prog *prog;
struct sock *sk2 = NULL;
u16 socks;
u16 high;
rcu_read_lock();
reuse = rcu_dereference(sk->sk_reuseport_cb);
/* if memory allocation failed or add call is not yet complete */
if (!reuse)
goto out;
prog = rcu_dereference(reuse->prog);
socks = READ_ONCE(reuse->num_socks);
// 除了使用high而不是num之外,select函数没有任何修改!
high = READ_ONCE(reuse->high_sock);
if (likely(socks)) {
/* paired with smp_wmb() in reuseport_add_sock() */
smp_rmb();
if (prog && skb)
sk2 = run_bpf(reuse, socks, prog, skb, hdr_len);
else
sk2 = reuse->socks[reciprocal_scale(hash, high)];
}
out:
rcu_read_unlock();
return sk2;
}
几乎没有任何修改!
注意,review代码,我们可以看到,high_sock字段是只增不减的,这个字段是一个取模的关键字段,它表示一个reuseport集群系统最大的socket数量。
之所以将其设计为只增不减,是因为我没有办法区分一个尾部的slot中的socket释放,是有意的释放,还是故障导致的socket重启释放,所以也就只能先这么设计了。副作用就是, 你一定要让你的集群每一个socket全部都启动后,再开始提供服务!
好了,现在看看效果!
重复做上面的实验。从左到右依次是socket集群中的三个服务进程:
现在让我们重启第一个和第二个进程:
OK,就是这个效果。
以上说的reuseport貌似都是在说UDP,事实上对于TCP来讲,很多事情是没有必要做的。因为TCP是连接保持的,只有在建立连接的那一刻需要reuseport来做负载均衡,此后在连接过程中,会有单独的socket来保持一个连接,而不像UDP那样每一个包都要过一遍reuseport。
但是话也不能完全这么讲,TCP上层的连接语义可能并非基于TCP连接的,也就是说TCP也有可能不同的五元组连接对应一个会话。但那又如何呢?无非也还是一回事呗,在连接建立的时候把连接SYN数据报文hash到同一个socket上呗,但是且慢!有坑!
UDP可以根据数据包的内容来做hash,而TCP呢?一个SYN包什么都没有携带,拿什么做hash呢?见招拆招的解法就是使用Fastopen机制了,唉,越扯越远了,且Fastopen也不是都支持的。
总之,reuseport的一致性哈希之所以要 一致性 ,是因为下面的原因:
- 如果服务端集群中的某个socket节点断开重启了,保证不影响其它socket节点上的服务。
- 如果客户端断开重连了(五元组发生了变化),保证它连到服务器端集群中的同一个socket节点。
其中第一点是本文描述的算法保证的,Linux内核本身迄至5.1版本并没有实现。第二点可以通过bpf机制注入一段代码来实现。
OK,浙江温州皮鞋湿,下雨进水不会胖!