Zookeeper(五)案例模拟
案例-模拟美团商家上下线
模拟美团服务平台,商家营业通知,商家打烊通知
提前在根节点下,创建好 /meituan 节点
商家服务类
/**
* @Program: test-zookeeper
* @Author: XiaoXing
* @Create: 2021-01-12 21:56
* @Description: 商家服务类
**/
public class ShopServer {
//zookeeper集群的IP和端口
private String connectString = "106.75.245.83:2181,106.75.245.83:2181,106.75.245.83:2181";
//session超时时间,毫秒单位,此处为60秒
//时间不宜设置太小,因为zookeeper和加载集群环境会因为性能等原因而延迟略高
//如果时间太少,还没有创建好客户端,就开始创建节点,会报错
private int sesionTimeout = 60 * 1000;
//zookeeper客户端对象
private ZooKeeper zooKeeperClient;
/**
* 创建连接zookeper方法
*/
public void conntect(){
/**
* new Watcher():监听器
*/
try {
zooKeeperClient = new ZooKeeper(connectString, sesionTimeout, new Watcher() {
public void process(WatchedEvent watchedEvent) {
}
});
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 注册到zookeeper
* @param shopName
*/
public void register(String shopName){
try {
//一定要创建临时有顺序节点 CreateMode.EPHEMERAL_SEQUENTIA
//以来可以自动编号,二来断开时,节点自动删除(打样)
String string = zooKeeperClient.create("/meituan/shop", shopName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("【" + shopName + "】开始营业!" + string);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 做生意
* @param shopName
*/
private void business(String shopName) {
System.out.println("【" + shopName + "】正在火爆营业中!");
try {
//线程等待
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
//连接zookeeper集群
ShopServer shop = new ShopServer();
//和美团取得联系
shop.conntect();;
//入住美团,将服务节点注册到zookeeper
shop.register(args[0]);
//业务逻辑处理
shop.business(args[0]);
}
}
客户类
/**
* @Program: test-zookeeper
* @Author: XiaoXing
* @Create: 2021-01-12 22:07
* @Description: 客户消费者
**/
public class Customers {
//zookeeper集群的IP和端口
private String connectString = "106.75.245.83:2181,106.75.245.83:2181,106.75.245.83:2181";
//session超时时间,毫秒单位,此处为60秒
//时间不宜设置太小,因为zookeeper和加载集群环境会因为性能等原因而延迟略高
//如果时间太少,还没有创建好客户端,就开始创建节点,会报错
private int sesionTimeout = 60 * 1000;
//zookeeper客户端对象
private ZooKeeper zooKeeperClient;
/**
* 创建连接zookeper方法
*/
public void conntect() {
/**
* new Watcher():监听器
*/
try {
zooKeeperClient = new ZooKeeper(connectString, sesionTimeout, new Watcher() {
public void process(WatchedEvent watchedEvent) {
//重新再次获取商家列表
getShopList();
}
});
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 获取商家,获取子节点列表
*/
private void getShopList() {
try {
//获取服务器子节点,并对父节点进行监听
List<String> shops = zooKeeperClient.getChildren("/meituan", true);
//声明储存服务器信息集合
ArrayList<String> shopList = new ArrayList<>();
shops.forEach(item -> {
try {
//获取节点上数据,某一个商店
byte[] data = zooKeeperClient.getData("/meituan/" + item, false, new Stat());
shopList.add(new String(data));
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System.out.println("目前正在营业的商家:" + shopList);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 做生意
*/
private void business() {
System.out.println("客户正在浏览商家。。。");
try {
//线程等待
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
Customers client = new Customers();
//获得zookeeper连接,打开美团
client.conntect();
//获取美团下的子节点,获取商家列表
client.getShopList();
//业务逻辑处理,对比商家,下单点餐
client.business();
}
}
运行商家服务类(以main方法带参数的形式运行)
案例-分布式锁-商品秒杀
-
锁:我们在多线程中接触过,作用就是让当前的资源不会被其他线程访问!
- 我的日记本,不可以被别人看到。所以要锁在保险柜中
- 当我打开锁,将日记本拿走了,别人才能使用这个保险柜
-
在zookeeper中使用传统的锁引发的 “羊群效应” :1000个人创建节点,只有一个人能成功,999人需要等待!
-
羊群是一种很散乱的组织,平时在一起也是盲目地左冲右撞,但一旦有一只头羊动起来,其他的羊也会不假思索地一哄而上,全然不顾旁边可能有的狼和不远处更好的草。羊群效应就是比喻人都有一种从众心理,从众心理很容易导致盲从,而盲从往往会陷入骗局或遭到失败。
- 避免“羊群效应”,zookeeper采用分布式锁
- 所有请求进来,在/lock下创建 临时顺序节点 ,放心,zookeeper会帮你编号排序
- 判断自己是不是/lock下最小的节点
- 是,获得锁(创建节点)
- 否,对前面小我一级的节点进行监听
- 获得锁请求,处理完业务逻辑,释放锁(删除节点),后一个节点得到通知(比你年轻的死了,你成为最嫩的了)
- 重复步骤2
实现步骤
<packaging>war</packaging>
<properties>
<spring.version>5.2.7.RELEASE</spring.version>
</properties>
<dependencies>
<!-- Spring -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>${spring.version}</version>
</dependency>
<!-- Mybatis -->
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis</artifactId>
<version>3.5.5</version>
</dependency>
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis-spring</artifactId>
<version>2.0.5</version>
</dependency>
<!-- 连接池 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.10</version>
</dependency>
<!-- 数据库 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.20</version>
</dependency>
<!-- junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- maven内嵌的tomcat插件 -->
<plugin>
<groupId>org.apache.tomcat.maven</groupId>
<!-- 目前apache只提供了tomcat6和tomcat7两个插件 -->
<artifactId>tomcat7-maven-plugin</artifactId>
<version>2.2</version>
<configuration>
<port>8001</port>
<path>/</path>
</configuration>
<executions>
<execution>
<!-- 打包完成后,运行服务 -->
<phase>package</phase>
<goals>
<goal>run</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE configuration
PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-config.dtd">
<configuration>
<!--开启驼峰命名-->
<settings>
<!-- 是否开启自动驼峰命名规则(camel case)映射,即从数据库列名 A_COLUMN 到属性名aColumn 的类似映射 a_name aName-->
<setting name="mapUnderscoreToCamelCase" value="true"/>
<!-- 后台的日志输出:针对开发者-->
<setting name="logImpl" value="STDOUT_LOGGING"/>
</settings>
</configuration>
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:p="http://www.springframework.org/schema/p"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop.xsd
http://www.springframework.org/schema/tx
https://www.springframework.org/schema/tx/spring-tx.xsd">
<!--开启注解扫描-->
<context:component-scan base-package="com.szx.*"></context:component-scan>
<!--引入数据库配置文件-->
<context:property-placeholder location="classpath:jdbc.properties"></context:property-placeholder>
<!--配置数据源-->
<bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource">
<property name="driverClassName" value="${jdbc.driver}"/>
<property name="url" value="${jdbc.url}"/>
<property name="username" value="${jdbc.username}"/>
<property name="password" value="${jdbc.password}"/>
</bean>
<bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
<!--连接数据源-->
<property name="dataSource" ref="dataSource"></property>
<!--配置别名-->
<property name="typeAliasesPackage" value="com.szx.pojo"></property>
<!--引入mybatis配置文件-->
<property name="configLocation" value="classpath:mybatis/mybatis-config.xml"></property>
</bean>
<!--事务管理器-->
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<!--注入数据源-->
<property name="dataSource" ref="dataSource"></property>
</bean>
<!--开启事务-->
<tx:annotation-driven></tx:annotation-driven>
</beans>
/**
* @Program: zk_product
* @Author: XiaoXing
* @Create: 2021-01-13 18:29
* @Description: 订单Mapper层
**/
@Mapper
public interface OrderMapper {
/**
* 添加订单信息
* @param order
* @return
*/
@Insert("insert into `order`(id,pid,userid) values(#{id},#{pid},#{userid})")
public int insert(Order order);
}
/**
* @Program: zk_product
* @Author: XiaoXing
* @Create: 2021-01-13 18:30
* @Description: 商品Mapper层
**/
@Mapper
public interface ProductMapper {
/**
* 查询商品(目的查询库存)
* @param id
* @return
*/
@Select("select * from product where id = #{id}")
public Product getProduct(@Param("id")int id);
/**
* 减库存
* @param id
* @return
*/
@Update("update product set stock = stock - 1 where id = #{id}")
public int reduceStock(@Param("id")int id);
}
/**
* @Program: zk_product
* @Author: XiaoXing
* @Create: 2021-01-13 18:31
* @Description: 商品服务接口
**/
public interface ProductService {
/**
* 减库存
* @param id
*/
public void reduceStock(int id);
}
/**
* @Program: zk_product
* @Author: XiaoXing
* @Create: 2021-01-13 18:32
* @Description: 商品服务接口实现类
**/
@Service("productService")
public class ProductServiceImpl implements ProductService {
@Autowired
private ProductMapper productMapper;
@Autowired
private OrderMapper orderMapper;
/**
* 减库存
* @param id
*/
public void reduceStock(int id) {
//获取库存,根据商品id查询商品
Product product = productMapper.getProduct(id);
//判断商品库存
if (product.getStock() <= 0){
throw new RuntimeException("抢光了,你来晚了呦。。。。");
}
//减库存
int i = productMapper.reduceStock(id);
//判断是否成功
if (i == 1){
//生成订单
orderMapper.insert(new Order(UUID.randomUUID().toString(),id,101));
} else {
throw new RuntimeException("哎呀,咋没减去库存。。。。");
}
}
}
/**
* @Program: zk_product
* @Author: XiaoXing
* @Create: 2021-01-13 18:26
* @Description: 控制层
**/
@Controller
@MapperScan("com.szx.mapper")
public class ProductActionController {
@Autowired
private ProductService productService;
/**
* 减库存控制器
* @param id
* @return
*/
@GetMapping("/product/reduce")
@ResponseBody
public Object reduce(int id){
productService.reduceStock(id);
return "ok";
}
}
Zookeeper分布式锁(Curator)
/**
* @Program: zk_product
* @Author: XiaoXing
* @Create: 2021-01-13 18:26
* @Description: 控制层
**/
@Controller
@MapperScan("com.szx.mapper")
public class ProductActionController {
//zookeeper集群的IP和端口
private String connectString = "106.75.245.83:2181,106.75.245.83:2181,106.75.245.83:2181";
@Autowired
private ProductService productService;
/**
* 减库存控制器
* @param id
* @return
*/
@GetMapping("/product/reduce")
@ResponseBody
public Object reduce(int id){
//重试策略(1000毫秒试一次,最多试三次)
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
//创建curator工具对象
CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, retryPolicy);
//启动
client.start();
//根据客户端工具对象创建内部互斥锁
InterProcessMutex lock = new InterProcessMutex(client,"/product_" + id);
//加锁
try {
lock.acquire();
productService.reduceStock(id);
} catch (Exception e) {
if (e instanceof RuntimeException){
try {
throw e;
} catch (Exception ex) {
ex.printStackTrace();
}
}
} finally {
try {
//释放锁
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
return "ok";
}
}