版权声明:本文为博主原创文章,请转载时注明出处,欢迎浏览! https://blog.csdn.net/leemboy/article/details/81107378
OPCUA Server侧Push代码如下:
--------------------------------------------
import logging import pickle import random from opcua import ua, Server class data_struct: def _init_(self,para1,para2,para3): self.para1=para1 self.para2=para2 self.para3=para3 ###.................................. if __name__ == "__main__": #the following is only for debuging logging.basicConfig(level=logging.WARN) logger = logging.getLogger("opcua.server.internal_subscription") logger.setLevel(logging.DEBUG)#logging level of this logger: 10 server = Server() server.set_endpoint("opc.tcp://127.0.0.1:4840/freeopcua/server/") # setup namespace uri = "http://examples.io"# this can be any idx = server.register_namespace(uri) #register the uri name space in the server print("idx is :",idx) objs = server.get_objects_node() print("obj is: ", objs) myobj = objs.add_object(idx, "OT_Object") # populating our address space print("myobj is: ",myobj) data = data_struct() custom_etype = server.nodes.base_event_type.add_object_type(1, 'TSS_EVENT_MSG') print("custom_etype is: ",custom_etype) server.start() try: import time count=0 while True: time.sleep(3) #we only use random value as a demo data.para1 = 12345+random.randint(10,90) data.para2 = "para2"+str(random.randint(10,90)) data.para3 = "para3"+str(random.randint(10,90)) data_byte = pickle.dumps(data) #Serilization #Here 'MyObjProperty' is a object property name which can be indexed and accessed in client side. custom_etype.add_property(0, 'MyObjProperty', ua.Variant(data_byte, ua.VariantType.ByteString)) event_obj = server.get_event_generator(custom_etype, myobj) event_obj.event.Security =count #Trigger the messager push with the message name "YOUR_EVENT_MSG_NAME” event_obj.trigger(message="YOUR_EVENT_MSG_NAME %d" % count) count += 1 #embed() finally: server.stop()
-----------------------------------------------------------------
OPCUA Client 侧基于事件驱动的对象Pull代码
import pickle from IPython import embed from opcua import Client class data_struct: def _init_(self,para1,para2,para3): self.para1=para1 self.para2=para2 self.para3=para3 #................................. class SubHandler(object): #this is event notification , you can get object and deseriliazation it def event_notification(self, event): data_st=data_struct() data_st=pickle.loads(event.MyObjProperty) print("************************************") print("Data para1: ",data_st.para1) print("Data para2: ",data_st.para2) print("Data para3: ",data_st.para3) if __name__ == "__main__": server_url="opc.tcp://127.0.0.1:4840/freeopcua/server/" client = Client(server_url) try: client.connect() root = client.get_root_node() print("root is :",root) obj = root.get_child(["0:Objects", "2:OT_Object"]) print("obj is: ",obj) myevent_tss = root.get_child(["0:Types", "0:EventTypes","0:BaseEventType", "1:YOUR_EVENT_MSG_NAME"]) print("event is: ",myevent_tss) msclt_tss = SubHandler() sub_tss = client.create_subscription(100, msclt_tss) handle_tss = sub_tss.subscribe_events(obj, myevent_tss) embed() sub_tss.unsubscribe(handle_tss) sub_tss.delete() finally: client.disconnect()