# azure_iot_hub_manage_iot_edge_devices.py — interactive Textual TUI for
# inspecting IoT Edge devices registered in an Azure IoT Hub.
# NOTE(review): the original change set also archived
# Python/calc_three_phase.py and Python/check_folders.sh under Python/Archive/.

from azure.iot.hub import IoTHubRegistryManager
from azure.iot.hub.protocol.models import QuerySpecification, Module
from azure.iot.hub.models import CloudToDeviceMethod, CloudToDeviceMethodResult
from dotenv import load_dotenv
from isight_device import iSightDevice

from textual.app import App, ComposeResult
from textual.widgets import Header, Footer, ListView, ListItem, Label, Input, Button, Static, DataTable
from textual.containers import Vertical, VerticalScroll
from textual.binding import Binding
from textual.screen import Screen
from textual import on

from datetime import datetime

import json
import os

load_dotenv()


class ObjectListItem(ListItem):
    """A widget to display an iSightDevice object in the ListView."""

    def __init__(self, device: iSightDevice) -> None:
        super().__init__()
        self.device = device

    def compose(self) -> ComposeResult:
        """Create the displayable content of the list item.

        Shows device_id, site, number and version of the wrapped device.
        """
        yield Label(
            f"{self.device.getDeviceId()} | {self.device.getSite()} | "
            f"{self.device.getNumber()} ({self.device.getVersion()})"
        )


class ModuleScreen(Screen):
    """Read-only table of the IoT Edge modules deployed on one device."""

    BINDINGS = [Binding("escape", "app.pop_screen", "Go Back")]

    def __init__(self, device: iSightDevice, device_modules: list[Module]):
        super().__init__()
        self.device = device
        self.device_modules = device_modules

    def compose(self) -> ComposeResult:
        # BUG FIX: getDeviceId was referenced without parentheses, which
        # rendered the bound-method repr instead of the device id.  Also use
        # Header(name=...) — Header's first positional parameter is
        # show_clock, and the rest of this file already passes name=.
        yield Header(name=f"IoT Edge modules of {self.device.getDeviceId()}")
        yield DataTable()
        yield Footer()

    def on_mount(self) -> None:
        """Populate the DataTable once the screen is mounted."""
        table = self.query_one(DataTable)
        table.add_columns("Module", "Status", "Since", "Last activity")
        for module in self.device_modules:
            # Service timestamps may be missing or unparsable; fall back to
            # datetime.min so the row still renders.  Catch only the
            # exceptions fromisoformat/attribute access can raise instead of
            # the original bare `except:`.
            try:
                local_connection_time = (
                    datetime.fromisoformat(str(module.connection_state_updated_time))
                    .astimezone()
                    .replace(microsecond=0)
                    .isoformat()
                )
            except (ValueError, AttributeError):
                local_connection_time = datetime.min.isoformat()
            try:
                local_activity_time = (
                    datetime.fromisoformat(str(module.last_activity_time))
                    .astimezone()
                    .replace(microsecond=0)
                    .isoformat()
                )
            except (ValueError, AttributeError):
                local_activity_time = datetime.min.isoformat()
            # getattr with a default replaces the original try/bare-except
            # pairs around plain attribute reads.
            module_id = getattr(module, "module_id", "NA")
            connection_state = getattr(module, "connection_state", "NA")
            table.add_row(
                f"{module_id}",
                f"{connection_state}",
                f"{local_connection_time}",
                f"{local_activity_time}",
            )


class DetailScreen(Screen):
    """A screen to display details and actions for a single device."""

    BINDINGS = [
        Binding("escape", "app.pop_screen", "Go Back"),
    ]

    def __init__(self, device: iSightDevice, registry_manager: IoTHubRegistryManager) -> None:
        super().__init__()
        self.device = device
        self.registry_manager = registry_manager

    def compose(self) -> ComposeResult:
        yield Header(name=f"Details for {self.device.getDeviceId()}")

        # Use VerticalScroll so the layout works on small terminals
        with VerticalScroll(id="details-container"):
            # Static, non-interactive information section
            yield Static(f"[b]Site:[/b] {self.device.getSite()}", markup=True)
            yield Static(f"[b]Number:[/b] {self.device.getNumber()}", markup=True)
            yield Static(f"[b]Version:[/b] {self.device.getVersion()}", markup=True)
            yield Static(f"[b]Device ID:[/b] {self.device.getDeviceId()}", markup=True)

            # Interactive action buttons
            yield Static("\n[b]Actions:[/b]", markup=True)
            yield Button("Check IoT Edge modules", variant="primary", id="modules")

        yield Footer()

    @on(Button.Pressed, "#modules")
    def check_modules(self) -> None:
        """Fetch this device's modules, sort them by id, and open ModuleScreen."""
        device_modules = self.registry_manager.get_modules(self.device.getDeviceId())
        device_modules.sort(key=lambda m: m.module_id)
        self.app.push_screen(ModuleScreen(self.device, device_modules))

    # TODO: future actions sketched in the original as commented-out handlers:
    #   "#logs"   -> fetch device logs
    #   "#diag"   -> run diagnostics
    #   "#delete" -> delete device (needs a confirmation dialog)
class DeviceTUI(App):
    """An interactive TUI to list and filter iSightDevice objects."""

    BINDINGS = [
        Binding("q", "quit", "Quit"),
    ]

    def __init__(self, devices: list[iSightDevice], registry_manager: IoTHubRegistryManager):
        super().__init__()
        self.all_devices = devices
        self.filtered_devices = self.all_devices[:]
        self.registry_manager = registry_manager

    def compose(self) -> ComposeResult:
        """Create child widgets for the app."""
        yield Header(name="iSight Device Viewer")
        yield Input(placeholder="Filter by site...")
        with Vertical(id="list-container"):
            yield ListView(*[ObjectListItem(dev) for dev in self.filtered_devices], id="device-list")
        yield Footer()

    def on_mount(self) -> None:
        """Called when the app is first mounted."""
        self.query_one(Input).focus()

    def on_input_changed(self, event: Input.Changed) -> None:
        """Handle changes to the input field and filter the list."""
        self.filter_devices(event.value.lower())

    def filter_devices(self, filter_text: str):
        """Filter the list of devices based on the site (case-insensitive substring)."""
        if filter_text:
            self.filtered_devices = [
                dev for dev in self.all_devices if filter_text in dev.getSite().lower()
            ]
        else:
            self.filtered_devices = self.all_devices[:]

        list_view = self.query_one(ListView)
        list_view.clear()
        for dev in self.filtered_devices:
            list_view.append(ObjectListItem(dev))

    def on_list_view_selected(self, event: ListView.Selected) -> None:
        """Open the detail screen for the selected device."""
        selected_item = event.item
        if isinstance(selected_item, ObjectListItem):
            # BUG FIX: the original passed the module-level global
            # `registry_manager`; use the instance's own manager so the app
            # also works when constructed from another module.
            self.push_screen(
                DetailScreen(
                    device=selected_item.device,
                    registry_manager=self.registry_manager,
                )
            )


if __name__ == "__main__":
    # BUG FIX: os.getenv returns None (not "") when the variable is unset,
    # and str(None) is "None", so the original `== ""` guard never fired.
    CONNECTION_STRING = os.getenv("CONNECTION_STRING_INOX_PROD")
    if not CONNECTION_STRING:
        print("Provide a connection string for the Iot Hub before running the script!")
        exit(13)

    print("Connecting to IoT Hub ", end="")
    try:
        registry_manager = IoTHubRegistryManager.from_connection_string(CONNECTION_STRING)
    except Exception as e:
        print("❌")
        print(f"Error {e}")
        # BUG FIX: the original used a bare `exit` (no call), a no-op
        # expression — the script fell through and crashed later on the
        # undefined registry_manager.
        exit(1)
    print("✅")

    print("Getting list of IoT Edge devices ", end="")
    try:
        query_spec = QuerySpecification(query="SELECT * FROM devices WHERE capabilities.iotEdge = true ")
        query_result = registry_manager.query_iot_hub(query_spec)
    except Exception as e:
        print("❌")
        print(f"Error {e}")
        exit(1)  # BUG FIX: was a bare `exit` no-op, see above.
    print("✅")

    print(f"Sorting {len(query_result.items)} devices ", end="")
    # Device tags are assumed to carry 'site', 'number' and 'version' keys —
    # TODO confirm against the twin tag schema used at provisioning time.
    devices = [
        iSightDevice(
            str(item.device_id),
            str(item.tags["site"]),
            int(item.tags["number"]),
            str(item.tags["version"]),
        )
        for item in query_result.items
    ]
    devices.sort(key=lambda d: (d.site, d.number))
    print("✅")

    app = DeviceTUI(devices=devices, registry_manager=registry_manager)
    app.run()
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) - token = authenticate(url) - if not token: - return - time.sleep(3) - set_ssh_status(url, token) \ No newline at end of file + print(f"HTTPS", end=" ", flush=True) + try: + token = authenticate(url) + print(f"✅", end="", flush=True) + except Exception as e: + print(f"❌", flush=True) + raise + print(f"SSH", end=" ", flush=True) + try: + set_ssh_status(url, token) + print(f"✅", end="\n", flush=True) + except Exception as e: + print(f"❌", flush=True) + raise \ No newline at end of file diff --git a/Python/cube_ssh_batch.py b/Python/cube_ssh_batch.py index 3691af0..dfe5348 100644 --- a/Python/cube_ssh_batch.py +++ b/Python/cube_ssh_batch.py @@ -10,11 +10,11 @@ from azure.iot.hub.models import Twin, TwinProperties load_dotenv(override=True) -ip_address_prefix = "10.188.11." +ip_address_prefix = "10.81.60." ssh_command = "hostname" -csv_filename = "hoohana7.csv" -SITE_NAME = "HOOHANA" +csv_filename = "Grandpuits_01.csv" +SITE_NAME = "Grandpuits" ssh_username = os.getenv("DEFAULT_CUBE_LINUX_ADMIN_USER") ssh_password = os.getenv("DEFAULT_CUBE_LINUX_ADMIN_PASSWORD") @@ -82,7 +82,7 @@ def main(): results = [] - numbers = [16, 18, 34, 35] + numbers = [193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207, 208, 209, 210, 211, 212, 213, 214] for i in numbers: ip_address = f"{ip_address_prefix}{i}" print(f"Activating SSH for {ip_address}:", end=" ") diff --git a/Python/cube_ssh_batch_passive.py b/Python/cube_ssh_batch_passive.py index 3e5a7a6..b70bdc8 100644 --- a/Python/cube_ssh_batch_passive.py +++ b/Python/cube_ssh_batch_passive.py @@ -48,10 +48,10 @@ def update_cloud_config(ip, new_content): def main(): - ip_address_prefix = "10.84.171." - start_ip = 1 - end_ip = 5 - SITE_NAME = "MYRTLE" + ip_address_prefix = "10.81.60." 
+ start_ip = 193 + end_ip = 214 + SITE_NAME = "Grandpuits" print(f"Site: {SITE_NAME}") print(f"From {ip_address_prefix}{str(start_ip)} to {ip_address_prefix}{str(end_ip)}") diff --git a/Python/cube_ssh_set_proxy.py b/Python/cube_ssh_set_proxy.py new file mode 100644 index 0000000..c9f94c0 --- /dev/null +++ b/Python/cube_ssh_set_proxy.py @@ -0,0 +1,153 @@ +from paramiko import SSHClient, AutoAddPolicy +import paramiko +from cube_activate_ssh import activate_ssh +from dotenv import load_dotenv +import os +import sys +import shlex +from scp import SCPClient +import time + +def resource_path(relative_path): + """ Get absolute path to resource, works for dev and for PyInstaller """ + try: + # PyInstaller creates a temp folder and stores path in _MEIPASS + base_path = sys._MEIPASS + except Exception: + base_path = os.path.abspath(".") + + return os.path.join(base_path, relative_path) + +dotenv_path = resource_path('.env') +load_dotenv(dotenv_path=dotenv_path) + +ip_address_prefix = "10.81.35." # Carling subnet +ip_address_range = list(range(65, 75)) # From 65 to 74 +ip_address_range.append(85) # Add 85 after 74. 
+ +ENV_SSH = { + "DEFAULT_CUBE_LINUX_ADMIN_USER": os.getenv("DEFAULT_CUBE_LINUX_ADMIN_USER"), + "DEFAULT_CUBE_LINUX_ADMIN_PASSWORD": os.getenv("DEFAULT_CUBE_LINUX_ADMIN_PASSWORD") +} + +ssh_username = ENV_SSH["DEFAULT_CUBE_LINUX_ADMIN_USER"] +ssh_password = ENV_SSH["DEFAULT_CUBE_LINUX_ADMIN_PASSWORD"] + +def execute_ssh_command(ip, command, client): + try: + stdin, stdout, stderr = client.exec_command(command, timeout=180) + exit_status = stdout.channel.recv_exit_status() + + stdout_line = [line for line in stdout] + for output in stdout_line: + print(output.strip()) + + except Exception as e: + print(f"SSH error: {str(e)}", flush=True) + raise + finally: + client.close() + +def execute_sudo_ssh_command(ip, command, client): + try: + quoted_command = f"bash -c {shlex.quote(command)}" + sudo_command = f"sudo -S -p '' {quoted_command}" + stdin, stdout, stderr = client.exec_command(sudo_command, timeout=180) + time.sleep(3) + stdin.write(ssh_password + '\n') + stdin.flush() + exit_status = stdout.channel.recv_exit_status() + + output = stdout.read().decode('utf-8') + error = stderr.read().decode('utf-8') + if exit_status == 0: + + print(f"{output}") + else: + print(f"❌") + print(f"{error}") + raise Exception("Error during SSH sudo command.") + + result = stdout.read().decode().lower().strip() + return result + except Exception as e: + raise + finally: + client.close() + +def scp_get_file(ip, remote_path, local_path): + client = SSHClient() + client.set_missing_host_key_policy(AutoAddPolicy()) + + local_path = os.path.expanduser(local_path) + local_path = os.path.abspath(local_path) + + local_dir = os.path.dirname(local_path) + if local_dir: + os.makedirs(local_dir, exist_ok=True) + + + try: + client.connect( + ip, + port=11022, + username=ssh_username, + password=ssh_password, + allow_agent=False, + look_for_keys=False, + timeout=180 + ) + + client.get_transport().set_keepalive(5) + with SCPClient(client.get_transport()) as scp: + scp.get(remote_path, local_path) + + 
except Exception as e: + raise + finally: + client.close() + +def main(): + + print(f"Starting...\n", flush=True) + + for i in ip_address_range: + ip_address = f"{ip_address_prefix}{i}" + print(f"[{time.ctime(time.time())}] {str(i)} ({ip_address})", end=" ", flush=True) + + try: + activate_ssh(ip_address) + except Exception as e: + print(f"SSH activation failed for {ip_address}:", flush=True) + print(f"{e}", flush=True) + print(f"Skipping CUBE...", flush=True) + continue + + cube_id = "NA" + + client = paramiko.SSHClient() + client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + client.connect(ip_address, port=11022, username=ssh_username, password=ssh_password, allow_agent=False, look_for_keys=False, timeout=180, banner_timeout=180) + client.get_transport().set_keepalive(5) + + try: + cube_id = execute_ssh_command(ip_address, "hostname", client) + print(f"{cube_id} ✅", flush=True) + except Exception as e: + print(f"cube-xxxxx ❌", flush=True) + print(f"Error getting hostname for {ip_address}:", flush=True) + print(f"{e}", flush=True) + print(f"Skipping CUBE...", flush=True) + continue + + try: + #result = execute_ssh_command(ip_address, "cat /etc/cube/config-azure.properties", client) + print(f"coucou", flush=True) + except Exception as e: + print(f"Error getting Cloud settings for {cube_id}.", flush=True) + print(f"{e}", flush=True) + print(f"Skipping CUBE...", flush=True) + continue + +if __name__ == "__main__": + main() diff --git a/Python/isight_device.py b/Python/isight_device.py index 5f4414a..143048b 100644 --- a/Python/isight_device.py +++ b/Python/isight_device.py @@ -31,6 +31,9 @@ class iSightDevice: def getNumber(self): return f"{self.number}" + def getVersion(self): + return f"{self.cloudVersion}" + class CubeDevice: def __init__(self, deviceId: str): if not isinstance(deviceId, str): diff --git a/Python/requirements.txt b/Python/requirements.txt index e376340..82c65b0 100644 --- a/Python/requirements.txt +++ b/Python/requirements.txt @@ -5,4 
+5,9 @@ paramiko requests pandas openpyxl -scp \ No newline at end of file +scp +textual +fabric +ruamel.yaml +netmiko +pexpect \ No newline at end of file diff --git a/Python/ssh_fabric_batch.py b/Python/ssh_fabric_batch.py new file mode 100644 index 0000000..f823308 --- /dev/null +++ b/Python/ssh_fabric_batch.py @@ -0,0 +1,651 @@ +import io +from fabric import Connection +from getpass import getpass +from dotenv import load_dotenv +import io +import os +from cube_activate_ssh import activate_ssh +from ruamel.yaml import YAML +from ruamel.yaml.scalarstring import DoubleQuotedScalarString +import shlex +import base64 + +load_dotenv(override=True) + +def execute_command(c, command): + """Executes a simple command on the remote device.""" + try: + result = c.run(command, hide=True) + return result.stdout + except Exception as e: + print(f"Error executing {command}: {e}") + +def execute_sudo_command(c, command, sudo_pass): + """Executes a command with sudo on the remote device.""" + # print(f"\n--- [{c.host}] Executing with sudo: {command} ---") + try: + result = c.sudo(command, password=sudo_pass, pty=True, hide=True) + return result.stdout + # print("STDOUT:") + # print(result.stdout) + # print("STDERR:") + # print(result.stderr) + except Exception as e: + print(f"Error executing {command} as administrator: {e}") + +def read_remote_config_sudo(c, remote_path, sudo_pass): + """ + Reads a remote file with sudo and returns its content as a string. 
+ """ + # print(f"\n--- [{c.host}] Reading remote file with sudo: {remote_path} ---") + try: + # Use sudo to cat the file and capture its output + result = c.sudo(f"cat {remote_path}", password=sudo_pass, hide=True) + return result.stdout + except Exception as e: + print(f"Error reading remote file with sudo: {e}") + return None + + # Make sure to import shlex at the top of your script + +def write_remote_config_sudo(c, remote_path, content, sudo_pass, user_owner, group_owner, permissions): + """ + Writes content directly to a remote file using the 'sudo sh -c' pattern. + + This is the most robust and secure method for environments where SFTP is + disabled. It avoids input stream conflicts and prevents shell injection. + + Args: + c (fabric.Connection): The active connection object. + remote_path (str): The absolute path to the file on the remote host. + content (str): The string content to be written to the file. + sudo_pass (str): The sudo password for the write operation. + """ + print(f"\n--- [{c.host}] Writing directly using 'sudo sh -c' method: {remote_path} ---") + try: + # Step 1: Securely escape the content for safe shell execution. + # shlex.quote() wraps the string in single quotes and handles any + # internal single quotes, making it safe to pass to a shell. + safe_content = shlex.quote(content) + + # Step 2: Construct the command. + # 'sudo sh -c "..."': This runs a new shell ('sh') with root privileges. + # 'echo ... > file': This command is executed *by the root shell*. + # The redirection '>' is therefore handled by root, which has permission + # to write to the protected 'remote_path'. + command = f"sh -c \"echo {safe_content} > {remote_path}\"" + + print("Step 1: Writing content via root shell...") + # We run this entire command string using c.sudo(). Fabric will handle + # the password prompt correctly for the 'sudo sh' part. + c.sudo(command, password=sudo_pass, hide=True) + + # Step 3: Set ownership and permissions as a separate step. 
+ print("Step 2: Setting ownership and permissions...") + c.sudo(f"chown {user_owner}:{group_owner} {remote_path}", password=sudo_pass) + c.sudo(f"chmod {permissions} {remote_path}", password=sudo_pass) + + print(f"✅ Successfully wrote content to {remote_path}") + + except Exception as e: + print(f"❌ Error writing directly to remote file: {e}") + # Re-raise the exception so the calling function can handle it. + raise + +def set_config_field(config_content, option, new_value, add_if_missing=False): + """ + Replaces a configuration value, or optionally adds it if it's missing. + + This function is idempotent: running it multiple times with the same + parameters will result in the same configuration state. + + Args: + config_content (str): The multi-line string of the configuration file. + option (str): The configuration key to find (e.g., "PermitRootLogin"). + new_value (str): The new value to set for the option. + add_if_missing (bool): If True, adds the 'option=new_value' to the end + of the content if it's not found. Defaults to False. + + Returns: + str: The modified configuration content. + """ + lines = config_content.splitlines() + new_lines = [] + found = False + + # First, iterate through the existing lines to find and replace the option. + for line in lines: + # Check if the line is a non-commented definition of our option. + if line.strip().startswith(option + '=') and not line.strip().startswith('#'): + new_lines.append(f"{option}={new_value}") + found = True + else: + # Keep the original line if it's not a match. + new_lines.append(line) + + # After checking all lines, if the option was not found AND we are + # instructed to add it, we append it to the end of the configuration. + if not found and add_if_missing: + print(f"Info: Option '{option}' not found. Appending it to the configuration.") + # Ensure the new line is actually on a new line. 
+ if new_lines and new_lines[-1] != '': + new_lines.append('') # Add a blank line for separation if needed + new_lines.append(f"{option}={new_value}") + elif not found: + # This is the original behavior: do nothing if missing and not told to add. + print(f"Warning: Option '{option}' not found. No changes made.") + + return "\n".join(new_lines) + +def find_config_value(config_content, option): + """ + Finds a specific option in a configuration string and returns its value. + + Args: + config_content (str): The multi-line string of the configuration file. + option (str): The configuration key to find (e.g., "PermitRootLogin"). + + Returns: + str: The value of the option if found. + None: If the option is not found or is commented out. + """ + try: + # Go through each line in the configuration + for line in config_content.splitlines(): + # Clean up the line by removing leading/trailing whitespace + clean_line = line.strip() + + # Check if the line is not a comment and starts with our option + # The '=' is important to avoid matching partial keys (e.g., 'Port' matching 'Ports') + if not clean_line.startswith('#') and clean_line.startswith(option + '='): + + # Split the line only on the first equals sign + # This handles cases where the value itself might contain an '=' + parts = clean_line.split('=', 1) + + # The value is the second part, stripped of any whitespace + value = parts[1].strip() + return value + + except Exception as e: + # In case of any unexpected error (e.g., malformed content), return None + print(f"An error occurred while parsing config: {e}") + return None + + # If the loop finishes without finding the option, return None + return None + +def cloud_configuration_check(hostname, result, iot_hub, proxy_host, proxy_port): + print(f"\tLight telemetry:", end=" ", flush=True) + status = find_config_value(result, "light-telemetry") + if status == "false": + print(f"✅", end="\n", flush=True) + else: + print(f"❌") + + print(f"\tTelemetry:", end=" ", 
flush=True) + status = find_config_value(result, "telemetry-on") + if status == "true": + print(f"✅", end="\n", flush=True) + else: + print(f"❌") + + + print(f"\tCompression:", end=" ", flush=True) + status = find_config_value(result, "compression-enabled") + if status == "true": + print(f"✅", end="\n", flush=True) + else: + print(f"❌", end="\n", flush=True) + + print(f"\tRemote update:", end=" ", flush=True) + status = find_config_value(result, "remote-update-on") + if status == "true": + print(f"✅", end="\n", flush=True) + else: + print(f"❌", end="\n", flush=True) + + print(f"\tConnection string:", end="\n", flush=True) + status = find_config_value(result, "connection-string") + parsed_data = parse_connection_string(status) + print(f"\t\tIoT Hub:", end=" ", flush=True) + print(f"{parsed_data.get('HostName')}", end=" ", flush=True) + if parsed_data.get('HostName').strip() == iot_hub.strip(): + print(f"✅", end="\n", flush=True) + else: + print(f"❌", end="\n", flush=True) + print(f"\t\tDevice ID:", end=" ", flush=True) + print(f"{parsed_data.get('DeviceId')}", end=" ", flush=True) + if parsed_data.get('DeviceId').strip() == hostname.strip(): + print(f"✅", end="\n", flush=True) + else: + print(f"❌", end="\n", flush=True) + + print(f"\tProxy:", end="\n", flush=True) + print(f"\t\tHost:", end=" ", flush=True) + status = find_config_value(result, "proxy-host") + if status == None: + print(f"NA ❌", end="\n", flush=True) + elif status == proxy_host: + print(f"✅", end="\n", flush=True) + else: + print(f"❌", end="\n", flush=True) + print(f"\t\tPort:", end=" ", flush=True) + status = find_config_value(result, "proxy-port") + if status == None: + print(f"NA ❌", end="\n", flush=True) + elif status == proxy_port: + print(f"✅", end="\n", flush=True) + else: + print(f"❌", end="\n", flush=True) + +def parse_connection_string(connection_string): + """ + Parses a semicolon-separated connection string into a dictionary, + handling keys that have an escaped equals sign (e.g., 
'Key\\=Value'). + """ + parsed_data = {} + + # 1. Split the entire string by the semicolon to get each pair + parts = connection_string.split(';') + + for part in parts: + # 2. IMPORTANT: Replace the escaped separator '\\=' with a plain '='. + # This is the only replacement needed and uses the correct syntax. + cleaned_part = part.replace('\\=', '=') + + # 3. Now, split the cleaned part by the first equals sign + if '=' in cleaned_part: + key, value = cleaned_part.split('=', 1) + # Add the key-value pair to our dictionary, stripping any extra whitespace + parsed_data[key.strip()] = value.strip() + + return parsed_data + +def find_yaml_value(yaml_content, key_path): + """ + Finds a value in a YAML string using a dot-separated path. + + Args: + yaml_content (str): The string content of the YAML file. + key_path (str): A dot-separated path to the key (e.g., "cubeProcess.cyber_check"). + + Returns: + The value if found, otherwise None. + """ + try: + yaml = YAML() + data = yaml.load(yaml_content) + + # Traverse the path + keys = key_path.split('.') + current_level = data + for key in keys: + current_level = current_level[key] + + return current_level + except (KeyError, TypeError): + # KeyError if a key is not found, TypeError if trying to index a non-dict + # print(f"Warning: Key path '{key_path}' not found in YAML content.") + return None + +def set_yaml_value(yaml_content, key_path, new_value): + """ + Sets a value in a YAML string using a dot-separated path. + Preserves comments, formatting, and quotes thanks to ruamel.yaml. + This version correctly traverses nested keys. + + Args: + yaml_content (str): The string content of the YAML file. + key_path (str): A dot-separated path to the key (e.g., "cubeProcess.cyber_check"). + new_value: The new value to set. + + Returns: + str: The modified YAML content as a string, or the original content on error. 
+ """ + try: + # --- FIX 1: Configure the YAML object to preserve quotes --- + yaml = YAML() + yaml.preserve_quotes = True + yaml.indent(mapping=2, sequence=4, offset=2) # Optional: ensures consistent indentation + + data = yaml.load(yaml_content) + + # --- FIX 2: Correct traversal logic --- + keys = key_path.split('.') + current_level = data + + # Traverse down to the final key's parent dictionary + for key in keys[:-1]: + current_level = current_level[key] + + final_key = keys[-1] + + # Check if the key exists before setting it + if final_key not in current_level: + print(f"❌ Error: Final key '{final_key}' not found in the structure. Aborting.") + return yaml_content # Return original content + + # Set the new value + current_level[final_key] = new_value + + # Dump the modified data back to a string + string_stream = io.StringIO() + yaml.dump(data, string_stream) + return string_stream.getvalue() + + except (KeyError, TypeError) as e: + print(f"❌ Error: Key path '{key_path}' is invalid or part of the path does not exist. Error: {e}") + return yaml_content # Return original content on failure +def ensure_iptables_port_rule(config_content, target_port, template_port): + """ + Ensures that iptables rules for a target port exist in the configuration. + + If rules for the target port are not found, it finds rules for a + template port and replaces the port number. + + Args: + config_content (str): The multi-line string of the iptables rules file. + target_port (int or str): The port number that should exist (e.g., 8080). + template_port (int or str): The port number to use as a template (e.g., 443). + + Returns: + str: The modified (or original) configuration content. 
+ """ + target_port_str = str(target_port) + template_port_str = str(template_port) + lines = config_content.splitlines() + + target_port_found = False + + # --- PASS 1: Check if the target port rule already exists --- + for line in lines: + # Check for the target port in an active rule line + # The spaces around the port string prevent accidentally matching '8080' in '18080' + if line.strip().startswith('-A') and (f"--dport {target_port_str}" in line or f"--sport {target_port_str}" in line): + print(f"✅ Info: Rule for target port {target_port_str} already exists. No changes needed.") + target_port_found = True + break + + # If the rule was found, return the original content without any changes. + if target_port_found: + return config_content + + # --- PASS 2: If we get here, the rule was not found. We must replace the template. --- + print(f"Info: Rule for target port {target_port_str} not found. Searching for template port {template_port_str} to replace.") + + new_lines = [] + changes_made = False + for line in lines: + # Check for the template port in an active rule line + if line.strip().startswith('-A') and (f"--dport {template_port_str}" in line or f"--sport {template_port_str}" in line): + # This is a line we need to modify + modified_line = line.replace(template_port_str, target_port_str) + new_lines.append(modified_line) + print(f" - Replacing: '{line}'") + print(f" + With: '{modified_line}'") + changes_made = True + else: + # This line doesn't need changing, add it as is. + new_lines.append(line) + + if not changes_made: + print(f"❌ Warning: Target port {target_port_str} was not found, AND template port {template_port_str} was also not found. 
No changes made.") + return config_content # Return original if template wasn't found either + + return "\n".join(new_lines) + +def write_remote_config_base64_sudo(c, remote_path, content, sudo_pass, user_owner, group_owner, permissions): + """ + Writes content directly to a remote file by passing it as a Base64 string. + + This is the most robust method for no-SFTP environments, as it completely + avoids all shell quoting and parsing issues for complex, multi-line content. + + Args: + c (fabric.Connection): The active connection object. + remote_path (str): The absolute path to the file on the remote host. + content (str): The string content to be written to the file. + sudo_pass (str): The sudo password for the write operation. + """ + print(f"\n--- [{c.host}] Writing content via Base64 to: {remote_path} ---") + try: + # Step 1: Encode the string content into Base64. + # base64.b64encode requires bytes, so we encode the string to utf-8. + # The result is bytes, so we decode it back to a simple ascii string to use in our command. + base64_content = base64.b64encode(content.encode('utf-8')).decode('ascii') + + # Step 2: Construct the command. + # 'echo ... | base64 --decode > file': This pipeline decodes the content + # and redirects the output to the destination file. + # We wrap the entire pipeline in 'sudo sh -c "..."' so that the + # redirection ('>') is performed by a shell running as root. + command = f"sh -c \"echo '{base64_content}' | base64 --decode > {remote_path}\"" + + print("Step 1: Writing Base64 content via root shell...") + c.sudo(command, password=sudo_pass, hide=True) + + # Step 3: Set ownership and permissions. 
def main():
    """Main function to parse arguments and orchestrate tasks.

    For every host in a hard-coded IP range, this:
      1. enables SSH on the device and reads its hostname,
      2. checks and (interactively) applies the Azure cloud proxy settings,
      3. stops the cyber-check watchdog, remounts / read-write, and
         (interactively) disables ``cubeProcess.cyber_check`` in its YAML config,
      4. checks and (interactively) applies the iptables rule for port 8080,
      5. remounts / read-only and restarts the watchdog.

    NOTE(review): activate_ssh, execute_command, execute_sudo_command,
    read_remote_config_sudo, write_remote_config_sudo, set_config_field,
    cloud_configuration_check, find_yaml_value, set_yaml_value and
    ensure_iptables_port_rule are helpers defined elsewhere in this file.
    """
    ip_address_prefix = "10.81.60." # Grandpuits subnet
    ip_address_range = list(range(193, 215)) # From 193 to 214
    # ip_address_range.append(85) # Add 85 after 74.
    hosts = [f"{ip_address_prefix}{suffix}" for suffix in ip_address_range]

    ssh_port = 11022
    # Credentials come from the environment (.env loaded by load_dotenv at import time).
    ssh_user = os.getenv("DEFAULT_CUBE_LINUX_ADMIN_USER")
    ssh_password = os.getenv("DEFAULT_CUBE_LINUX_ADMIN_PASSWORD")

    # Paramiko connect arguments: password auth only — no key files, no agent.
    connect_args = {}
    connect_args["password"] = ssh_password
    connect_args["banner_timeout"] = 3
    connect_args["auth_timeout"] = 60
    connect_args["channel_timeout"] = 60
    connect_args["look_for_keys"] = False
    connect_args["allow_agent"] = False


    for host in hosts:
        print(f"{host}", end=" - ", flush=True)

        hostname = ""
        result = ""

        # Ask the device to open its SSH service before we try to connect.
        try:
            activate_ssh(host)
        except Exception as e:
            print(f"Exception: {e}")
            continue

        # NOTE: 'continue' inside the with-block closes the connection and
        # moves on to the next host; each step below bails out the same way.
        with Connection(host=host, user=ssh_user, port=ssh_port, connect_timeout=60, connect_kwargs=connect_args) as c:
            try:
                print(f"Hostname:", end=" ", flush=True)
                result = execute_command(c, "hostname")
                print(f"{result.strip()}", end="\n", flush=True)
                hostname = str.lower(result)
            except Exception as e:
                print(f"[Hostname] Exception: {e}")
                continue

            # --- Cloud / proxy configuration -----------------------------
            try:
                print(f"Checking Cloud configuration:", end=" ", flush=True)
                result = read_remote_config_sudo(c, "/etc/cube/config-azure.properties", ssh_password)
                print(f"✅", end="\n", flush=True)
            except Exception as e:
                print(f"❌", end="\n", flush=True)
                print(f"[Cloud configuration check] Exception: {e}")
                continue

            cloud_configuration_check(hostname, result, "iot-ingest-ess-prod.azure-devices.net", "10.81.35.126", "8080")

            # Patch proxy-host/proxy-port in memory, then re-run the check on
            # the modified content before asking the operator to apply it.
            print(f"Setting proxy configuration:", end="\n", flush=True)
            result_proxy_host = set_config_field(result, "proxy-host", "10.81.35.126", True)
            result_proxy_host_port = set_config_field(result_proxy_host, "proxy-port", "8080", True)
            result = result_proxy_host_port
            cloud_configuration_check(hostname, result, "iot-ingest-ess-prod.azure-devices.net", "10.81.35.126", "8080")

            response = input(f"Apply the change on {hostname.strip()}? (y)es or (n)o, anything else to cancel - ").lower()
            if response in ['y']:
                print(f"Applying changes:", end=" ", flush=True)
                try:
                    write_remote_config_sudo(c, "/etc/cube/config-azure.properties", result, ssh_password, "cube", "root", "644")
                    print(f"✅", end="\n", flush=True)
                except Exception as e:
                    print(f"❌", end="\n", flush=True)
                    print(f"[Proxy configuration] Exception: {e}")
                    continue
                # Read the file back and re-check to confirm the write landed.
                print(f"Checking Cloud configuration:", end=" ", flush=True)
                try:
                    result = read_remote_config_sudo(c, "/etc/cube/config-azure.properties", ssh_password)
                    print(f"✅", end="\n", flush=True)
                except Exception as e:
                    print(f"❌", end="\n", flush=True)
                    print(f"[Proxy verification] Exception: {e}")
                    continue

                cloud_configuration_check(hostname, result, "iot-ingest-ess-prod.azure-devices.net", "10.81.35.126", "8080")
            elif response in ['n']:
                print(f"Not applying configuration...")
            else:
                # Anything other than y/n cancels the remaining steps for this host.
                print(f"Not applying configuration...")
                continue

            # --- Cyber check (watchdog) configuration --------------------
            # Stop the monitor first, then remount / read-write so the
            # config files below can actually be written.
            print(f"Disabling Cyber Check:", end=" ", flush=True)
            try:
                execute_sudo_command(c, "systemctl stop cube-monit.service", ssh_password)
                execute_sudo_command(c, "mount -o remount,rw /", ssh_password)
                print(f"✅", end="\n", flush=True)
            except Exception as e:
                print(f"❌", end="\n", flush=True)
                print(f"[Disabling Cyber Check] Exception: {e}")
                continue

            print(f"Reading Cyber Check configuration:", end=" ", flush=True)
            try:
                result = read_remote_config_sudo(c, "/etc/cube-default/configfile_monit.yaml", ssh_password)
                print(f"✅", end="\n", flush=True)
            except Exception as e:
                print(f"❌", end="\n", flush=True)
                print(f"[Cyber Check configuration] Exception: {e}")
                continue

            # ✅ here means the flag is already False (disabled).
            print(f"Checking cyber_check:", end=" ", flush=True)
            try:
                status = find_yaml_value(result, "cubeProcess.cyber_check")
                if status == False:
                    print(f"✅", end="\n", flush=True)
                else:
                    print(f"❌", end="\n", flush=True)
            except Exception as e:
                print(f"❌", end="\n", flush=True)
                print(f"[cyber_check value] Exception: {e}")
                continue

            print(f"Modifying cyber_check:", end=" ", flush=True)
            modified_result = ""
            try:
                modified_result = set_yaml_value(result, "cubeProcess.cyber_check", False)
                print(f"✅", end="\n", flush=True)
            except Exception as e:
                print(f"❌", end="\n", flush=True)
                print(f"[cyber_check modification] Exception: {e}")
                continue

            # Sanity-check the in-memory modification before offering to apply it.
            print(f"Checking modified cyber_check:", end=" ", flush=True)
            try:
                status = find_yaml_value(modified_result, "cubeProcess.cyber_check")
                if status == False:
                    print(f"✅", end="\n", flush=True)
                else:
                    print(f"❌", end="\n", flush=True)
            except Exception as e:
                print(f"❌", end="\n", flush=True)
                print(f"[Modified cyber_check value] Exception: {e}")
                continue

            response = input(f"Apply the change on {hostname.strip()}? (y)es or (n)o, anything else to cancel - ").lower()
            if response in ['y']:
                print(f"Applying changes:", end=" ", flush=True)
                try:
                    write_remote_config_base64_sudo(c, "/etc/cube-default/configfile_monit.yaml", modified_result, ssh_password, "root", "root", "644")
                    print(f"✅", end="\n", flush=True)
                except Exception as e:
                    print(f"❌", end="\n", flush=True)
                    print(f"[cyber_check configuration] Exception: {e}")
                    continue
                # Read back and verify the flag is really False on the device.
                print(f"Checking cyber_check configuration:", end=" ", flush=True)
                try:
                    result = read_remote_config_sudo(c, "/etc/cube-default/configfile_monit.yaml", ssh_password)
                    print(f"✅", end="\n", flush=True)
                except Exception as e:
                    print(f"❌", end="\n", flush=True)
                    print(f"[cyber_check configuration] Exception: {e}")
                    continue
                try:
                    status = find_yaml_value(result, "cubeProcess.cyber_check")
                    if status == False:
                        print(f"✅", end="\n", flush=True)
                    else:
                        print(f"❌", end="\n", flush=True)
                except Exception as e:
                    print(f"❌", end="\n", flush=True)
                    print(f"[Modified cyber_check configuration verification] Exception: {e}")
                    continue
            elif response in ['n']:
                print(f"Not applying configuration...")
            else:
                print(f"Not applying configuration...")
                continue


            # --- Firewall (iptables) configuration -----------------------
            # Ensure a rule for port 8080 exists, cloning the port-443 rule
            # as a template when it does not (see ensure_iptables_port_rule).
            print(f"Firewall check:", end="\n", flush=True)
            modified_result = ""
            try:
                result = read_remote_config_sudo(c, "/etc/iptables/iptables-cube.rules", ssh_password)
            except Exception as e:
                print(f"[Firewall reading] Exception: {e}")
                continue
            try:
                modified_result = ensure_iptables_port_rule(result, 8080, 443)
            except Exception as e:
                print(f"[Firewall changes] Exception: {e}")
                continue

            response = input(f"Apply the change on {hostname.strip()}? (y)es or (n)o, anything else to cancel - ").lower()
            if response in ['y']:
                try:
                    write_remote_config_base64_sudo(c, "/etc/iptables/iptables-cube.rules", modified_result, ssh_password, "root", "root", 600)
                except Exception as e:
                    print(f"[Firewall configuration] Exception: {e}")
                    continue
            elif response in ['n']:
                print(f"Not applying configuration...")
            else:
                print(f"Not applying configuration...")
                continue

            # --- Restore the device to its protected state ---------------
            print(f"Restarting Cyber Check:", end=" ", flush=True)
            try:
                execute_sudo_command(c, "mount -o remount,ro /", ssh_password)
                execute_sudo_command(c, "systemctl start cube-monit.service", ssh_password)
                print(f"✅", end="\n", flush=True)
            except Exception as e:
                print(f"❌", end="\n", flush=True)
                print(f"[Restarting Cyber Check] Exception: {e}")
                continue

if __name__ == "__main__":
    main()
import pexpect
import getpass
import sys
import time

import os
from dotenv import load_dotenv
load_dotenv(override=True)

# --- Firewall Rule Definitions are correct as is ---
# Kept for when the (currently disabled) rule-application steps are re-enabled.
FIREWALL_RULE_1_CMDS = [
    "firewall 3", "action accept", "interface LAN_ETH1_CUBE WAN", "protocol TCP",
    "mode ip", "src-ip all", "src-port all", "dst-ip all", "dst-port single 8080",
    "logging severity 6", "name proxy_retour", "exit",
]

FIREWALL_RULE_2_CMDS = [
    "firewall 4", "action accept", "interface WAN LAN_ETH1_CUBE", "protocol TCP",
    "mode ip", "src-ip all", "src-port single 8080", "dst-ip all", "dst-port all",
    "logging severity 6", "name proxy_aller", "exit",
]

def configure_moxa_firewall_pexpect(router_ip):
    """Log in to a Moxa EDR-810 router over SSH (via pexpect) and save its config.

    Credentials are read from the DEFAULT_EDR_810_USER / DEFAULT_EDR_810_PASSWORD
    environment variables (.env loaded via load_dotenv above). The session is
    echoed to stdout through ``logfile_read`` for live visibility.

    NOTE(review): the rule-existence check and the application of
    FIREWALL_RULE_1_CMDS / FIREWALL_RULE_2_CMDS were deliberately disabled in
    the original script; only the final 'save' from the base prompt is issued.

    Args:
        router_ip (str): IP address of the router to configure.
    """
    ip_address = router_ip
    username = os.getenv("DEFAULT_EDR_810_USER")
    password = os.getenv("DEFAULT_EDR_810_PASSWORD")

    # Defined before the try-block so the except/finally handlers can
    # reference it safely even when pexpect.spawn itself fails.
    child = None

    try:
        command = f"ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null {username}@{ip_address}"
        print(f"--- Starting SSH session...")
        child = pexpect.spawn(command, encoding='utf-8', timeout=20)

        # Mirror everything the router sends to our stdout.
        child.logfile_read = sys.stdout

        child.expect("[Pp]assword:")
        child.sendline(password)
        # Match a typical CLI prompt ending in #, > or $.
        child.expect(r"[\#\>\$]\s*$")
        base_prompt = child.after.strip()
        print(f"\n>>> SUCCESSFULLY CONNECTED! Base prompt is: '{base_prompt}'")

        # From the base prompt, persist the running configuration.
        print("\n>>> Saving configuration...")
        child.sendline("save")
        child.expect(base_prompt)

        print("\n--- Configuration complete. Closing session.")
        child.sendline("exit")  # Log out

    except pexpect.exceptions.TIMEOUT:
        print("\n!!! CRITICAL: Connection timed out.")
        # Guard: child stays None when spawn itself raised, so don't touch it then.
        if child is not None:
            print(f"--- Last output seen (child.before): {child.before}")
    except pexpect.exceptions.EOF:
        print("\n!!! CRITICAL: Connection closed unexpectedly.")
        if child is not None:
            print(f"--- Last output seen (child.before): {child.before}")
    except Exception as e:
        print(f"\n!!! An unexpected error occurred: {e}")
    finally:
        # Always reap the ssh child process, even when an expect step failed
        # mid-session; otherwise each failed host leaks a subprocess.
        if child is not None and not child.closed:
            child.close()

if __name__ == "__main__":
    ip_prefix = "10.81.60."
    ip_suffixes = list(range(194, 215))
    for suffix in ip_suffixes:
        router_ip = ip_prefix + str(suffix)
        configure_moxa_firewall_pexpect(router_ip)