"""Enable SSH on a CUBE appliance through its HTTPS API.

The script authenticates against ``/api/auth`` with a login, a password and
a mission certificate, then posts to ``/api/ssh`` using the returned token.
Default credentials come from environment variables; a ``.env`` file is
loaded when the module is imported.
"""

import argparse
import io
import json
import os
import sys

import requests
import urllib3
from dotenv import load_dotenv

load_dotenv(override=True)

# Upper bound, in seconds, for each HTTP call so an unresponsive host cannot
# block the script forever (requests has no timeout by default).
REQUEST_TIMEOUT = 30


def _load_certificate(certificate):
    """Return the certificate as bytes, or None when none is configured.

    *certificate* may be a path to a certificate file or the certificate
    text itself. When it is empty, the DEFAULT_CERTIFICATE environment
    variable is used instead.

    Raises:
        OSError: the value names an existing file that cannot be read.
    """
    source = certificate or os.getenv("DEFAULT_CERTIFICATE")
    if not source:
        return None
    if os.path.isfile(source):
        with open(source, "rb") as handle:
            return handle.read()
    # Not a file on disk: treat the value as the certificate text.
    return source.encode("utf-8")


def _print_response_body(error):
    """Print the server reply attached to a requests exception, if any."""
    response = getattr(error, "response", None)
    # A Response object is falsy for 4xx/5xx statuses, so a plain truth test
    # would hide exactly the error bodies we want to show.
    if response is not None:
        print(f"Response: {response.text}")


def _ensure_https(url):
    """Return *url* with an ``https://`` scheme, announcing any rewrite."""
    if url.startswith("https://"):
        return url
    if url.startswith("http://"):
        url = "https://" + url[len("http://"):]
        print(f"Converting to HTTPS: {url}")
    else:
        url = "https://" + url
        print(f"Adding HTTPS protocol: {url}")
    return url


def authenticate(base_url, username, password, certificate_path, verify_ssl=True):
    """Authenticate with the CUBE API and return the JWT token.

    The request is a multipart form with a JSON ``params`` part (login and
    password) and a ``certificate`` part.

    Args:
        base_url: Base URL of the API, without a trailing slash.
        username: Login sent in the ``params`` part.
        password: Password sent in the ``params`` part.
        certificate_path: Path to the certificate file, or the certificate
            text itself. Falls back to the DEFAULT_CERTIFICATE environment
            variable when empty.
        verify_ssl: Passed to requests as ``verify``.

    Returns:
        The token string from the response.

    Exits the process with status 1 when the certificate is missing or
    unreadable, the request fails, or the response carries no token.
    """
    auth_url = f"{base_url}/api/auth"

    try:
        certificate = _load_certificate(certificate_path)
    except OSError as e:
        print(f"Error: Unable to read certificate: {e}")
        sys.exit(1)
    if certificate is None:
        print("Error: No certificate provided (use --certificate or set DEFAULT_CERTIFICATE)")
        sys.exit(1)

    # Prepare the multipart form data
    auth_params = {"login": username, "password": password}
    files = {
        "params": (None, json.dumps(auth_params), "application/json"),
        "certificate": ("certificate.pem", certificate, "application/octet-stream"),
    }

    try:
        print(f"Authenticating as {username}...")
        response = requests.post(
            auth_url, files=files, verify=verify_ssl, timeout=REQUEST_TIMEOUT
        )
        response.raise_for_status()  # Raise exception for 4XX/5XX responses
        auth_data = response.json()
    except requests.exceptions.RequestException as e:
        print(f"Authentication failed: {e}")
        _print_response_body(e)
        sys.exit(1)
    except ValueError as e:
        # Older requests releases raise a plain ValueError for a non-JSON body.
        print(f"Authentication failed: invalid JSON in response: {e}")
        sys.exit(1)

    # Extract token from response; guard against a non-object JSON payload.
    token = auth_data.get("token") if isinstance(auth_data, dict) else None
    if not token:
        print("Error: No token received in authentication response")
        sys.exit(1)

    print("Authentication successful.")
    return token


def set_ssh_status(base_url, token, verify_ssl=True):
    """Enable SSH by posting ``{"currentStatus": true}`` to ``/api/ssh``.

    Args:
        base_url: Base URL of the API, without a trailing slash.
        token: Token sent as a ``Bearer`` Authorization header.
        verify_ssl: Passed to requests as ``verify``.

    Returns:
        True when the request succeeds, False when it fails.
    """
    ssh_url = f"{base_url}/api/ssh"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {token}",
    }

    # Set new SSH status
    payload = {"currentStatus": True}

    try:
        print("Sending request to enable SSH...")
        response = requests.post(
            ssh_url,
            headers=headers,
            json=payload,
            verify=verify_ssl,
            timeout=REQUEST_TIMEOUT,
        )
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f"SSH activation failed: {e}")
        _print_response_body(e)
        return False

    print("SSH enabled successfully!")
    return True


def main():
    """Parse the command line, authenticate, then enable SSH.

    Exits with status 1 when authentication or SSH activation fails.
    """
    parser = argparse.ArgumentParser(description="Manage SSH on CUBE application")
    parser.add_argument(
        "--url",
        help="Base URL of the CUBE API (e.g., https://cube-04fe12:9080)",
        default="https://cube-04fe12:9080",
    )
    parser.add_argument(
        "--username",
        help="Admin username with ROLE_SAFT_ADMIN permissions",
        default=os.getenv("DEFAULT_CUBE_WEB_ADMIN_USER"),
    )
    parser.add_argument(
        "--password",
        help="Admin password",
        default=os.getenv("DEFAULT_CUBE_WEB_ADMIN_PASSWORD"),
    )
    parser.add_argument(
        "--certificate",
        help="Path to mission certificate file, or the certificate text itself",
        default=os.getenv("DEFAULT_CERTIFICATE"),
    )
    parser.add_argument(
        "--verify-ssl",
        action="store_true",
        help="Verify the server's TLS certificate (disabled by default)",
    )
    args = parser.parse_args()

    # Ensure the URL uses HTTPS
    url = _ensure_https(args.url)

    # NOTE(review): verification stays off unless requested, matching the
    # previous hard-coded behaviour; this leaves the connection open to
    # man-in-the-middle attacks. Consider making verification the default.
    verify_ssl = args.verify_ssl
    if not verify_ssl:
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    # authenticate() exits the process itself on any failure.
    token = authenticate(url, args.username, args.password, args.certificate, verify_ssl)

    if not set_ssh_status(url, token, verify_ssl):
        sys.exit(1)


if __name__ == "__main__":
    main()