port-dependant (nat) batch for cube inventory
This commit is contained in:
118
Python/cube_activate_ssh_port.py
Normal file
118
Python/cube_activate_ssh_port.py
Normal file
@@ -0,0 +1,118 @@
|
||||
import requests
|
||||
import json
|
||||
import os
|
||||
import urllib3
|
||||
from dotenv import load_dotenv
|
||||
import time
|
||||
import sys
|
||||
|
||||
def resource_path(relative_path):
|
||||
""" Get absolute path to resource, works for dev and for PyInstaller """
|
||||
try:
|
||||
# PyInstaller creates a temp folder and stores path in _MEIPASS
|
||||
base_path = sys._MEIPASS
|
||||
except Exception:
|
||||
base_path = os.path.abspath(".")
|
||||
|
||||
return os.path.join(base_path, relative_path)
|
||||
|
||||
dotenv_path = resource_path('.env')
|
||||
load_dotenv(dotenv_path=dotenv_path)
|
||||
|
||||
def authenticate(base_url):
|
||||
"""
|
||||
Authenticate with the CUBE API using username, password and certificate.
|
||||
Returns the JWT token if successful.
|
||||
"""
|
||||
auth_url = f"{base_url}/api/auth"
|
||||
|
||||
ENV_WEB = {
|
||||
"DEFAULT_CUBE_WEB_ADMIN_USER": os.getenv("DEFAULT_CUBE_WEB_ADMIN_USER"),
|
||||
"DEFAULT_CUBE_WEB_ADMIN_PASSWORD": os.getenv("DEFAULT_CUBE_WEB_ADMIN_PASSWORD"),
|
||||
"DEFAULT_CERTIFICATE": os.getenv("DEFAULT_CERTIFICATE")
|
||||
}
|
||||
|
||||
username = ENV_WEB["DEFAULT_CUBE_WEB_ADMIN_USER"]
|
||||
password = ENV_WEB["DEFAULT_CUBE_WEB_ADMIN_PASSWORD"]
|
||||
certificate = ENV_WEB["DEFAULT_CERTIFICATE"].encode("utf-8")
|
||||
|
||||
auth_params = {
|
||||
"login": username,
|
||||
"password": password
|
||||
}
|
||||
files = {
|
||||
"params": (None, json.dumps(auth_params), "application/json"),
|
||||
"certificate": ("certificate.pem", certificate, "application/octet-stream")
|
||||
}
|
||||
try:
|
||||
response = requests.post(auth_url, files=files, verify=False, timeout=10)
|
||||
response.raise_for_status() # Raise exception for 4XX/5XX responses
|
||||
|
||||
# Extract token from response
|
||||
auth_data = response.json()
|
||||
token = auth_data.get("token")
|
||||
|
||||
if not token:
|
||||
raise requests.exceptions.RequestException
|
||||
|
||||
print("HTTPS ✅", end = " ", flush=True)
|
||||
return token
|
||||
|
||||
except requests.exceptions.RequestException as e:
|
||||
print(f"HTTPS ❌", flush=True)
|
||||
if hasattr(e, 'response') and e.response:
|
||||
raise Exception(e.response)
|
||||
else:
|
||||
raise
|
||||
|
||||
def set_ssh_status(base_url, token):
|
||||
"""
|
||||
Set SSH status (enable) using the provided JWT token.
|
||||
"""
|
||||
ssh_url = f"{base_url}/api/ssh"
|
||||
|
||||
headers = {
|
||||
"Content-Type": "application/json",
|
||||
"Authorization": f"Bearer {token}"
|
||||
}
|
||||
|
||||
# Set new SSH status
|
||||
payload = { "currentStatus": True }
|
||||
|
||||
try:
|
||||
response = requests.post(ssh_url, headers=headers, json=payload, verify=False, timeout=10)
|
||||
response.raise_for_status()
|
||||
|
||||
print(f"SSH ✅", end = " ", flush=True)
|
||||
|
||||
return True
|
||||
|
||||
except requests.exceptions.RequestException as e:
|
||||
print("SSH ❌", flush=True)
|
||||
if hasattr(e, 'response') and e.response:
|
||||
raise Exception(e.response)
|
||||
else:
|
||||
raise
|
||||
|
||||
def activate_ssh(ip_address, port):
|
||||
|
||||
# Ensure the URL uses HTTPS
|
||||
url = ip_address
|
||||
if not url.startswith("https://"):
|
||||
# Convert http:// to https:// or add https:// if no protocol specified
|
||||
if url.startswith("http://"):
|
||||
url = "https://" + url[7:]
|
||||
else:
|
||||
url = "https://" + url
|
||||
if not url.endswith(f":{port}"):
|
||||
url = url + f":{port}"
|
||||
|
||||
verify_ssl = False
|
||||
if not verify_ssl:
|
||||
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
||||
|
||||
token = authenticate(url)
|
||||
if not token:
|
||||
return
|
||||
time.sleep(3)
|
||||
set_ssh_status(url, token)
|
||||
Reference in New Issue
Block a user