「这是我参与11月更文挑战的第16天,活动详情查看:2021最后一次更文挑战」
一、主流服务器推送技术说明
1.1 需求与背景
若干年前,所有的请求都是由浏览器端发起,浏览器本身并没有接受请求的能力。所以一些特殊需求都是用ajax轮询的方式来实现的。
比如对于某些需要实时更新的数据(例如Facebook/Twitter 更新、股价更新、新的博文、赛事结果等)。又例如我们通过页面启动一个定时任务,前端想知道任务后台的的实时运行状态,就需要以较小的间隔,频繁的向服务器建立http连接询问定时任务是否完成,然后更新页面状态。但这样做的后果就是浪费大量流量,对服务端造成了巨大压力。
在html5被广泛推广之后,我们可以使用服务端主动推动数据的方式来,解决上面提到的问题。
1.2 服务端推送常用技术
1、全双工通信:WebSocket全双工的,全双工就是双向通信,http协议是“对讲机”之间的通话,那我们websocket就是移动电话。本质上是一个额外的tcp连接,建立和关闭时握手使用http协议,其他数据传输不使用http协议 ,更加复杂一些,比较适用于需要进行复杂双向实时数据通讯的场景。 2、服务端主动推送:SSE (Server Send Event)html5新标准,用来从服务端实时推送数据到浏览器端, 直接建立在当前http连接上,本质上是保持一个http长连接,轻量协议 。客户端发送一个请求到服务端 ,服务端保持这个请求直到一个新的消息准备好,将消息返回至客户端,此时不关闭连接,仍然保持它,供其它消息使用。SSE的一大特色就是重复利用一个连接来处理每一个消息(又称event)。
1.3 websocket与SSE比较
是否基于新协议 | 是否双向通信 | 是否支持跨域 | 编码难度 | |
---|---|---|---|---|
SSE | 否(Http ) |
否(服务器单向) | 否(Firefox 支持跨域) | 低 |
WebSocket | 是(ws ) |
是 | 是 | 略高 |
但是IE和Edge浏览器不支持SSE。 |
二、服务端推送事件SSE
2.1 模拟网络支付场景
ssetest.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>SSE</title>
</head>
<body>
<div id = "message"></div>
<script>
if (window.EventSource) {
var source = new EventSource('orderpay');
innerHTML = '';
source.addEventListener('message', function(e) {
innerHTML += e.data + "<br/>";
document.getElementById("message").innerHTML = innerHTML;
});
source.addEventListener('open', function(e) {
console.log("连接打开.");
}, false);
// 响应finish事件,主动关闭EventSource
source.addEventListener('finish', function(e) {
console.log("数据接收完毕,关闭EventSource");
source.close();
console.log(e);
}, false);
source.addEventListener('error', function(e) {
if (e.readyState == EventSource.CLOSED) {
console.log("连接关闭");
} else {
console.log(e);
}
}, false);
} else {
console.log("你的浏览器不支持SSE");
}
</script>
</body>
</html>
复制代码
@Controller
@RequestMapping("sse")
public class SSEControler {
public static final ConcurrentHashMap<Long,SseEmitter> sseEmitters = new ConcurrentHashMap<>();
@GetMapping("/test")
public String ssetest(){
return "ssetest";
}
@GetMapping("/orderpay")
public SseEmitter orderpay(){
Long payRecordId = 1L;
//设置默认的超时时间60秒
final SseEmitter emitter = new SseEmitter(60 * 1000L);
try {
System.out.println("连接建立成功");
//TODO 这里可以做一些订单保存的操作
sseEmitters.put(payRecordId,emitter);
}catch (Exception e){
emitter.completeWithError(e);
}
return emitter;
}
@GetMapping("/payback")
public @ResponseBody String payback (){
SseEmitter emitter = sseEmitters.get(1L);
try {
emitter.send("支付成功");
System.out.println("发送finish事件");
emitter.send(SseEmitter.event().name("finish").id("6666").data("哈哈"));
System.out.println("调用complete");
emitter.complete();
} catch (IOException e) {
emitter.completeWithError(e);
}
return "ok";
}
}
复制代码
对连接超时异常进行全局处理
@ExceptionHandler(AsyncRequestTimeoutException.class)
@ResponseBody
public String handleAsyncRequestTimeoutException(AsyncRequestTimeoutException e) {
return SseEmitter.event().data("timeout!!").build().stream()
.map(d -> d.getData().toString())
.collect(Collectors.joining());
}
复制代码
2.2访问测试
http://localhost:8888/sse/test
http://localhost:8888/sse/payback
三、双向实时通信websocket
3.1 整合websocket
<!-- 引入websocket依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
复制代码
开启websocket功能
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
复制代码
3.2 websocket 用法实验
WebSocketServer内容的一些核心代码,websocket服务端代码
- @ServerEndpoint(value = "/ws/asset")表示websocket的接口服务地址
- @OnOpen注解的方法,为连接建立成功时调用的方法
- @OnClose注解的方法,为连接关闭调用的方法
- @OnMessage注解的方法,为收到客户端消息后调用的方法
- @OnError注解的方法,为出现异常时调用的方法
/**
* WebSocket服务端示例
*/
@Component
@Slf4j
@ServerEndpoint(value = "/ws/asset")
public class WebSocketServer {
private static final AtomicInteger OnlineCount = new AtomicInteger(0);
// concurrent包的线程安全Set,用来存放每个客户端对应的Session对象。
private static CopyOnWriteArraySet<Session> SessionSet = new CopyOnWriteArraySet<>();
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session) throws IOException {
SessionSet.add(session);
int cnt = OnlineCount.incrementAndGet(); // 在线数加1
log.info("有连接加入,当前连接数为:{}", cnt);
SendMessage(session, "连接成功");
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose(Session session) {
SessionSet.remove(session);
int cnt = OnlineCount.decrementAndGet();
log.info("有连接关闭,当前连接数为:{}", cnt);
}
/**
* 收到客户端消息后调用的方法
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message, Session session) throws IOException {
log.info("来自客户端的消息:{}",message);
SendMessage(session, "收到消息,消息内容:"+message);
}
/**
* 出现错误
*/
public void onError(Session session, Throwable error) {
log.error("发生错误:{},Session ID: {}",error.getMessage(),session.getId());
}
/**
* 发送消息,实践表明,每次浏览器刷新,session会发生变化。
* @param session session
* @param message 消息
*/
private static void SendMessage(Session session, String message) throws IOException {
session.getBasicRemote().sendText(String.format("%s (From Server,Session ID=%s)",message,session.getId()));
}
/**
* 群发消息
* @param message 消息
*/
public static void BroadCastInfo(String message) throws IOException {
for (Session session : SessionSet) {
if(session.isOpen()){
SendMessage(session, message);
}
}
}
/**
* 指定Session发送消息
* @param sessionId sessionId
* @param message 消息
*/
public static void SendMessage(String sessionId,String message) throws IOException {
Session session = null;
for (Session s : SessionSet) {
if(s.getId().equals(sessionId)){
session = s;
break;
}
}
if(session!=null){
SendMessage(session, message);
} else{
log.warn("没有找到你指定ID的会话:{}",sessionId);
}
}
}
复制代码
客户端代码,做几次实验,自然明了代码的意思,先不要看代码,先看效果。public/wstest/html
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>websocket测试</title>
<style type="text/css">
h3,h4{
text-align:center;
}
</style>
</head>
<body>
<h3>WebSocket测试,在<span style="color:red">控制台</span>查看测试信息输出!</h3>
<h4>
<br>
http://localhost:8888/api/ws/sendOne?message=单发消息内容&id=none
<br>
http://localhost:8888/api/ws/sendAll?message=群发消息内容
</h4>
<h3>请输入要发送给服务器端的消息:</h3><br/>
<input id="text" type="text" />
<button onclick="sendToServer()">发送服务器消息</button>
<button onclick="closeWebSocket()">关闭连接</button>
<br>信息:<span id="message"></span>
<script type="text/javascript">
var socket;
if (typeof (WebSocket) == "undefined") {
console.log("遗憾:您的浏览器不支持WebSocket");
} else {
socket = new WebSocket("ws://localhost:8888/ws/asset");
//连接打开事件
socket.onopen = function() {
console.log("Socket 已打开");
socket.send("消息发送测试(From Client)");
};
//收到消息事件
socket.onmessage = function(msg) {
document.getElementById('message').innerHTML += msg.data + '<br/>';
};
//连接关闭事件
socket.onclose = function() {
console.log("Socket已关闭");
};
//发生了错误事件
socket.onerror = function() {
alert("Socket发生了错误");
}
//窗口关闭时,关闭连接
window.unload=function() {
socket.close();
};
}
//关闭连接
function closeWebSocket(){
socket.close();
}
//发送消息给服务器
function sendToServer(){
var message = document.getElementById('text').value;
socket.send(message);
}
</script>
</body>
</html>
复制代码
测试页面
http://localhost:8888/wstest.html
3.3 服务端广播与指定session消息发送
@RestController
@RequestMapping("/api/ws")
public class WebSocketController {
/**
* 群发消息内容
* @param message 消息内容
*/
@RequestMapping(value="/sendAll", method=RequestMethod.GET)
AjaxResponse sendAllMessage(@RequestParam String message){
try {
WebSocketServer.BroadCastInfo(message);
} catch (IOException e) {
throw new CustomException(CustomExceptionType.SYSTEM_ERROR,"群发消息失败");
}
return AjaxResponse.success();
}
/**
* 指定会话ID发消息
* @param message 消息内容
* @param id 连接会话ID
*/
@RequestMapping(value="/sendOne", method=RequestMethod.GET)
AjaxResponse sendOneMessage(@RequestParam String message,@RequestParam String id){
try {
WebSocketServer.SendMessage(id,message);
} catch (IOException e) {
throw new CustomException(CustomExceptionType.SYSTEM_ERROR,"指定会话ID发消息失败");
}
return AjaxResponse.success();
}
}
复制代码