"""Check and adjust the cloud/proxy configuration of a range of CUBE devices over SSH.

The script connects to each host with Fabric, runs a few diagnostic commands
and offers helpers to read, modify and write back remote configuration files
(properties files, YAML files and iptables rule files) using sudo.
"""

import base64
import io
import os
import re
import shlex
from getpass import getpass

from dotenv import load_dotenv
from fabric import Connection
from ruamel.yaml import YAML
from ruamel.yaml.scalarstring import DoubleQuotedScalarString

from cube_activate_ssh import activate_ssh

load_dotenv(override=True)

# Proxy and endpoint used by the connectivity (cURL) check in main().
# NOTE(review): the proxy URL uses the https:// scheme, which makes curl speak
# TLS to the proxy itself - confirm this is what the proxy expects.
CURL_PROXY_URL = "https://10.81.35.126:8080"
CURL_TARGET_URL = "https://iot-ingest-ess-prod.azure-devices.net"


def execute_command(c, command):
    """Run a command on the remote device and return its standard output.

    Args:
        c (fabric.Connection): The active connection object.
        command (str): The shell command to run.

    Returns:
        str: The captured stdout, or None if the command failed. The error
        is printed before None is returned, so callers only need to check
        for None.
    """
    try:
        return c.run(command, hide=True).stdout
    except Exception as e:
        print(f"Error executing {command}: {e}")
        return None


def execute_sudo_command(c, command, sudo_pass):
    """Run a command with sudo on the remote device and return its standard output.

    Args:
        c (fabric.Connection): The active connection object.
        command (str): The shell command to run under sudo.
        sudo_pass (str): The sudo password.

    Returns:
        str: The captured stdout, or None if the command failed (the error
        is printed).
    """
    try:
        return c.sudo(command, password=sudo_pass, pty=True, hide=True).stdout
    except Exception as e:
        print(f"Error executing {command} as administrator: {e}")
        return None


def read_remote_config_sudo(c, remote_path, sudo_pass):
    """Read a remote file with sudo and return its content as a string.

    Args:
        c (fabric.Connection): The active connection object.
        remote_path (str): Path of the file on the remote host.
        sudo_pass (str): The sudo password.

    Returns:
        str: The file content, or None if reading failed (the error is printed).
    """
    try:
        # Quote the path so spaces or shell metacharacters cannot alter the command.
        result = c.sudo(f"cat {shlex.quote(remote_path)}", password=sudo_pass, hide=True)
        return result.stdout
    except Exception as e:
        print(f"Error reading remote file with sudo: {e}")
        return None


def write_remote_config_sudo(c, remote_path, content, sudo_pass, user_owner, group_owner, permissions):
    """Write content to a remote file through a root shell ('sudo sh -c').

    Intended for environments where SFTP is not available. The content is
    passed on the command line, so very large files may exceed the remote
    argument-length limit. A trailing newline is appended to the content.

    Args:
        c (fabric.Connection): The active connection object.
        remote_path (str): The absolute path to the file on the remote host.
        content (str): The string content to be written to the file.
        sudo_pass (str): The sudo password for the write operation.
        user_owner (str): Owner to set on the file (chown).
        group_owner (str): Group to set on the file (chown).
        permissions (str or int): Mode passed to chmod (e.g. "644").

    Raises:
        Exception: Any error raised by the remote commands is printed and
        re-raised so the caller can handle it.
    """
    print(f"\n--- [{c.host}] Writing directly using 'sudo sh -c' method: {remote_path} ---")
    try:
        quoted_path = shlex.quote(remote_path)

        # The script run by the root shell. The redirection must happen inside
        # that shell, because only root may write to the protected path.
        # printf '%s\n' is used instead of echo because some shells' echo
        # interprets backslash sequences in its argument.
        script = f"printf '%s\\n' {shlex.quote(content)} > {quoted_path}"

        # The script itself is quoted a second time: it first passes through
        # the login shell that launches sudo, and only then reaches 'sh -c'.
        # Wrapping it in double quotes instead would let the outer shell
        # expand $, backticks and quotes found in the content.
        command = f"sh -c {shlex.quote(script)}"

        print("Step 1: Writing content via root shell...")
        c.sudo(command, password=sudo_pass, hide=True)

        print("Step 2: Setting ownership and permissions...")
        owner = shlex.quote(f"{user_owner}:{group_owner}")
        mode = shlex.quote(str(permissions))
        c.sudo(f"chown {owner} {quoted_path}", password=sudo_pass)
        c.sudo(f"chmod {mode} {quoted_path}", password=sudo_pass)
        print(f"✅ Successfully wrote content to {remote_path}")
    except Exception as e:
        print(f"❌ Error writing directly to remote file: {e}")
        # Re-raise the exception so the calling function can handle it.
        raise


