版权声明:本文为博主原创文章,转载请注明出处。 https://blog.csdn.net/qq_29757283/article/details/85420058
(Super Link) Serial 版 - basic
结合了设计模式实现的 Serial 程序(用于实现 web serial(网页版串口调试助手))
用到:
- 单例模式 (及其变体)
- 观察者模式
- 状态模式
观察者模式 - 检测硬件改动
"""serialpublisher.py
Author: Joseph Lin
E-mail: [email protected]
"""
import os
import sys
import serial
import serial.tools.list_ports
#
# package path
#
# N/A
# from .SerialState import SerialDevice
class HwSubject(object):
    """Process-wide singleton that publishes serial-port plug/unplug changes.

    Observers added with :meth:`register` must provide a
    ``notify(*args, **kw)`` method.  After :meth:`run` is called, a daemon
    thread polls the list of port names and, whenever that list differs from
    the previous poll, calls every observer with a copy of the new list.
    """

    def __new__(cls, *args, **kw):
        # Always hand back the single shared instance.  Extra arguments are
        # deliberately not forwarded: ``object.__new__`` rejects them.
        if not hasattr(cls, 'instance'):
            cls.instance = super(HwSubject, cls).__new__(cls)
        return cls.instance

    def __init__(self):
        # ``__init__`` runs on *every* ``HwSubject()`` call even though
        # ``__new__`` returns the existing object.  Without this guard each
        # call would throw away the observers registered so far.
        if getattr(self, '_initialized', False):
            return
        # Observers; presumably the per-device state objects -- TODO confirm.
        self.__observers = set()
        self.__current_device = self.__get_devices_name()
        self._initialized = True

    def register(self, observer_obj):
        """Add *observer_obj* to the set of objects notified on a change."""
        self.__observers.add(observer_obj)

    def notifyAll(self, *args, **kw):
        """Call ``notify(*args, **kw)`` on every registered observer."""
        # Iterate over a snapshot: the polling thread calls this while other
        # threads may still be registering, and mutating a set while it is
        # being iterated raises RuntimeError.
        for observer in tuple(self.__observers):
            observer.notify(*args, **kw)

    def __monitor_hardware_change(self, interval=5):
        """Poll forever, notifying observers when the port list changes."""
        import time
        while True:
            time.sleep(interval)
            new_devices_plug_state = self.__get_devices_name()
            if self.__current_device != new_devices_plug_state:
                self.__current_device = new_devices_plug_state
                # Observers get their own copy so they cannot mutate ours.
                self.notifyAll(list(new_devices_plug_state))

    def run(self, interval=5):
        """Start the background polling thread.

        :param interval: seconds to sleep between two polls (default 5).
        """
        from threading import Thread
        # Daemon thread: it must not keep the interpreter alive on exit.
        t = Thread(target=self.__monitor_hardware_change,
                   args=(interval, ), daemon=True)
        t.start()

    def __get_devices_name(self):
        """Return the first field of every port reported by pyserial."""
        return [port[0] for port in serial.tools.list_ports.comports()]

    def isPlug(self, device_name):
        """Return True if *device_name* is in the current port list."""
        return device_name in self.__get_devices_name()
观察者模式-observer + 状态模式 + 观察者模式-publisher
"""SerialState.py
Author: Joseph Lin
E-mail: [email protected]
"""
import sys
import os
from abc import abstractmethod, ABC
from contextlib import contextmanager
from collections import defaultdict
#
# package path
#
# n/a
from .serialpublisher import HwSubject
class Exchange:
    """Fan-out channel that forwards each message to all attached subscribers.

    A subscriber is any object with a ``send(msg)`` method.
    """

    def __init__(self):
        self._subscribers = set()

    def attach(self, task):
        """Start delivering messages to *task*."""
        self._subscribers.add(task)

    def detach(self, task):
        """Stop delivering messages to *task*.

        :raises KeyError: if *task* is not currently attached.
        """
        self._subscribers.remove(task)

    @contextmanager
    def subscribe(self, *tasks):
        """Attach *tasks* for the duration of a ``with`` block."""
        for task in tasks:
            self.attach(task)
        try:
            yield
        finally:
            # ``discard`` rather than ``detach``: a task listed twice, or one
            # that already detached itself, must not raise here and mask an
            # exception coming out of the ``with`` body.
            for task in tasks:
                self._subscribers.discard(task)

    def send(self, msg):
        """Deliver *msg* to every subscriber attached at the time of the call."""
        # Iterate over a snapshot so subscribers may attach or detach (from
        # this or another thread) while delivery is in progress.
        for subscriber in tuple(self._subscribers):
            subscriber.send(msg)
