锁
- 线程锁
lock(mutex)
资源操作
unlock(mutex)
-
进程锁(nginx 中 accept 锁-共享内存+信号量)
-
分布式锁 在不同机器的进程的锁
分布式锁的实现方案
- 数据库 redis, mysql
- zookeeper
分布式锁有哪些特征
- 互斥性。
- 可重入性,一个进程允许递归获取锁(需要递归释放锁)。
- 锁超时(进程 crash 掉, 需要考虑释放分布式锁)。
- 高效、 高可用(redis, zk)。
- 公平锁和非公平锁,按获取锁顺序执行, 反之就是随机执行。
redis
- 操作redis实际上是操作redis上的数据结构(string,list,hash,set,zset,stream)
- redis通过rpc远程调用的方式,做数据请求。同时也可以做监听发布。
- redis按照请求顺序来执行命令,返回的数据按照执行顺序执行,因为redis执行命令是单线程的,所有客户端共用。
redis 的分布式实现
- 进程获取锁
- 执行逻辑
- 进程释放锁
方案1
- 获取锁:调用setnx(key,val)函数,该函数设置key,只有在key不存在的时候才能设置成功。成功会收到+OK,失败会收到nil。使用setnx需要使用额外命令设置超时expire(key),防止进程死锁,或者进程崩溃,无人释放锁,在这里需要使用redis事务lua。
- 释放锁:del(key)
方案2
- 获取锁:将setnx和expire合并为set(“lock”,四元组,“NX”,“EX”,30);还需要添加线程给操作续时间操作(因为可能执行逻辑超过30秒)。四元组解决的是只由该进程释放锁。
注:
四元组
:ip+port+starttime+pid。ip确认用户,port确认功能,pid确认的是哪个进程,用来识别进程重启,starttime确认起始时间,防止只是通过pid号确认的时候,如果进程重启,刚好pid重合的情况。
-
释放锁:先get(key)比较四元组,确认是自己后,再delete,以放别人释放了自己的锁。此处需要加上redis事务lua去使用。
if(get(key)==四元组) delete(key); -
释放锁为什么需要比较四元组的原因?
例如此时有进程ABCD,4个,此时A获取到锁,但是A进程卡机了,A进程超时,redis通过超时判断,已经释放了A的锁,BCD开始抢占,B获取到锁,如果此时A不卡了,然后执行完任务就去释放锁,锁再次被释放,C此时获取到资源的话,那么此时是有两个进程获取到同一个资源的,不符合我们要的锁的预期。
zookeeper
zookeeper互斥锁的实现(方案1)
- 获取锁:创建同名锁节点/lock,且是临时的节点(此时多个客户端必然只有一个可以创建成功)。其他客户端通过添加watch来监听节点的变化来抢占锁。
- 释放锁:delete命令删除节点或者客户端断开连接或超时后节点自动被删除。
zookeeper互斥锁的实现(方案2)
如果zookeeper的连接抢占锁的客户端过多,此时zookeeper需要大量的性能去维护大量的监听,并发送通知,所以思考出了一个客户端一个lock节点,然后通过先后顺序编号(使用zookeeper的顺序临时节点)。编号最小的可以有资格获取锁,没拿到最小锁的监听比自己小一个号的节点。。
- 获取锁:创建同名,且是临时的顺序节点(此时最先到的客户端是/lock/0),第一个客户端拿到锁资格,其他客户端根据该编号往下延续,并监听前比自己小一个号的节点。
- 释放锁:delete命令删除节点或者客户端断开连接或超时后节点自动被删除。
公平锁的实现
- 每个进程创建 lock 节点(临时顺序节点);
- 每个进程都有独自的一个序号; 假设序号为 0, 说明它获取锁, 那么序号不为 0, 只
需要监听序号排序中前一位(可能序号小一, 可能序号小更多, 因为前一个节点可能自动删
除了) 。
代码实现
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include "include/zookeeper.h"
#include "include/zookeeper_log.h"
#define TEMP_LOCK "/lock/res1"
typedef struct zk_mutex_s {
int seq_me; // 序号
int seq_watch; // 监听的序号
char watch[64]; // 监听的节点名
} zk_mutex_t;
static zk_mutex_t zmt = {
0};
static int quit = 0;
/*
typedef void (*void_completion_t)(int rc, const void *data);
*/
void zk_delete_lock_res1_children(int rc, const void *data) {
printf("zk_delete_lock_res1_children rc = %d\n", rc);
quit = 1;
}
/*
typedef void (*stat_completion_t)(int rc, const struct Stat *stat,
const void *data);
*/
void zk_watch_children(int rc, const struct Stat *stat, const void *data) {
printf("zk_watch_children_disappear rc = %d\n", rc);
}
/*
typedef void (*strings_completion_t)(int rc,
const struct String_vector *strings, const void *data);
*/
void zk_get_children_lock_res1(int rc, const struct String_vector *strings, const void *data) {
printf("zk_get_children_lock_res1 rc = %d\n", rc);
if (rc == 0) {
int i,v,n;
n = 0;
char * value_self = NULL;
char * value_other = NULL;
zhandle_t* zk_hdl = (zhandle_t*)data;
for (i = 0; i < strings->count; i++) {
sscanf(strings->data[i], "%d", &v);
if (v < zmt.seq_me) {
if (n == 0) {
zmt.seq_watch = v;
value_other = strings->data[i];
n++;
} else if (n > 0 && v > zmt.seq_watch) {
zmt.seq_watch = v;
value_other = strings->data[i];
n++;
}
} else if (v == zmt.seq_me) {
value_self = strings->data[i];
}
}
if (n == 0) {
// 说明自己已经最小的了
char temp[64] = {
0};
sprintf(temp, "%s/%s", TEMP_LOCK, value_self);
printf("%s 获取锁, 获取执行权, 释放锁\n", temp);
zoo_adelete(zk_hdl, temp, -1, zk_delete_lock_res1_children, zk_hdl);
} else if (n > 0) {
// 找到可以监听的对象
memset(zmt.watch, 0, 64*sizeof(char));
sprintf(zmt.watch, "%s/%s", TEMP_LOCK, value_other);
zoo_aexists(zk_hdl, zmt.watch, 1, zk_watch_children, zk_hdl);
}
}
}
/*
typedef void
(*string_completion_t)(int rc, const char *value, const void *data);
*/
void zk_create_lock_res1_es(int rc, const char *value, const void *data) {
printf("zk_create_lock_res1_es rc = %d\n", rc);
if (rc == 0) {
zhandle_t* zk_hdl = (zhandle_t*)data;
int seq;
sscanf(value, "%s/%d", TEMP_LOCK, &seq);
printf("\tname = %s seq = %d\n", value, seq);
zmt.seq_me = seq;
if (seq > 0) {
int ret;
ret = zoo_aget_children(zk_hdl, TEMP_LOCK, 0, zk_get_children_lock_res1, data);
if (ret) {
printf("error: zk_create_lock_res1_es:zoo_aget_children\n");
exit(EXIT_FAILURE);
}
return;
}
printf("%s 获取锁, 获取执行权, 释放锁\n", value);
zoo_adelete(zk_hdl, value, -1, zk_delete_lock_res1_children, "");
}
}
/*
typedef void (*watcher_fn)(zhandle_t *zh, int type,
int state, const char *path,void *watcherCtx);
*/
void zk_watcher_disconnect(zhandle_t *zh, int type, int state, const char *path, void *ctx) {
if (0 == strcmp(zmt.watch, path)) {
int ret = zoo_aget_children(zh, TEMP_LOCK, 0, zk_get_children_lock_res1, zh);
if (ret) {
printf("error: zk_watcher_disconnect:zoo_aget_children\n");
exit(EXIT_FAILURE);
}
}
}
int main(int argc, const char *argv[]) {
zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
zoo_set_log_stream(stdout);
/* 初始化 zk */
zhandle_t* zk_hdl = zookeeper_init("127.0.0.1:2181",
zk_watcher_disconnect, 30000, 0, "zookeeper for distribute mutex.", 0);
if (!zk_hdl) {
printf("error: connecting to zookeeper server...\n");
exit(EXIT_FAILURE);
}
int ret;
char znode[64] = TEMP_LOCK;
strcat(znode, "/");
/* 创建 /lock/res1/ 临时顺序节点*/
ret = zoo_acreate(zk_hdl, znode, "mark", 5, &ZOO_OPEN_ACL_UNSAFE,
ZOO_EPHEMERAL_SEQUENTIAL, zk_create_lock_res1_es, zk_hdl);
if (ret) {
printf("error: create %s EPHEMERAL SEQUENTIAL\n",TEMP_LOCK);
exit(EXIT_FAILURE);
}
ret = 0;
for (;;) {
if (quit) break;
usleep(2500);
}
zookeeper_close(zk_hdl);
return 0;
}