版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/u013278314/article/details/82717786
原理介绍:
要基于数据库实现分布式锁,最简单的方式可能就是直接创建一张锁表,然后通过操作该表中的数据来实现。
当需要锁住某个方法或资源时,就在该表中增加一条记录,想要释放锁的时候就删除这条记录
问题:
1.这把锁依赖数据库的可用性,数据库是一个单点,一旦数据库挂掉,会导致业务系统不可用
2.这把锁没有失效时间,一旦解锁操作失败,会导致锁记录一直在数据库中,其他线程无法再获得锁
3.这把锁只能是非阻塞的,因为数据的insert操作,一旦插入失败就会直接报错,没有获得锁的线程并不会进入排队队列,想要再次获得锁就要再次触发获得锁操作。
4,这把锁是非重入的,同一个线程没有释放锁之前无法在获得该锁,因为数据已经存在了。
解决:
1.数据库是单点?搞两个数据库,数据之前双向同步,一旦挂掉迅速切换到备库上。
2.没有失效时间?做一个定时任务,每隔一定时间把数据库中的超时数据清理一遍。
3.非阻塞?搞一个while循环,直到insert成功再返回
4.非重入的?在数据库中加个字段,记录当前获得锁的机器的主机信息和线程信息,那么下次再获取的时候先查询数据库,如果当前机器的主机信息和线程信息在数据库可以查询的话,直接把锁分配给他就可以了。
基于数据库排他锁:
除了可以通过增删操作数据库中的记录以外,其实还可以借助数据中自带的锁来实现分布式的锁。
在查询语句后面增加for update语句,数据库会在查询过程中给数据表增加排他锁(innodb引擎在加锁的时候,只有通过索引进行检索的时候才会使用行级锁,否则会使用表级锁)
使用数据库来实现分布式锁的方式,这两种方式都是依赖数据库的一张表,一种是通过表中的记录的存在情况确定当前是否有锁存在,另外一种是通过数据库的排他锁来实现分布式锁。
引入依赖:
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis</artifactId>
<version>3.4.6</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.11</version>
</dependency>
创建表user:
CREATE TABLE `user` (
`user_id` varchar(255) NOT NULL,
`user_name` varchar(255) DEFAULT NULL,
PRIMARY KEY (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
MybatisConfig.xml:
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE configuration
PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-config.dtd">
<configuration>
<settings>
<!-- 这个地方很重要 设置sql的执行时间 超过时间 没有完成 就会抛出异常 这里设置一秒 -->
<setting name="defaultStatementTimeout" value="1" />
</settings>
<environments default="mysql">
<environment id="mysql">
<!-- 配置事务 -->
<transactionManager type="JDBC"></transactionManager>
<!-- 配置数据源 -->
<dataSource type="POOLED">
<property name="driver" value="com.mysql.cj.jdbc.Driver" />
<property name="url" value="jdbc:mysql://localhost:3306/clouddb01?useSSL=true" />
<property name="username" value="root" />
<property name="password" value="123456" />
<property name="poolMaximumIdleConnections" value="50" />
<property name="poolMaximumActiveConnections" value="1000" />
<property name="poolPingQuery" value="SELECT 1 FROM DUAL" />
<property name="poolPingEnabled" value="true" />
</dataSource>
</environment>
</environments>
<mappers>
<mapper resource="com/th/mapper/userMapper.xml" />
</mappers>
</configuration>
userMapper.xml
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<!-- namespace:命名空间,是某一个 dao 层的具体路径 -->
<mapper namespace="com.th.dao.user">
<resultMap type="com.th.entity.User" id="User">
<result property="userId" column="user_id" />
<result property="userName" column="user_name" />
</resultMap>
<select id="getUserAllEentity" resultType="com.th.entity.User" resultMap="User">
SELECT * FROM user
</select>
<update id="update" parameterType="com.th.entity.User">
UPDATE user SET user_name = #{userName} WHERE user_id=#{userId}
</update>
</mapper>
user:
package com.th.entity;
public class User {
private String userId;
private String userName;
public User() {
super();
}
public User(String userId, String userName) {
super();
this.userId = userId;
this.userName = userName;
}
public String getUserId() {
return userId;
}
public void setUserId(String userId) {
this.userId = userId;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
}
和一 zk分布式锁一样的OrderService接口:
public interface OrderService {
void createOrder();
}
模式公共资源的OrderCodeGenerator:
package com.th.order;
import java.text.SimpleDateFormat;
import java.util.Date;
public class OrderCodeGenerator {
private static int i = 0;
public String getOrderCode() {
Date now = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss-");
return sdf.format(now) + ++i;
}
}
基于数据库实现的lock:
package com.th.order;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import org.apache.ibatis.io.Resources;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.session.SqlSessionFactoryBuilder;
import com.th.entity.User;
public class DbLock implements Lock {
private SqlSession session;
private User user;
DbLock(User user) {
InputStream inputStream = null;
try {
inputStream = Resources.getResourceAsStream("MybatisConfig.xml");
} catch (IOException e) {
e.printStackTrace();
}
SqlSessionFactoryBuilder sessionFactoryBuilder = new SqlSessionFactoryBuilder();
SqlSessionFactory sessionFactory = sessionFactoryBuilder.build(inputStream);
this.session = sessionFactory.openSession();
this.user = user;
}
@Override
public void lock() {
tryLock();
}
@Override
public boolean tryLock() {
int count = session.update("com.th.dao.user.update", user);
return count == 1 ? true : false;
}
@Override
public void unlock() {
session.commit();
}
@Override
public void lockInterruptibly() throws InterruptedException {
// TODO Auto-generated method stub
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public Condition newCondition() {
return null;
}
}
OrderServiceImplWithZkDis 及测试方法main:
package com.th.order;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.locks.Lock;
import com.th.entity.User;
public class OrderServiceImplWithZkDis implements OrderService {
private static OrderCodeGenerator org = new OrderCodeGenerator();
// private Lock lock = new ZookeeperDisLock("/LOCK_TEST");
// private Lock lock = new ZookeeperReAbleDisLock("/LOCK_TEST");
private Lock lock = new DbLock(new User("1","张三丰"));
@Override
public void createOrder() {
String orderCode = null;
try {
lock.lock();
orderCode = org.getOrderCode();
//TestReLock();
System.out.println(Thread.currentThread().getName() + "生成订单:" + orderCode);
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void TestReLock() {
lock.lock();
System.out.println(Thread.currentThread().getName() + "测试重入锁成功...");
lock.unlock();
}
public static void main(String[] args) {
int num = 20;
CyclicBarrier cyclicBarrier = new CyclicBarrier(num);
for (int i = 0; i < num; i++) {
new Thread(new Runnable() {
@Override
public void run() {
OrderService orderService = new OrderServiceImplWithZkDis();
System.out.println(Thread.currentThread().getName() + ": 我准备好了");
try {
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
orderService.createOrder();
}
}).start();
}
}
}