【设计模式】实例 - (Super Link) Serial 版 - basic

版权声明:本文为博主原创文章,转载请注明出处。 https://blog.csdn.net/qq_29757283/article/details/85420058

(Super Link) Serial 版 - basic

结合了设计模式实现的 Serial 程序(用于实现 web serial(网页版串口调试助手))

用到:

  • 单例模式 (及其变体)
  • 观察者模式
  • 状态模式

观察者模式 - 检测硬件改动

"""serialpublisher.py
Author: Joseph Lin
E-mail: [email protected]
"""

import os
import sys
import serial
import serial.tools.list_ports

#
# package path
#
# N/A

# from .SerialState import SerialDevice


class HwSubject(object):
    def __new__(cls, *args, **kw):
        if not hasattr(cls, 'instance'):
            cls.instance = super(HwSubject, cls).__new__(cls, *args, **kw)
        return cls.instance

    def __init__(self):
        # except format: "STATE_OBJ"
        self.__observers = set()
        self.__current_device = self.__get_devices_name()

    def register(self, observer_obj):
        self.__observers.add(observer_obj)

    def notifyAll(self, *args, **kw):
        for observer in self.__observers:
            observer.notify(*args, **kw)

    def __monitor_hardware_change(self):
        import time
        import copy
        while True:
            time.sleep(5)
            new_devices_plug_state = self.__get_devices_name()
            if self.__current_device != new_devices_plug_state:
                self.__current_device = new_devices_plug_state
                self.notifyAll(copy.copy(new_devices_plug_state))

    def run(self):
        from threading import Thread

        t = Thread(target=self.__monitor_hardware_change,
                   args=(), )
        t.setDaemon(True)
        t.start(); del t;

    def __get_devices_name(self):
        portList = list(serial.tools.list_ports.comports())
        return [list(portList[i])[0] for i in range(len(portList))]

    def isPlug(self, device_name):
        return device_name in self.__get_devices_name()

观察者模式-observer + 状态模式 + 观察者模式-publisher

"""SerialState.py
Author: Joseph Lin
E-mail: [email protected]
"""

import sys
import os
from abc import abstractmethod, ABC

from contextlib import contextmanager
from collections import defaultdict

#
# package path
#
# n/a

from .serialpublisher import HwSubject

class Exchange:
    def __init__(self):
        self._subscribers = set()

    def attach(self, task):
        self._subscribers.add(task)

    def detach(self, task):
        self._subscribers.remove(task)

    @contextmanager
    def subscribe(self, *tasks):
        for task in tasks:
            self.attach(task)
        try:
            yield
        finally:
            for task in tasks:
                self.detach(task)

    def send(self, msg):
        for subscribe in self._subscribers:
            subscribe.send(msg)


# -[o] take care about _exchanges and get_exchange as globle variables
_exchanges = defaultdict(Exchange)


def get_exchange(name):
    return _exchanges[name]


class SerialDevice():
    # dict: {device_name: obj_instance, }
    __share_instance_devices = dict()

    def __init__(self, device_name):
        if device_name not in self.__share_instance_devices.keys():
            print("__init__ called...")
        else:
            print("already had an instance: ", self.getInstance())

    @classmethod
    def getInstance(cls, device_name):
        if device_name not in cls.__share_instance_devices.keys():
            _instance = SerialDevice(device_name)
            cls.__share_instance_devices[device_name] = _instance

            _instance.disconnectionState = DisconnectionState(_instance)
            _instance.connectionState = ConnectionState(_instance)
            _instance.unpluggingState = UnpluggingState(_instance)
            _instance.pluggingState = PluggingState(_instance)

            _instance._states = [
                _instance.disconnectionState, _instance.connectionState,
                _instance.unpluggingState, _instance.pluggingState]
            _instance.state = _instance.unpluggingState

            _instance.device_name = device_name

            # register; self as observer
            _instance.hw_monitor = HwSubject()
            _instance.hw_monitor.register(_instance)

            # publisher; self as publisher
            _instance.exc = get_exchange(device_name)

            return _instance
        else:
            return cls.__share_instance_devices[device_name]

    def setup(self):
        if self._is_device_plugged():
            if self._is_device_avaliable():
                self.setState(self.disconnectionState)
            else:
                self.setState(self.pluggingState)
        else:
            self.setState(self.unpluggingState)

    def _is_device_avaliable(self):
        # device_name = self.device_name
        return True  # -[o] update later

    def _is_device_plugged(self):
        # use pyserial do some check
        return self.hw_monitor.isPlug(self.device_name)

    def setState(self, _state):
        if _state not in self._states:
            raise RuntimeError("Error state gived!")
        else:
            self.state = _state

    def getState(self):
        return self.state

    def connected(self):
        self.state.connected()

    def disconnected(self):
        self.state.disconnected()

    #
    # device hardware state,
    # not operate in Software, use OBSERVER
    #
    def __plugged(self):
        self.state.plugged()

    def __unplugged(self):
        self.state.unplugged()

    # OBSERVER
    def notify(self, *args, **kw):
        self.update(*args, **kw)

    def update(self, devices_name):
        _change = "plugged" if self.device_name in devices_name else "unplugged"
        state = self.getState()
        if _change == 'plugged':
            if isinstance(state, UnpluggingState):
                self.__plugged()
                # sub-publisher
                self.exc.send(_change)
        elif _change == 'unplugged':
            if not isinstance(state, UnpluggingState):
                self.__unplugged()
                self.exc.send("unplugged")
        else:
            print("{} Unknow change type: {}".format(
                self.device_name, _change),
                file=sys.stderr)