# TODO: take care -- _exchanges and get_exchange act as module-level globals.
_exchanges = defaultdict(Exchange)


def get_exchange(name):
    """Return the Exchange stored under *name*.

    The registry is a ``defaultdict``, so looking up an unknown name creates
    and stores a new Exchange for it.
    """
    exchange = _exchanges[name]
    return exchange
class SerialDevice():
    """State-pattern context for one serial port, shared per device name.

    Instances are meant to be obtained through :meth:`getInstance`, which
    keeps one instance per device name and attaches to it the four state
    objects, the hardware monitor and the per-device exchange.  An object
    built by calling the class directly has none of those attributes.
    """

    # {device_name: SerialDevice instance}
    __share_instance_devices = dict()

    def __init__(self, device_name):
        if device_name not in self.__share_instance_devices:
            print("__init__ called...")
        else:
            # Look the cached instance up directly; it is what
            # ``getInstance(device_name)`` would return.
            print("already had an instance: ",
                  self.__share_instance_devices[device_name])

    @classmethod
    def getInstance(cls, device_name):
        """Return the shared instance for *device_name*, creating it if needed."""
        try:
            return cls.__share_instance_devices[device_name]
        except KeyError:
            pass

        _instance = SerialDevice(device_name)
        _instance.device_name = device_name

        _instance.disconnectionState = DisconnectionState(_instance)
        _instance.connectionState = ConnectionState(_instance)
        _instance.unpluggingState = UnpluggingState(_instance)
        _instance.pluggingState = PluggingState(_instance)
        _instance._states = [
            _instance.disconnectionState, _instance.connectionState,
            _instance.unpluggingState, _instance.pluggingState]
        _instance.state = _instance.unpluggingState

        # Publisher side: messages for this device go out through its exchange.
        _instance.exc = get_exchange(device_name)

        # Observer side: get notified about hardware changes.
        _instance.hw_monitor = HwSubject()
        _instance.hw_monitor.register(_instance)

        # Cache only once everything is wired, so a failure above never
        # leaves a half-initialised instance behind in the registry.
        cls.__share_instance_devices[device_name] = _instance
        return _instance

    def setup(self):
        """Set the initial state from the current hardware situation."""
        if not self._is_device_plugged():
            self.setState(self.unpluggingState)
        elif self._is_device_avaliable():
            self.setState(self.disconnectionState)
        else:
            self.setState(self.pluggingState)

    def _is_device_avaliable(self):
        # device_name = self.device_name
        return True     # TODO: implement a real availability check later

    def _is_device_plugged(self):
        # Ask the hardware monitor whether the port is currently listed.
        return self.hw_monitor.isPlug(self.device_name)

    def setState(self, _state):
        """Switch to *_state*.

        :raises RuntimeError: if *_state* is not one of this device's states.
        """
        if _state not in self._states:
            raise RuntimeError("Error state gived!")
        self.state = _state

    def getState(self):
        """Return the current state object."""
        return self.state

    def connected(self):
        """Forward a software "connect" event to the current state."""
        self.state.connected()

    def disconnected(self):
        """Forward a software "disconnect" event to the current state."""
        self.state.disconnected()

    #
    # Hardware events are not triggered from software; they arrive through
    # the observer callback below.
    #
    def __plugged(self):
        self.state.plugged()

    def __unplugged(self):
        self.state.unplugged()

    # OBSERVER
    def notify(self, *args, **kw):
        """Observer entry point; forwards to :meth:`update`."""
        self.update(*args, **kw)

    def update(self, devices_name):
        """React to a new list of present port names.

        :param devices_name: collection of port names currently present.
        """
        is_present = self.device_name in devices_name
        was_unplugged = isinstance(self.getState(), UnpluggingState)

        # NOTE(review): subscribers are told only when this device actually
        # changes between present and absent -- confirm that is the intent.
        if is_present and was_unplugged:
            self.__plugged()
            self.exc.send("plugged")
        elif not is_present and not was_unplugged:
            self.__unplugged()
            self.exc.send("unplugged")