def set_config_field(config_content, option, new_value, add_if_missing=False):
    """Replace a 'key=value' configuration entry, or optionally add it if missing.

    Running the function several times with the same parameters yields the
    same configuration. Only lines that (after stripping) start with
    'option=' are matched; commented lines are left untouched. The returned
    text is joined with '\\n' and has no trailing newline.

    Args:
        config_content (str): The multi-line string of the configuration file.
        option (str): The configuration key to find (e.g., "PermitRootLogin").
        new_value (str): The new value to set for the option.
        add_if_missing (bool): If True, appends 'option=new_value' to the end
            of the content when the option is not found. Defaults to False.

    Returns:
        str: The modified configuration content.
    """
    prefix = option + '='
    replacement = f"{option}={new_value}"
    new_lines = []
    found = False

    for line in config_content.splitlines():
        stripped = line.strip()
        if stripped.startswith(prefix) and not stripped.startswith('#'):
            new_lines.append(replacement)
            found = True
        else:
            new_lines.append(line)

    if not found:
        if add_if_missing:
            print(f"Info: Option '{option}' not found. Appending it to the configuration.")
            # Separate the appended entry from existing content with a blank line.
            if new_lines and new_lines[-1] != '':
                new_lines.append('')
            new_lines.append(replacement)
        else:
            print(f"Warning: Option '{option}' not found. No changes made.")

    return "\n".join(new_lines)


def find_config_value(config_content, option):
    """Find an option in a 'key=value' configuration string and return its value.

    Args:
        config_content (str): The multi-line string of the configuration file.
        option (str): The configuration key to find (e.g., "PermitRootLogin").

    Returns:
        str: The value of the first matching entry, stripped of whitespace.
        None: If the content is empty/None, or the option is missing or
        commented out.
    """
    if not config_content:
        return None

    # Including '=' in the prefix avoids partial-key matches
    # (e.g. 'Port' matching 'Ports').
    prefix = option + '='
    try:
        for line in config_content.splitlines():
            clean_line = line.strip()
            if not clean_line.startswith('#') and clean_line.startswith(prefix):
                # Everything after the first '=' is the value, so values that
                # themselves contain '=' are returned intact.
                return clean_line.split('=', 1)[1].strip()
    except (AttributeError, TypeError) as e:
        # Content was not a string.
        print(f"An error occurred while parsing config: {e}")
        return None
    return None


def _print_mark(passed):
    """Print a check mark or a cross followed by a newline."""
    print("✅" if passed else "❌", end="\n", flush=True)


