Files
ess-moxa-configuration-tools/Python/cube_activate_ssh.py
2025-12-04 08:11:00 +01:00

126 lines
3.7 KiB
Python

import requests
import json
import os
import urllib3
from dotenv import load_dotenv
import time
import sys
def resource_path(relative_path):
    """Get the absolute path to a resource, for dev runs and PyInstaller.

    Args:
        relative_path: Path of the resource relative to the base directory.

    Returns:
        ``relative_path`` joined onto ``sys._MEIPASS`` when that attribute
        exists, otherwise onto the absolute path of the current working
        directory.
    """
    # PyInstaller creates a temp folder and stores its path in sys._MEIPASS.
    # The attribute is absent otherwise, so fall back to the working directory
    # instead of catching a broad exception.
    base_path = getattr(sys, "_MEIPASS", os.path.abspath("."))
    return os.path.join(base_path, relative_path)
# Load environment variables from the .env file located through
# resource_path() at import time, before any os.getenv() lookup below runs.
dotenv_path = resource_path('.env')
load_dotenv(dotenv_path=dotenv_path)
def authenticate(base_url):
    """
    Authenticate with the CUBE API using username, password and certificate.

    The credentials are read from the environment variables
    DEFAULT_CUBE_WEB_ADMIN_USER, DEFAULT_CUBE_WEB_ADMIN_PASSWORD and
    DEFAULT_CERTIFICATE.

    Args:
        base_url: Base URL of the API; "/api/auth" is appended to it.

    Returns:
        The JWT token taken from the "token" field of the JSON response.

    Raises:
        ValueError: If DEFAULT_CERTIFICATE is not set in the environment.
        requests.exceptions.RequestException: If the request fails, the server
            answers with an error status, or the response contains no token.
        Exception: If the request exception carries a truthy response; the
            response object is passed as the exception argument.
    """
    auth_url = f"{base_url}/api/auth"

    username = os.getenv("DEFAULT_CUBE_WEB_ADMIN_USER")
    password = os.getenv("DEFAULT_CUBE_WEB_ADMIN_PASSWORD")
    certificate_text = os.getenv("DEFAULT_CERTIFICATE")
    if certificate_text is None:
        # Fail with an actionable message instead of an AttributeError on
        # None.encode() further down.
        raise ValueError(
            "Environment variable DEFAULT_CERTIFICATE is not set; "
            "it is required to authenticate"
        )
    certificate = certificate_text.encode("utf-8")

    auth_params = {
        "login": username,
        "password": password,
    }
    # Multipart body: the JSON credentials as a plain form field (no file
    # name) and the certificate as a file part.
    files = {
        "params": (None, json.dumps(auth_params), "application/json"),
        "certificate": ("certificate.pem", certificate, "application/octet-stream"),
    }

    try:
        # NOTE(review): verify=False disables TLS certificate verification;
        # presumably the device uses a self-signed certificate - confirm.
        response = requests.post(auth_url, files=files, verify=False, timeout=10)
        response.raise_for_status()  # Raise exception for 4XX/5XX responses
        # Extract token from response
        token = response.json().get("token")
        if not token:
            raise requests.exceptions.RequestException(
                "Authentication response did not contain a token"
            )
        return token
    except requests.exceptions.RequestException as e:
        # NOTE(review): requests.Response is falsy for 4xx/5xx statuses, so
        # HTTP error responses take the bare re-raise below rather than the
        # wrapping branch - confirm this is the intended behaviour.
        if hasattr(e, 'response') and e.response:
            raise Exception(e.response) from e
        raise
def set_ssh_status(base_url, token):
    """
    Enable SSH by posting the new status to the "/api/ssh" endpoint.

    Args:
        base_url: Base URL of the API; "/api/ssh" is appended to it.
        token: JWT token sent as a Bearer token in the Authorization header.

    Raises:
        requests.exceptions.RequestException: If the request fails or the
            server answers with an error status.
        Exception: If the request exception carries a truthy response; the
            response object is passed as the exception argument.
    """
    request_headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {token}",
    }
    # The new SSH status: enabled.
    new_status = {"currentStatus": True}

    try:
        reply = requests.post(
            f"{base_url}/api/ssh",
            headers=request_headers,
            json=new_status,
            verify=False,
            timeout=10,
        )
        reply.raise_for_status()
    except requests.exceptions.RequestException as err:
        attached_response = getattr(err, "response", None)
        if not attached_response:
            # No usable response attached: propagate the original exception.
            raise
        raise Exception(attached_response)
def _build_https_url(address):
    """Return ``address`` as an ``https://`` URL ending with port 9080."""
    # Drop surrounding whitespace and trailing slashes first, so the port is
    # appended to the host and not after a "/" (e.g. "https://host/:9080").
    url = address.strip().rstrip("/")
    if url.startswith("http://"):
        # Convert http:// to https://
        url = "https://" + url[len("http://"):]
    elif not url.startswith("https://"):
        # No protocol specified
        url = "https://" + url
    if not url.endswith(":9080"):
        url += ":9080"
    return url


def activate_ssh(ip_address, silent=False):
    """
    Authenticate against the device and enable SSH on it.

    The address is turned into an HTTPS URL on port 9080, a token is obtained
    with authenticate(), and SSH is enabled with set_ssh_status().

    Args:
        ip_address: Address of the device, with or without an "http://" or
            "https://" prefix and with or without the ":9080" suffix.
        silent: When True, no progress output is printed.

    Raises:
        Exception: Any exception raised by authenticate() or
            set_ssh_status() is re-raised unchanged.
    """
    # Ensure the URL uses HTTPS
    url = _build_https_url(ip_address)

    # authenticate() and set_ssh_status() send their requests with
    # verify=False, so silence the warning urllib3 emits for each of them.
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    if not silent:
        print("HTTPS", end=" ", flush=True)
    try:
        token = authenticate(url)
        if not silent:
            # Empty status marker: prints nothing, only flushes stdout.
            print("", end="", flush=True)
    except Exception:
        if not silent:
            # Terminate the progress line before propagating the error.
            print("", flush=True)
        raise

    if not silent:
        print("SSH", end=" ", flush=True)
    try:
        set_ssh_status(url, token)
        if not silent:
            print("", end="\n", flush=True)
    except Exception:
        if not silent:
            print("", flush=True)
        raise