class SerialState(ABC):
    """Abstract base for the states of a serial device.

    Each state keeps a reference to the device object it belongs to and must
    implement one handler per event.
    """

    def __init__(self, serialdevice):
        # The owning device (the state-pattern context).
        self.serialdevice = serialdevice

    @abstractmethod
    def connected(self):
        """Handle the "connected" event."""

    @abstractmethod
    def disconnected(self):
        """Handle the "disconnected" event."""

    @abstractmethod
    def unplugged(self):
        """Handle the "unplugged" event."""

    @abstractmethod
    def plugged(self):
        """Handle the "plugged" event."""
class DisconnectionState(SerialState):
    """State handlers used while the device is in the disconnection state."""

    def connected(self):
        """Move the device to its connection state."""
        print("change to ConnectionState")
        device = self.serialdevice
        device.setState(device.connectionState)

    def disconnected(self):
        """Report a redundant disconnect; the state is left unchanged."""
        # Should not happen.
        message = "you can't disconnect again on disconnection state!"
        print(message, file=sys.stderr)

    def unplugged(self):
        """Move the device to its unplugging state."""
        print("Change to unplugging state")
        device = self.serialdevice
        device.setState(device.unpluggingState)

    def plugged(self):
        """Report a redundant plug event; the state is left unchanged."""
        message = "already plugged!"
        print(message, file=sys.stderr)
class ConnectionState(SerialState):
    """State handlers used while the device is in the connection state."""

    def connected(self):
        """Report a redundant connect; the state is left unchanged."""
        # Should not happen.
        message = "you can't connect again on connection state!"
        print(message, file=sys.stderr)

    def disconnected(self):
        """Move the device to its disconnection state."""
        print("change to DisconnectionState")
        device = self.serialdevice
        device.setState(device.disconnectionState)

    def unplugged(self):
        """Move the device to its unplugging state."""
        print("Change to unplugging state")
        device = self.serialdevice
        device.setState(device.unpluggingState)

    def plugged(self):
        """Report a redundant plug event; the state is left unchanged."""
        message = "already plugged!"
        print(message, file=sys.stderr)
class UnpluggingState(SerialState):
    """State handlers used while the device is in the unplugging state."""

    def connected(self):
        """Reject the connect request; the state is left unchanged."""
        # Should not happen.
        print("you can't connect at unplugged state!",
              file=sys.stderr)

    def disconnected(self):
        """Report that there is nothing to disconnect."""
        print("Unplugging State, no need to disconnect!",
              file=sys.stderr)

    def unplugged(self):
        """Report a redundant unplug event; the state is left unchanged."""
        print("Already at unplugging state",
              file=sys.stderr)

    def plugged(self):
        """Leave the unplugging state after the device has been plugged in.

        The device moves to its disconnection state when it reports itself
        available, and to its plugging state otherwise.
        """
        print("change to plugging state")
        print("checking device is avaliable...")
        if self.serialdevice._is_device_avaliable():
            print("device is avaliable, change to disconnect state")
            self.serialdevice.setState(self.serialdevice.disconnectionState)
        else:
            print("device is unavaliable, at plugging state")
            # Actually enter the plugging state the message announces;
            # otherwise the device would stay in the unplugging state.
            self.serialdevice.setState(self.serialdevice.pluggingState)
class PluggingState(SerialState):
    """State handlers used while the device is in the plugging state."""

    def connected(self):
        """Move to the connection state if the device reports itself available."""
        # The availability check lives on the device, not on the state object.
        if self.serialdevice._is_device_avaliable():
            self.serialdevice.setState(self.serialdevice.connectionState)
        else:
            print("Unavaliable, Device already be used!",
                  file=sys.stderr)

    def disconnected(self):
        """Report that there is no connection to close."""
        print("not connected at plugging state yet!",
              file=sys.stderr)

    def unplugged(self):
        """Move the device to its unplugging state."""
        print("change to unplugging state")
        self.serialdevice.setState(self.serialdevice.unpluggingState)

    def plugged(self):
        """Re-check availability; move to the disconnection state if available."""
        print("already at plugged state!", file=sys.stderr)
        print("checking avaliable now...")
        if self.serialdevice._is_device_avaliable():
            print("Avaliabled, change to disconnection state")
            self.serialdevice.setState(self.serialdevice.disconnectionState)
        else:
            print("device still plugging, but unavaliable!")