class SerialState(ABC):
    def __init__(self, serialdevice):
        self.serialdevice = serialdevice

    @abstractmethod
    def connected(self):
        pass

    @abstractmethod
    def disconnected(self):
        pass

    @abstractmethod
    def unplugged(self):
        pass

    @abstractmethod
    def plugged(self):
        pass


class DisconnectionState(SerialState):
    def connected(self):
        print("change to ConnectionState")
        self.serialdevice.setState(self.serialdevice.connectionState)

    def disconnected(self):
        # should not hanpped
        print("you can't disconnect again on disconnection state!",
              file=sys.stderr)

    def unplugged(self):
        print("Change to unplugging state")
        self.serialdevice.setState(self.serialdevice.unpluggingState)

    def plugged(self):
        print("already plugged!",
              file=sys.stderr)


class ConnectionState(SerialState):
    def connected(self):
        # should not hanpped
        print("you can't connect again on connection state!",
              file=sys.stderr)

    def disconnected(self):
        print("change to DisconnectionState")
        self.serialdevice.setState(self.serialdevice.disconnectionState)

    def unplugged(self):
        print("Change to unplugging state")
        self.serialdevice.setState(self.serialdevice.unpluggingState)

    def plugged(self):
        print("already plugged!",
              file=sys.stderr)


class UnpluggingState(SerialState):
    def connected(self):
        # should not hanpped
        print("you can't connect at unplugged state!",
              file=sys.stderr)

    def disconnected(self):
        print("Unplugging State, no need to disconnect!",
              file=sys.stderr)

    def unplugged(self):
        print("Already at unplugging state",
              file=sys.stderr)

    def plugged(self):
        print("change to plugging state")
        print("checking device is avaliable...")
        if self.serialdevice._is_device_avaliable():
            print("device is avaliable, change to disconnect state")
            self.serialdevice.setState(self.serialdevice.disconnectionState)
        else:
            print("device is unavaliable, at plugging state")


class PluggingState(SerialState):
    def connected(self):
        # should not hanpped
        if self._is_device_avaliable():
            self.serialdevice.setState(self.serialdevice.connectionState)
        else:
            print("Unavaliable, Device already be used!",
                  file=sys.stderr)

    def disconnected(self):
        print("not connected at plugging state yet!",
              file=sys.stderr)

    def unplugged(self):
        print("change to unplugging state")
        self.serialdevice.setState(self.serialdevice.unpluggingState)

    def plugged(self):
        print("already at plugged state!", file=sys.stderr)
        print("checking avaliable now...")
        if self.serialdevice._is_device_avaliable():
            print("Avaliabled, change to disconnection state")
            self.serialdevice.setState(self.serialdevice.disconnectionState)
        else:
            print("device still plugging, but unavaliable!")

运行测试/使用 demo

"""demo.py
"""
#
# demo, receive;
# Reference those codes logical, only for remeber!
#
import queue
import serial
import sys
import os
from abc import abstractmethod, ABC
from .serialState import *

def connect(device_name):
    serialState = SerialDevice.getInstance(device_name)
    state = serialState.getState()
    if isinstance(state, UnpluggingState):
        print("Unplugging!")
        return False
    elif isinstance(state, ConnectionState):
        print("Already connection!")
        return serialState.serial_handler  # this could work, but need update code later
    elif isinstance(state, DisconnectionState):
        try:
            ser = serial.Serial(device_name, 9600, timeout=1)
            serialState.serial_handler = ser
            serialState.setState(serialState.connectionState)
            print(serialState.__dict__)
            return ser
        except Exception as err:
            print("Exception: ", err)
            serialState.setState(serialState.pluggingState)
            return False
    else:
        print("plugging, but not avaliable")
        return False


class Actor(ABC):
    @abstractmethod
    def send():
        pass

