记录一下成果 主要是 domoticz 接收我们自己定义的传感器的数据。 我准备用 Stm32 进行数据采集 然后让 ESP8266 发送出去
目录
-
MQTT的一些认识
在学习这部分的时候我主要参考了 这篇文章 http://www.1zlab.com/wiki/micropython-esp32/mqtt/
以下的部分图片来自这篇文章的截图 感谢作者的分享
这是 MQTT的模型对于我们的智能家居的系统而言 服务端自然就是安装了 domoticz的 树莓派或者类似的 平台
剩下的传感器的角色就是 发布者(publisher) 而 我们的智能受控设备就是订阅者。
在domoticz的 MQTT 服务器上面有一些的topic 你可以理解成是 报纸 有财经报纸 时政报纸 还有娱乐报纸
然后不同的受控设备 就去订阅 不同的 topic 根据topic 里面的提及自己的指令 去执行指定的动作(开灯)
publisher的角色类似于 作者, 他们把读者( MQTT 服务器 domoticz)感兴趣的信息 以json 的数据包 上传到MQTT的 端口下的指定的 topic 。
这里面 每一个输出的 topic 里面的 消息是群发的,只要你订阅了这个topic 就可以在有新的数据到来的时候收到 信息(不管这条信息 是不是给你的),然后 设备根据 消息 来判断这个信息是不是自己的。
当我们设备少的时候 我们只需要 设置一个 输出 topic (domoticz/out) 所有受控的智能设备 都会受到信息。 当我们的设备多的时候肯定不能这样干 我们就在设置一些其他的topic ,以此来减 那些硬件的负担。
我们的传感器(上传数据的)会将自己收集到的信息上传到指定的topic 然后我们的domoticz 会去分析这些数据,分类显示 或者是留着做触发等等
microPython程序的实现 main.py
import json
import simple
import time
import network
import os
sta_if = network.WLAN(network.STA_IF)
ap = network.WLAN(network.AP_IF)
ap.active(0)
sta_if.active(True)
sta_if.connect('Jeason_test', '1a2b3c4d')
sta_if.isconnected()
print(sta_if.ifconfig())
data = {
"idx" : 13,
"nvalue" : 15,
"svalue" : str(12.3)#"19.0"
}
json_data = json.dumps(data)
print(json_data)
time.sleep_ms(1000)
from machine import Pin
p2 = Pin(2, Pin.OUT)
def ctrl_cb(topic, msg):
data_dict = json.loads(msg)
if data_dict['idx'] == 1:
print("it's me")
print('state: ' + str(data_dict['nvalue']))
p2.value(bool(1- data_dict['nvalue'] ))
#print(data_dict)
c = simple.MQTTClient('esp8266', '192.168.8.4', 1883)
c.set_callback(ctrl_cb)
c.connect()
#c.subscribe('domoticz/out')
while True:
# c.check_msg()
c.publish('domoticz/in',json_data,1)
p2.value(0)
time.sleep_ms(100)
p2.value(1)
time.sleep_ms(100)
import usocket as socket
import ustruct as struct
#from ubinascii import hexlify
class MQTTException(Exception):
pass
class MQTTClient:
def __init__(self, client_id, server, port=0, user=None, password=None, keepalive=0,ssl=False, ssl_params={}):
if port == 0:
port = 8883 if ssl else 1883
self.client_id = client_id
self.sock = None
self.addr = socket.getaddrinfo(server, port)[0][-1]
self.ssl = ssl
self.ssl_params = ssl_params
self.pid = 0
self.cb = None
self.user = user
self.pswd = password
self.keepalive = keepalive
self.lw_topic = None
self.lw_msg = None
self.lw_qos = 0
self.lw_retain = False
def _send_str(self, s):
self.sock.write(struct.pack("!H", len(s)))
self.sock.write(s)
def _recv_len(self):
n = 0
sh = 0
while 1:
b = self.sock.read(1)[0]
n |= (b & 0x7f) << sh
if not b & 0x80:
return n
sh += 7
def set_callback(self, f):
self.cb = f
def set_last_will(self, topic, msg, retain=False, qos=0):
assert 0 <= qos <= 2
assert topic
self.lw_topic = topic
self.lw_msg = msg
self.lw_qos = qos
self.lw_retain = retain
def connect(self, clean_session=True):
self.sock = socket.socket()
self.sock.connect(self.addr)
if self.ssl:
import ussl
self.sock = ussl.wrap_socket(self.sock, **self.ssl_params)
msg = bytearray(b"\x10\0\0\x04MQTT\x04\x02\0\0")
msg[1] = 10 + 2 + len(self.client_id)
msg[9] = clean_session << 1
if self.user is not None:
msg[1] += 2 + len(self.user) + 2 + len(self.pswd)
msg[9] |= 0xC0
if self.keepalive:
assert self.keepalive < 65536
msg[10] |= self.keepalive >> 8
msg[11] |= self.keepalive & 0x00FF
if self.lw_topic:
msg[1] += 2 + len(self.lw_topic) + 2 + len(self.lw_msg)
msg[9] |= 0x4 | (self.lw_qos & 0x1) << 3 | (self.lw_qos & 0x2) << 3
msg[9] |= self.lw_retain << 5
self.sock.write(msg)
#print(hex(len(msg)), hexlify(msg, ":"))
self._send_str(self.client_id)
if self.lw_topic:
self._send_str(self.lw_topic)
self._send_str(self.lw_msg)
if self.user is not None:
self._send_str(self.user)
self._send_str(self.pswd)
resp = self.sock.read(4)
assert resp[0] == 0x20 and resp[1] == 0x02
if resp[3] != 0:
raise MQTTException(resp[3])
return resp[2] & 1
def disconnect(self):
self.sock.write(b"\xe0\0")
self.sock.close()
def ping(self):
self.sock.write(b"\xc0\0")
def publish(self, topic, msg, retain=False, qos=0):
pkt = bytearray(b"\x30\0\0\0")
pkt[0] |= qos << 1 | retain
sz = 2 + len(topic) + len(msg)
if qos > 0:
sz += 2
assert sz < 2097152
i = 1
while sz > 0x7f:
pkt[i] = (sz & 0x7f) | 0x80
sz >>= 7
i += 1
pkt[i] = sz
#print(hex(len(pkt)), hexlify(pkt, ":"))
self.sock.write(pkt, i + 1)
self._send_str(topic)
if qos > 0:
self.pid += 1
pid = self.pid
struct.pack_into("!H", pkt, 0, pid)
self.sock.write(pkt, 2)
self.sock.write(msg)
if qos == 1:
while 1:
op = self.wait_msg()
if op == 0x40:
sz = self.sock.read(1)
assert sz == b"\x02"
rcv_pid = self.sock.read(2)
rcv_pid = rcv_pid[0] << 8 | rcv_pid[1]
if pid == rcv_pid:
return
elif qos == 2:
assert 0
def subscribe(self, topic, qos=0):
assert self.cb is not None, "Subscribe callback is not set"
pkt = bytearray(b"\x82\0\0\0")
self.pid += 1
struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic) + 1, self.pid)
#print(hex(len(pkt)), hexlify(pkt, ":"))
self.sock.write(pkt)
self._send_str(topic)
self.sock.write(qos.to_bytes(1, "little"))
while 1:
op = self.wait_msg()
if op == 0x90:
resp = self.sock.read(4)
#print(resp)
assert resp[1] == pkt[2] and resp[2] == pkt[3]
if resp[3] == 0x80:
raise MQTTException(resp[3])
return
# Wait for a single incoming MQTT message and process it.
# Subscribed messages are delivered to a callback previously
# set by .set_callback() method. Other (internal) MQTT
# messages processed internally.
def wait_msg(self):
res = self.sock.read(1)
self.sock.setblocking(True)
if res is None:
return None
if res == b"":
raise OSError(-1)
if res == b"\xd0": # PINGRESP
sz = self.sock.read(1)[0]
assert sz == 0
return None
op = res[0]
if op & 0xf0 != 0x30:
return op
sz = self._recv_len()
topic_len = self.sock.read(2)
topic_len = (topic_len[0] << 8) | topic_len[1]
topic = self.sock.read(topic_len)
sz -= topic_len + 2
if op & 6:
pid = self.sock.read(2)
pid = pid[0] << 8 | pid[1]
sz -= 2
msg = self.sock.read(sz)
self.cb(topic, msg)
if op & 6 == 2:
pkt = bytearray(b"\x40\x02\0\0")
struct.pack_into("!H", pkt, 2, pid)
self.sock.write(pkt)
elif op & 6 == 4:
assert 0
# Checks whether a pending message from server is available.
# If not, returns immediately with None. Otherwise, does
# the same processing as wait_msg.
def check_msg(self):
self.sock.setblocking(False)
return self.wait_msg()
分析一下代码。 我们的代码 总共俩文件 第一个是main.py 另一个是 simple.py 第二个simple 主要是 mqtt 相关的函数 不是我写的 我也不明白。main.py 中其实就是实现了一个 定时发送 数据的功能。
这个是我们的测试用的字典, 这里面有我们的 domoticz 感兴趣的一些关键的键值对, 我们的数据就是通过这个上传的
这一句的作用是将 我们的字典转换成一个 json 的格式 可以用来上传
最后就是循环发送了
domoticz 服务器的一些设置
这个名字可以随便起无关紧要 ,传感器类型不同 对应的 数据包的格式也不太一样 我测试用的是温度 关于其他的传感器的数据格式 我们需要去参考 官方的 API
我们做这个domoticz的 测试很重要的一个 参考信息就是调试信息 我们可以打开设置 去找
这里有我们发送的一些信息 以及一些其他的指令
这里是一些我用到的网网址以及参考的部分 希望对您有一定的帮助
domoticz中文API https://www.domoticz.cn/wiki/Domoticz_API%E5%8F%8AJSON%E7%BD%91%E5%9D%80
https://blog.csdn.net/xxmonstor/article/details/80479851
http://www.1zlab.com/wiki/micropython-esp32/mqtt/
Python 字典 https://www.runoob.com/python3/python3-dictionary.html
micropython wiki http://docs.micropython.org/en/latest/genrst/modules.html#json