# opcua_connector.py from opcua import Client, ua from time import sleep, time import datetime import pandas as pd from influxdb_client import InfluxDBClient, Point, WritePrecision from influxdb_client.client.write_api import SYNCHRONOUS from tool_gencert import make_cert class opcua_connector: def __init__(self, config): self.config = config self.opcuaclient = Client(config['cred']["robot"]["url"]) self.opcuaclient.set_user(config['cred']["robot"]["username"]) self.opcuaclient.set_password(config['cred']["robot"]["password"]) self.varlist_dict = self.read_varlist(config['data']["cfg_varlist"]) self.opcuaclient.session_timeout = 60000 make_cert() def connect(self): ''' connect to the robot ''' try: self.opcuaclient.set_security_string("Basic256Sha256,SignAndEncrypt,.cert/cert.der,.cert/key.pem") self.opcuaclient.connect() except Exception as e: raise e def check_connection(self): ''' check if the connection is established ''' try: if self.opcuaclient: print("ROBOT-OPCUA connection established.") return 1 else: print("ROBOT-OPCUA connection not established.") return 0 except Exception as e: print(f"ROBOT-OPCUA Error establishing connection") return 0 def disconnect(self): ''' disconnect from the robot ''' try: self.opcuaclient.disconnect() except Exception as e: raise e def press_btn(self, btn_name): var = self.varlist_dict[btn_name] print(var) _sent_opcua_data_impuls(self, var) def read_varlist(self, filename, DELIMITER='\t'): df = pd.read_csv(filename, delimiter=DELIMITER, header=None) return dict(zip(df.iloc[:, 0], df.iloc[:, 1])) def adapt_access_rights(self, access_rights="PlcWrite"): ''' adapt the userrights ''' method_node = self.opcuaclient.get_node("ns=2;s=/Methods/GiveUserAccess") object_node = self.opcuaclient.get_node("ns=2;s=/Methods") object_node.call_method(method_node, self.config['cred']["robot"]["username"], access_rights) def _get_opcua_data(self, var_list): """ Retrieve OPC UA node values based on the list of variable nodes. """ return [self.opcuaclient.get_node(var).get_value() for var in var_list] def _sent_opcua_data_impuls(self, var): node = self.opcuaclient.get_node(var) node.set_value(1) # Set node to 1 # time.sleep(1) # Wait for 1 second # node.set_value(0) # Set node back to 0