应用场景:
在RPC框架中,使用Netty作为高性能的网络通信框架时,每一次服务调用,都需要与Netty服务端建立连接的话,很容易导致Netty服务器资源耗尽。
所以,想到连接池技术,将同一个服务地址建立的连接放入池中维护,同一个地址的连接确保只建立一次。
这样就可以大大减少连接的个数,从而大幅度提升服务器性能。
模拟代码:
下面就通过几段代码来模拟Tcp客户端和Tcp服务器端建立连接并发送消息的场景。
这里并没有真的使用Netty框架,因为本文不是讲怎么使用Netty框架,而是分享如何管理连接。
首先,模拟一个Tcp客户端程序(就当做是Netty的客户端):
1 /** 2 * 模拟TCP客户端 3 * 4 * @author syj 5 */ 6 public class NetChannel { 7 8 /** 9 * 建立连接 10 * 11 * @param host 12 * @param port 13 */ 14 public void connect(String host, int port) { 15 System.out.println("模拟连接TCP服务器成功: host=" + host + ",port=" + port); 16 } 17 18 /** 19 * 发送消息 20 * 21 * @param msg 22 */ 23 public void send(String msg) { 24 System.out.println("模拟向TCP服务器发送消息成功:" + msg); 25 } 26 }
封装一下上面的Netty客户端:
1 /** 2 * 模拟TCP客户端 3 * 4 * @author syj 5 */ 6 public class NetClient extends ConnectClient { 7 8 // 模拟TCP客户端 9 private NetChannel channel; 10 11 /** 12 * 建立连接 13 * 14 * @param address 格式 host:port, 例如 192.168.1.103:9999 15 * @throws Exception 16 */ 17 @Override 18 public void init(String address) throws Exception { 19 if (address == null || address.trim().length() == 0) { 20 throw new RuntimeException(">>>> address error"); 21 } 22 String[] split = address.split(":"); 23 if (split.length != 2) { 24 throw new RuntimeException(">>>> address error"); 25 } 26 String host = split[0]; 27 int port = Integer.valueOf(split[1]); 28 channel = new NetChannel(); 29 channel.connect(host, port); 30 } 31 32 /** 33 * 发送消息 34 * 35 * @param msg 36 * @throws Exception 37 */ 38 @Override 39 public void send(String msg) throws Exception { 40 channel.send(msg); 41 } 42 }
连接管理类:
该类使用一个ConcurrentHashMap作为连接池,来保存与TCP服务器建立的连接,key是TCP服务器的地址,value是连接对象。
由于是多线程环境,为保证线程安全问题,使用synchronized加锁,避免一个连接被创建多次。
由于可能会有很多针对同一个TCP服务器的连接请求,使用lockClientMap来管理锁,同一个TCP服务器的请求使用同一把锁,保证同一个TCP服务器的连接只创建一次。
这样既保证了线程安全,又能降低性能消耗。
1 import java.util.concurrent.ConcurrentHashMap; 2 3 /** 4 * TCP连接管理 5 * 6 * @author syj 7 */ 8 public abstract class ConnectClient { 9 10 /** 11 * 建立连接 12 * 13 * @param address 14 * @throws Exception 15 */ 16 public abstract void init(String address) throws Exception; 17 18 /** 19 * 发送消息 20 * 21 * @param msg 22 * @throws Exception 23 */ 24 public abstract void send(String msg) throws Exception; 25 26 /** 27 * 发送消息 28 * 29 * @param address 30 * @param msg 31 * @param netImpl 32 * @throws Exception 33 */ 34 public static void asyncSend(String address, String msg, Class<? extends ConnectClient> netImpl) throws Exception { 35 ConnectClient connect = ConnectClient.getConnect(address, netImpl); 36 connect.send(msg); 37 } 38 39 // 连接池 40 private static volatile ConcurrentHashMap<String, ConnectClient> connectClientMap; 41 // 锁 42 private static volatile ConcurrentHashMap<String, Object> lockClientMap = new ConcurrentHashMap<>(); 43 44 /** 45 * 获取连接 46 * 确保同一个TCP服务器地址对应的连接只建立一次 47 * 48 * @param netImpl 49 * @return 50 * @throws Exception 51 */ 52 public static ConnectClient getConnect(String address, Class<? extends ConnectClient> netImpl) throws Exception { 53 // 创建连接池 54 if (connectClientMap == null) { 55 synchronized (ConnectClient.class) { 56 if (connectClientMap == null) { 57 connectClientMap = new ConcurrentHashMap<>(); 58 } 59 } 60 } 61 62 // 获取连接 63 ConnectClient connectClient = connectClientMap.get(address); 64 if (connectClient != null) { 65 return connectClient; 66 } 67 68 // 获取锁,同一个地址使用同一把锁 69 Object lock = lockClientMap.get(address); 70 if (lock == null) { 71 lockClientMap.putIfAbsent(address, new Object()); 72 lock = lockClientMap.get(address); 73 } 74 synchronized (lock) { 75 connectClient = connectClientMap.get(address); 76 if (connectClient != null) { 77 return connectClient; 78 } 79 80 // 新建连接 81 ConnectClient client = netImpl.newInstance(); 82 client.init(address); 83 // 放入连接池 84 connectClientMap.put(address, client); 85 } 86 connectClient = connectClientMap.get(address); 87 return connectClient; 88 } 89 }
任务类用于并发连接测试:
1 import java.util.UUID; 2 3 /** 4 * 任务 5 * 6 * @author syj 7 */ 8 public class Task implements Runnable { 9 10 private Class<? extends ConnectClient> netType;// 客户端类型 11 private String address; 12 private long count; 13 14 public Task(String address, long count, Class<? extends ConnectClient> netType) { 15 this.address = address; 16 this.count = count; 17 this.netType = netType; 18 } 19 20 @Override 21 public void run() { 22 try { 23 String uuid = UUID.randomUUID().toString().replace("-", ""); 24 String msg = String.format("%s \t %s \t %s \t %s", Thread.currentThread().getName(), count, address, uuid); 25 ConnectClient.asyncSend(address, msg, netType); 26 } catch (Exception e) { 27 e.printStackTrace(); 28 } 29 } 30 }
测试类(模拟了10个TCP服务器的地址和端口):
通过一个死循环来模拟测试高并发场景下,连接的线程安全和性能表现。
1 import java.util.concurrent.ExecutorService; 2 import java.util.concurrent.Executors; 3 4 /** 5 * 模拟TCP客户端并发获取连接发送消息 6 * 7 * @author syj 8 */ 9 public class App { 10 11 // TCP服务器通信地址和端口 12 public static final String[] NET_ADDRESS_ARR = { 13 "192.168.1.101:9999", 14 "192.168.1.102:9999", 15 "192.168.1.103:9999", 16 "192.168.1.104:9999", 17 "192.168.1.105:9999", 18 "192.168.1.106:9999", 19 "192.168.1.107:9999", 20 "192.168.1.108:9999", 21 "192.168.1.109:9999", 22 "192.168.1.110:9999" 23 }; 24 25 public static ExecutorService executorService = Executors.newCachedThreadPool(); 26 public static volatile long count;// 统计任务执行总数 27 28 public static void main(String[] args) { 29 while (true) { 30 try { 31 Thread.sleep(5);// 防止 CPU 100% 32 } catch (InterruptedException e) { 33 e.printStackTrace(); 34 } 35 executorService.execute(new Task(NET_ADDRESS_ARR[(int) (Math.random() * 10)], ++count, NetClient.class)); 36 executorService.execute(new Task(NET_ADDRESS_ARR[(int) (Math.random() * 10)], ++count, NetClient.class)); 37 } 38 } 39 }
测试结果:
1 模拟连接TCP服务器成功: host=192.168.1.110,port=9999 2 模拟向TCP服务器发送消息成功:pool-1-thread-57 57 192.168.1.110:9999 3ff9c0c7e6fc4439be3bb037659441d4 3 模拟连接TCP服务器成功: host=192.168.1.102,port=9999 4 模拟向TCP服务器发送消息成功:pool-1-thread-58 58 192.168.1.102:9999 4ab4ecf93aed413199e364fb5dd29009 5 模拟连接TCP服务器成功: host=192.168.1.108,port=9999 6 模拟向TCP服务器发送消息成功:pool-1-thread-54 54 192.168.1.108:9999 4558ed7db32f43b6b79055fd69591b12 7 模拟连接TCP服务器成功: host=192.168.1.107,port=9999 8 模拟向TCP服务器发送消息成功:pool-1-thread-53 53 192.168.1.107:9999 139fed295dfe4014830ffe859dbdcbcd 9 模拟连接TCP服务器成功: host=192.168.1.101,port=9999 10 模拟向TCP服务器发送消息成功:pool-1-thread-53 60 192.168.1.101:9999 e38c8c388e164f74bcd01a944e099d4c 11 模拟向TCP服务器发送消息成功:pool-1-thread-59 59 192.168.1.102:9999 f4de1563458f45278375678f17afe031 12 模拟连接TCP服务器成功: host=192.168.1.103,port=9999 13 模拟向TCP服务器发送消息成功:pool-1-thread-1 1 192.168.1.103:9999 32abccc634a047fea2da46ca4152e4fa 14 模拟向TCP服务器发送消息成功:pool-1-thread-52 52 192.168.1.110:9999 f657227b3c2d4d888cb55bb478030c96 15 模拟向TCP服务器发送消息成功:pool-1-thread-51 51 192.168.1.103:9999 1ba88ef58f0a498e9b73a8c5786564a4 16 模拟连接TCP服务器成功: host=192.168.1.104,port=9999 17 模拟向TCP服务器发送消息成功:pool-1-thread-50 50 192.168.1.104:9999 fa5db1f0226043c09e62f77b8fb78e99 18 模拟向TCP服务器发送消息成功:pool-1-thread-55 55 192.168.1.103:9999 34b43f984a8b4fff8bbc86a9f27a9e26 19 模拟向TCP服务器发送消息成功:pool-1-thread-49 49 192.168.1.104:9999 a9b08d8a88054ec1ba25064c016d7bf7 20 模拟向TCP服务器发送消息成功:pool-1-thread-56 56 192.168.1.103:9999 0b63e1c03fdd4b10b2006d1dc7c5c4cd 21 模拟连接TCP服务器成功: host=192.168.1.109,port=9999 22 模拟向TCP服务器发送消息成功:pool-1-thread-48 48 192.168.1.109:9999 4379e43d62f9459bac75fe3f2b0fe0c1 23 模拟向TCP服务器发送消息成功:pool-1-thread-47 47 192.168.1.104:9999 dd612ebb20574eee88beb5cf07126349 24 模拟向TCP服务器发送消息成功:pool-1-thread-46 46 192.168.1.103:9999 7e5a63fdd6474567b792c8d2c3da40de 25 模拟向TCP服务器发送消息成功:pool-1-thread-45 45 192.168.1.103:9999 52540cb7dc74410e8bc5081c1b026f67 26 模拟向TCP服务器发送消息成功:pool-1-thread-44 44 192.168.1.108:9999 ffb7663837b54fd58cf72287eabd3006 27 模拟向TCP服务器发送消息成功:pool-1-thread-43 43 192.168.1.103:9999 032b0a5b8c7c4be2a4f9864630cf6ca9 28 模拟向TCP服务器发送消息成功:pool-1-thread-42 42 192.168.1.103:9999 eb2416d001ef4be7a9571965ed7752a5 29 模拟向TCP服务器发送消息成功:pool-1-thread-41 41 192.168.1.108:9999 0dc4aa39067f402f86844c433ed51e4c 30 模拟向TCP服务器发送消息成功:pool-1-thread-41 61 192.168.1.101:9999 df8d40a6f9c042f498ce9b851d80bc0d 31 模拟向TCP服务器发送消息成功:pool-1-thread-42 62 192.168.1.102:9999 79afe4c704604815861ccabd3e649df2 32 模拟向TCP服务器发送消息成功:pool-1-thread-40 40 192.168.1.104:9999 f0590bd974914dd583cedb27af134e99 33 模拟向TCP服务器发送消息成功:pool-1-thread-39 39 192.168.1.103:9999 d4f8a89e655c4c5788a52d3f64a94f31 34 模拟向TCP服务器发送消息成功:pool-1-thread-38 38 192.168.1.110:9999 2cd6b5c7558f473db1d1de04ca64b2de 35 模拟向TCP服务器发送消息成功:pool-1-thread-37 37 192.168.1.101:9999 36f3ade303ce41bb9117d818601fd034 36 模拟向TCP服务器发送消息成功:pool-1-thread-36 36 192.168.1.108:9999 5a094a9f93334fd79f6255da4b8e4eaa 37 模拟连接TCP服务器成功: host=192.168.1.106,port=9999 38 模拟向TCP服务器发送消息成功:pool-1-thread-35 35 192.168.1.106:9999 ba84b102d25d49868dbdef4aaf2988a7 39 模拟连接TCP服务器成功: host=192.168.1.105,port=9999 40 模拟向TCP服务器发送消息成功:pool-1-thread-34 34 192.168.1.105:9999 531c9c69ebfe4654bad581109494520a 41 模拟向TCP服务器发送消息成功:pool-1-thread-33 33 192.168.1.101:9999 64380562c78e402ab8b64921837052b2 42 模拟向TCP服务器发送消息成功:pool-1-thread-32 32 192.168.1.106:9999 bb9835e6e55c4799bb56a701cf148c54 43 模拟向TCP服务器发送消息成功:pool-1-thread-31 31 192.168.1.105:9999 34392954e7124702a0467294f5357377 44 模拟向TCP服务器发送消息成功:pool-1-thread-30 30 192.168.1.109:9999 35d1921050a14cbb84580e228616e8b6 45 模拟向TCP服务器发送消息成功:pool-1-thread-29 29 192.168.1.108:9999 3cd158b975934b08b1cd47ace3cb5a2a 46 模拟向TCP服务器发送消息成功:pool-1-thread-30 63 192.168.1.103:9999 b2d96a09018043cbb216882468ed9bbb 47 模拟向TCP服务器发送消息成功:pool-1-thread-31 64 192.168.1.110:9999 8dac07efebb44e79acc52c5c574c4497 48 模拟向TCP服务器发送消息成功:pool-1-thread-28 28 192.168.1.103:9999 dc34d5c8099245b1980ec1f97d0158bf 49 模拟向TCP服务器发送消息成功:pool-1-thread-26 26 192.168.1.108:9999 7e16edaea00c44c586ea9e712b001c17 50 模拟向TCP服务器发送消息成功:pool-1-thread-25 25 192.168.1.103:9999 60c3c3d505c24eb9a4b240ddb5486996
可见,与每个TCP服务器的连接只会建立一次,连接得到复用。