二、Dubbo原理
生产者:
<!-- 提供方应用信息,用于计算依赖关系 -->
<dubbo:application name="provider"/>
<!-- 使用zookeeper注册中心暴露服务地址 -->
<dubbo:registry address="${zookeeper.register.address}" protocol="zookeeper" client="zkclient"/>
<!-- 用dubbo协议在20880端口暴露服务 -->
<dubbo:protocol name="dubbo" port="${dubbo.provider.port}"/>
<!-- 测试接口-->
<dubbo:service interface="com.dubbo.TestService" ref="testService" version="1.0.1"/>
消费者:
<!-- 提供方应用信息,用于计算依赖关系 -->
<dubbo:application name="consumer"/>
<dubbo:consumer timeout="300000" retries="0"/>
<!-- 使用zookeeper注册中心暴露服务地址 -->
<dubbo:registry address="${zookeeper.register.address}" protocol="zookeeper" client="zkclient"/>
<!-- 测试接口-->
<dubbo:reference id="testService" interface="com.dubbo.TestService" version="1.0.1"/>
三、Zookeeper概述
四、Java操作Zookeeper
启动zookeeper: 将zoo_sample.cfg重命名zoo.cfg 运行zkServer.cmd
Zookeeper数据查看工具ZooInspector
1、下载https://issues.apache.org/jira/secure/attachment/12436620/ZooInspector.zip
2、解压,进入目录ZooInspector\build,运行zookeeper-dev-ZooInspector.jar
引入pom.xml依赖
<dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.10</version> </dependency>
创建永久节点:PERSISTENT
import org.I0Itec.zkclient.ZkClient; import org.apache.zookeeper.CreateMode; /** * Created by yz on 2018/03/31. */ public class TestZookeeper { public static void main(String[] args) { // 60000 session超时时间;1000 连接超时时间 ZkClient zkClient = new ZkClient("127.0.0.1:2181", 60000, 1000); // 节点(路径);值;节点类型 PERSISTENT永久节点 zkClient.create("/user","xiaoming", CreateMode.PERSISTENT); zkClient.close(); System.out.println("###注册成功###"); } }
节点不允许有重复,再次创建/user节点,会报错
创建子节点:
public static void main(String[] args) { // 60000 session超时时间;1000 连接超时时间 ZkClient zkClient = new ZkClient("127.0.0.1:2181", 60000, 1000); // 节点(路径);值;节点类型 PERSISTENT永久节点 zkClient.create("/user/zk01","xiaohong", CreateMode.PERSISTENT); zkClient.close(); System.out.println("###注册成功###"); }
创建临时节点:EPHEMERAL
/** * Created by yz on 2018/03/31. */ public class TestZookeeper { public static void main(String[] args) { // 60000 session超时时间;1000 连接超时时间 ZkClient zkClient = new ZkClient("127.0.0.1:2181", 60000, 1000); // 节点(路径);值;节点类型 PERSISTENT永久节点 zkClient.create("/user_temp","xiaoming", CreateMode.EPHEMERAL); zkClient.close(); System.out.println("###注册成功###"); } }
临时节点可以创建成功,但是zk会话关闭之后,会删除。
Thread.sleep(10000); zkClient.close();五、Dubbo负载均衡原理
启动两个provider 端口号20880,端口号20881
启动consumer调用provider,轮询机制调用,dubbo默认有负载均衡算法。
dubbo怎么实现负载均衡的?
生产者将服务接口地址注册到注册中心,在注册中心会有两个集群地址
消费者只需要通过“com.alibaba.dubbo.demo.DemoService” 这个节点名称,先找下面的子节点,订单服务能够获取到会员服务整个集群地址,通过本地负载均衡算法,算出来去调用哪一台会员服务。
六、服务注册到Zookeeper节点上
使用代码去实现在Zookeeper上注册多节点(模拟dubbo注册到Zookeeper) 步骤:1.建两个服务器端,把会员服务信息地址注册到注册中心上去
2.订单服务订阅注册中心节点地址,当有新的地址之后,及时更新
3.本地通过负载均衡算法去调用哪一台服务器
服务器端(生产者):
import org.I0Itec.zkclient.ZkClient; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.atomic.AtomicInteger; /** * 服务端 生产者 每启一个服务都注册到zookeeper中心上去 * socket服务器端 * Created by yz on 2018/03/31. */ public class ZkServerSocket implements Runnable{ private static int port = 8081; public ZkServerSocket(int port) { this.port = port; } // 多线程线程安全计数器 AtomicInteger ai=new AtomicInteger(0); public static void main(String[] args) { ZkServerSocket server = new ZkServerSocket(port); Thread thread = new Thread(server); thread.start(); } /** * 将服务注册到zookeeper,子节点不要用持久节点,用临时节点 */ public void regZkServer(){ ZkClient zkClient = new ZkClient("127.0.0.1:2181", 60000, 1000); // 1.创建父节点/server 首先运行TestZookeeper 创建/server父节点 持久节点 String path = "/server/server"+port; // 判断节点是否在zookeeper上存在,如果存在删除 if(zkClient.exists(path)){ zkClient.delete(path); } // 2.将每台服务启动,将所有子节点全部注册到/server节点下 // key:表示当前节点(path) value:表示服务器IP地址和端口号 zkClient.createEphemeral(path,"127.0.0.1:"+port); } public void run() { ServerSocket serverSocket = null; try { serverSocket = new ServerSocket(port); System.out.println("Server start port:"+port); // 当前服务启起来后,把当前所有的启动信息注册到zookeeper上,创建节点 regZkServer(); Socket socket = null; while (true){ socket = serverSocket.accept(); // 客户端发送的消息全部放在ServerHandler中,它开启一个线程,提高整个程序的一个执行效率 new Thread(new ServerHandler(socket,ai)).start(); } } catch (IOException e) { e.printStackTrace(); }finally { try { if(serverSocket !=null){ serverSocket.close(); } } catch (IOException e) { e.printStackTrace(); } } } }
import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; import java.util.concurrent.atomic.AtomicInteger; /** * Created by yz on 2018/03/31. */ public class ServerHandler implements Runnable{ private Socket socket; private AtomicInteger ai; public ServerHandler(Socket socket,AtomicInteger ai) { this.socket = socket; this.ai = ai; } public void run() { BufferedReader in = null; PrintWriter out = null; try { in = new BufferedReader(new InputStreamReader(this.socket.getInputStream())); out = new PrintWriter(this.socket.getOutputStream(),true); String body = null; while (true){ body = in.readLine(); if(body == null){ break; } System.out.println("接收到客户端发送来的消息: "+body); out.println("这里是服务器端发送的消息,Hello, "+body+",count:"+ai.getAndIncrement()); } } catch (IOException e) { if(in !=null){ try { in.close(); } catch (IOException e1) { e1.printStackTrace(); } } if(out !=null){ out.close(); } } } }客户端(消费者):
import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; import java.util.ArrayList; import java.util.List; /** * 客户端 消费者 * socket客户端连接 * Created by yz on 2018/03/31. */ public class ZkServerClient { // 用来存放所有服务器端IP地址 public static List<String> listServer = new ArrayList<String>(); public static void main(String[] args) { initServer(); ZkServerClient client = new ZkServerClient(); // 读取键盘输入数据 BufferedReader console = new BufferedReader(new InputStreamReader(System.in)); while (true){ String name; try { name = console.readLine(); if("exit".equals(name)){ System.exit(0); } // 给服务器端发送消息 client.send(name); } catch (IOException e) { e.printStackTrace(); } } } /** * 注册所有server */ public static void initServer(){ listServer.clear(); listServer.add("127.0.0.1:8080"); } /** * 获取当前server信息 * @return */ public static String getServer(){ return listServer.get(0); } public void send(String name){ String server = ZkServerClient.getServer(); String[] cfg = server.split(":"); Socket socket = null; BufferedReader in = null; PrintWriter out = null; try { // 参数1:服务器端IP 参数2:端口号 socket = new Socket(cfg[0],Integer.parseInt(cfg[1])); in = new BufferedReader(new InputStreamReader(socket.getInputStream())); out = new PrintWriter(socket.getOutputStream(),true); out.println(name); while (true){ String resp = in.readLine(); if(resp == null){ break; }else if(resp.length()>0){ System.out.println("接收到服务器端传回来的消息: "+resp); break; } } } catch (IOException e) { e.printStackTrace(); }finally { if(in !=null){ try { in.close(); } catch (IOException e1) { e1.printStackTrace(); } } if(out != null){ out.close(); } } } }
启动两个服务器端,端口号分别为:8080、8081 效果如下:
七、消费者使用Zookeeper实现动态负载均衡
消费者使用Zookeeper实现动态负载均衡,即所有服务器地址都不是写死的,而是实时去Zookeeper注册中心上去获取到的。
import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.ZkClient; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; import java.util.ArrayList; import java.util.List; /** * 客户端 消费者 * socket客户端连接 * Created by yz on 2018/03/31. */ public class ZkServerClient { // 用来存放所有服务器端IP地址 public static List<String> listServer = new ArrayList<String>(); public static void main(String[] args) { initServer(); ZkServerClient client = new ZkServerClient(); // 读取键盘输入数据 BufferedReader console = new BufferedReader(new InputStreamReader(System.in)); while (true){ String name; try { name = console.readLine(); if("exit".equals(name)){ System.exit(0); } // 给服务器端发送消息 client.send(name); } catch (IOException e) { e.printStackTrace(); } } } /** * 注册所有server */ public static void initServer(){ final ZkClient zkClient = new ZkClient("127.0.0.1:2181", 60000, 1000); // 读取集群服务器下节点信息 final String path = "/server"; // 获取/server 节点下所有子节点 final List<String> children = zkClient.getChildren(path); for (String chipath : children) { // 根据子节点完整路径读取value,添加到集合 listServer.add((String) zkClient.readData(path+"/"+chipath)); } System.out.println("###listServer:"+listServer.toString()); // 重要的一步,当节点发生变化的时候 重新读取,比如服务端有一台服务器挂掉,客户端也要做更新,通过zk事件监听 // 参数1表示订阅的监听地址 参数2 监听 zkClient.subscribeChildChanges(path, new IZkChildListener() { // 参数1父节点 参数2所有子节点 public void handleChildChange(String parentPath, List<String> parentPathchilist) throws Exception { // 监听到节点发生变化,更新节点信息 listServer.clear(); for (String chipath : parentPathchilist) { // 根据子节点完整路径读取value,添加到集合 listServer.add((String) zkClient.readData(parentPath+"/"+chipath)); } System.out.println("###事件通知,listServer:"+listServer.toString()); } }); // listServer.clear(); // listServer.add("127.0.0.1:8080"); } /** * 获取当前server信息 * @return */ public static String getServer(){ // 还没有使用负载均衡 return listServer.get(0); } public void send(String name){ String server = ZkServerClient.getServer(); String[] cfg = server.split(":"); Socket socket = null; BufferedReader in = null; PrintWriter out = null; try { // 参数1:服务器端IP 参数2:端口号 socket = new Socket(cfg[0],Integer.parseInt(cfg[1])); in = new BufferedReader(new InputStreamReader(socket.getInputStream())); out = new PrintWriter(socket.getOutputStream(),true); out.println(name); while (true){ String resp = in.readLine(); if(resp == null){ break; }else if(resp.length()>0){ System.out.println("接收到服务器端传回来的消息: "+resp); break; } } } catch (IOException e) { e.printStackTrace(); }finally { if(in !=null){ try { in.close(); } catch (IOException e1) { e1.printStackTrace(); } } if(out != null){ out.close(); } } } }启动消费者端,查看通过zk注册中心订阅效果:
现在手动强制停掉8081服务端(会有延迟)
现在消费者再次发送消息,只会发给8080端口服务器端
八、消费者加入轮训机制算法
负载均衡轮询机制算法---请求次数%服务器数量=机器位置
第一台服务器,位置0 8080
第二台服务器,位置1 8081
第一次请求
1%2=1 找8081
第二次请求
2%2=0 找8080
第三次请求
3%2=1 找8081
......
代码实现:
import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.ZkClient; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; /** * 客户端 消费者 * socket客户端连接 * Created by yz on 2018/03/31. */ public class ZkServerClient { // 用来存放所有服务器端IP地址 public static List<String> listServer = new ArrayList<String>(); public static void main(String[] args) { initServer(); ZkServerClient client = new ZkServerClient(); // 读取键盘输入数据 BufferedReader console = new BufferedReader(new InputStreamReader(System.in)); while (true){ String name; try { name = console.readLine(); if("exit".equals(name)){ System.exit(0); } // 给服务器端发送消息 client.send(name); } catch (IOException e) { e.printStackTrace(); } } } /** * 注册所有server */ public static void initServer(){ final ZkClient zkClient = new ZkClient("127.0.0.1:2181", 60000, 1000); // 读取集群服务器下节点信息 final String path = "/server"; // 获取/server 节点下所有子节点 final List<String> children = zkClient.getChildren(path); for (String chipath : children) { // 根据子节点完整路径读取value,添加到集合 listServer.add((String) zkClient.readData(path+"/"+chipath)); } System.out.println("###listServer:"+listServer.toString()); // 重要的一步,当节点发生变化的时候 重新读取,比如服务端有一台服务器挂掉,客户端也要做更新,通过zk事件监听 // 参数1表示订阅的监听地址 参数2 监听 zkClient.subscribeChildChanges(path, new IZkChildListener() { // 参数1父节点 参数2所有子节点 public void handleChildChange(String parentPath, List<String> parentPathchilist) throws Exception { // 监听到节点发生变化,更新节点信息 listServer.clear(); for (String chipath : parentPathchilist) { // 根据子节点完整路径读取value,添加到集合 listServer.add((String) zkClient.readData(parentPath+"/"+chipath)); } System.out.println("###事件通知,listServer:"+listServer.toString()); } }); // listServer.clear(); // listServer.add("127.0.0.1:8080"); } // 定义请求次数 int count = 0;--> 多线程时需要加同步 // 多线程线程安全计数器 static AtomicInteger ai = new AtomicInteger(0); /** * 获取当前server信息 * @return */ public static String getServer(){ // 获取集群服务器数量 int listServerCount = listServer.size(); // 负载均衡轮询算法,算出要调用的机器. 请求数%服务器数量=机器位置 // getAndIncrement 表示count++操作 String serverhost = listServer.get(ai.getAndIncrement() % listServerCount); return serverhost; } public void send(String name){ String server = ZkServerClient.getServer(); String[] cfg = server.split(":"); Socket socket = null; BufferedReader in = null; PrintWriter out = null; try { // 参数1:服务器端IP 参数2:端口号 socket = new Socket(cfg[0],Integer.parseInt(cfg[1])); in = new BufferedReader(new InputStreamReader(socket.getInputStream())); out = new PrintWriter(socket.getOutputStream(),true); out.println(name); while (true){ String resp = in.readLine(); if(resp == null){ break; }else if(resp.length()>0){ System.out.println("接收到服务器端传回来的消息: "+resp); break; } } } catch (IOException e) { e.printStackTrace(); }finally { if(in !=null){ try { in.close(); } catch (IOException e1) { e1.printStackTrace(); } } if(out != null){ out.close(); } } } }
将8081服务器启起来
重启消费者端,查看消费者端使用负载均衡轮询机制算法调用效果 ,见证奇迹的时刻: