[WiP] Added Cloud configuration check for proxy and setting up (if required)

This commit is contained in:
Quentin WEPHRE
2025-10-08 17:51:27 +02:00
parent 46a3c444dd
commit 49ce327722
6 changed files with 369 additions and 26 deletions

View File

@@ -55,11 +55,9 @@ def authenticate(base_url):
if not token: if not token:
raise requests.exceptions.RequestException raise requests.exceptions.RequestException
print("HTTPS ✅", end = " ", flush=True)
return token return token
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
print(f"HTTPS ❌", flush=True)
if hasattr(e, 'response') and e.response: if hasattr(e, 'response') and e.response:
raise Exception(e.response) raise Exception(e.response)
else: else:
@@ -83,12 +81,7 @@ def set_ssh_status(base_url, token):
response = requests.post(ssh_url, headers=headers, json=payload, verify=False, timeout=10) response = requests.post(ssh_url, headers=headers, json=payload, verify=False, timeout=10)
response.raise_for_status() response.raise_for_status()
print(f"SSH ✅", end = " ", flush=True)
return True
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
print("SSH ❌", flush=True)
if hasattr(e, 'response') and e.response: if hasattr(e, 'response') and e.response:
raise Exception(e.response) raise Exception(e.response)
else: else:
@@ -111,8 +104,17 @@ def activate_ssh(ip_address):
if not verify_ssl: if not verify_ssl:
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
token = authenticate(url) print(f"HTTPS", end=" ", flush=True)
if not token: try:
return token = authenticate(url)
time.sleep(3) print(f"", end="", flush=True)
set_ssh_status(url, token) except Exception as e:
print(f"", flush=True)
raise
print(f"SSH", end=" ", flush=True)
try:
set_ssh_status(url, token)
print(f"", end="\n", flush=True)
except Exception as e:
print(f"", flush=True)
raise

View File

@@ -38,18 +38,12 @@ def execute_ssh_command(ip, command, client):
stdin, stdout, stderr = client.exec_command(command, timeout=180) stdin, stdout, stderr = client.exec_command(command, timeout=180)
exit_status = stdout.channel.recv_exit_status() exit_status = stdout.channel.recv_exit_status()
result = stdout.read().decode().lower().strip() stdout_line = [line for line in stdout]
error = stderr.read().decode('utf-8') for output in stdout_line:
print(output.strip())
if exit_status == 0:
print(f"")
else:
print(f"")
print(f"{error}")
raise Exception(f"{str(error)}")
return result
except Exception as e: except Exception as e:
print(f"SSH error: {str(e)} --- {str(error)}", flush=True) print(f"SSH error: {str(e)}", flush=True)
raise raise
finally: finally:
client.close() client.close()
@@ -147,8 +141,8 @@ def main():
continue continue
try: try:
result = execute_ssh_command(ip_address, "cat /etc/cube", client) #result = execute_ssh_command(ip_address, "cat /etc/cube/config-azure.properties", client)
print(f"{result}", flush=True) print(f"coucou", flush=True)
except Exception as e: except Exception as e:
print(f"Error getting Cloud settings for {cube_id}.", flush=True) print(f"Error getting Cloud settings for {cube_id}.", flush=True)
print(f"{e}", flush=True) print(f"{e}", flush=True)

View File

@@ -7,3 +7,4 @@ pandas
openpyxl openpyxl
scp scp
textual textual
fabric

346
Python/ssh_fabric_batch.py Normal file
View File

