v1.0 .
This commit is contained in:
parent
512b5714d1
commit
416263bd37
5
app.py
5
app.py
@ -135,11 +135,10 @@ def push_button(OPCcon, btn_name):
|
|||||||
def initialize_connection():
|
def initialize_connection():
|
||||||
"""Initialize OPC UA connection"""
|
"""Initialize OPC UA connection"""
|
||||||
try:
|
try:
|
||||||
config = yaml.safe_load(_APP_CFG)
|
OPCcon = opcua_connector(_APP_CFG)
|
||||||
OPCcon = opcua_connector(config)
|
|
||||||
|
|
||||||
# Connect and check status
|
# Connect and check status
|
||||||
OPCcon.opcuaclient.connect()
|
OPCcon.connect()
|
||||||
if OPCcon.check_connection() == 1:
|
if OPCcon.check_connection() == 1:
|
||||||
st.session_state.opcua_connector = OPCcon
|
st.session_state.opcua_connector = OPCcon
|
||||||
st.session_state.connection_status = "Connected"
|
st.session_state.connection_status = "Connected"
|
||||||
|
|||||||
@ -16,12 +16,21 @@ class opcua_connector:
|
|||||||
self.opcuaclient = Client(config['cred']["robot"]["url"])
|
self.opcuaclient = Client(config['cred']["robot"]["url"])
|
||||||
self.opcuaclient.set_user(config['cred']["robot"]["username"])
|
self.opcuaclient.set_user(config['cred']["robot"]["username"])
|
||||||
self.opcuaclient.set_password(config['cred']["robot"]["password"])
|
self.opcuaclient.set_password(config['cred']["robot"]["password"])
|
||||||
self.influxclient = InfluxDBClient(url=config['cred']["influxdb"]["url"],
|
|
||||||
token=config['cred']["influxdb"]["token"],
|
|
||||||
org=config['cred']["influxdb"]["org"])
|
|
||||||
self.influx_bucket = config['cred']["influxdb"]["bucket"]
|
|
||||||
self.varlist_dict = self.read_varlist(config['data']["cfg_varlist"])
|
self.varlist_dict = self.read_varlist(config['data']["cfg_varlist"])
|
||||||
|
self.opcuaclient.session_timeout = 60000
|
||||||
|
|
||||||
|
make_cert()
|
||||||
|
|
||||||
|
|
||||||
|
def connect(self):
|
||||||
|
'''
|
||||||
|
connect to the robot
|
||||||
|
'''
|
||||||
|
try:
|
||||||
|
self.opcuaclient.set_security_string("Basic256Sha256,SignAndEncrypt,.cert/cert.der,.cert/key.pem")
|
||||||
|
self.opcuaclient.connect()
|
||||||
|
except Exception as e:
|
||||||
|
raise e
|
||||||
|
|
||||||
|
|
||||||
def check_connection(self):
|
def check_connection(self):
|
||||||
|
|||||||
49
tool_gencert.py
Normal file
49
tool_gencert.py
Normal file
@ -0,0 +1,49 @@
|
|||||||
|
# generate_cert.py – pure-Python, no OpenSSL needed
|
||||||
|
from cryptography import x509
|
||||||
|
from cryptography.x509.oid import NameOID
|
||||||
|
from cryptography.hazmat.primitives import hashes, serialization
|
||||||
|
from cryptography.hazmat.primitives.asymmetric import rsa
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
from pathlib import Path
|
||||||
|
import socket
|
||||||
|
|
||||||
|
def make_cert(cert_dir: str = ".cert", cn: str = "SinumerikPythonClient", DEBUG_PRINT=False):
|
||||||
|
Path(cert_dir).mkdir(exist_ok=True)
|
||||||
|
key_file = Path(cert_dir) / "key.pem"
|
||||||
|
cert_file = Path(cert_dir) / "cert.der"
|
||||||
|
|
||||||
|
key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
|
||||||
|
|
||||||
|
# *** THIS _must_ match what the client will announce later ***
|
||||||
|
hostname = socket.gethostname() # e.g. “WIN-PC-01”
|
||||||
|
app_uri = f"urn:{hostname}:{cn}" # e.g. “urn:WIN-PC-01:SinumerikPythonClient”
|
||||||
|
|
||||||
|
cert = (
|
||||||
|
x509.CertificateBuilder()
|
||||||
|
.subject_name(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, cn)]))
|
||||||
|
.issuer_name(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, cn)]))
|
||||||
|
.public_key(key.public_key())
|
||||||
|
.serial_number(x509.random_serial_number())
|
||||||
|
.not_valid_before(datetime.utcnow() - timedelta(minutes=1))
|
||||||
|
.not_valid_after(datetime.utcnow() + timedelta(days=365))
|
||||||
|
.add_extension( # ← embed the same URI here
|
||||||
|
x509.SubjectAlternativeName([x509.UniformResourceIdentifier(app_uri)]),
|
||||||
|
critical=False,
|
||||||
|
)
|
||||||
|
.sign(key, hashes.SHA256())
|
||||||
|
)
|
||||||
|
|
||||||
|
key_pem = key.private_bytes(
|
||||||
|
serialization.Encoding.PEM,
|
||||||
|
serialization.PrivateFormat.TraditionalOpenSSL,
|
||||||
|
serialization.NoEncryption(),
|
||||||
|
)
|
||||||
|
cert_der = cert.public_bytes(serialization.Encoding.DER)
|
||||||
|
|
||||||
|
key_file.write_bytes(key_pem)
|
||||||
|
cert_file.write_bytes(cert_der)
|
||||||
|
if DEBUG_PRINT:
|
||||||
|
print(f"Created:\n {key_file}\n {cert_file}\n URI = {app_uri}")
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
make_cert(DEBUG_PRINT=True)
|
||||||
Loading…
x
Reference in New Issue
Block a user