opcuaCOM_PY/opcua_connector.py
Eduard Gerlitz 88eb33fde4 v1.0 HF4
2025-09-05 16:00:06 +02:00

92 lines
2.7 KiB
Python

# opcua_connector.py
from opcua import Client, ua
from time import sleep, time
import datetime
import pandas as pd
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
from tool_gencert import make_cert
class opcua_connector:
def __init__(self, config):
self.config = config
self.opcuaclient = Client(config['cred']["robot"]["url"])
self.opcuaclient.set_user(config['cred']["robot"]["username"])
self.opcuaclient.set_password(config['cred']["robot"]["password"])
self.varlist_dict = self.read_varlist(config['data']["cfg_varlist"])
self.opcuaclient.session_timeout = 60000
#make_cert()
def connect(self):
'''
connect to the robot
'''
try:
#self.opcuaclient.set_security_string("Basic256Sha256,SignAndEncrypt,.cert/cert.der,.cert/key.pem")
self.opcuaclient.connect()
except Exception as e:
raise e
def check_connection(self):
'''
check if the connection is established
'''
try:
if self.opcuaclient:
print("ROBOT-OPCUA connection established.")
return 1
else:
print("ROBOT-OPCUA connection not established.")
return 0
except Exception as e:
print(f"ROBOT-OPCUA Error establishing connection")
return 0
def disconnect(self):
'''
disconnect from the robot
'''
try:
self.opcuaclient.disconnect()
except Exception as e:
raise e
def press_btn(self, btn_name):
var = self.varlist_dict[btn_name]
print(var)
_sent_opcua_data_impuls(self, var)
def read_varlist(self, filename, DELIMITER='\t'):
df = pd.read_csv(filename, delimiter=DELIMITER, header=None)
return dict(zip(df.iloc[:, 0], df.iloc[:, 1]))
def adapt_access_rights(self, access_rights="PlcWrite"):
'''
adapt the userrights
'''
method_node = self.opcuaclient.get_node("ns=2;s=/Methods/GiveUserAccess")
object_node = self.opcuaclient.get_node("ns=2;s=/Methods")
object_node.call_method(method_node, self.config['cred']["robot"]["username"], access_rights)
def _get_opcua_data(self, var_list):
""" Retrieve OPC UA node values based on the list of variable nodes. """
return [self.opcuaclient.get_node(var).get_value() for var in var_list]
def _sent_opcua_data_impuls(self, var):
node = self.opcuaclient.get_node(var)
node.set_value(1) # Set node to 1
# time.sleep(1) # Wait for 1 second
# node.set_value(0) # Set node back to 0