@@ -0,0 +1,346 @@
import io
from fabric import Connection
from getpass import getpass
from dotenv import load_dotenv
import io # Make sure io is imported at the top of your script
import os
from cube_activate_ssh import activate_ssh
def execute_command(c, command):
"""Executes a simple command on the remote device."""
# print(f"--- [{c.host}] Executing: {command} ---")
try:
result = c.run(command, hide=True)
return result.stdout
# print("STDOUT:")
# print(result.stdout)
# print("STDERR:")
# print(result.stderr)
except Exception as e:
print(f"Error executing {command}: {e}")
def execute_sudo_command(c, command, sudo_pass):
"""Executes a command with sudo on the remote device."""
# print(f"\n--- [{c.host}] Executing with sudo: {command} ---")
try:
result = c.sudo(command, password=sudo_pass, pty=True, hide=True)
return result.stdout
# print("STDOUT:")
# print(result.stdout)
# print("STDERR:")
# print(result.stderr)
except Exception as e:
print(f"Error executing {command} as administrator: {e}")
def read_remote_config_sudo(c, remote_path, sudo_pass):
"""
Reads a remote file with sudo and returns its content as a string.
"""
# print(f"\n--- [{c.host}] Reading remote file with sudo: {remote_path} ---")
try:
# Use sudo to cat the file and capture its output
result = c.sudo(f"cat {remote_path}", password=sudo_pass, hide=True)
return result.stdout
except Exception as e:
print(f"Error reading remote file with sudo: {e}")
return None
import shlex # Make sure to import shlex at the top of your script
def write_remote_config_sudo(c, remote_path, content, sudo_pass, user_owner, group_owner, permissions):
"""
Writes content directly to a remote file using the 'sudo sh -c' pattern.
This is the most robust and secure method for environments where SFTP is
disabled. It avoids input stream conflicts and prevents shell injection.
Args:
c (fabric.Connection): The active connection object.
remote_path (str): The absolute path to the file on the remote host.
content (str): The string content to be written to the file.
sudo_pass (str): The sudo password for the write operation.
"""
print(f"\n--- [{c.host}] Writing directly using 'sudo sh -c' method: {remote_path} ---")
try:
# Step 1: Securely escape the content for safe shell execution.
# shlex.quote() wraps the string in single quotes and handles any
# internal single quotes, making it safe to pass to a shell.
safe_content = shlex.quote(content)
# Step 2: Construct the command.
# 'sudo sh -c "..."': This runs a new shell ('sh') with root privileges.
# 'echo ... > file': This command is executed *by the root shell*.
# The redirection '>' is therefore handled by root, which has permission
# to write to the protected 'remote_path'.
command = f"sh -c \"echo {safe_content} > {remote_path}\""
print("Step 1: Writing content via root shell...")
# We run this entire command string using c.sudo(). Fabric will handle
# the password prompt correctly for the 'sudo sh' part.
c.sudo(command, password=sudo_pass, hide=True)
# Step 3: Set ownership and permissions as a separate step.
print("Step 2: Setting ownership and permissions...")
c.sudo(f"chown {user_owner}:{group_owner} {remote_path}", password=sudo_pass)
c.sudo(f"chmod {permissions} {remote_path}", password=sudo_pass)
print(f"✅ Successfully wrote content to {remote_path}")
except Exception as e:
print(f"❌ Error writing directly to remote file: {e}")
# Re-raise the exception so the calling function can handle it.
raise
def set_config_field(config_content, option, new_value, add_if_missing=False):
"""
Replaces a configuration value, or optionally adds it if it's missing.
This function is idempotent: running it multiple times with the same
parameters will result in the same configuration state.
Args:
config_content (str): The multi-line string of the configuration file.
option (str): The configuration key to find (e.g., "PermitRootLogin").
new_value (str): The new value to set for the option.
add_if_missing (bool): If True, adds the 'option=new_value' to the end
of the content if it's not found. Defaults to False.
Returns:
str: The modified configuration content.
"""
lines = config_content.splitlines()
new_lines = []
found = False
# First, iterate through the existing lines to find and replace the option.
for line in lines:
# Check if the line is a non-commented definition of our option.
if line.strip().startswith(option + '=') and not line.strip().startswith('#'):
new_lines.append(f"{option}={new_value}")
found = True
else:
# Keep the original line if it's not a match.
new_lines.append(line)
# After checking all lines, if the option was not found AND we are
# instructed to add it, we append it to the end of the configuration.
if not found and add_if_missing:
print(f"Info: Option '{option}' not found. Appending it to the configuration.")
# Ensure the new line is actually on a new line.
if new_lines and new_lines[-1] != '':
new_lines.append('') # Add a blank line for separation if needed
new_lines.append(f"{option}={new_value}")
elif not found:
# This is the original behavior: do nothing if missing and not told to add.
print(f"Warning: Option '{option}' not found. No changes made.")
return "\n".join(new_lines)
def find_config_value(config_content, option):
"""
Finds a specific option in a configuration string and returns its value.
Args:
config_content (str): The multi-line string of the configuration file.
option (str): The configuration key to find (e.g., "PermitRootLogin").
Returns:
str: The value of the option if found.
None: If the option is not found or is commented out.
"""
try:
# Go through each line in the configuration
for line in config_content.splitlines():
# Clean up the line by removing leading/trailing whitespace
clean_line = line.strip()
# Check if the line is not a comment and starts with our option
# The '=' is important to avoid matching partial keys (e.g., 'Port' matching 'Ports')
if not clean_line.startswith('#') and clean_line.startswith(option + '='):
# Split the line only on the first equals sign
# This handles cases where the value itself might contain an '='
parts = clean_line.split('=', 1)
# The value is the second part, stripped of any whitespace
value = parts[1].strip()
return value
except Exception as e:
# In case of any unexpected error (e.g., malformed content), return None
print(f"An error occurred while parsing config: {e}")
return None
# If the loop finishes without finding the option, return None
return None
def cloud_configuration_check(hostname, result, iot_hub, proxy_host, proxy_port):
print(f"\tLight telemetry:", end=" ", flush=True)
status = find_config_value(result, "light-telemetry")
if status == "false":
print(f"", end="\n", flush=True)
else:
print(f"")
print(f"\tTelemetry:", end=" ", flush=True)
status = find_config_value(result, "telemetry-on")
if status == "true":
print(f"", end="\n", flush=True)
else:
print(f"")
print(f"\tCompression:", end=" ", flush=True)
status = find_config_value(result, "compression-enabled")
if status == "true":
print(f"", end="\n", flush=True)
else:
print(f"", end="\n", flush=True)
print(f"\tRemote update:", end=" ", flush=True)
status = find_config_value(result, "remote-update-on")
if status == "true":
print(f"", end="\n", flush=True)
else:
print(f"", end="\n", flush=True)
print(f"\tConnection string:", end="\n", flush=True)
status = find_config_value(result, "connection-string")
parsed_data = parse_connection_string(status)
print(f"\t\tIoT Hub:", end=" ", flush=True)
print(f"{parsed_data.get('HostName')}", end=" ", flush=True)
if parsed_data.get('HostName').strip() == iot_hub.strip():
print(f"", end="\n", flush=True)
else:
print(f"", end="\n", flush=True)
print(f"\t\tDevice ID:", end=" ", flush=True)
print(f"{parsed_data.get('DeviceId')}", end=" ", flush=True)
if parsed_data.get('DeviceId').strip() == hostname.strip():
print(f"", end="\n", flush=True)
else:
print(f"", end="\n", flush=True)
print(f"\tProxy:", end="\n", flush=True)
print(f"\t\tHost:", end=" ", flush=True)
status = find_config_value(result, "proxy-host")
if status == None:
print(f"NA ❌", end="\n", flush=True)
elif status == proxy_host:
print(f"", end="\n", flush=True)
else:
print(f"", end="\n", flush=True)
print(f"\t\tPort:", end=" ", flush=True)
status = find_config_value(result, "proxy-port")
if status == None:
print(f"NA ❌", end="\n", flush=True)
elif status == proxy_port:
print(f"", end="\n", flush=True)
else:
print(f"", end="\n", flush=True)
def parse_connection_string(connection_string):
"""
Parses a semicolon-separated connection string into a dictionary,
handling keys that have an escaped equals sign (e.g., 'Key\\=Value').
"""
parsed_data = {}
# 1. Split the entire string by the semicolon to get each pair
parts = connection_string.split(';')
for part in parts:
# 2. IMPORTANT: Replace the escaped separator '\\=' with a plain '='.
# This is the only replacement needed and uses the correct syntax.
cleaned_part = part.replace('\\=', '=')
# 3. Now, split the cleaned part by the first equals sign
if '=' in cleaned_part:
key, value = cleaned_part.split('=', 1)
# Add the key-value pair to our dictionary, stripping any extra whitespace
parsed_data[key.strip()] = value.strip()
return parsed_data
def main():
"""Main function to parse arguments and orchestrate tasks."""
ip_address_prefix = "10.81.35." # Carling subnet
ip_address_range = [] # list(range(65, 75)) # From 65 to 74
ip_address_range.append(66) #(85) # Add 85 after 74.
hosts = [f"{ip_address_prefix}{suffix}" for suffix in ip_address_range]
ssh_port = 11022
ssh_user = os.getenv("DEFAULT_CUBE_LINUX_ADMIN_USER")
ssh_password = os.getenv("DEFAULT_CUBE_LINUX_ADMIN_PASSWORD")
connect_args = {}
connect_args["password"] = ssh_password
connect_args["banner_timeout"] = 3
connect_args["auth_timeout"] = 60
connect_args["channel_timeout"] = 60
for host in hosts:
print(f"{host}", end=" - ", flush=True)
hostname = ""
result = ""
try:
activate_ssh(host)
except Exception as e:
print(f"Exception: {e}")
continue
with Connection(host=host, user=ssh_user, port=ssh_port, connect_timeout=60, connect_kwargs=connect_args) as c:
try:
print(f"Hostname:", end=" ", flush=True)
result = execute_command(c, "hostname")
print(f"{result.strip()}", end="\n", flush=True)
hostname = str.lower(result)
except Exception as e:
print(f"[Hostname] Exception: {e}")
continue
try:
print(f"Checking Cloud configuration:", end=" ", flush=True)
result = read_remote_config_sudo(c, "/etc/cube/config-azure.properties", ssh_password)
print(f"", end="\n", flush=True)
except:
print(f"", end="\n", flush=True)
print(f"[Cloud configuration check] Exception: {e}")
continue
cloud_configuration_check(hostname, result, "iot-ingest-ess-prod.azure-devices.net", "10.81.35.126", "8080")
print(f"Setting proxy configuration:", end="\n", flush=True)
result_proxy_host = set_config_field(result, "proxy-host", "10.81.35.126", True)
result_proxy_host_port = set_config_field(result_proxy_host, "proxy-port", "8080", True)
result = result_proxy_host_port
cloud_configuration_check(hostname, result, "iot-ingest-ess-prod.azure-devices.net", "10.81.35.126", "8080")
response = input(f"Apply the change on {hostname.strip()}? (y)es to apply, anything else to cancel - ").lower()
if response in ['y']:
print(f"Applying changes:", end=" ", flush=True)
try:
write_remote_config_sudo(c, "/etc/cube/config-azure.properties", result, ssh_password, "cube", "root", "644")
print(f"", end="\n", flush=True)
except Exception as e:
print(f"", end="\n", flush=True)
print(f"[Proxy configuration] Exception: {e}")
continue
print(f"Checking Cloud configuration:", end=" ", flush=True)
try:
result = read_remote_config_sudo(c, "/etc/cube/config-azure.properties", ssh_password)
print(f"", end="\n", flush=True)
except:
print(f"", end="\n", flush=True)
print(f"[Proxy verification] Exception: {e}")
continue
cloud_configuration_check(hostname, result, "iot-ingest-ess-prod.azure-devices.net", "10.81.35.126", "8080")
else:
print(f"Not applying configuration...")
continue
if __name__ == "__main__":
main()