class Receive(Actor):
    def __init__(self, device_name):
        self._q = queue.Queue()
        self.device_name = device_name

        self.deviceState = SerialDevice.getInstance(self.device_name)

        if not isinstance(self._get_device_state(), ConnectionState):
            raise RuntimeError("Not connected!")

    def _get_device_state(self):
        return self.deviceState.getState()

    def send(self, msg):
        self._q.put(msg)

    def _is_device_hardware_change(self):
        try:
            rst = True if self._q.get(timeout=1) == "unplugged" else False
        except queue.Empty:
            rst = False
        finally:
            return rst

    def run(self):
        while True:
            try:
                if self._is_device_hardware_change():
                    # end run()
                    raise RuntimeError("Device unplugged")
                # do the receive data things from serial
                # but do not block here, While True had
                # already make sure check receive data again and again
                _ser = self.deviceState.serial_handler
                # return "...here should be real receive data..."
                return _ser.read_all().decode()
            except serial.serialutil.SerialException:
                '''this could run first then "Observer mode" change
                '''
                raise RuntimeError("Device unavaliable!")
            except RuntimeError:
                raise

            except Exception:
                import traceback; traceback.print_exc();
                raise
        return None


def fake_views_recv_thread(_device_name):
    token = "get from POST/GET"

    def getByCommandMode(_token):
        # return "COM7"
        return _device_name

    device_name = getByCommandMode(token)
    try:
        recv_task = Receive(device_name)
    except Exception:
        raise  # Http404("not connection, shouldn't request recv")

    exc = get_exchange(device_name)
    with exc.subscribe(recv_task):
        try:
            recv_data = recv_task.run()
        except RuntimeError as e:
            print("\n", e)
            raise RuntimeError("Front-end Stop call recv again!")

    # do response to update Web-GUI display
    #   it will call this function again and again
    #   until disconnect or unplugged
    return recv_data


def front_end_js_work_simulate_as_thread(device_name):
    # front call view recv func

    while True:
        try:
            # ajax got data, will call again and again
            ajax_got_recv_data = fake_views_recv_thread(device_name)
            print(ajax_got_recv_data)
        except RuntimeError as e:
            print(e)
            break
        except Exception:
            import traceback; traceback.print_exc();
            raise

run

""" run under at ipython, jupyter notebook, or else...
"""
from .serialState import *
from .serialpublisher import HwSubject
from .demo import *
fromt threading import Thread

obj = HwSubject()
obj.isPlug("COM7")
## assume got True

serialDeviceState_COM7 = SerialDevice.getInstance("COM7")
## output:
# __init__ called...
serialDeviceState_COM7.setup()

serialDeviceState_COM7.__dict__  # obj.__dict__ could display variable states, it's useful

obj.run()
## output:
# change to plugging state
# checking device is avaliable...
# device is avaliable, change to disconnect state


## start font-end work
def run_recv():
    t = Thread(target=front_end_js_work_simulate_as_thread,
               args = ("COM7", ))
    t.setDaemon(True)
    t.start(); del t

run_recv()
## output:
# Not connected!

connect("COM7")
## output:
# ..skip...

serialDeviceState_COM7.__dict__  # here can check state again

run_recv()
## output:
# it will print when receive serial data

serialDeviceState_COM7.__dict__  # you can check state many times

#### unplugged the COM7 device: #####
##  output:
# Device unavaliable!
# Front-end Stop call recv again!
# Change to unplugging state
#
## or output display like:
# Device unplugged
# Front-end Stop call recv again!
# Change to unplugging state

最后收到外力(非程序内部导致设备状态改变)改变设备的链接状态之后,两种消息哪种输出只是取决与那个正在运行,或者调高检测频率,则是哪个先运行到。
但是不论是何种输出,设备状态总是经过观察者模式“订阅”之后,被通知,会“自动”地切换“状态”(状态模式)。
所以整体程序是相当健壮的,只要完善“使用”的功能即可。

  1. 即增加一个“发送”功能 - 创建 state 实例(单例模式,同一个实例,共同的状态),判断是否是链接状态即可(链接状态才可以发送数据)。
  2. 如果在 GUI (desktop/web)上显示“链接/断开”字样,再增加一个观察者模式的 client “注册”状态通知即可。
    • desktop GUI - 在观察者模式 client 的 send 中直接改变显示即可
    • web GUI - ajax 请求,后台建立一个注册后一直等待变化的线程,send 之后,收到状态变化,返回给前端即可。

缺点:
逻辑有一定的复杂度。需要理解。

优点:
各自做各自的事情。发送负责发送,接收负责接收,当有状态变化时,“自然有人通知”;“链接/断开”字样(按钮)负责反馈设备状态。

即,不用在发送的时候检测设备状态之后,还要考虑要不要去负责改变“链接/断开”字样这个行为,同理,接收的时候也不用在出现接收异常等情况的时候,考虑要不要去负责改变“链接/断开”字样这个行为。

虽然这种用途下,增加“状态” 或 “行为”的情况不多见,但是作为练习一下 设计模式还是可以的。

如果需要“定制”功能 即行为,变得自动化等… 在增加行为的时候反而需要考虑的东西很简单和直接。

最后,这转变成了是一种解决问题的方式,程序逻辑有一定的复杂度,但是也并不算太复杂。但是条理变得十分清晰。解决问题(实现功能)的思维也大不相同。

猜你喜欢

转载自blog.csdn.net/qq_29757283/article/details/85420058