短轮询
-
<%@ page language = "java" contentType= "text/html; charset=UTF-8" pageEncoding= "UTF-8" %> <!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> <html> <head> <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"> <title>服务器时间</title> </head> <body> <h1>服务器时间</h1> <div> <div> <h2>服务器当前时间为:</h2> <div style="color:#F00"><b><p id="serverTime"> </p></b></div> </div> </div> <script type="text/javascript" src="assets/js/jquery-1.9.1.min.js"></script> <script type="text/javascript"> //showTime(); function showTime(){ $.get("showTime",function (data) { console.log(data); $("#serverTime").html(data); }) } setInterval(showTime, 1000); </script> </body> </html>
-
package cn.enjoyedu.normal; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; import java.text.SimpleDateFormat; import java.util.Date; /** * @author Mark老师 * 往期课程和VIP课程咨询 依娜老师 QQ:2133576719 * 类说明: */ @Controller public class ShowTimeController { private static Logger logger = LoggerFactory.getLogger(ShowTimeController.class); @RequestMapping("/time") public String normal(){ return "showtime"; } @RequestMapping(value="/showTime",produces = "text/html;charset=UTF-8") @ResponseBody public String getTime(){ SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); return formatter.format(new Date()); } }
-
js定时器函数setInterval(),定时调用后端showTime方法
-
缺点
- 对服务器有压力
- 数据同步不及时,轮询过程中服务器更新的数据得不到响应
长轮询
- 在短轮询的基础上,服务器不是立马响应,而是服务器有数据更新后才会往客户端发送响应
servlet3异步任务
- 为什么长轮询要用异步任务?
- 消耗服务器资源,需要启动线程应付请求
- tomcat默认是200
- 如果用同步任务,tomcat很容易就打满了
- servlet3新建一个线程,把请求打包,交给其他的线程执行,从而这个tomcat线程就释放出来了
原生servlet
-
package cn.enjoyedu; /** * @author Mark老师 * 往期课程和VIP课程咨询 依娜老师 QQ:2133576719 * 类说明: */ public class AsyncServletDemo { //监听器里初始化线程池 static class listener implements ServletContextListener{ public void contextInitialized(ServletContextEvent sce) { //初始化 } public void contextDestroyed(ServletContextEvent sce) { //销毁线程池 } } class work implements Runnable{ public void run() { //do business work //AsyncContext //get response } } //3 class AsnycLi implements AsyncListener{ public void onComplete(AsyncEvent event) throws IOException { } public void onTimeout(AsyncEvent event) throws IOException { } public void onError(AsyncEvent event) throws IOException { } public void onStartAsync(AsyncEvent event) throws IOException { } } //4 @WebServlet(asyncSupported=true) class AsyncBusiServlet extends HttpServlet{ doGet(){ AsyncContext asyncContext = request.statrAsync(); asyncContext(new AppAsyncListener()); threadPool.execute(new BusiProcessor(asyncContext)); } } }
-
使用起来很麻烦
模拟推送消息–长轮询实现–spring中的DeferedResult
-
<%@ page language = "java" contentType= "text/html; charset=UTF-8" pageEncoding= "UTF-8" %> <!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> <html> <head> <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"> <title>新闻推送</title> </head> <body> <h1>每日头条</h1> <div> <div> <h2>每日头条新闻实时看</h2> <div style="color:#F00"><b><p id="realTimeNews"> </p></b></div> </div> <hr> <div> 对于美制裁中兴一事,商务部新闻发言人高峰19日在回答记者提问时再次强调,美方行径引起了市场对美国贸易和投资环境的普遍担忧, 美方的行为表面针对中国,但最终伤害的是美国自身,不仅会使其丧失数以万计的就业机会,还会影响成百上千的美国关联企业, 将会动摇国际社会对美国投资和营商环境稳定的信心。希望美方不要自作聪明,否则只会自食其果。也希望美方不要低估中方的决心, 如果美方坚持通过单边主义的保护政策,不惜伤害中美两国企业利益,企图遏制中国发展,迫使中国作出让步,那是打错算盘, 中方坚决捍卫国家和人民利益的决心和信心不会有丝毫动摇,我们会进行坚决的斗争。(记者于佳欣) </div> <hr> <div> [中国空军多型战机连续“绕岛巡航”检验实战能力]中国空军新闻发言人申进科大校4月19日发布消息,空军近日连续组织多架轰炸机、侦察机成体系“绕岛巡航”,锤炼提升维护国家主权和领土完整的能力。 空军开展的海上方向实战化军事训练,出动了轰-6K、苏-30、歼-11和侦察机、预警机等多型多架战机。轰-6K等战机实施了“绕岛巡航”训练课题,提升了机动能力,检验了实战能力。 轰-6K战机是中国自主研发的中远程新型轰炸机,担当投送国家威力和意志的重要使命。空军近年来远海远洋训练和绕岛巡航中,都有轰-6K战机的英姿。 空军航空兵某师轰-6K机长翟培松表示,“这次绕岛巡航,我们用战神的航迹丈量祖国的大好河山,除了自豪,更多的是自信。改革开放、强军兴军,我们的战机更先进了,我们飞行员的翅膀更硬了,有自信和胆气应对任何挑战。祖国在我们心中,宝岛在祖国怀中。捍卫祖国的大好河山,是空军飞行员的神圣使命。” 在新时代练兵备战中,空军依照相关国际法和国际实践,飞越宫古海峡、巴士海峡、对马海峡,持续组织海上方向实战实训。空军还要按照既定计划,继续组织多型战机“绕岛巡航”。 空军航空兵某团轰-6K飞行员杨勇说,“这两天,我们接连绕岛巡航,战斗机不断刷新战斗航迹,飞行员不断刷新战斗经历。每一次绕岛巡航,都强一份使命担当、多一分血性胆气。听从祖国和人民的召唤,空军飞行员勇往直前。” 走实训之路,练打赢之功。空军轰-6K飞行员群体聚研战谋战的心气、砺勇往直前的胆气、壮敢打必胜的底气、养砺剑亮剑的霸气,把“思想政治要过硬、打仗本领要过硬、战斗作风要过硬”的战略要求,落实到每一个战斗岗位、每一次战斗起飞。 空军新闻发言人表示,按照“空天一体、攻防兼备”战略目标,空军深入开展海上方向实战化军事训练,备战打仗能力发生历史性变化。空军有坚定的意志、充分的信心和足够的能力,捍卫国家主权和领土完整。 </div> </div> <script type="text/javascript" src="assets/js/jquery-1.9.1.min.js"></script> <script type="text/javascript"> longLoop(); function longLoop() { $.get("realTimeNews",function (data) { console.log(data); $("#realTimeNews").html(data); longLoop(); }) } </script> </body> </html>
-
package cn.enjoyedu.servlet3; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.context.request.async.DeferredResult; import javax.servlet.http.HttpServletRequest; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @author Mark老师 * 往期课程和VIP课程咨询 依娜老师 QQ:2133576719 * 类说明: */ @Controller @RequestMapping(produces="text/html;charset=UTF-8") /*记得要在WebInitializer中增加servlet.setAsyncSupported(true);*/ public class PushNewsController { private ExecutorService executorService = Executors.newFixedThreadPool(1); @RequestMapping("/pushnews") public String news(){ return "pushNews"; } @RequestMapping(value="/realTimeNews") @ResponseBody /*在WebInitializer中要加上servlet.setAsyncSupported(true);*/ public DeferredResult<String> realtimeNews(HttpServletRequest request){ // 有一个请求,就产生一个DeferredResult实例 final DeferredResult<String> dr = new DeferredResult<String>(); // 异步执行 executorService.submit(new Runnable() { @Override public void run() { try { // 模拟服务器更新了 Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } int index = new Random().nextInt(Const.NEWS.length); // 有结果后,把result结果set进DeferredResult中 dr.setResult(Const.NEWS[index]); } }); return dr; } }
-
只有服务器数据更新了,才会返回,返回到客户端,由于客户端的连接关闭了,此时又需要重新建立连接,如果在这段时间了数据又更新了,这部分怎么办?还是不够实时
Server-sent-events(SSE)
-
和websocket都是h5提出来的
-
完全基于浏览器,还是一个http协议
-
在向客户端发出响应时,明确告诉是流信息,而轮询是数据包,sse会连续不断发送过来,要求客户端不要关闭连接
-
如果服务器再一次向客户端发起请求,将会是一次新的sse,所以sse也是单工的,真正的双工是websocket
-
<%@ page language = "java" contentType= "text/html; charset=UTF-8" pageEncoding= "UTF-8" %> <!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> <html> <head> <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"> <title>贵金属期货</title> </head> <body> <h1>贵金属期货</h1> <div> <div> <h2>贵金属列表</h2> </div> <div> <h2 id="hint"></h2> </div> <hr> <div> <div><p>黄金</p><p id="c0" style="color:#F00"></p><b><p id="s0">历史价格:</p></b></div> <div><p>白银</p><p id="c1" style="color:#F00"></p><b><p id="s1">历史价格:</p></b></div> <div><p>铂</p><p id="c2" style="color:#F00"></p><b><p id="s2">历史价格:</p></b></div> <div><p>铱</p><p id="c3" style="color:#F00"></p><b><p id="s3">历史价格:</p></b></div> </div> <hr> </div> <script type="text/javascript" src="assets/js/jquery-1.9.1.min.js"></script> <script type="text/javascript"> function showPrice(index,data){ $("#c"+index).html("当前价格:"+data); var s = $("#s"+index).html(); $("#s"+index).html(s+data+" "); } if(!!window.EventSource){ var source = new EventSource('needPrice'); source.onmessage=function (e) { var dataObj=e.data; var arr = dataObj.split(','); $.each(arr, function (i, item) { showPrice(i,item); }); $("#hint").html(""); }; source.onopen=function (e) { console.log("Connecting server!"); }; source.onerror=function () { console.log("error"); }; }else{ $("#hint").html("您的浏览器不支持SSE!"); } </script> </body> </html>
-
要求浏览器要支持window.EventSource对象
-
onmessage:收到消息,业务逻辑
-
onopen:收到连接
扫描二维码关注公众号,回复: 12874667 查看本文章 -
onerror:出现错误
-
package cn.enjoyedu.sse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; import javax.annotation.PostConstruct; import javax.servlet.http.HttpServletResponse; import java.io.PrintWriter; import java.util.Random; /** *@author Mark老师 * *类说明:贵金属期货的实现 */ @Controller public class NobleMetalController { private static Logger logger = LoggerFactory.getLogger(NobleMetalController.class); @RequestMapping("/nobleMetal") public String stock(){ return "nobleMetal"; } @RequestMapping(value="/needPrice" ,produces="text/event-stream;charset=UTF-8" ) @ResponseBody public String push(){ Random r = new Random(); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } return makeResp(r); } /*业务方法,生成贵金属的实时价格*/ private String makeResp(Random r){ StringBuilder stringBuilder = new StringBuilder(""); // sse一定要按照这个格式 stringBuilder.append("retry:2000\n") .append("data:") .append(r.nextInt(100)+50+",") .append(r.nextInt(40)+35) .append("\n\n"); return stringBuilder.toString(); } /*------------以下为正确使用SSE的姿势------------------*/ @RequestMapping("/nobleMetalr") public String stockr(){ return "nobleMetalAlso"; } @RequestMapping(value="needPricer") public void pushRight(HttpServletResponse response){ // 需要配置,不然仍然是普通的http请求 response.setContentType("text/event-stream"); response.setCharacterEncoding("utf-8"); Random r = new Random(); try { // 通过response拿到一个输出流,直接write和flush往浏览器上写 PrintWriter pw = response.getWriter(); int i = 0; while(i<10){ if(pw.checkError()){ System.out.println("客户端断开连接"); return; } Thread.sleep(1000); pw.write(makeResp(r)); pw.flush(); i++; } System.out.println("达到阈值,结束发送......."); pw.write("data:stop\n\n"); pw.flush(); } catch (Exception e) { e.printStackTrace(); } } }
-
@RequestMapping(value="/needPrice"
,produces=“text/event-stream;charset=UTF-8”
)- 传输对象是文本,并且是数据流,要求客户端不能关闭连接
-
凡是通过sse发送的数据,需要遵循相关格式
-
[field]:value data:123456\n data:123456\n\n id: event: retry:
-
注意
- 上述sse实例代码中 /------------以下为正确使用SSE的姿势------------------/之上的部分使用是错误的,表现出的样子和轮询的样子没有区别,也是在不停的请求服务器
- 因为有tomcat和浏览器通讯,return后,tomcat会自动关闭连接
- 而sse有短线重连的功能,所以会不断的请求
- 产生了上述这样一个矛盾?
- 通过response拿到一个输出流,直接write和flush往浏览器上写
- 缺点
- 只支持文本
WebSocket通信
- 真正意义上的全双工,服务器往客户端发,客户端往服务器也可以发
- 不止可以发送文本,还可以发送二进制文件
特点
- HTML5中的协议,实现与客户端与服务器双向,基于消息的文本或二进制数据通信
- 适合于对数据的实时性要求比较强的场景,如通信、直播、共享桌面,特别适合于客户与服务频繁交互的情况下,如实时共享、多人协作等平台。
- 采用新的协议,后端需要单独实现
- 客户端并不是所有浏览器都支持
各种推送方式比较
-
1、短轮询 2、长轮询 3、SSE 4、WebSocket 浏览器支持度 最高 很高 中(IE和Edge均不支持) 中(早期的浏览器不支持) 实时性 最低 较高 很高 很高 代码实现复杂度 最容易 较容易 容易 最复杂 连接性质 短连接 长连接 长连接 长连接 适用 需要服务极大量或极小量的用户,实时性要求不高 准实时性的应用,比较关注浏览器的兼容性 实时,基本都是文本交互的应用 实时,需要支持多样化的用户数据类型的应用或者是原生程序
一线互联网公司会用什么?
-
淘宝扫描登陆和京东扫描支付
-
短轮询
1.体量大,浏览器支持度是要考虑的
-
-
网页版微信和网页版QQ
-
长轮询
1.体量大,浏览器支持度是要考虑的
2.发送消息要求有一定的实时性,5秒肯定不行,0.5秒察觉不到,可行
-
WebSocket通信
-
Websocket借用了HTTP的协议来完成一部分握手
- 这个握手是应用层的握手,服务器端和客户端各自确认能够接受的websocket版本
-
流程
-
1.客户端通过HTTP(S)向服务器发起WebSocket握手,并等待确认
注意:不仅是客户端向服务器,也可以是服务器向客户端
-
2.连接协议升级到WebSocket
-
请求头(597字节)
-
Accept:text/html,application/xhtml+xm...plication/xml;q=0.9,*/*;q=0.8 Accept-Encoding:gzip,deflate Accept-Language:zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2 Cache-Control:no-cache Connect:keep-alive,Upgrage Host:localhost:8080 Origin:http://localhost:8080 Pragma:no-cache Sec-WebSocket-Extensions:permessage-deflate Sec-WebSocket-key:6p2n3gEho07Rq+woxnjMwg== Sec-WebSocket-Version:13 Upgrade:websocket User-Agent:Mozilla/5.0(windows NT 10.0;...) Gecko/20100101 Firefox/62.0
-
Connect:keep-alive,Upgrage
- 告诉服务器需要升级
-
Sec-WebSocket-Extensions:permessage-deflate
Sec-WebSocket-key:6p2n3gEho07Rq+woxnjMwg==
Sec-WebSocket-Version:13
Upgrade:websocket- 升级到websocket
响应头(345字节)
-
Access-Control-Allow-Credentials:true Access-Control-Allow-Origin:http://localhost:8080 Connection:upgrage Date:Fri,21 Sep 2018 02:36:52 GMT Sec-WebSocket-Accept:KErWZcdG0VnncKypybMYAKf/WUE= Sec-WebSocket-Extensions:permessage-deflate Server:Apache-Coyote/1.1 Upgrage:websocket Vary:Origin
-
Connection:upgrage
- 支持升级
-
Upgrage:websocket
- 升级到websocket
-
Sec-WebSocket-Accept:KErWZcdG0VnncKypybMYAKf/WUE=
- 信息摘要数据
WebSocket通信-STOMP
- WebSocket是个规范,在实际的实现中有HTML5规范中的WebSocket API和WebSocket的子协议STOMP。
STOMP(Simple Text Oriented Messaging Protocol)
- 简单(流)文本定向消息协议
- STOMP协议的前身是TTMP协议(一个简单的基于文本的协议),专为消息中间件设计。是属于消息队列的一种协议, 和AMQP, JMS平级. 它的简单性恰巧可以用于定义websocket的消息体格式. STOMP协议很多MQ都已支持, 比如RabbitMq, ActiveMq。
- 生产者(发送消息)、消息代理、消费者(订阅然后收到消息)
- STOMP是基于帧的协议
WebSocket通信实现
SpringBoot
-
基于Stomp的网页版聊天室/IM的实现
需要sockjs.min.js、stomp.min.js、jquery.js
浏览器 ----> Endpoint 1.建立连接 3.发送请求 @MessageMapping | V 消息代理(broker) 浏览器 --> 2.订阅 <--> mass(广播) 浏览器 --> 2.订阅单聊的 <--> user
wechat_room.html
-
<!DOCTYPE html> <html xmlns:th="http://www.thymeleaf.org"> <head> <meta charset="utf-8"> <meta name="aplus-terminal" content="1"> <meta name="apple-mobile-web-app-title" content=""> <meta name="apple-mobile-web-app-capable" content="yes"> <meta name="apple-mobile-web-app-status-bar-style" content="black-translucent"> <meta name="viewport" content="width=device-width,initial-scale=1,maximum-scale=1,user-scalable=no"> <meta name="format-detection" content="telephone=no, address=no"> <title>聊天</title> <style type="text/css"> /*公共样式*/ body,h1,h2,h3,h4,p,ul,ol,li,form,button,input,textarea,th,td { margin:0; padding:0 } body,button,input,select,textarea { font:12px/1.5 Microsoft YaHei UI Light,tahoma,arial,"\5b8b\4f53"; *line-height:1.5; -ms-overflow-style:scrollbar } h1,h2,h3,h4{ font-size:100% } ul,ol { list-style:none } a { text-decoration:none } a:hover { text-decoration:underline } img { border:0 } button,input,select,textarea { font-size:100% } table { border-collapse:collapse; border-spacing:0 } /*rem*/ html { font-size:62.5%; } body { font:16px/1.5 "microsoft yahei", 'tahoma'; } body .mobile-page { font-size: 1.6rem; } /*浮动*/ .fl{ float: left; } .fr{ float: right; } .clearfix:after{ content: ''; display: block; height: 0; clear: both; visibility: hidden; } body{ background-color: #F5F5F5; } .mobile-page{ max-width: 600px; } .mobile-page .admin-img, .mobile-page .user-img{ width: 45px; height: 45px; } i.triangle-admin,i.triangle-user{ width: 0; height: 0; position: absolute; top: 10px; display: inline-block; border-top: 10px solid transparent; border-bottom: 10px solid transparent; } .mobile-page i.triangle-admin{ left: 4px; border-right: 12px solid #fff; } .mobile-page i.triangle-user{ right: 4px; border-left: 12px solid #9EEA6A; } .mobile-page .admin-group, .mobile-page .user-group{ padding: 6px; display: flex; display: -webkit-flex; } .mobile-page .admin-group{ justify-content: flex-start; -webkit-justify-content: flex-start; } .mobile-page .user-group{ justify-content: flex-end; -webkit-justify-content: flex-end; } .mobile-page .admin-reply, .mobile-page .user-reply{ display: inline-block; padding: 8px; border-radius: 4px; background-color: #fff; margin:0 15px 12px; } .mobile-page .admin-reply{ box-shadow: 0px 0px 2px #ddd; } .mobile-page .user-reply{ text-align: left; background-color: #9EEA6A; box-shadow: 0px 0px 2px #bbb; } .mobile-page .user-msg, .mobile-page .admin-msg{ width: 75%; position: relative; } .mobile-page .user-msg{ text-align: right; } .chatRecord{ width: 100%; height: 400px; border-bottom: 1px solid blue; line-height:20px; overflow:auto; overflow-x:hidden; } </style> </head> <body> <div> <div style="float:left;width:47%"> <p>请选择你是谁: <select id="selectName" onchange="stompQueue();"> <option value="1">请选择</option> <option value="Mark">Mark</option> <option value="James">James</option> <option value="Lison">Lison</option> <option value="Peter">Peter</option> <option value="King">King</option> </select> </p> <div class="chatWindow"> <p style="color:darkgrey">群聊:</p> <section id="chatRecord1" class="chatRecord"> <div id="mass_div" class="mobile-page"> </div> </section> <section class="sendWindow"> <textarea name="sendChatValue" id="sendChatValue" class="sendChatValue"></textarea> <input type="button" name="sendMessage" id="sendMassMessage" class="sendMessage" onclick="sendMassMessage()" value="发送"> </section> </div> </div> <div style="float:right; width:47%"> <p>请选择你要发给谁: <select id="selectName2"> <option value="1">请选择</option> <option value="Mark">Mark</option> <option value="James">James</option> <option value="Lison">Lison</option> <option value="Peter">Peter</option> <option value="King">King</option> </select> </p> <div class="chatWindow"> <p style="color:darkgrey">单聊:</p> <section id="chatRecord2" class="chatRecord"> <div id="alone_div" class="mobile-page"> </div> </section> <section class="sendWindow"> <textarea name="sendChatValue2" id="sendChatValue2" class="sendChatValue"></textarea> <input type="button" name="sendMessage" id="sendAloneMessage" class="sendMessage" onclick="sendAloneMessage()" value="发送"> </section> </div> </div> </div> <!-- 独立JS --> <script th:src="@{sockjs.min.js}"></script> <script th:src="@{stomp.min.js}"></script> <script th:src="@{jquery.js}"></script> <script th:src="@{wechat_room.js}"></script> </body> </html>
wechat_room.js
-
var stompClient = null; //加载完浏览器后调用connect(),打开通道 $(function(){ //打开双通道 connect() }) //强制关闭浏览器时调用websocket.close(),进行正常关闭 window.onunload = function() { disconnect() } //打开通道 function connect(){ var socket = new SockJS('/endpointMark'); //连接SockJS的endpoint名称为"endpointMark" stompClient = Stomp.over(socket);//使用STMOP子协议的WebSocket客户端 stompClient.connect({ },function(frame){ //连接WebSocket服务端 console.log('Connected:' + frame); //接收广播信息 // 订阅广播消息 // 下面新定义了一个js的函数 stompTopic(); }); } //关闭通道 function disconnect(){ if(stompClient != null) { stompClient.disconnect(); } console.log("Disconnected"); } //一对多,发起订阅 function stompTopic(){ //通过stompClient.subscribe订阅目标(destination)发送的消息(广播接收信息) stompClient.subscribe('/mass/getResponse',function(response){ var message=JSON.parse(response.body); //展示广播的接收的内容接收 var response = $("#mass_div"); var userName=$("#selectName").val(); if(userName==message.name){ response.append("<div class='user-group'>" + " <div class='user-msg'>" + " <span class='user-reply'>"+message.chatValue+"</span>" + " <i class='triangle-user'></i>" + " </div>" +userName+ " </div>"); }else{ response.append(" <div class='admin-group'>"+ message.name+ "<div class='admin-msg'>"+ " <i class='triangle-admin'></i>"+ " <span class='admin-reply'>"+message.chatValue+"</span>"+ "</div>"+ "</div>"); } }); } //群发消息 function sendMassMessage(){ var postValue={ }; var chatValue=$("#sendChatValue"); var userName=$("#selectName").val(); postValue.name=userName; postValue.chatValue=chatValue.val(); //postValue.userId="0"; if(userName==1||userName==null){ alert("请选择你是谁!"); return; } if(chatValue==""||userName==null){ alert("不能发送空消息!"); return; } stompClient.send("/massRequest",{ },JSON.stringify(postValue)); chatValue.val(""); } //单独发消息 function sendAloneMessage(){ var postValue={ }; var chatValue=$("#sendChatValue2"); var userName=$("#selectName").val(); var sendToId=$("#selectName2").val(); var response = $("#alone_div"); postValue.name=userName;//发送者姓名 postValue.chatValue=chatValue.val();//聊天内容 postValue.userId=sendToId;//发送给谁 if(userName==1||userName==null){ alert("请选择你是谁!"); return; } if(sendToId==1||sendToId==null){ alert("请选择你要发给谁!"); return; } if(chatValue==""||userName==null){ alert("不能发送空消息!"); return; } stompClient.send("/aloneRequest",{ },JSON.stringify(postValue)); response.append("<div class='user-group'>" + " <div class='user-msg'>" + " <span class='user-reply'>"+chatValue.val()+"</span>" + " <i class='triangle-user'></i>" + " </div>" +userName+ " </div>"); chatValue.val(""); } //一对一,发起订阅 function stompQueue(){ var userId=$("#selectName").val(); //通过stompClient.subscribe订阅目标(destination)发送的消息(队列接收信息) stompClient.subscribe('/user/' + userId + '/alone', function(response){ var message=JSON.parse(response.body); //展示一对一的接收的内容接收 var response = $("#alone_div"); response.append(" <div class='admin-group'>"+ message.name+ "<div class='admin-msg'>"+ " <i class='triangle-admin'></i>"+ " <span class='admin-reply'>"+message.chatValue+"</span>"+ "</div>"+ "</div>"); }); }
WebSocketConfig
-
package cn.enjoyedu.stomp; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.simp.config.MessageBrokerRegistry; import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker; import org.springframework.web.socket.config.annotation.StompEndpointRegistry; import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer; /** * @author Mark老师 享学课堂 https://enjoy.ke.qq.com * 往期课程和VIP课程咨询 依娜老师 QQ:2133576719 * 类说明: */ @Configuration /*开启使用Stomp协议来传输基于消息broker的消息 这时控制器支持使用@MessageMapping,就像使用@RequestMapping一样*/ @EnableWebSocketMessageBroker public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { @Override public void registerStompEndpoints(StompEndpointRegistry registry) { /*注册STOMP协议的节点(endpoint),并映射指定的url, * 添加一个访问端点“/endpointMark”,客户端打开双通道时需要的url, * 允许所有的域名跨域访问,指定使用SockJS协议。*/ registry.addEndpoint("/endpointMark") .setAllowedOrigins("*") .withSockJS(); } @Override public void configureMessageBroker(MessageBrokerRegistry registry) { registry.enableSimpleBroker("/mass","/user"); registry.setUserDestinationPrefix("/user/"); } }
-
@EnableWebSocketMessageBroker
表示开启websocket支持
-
registry.addEndpoint("/endpointMark")
注册一个endPoint,也就是前端js中打开连接connect的对象
-
消息代理
registry.enableSimpleBroker("/mass","/user"); // 建立简单消息代理
registry.setUserDestinationPrefix("/user/"); // 关于单聊需要加前缀/user/
StompController
-
package cn.enjoyedu.stomp.web; import cn.enjoyedu.stomp.domain.ChatRoomRequest; import cn.enjoyedu.stomp.domain.ChatRoomResponse; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.handler.annotation.MessageMapping; import org.springframework.messaging.handler.annotation.SendTo; import org.springframework.messaging.simp.SimpMessagingTemplate; import org.springframework.messaging.simp.annotation.SendToUser; import org.springframework.stereotype.Controller; /* * *@author Mark老师 * *类说明:在线聊天室 */ @Controller public class StompController { @Autowired private SimpMessagingTemplate template;/*Spring实现的一个发送模板类*/ /*消息群发,接受发送至自massRequest的请求*/ @MessageMapping("/massRequest") @SendTo("/mass/getResponse") public ChatRoomResponse mass(ChatRoomRequest chatRoomRequest){ System.out.println("name = " + chatRoomRequest.getName()); System.out.println("chatValue = " + chatRoomRequest.getChatValue()); ChatRoomResponse response=new ChatRoomResponse(); response.setName(chatRoomRequest.getName()); response.setChatValue(chatRoomRequest.getChatValue()); return response; } /*单独聊天,接受发送至自aloneRequest的请求*/ @MessageMapping("/aloneRequest") public ChatRoomResponse alone(ChatRoomRequest chatRoomRequest){ System.out.println("SendToUser = " + chatRoomRequest.getUserId() +" FromName = " + chatRoomRequest.getName() +" ChatValue = " + chatRoomRequest.getChatValue()); ChatRoomResponse response=new ChatRoomResponse(); response.setName(chatRoomRequest.getName()); response.setChatValue(chatRoomRequest.getChatValue()); this.template.convertAndSendToUser(chatRoomRequest.getUserId()+"", "/alone",response); return response; } }
-
@SendTo("/mass/getResponse")
表示群聊消息要发送到哪个位置
-
this.template.convertAndSendToUser(chatRoomRequest.getUserId()+"",
“/alone”,response);这一步就拼装了js中一对一订阅的/user和/alone
原生websocket实现,不用stomp
- 和WebSocket的集成
WebSocketConfig
-
package cn.enjoyedu.ws; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.server.standard.ServerEndpointExporter; /** * @author Mark老师 * 往期课程和VIP课程咨询 依娜老师 QQ:2133576719 * 类说明:开启WebSocket支持 */ @Configuration public class WebSocketConfig { @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } }
WebSocketServer—WebSocket服务器端
-
package cn.enjoyedu.ws; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import javax.websocket.*; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicInteger; /** *@author Mark老师 *往期课程和VIP课程咨询 依娜老师 QQ:2133576719 *类说明:提供WebSocket服务 */ @ServerEndpoint(value = "/ws/asset")/*将类定义成一个WebSocket服务器端*/ @Component public class WebSocketServer { private static Logger log = LoggerFactory.getLogger(WebSocketServer.class); private static final AtomicInteger onlineCount = new AtomicInteger(0); /*线程安全Set,用来存放每个客户端对应的Session对象。*/ private static CopyOnWriteArraySet<Session> sessionSet = new CopyOnWriteArraySet<Session>(); /*线程安全Map,用来存放每个客户端sessionid和用户名的对应关系。*/ private static Map<String,String> sessionMap = new ConcurrentHashMap<>(); /*** 连接建立成功调用的方法*/ @OnOpen public void onOpen(Session session) { //将用户session,session和用户名对应关系放入本地缓存 sessionSet.add(session); Map<String, List<String>> pathParameters = session.getRequestParameterMap(); String userId = pathParameters.get("toUserId").get(0); sessionMap.put(session.getId(),userId); log.info("有连接加入,当前连接数为:{}", onlineCount.incrementAndGet()); try { //通知所有用户有新用户上线 broadCastInfo("系统消息@^用户["+userId+"]加入群聊。"); } catch (IOException e) { e.printStackTrace(); } } /*** 连接关闭调用的方法*/ @OnClose public void onClose(Session session) { //将用户session,session和用户名对应关系从本地缓存移除 sessionSet.remove(session); Map<String, List<String>> pathParameters = session.getRequestParameterMap(); String userId = sessionMap.get(session.getId()); sessionMap.remove(session.getId()); int cnt = onlineCount.decrementAndGet(); log.info("有连接关闭,当前连接数为:{}", cnt); try { //通知所有用户有用户下线 broadCastInfo("系统消息@^用户["+userId+"]退出群聊。"); } catch (IOException e) { e.printStackTrace(); } } /*** 收到客户端消息后调用的方法*/ @OnMessage public void onMessage(String message, Session session) { log.info("来自客户端{}的消息:{}", sessionMap.get(session.getId()),message); if(message.startsWith("ToUser:")){ //这里可以实现一对一聊天sendMessageAlone(); }else{ //实现群聊 String msger = sessionMap.get(session.getId()); try { broadCastInfo(msger+"@^"+message); } catch (IOException e) { e.printStackTrace(); } } } /***出现错误时的处理*/ @OnError public void onError(Session session, Throwable error) { log.error("发生错误:{},Session ID: {}",error.getMessage(),session.getId()); error.printStackTrace(); } /*** 发送消息的基础方法*/ public static void basicSendMessage(Session session, String message) { try { session.getBasicRemote().sendText(message); } catch (IOException e) { log.error("发送消息出错:{}", e.getMessage()); e.printStackTrace(); } } /*** 群发消息*/ public static void broadCastInfo(String message) throws IOException { for (Session session : sessionSet) { if(session.isOpen()){ basicSendMessage(session, message); } } } /*** 指定Session发送消息*/ public static void sendMessageAlone(String sessionId, String message) throws IOException { Session session = null; for (Session s : sessionSet) { if(s.getId().equals(sessionId)){ session = s; break; } } if(session!=null){ basicSendMessage(session, message); } else{ log.warn("没有找到你指定ID的会话:{}",sessionId); } } }
-
实现用到的四个注解
-
@OnOpen
把session放到本地缓存中,并广播
-
@OnClose
-
@OnMessage
向群聊就群聊,向群发就群发
-
@OnError
-
-
为了实现群发
- /线程安全Set,用来存放每个客户端对应的Session对象。/
private static CopyOnWriteArraySet sessionSet
= new CopyOnWriteArraySet(); - /线程安全Map,用来存放每个客户端sessionid和用户名的对应关系。/
private static Map<String,String> sessionMap
= new ConcurrentHashMap<>();
- /线程安全Set,用来存放每个客户端对应的Session对象。/
netty集成websocket
- 数据帧就有6种,二进制、文本、关闭、心跳两种、续传的帧
服务器端
WebSocketServer
-
package cn.enjoyedu.nettyws.server; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.SelfSignedCertificate; import io.netty.util.concurrent.ImmediateEventExecutor; /** * @author Mark老师 * 往期课程和VIP课程咨询 依娜老师 QQ:2133576719 * 类说明: */ public final class WebSocketServer { /*创建 DefaultChannelGroup,用来保存所 有已经连接的 WebSocket Channel,群发和一对一功能可以用上*/ private final static ChannelGroup channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE); static final boolean SSL = false;//是否启用ssl /*通过ssl访问端口为8443,否则为8080*/ static final int PORT = Integer.parseInt( System.getProperty("port", SSL? "8443" : "8080")); public static void main(String[] args) throws Exception { /*SSL配置*/ final SslContext sslCtx; if (SSL) { SelfSignedCertificate ssc = new SelfSignedCertificate(); sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build(); } else { sslCtx = null; } EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new WebSocketServerInitializer(sslCtx,channelGroup)); Channel ch = b.bind(PORT).sync().channel(); System.out.println("打开浏览器访问: " + (SSL? "https" : "http") + "://127.0.0.1:" + PORT + '/'); ch.closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
WebSocketServerInitializer
-
package cn.enjoyedu.nettyws.server; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.group.ChannelGroup; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler; import io.netty.handler.ssl.SslContext; /** * @author Mark老师 * 往期课程和VIP课程咨询 依娜老师 QQ:2133576719 * 类说明:增加handler */ public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> { /*websocket访问路径*/ private static final String WEBSOCKET_PATH = "/websocket"; private final ChannelGroup group; private final SslContext sslCtx; public WebSocketServerInitializer(SslContext sslCtx,ChannelGroup group) { this.sslCtx = sslCtx; this.group = group; } @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); if (sslCtx != null) { pipeline.addLast(sslCtx.newHandler(ch.alloc())); } // 编解码 pipeline.addLast(new HttpServerCodec()); // 报文包聚合 pipeline.addLast(new HttpObjectAggregator(65536)); /*支持ws数据的压缩传输*/ // websocket协议消息的压缩支持 pipeline.addLast(new WebSocketServerCompressionHandler()); // websocket协议handler pipeline.addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH, null,true)); /*浏览器访问的时候展示index页面*/ pipeline.addLast(new ProcessWsIndexPageHandler(WEBSOCKET_PATH)); /*对WS的数据进行处理*/ // 实现群发的handler // group是什么呢?----ChannelGroup pipeline.addLast(new ProcesssWsFrameHandler(group)); } }
ProcesssWsFrameHandler
-
package cn.enjoyedu.nettyws.server; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Locale; /** * @author Mark老师 * 往期课程和VIP课程咨询 依娜老师 QQ:2133576719 * 类说明:对websocket的数据进行处理 */ public class ProcesssWsFrameHandler extends SimpleChannelInboundHandler<WebSocketFrame> { private final ChannelGroup group; public ProcesssWsFrameHandler(ChannelGroup group) { this.group = group; } private static final Logger logger = LoggerFactory.getLogger(ProcesssWsFrameHandler.class); @Override protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception { if(frame instanceof TextWebSocketFrame){ String request = ((TextWebSocketFrame)frame).text(); ctx.channel().writeAndFlush( new TextWebSocketFrame(request.toUpperCase(Locale.CHINA))); /*群发*/ group.writeAndFlush( new TextWebSocketFrame( "Client"+ctx.channel()+"say:"+request.toUpperCase(Locale.CHINA) )); }else{ throw new UnsupportedOperationException("unsupport data frame"); } } /*重写 userEventTriggered()方法以处理自定义事件*/ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { /*检查事件类型,如果是ws握手成功事件,群发通知*/ if(evt == WebSocketServerProtocolHandler. ServerHandshakeStateEvent.HANDSHAKE_COMPLETE){ group.writeAndFlush( new TextWebSocketFrame("Client"+ctx.channel()+" joined")); // 把新加的用户添加进来,不然新加的用户是收不到消息的 group.add(ctx.channel()); } } }
-
TextWebSocketFrame ---- netty定义好的文本帧
-
会发送两次
-
1.自己收到一份
ctx.channel().writeAndFlush(
new TextWebSocketFrame(request.toUpperCase(Locale.CHINA))); -
2.群发了一份
group.writeAndFlush(
new TextWebSocketFrame(
“Client”+ctx.channel()+“say:”+request.toUpperCase(Locale.CHINA)
));
-
ProcessWsIndexPageHandler
-
package cn.enjoyedu.nettyws.server; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.handler.codec.http.*; import io.netty.handler.ssl.SslHandler; import io.netty.util.CharsetUtil; import static io.netty.handler.codec.http.HttpMethod.GET; import static io.netty.handler.codec.http.HttpResponseStatus.*; import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; /** * @author Mark老师 * 往期课程和VIP课程咨询 依娜老师 QQ:2133576719 * 类说明:对http请求,将index的页面返回给前端 */ public class ProcessWsIndexPageHandler extends SimpleChannelInboundHandler<FullHttpRequest> { private final String websocketPath; public ProcessWsIndexPageHandler(String websocketPath) { this.websocketPath = websocketPath; } @Override protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception { // 处理错误或者无法解析的http请求 if (!req.decoderResult().isSuccess()) { sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST)); return; } //只允许Get请求 if (req.method() != GET) { sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN)); return; } // 发送index页面的内容 if ("/".equals(req.uri()) || "/index.html".equals(req.uri())) { //生成WebSocket的访问地址,写入index页面中 String webSocketLocation = getWebSocketLocation(ctx.pipeline(), req, websocketPath); System.out.println("WebSocketLocation:["+webSocketLocation+"]"); //生成index页面的具体内容,并送往浏览器 ByteBuf content = MakeIndexPage.getContent( webSocketLocation); FullHttpResponse res = new DefaultFullHttpResponse( HTTP_1_1, OK, content); res.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html; charset=UTF-8"); HttpUtil.setContentLength(res, content.readableBytes()); sendHttpResponse(ctx, req, res); } else { sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, NOT_FOUND)); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } /*发送应答*/ private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) { // 错误的请求进行处理 (code<>200). if (res.status().code() != 200) { ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8); res.content().writeBytes(buf); buf.release(); HttpUtil.setContentLength(res, res.content().readableBytes()); } // 发送应答. ChannelFuture f = ctx.channel().writeAndFlush(res); //对于不是长连接或者错误的请求直接关闭连接 if (!HttpUtil.isKeepAlive(req) || res.status().code() != 200) { f.addListener(ChannelFutureListener.CLOSE); } } /*根据用户的访问,告诉用户的浏览器,WebSocket的访问地址*/ private static String getWebSocketLocation(ChannelPipeline cp, HttpRequest req, String path) { String protocol = "ws"; if (cp.get(SslHandler.class) != null) { protocol = "wss"; } return protocol + "://" + req.headers().get(HttpHeaderNames.HOST) + path; } }
MakeIndexPage
-
package cn.enjoyedu.nettyws.server; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.util.CharsetUtil; /** * @author Mark老师 * 类说明:生成index页面的内容 */ public final class MakeIndexPage { private static final String NEWLINE = "\r\n"; public static ByteBuf getContent(String webSocketLocation) { return Unpooled.copiedBuffer( "<html><head><title>Web Socket Test</title></head>" + NEWLINE + "<body>" + NEWLINE + "<script type=\"text/javascript\">" + NEWLINE + "var socket;" + NEWLINE + "if (!window.WebSocket) {" + NEWLINE + " window.WebSocket = window.MozWebSocket;" + NEWLINE + '}' + NEWLINE + "if (window.WebSocket) {" + NEWLINE + " socket = new WebSocket(\"" + webSocketLocation + "\");" + NEWLINE + " socket.onmessage = function(event) {" + NEWLINE + " var ta = document.getElementById('responseText');" + NEWLINE + " ta.value = ta.value + '\\n' + event.data" + NEWLINE + " };" + NEWLINE + " socket.onopen = function(event) {" + NEWLINE + " var ta = document.getElementById('responseText');" + NEWLINE + " ta.value = \"Web Socket opened!\";" + NEWLINE + " };" + NEWLINE + " socket.onclose = function(event) {" + NEWLINE + " var ta = document.getElementById('responseText');" + NEWLINE + " ta.value = ta.value + \"Web Socket closed\"; " + NEWLINE + " };" + NEWLINE + "} else {" + NEWLINE + " alert(\"Your browser does not support Web Socket.\");" + NEWLINE + '}' + NEWLINE + NEWLINE + "function send(message) {" + NEWLINE + " if (!window.WebSocket) { return; }" + NEWLINE + " if (socket.readyState == WebSocket.OPEN) {" + NEWLINE + " socket.send(message);" + NEWLINE + " } else {" + NEWLINE + " alert(\"The socket is not open.\");" + NEWLINE + " }" + NEWLINE + '}' + NEWLINE + "</script>" + NEWLINE + "<form οnsubmit=\"return false;\">" + NEWLINE + "<input type=\"text\" name=\"message\" " + "value=\"Hello, World!\"/>" + "<input type=\"button\" value=\"Send Web Socket Data\"" + NEWLINE + " οnclick=\"send(this.form.message.value)\" />" + NEWLINE + "<h3>Output</h3>" + NEWLINE + "<textarea id=\"responseText\" " + "style=\"width:500px;height:300px;\"></textarea>" + NEWLINE + "</form>" + NEWLINE + "</body>" + NEWLINE + "</html>" + NEWLINE, CharsetUtil.US_ASCII); } }
客户端
WebSocketClient
-
package cn.enjoyedu.nettyws.client; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.http.DefaultHttpHeaders; import io.netty.handler.codec.http.HttpClientCodec; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.websocketx.*; import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import java.io.BufferedReader; import java.io.InputStreamReader; import java.net.URI; /** * @author Mark老师 * 往期课程和VIP课程咨询 依娜老师 QQ:2133576719 * 类说明: */ /** 这是WebSocket客户端的示例。 要运行此示例,需要兼容的WebSocket服务器。 因此,可以通过运行cn.enjoyedu.server.WebSocketServer来启动WebSocket服务器, */ public final class WebSocketClient { static final String URL = System.getProperty("url", "ws://127.0.0.1:8080/websocket"); static final String SURL = System.getProperty("url", "wss://127.0.0.1:8443/websocket"); public static void main(String[] args) throws Exception { URI uri = new URI(URL); String scheme = uri.getScheme() == null? "ws" : uri.getScheme(); final String host = uri.getHost() == null? "127.0.0.1" : uri.getHost(); final int port = uri.getPort(); if (!"ws".equalsIgnoreCase(scheme) && !"wss".equalsIgnoreCase(scheme)) { System.err.println("Only WS(S) is supported."); return; } final boolean ssl = "wss".equalsIgnoreCase(scheme); final SslContext sslCtx; if (ssl) { sslCtx = SslContextBuilder.forClient() .trustManager(InsecureTrustManagerFactory.INSTANCE).build(); } else { sslCtx = null; } EventLoopGroup group = new NioEventLoopGroup(); try { // Connect with V13 (RFC 6455 aka HyBi-17). You can change it to V08 or V00. // If you change it to V00, ping is not supported and remember to change // HttpResponseDecoder to WebSocketHttpResponseDecoder in the pipeline. final WebSocketClientHandler handler = new WebSocketClientHandler( WebSocketClientHandshakerFactory .newHandshaker( uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders())); Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc(), host, port)); } p.addLast( //http协议为握手必须 new HttpClientCodec(), new HttpObjectAggregator(8192), //支持WebSocket数据压缩 WebSocketClientCompressionHandler.INSTANCE, handler); } }); //连接服务器 Channel ch = b.connect(uri.getHost(), port).sync().channel(); //等待握手完成 handler.handshakeFuture().sync(); BufferedReader console = new BufferedReader( new InputStreamReader(System.in)); while (true) { String msg = console.readLine(); if (msg == null) { break; } else if ("bye".equals(msg.toLowerCase())) { ch.writeAndFlush(new CloseWebSocketFrame()); ch.closeFuture().sync(); break; } else if ("ping".equals(msg.toLowerCase())) { WebSocketFrame frame = new PingWebSocketFrame( Unpooled.wrappedBuffer(new byte[] { 8, 1, 8, 1 })); ch.writeAndFlush(frame); } else { WebSocketFrame frame = new TextWebSocketFrame(msg); ch.writeAndFlush(frame); } } } finally { group.shutdownGracefully(); } } }
WebSocketClientHandler
-
package cn.enjoyedu.nettyws.client; import io.netty.channel.*; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.websocketx.*; import io.netty.util.CharsetUtil; /** * @author Mark老师 * 往期课程和VIP课程咨询 依娜老师 QQ:2133576719 * 类说明: */ public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> { //负责和服务器进行握手 private final WebSocketClientHandshaker handshaker; //握手的结果 private ChannelPromise handshakeFuture; public WebSocketClientHandler(WebSocketClientHandshaker handshaker) { this.handshaker = handshaker; } public ChannelFuture handshakeFuture() { return handshakeFuture; } //当前Handler被添加到ChannelPipeline时, // new出握手的结果的实例,以备将来使用 @Override public void handlerAdded(ChannelHandlerContext ctx) { handshakeFuture = ctx.newPromise(); } //通道建立,进行握手 @Override public void channelActive(ChannelHandlerContext ctx) { handshaker.handshake(ctx.channel()); } //通道关闭 @Override public void channelInactive(ChannelHandlerContext ctx) { System.out.println("WebSocket Client disconnected!"); } //读取数据 @Override public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { Channel ch = ctx.channel(); //握手未完成,完成握手 if (!handshaker.isHandshakeComplete()) { try { handshaker.finishHandshake(ch, (FullHttpResponse) msg); System.out.println("WebSocket Client connected!"); handshakeFuture.setSuccess(); } catch (WebSocketHandshakeException e) { System.out.println("WebSocket Client failed to connect"); handshakeFuture.setFailure(e); } return; } //握手已经完成,升级为了websocket,不应该再收到http报文 if (msg instanceof FullHttpResponse) { FullHttpResponse response = (FullHttpResponse) msg; throw new IllegalStateException( "Unexpected FullHttpResponse (getStatus=" + response.status() + ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')'); } //处理websocket报文 WebSocketFrame frame = (WebSocketFrame) msg; if (frame instanceof TextWebSocketFrame) { TextWebSocketFrame textFrame = (TextWebSocketFrame) frame; System.out.println("WebSocket Client received message: " + textFrame.text()); } else if (frame instanceof PongWebSocketFrame) { System.out.println("WebSocket Client received pong"); } else if (frame instanceof CloseWebSocketFrame) { System.out.println("WebSocket Client received closing"); ch.close(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); if (!handshakeFuture.isDone()) { handshakeFuture.setFailure(cause); } ctx.close(); } }