运行测试/使用 demo
"""demo.py
"""
#
# Demo: the receiving side.
# This code is kept only as a reference for the logic (a reminder).
#
import queue
import serial
import sys
import os
from abc import abstractmethod, ABC
from .serialState import *
def connect(device_name, baudrate=9600, timeout=1):
    """Open the serial port for *device_name* if its state allows it.

    :param device_name: port name, e.g. ``"COM7"``.
    :param baudrate: baud rate passed to ``serial.Serial`` (default 9600).
    :param timeout: read timeout in seconds passed to ``serial.Serial``.
    :returns: the open port object on success, the already stored port object
        when the device is in its connection state, otherwise ``False``.
    """
    serialState = SerialDevice.getInstance(device_name)
    state = serialState.getState()

    if isinstance(state, UnpluggingState):
        print("Unplugging!")
        return False

    if isinstance(state, ConnectionState):
        print("Already connection!")
        # TODO: this works, but the handle should be exposed more cleanly.
        return serialState.serial_handler

    if not isinstance(state, DisconnectionState):
        print("plugging, but not avaliable")
        return False

    # Keep the try body minimal: only opening the port is expected to fail.
    try:
        ser = serial.Serial(device_name, baudrate, timeout=timeout)
    except (serial.SerialException, OSError) as err:
        print("Exception: ", err)
        # Opening failed, so treat the device as present but unavailable.
        serialState.setState(serialState.pluggingState)
        return False

    serialState.serial_handler = ser
    serialState.setState(serialState.connectionState)
    print(serialState.__dict__)
    return ser
class Actor(ABC):
    """Interface for objects that can receive messages through ``send``."""

    @abstractmethod
    def send(self, msg):
        """Accept *msg*; concrete subclasses decide what to do with it."""
class Receive(Actor):
    """Reads pending data from a connected serial device.

    Messages delivered through :meth:`send` are queued; :meth:`run` checks
    that queue for an ``"unplugged"`` message before reading from the port.
    """

    def __init__(self, device_name):
        """
        :raises RuntimeError: if the device is not in its connection state.
        """
        self._q = queue.Queue()
        self.device_name = device_name
        self.deviceState = SerialDevice.getInstance(self.device_name)
        if not isinstance(self._get_device_state(), ConnectionState):
            raise RuntimeError("Not connected!")

    def _get_device_state(self):
        return self.deviceState.getState()

    def send(self, msg):
        """Queue *msg* for the next :meth:`run` call."""
        self._q.put(msg)

    def _is_device_hardware_change(self):
        """Wait up to one second for a message; True if it is "unplugged"."""
        # No ``return`` inside ``finally``: that would swallow unexpected
        # exceptions and could fail on an unbound local instead.
        try:
            return self._q.get(timeout=1) == "unplugged"
        except queue.Empty:
            return False

    def run(self):
        """Return whatever the port has buffered, decoded to ``str``.

        The caller is expected to call this repeatedly, so it performs a
        single check-and-read rather than looping itself.

        :raises RuntimeError: if an "unplugged" message was queued, or if
            reading from the port raises a serial exception.
        """
        if self._is_device_hardware_change():
            raise RuntimeError("Device unplugged")
        try:
            _ser = self.deviceState.serial_handler
            return _ser.read_all().decode()
        except serial.serialutil.SerialException as err:
            # The read can fail before the observer has switched the state.
            raise RuntimeError("Device unavaliable!") from err
def fake_views_recv_thread(_device_name):
    """Simulate a web view that performs one receive for *_device_name*.

    A front end is expected to call this again and again until the device is
    disconnected or unplugged.

    :returns: the data returned by the receive task.
    :raises RuntimeError: "Front-end Stop call recv again!" when the receive
        task itself raises RuntimeError; errors from building the task
        propagate unchanged.
    """
    # A real view would resolve the device from a request token
    # ("get from POST/GET"); here the name is passed in directly.
    device_name = _device_name

    # Propagates if the device is not connected; a real view would
    # answer Http404("not connection, shouldn't request recv").
    receiver = Receive(device_name)

    # Stay subscribed to the device's exchange only for this one receive.
    with get_exchange(device_name).subscribe(receiver):
        try:
            # The result is sent back to update the Web-GUI display.
            return receiver.run()
        except RuntimeError as err:
            print("\n", err)
            raise RuntimeError("Front-end Stop call recv again!")
