# opcua_connector.py from opcua import Client, ua from time import sleep, time import datetime import pandas as pd from influxdb_client import InfluxDBClient, Point, WritePrecision from influxdb_client.client.write_api import SYNCHRONOUS class opcua_connector: def __init__(self, config): self.config = config self.opcuaclient = Client(config['cred']["robot"]["url"]) self.opcuaclient.set_user(config['cred']["robot"]["username"]) self.opcuaclient.set_password(config['cred']["robot"]["password"]) self.influxclient = InfluxDBClient(url=config['cred']["influxdb"]["url"], token=config['cred']["influxdb"]["token"], org=config['cred']["influxdb"]["org"]) self.influx_bucket = config['cred']["influxdb"]["bucket"] self.varlist_dict = self.read_varlist(config['data']["cfg_varlist"]) def press_btn(self, btn_name): var = self.varlist_dict[btn_name] print(var) # _sent_opcua_data_impuls(self, var) def read_varlist(self, filename, DELIMITER='\t'): df = pd.read_csv(filename, delimiter=DELIMITER, header=None) return dict(zip(df.iloc[:, 0], df.iloc[:, 1])) # def track_robot_position(self, SAMPLE_DELAY=0.07, TOTMANN_SEC=3, stop_trigger_node=None, stop_trigger_value=None): # """Track robot position and check stop trigger from OPC UA client.""" # try: # start_time = time() # write_api = self.influxclient.write_api(write_options=SYNCHRONOUS) # living_point2 = 0 # treshold_living = 1 # # def RECORDING(): # fieldnames = self.var_list.values[:, 0].tolist() # nodes = self.var_list.values[:, 1].tolist() # val = self.__get_opcua_data(nodes) # points = [Point("mvalues").field(field, value).time(datetime.datetime.utcnow(), WritePrecision.NS) # for field, value in zip(fieldnames, val)] # write_api.write(bucket=self.influx_bucket, org=self.config['cred']["influxdb"]["org"], record=points) # return val[0] # # while TOTMANN_SEC == 0 or time() - start_time < TOTMANN_SEC: # # Check stop trigger # stop_trigger_value_current = self.opcuaclient.get_node(stop_trigger_node).get_value() # if stop_trigger_value_current == stop_trigger_value: # print("Stop trigger detected.") # break # Exit the loop when stop trigger is detected # living_point1 = living_point2 # living_point2 = RECORDING() # if abs(living_point1 - living_point2) >= treshold_living: # start_time = time() # Reset start time to prolong session # sleep(SAMPLE_DELAY) # # except Exception as e: # print(f"Error during tracking: {e}") def _get_opcua_data(self, var_list): """ Retrieve OPC UA node values based on the list of variable nodes. """ return [self.opcuaclient.get_node(var).get_value() for var in var_list] def _sent_opcua_data_impuls(self, var): node = self.opcuaclient.get_node(var) node.set_value(1) # Set node to 1 time.sleep(1) # Wait for 1 second node.set_value(0) # Set node back to 0