前言
工业物联网是一个范围很大的概念,本文从数据可视化的角度介绍了一个最小化的工业物联网平台,从 Modbus 数据采集到前端数据可视化呈现的基本实现思路。这里面主要涉及基于 Modbus 通讯规约的数据采集、后台实时数据处理、前端实时数据接收、前端实时数据可视化显示。物联网平台架构主要参考了图扑物联工业物联网平台,并从中提取了部分功能进行介绍,前端数据可视化采用的是HT for Web。
由于内容比较多,具体实现上涉及到前端工程师、后台工程师、数据采集工程师等多个开发角色的参与,所以本文重点介绍实现思路和 WebSocket 消息推送的实现,其它环节的具体实现细节作者会在其它文章中进行详细介绍。
一、物联网平台架构
物联网平台主要是B/S模式,工业物联网平台大都采用的是微服务架构,本文主要涉及两个微服务:前置数据采集服务和 Web 实时消息推送服务。
前置数据采集服务主要用于现场设备、仪器、仪表、传感器实时数据的采集,IoTopo工业物联网平台支持MQTT和透传云解析两种方式,透传云解析支持 Modbus 通讯规约。
实时数据采集到平台后,需要推送到浏览器端进行显示,Web 实时消息推送服务采用 Web Socket 进行实时数据推送,可以确保数据的实时性和高效性。
前端可视化技术采用的是HT for Web, HT for Web 是基于HTML5标准的企业应用图形界面一站式解决方案,其包含通用组件、拓扑组件和3D渲染引擎等丰富的图形界面开发类库。虽然 HT for Web 是商业软件但其提供的一站式解决方案可以极大缩短产品开发周期、减少研发成本、补齐我们在 Web 图形界面可视化技术上的短板。
二、Modbus 数据采集
Modbus是一种串行通信协议,是Modicon公司(现在的施耐德电气Schneider Electric)于1979年为使用可编程逻辑控制器(PLC)通信而发表。Modbus已经成为工业领域通信协议的业界标准,并且现在是工业电子设备之间常用的连接方式。Modbus比其他通信协议使用的更广泛的主要原因有:
- 公开发表并且无版权要求
- 易于部署和维护
- 对供应商来说,修改移动本地的比特或字节没有很多限制
Modbus允许多个 (大约240个) 设备连接在同一个网络上进行通信,举个例子,一个由测量温度和湿度的装置,并且将结果发送给计算机。在数据采集与监视控制系统(SCADA)中,Modbus通常用来连接监控计算机和远程终端控制系统(RTU)。
目前主流的编辑语言都有 Modbus 开发库,由于 Modbus 相对比较简单,很多企业也选择自行开发实现。Modbus 数据采集属于后台通讯,数据采集到平台后首先会进行数据清理和预处理,过滤掉冗余和无效数据,形成实时数据。平台获取到实时数据后一般会做 3 项工作:
1. 推送到 Web 前端进行显示
2. 存储到时序数据库
3. 判断是否产生告警
三、将实时数据推送到 Web 前端
基于 Web 的实时数据推送需要用到 WebSocket,初学者可以学习阮一峰老师的 WebSocket 教程。我们基于 WebSocket 封装了一套消息传输协议,类似于一个消息中间件,前端部分可以订阅实时数据。考虑到海量实时数据的推送需求,将实时数据分为平台级、站点级、设备级,前端在订阅实时数据时,可以通过消息主题规则订阅不同级别的数据。平台侧在收到订阅请求时,可以主动推送一次实时数据。这样可以确保数据可视化界面在订阅实时数据成功后,第一时间显示出正确的界面。
下面给出一个简化的 WebSocket 消息协议的客户端代码,大家可以在些基础上进行改造以适合自己的业务场景。
消息主题正则表达式,用来匹配消息主题:
1 const matchWildcard = function(str, rule) {
2 return new RegExp('^' + rule.split('*').join('.*') + '$').test(str) 3 }
WebSocket 客户端,支持消息主题订阅、取消消息主题订阅、同一个消息主题支持多个订阅者:
1 class WebSocketClient {
2 constructor() {
3 this.ws = null 4 this.opts = { 5 debug: false, 6 autoReconnect: true, 7 reconnectInterval: 10000, 8 subscriber: {}, 9 } 10 this.opened = false 11 } 12 13 connect() { 14 if (!this.opened) { 15 return 16 } 17 18 const url = 'ws://www.iotopo.com/msg/v1' 19 console.debug('websocket connect', url) 20 21 let ws = this.ws = new WebSocket(url) 22 ws.onmessage = event => { 23 if (this.opts.debug) { 24 console.log(event) 25 } 26 let data = JSON.parse(event.data) 27 28 for (let topic in this.opts.subscriber) { 29 if (matchWildcard(data.topic, topic)) { 30 let listeners = this.opts.subscriber[topic] 31 if (Array.isArray(listeners)) { 32 listeners.forEach(cb => { 33 if (typeof cb === 'function') { 34 cb(data.payload) 35 } 36 }) 37 } 38 } 39 } 40 } 41 ws.onopen = e => { 42 if (this.opts.debug) { 43 console.log(e) 44 } 45 // 执行订阅请求 46 for (let topic in this.opts.subscriber) { 47 this._sendSubscribe(topic) 48 } 49 if (typeof this.opts.onopen === 'function') { 50 this.opts.onopen(e) 51 } 52 } 53 ws.onclose = e => { 54 if (this.opts.debug) { 55 console.log(e) 56 } 57 if (typeof this.opts.onclose === 'function') { 58 this.opts.onclose(e) 59 } 60 if (this.opened && this.opts.autoReconnect) { 61 setTimeout(() => { 62 this.connect() 63 }, this.opts.reconnectInterval) 64 } 65 } 66 ws.onerror = e => { 67 if (this.opts.debug) { 68 console.log(e) 69 } 70 if (typeof this.opts.onerror === 'function') { 71 this.opts.onerror(e) 72 } 73 } 74 } 75 76 open(opts) { 77 if (!this.opened) { 78 Object.assign(this.opts, opts || {}) 79 this.opened = true 80 this.connect() 81 } 82 } 83 84 close() { 85 this.opened = false 86 if (this.ws !== null) { 87 this.ws.close() 88 } 89 this.ws = null 90 } 91 92 isOpened() { 93 return this.opened 94 } 95 96 isConnected() { 97 return this.ws !== null 98 } 99 100 _sendSubscribe(topic) { 101 if (this.ws === null) { 102 return Error('websocet not opened') 103 } 104 if (typeof topic !== 'string') { 105 return Error('topic should be a string value') 106 } 107 108 if (this.ws.readyState === WebSocket.OPEN) { 109 let msg = { 110 type: 'subscribe', 111 topic: topic, 112 } 113 this.ws.send(JSON.stringify(msg)) 114 } else { 115 return Error('websocet not connected') 116 } 117 } 118 119 subscribe(topic, cb) { 120 if (this.opts.debug) { 121 console.log('subscribe:', topic) 122 } 123 let listeners = this.opts.subscriber[topic] 124 if (!Array.isArray(listeners)) { 125 listeners = [ 126 cb 127 ] 128 this.opts.subscriber[topic] = listeners 129 } else { 130 listeners.push(cb) 131 } 132 this._sendSubscribe(topic) 133 134 return { topic, cb } 135 } 136 137 unsubscribe({topic, cb}) { 138 if (this.opts.debug) { 139 console.log('unsubscribe:', topic) 140 } 141 142 if (this.ws === null) { 143 return Error('websocet not opened') 144 } 145 146 if (typeof topic !== 'string') { 147 return Error('topic should be a string value') 148 } 149 150 let listeners = this.opts.subscriber[topic] 151 if (cb) { 152 if (Array.isArray(listeners)) { 153 let idx = listeners.indexOf(cb) 154 if (idx >= 0) { 155 listeners.splice(idx, 1) 156 } 157 } 158 } else { 159 delete this.opts.subscriber[topic] 160 } 161 162 if (Array.isArray(listeners) && listeners == 0) { 163 if (this.ws.readyState === WebSocket.OPEN) { 164 let msg = { 165 type: 'unsubscribe', 166 topic: topic, 167 } 168 this.ws.send(JSON.stringify(msg)) 169 } else { 170 return Error('websocet not connected') 171 } 172 } 173 } 174 }
用法举例:
1 // 初始化客户端
2 const ws = new WebSocketClient() 3 // 与 WebSocket 服务器建议连接 4 ws.open({ 5 debug: false 6 }) 7 // 订阅消息 8 ws.subscribe('/foo/bar/*', function(msg) { 9 console.log('recv ws msg:', msg) 10 })
四、数据可视化界面实现
基于 HT for Web 可以简单快速地搭建一个符合 HTML5 标准的可视化图形界面,通过 WebSocket 订阅实时数据,然后驱动图形界面的变化。数据驱动图形界面变化的实现方式很多,基本方法是采用数据绑定的方式,具体可以参考 HT for Web 的官方文档。
在后面的文章中,作者会介绍一种基于 HT for Web 实现的业务数据和图形数据分离的数据绑定方法。