def cloud_configuration_check(hostname, result, iot_hub, proxy_host, proxy_port):
    """Print a pass/fail report of the cloud settings found in a properties file.

    Missing entries are reported as failures instead of raising.

    Args:
        hostname (str): Expected device id (compared after stripping).
        result (str): Content of the properties file to inspect.
        iot_hub (str): Expected IoT Hub host name of the connection string.
        proxy_host (str): Expected value of 'proxy-host'.
        proxy_port (str or int): Expected value of 'proxy-port'.
    """
    flag_checks = (
        ("Light telemetry", "light-telemetry", "false"),
        ("Telemetry", "telemetry-on", "true"),
        ("Compression", "compression-enabled", "true"),
        ("Remote update", "remote-update-on", "true"),
    )
    for label, option, expected in flag_checks:
        print(f"\t{label}:", end=" ", flush=True)
        _print_mark(find_config_value(result, option) == expected)

    print("\tConnection string:", end="\n", flush=True)
    parsed_data = parse_connection_string(find_config_value(result, "connection-string"))
    connection_checks = (
        ("IoT Hub", "HostName", iot_hub),
        ("Device ID", "DeviceId", hostname),
    )
    for label, key, expected in connection_checks:
        print(f"\t\t{label}:", end=" ", flush=True)
        actual = parsed_data.get(key)
        if actual is None:
            print("NA ❌", end="\n", flush=True)
            continue
        print(f"{actual}", end=" ", flush=True)
        _print_mark(actual.strip() == str(expected).strip())

    print("\tProxy:", end="\n", flush=True)
    proxy_checks = (
        ("Host", "proxy-host", proxy_host),
        ("Port", "proxy-port", proxy_port),
    )
    for label, option, expected in proxy_checks:
        print(f"\t\t{label}:", end=" ", flush=True)
        actual = find_config_value(result, option)
        if actual is None:
            print("NA ❌", end="\n", flush=True)
        else:
            # Values read from the file are strings; normalise the expected
            # value so an integer port still compares equal.
            _print_mark(actual == str(expected))


def parse_connection_string(connection_string):
    """Parse a semicolon-separated connection string into a dictionary.

    Handles pairs whose separator is written as an escaped equals sign
    (e.g., 'Key\\=Value'). Parts without any '=' are ignored.

    Args:
        connection_string (str): The connection string; may be None or empty.

    Returns:
        dict: Mapping of stripped keys to stripped values ({} for empty input).
    """
    parsed_data = {}
    if not connection_string:
        return parsed_data

    for part in connection_string.split(';'):
        # Turn the escaped separator '\=' into a plain '=' before splitting.
        cleaned_part = part.replace('\\=', '=')
        if '=' in cleaned_part:
            # Split on the first '=' only: values may end with '=' padding.
            key, value = cleaned_part.split('=', 1)
            parsed_data[key.strip()] = value.strip()
    return parsed_data


def find_yaml_value(yaml_content, key_path):
    """Find a value in a YAML string using a dot-separated path.

    Args:
        yaml_content (str): The string content of the YAML file.
        key_path (str): A dot-separated path to the key
            (e.g., "cubeProcess.cyber_check").

    Returns:
        The value if found, otherwise None.
    """
    try:
        node = YAML().load(yaml_content)
        for key in key_path.split('.'):
            node = node[key]
        return node
    except (KeyError, TypeError):
        # KeyError: a key is missing. TypeError: tried to index a non-mapping.
        return None


def set_yaml_value(yaml_content, key_path, new_value):
    """Set an existing value in a YAML string using a dot-separated path.

    ruamel.yaml is configured to preserve quotes, and the document is dumped
    with a fixed indentation (mapping=2, sequence=4, offset=2). The final key
    must already exist; it is never created.

    Args:
        yaml_content (str): The string content of the YAML file.
        key_path (str): A dot-separated path to the key
            (e.g., "cubeProcess.cyber_check").
        new_value: The new value to set.

    Returns:
        str: The modified YAML content, or the original content when the
        path is invalid or the final key does not exist.
    """
    try:
        yaml = YAML()
        yaml.preserve_quotes = True
        yaml.indent(mapping=2, sequence=4, offset=2)
        data = yaml.load(yaml_content)

        # Walk down to the mapping that holds the final key.
        *parent_keys, final_key = key_path.split('.')
        parent = data
        for key in parent_keys:
            parent = parent[key]

        if final_key not in parent:
            print(f"❌ Error: Final key '{final_key}' not found in the structure. Aborting.")
            return yaml_content

        parent[final_key] = new_value

        string_stream = io.StringIO()
        yaml.dump(data, string_stream)
        return string_stream.getvalue()
    except (KeyError, TypeError) as e:
        print(f"❌ Error: Key path '{key_path}' is invalid or part of the path does not exist. Error: {e}")
        return yaml_content