def front_end_js_work_simulate_as_thread(device_name):
    """Simulate front-end polling: call the receive view until it says stop.

    Every received value is printed.  A RuntimeError from the view is printed
    and ends the loop; any other exception has its traceback printed and is
    re-raised.
    """
    polling = True
    while polling:
        try:
            # Stands in for the ajax call the front end repeats.
            received = fake_views_recv_thread(device_name)
            print(received)
        except RuntimeError as stop_reason:
            print(stop_reason)
            polling = False
        except Exception:
            import traceback
            traceback.print_exc()
            raise
run
""" run under at ipython, jupyter notebook, or else...
"""
# NOTE(review): the module docstring above names the file "SerialState.py";
# confirm the module name's case matches on case-sensitive file systems.
from .serialState import *
from .serialpublisher import HwSubject
from .demo import *
from threading import Thread

obj = HwSubject()
obj.isPlug("COM7")
## assume got True

serialDeviceState_COM7 = SerialDevice.getInstance("COM7")
## output:
# __init__ called...

serialDeviceState_COM7.setup()
serialDeviceState_COM7.__dict__  # obj.__dict__ shows the variable states; it's useful

obj.run()
## output:
# change to plugging state
# checking device is avaliable...
# device is avaliable, change to disconnect state


## start front-end work
def run_recv():
    """Start the simulated front-end polling loop in a daemon thread."""
    t = Thread(target=front_end_js_work_simulate_as_thread,
               args=("COM7", ), daemon=True)
    t.start()


run_recv()
## output:
# Not connected!

connect("COM7")
## output:
# ..skip...

serialDeviceState_COM7.__dict__  # here you can check the state again

run_recv()
## output:
# it will print when serial data is received

serialDeviceState_COM7.__dict__  # you can check the state many times

#### unplug the COM7 device: #####
## output:
# Device unavaliable!
# Front-end Stop call recv again!
# Change to unplugging state
#
## or the output may look like:
# Device unplugged
# Front-end Stop call recv again!
# Change to unplugging state
最后,设备受到外力(非程序内部导致的设备状态改变)而改变连接状态之后,两种消息中输出哪一种,只取决于当时哪个线程正在运行;如果调高检测频率,则取决于哪个先运行到。
但是不论是何种输出,设备状态总是经过观察者模式“订阅”之后,被通知,会“自动”地切换“状态”(状态模式)。
所以整体程序是相当健壮的,只要完善“使用”的功能即可。
- 即增加一个“发送”功能 - 创建 state 实例(单例模式,同一个实例,共同的状态),判断是否是链接状态即可(链接状态才可以发送数据)。
- 如果在 GUI (desktop/web)上显示“链接/断开”字样,再增加一个观察者模式的 client “注册”状态通知即可。
  - desktop GUI - 在观察者模式 client 的 `send` 中直接改变显示即可。
  - web GUI - ajax 请求,后台建立一个注册后一直等待变化的线程,`send` 之后,收到状态变化,返回给前端即可。
缺点:
逻辑有一定的复杂度。需要理解。
优点:
各自做各自的事情。发送负责发送,接收负责接收,当有状态变化时,“自然有人通知”;“链接/断开”字样(按钮)负责反馈设备状态。
即,不用在发送的时候检测设备状态之后,还要考虑要不要去负责改变“链接/断开”字样这个行为,同理,接收的时候也不用在出现接收异常等情况的时候,考虑要不要去负责改变“链接/断开”字样这个行为。
虽然这种用途下,增加“状态” 或 “行为”的情况不多见,但是作为练习一下 设计模式还是可以的。
如果需要“定制”功能 即行为,变得自动化等… 在增加行为的时候反而需要考虑的东西很简单和直接。
最后,这转变成了是一种解决问题的方式,程序逻辑有一定的复杂度,但是也并不算太复杂。但是条理变得十分清晰。解决问题(实现功能)的思维也大不相同。