opc ua

是一种应用层协议,基于tcp之上,其url通常为opc.tcp://127.0.0.1:4840/abc,在opc ua中常被称为endpoint

两种模式

opc ua支持c/s模式,同时也支持类似mqtt的发布订阅模式,通常各种设备作为opc ua的服务端提供各种服务。

opcua协议介绍-风君雪科技博客

信息模型

opc ua采用面向对象的设计思路, 使用了对象(objects)作为过程系统表示数据和活动的基础。对象包含了变量事件方法,它们通过引用(reference)来互相连接。

OPC UA 信息模型是节点的网络(Network of Node,),或者称为结构化图(graph),由节点(node)和引用(References)组成,这种结构图称之为OPC UA 的地址空间。这种图形结构可以描述各种各样的结构化信息(对象)。

opcua协议介绍-风君雪科技博客

注意⚠️:opc ua中所说的节点是在一个opc ua服务器中,不要理解为一个服务器对应一个node

节点

opc ua定义了8种类型的节点

对象(Object)
对象类型(Object Type)
变量(Variable)
变量类型(Variable Type)
方法(Method)
视图(View)
引用(Reference)
数据类型(Data Type)

每种节点都包含一些公共属性,如下:

属性 数据类型 说明
NodeId NodeId 在OPC UA服务器内唯一确定的一个节点,并且在OPC UA服务器中定位该节点
NodeClass Int32 该节点的类型(上面列出的8种之一)
BrowseName QualifiedName 浏览OPC UA服务器事定义的节点。它是非本地化的
DisplayName LocalizedText 包含节点的名字,用来在用户接口中显示名字,本地化
Description LocalizedText 本地化的描述(可选)
WriteMask Uint32 指示哪个节点属性是可写的,即可被OPC UA客户端修改(可选)
UserWriteMask Uint32 指示哪个节点属性可以被当前连接到服务器上的用户修改(可选)

除了数据类型节点之外,其他各个节点都有额外的专属属性

opcua协议介绍-风君雪科技博客

引用

引用描述了两个节点之间的关系,用来连接多个节点。OPC UA预定义了多种引用,常见的引用有:

hasTypeDefinition

描述对象、变量和类型之间的关系

ObjectNode的hasTypeDefinition引用,指向了一个ObjectTypeNode,表示该ObjectNode的类型;
VariableNode的hasTypeDefinition引用,指向一个VariableTypeNode,表示该 VariableNode的类型。

hasSubType

描述对象的挤成关系,当子类从父类继承后,子类拥有一个hasSubType引用指向父类。

hasComponents

描述一种组合关系

ObjectNode一般都由多个VariableNode组成,ObjectNode包含某个VariableNode时,ObjectNode拥有一个hasComponents引用,指向该VariableNode;
VariableNode也可以包含子VariableNode,此时也用hasComponents描述它们的关系。

Organizes

指明两个节点的层次结构,通过organizes可以把多个节点组织到同一个父节点下。

完整引用如下

opcua协议介绍-风君雪科技博客

服务

服务可以看成是OPC UA服务器提供的API集合,OPC UA与定义了37个标准服务,常用的服务有:

读写服务

可以获取和修改服务器指定节点指定属性的值

调用服务

执行服务器上指定节点的方法

订阅数据变化和订阅事件

可以监控服务器数据的变化

opc ua编程

Sdk

python(支持客户端和服务端)

https://github.com/FreeOpcUa/python-opcua