def _port_option_pattern(port):
    """Build a regex matching '--dport <port>' or '--sport <port>' exactly.

    The negative lookahead rejects longer numbers that merely start with the
    same digits (e.g. 8080 must not match '--dport 80801').
    """
    return re.compile(rf"(--[ds]port\s+){re.escape(str(port))}(?!\d)")


def ensure_iptables_port_rule(config_content, target_port, template_port):
    """Ensure that iptables rules for a target port exist in the configuration.

    If no active ('-A') rule references the target port, every active rule
    that references the template port is rewritten to use the target port.
    Only the port given to --dport/--sport is changed; other occurrences of
    the same digits in the line (addresses, other ports) are left alone.

    Args:
        config_content (str): The multi-line string of the iptables rules file.
        target_port (int or str): The port number that should exist (e.g., 8080).
        template_port (int or str): The port number to use as a template (e.g., 443).

    Returns:
        str: The modified content, or the original content when the target
        rule already exists or no template rule was found. A trailing
        newline present in the input is preserved.
    """
    target_port_str = str(target_port)
    template_port_str = str(template_port)
    target_pattern = _port_option_pattern(target_port_str)
    template_pattern = _port_option_pattern(template_port_str)
    lines = config_content.splitlines()

    def is_active_rule_for(line, pattern):
        return line.strip().startswith('-A') and pattern.search(line) is not None

    # --- PASS 1: Check if the target port rule already exists ---
    if any(is_active_rule_for(line, target_pattern) for line in lines):
        print(f"✅ Info: Rule for target port {target_port_str} already exists. No changes needed.")
        return config_content

    # --- PASS 2: The rule was not found; rewrite the template rules ---
    print(f"Info: Rule for target port {target_port_str} not found. Searching for template port {template_port_str} to replace.")
    new_lines = []
    changes_made = False
    for line in lines:
        if is_active_rule_for(line, template_pattern):
            modified_line = template_pattern.sub(
                lambda match: match.group(1) + target_port_str, line
            )
            new_lines.append(modified_line)
            print(f" - Replacing: '{line}'")
            print(f" + With: '{modified_line}'")
            changes_made = True
        else:
            new_lines.append(line)

    if not changes_made:
        print(f"❌ Warning: Target port {target_port_str} was not found, AND template port {template_port_str} was also not found. No changes made.")
        return config_content

    rebuilt = "\n".join(new_lines)
    # splitlines() drops the final line terminator; restore it so the rules
    # file still ends with a newline after its last line.
    if config_content.endswith(("\n", "\r")):
        rebuilt += "\n"
    return rebuilt


