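MQTT is a very popular communication protocol for IoT and IIoT, and the PubSub feature of OPC UA connects to it seamlessly. Based on open62541, this article explains how to make an OPC UA Server send messages to an MQTT broker.
This article does not cover how MQTT itself works; plenty of introductions are available online.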
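1. Principle
Publishers and subscribers are decoupled from each other and communicate only through a Broker.
From the Broker's point of view, publishers and subscribers are both clients.
The same model applies to OPC UA.
PS: An OPC UA application can communicate just like a regular MQTT client, which is very powerful!
A publisher publishes a message to a topic, and every subscriber of that topic receives it.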
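2. Installing mosquitto
mosquitto is an open-source MQTT implementation that can be used as an MQTT broker.
This article uses Debian 10; the steps are the same on Ubuntu (for Windows installation, search online).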
sudo apt update
sudo apt install mosquitto mosquitto-clients
Once installed, mosquitto starts automatically. Check that it is running with:
ps -A | grep mosquitto
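It listens on port 1883 by default.
Quick test:
- Open a terminal and run mosquitto_sub -t GARY to subscribe
- Open a second terminal and run mosquitto_pub -t GARY -m "hello" to publish
The mosquitto_sub terminal then prints the received message: hello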
3. Configuring open62541
Open CMakeLists.txt in the open62541 source directory and set the following two options to ON:
- UA_ENABLE_PUBSUB
- UA_ENABLE_PUBSUB_MQTT
Then build: create a build directory, cd into it, and run:
cmake .. && make
This generates the library libopen62541.a and the directory src_generated inside build.
4. Code and usage
The code is the example tutorial_pubsub_mqtt_publish.c that ships with open62541:
/* This work is licensed under a Creative Commons CCZero 1.0 Universal License.
* See http://creativecommons.org/publicdomain/zero/1.0/ for more information. */
/**
* .. _pubsub-tutorial:
*
* Working with Publish/Subscribe
* ------------------------------
*
* Work in progress: This Tutorial will be continuously extended during the next
* PubSub batches. More details about the PubSub extension and corresponding
* open62541 API are located here: :ref:`pubsub`.
*
* Publishing Fields
* ^^^^^^^^^^^^^^^^^
* The PubSub MQTT publish example demonstrate the simplest way to publish
* informations from the information model over MQTT using the UADP (or later
* JSON) encoding. To receive information the subscribe functionality of mqtt is
* used. A periodical call to yield is necessary to update the mqtt stack.
*
* **Connection handling**
* PubSubConnections can be created and deleted on runtime. More details about
* the system preconfiguration and connection can be found in
* ``tutorial_pubsub_connection.c``.
*/
#include "open62541/server.h"
#include "open62541/server_config_default.h"
#include "ua_pubsub.h"
#include "ua_network_pubsub_mqtt.h"
#include "open62541/plugin/log_stdout.h"
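#include <signal.h>
#include <stdio.h>  /* printf, sscanf */
#include <stdlib.h> /* EXIT_FAILURE */
#include <string.h> /* memset, strcmp */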
#define CONNECTION_NAME "MQTT Publisher Connection"
#define TRANSPORT_PROFILE_URI "http://opcfoundation.org/UA-Profile/Transport/pubsub-mqtt"
#define MQTT_CLIENT_ID "TESTCLIENTPUBSUBMQTT"
#define CONNECTIONOPTION_NAME "mqttClientId"
#define PUBLISHER_TOPIC "customTopic"
#define PUBLISHER_METADATAQUEUENAME "MetaDataTopic"
#define PUBLISHER_METADATAUPDATETIME 0
#define BROKER_ADDRESS_URL "opc.mqtt://127.0.0.1:1883"
#define PUBLISH_INTERVAL 500
static UA_Boolean useJson = false;
static UA_NodeId connectionIdent;
static UA_NodeId publishedDataSetIdent;
static UA_NodeId writerGroupIdent;
static void
addPubSubConnection(UA_Server *server, char *addressUrl) {
    /* Details about the connection configuration and handling are located
     * in the pubsub connection tutorial */
    UA_PubSubConnectionConfig connectionConfig;
    memset(&connectionConfig, 0, sizeof(connectionConfig));
    connectionConfig.name = UA_STRING(CONNECTION_NAME);
    connectionConfig.transportProfileUri = UA_STRING(TRANSPORT_PROFILE_URI);
    connectionConfig.enabled = UA_TRUE;
    /* configure address of the mqtt broker (local on default port) */
    UA_NetworkAddressUrlDataType networkAddressUrl = {
        UA_STRING_NULL , UA_STRING(addressUrl)};
    UA_Variant_setScalar(&connectionConfig.address, &networkAddressUrl,
                         &UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE]);
    /* Changed to static publisherId from random generation to identify
     * the publisher on Subscriber side */
    connectionConfig.publisherId.numeric = 2234;
    /* configure options, set mqtt client id */
    UA_KeyValuePair connectionOptions[1];
    connectionOptions[0].key = UA_QUALIFIEDNAME(0, CONNECTIONOPTION_NAME);
    UA_String mqttClientId = UA_STRING(MQTT_CLIENT_ID);
    UA_Variant_setScalar(&connectionOptions[0].value, &mqttClientId, &UA_TYPES[UA_TYPES_STRING]);
    connectionConfig.connectionProperties = connectionOptions;
    connectionConfig.connectionPropertiesSize = 1;
    UA_Server_addPubSubConnection(server, &connectionConfig, &connectionIdent);
}
/**
* **PublishedDataSet handling**
* The PublishedDataSet (PDS) and PubSubConnection are the toplevel entities and
* can exist alone. The PDS contains the collection of the published fields. All
* other PubSub elements are directly or indirectly linked with the PDS or
* connection.
*/
static void
addPublishedDataSet(UA_Server *server) {
    /* The PublishedDataSetConfig contains all necessary public
     * informations for the creation of a new PublishedDataSet */
    UA_PublishedDataSetConfig publishedDataSetConfig;
    memset(&publishedDataSetConfig, 0, sizeof(UA_PublishedDataSetConfig));
    publishedDataSetConfig.publishedDataSetType = UA_PUBSUB_DATASET_PUBLISHEDITEMS;
    publishedDataSetConfig.name = UA_STRING("Demo PDS");
    /* Create new PublishedDataSet based on the PublishedDataSetConfig. */
    UA_Server_addPublishedDataSet(server, &publishedDataSetConfig, &publishedDataSetIdent);
}
/**
* **DataSetField handling**
* The DataSetField (DSF) is part of the PDS and describes exactly one published field.
*/
static void
addDataSetField(UA_Server *server) {
    /* Add a field to the previous created PublishedDataSet */
    UA_DataSetFieldConfig dataSetFieldConfig;
    memset(&dataSetFieldConfig, 0, sizeof(UA_DataSetFieldConfig));
    dataSetFieldConfig.dataSetFieldType = UA_PUBSUB_DATASETFIELD_VARIABLE;
    dataSetFieldConfig.field.variable.fieldNameAlias = UA_STRING("Server localtime");
    dataSetFieldConfig.field.variable.promotedField = UA_FALSE;
    dataSetFieldConfig.field.variable.publishParameters.publishedVariable =
        UA_NODEID_NUMERIC(0, UA_NS0ID_SERVER_SERVERSTATUS_CURRENTTIME);
    dataSetFieldConfig.field.variable.publishParameters.attributeId = UA_ATTRIBUTEID_VALUE;
    UA_Server_addDataSetField(server, publishedDataSetIdent, &dataSetFieldConfig, NULL);
}
/**
* **WriterGroup handling**
* The WriterGroup (WG) is part of the connection and contains the primary configuration
* parameters for the message creation.
*/
static UA_StatusCode
addWriterGroup(UA_Server *server, char *topic, int interval) {
    UA_StatusCode retval = UA_STATUSCODE_GOOD;
    /* Now we create a new WriterGroupConfig and add the group to the existing PubSubConnection. */
    UA_WriterGroupConfig writerGroupConfig;
    memset(&writerGroupConfig, 0, sizeof(UA_WriterGroupConfig));
    writerGroupConfig.name = UA_STRING("Demo WriterGroup");
    writerGroupConfig.publishingInterval = interval;
    writerGroupConfig.enabled = UA_FALSE;
    writerGroupConfig.writerGroupId = 100;
    UA_UadpWriterGroupMessageDataType *writerGroupMessage;
    /* decide whether to use JSON or UADP encoding*/
#ifdef UA_ENABLE_JSON_ENCODING
    UA_JsonWriterGroupMessageDataType *Json_writerGroupMessage;
    if(useJson) {
        writerGroupConfig.encodingMimeType = UA_PUBSUB_ENCODING_JSON;
        writerGroupConfig.messageSettings.encoding = UA_EXTENSIONOBJECT_DECODED;
        writerGroupConfig.messageSettings.content.decoded.type = &UA_TYPES[UA_TYPES_JSONWRITERGROUPMESSAGEDATATYPE];
        /* The configuration flags for the messages are encapsulated inside the
         * message- and transport settings extension objects. These extension
         * objects are defined by the standard. e.g.
         * UadpWriterGroupMessageDataType */
        Json_writerGroupMessage = UA_JsonWriterGroupMessageDataType_new();
        /* Change message settings of writerGroup to send PublisherId,
         * DataSetMessageHeader, SingleDataSetMessage and DataSetClassId in PayloadHeader
         * of NetworkMessage */
        Json_writerGroupMessage->networkMessageContentMask =
            (UA_JsonNetworkMessageContentMask)(UA_JSONNETWORKMESSAGECONTENTMASK_NETWORKMESSAGEHEADER |
            (UA_JsonNetworkMessageContentMask)UA_JSONNETWORKMESSAGECONTENTMASK_DATASETMESSAGEHEADER |
            (UA_JsonNetworkMessageContentMask)UA_JSONNETWORKMESSAGECONTENTMASK_SINGLEDATASETMESSAGE |
            (UA_JsonNetworkMessageContentMask)UA_JSONNETWORKMESSAGECONTENTMASK_PUBLISHERID |
            (UA_JsonNetworkMessageContentMask)UA_JSONNETWORKMESSAGECONTENTMASK_DATASETCLASSID);
        writerGroupConfig.messageSettings.content.decoded.data = Json_writerGroupMessage;
    }
    else
#endif
    {
        writerGroupConfig.encodingMimeType = UA_PUBSUB_ENCODING_UADP;
        writerGroupConfig.messageSettings.encoding = UA_EXTENSIONOBJECT_DECODED;
        writerGroupConfig.messageSettings.content.decoded.type = &UA_TYPES[UA_TYPES_UADPWRITERGROUPMESSAGEDATATYPE];
        /* The configuration flags for the messages are encapsulated inside the
         * message- and transport settings extension objects. These extension
         * objects are defined by the standard. e.g.
         * UadpWriterGroupMessageDataType */
        writerGroupMessage = UA_UadpWriterGroupMessageDataType_new();
        /* Change message settings of writerGroup to send PublisherId,
         * WriterGroupId in GroupHeader and DataSetWriterId in PayloadHeader
         * of NetworkMessage */
        writerGroupMessage->networkMessageContentMask =
            (UA_UadpNetworkMessageContentMask)(UA_UADPNETWORKMESSAGECONTENTMASK_PUBLISHERID |
            (UA_UadpNetworkMessageContentMask)UA_UADPNETWORKMESSAGECONTENTMASK_GROUPHEADER |
            (UA_UadpNetworkMessageContentMask)UA_UADPNETWORKMESSAGECONTENTMASK_WRITERGROUPID |
            (UA_UadpNetworkMessageContentMask)UA_UADPNETWORKMESSAGECONTENTMASK_PAYLOADHEADER);
        writerGroupConfig.messageSettings.content.decoded.data = writerGroupMessage;
    }
    /* configure the mqtt publish topic */
    UA_BrokerWriterGroupTransportDataType brokerTransportSettings;
    memset(&brokerTransportSettings, 0, sizeof(UA_BrokerWriterGroupTransportDataType));
    /* Assign the Topic at which MQTT publish should happen */
    /*ToDo: Pass the topic as argument from the writer group */
    brokerTransportSettings.queueName = UA_STRING(topic);
    brokerTransportSettings.resourceUri = UA_STRING_NULL;
    brokerTransportSettings.authenticationProfileUri = UA_STRING_NULL;
    /* Choose the QOS Level for MQTT */
    brokerTransportSettings.requestedDeliveryGuarantee = UA_BROKERTRANSPORTQUALITYOFSERVICE_BESTEFFORT;
    /* Encapsulate config in transportSettings */
    UA_ExtensionObject transportSettings;
    memset(&transportSettings, 0, sizeof(UA_ExtensionObject));
    transportSettings.encoding = UA_EXTENSIONOBJECT_DECODED;
    transportSettings.content.decoded.type = &UA_TYPES[UA_TYPES_BROKERWRITERGROUPTRANSPORTDATATYPE];
    transportSettings.content.decoded.data = &brokerTransportSettings;
    writerGroupConfig.transportSettings = transportSettings;
    retval = UA_Server_addWriterGroup(server, connectionIdent, &writerGroupConfig, &writerGroupIdent);
    if (retval == UA_STATUSCODE_GOOD)
        UA_Server_setWriterGroupOperational(server, writerGroupIdent);
#ifdef UA_ENABLE_JSON_ENCODING
    if (useJson) {
        UA_JsonWriterGroupMessageDataType_delete(Json_writerGroupMessage);
    }
#endif
    if (!useJson && writerGroupMessage) {
        UA_UadpWriterGroupMessageDataType_delete(writerGroupMessage);
    }
    return retval;
}
/**
* **DataSetWriter handling**
* A DataSetWriter (DSW) is the glue between the WG and the PDS. The DSW is
* linked to exactly one PDS and contains additional informations for the
* message generation.
*/
static void
addDataSetWriter(UA_Server *server, char *topic) {
    /* We need now a DataSetWriter within the WriterGroup. This means we must
     * create a new DataSetWriterConfig and call the addDataSetWriter function. */
    UA_NodeId dataSetWriterIdent;
    UA_DataSetWriterConfig dataSetWriterConfig;
    memset(&dataSetWriterConfig, 0, sizeof(UA_DataSetWriterConfig));
    dataSetWriterConfig.name = UA_STRING("Demo DataSetWriter");
    dataSetWriterConfig.dataSetWriterId = 62541;
    dataSetWriterConfig.keyFrameCount = 10;
#ifdef UA_ENABLE_JSON_ENCODING
    UA_JsonDataSetWriterMessageDataType jsonDswMd;
    UA_ExtensionObject messageSettings;
    if(useJson) {
        /* JSON config for the dataSetWriter */
        jsonDswMd.dataSetMessageContentMask = (UA_JsonDataSetMessageContentMask)
            (UA_JSONDATASETMESSAGECONTENTMASK_DATASETWRITERID |
             UA_JSONDATASETMESSAGECONTENTMASK_SEQUENCENUMBER |
             UA_JSONDATASETMESSAGECONTENTMASK_STATUS |
             UA_JSONDATASETMESSAGECONTENTMASK_METADATAVERSION |
             UA_JSONDATASETMESSAGECONTENTMASK_TIMESTAMP);
        messageSettings.encoding = UA_EXTENSIONOBJECT_DECODED;
        messageSettings.content.decoded.type = &UA_TYPES[UA_TYPES_JSONDATASETWRITERMESSAGEDATATYPE];
        messageSettings.content.decoded.data = &jsonDswMd;
        dataSetWriterConfig.messageSettings = messageSettings;
    }
#endif
    /*TODO: Modify MQTT send to add DataSetWriters broker transport settings */
    /*TODO: Pass the topic as argument from the writer group */
    /*TODO: Publish Metadata to metaDataQueueName */
    /* configure the mqtt publish topic */
    UA_BrokerDataSetWriterTransportDataType brokerTransportSettings;
    memset(&brokerTransportSettings, 0, sizeof(UA_BrokerDataSetWriterTransportDataType));
    /* Assign the Topic at which MQTT publish should happen */
    brokerTransportSettings.queueName = UA_STRING(topic);
    brokerTransportSettings.resourceUri = UA_STRING_NULL;
    brokerTransportSettings.authenticationProfileUri = UA_STRING_NULL;
    brokerTransportSettings.metaDataQueueName = UA_STRING(PUBLISHER_METADATAQUEUENAME);
    brokerTransportSettings.metaDataUpdateTime = PUBLISHER_METADATAUPDATETIME;
    /* Choose the QOS Level for MQTT */
    brokerTransportSettings.requestedDeliveryGuarantee = UA_BROKERTRANSPORTQUALITYOFSERVICE_BESTEFFORT;
    /* Encapsulate config in transportSettings */
    UA_ExtensionObject transportSettings;
    memset(&transportSettings, 0, sizeof(UA_ExtensionObject));
    transportSettings.encoding = UA_EXTENSIONOBJECT_DECODED;
    transportSettings.content.decoded.type = &UA_TYPES[UA_TYPES_BROKERDATASETWRITERTRANSPORTDATATYPE];
    transportSettings.content.decoded.data = &brokerTransportSettings;
    dataSetWriterConfig.transportSettings = transportSettings;
    UA_Server_addDataSetWriter(server, writerGroupIdent, publishedDataSetIdent,
                               &dataSetWriterConfig, &dataSetWriterIdent);
}
/**
* That's it! You're now publishing the selected fields. Open a packet
* inspection tool of trust e.g. wireshark and take a look on the outgoing
* packages. The following graphic figures out the packages created by this
* tutorial.
*
* .. figure:: ua-wireshark-pubsub.png
* :figwidth: 100 %
* :alt: OPC UA PubSub communication in wireshark
*
* The open62541 subscriber API will be released later. If you want to process
* the the datagrams, take a look on the ua_network_pubsub_networkmessage.c
* which already contains the decoding code for UADP messages.
*
* It follows the main server code, making use of the above definitions. */
UA_Boolean running = true;
static void stopHandler(int sign) {
    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "received ctrl-c");
    running = false;
}
static void usage(void) {
    printf("Usage: tutorial_pubsub_mqtt [--url <opc.mqtt://hostname:port>] "
           "[--topic <mqttTopic>] "
           "[--freq <frequency in ms>] "
           "[--json]\n"
           " Defaults are:\n"
           " - Url: opc.mqtt://127.0.0.1:1883\n"
           " - Topic: customTopic\n"
           " - Frequency: 500\n"
           " - JSON: Off\n");
}
int main(int argc, char **argv) {
    signal(SIGINT, stopHandler);
    signal(SIGTERM, stopHandler);
    /* TODO: Change to secure mqtt port:8883 */
    char *addressUrl = BROKER_ADDRESS_URL;
    char *topic = PUBLISHER_TOPIC;
    int interval = PUBLISH_INTERVAL;
    /* Parse arguments */
    for(int argpos = 1; argpos < argc; argpos++) {
        if(strcmp(argv[argpos], "--help") == 0) {
            usage();
            return 0;
        }
        if(strcmp(argv[argpos], "--json") == 0) {
            useJson = true;
            continue;
        }
        if(strcmp(argv[argpos], "--url") == 0) {
            if(argpos + 1 == argc) {
                usage();
                return -1;
            }
            argpos++;
            addressUrl = argv[argpos];
            continue;
        }
        if(strcmp(argv[argpos], "--topic") == 0) {
            if(argpos + 1 == argc) {
                usage();
                return -1;
            }
            argpos++;
            topic = argv[argpos];
            continue;
        }
        if(strcmp(argv[argpos], "--freq") == 0) {
            if(argpos + 1 == argc) {
                usage();
                return -1;
            }
            /* Advance to the value that follows --freq before parsing it */
            argpos++;
            if(sscanf(argv[argpos], "%d", &interval) != 1) {
                usage();
                return -1;
            }
            if(interval <= 10) {
                UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
                               "Publication interval too small");
                return -1;
            }
            continue;
        }
        usage();
        return -1;
    }
    UA_StatusCode retval = UA_STATUSCODE_GOOD;
    /* Set up the server config */
    UA_Server *server = UA_Server_new();
    UA_ServerConfig *config = UA_Server_getConfig(server);
    /* Details about the connection configuration and handling are located in
     * the pubsub connection tutorial */
    UA_ServerConfig_setDefault(config);
    config->pubsubTransportLayers = (UA_PubSubTransportLayer *)
        UA_malloc(1 * sizeof(UA_PubSubTransportLayer));
    if(!config->pubsubTransportLayers) {
        return -1;
    }
    config->pubsubTransportLayers[0] = UA_PubSubTransportLayerMQTT();
    config->pubsubTransportLayersSize++;
    addPubSubConnection(server, addressUrl);
    addPublishedDataSet(server);
    addDataSetField(server);
    retval = addWriterGroup(server, topic, interval);
    if (UA_STATUSCODE_GOOD != retval)
    {
        UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "Error Name = %s", UA_StatusCode_name(retval));
        return EXIT_FAILURE;
    }
    addDataSetWriter(server, topic);
    UA_PubSubConnection *connection = UA_PubSubConnection_findConnectionbyId(server, connectionIdent);
    if(!connection) {
        UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
                       "Could not create a PubSubConnection");
        UA_Server_delete(server);
        return -1;
    }
    UA_Server_run(server, &running);
    UA_Server_delete(server);
    return 0;
}
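PS: The function addDataSetField() selects the node whose value is published; change it to suit your needs. If the node does not exist yet, add it to the Server first.
The defaults provided by this example are:
- Topic name: customTopic
- MQTT broker address: opc.mqtt://127.0.0.1:1883
- Publishing interval: 500 ms
- Published content: the system time of the OPC UA Server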
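For project setup, refer to article 42 in this series; the resulting directory layout matches the paths used in the CMakeLists.txt below.
PS: The src_generated directory and libopen62541.a under the open62541 directory must be replaced with the ones regenerated in this article.
The CMakeLists.txt is as follows: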
cmake_minimum_required (VERSION 3.5)
project (pub_mqtt_demo)
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/open62541/include)
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/open62541/src_generated)
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/open62541/plugins)
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/open62541/plugins/include)
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/open62541/arch)
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/open62541/deps)
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/open62541/src/pubsub)
find_library(OPEN62541_LIB libopen62541.a HINTS ${CMAKE_CURRENT_SOURCE_DIR}/open62541/)
add_executable(mqtt_pub ${CMAKE_CURRENT_SOURCE_DIR}/src/tutorial_pubsub_mqtt_publish.c)
target_link_libraries(mqtt_pub ${OPEN62541_LIB} pthread)
cd into the build directory and run:
cmake .. && make
On success, the executable mqtt_pub is generated in the build directory.
Run it:
./mqtt_pub
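The server now starts publishing its own system time to mosquitto.
Open another terminal and subscribe with mosquitto_sub:
mosquitto_sub -F '@Y-@m-@dT@H:@M:@S@z : %t : %x' -t 'customTopic' # see the mosquitto_sub man page for details
PS: This format string means "timestamp at which the message was received : topic name : payload bytes in hexadecimal".
The messages sent by the publisher now appear: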
2021-01-23T17:00:47+0800 : customTopic : f101ba08016400014df4e11018e48b3d66f1d601ec55d0179c55d01701000d68e48b3d66f1d601
2021-01-23T17:00:48+0800 : customTopic : f101ba08016400014df4e1103cefd93d66f1d601ec55d0179c55d01701000d64efd93d66f1d601
2021-01-23T17:00:48+0800 : customTopic : f101ba08016400014df4e11020c1283e66f1d601ec55d0179c55d01701000d8ec1283e66f1d601
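Content breakdown:
- Timestamp: printed in ISO 8601 format (year, month, day, hour, minute, second); +0800 is the time zone. Because this format only has one-second resolution while the publishing interval is 500 ms, the same timestamp can appear twice.
- Topic name: customTopic, as configured.
- Payload: 39 bytes in total (the length depends on the size of the selected node's value). This is the byte stream produced by encoding the UA_NetworkMessage struct; the byte stream type is UA_ByteString.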
typedef struct {
    UA_Byte version;
    UA_Boolean messageIdEnabled;
    UA_String messageId; /* For Json NetworkMessage */
    UA_Boolean publisherIdEnabled;
    UA_Boolean groupHeaderEnabled;
    UA_Boolean payloadHeaderEnabled;
    UA_PublisherIdDatatype publisherIdType;
    UA_Boolean dataSetClassIdEnabled;
    UA_Boolean securityEnabled;
    UA_Boolean timestampEnabled;
    UA_Boolean picosecondsEnabled;
    UA_Boolean chunkMessage;
    UA_Boolean promotedFieldsEnabled;
    UA_NetworkMessageType networkMessageType;
    union {
        UA_Byte publisherIdByte;
        UA_UInt16 publisherIdUInt16;
        UA_UInt32 publisherIdUInt32;
        UA_UInt64 publisherIdUInt64;
        UA_Guid publisherIdGuid;
        UA_String publisherIdString;
    } publisherId;
    UA_Guid dataSetClassId;
    UA_NetworkMessageGroupHeader groupHeader;
    union {
        UA_DataSetPayloadHeader dataSetPayloadHeader;
    } payloadHeader;
    UA_DateTime timestamp;
    UA_UInt16 picoseconds;
    UA_UInt16 promotedFieldsSize;
    UA_Variant* promotedFields; /* BaseDataType */
    UA_NetworkMessageSecurityHeader securityHeader;
    union {
        UA_DataSetPayload dataSetPayload;
    } payload;
    UA_ByteString securityFooter;
    UA_ByteString signature;
} UA_NetworkMessage;
The OPC UA Server (publisher) calls sendNetworkMessage() to send messages to the MQTT broker. Internally it calls UA_NetworkMessage_encodeBinary() to turn the UA_NetworkMessage into a byte stream, which is then published.
How can this byte stream be parsed with the library? With the function UA_NetworkMessage_decodeBinary().
Here is a simple program that does so, modeled mainly on UA_ReaderGroup_subscribeCallback():
#include "open62541/server.h"
#include "open62541/server_config_default.h"
#include "ua_pubsub.h"
#include "ua_network_pubsub_mqtt.h"
#include "open62541/plugin/log_stdout.h"
#include <string.h> /* memset */
int main(void)
{
    //f101ba08016400014df4e11074c9fc7a67f1d6013aea4055f4e9405501000db0c9fc7a67f1d601
    uint8_t encodedData[] = {
        0xf1,0x01,0xba,0x08,0x01,0x64,0x00,0x01,
        0x4d,0xf4,0xe1,0x10,0x74,0xc9,0xfc,0x7a,
        0x67,0xf1,0xd6,0x01,0x3a,0xea,0x40,0x55,
        0xf4,0xe9,0x40,0x55,0x01,0x00,0x0d,0xb0,
        0xc9,0xfc,0x7a,0x67,0xf1,0xd6,0x01};
    UA_ByteString buf;
    buf.data = &(encodedData[0]);
    buf.length = sizeof(encodedData)/sizeof(uint8_t);
    UA_NetworkMessage dst;
    memset(&dst, 0, sizeof(dst));
    size_t offset = 0;
    /* Stop early if the bytes are not a valid NetworkMessage */
    UA_StatusCode retVal = UA_NetworkMessage_decodeBinary(&buf, &offset, &dst);
    if(retVal != UA_STATUSCODE_GOOD)
    {
        UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "decode failed: %s", UA_StatusCode_name(retVal));
        return -1;
    }
    /* Without a payload header there is exactly one DataSetMessage */
    UA_Byte anzDataSets = 1;
    if(dst.payloadHeaderEnabled)
        anzDataSets = dst.payloadHeader.dataSetPayloadHeader.count;
    for(UA_Byte iterator = 0; iterator < anzDataSets; iterator++)
    {
        UA_DataSetMessage* dataSetMsg = &dst.payload.dataSetPayload.dataSetMessages[iterator];
        if(dataSetMsg->header.dataSetMessageType == UA_DATASETMESSAGE_DATAKEYFRAME)
        {
            if(dataSetMsg->header.fieldEncoding != UA_FIELDENCODING_RAWDATA)
            {
                size_t anzFields = dataSetMsg->data.keyFrameData.fieldCount;
                for(UA_UInt16 i = 0; i < anzFields; i++)
                {
                    if(dataSetMsg->data.keyFrameData.dataSetFields[i].hasValue)
                    {
                        UA_Variant *pValue = &(dataSetMsg->data.keyFrameData.dataSetFields[i].value);
                        /* Only DateTime fields are printed; skip everything else */
                        if (UA_NodeId_equal(&(pValue->type->typeId), &UA_TYPES[UA_TYPES_DATETIME].typeId) == UA_FALSE)
                        {
                            continue;
                        }
                        UA_DateTime raw_date = *(UA_DateTime *) (pValue->data);
                        UA_DateTimeStruct dts = UA_DateTime_toStruct(raw_date);
                        UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "date is: %u-%u-%u %u:%u:%u.%03u",
                                    dts.year, dts.month, dts.day, dts.hour, dts.min, dts.sec, dts.milliSec);
                    }
                }
            }
        }
    }
    return 0;
}
The program prints the decoded date and time, which is the system time in UTC.
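5. Summary
This article described how an OPC UA Server communicates with MQTT and ran a simple demo with open62541; adapt it to your own needs.
Combining OPC UA with MQTT makes OPC UA even more powerful.
If anything here is wrong, please leave a comment. Thank you.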