From 416263bd37620beeea01590634fd396fc6e3c21c Mon Sep 17 00:00:00 2001 From: Eduard Gerlitz Date: Fri, 5 Sep 2025 15:31:44 +0200 Subject: [PATCH] v1.0 . --- app.py | 5 ++--- opcua_connector.py | 17 ++++++++++++---- tool_gencert.py | 49 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 64 insertions(+), 7 deletions(-) create mode 100644 tool_gencert.py diff --git a/app.py b/app.py index 817ed79..28ba222 100644 --- a/app.py +++ b/app.py @@ -135,11 +135,10 @@ def push_button(OPCcon, btn_name): def initialize_connection(): """Initialize OPC UA connection""" try: - config = yaml.safe_load(_APP_CFG) - OPCcon = opcua_connector(config) + OPCcon = opcua_connector(_APP_CFG) # Connect and check status - OPCcon.opcuaclient.connect() + OPCcon.connect() if OPCcon.check_connection() == 1: st.session_state.opcua_connector = OPCcon st.session_state.connection_status = "Connected" diff --git a/opcua_connector.py b/opcua_connector.py index 8796de9..88b311a 100644 --- a/opcua_connector.py +++ b/opcua_connector.py @@ -16,12 +16,21 @@ class opcua_connector: self.opcuaclient = Client(config['cred']["robot"]["url"]) self.opcuaclient.set_user(config['cred']["robot"]["username"]) self.opcuaclient.set_password(config['cred']["robot"]["password"]) - self.influxclient = InfluxDBClient(url=config['cred']["influxdb"]["url"], - token=config['cred']["influxdb"]["token"], - org=config['cred']["influxdb"]["org"]) - self.influx_bucket = config['cred']["influxdb"]["bucket"] self.varlist_dict = self.read_varlist(config['data']["cfg_varlist"]) + self.opcuaclient.session_timeout = 60000 + make_cert() + + + def connect(self): + ''' + connect to the robot + ''' + try: + self.opcuaclient.set_security_string("Basic256Sha256,SignAndEncrypt,.cert/cert.der,.cert/key.pem") + self.opcuaclient.connect() + except Exception as e: + raise e def check_connection(self): diff --git a/tool_gencert.py b/tool_gencert.py new file mode 100644 index 0000000..0f97bd1 --- /dev/null +++ b/tool_gencert.py @@ -0,0 +1,49 @@ +# generate_cert.py – pure-Python, no OpenSSL needed +from cryptography import x509 +from cryptography.x509.oid import NameOID +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import rsa +from datetime import datetime, timedelta +from pathlib import Path +import socket + +def make_cert(cert_dir: str = ".cert", cn: str = "SinumerikPythonClient", DEBUG_PRINT=False): + Path(cert_dir).mkdir(exist_ok=True) + key_file = Path(cert_dir) / "key.pem" + cert_file = Path(cert_dir) / "cert.der" + + key = rsa.generate_private_key(public_exponent=65537, key_size=2048) + + # *** THIS _must_ match what the client will announce later *** + hostname = socket.gethostname() # e.g. “WIN-PC-01” + app_uri = f"urn:{hostname}:{cn}" # e.g. “urn:WIN-PC-01:SinumerikPythonClient” + + cert = ( + x509.CertificateBuilder() + .subject_name(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, cn)])) + .issuer_name(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, cn)])) + .public_key(key.public_key()) + .serial_number(x509.random_serial_number()) + .not_valid_before(datetime.utcnow() - timedelta(minutes=1)) + .not_valid_after(datetime.utcnow() + timedelta(days=365)) + .add_extension( # ← embed the same URI here + x509.SubjectAlternativeName([x509.UniformResourceIdentifier(app_uri)]), + critical=False, + ) + .sign(key, hashes.SHA256()) + ) + + key_pem = key.private_bytes( + serialization.Encoding.PEM, + serialization.PrivateFormat.TraditionalOpenSSL, + serialization.NoEncryption(), + ) + cert_der = cert.public_bytes(serialization.Encoding.DER) + + key_file.write_bytes(key_pem) + cert_file.write_bytes(cert_der) + if DEBUG_PRINT: + print(f"Created:\n {key_file}\n {cert_file}\n URI = {app_uri}") + +if __name__ == "__main__": + make_cert(DEBUG_PRINT=True)