opcuaCOM_PY/RUN_BTN.py
2025-04-04 11:15:37 +02:00

72 lines
2.1 KiB
Python

import yaml
from opcua_connector import opcua_connector
import time
import argparse
def check_opcua_server(OPCcon):
"""State 0: Check if OPC UA server is reachable."""
print("---- State 0 ----")
try:
OPCcon.opcuaclient.connect() # Connection happens here
print("OPC UA server is reachable.")
return 1 # Move to state 1
except Exception as e:
print(f"Error connecting to OPC UA server: {e}")
return 99
def establish_connection(OPCcon):
"""State 1: Establish connection to OPC UA server."""
print("---- State 1 ----")
try:
if OPCcon.opcuaclient.uaclient:
print("OPC UA connection established.")
return 2
else:
print(f"Error establishing connection")
return 99
except Exception as e:
print(f"Error establishing connection: {e}")
return 99
def push_btn(OPCcon, btn_name):
"""State 2:"""
print("---- State 2 ----")
try:
OPCcon.press_btn(btn_name)
print("Press button done.")
return 99
except Exception as e:
print(f"Error in Press button: {e}.")
return 99
if __name__ == '__main__':
# Argument parsing
parser = argparse.ArgumentParser(description='Run OPC UA button press script.')
parser.add_argument('-n', '--name', type=str, required=True, help='Button name to press (BTN_START or BTN_STOP)')
args = parser.parse_args()
btn_name = args.name # Get button name from arguments
# btn_name = 'BTN_STOP' # Get button name from arguments
# INIT
config = yaml.safe_load(open('./cfg.yaml'))
OPCcon = opcua_connector(config)
# State machine loop
state = 0
active = True
while active:
if state == 0:
state = check_opcua_server(OPCcon)
elif state == 1:
state = establish_connection(OPCcon)
elif state == 2:
state = push_btn(OPCcon, btn_name)
else:
active = False
# Close the InfluxDB connection when done
OPCcon.influxclient.close()
OPCcon.opcuaclient.disconnect() # Disconnect from OPC UA after state machine finishes