def write_remote_config_base64_sudo(c, remote_path, content, sudo_pass, user_owner, group_owner, permissions):
    """Write content to a remote file by passing it as a Base64 string.

    Intended for environments where SFTP is not available. Because the
    payload is Base64, multi-line content and shell metacharacters in the
    content cannot interfere with the command. The file receives exactly
    the bytes of `content` encoded as UTF-8 (no newline is added).

    Args:
        c (fabric.Connection): The active connection object.
        remote_path (str): The absolute path to the file on the remote host.
        content (str): The string content to be written to the file.
        sudo_pass (str): The sudo password for the write operation.
        user_owner (str): Owner to set on the file (chown).
        group_owner (str): Group to set on the file (chown).
        permissions (str or int): Mode passed to chmod (e.g. "644" or 600).

    Raises:
        Exception: Any error raised by the remote commands is printed and
        re-raised so the caller can handle it.
    """
    print(f"\n--- [{c.host}] Writing content via Base64 to: {remote_path} ---")
    try:
        # b64encode works on bytes and returns bytes; decode to ASCII text
        # so it can be embedded in the command.
        base64_content = base64.b64encode(content.encode('utf-8')).decode('ascii')
        quoted_path = shlex.quote(remote_path)

        # The pipeline runs inside a root shell so that the redirection ('>')
        # is performed with root privileges. The whole script is quoted so the
        # outer shell passes it to 'sh -c' unchanged.
        script = f"echo {shlex.quote(base64_content)} | base64 --decode > {quoted_path}"
        command = f"sh -c {shlex.quote(script)}"

        print("Step 1: Writing Base64 content via root shell...")
        c.sudo(command, password=sudo_pass, hide=True)

        print("Step 2: Setting ownership and permissions...")
        owner = shlex.quote(f"{user_owner}:{group_owner}")
        mode = shlex.quote(str(permissions))
        c.sudo(f"chown {owner} {quoted_path}", password=sudo_pass)
        c.sudo(f"chmod {mode} {quoted_path}", password=sudo_pass)
        print(f"✅ Successfully wrote content to {remote_path}")
    except Exception as e:
        print(f"❌ Error writing Base64 content to remote file: {e}")
        # Re-raise the exception for the main loop.
        raise


