55 lines
2.0 KiB
Python
55 lines
2.0 KiB
Python
# opcua_connector.py
|
|
|
|
from opcua import Client, ua
|
|
from time import sleep, time
|
|
import datetime
|
|
import pandas as pd
|
|
from influxdb_client import InfluxDBClient, Point, WritePrecision
|
|
from influxdb_client.client.write_api import SYNCHRONOUS
|
|
|
|
|
|
|
|
|
|
class opcua_connector:
|
|
def __init__(self, config):
|
|
self.config = config
|
|
self.opcuaclient = Client(config['cred']["robot"]["url"])
|
|
self.opcuaclient.set_user(config['cred']["robot"]["username"])
|
|
self.opcuaclient.set_password(config['cred']["robot"]["password"])
|
|
self.influxclient = InfluxDBClient(url=config['cred']["influxdb"]["url"],
|
|
token=config['cred']["influxdb"]["token"],
|
|
org=config['cred']["influxdb"]["org"])
|
|
self.influx_bucket = config['cred']["influxdb"]["bucket"]
|
|
self.varlist_dict = self.read_varlist(config['data']["cfg_varlist"])
|
|
|
|
def press_btn(self, btn_name):
|
|
var = self.varlist_dict[btn_name]
|
|
print(var)
|
|
_sent_opcua_data_impuls(self, var)
|
|
|
|
def read_varlist(self, filename, DELIMITER='\t'):
|
|
df = pd.read_csv(filename, delimiter=DELIMITER, header=None)
|
|
return dict(zip(df.iloc[:, 0], df.iloc[:, 1]))
|
|
|
|
|
|
def adapt_access_rights(self, access_rights="PlcWrite"):
|
|
'''
|
|
adapt the userrights
|
|
'''
|
|
method_node = self.opcuaclient.get_node("ns=2;s=/Methods/GiveUserAccess")
|
|
object_node = self.opcuaclient.get_node("ns=2;s=/Methods")
|
|
object_node.call_method(method_node, self.config['cred']["robot"]["username"], access_rights)
|
|
|
|
def _get_opcua_data(self, var_list):
|
|
""" Retrieve OPC UA node values based on the list of variable nodes. """
|
|
return [self.opcuaclient.get_node(var).get_value() for var in var_list]
|
|
|
|
|
|
def _sent_opcua_data_impuls(self, var):
|
|
node = self.opcuaclient.get_node(var)
|
|
node.set_value(1) # Set node to 1
|
|
# time.sleep(1) # Wait for 1 second
|
|
# node.set_value(0) # Set node back to 0
|
|
|
|
|