Lua是一个脚本语言,使用C编写并开放源代码,主要是为了嵌入应用程序中,为应用提供扩展和定制。Lua语言这里不多提及,可阅读Lua官网http://www.lua.org/ 或者菜鸟教程了解(更容易一些)。这里重点介绍Lua在Redis中的应用,以一个简单的升级版的Redis去除服务单点问题作为实践(另一片文章Redis、Zookeeper去服务单点问题实践 中使用Redis的实现并不严谨,也在这里进行强化)。
Lua在Redis中的使用方式
redis中内嵌了Lua脚本的解释器,并提供了执行Lua脚本的入口“eval”命令,格式为 EVAL script numkeys key [key ...] arg [arg...] .其中eval 为命令,script为执行的命令脚本,numkeys 为脚本中共涉及到的key的数量,后续接收若干个key的输入和若干个arg的输入.整个脚本中使用KEYS[index],和ARGS[index]来获取实际的输入有点类似于SQL的占位符.另外一层原因由于Redis集群的固有模式导致EVAL在集群中涉及多个KEY的操作时要求所有的KEY都在同一个Hash Solt上,集群环境中调用EVAL Redis会对脚本先做一个的校验.
eval "redis.call('set',KEYS[1],ARGV[1]);local value=redis.call('get',KEYS[1]);return value+ARGV[1];" 1 mykey 100
上面的脚本向Redis中写入key=mykey,value=100的一个键值对,后边取出key为mykey的值加上输入的值100返回.
Rredis 执行Lua的保证
Redis中保证对一个Lua脚本执行的完整性,也就是说一个Lua脚本的执行只会有成功和失败,且保证在Redis Server端同时只会有一个Lua脚本在运行,这样就意味着Lua脚本中的操作是一个完整的原子操作,不会伴随中间状态和资源竞争,同时也意味着在Lua脚本中不适合进行一些耗时长的操作.由于有以上的保证,使用Redis来进行一些复杂的原子操作就在合适不过了,setNx方法的局限性也被Redis Lua进行了弥补.
Redis对嵌入的Lua做了若干的限制,包保证脚本不对Redis 造成破坏.不提供访问系统状态的库,禁止使用loadfile函数,禁止带有随机性质的命令或者带有副作用的命令, 对随机读命令的结果进行排序,替换math原有的random方法,不允许定义函数,不允许声明全局变量等等.
实战Redis+Lua去服务单点问题
@Service public class MasterHeartbeat { private static final Log log = LogFactory.getLog(MasterHeartbeat.class); private static final String LOCK_KEY = "lock.carflowclient"; public static final AtomicBoolean isMaster = new AtomicBoolean(false); private static final ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(); @Autowired JedisCluster jedisCluster; // 节点标识 private String id = UUID.randomUUID().toString(); // 最大存活周期[秒] private long maxSurvivalTime = 30; // 心跳周期[毫秒] private long heartbeatPeriod = 3000; @Autowired private Config config; public void init() { if (!config.getHighavailability()) { log.warn("服务未启用高可用模式..."); return; } log.info(this.toString()); // 首次启动先获取锁 tryLock(); // 启动调度任务定期的获取锁或者注册心跳信息 service.scheduleAtFixedRate(new HeartbeatTask(), maxSurvivalTime, heartbeatPeriod, TimeUnit.MILLISECONDS); } /** * 尝试获取锁操作,不管是否成功 */ public void tryLock() { try { Object result = jedisCluster.eval( "redis.call('set',KEYS[1],ARGV[1],'NX','EX',ARGV[2]);return redis.call('get',KEYS[1])", 1, LOCK_KEY, id, String.valueOf(maxSurvivalTime * 2)); log.info(result); Boolean isLock = result != null && id.equals(result.toString()) ? true : false; isMaster.compareAndSet(!isLock, isLock); if (isLock) { log.info("try lock and success i'm master "); } else { log.info("try lock fail "); } } catch (Exception e) { log.error(e); try { Thread.sleep(2 * 1000); } catch (InterruptedException e1) { e1.printStackTrace(); } } finally { } } class HeartbeatTask implements Runnable { @Override public void run() { try { Object result = jedisCluster.eval( "local value=redis.call('get',KEYS[1]); if(not value) then redis.call('set',KEYS[1],ARGV[1],'NX','EX',ARGV[2]); elseif(value==ARGV[1]) then redis.call('expire',KEYS[1],ARGV[2]); else end; return redis.call('get',KEYS[1]);", 1, LOCK_KEY, id, String.valueOf(maxSurvivalTime * 2)); Boolean isLock = result != null && id.equals(result.toString()) ? true : false; isMaster.compareAndSet(!isLock, isLock); if (isLock) { } else { log.info("try lock fail "); } } catch (Exception e) { log.error(e); } finally { } } } @Override public String toString() { return "MasterHeartbeat [id=" + id + ", maxSurvivalTime=" + maxSurvivalTime + ", heartbeatPeriod=" + heartbeatPeriod + "]"; } }
- 服务启动的时候 在恰当的时机来调用init方法,这里加了开关控制是否需要启用HA模式,首次启动先获取一次锁即“tryLock()”,核心是执行了以下Lua脚本,先调用Set方法去获取一个锁,如果为lock.carflowclient的key不存在则设置key=lock.carflowclient,value为当前节点的唯一标识ID,同时指定key的过期时间为指定时间.无论key是否存在组后都返回key=lock.carflowclient的值,也就是返回当前获得锁的节点标识符,接下来可在客户端判断当前节点是否是获得锁的节点来及时更新当前节点的状态[主节点还是备节点]
Object result = jedisCluster.eval("redis.call('set',KEYS[1],ARGV[1],'NX','EX',ARGV[2]);return redis.call('get',KEYS[1])", 1, LOCK_KEY, id, String.valueOf(maxSurvivalTime * 2));
- 首次获取锁完成后,需要持续的去获取锁[当前是备节点]或者持续的维持心跳信息[当前是主节点],该操作使用以下脚本完成,首先获取key=lock.carflowclient的值,如果key不存在则执行和获取锁同样的操作[设置key,value,指定过期时间],否则的话判断当前key=lock.carflowclient的值是否当前节点id,如果是意味着当前节点持有锁,只需更新过期时间即可,否则就什么都不做,最后返回key=lock.carflowclient的值,交给客户端去判断当前节点是否仍然持有锁.
Object result = jedisCluster.eval("local value=redis.call('get',KEYS[1]); if(not value) then redis.call('set',KEYS[1],ARGV[1],'NX','EX',ARGV[2]); elseif(value==ARGV[1]) then redis.call('expire',KEYS[1],ARGV[2]); else end; return redis.call('get',KEYS[1]);", 1, LOCK_KEY, id, String.valueOf(maxSurvivalTime * 2));
- 经过以上的简化,最终暴露给客户端的仅仅是一个持有当前锁的节点的ID,由客户端判定是否自己持有锁来更新当前服务的状态,节点决策使用静态原子变量来标识,在以上的设计中不足是由于节点的ID使用了UUID,每次重启都会改变,如果停止主节点后立即重启可能会存在上一个锁未释放当前节点不能立即释放锁的情况,当然可以通过使用热定算法来实现节点ID和相关的硬件环境绑定来处理,就目前来说还是可以容忍这种情况的.
public static final AtomicBoolean isMaster = new AtomicBoolean(false);
总结
公共模块的抽象力求简洁,可靠,通用。