def main():
    """Iterate over the configured hosts and run the enabled checks on each.

    For every host the SSH service is activated, a connection is opened with
    the credentials from the environment (DEFAULT_CUBE_LINUX_ADMIN_USER /
    DEFAULT_CUBE_LINUX_ADMIN_PASSWORD), and the hostname and a cURL
    connectivity test through the proxy are printed. A host is skipped as
    soon as one step fails.
    """
    ip_address_prefix = "10.81.56."  # DK2 subnet
    ip_address_range = list(range(129, 145))  # From 129 to 144 (16 CUBEs)
    # ip_address_range.append(72) # Add 85 after 74.
    hosts = [f"{ip_address_prefix}{suffix}" for suffix in ip_address_range]

    ssh_port = 11022
    ssh_user = os.getenv("DEFAULT_CUBE_LINUX_ADMIN_USER")
    ssh_password = os.getenv("DEFAULT_CUBE_LINUX_ADMIN_PASSWORD")
    connect_args = {
        "password": ssh_password,
        "banner_timeout": 3,
        "auth_timeout": 60,
        "channel_timeout": 60,
        "look_for_keys": False,
        "allow_agent": False,
    }

    for host in hosts:
        print(f"{host}", end=" - ", flush=True)
        hostname = ""
        result = ""
        try:
            activate_ssh(host)
        except Exception as e:
            print(f"Exception: {e}")
            continue

        with Connection(host=host, user=ssh_user, port=ssh_port, connect_timeout=60, connect_kwargs=connect_args) as c:
            print("Hostname:", end=" ", flush=True)
            result = execute_command(c, "hostname")
            if result is None:
                # execute_command already printed the underlying error.
                continue
            print(f"{result.strip()}", end="\n", flush=True)
            hostname = result.strip().lower()

            print("cURL:", end=" ", flush=True)
            result = execute_command(c, f"curl -m 15 -x {CURL_PROXY_URL} {CURL_TARGET_URL}")
            if result is None:
                continue
            print(f"{result.strip()}", end="\n", flush=True)

            # ------------------------------------------------------------------
            # Disabled workflow steps, kept for reference. Uncomment as needed.
            # ------------------------------------------------------------------
            # try:
            #     print(f"Hostname:", end=" ", flush=True)
            #     result = execute_command(c, "hostname")
            #     print(f"{result.strip()}", end="\n", flush=True)
            #     hostname = str.lower(result)
            # except Exception as e:
            #     print(f"[Hostname] Exception: {e}")
            #     continue

            # try:
            #     print(f"Checking Cloud configuration:", end=" ", flush=True)
            #     result = read_remote_config_sudo(c, "/etc/cube/config-azure.properties", ssh_password)
            #     print(f"✅", end="\n", flush=True)
            # except Exception as e:
            #     print(f"❌", end="\n", flush=True)
            #     print(f"[Cloud configuration check] Exception: {e}")
            #     continue

            # cloud_configuration_check(hostname, result, "iot-ingest-ess-prod.azure-devices.net", "10.81.35.126", "8080")

            # print(f"Setting proxy configuration:", end="\n", flush=True)
            # result_proxy_host = set_config_field(result, "proxy-host", "10.81.35.126", True)
            # result_proxy_host_port = set_config_field(result_proxy_host, "proxy-port", "8080", True)
            # result_proxy_host_port_compression = set_config_field(result_proxy_host_port, "compression-enabled", "true", True)
            # result = result_proxy_host_port
            # cloud_configuration_check(hostname, result, "iot-ingest-ess-prod.azure-devices.net", "10.81.35.126", "8080")

            # response = input(f"Apply the change on {hostname.strip()}? (y)es or (n)o, anything else to cancel - ").lower()
            # if response in ['y']:
            #     print(f"Applying changes:", end=" ", flush=True)
            #     try:
            #         write_remote_config_sudo(c, "/etc/cube/config-azure.properties", result, ssh_password, "cube", "root", "644")
            #         print(f"✅", end="\n", flush=True)
            #     except Exception as e:
            #         print(f"❌", end="\n", flush=True)
            #         print(f"[Proxy configuration] Exception: {e}")
            #         continue
            #     print(f"Checking Cloud configuration:", end=" ", flush=True)
            #     try:
            #         result = read_remote_config_sudo(c, "/etc/cube/config-azure.properties", ssh_password)
            #         print(f"✅", end="\n", flush=True)
            #     except Exception as e:
            #         print(f"❌", end="\n", flush=True)
            #         print(f"[Proxy verification] Exception: {e}")
            #         continue
            #     cloud_configuration_check(hostname, result, "iot-ingest-ess-prod.azure-devices.net", "10.81.35.126", "8080")
            # elif response in ['n']:
            #     print(f"Not applying configuration...")
            # else:
            #     print(f"Not applying configuration...")
            #     continue

            # print(f"Disabling Cyber Check:", end=" ", flush=True)
            # try:
            #     execute_sudo_command(c, "systemctl stop cube-monit.service", ssh_password)
            #     execute_sudo_command(c, "mount -o remount,rw /", ssh_password)
            #     print(f"✅", end="\n", flush=True)
            # except Exception as e:
            #     print(f"❌", end="\n", flush=True)
            #     print(f"[Disabling Cyber Check] Exception: {e}")
            #     continue

            # print(f"Reading Cyber Check configuration:", end=" ", flush=True)
            # try:
            #     result = read_remote_config_sudo(c, "/etc/cube-default/configfile_monit.yaml", ssh_password)
            #     print(f"✅", end="\n", flush=True)
            # except Exception as e:
            #     print(f"❌", end="\n", flush=True)
            #     print(f"[Cyber Check configuration] Exception: {e}")
            #     continue

            # print(f"Checking cyber_check:", end=" ", flush=True)
            # try:
            #     status = find_yaml_value(result, "cubeProcess.cyber_check")
            #     if status == False:
            #         print(f"✅", end="\n", flush=True)
            #     else:
            #         print(f"❌", end="\n", flush=True)
            # except Exception as e:
            #     print(f"❌", end="\n", flush=True)
            #     print(f"[cyber_check value] Exception: {e}")
            #     continue

            # print(f"Modifying cyber_check:", end=" ", flush=True)
            # modified_result = ""
            # try:
            #     modified_result = set_yaml_value(result, "cubeProcess.cyber_check", False)
            #     print(f"✅", end="\n", flush=True)
            # except Exception as e:
            #     print(f"❌", end="\n", flush=True)
            #     print(f"[cyber_check modification] Exception: {e}")
            #     continue

            # print(f"Checking modified cyber_check:", end=" ", flush=True)
            # try:
            #     status = find_yaml_value(modified_result, "cubeProcess.cyber_check")
            #     if status == False:
            #         print(f"✅", end="\n", flush=True)
            #     else:
            #         print(f"❌", end="\n", flush=True)
            # except Exception as e:
            #     print(f"❌", end="\n", flush=True)
            #     print(f"[Modified cyber_check value] Exception: {e}")
            #     continue

            # response = input(f"Apply the change on {hostname.strip()}? (y)es or (n)o, anything else to cancel - ").lower()
            # if response in ['y']:
            #     print(f"Applying changes:", end=" ", flush=True)
            #     try:
            #         write_remote_config_base64_sudo(c, "/etc/cube-default/configfile_monit.yaml", modified_result, ssh_password, "root", "root", "644")
            #         print(f"✅", end="\n", flush=True)
            #     except Exception as e:
            #         print(f"❌", end="\n", flush=True)
            #         print(f"[cyber_check configuration] Exception: {e}")
            #         continue
            #     print(f"Checking cyber_check configuration:", end=" ", flush=True)
            #     try:
            #         result = read_remote_config_sudo(c, "/etc/cube-default/configfile_monit.yaml", ssh_password)
            #         print(f"✅", end="\n", flush=True)
            #     except Exception as e:
            #         print(f"❌", end="\n", flush=True)
            #         print(f"[cyber_check configuration] Exception: {e}")
            #         continue
            #     try:
            #         status = find_yaml_value(result, "cubeProcess.cyber_check")
            #         if status == False:
            #             print(f"✅", end="\n", flush=True)
            #         else:
            #             print(f"❌", end="\n", flush=True)
            #     except Exception as e:
            #         print(f"❌", end="\n", flush=True)
            #         print(f"[Modified cyber_check configuration verification] Exception: {e}")
            #         continue
            # elif response in ['n']:
            #     print(f"Not applying configuration...")
            # else:
            #     print(f"Not applying configuration...")
            #     continue

            # print(f"Firewall check:", end="\n", flush=True)
            # modified_result = ""
            # try:
            #     result = read_remote_config_sudo(c, "/etc/iptables/iptables-cube.rules", ssh_password)
            # except Exception as e:
            #     print(f"[Firewall reading] Exception: {e}")
            #     continue
            # try:
            #     modified_result = ensure_iptables_port_rule(result, 8080, 443)
            # except Exception as e:
            #     print(f"[Firewall changes] Exception: {e}")
            #     continue

            # response = input(f"Apply the change on {hostname.strip()}? (y)es or (n)o, anything else to cancel - ").lower()
            # if response in ['y']:
            #     try:
            #         write_remote_config_base64_sudo(c, "/etc/iptables/iptables-cube.rules", modified_result, ssh_password, "root", "root", 600)
            #     except Exception as e:
            #         print(f"[Firewall configuration] Exception: {e}")
            #         continue
            # elif response in ['n']:
            #     print(f"Not applying configuration...")
            # else:
            #     print(f"Not applying configuration...")
            #     continue

            # print(f"Restarting Cyber Check:", end=" ", flush=True)
            # try:
            #     execute_sudo_command(c, "mount -o remount,ro /", ssh_password)
            #     execute_sudo_command(c, "systemctl start cube-monit.service", ssh_password)
            #     print(f"✅", end="\n", flush=True)
            # except Exception as e:
            #     print(f"❌", end="\n", flush=True)
            #     print(f"[Restarting Cyber Check] Exception: {e}")
            #     continue

            # try:
            #     execute_sudo_command(c, "systemctl restart iptables", ssh_password)
            #     print(f"✅", end="\n", flush=True)
            # except Exception as e:
            #     print(f"❌", end="\n", flush=True)
            #     print(f"[Restart iptables] Exception: {e}")
            #     continue


if __name__ == "__main__":
    main()