golang(支持客户端,服务端尚不完善

https://github.com/gopcua/opcua

客户端

opcua-client-gui

使用python(pyqt5)开发使用pip可以安装,跨平台

sudo pip3 install pyqt5 -i https://pypi.mirrors.ustc.edu.cn/simple/
sudo pip3 install numpy -i https://pypi.mirrors.ustc.edu.cn/simple/
sudo pip3 install pyqtgraph -i https://pypi.mirrors.ustc.edu.cn/simple/
sudo pip3 install cryptography -i https://pypi.mirrors.ustc.edu.cn/simple/
sudo pip3 install opcua-client -i https://pypi.mirrors.ustc.edu.cn/simple/

模拟设备

可利用sdk自己开发 见下面的python demo

golang Demo

读取服务器数据

package main

import (
	"context"
	"log"

	"github.com/gopcua/opcua"
	"github.com/gopcua/opcua/ua"
)

func main() {
	endpoint := "opc.tcp://milo.digitalpetri.com:62541/milo"
	nodeID := "ns=2;s=Dynamic/RandomFloat"

	ctx := context.Background()

	c := opcua.NewClient(endpoint, opcua.SecurityMode(ua.MessageSecurityModeNone))
	if err := c.Connect(ctx); err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	id, err := ua.ParseNodeID(nodeID)
	if err != nil {
		log.Fatalf("invalid node id: %v", err)
	}

	req := &ua.ReadRequest{
		MaxAge:             2000,
		NodesToRead:        []*ua.ReadValueID{{NodeID: id}},
		TimestampsToReturn: ua.TimestampsToReturnBoth,
	}

	resp, err := c.Read(req)
	if err != nil {
		log.Fatalf("Read failed: %s", err)
	}
	if resp.Results[0].Status != ua.StatusOK {
		log.Fatalf("Status not OK: %v", resp.Results[0].Status)
	}
	log.Printf("%#v", resp.Results[0].Value.Value())
}

向服务器写数据

package main

import (
	"context"
	"github.com/gopcua/opcua"
	"github.com/gopcua/opcua/ua"
	"log"
)

func main() {
	endpoint := "opc.tcp://milo.digitalpetri.com:62541/milo"
	nodeID := "ns=2;s=Dynamic/RandomFloat"

	ctx := context.Background()

	c := opcua.NewClient(endpoint, opcua.SecurityMode(ua.MessageSecurityModeNone))
	if err := c.Connect(ctx); err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	id, err := ua.ParseNodeID(nodeID)
	if err != nil {
		log.Fatalf("invalid node id: %v", err)
	}

	v, err := ua.NewVariant(10.0)
	if err != nil {
		log.Fatalf("invalid value: %v", err)
	}

	req := &ua.WriteRequest{
		NodesToWrite: []*ua.WriteValue{
			{
				NodeID:      id,
				AttributeID: ua.AttributeIDValue,
				Value: &ua.DataValue{
					EncodingMask: ua.DataValueValue,
					Value:        v,
				},
			},
		},
	}

	resp, err := c.Write(req)
	if err != nil {
		log.Fatalf("Read failed: %s", err)
	}
	log.Printf("%v", resp.Results[0])
}

监听服务器数据变化

package main

import (
	"context"
	"github.com/gopcua/opcua/monitor"
	"log"
	"os"
	"os/signal"
	"sync"
	"time"

	"github.com/gopcua/opcua"
	"github.com/gopcua/opcua/ua"
)

func cleanup(sub *monitor.Subscription, wg *sync.WaitGroup) {
	log.Printf("stats: sub=%d delivered=%d dropped=%d", sub.SubscriptionID(), sub.Delivered(), sub.Dropped())
	sub.Unsubscribe()
	wg.Done()
}

func startCallbackSub(ctx context.Context, m *monitor.NodeMonitor, interval, lag time.Duration, wg *sync.WaitGroup, nodes ...string) {
	sub, err := m.Subscribe(
		ctx,
		&opcua.SubscriptionParameters{
			Interval: interval,
		},
		func(s *monitor.Subscription, msg *monitor.DataChangeMessage) {
			if msg.Error != nil {
				log.Printf("[callback] error=%s", msg.Error)
			} else {
				log.Printf("[callback] node=%s value=%v", msg.NodeID, msg.Value.Value())
			}
			time.Sleep(lag)
		},
		nodes...)

	if err != nil {
		log.Fatal(err)
	}

	defer cleanup(sub, wg)

	<-ctx.Done()
}

func main() {
	endpoint := "opc.tcp://milo.digitalpetri.com:62541/milo"
	nodeID := "ns=2;s=Dynamic/RandomFloat"

	signalCh := make(chan os.Signal, 1)
	signal.Notify(signalCh, os.Interrupt)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go func() {
		<-signalCh
		println()
		cancel()
	}()

	c := opcua.NewClient(endpoint, opcua.SecurityMode(ua.MessageSecurityModeNone))
	if err := c.Connect(ctx); err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	m, err := monitor.NewNodeMonitor(c)
	if err != nil {
		log.Fatal(err)
	}

	m.SetErrorHandler(func(_ *opcua.Client, sub *monitor.Subscription, err error) {
		log.Printf("error: sub=%d err=%s", sub.SubscriptionID(), err.Error())
	})
	wg := &sync.WaitGroup{}

	// start callback-based subscription
	wg.Add(1)
	go startCallbackSub(ctx, m, time.Second, 0, wg, nodeID)

	<-ctx.Done()
	wg.Wait()
}

python opcua server demo

#!/usr/bin/env python3

from threading import Thread
import random
import time
from opcua import ua, uamethod, Server

@uamethod
def set_temperature(parent, variant):
    print(f"set_temperature {variant.Value}")
    temperature_thread.temperature.set_value(variant.Value)

@uamethod
def set_onoff(parent, variant):
    print(f"set_onoff {variant.Value}")
    temperature_thread.temperature.set_value(variant.Value)

# 这个类用于后台定时随机修改值
class Temperature(Thread):
    def __init__(self, temperature, onoff):
        Thread.__init__(self)
        self._stop = False
        self.temperature = temperature
        self.onoff = onoff

    def stop(self):
        self._stop = True

    def run(self):
        count = 1
        while not self._stop:
            value = random.randint(-20, 100)
            self.temperature.set_value(value)
            print(f"random set temperature {value}")

            value = bool(random.randint(0, 1))
            self.onoff.set_value(value)
            print(f"random set onoff {value}")

            led_event.event.Message = ua.LocalizedText("high_temperature %d" % count)
            led_event.event.Severity = count
            #led_event.event.temperature = random.randint(60, 100)
            led_event.event.onoff = bool(random.randint(0, 1))
            led_event.trigger()
            count += 1

            time.sleep(10)

if __name__ == "__main__":
    # now setup our server
    server = Server()
    server.set_endpoint("opc.tcp://0.0.0.0:40840/tuyaopcua/server/")
    server.set_server_name("TuyaOpcUa Driver Demo Device")

    # set all possible endpoint policies for clients to connect through
    server.set_security_policy([
        ua.SecurityPolicyType.NoSecurity,
        ua.SecurityPolicyType.Basic128Rsa15_SignAndEncrypt,
        ua.SecurityPolicyType.Basic128Rsa15_Sign,
        ua.SecurityPolicyType.Basic256_SignAndEncrypt,
        ua.SecurityPolicyType.Basic256_Sign])

    # setup our own namespace
    uri = "http://tuya.com"
    idx = server.register_namespace(uri)

    # 添加一个 `空调` 对象
    air_conditioner = server.nodes.objects.add_object(idx, "AirConditioner")
    temperature = air_conditioner.add_variable(idx, "temperature", 20)
    temperature.set_writable()
    onoff = air_conditioner.add_variable(idx, "onoff", True)
    onoff.set_writable()


    air_conditioner.add_method(idx, "set_temperature", set_temperature, [ua.VariantType.UInt32])
    air_conditioner.add_method(idx, "set_onoff", set_onoff, [ua.VariantType.Boolean])

    # creating a default event object, the event object automatically will have members for all events properties
    led_event_type = server.create_custom_event_type(idx,
                                                     'high_temperature',
                                                     ua.ObjectIds.BaseEventType,
                                                     [('temperature', ua.VariantType.UInt32), ('onoff', ua.VariantType.Boolean)])

    led_event = server.get_event_generator(led_event_type, air_conditioner)
    led_event.event.Severity = 300

    # start opcua server
    server.start()
    print("Start opcua server...")

    temperature_thread = Temperature(temperature, onoff)
    temperature_thread.start()

    try:

        led_event.trigger(message="This is BaseEvent")

        while True:
            time.sleep(5)
    finally:
        print("Exit opcua server...")
        temperature_thread.stop()
        server.stop()