Merge branch 'main' of github.com:TotalEnergiesCode/ess-moxa-configuration-tools

This commit is contained in:
Quentin WEPHRE
2025-10-14 15:12:31 +02:00
11 changed files with 1152 additions and 22 deletions

View File

@@ -0,0 +1,210 @@
from azure.iot.hub import IoTHubRegistryManager
from azure.iot.hub.protocol.models import QuerySpecification, Module
from azure.iot.hub.models import CloudToDeviceMethod, CloudToDeviceMethodResult
from dotenv import load_dotenv
from isight_device import iSightDevice
from textual.app import App, ComposeResult
from textual.widgets import Header, Footer, ListView, ListItem, Label, Input, Button, Static, DataTable
from textual.containers import Vertical, VerticalScroll
from textual.binding import Binding
from textual.screen import Screen
from textual import on
from datetime import datetime
import json
import os
load_dotenv()
class ObjectListItem(ListItem):
"""A widget to display an iSightDevice object in the ListView."""
def __init__(self, device: iSightDevice) -> None:
super().__init__()
self.device = device
def compose(self) -> ComposeResult:
"""Create the displayable content of the list item."""
# This line is customized to display the attributes of your iSightDevice object.
# It shows site, number, device_id, and version.
yield Label(f"{self.device.getDeviceId()} | {self.device.getSite()} | {self.device.getNumber()} ({self.device.getVersion()})")
class ModuleScreen(Screen):
BINDINGS = [Binding("escape", "app.pop_screen", "Go Back")]
def __init__(self, device: iSightDevice, device_modules: list[Module]):
super().__init__()
self.device = device
self.device_modules = device_modules
def compose(self) -> ComposeResult:
yield Header(f"IoT Edge modules of {self.device.getDeviceId}")
yield DataTable()
yield Footer()
def on_mount(self) -> None:
table = self.query_one(DataTable)
table.add_columns("Module", "Status", "Since", "Last activity")
for module in self.device_modules:
try:
local_connection_time = datetime.fromisoformat(str(module.connection_state_updated_time)).astimezone().replace(microsecond=0).isoformat()
except:
local_connection_time = datetime.min.isoformat()
try:
local_activity_time = datetime.fromisoformat(str(module.last_activity_time)).astimezone().replace(microsecond=0).isoformat()
except:
local_activity_time = datetime.min.isoformat()
try:
module_id = module.module_id
except:
module_id = "NA"
try:
connection_state = module.connection_state
except:
connection_state = "NA"
table.add_row(f"{module_id}", f"{connection_state}", f"{local_connection_time}", f"{local_activity_time}")
## NEW: The Screen for showing device details and actions
class DetailScreen(Screen):
"""A screen to display details and actions for a single device."""
BINDINGS = [
Binding("escape", "app.pop_screen", "Go Back"),
]
def __init__(self, device: iSightDevice, registry_manager: IoTHubRegistryManager) -> None:
super().__init__()
self.device = device
self.registry_manager = registry_manager
def compose(self) -> ComposeResult:
yield Header(name=f"Details for {self.device.getDeviceId()}")
# Use VerticalScroll so the layout works on small terminals
with VerticalScroll(id="details-container"):
# Static, non-interactive information section
yield Static(f"[b]Site:[/b] {self.device.getSite()}", markup=True)
yield Static(f"[b]Number:[/b] {self.device.getNumber()}", markup=True)
yield Static(f"[b]Version:[/b] {self.device.getVersion()}", markup=True)
yield Static(f"[b]Device ID:[/b] {self.device.getDeviceId()}", markup=True)
# Interactive action buttons
yield Static("\n[b]Actions:[/b]", markup=True)
yield Button("Check IoT Edge modules", variant="primary", id="modules")
yield Footer()
## NEW: Handlers for button presses
@on(Button.Pressed, "#modules")
def check_modules(self) -> None:
device_modules = self.registry_manager.get_modules(self.device.getDeviceId())
device_modules.sort(key=lambda m: m.module_id)
self.app.push_screen(ModuleScreen(self.device, device_modules))
# @on(Button.Pressed, "#logs")
# def check_logs(self) -> None:
# # Placeholder for your logging logic
# self.app.notify(f"Fetching logs for {self.device.getDeviceId()}...")
# @on(Button.Pressed, "#diag")
# def run_diagnostics(self) -> None:
# # Placeholder for your diagnostics logic
# self.app.notify(f"Running diagnostics on {self.device.getDeviceId()}...")
# @on(Button.Pressed, "#delete")
# def delete_device(self) -> None:
# # Placeholder for delete logic
# self.app.notify(f"Deleting {self.device.getDeviceId()} is not yet implemented.", severity="error")
# # You would likely want a confirmation dialog here in a real app.
class DeviceTUI(App):
"""An interactive TUI to list and filter iSightDevice objects."""
BINDINGS = [
Binding("q", "quit", "Quit"),
]
def __init__(self, devices: list[iSightDevice], registry_manager: IoTHubRegistryManager):
super().__init__()
self.all_devices = devices
self.filtered_devices = self.all_devices[:]
self.registry_manager = registry_manager
def compose(self) -> ComposeResult:
"""Create child widgets for the app."""
yield Header(name="iSight Device Viewer")
yield Input(placeholder="Filter by site...")
with Vertical(id="list-container"):
yield ListView(*[ObjectListItem(dev) for dev in self.filtered_devices], id="device-list")
yield Footer()
def on_mount(self) -> None:
"""Called when the app is first mounted."""
self.query_one(Input).focus()
def on_input_changed(self, event: Input.Changed) -> None:
"""Handle changes to the input field and filter the list."""
filter_text = event.value.lower()
self.filter_devices(filter_text)
def filter_devices(self, filter_text: str):
"""Filter the list of devices based on the site."""
if filter_text:
self.filtered_devices = [
dev for dev in self.all_devices if filter_text in dev.getSite().lower()
]
else:
self.filtered_devices = self.all_devices[:]
list_view = self.query_one(ListView)
list_view.clear()
for dev in self.filtered_devices:
list_view.append(ObjectListItem(dev))
def on_list_view_selected(self, event: ListView.Selected) -> None:
"""Handle item selection in the ListView."""
selected_item = event.item
if isinstance(selected_item, ObjectListItem):
selected_device = selected_item.device
# Instead of a notification, we push the new detail screen
self.push_screen(DetailScreen(device=selected_device, registry_manager=registry_manager))
if __name__ == "__main__":
CONNECTION_STRING = str(os.getenv("CONNECTION_STRING_INOX_PROD"))
if CONNECTION_STRING == "":
print("Provide a connection string for the Iot Hub before running the script!")
exit(13)
print(f"Connecting to IoT Hub ", end="")
try:
registry_manager = IoTHubRegistryManager.from_connection_string(CONNECTION_STRING)
except Exception as e:
print(f"")
print(f"Error {e}")
exit
print(f"")
print(f"Getting list of IoT Edge devices ", end="")
try:
query_spec = QuerySpecification(query="SELECT * FROM devices WHERE capabilities.iotEdge = true ")
query_result = registry_manager.query_iot_hub(query_spec)
except Exception as e:
print(f"")
print(f"Error {e}")
exit
print(f"")
print(f"Sorting {len(query_result.items)} devices ", end="")
devices = []
for item in query_result.items:
devices.append(iSightDevice(str(item.device_id), str(item.tags['site']), int(item.tags['number']), str(item.tags['version'])))
devices.sort(key = lambda d: (d.site, d.number))
print(f"")
app = DeviceTUI(devices=devices, registry_manager=registry_manager)
app.run()

View File

@@ -55,11 +55,9 @@ def authenticate(base_url):
if not token: if not token:
raise requests.exceptions.RequestException raise requests.exceptions.RequestException
print("HTTPS ✅", end = " ", flush=True)
return token return token
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
print(f"HTTPS ❌", flush=True)
if hasattr(e, 'response') and e.response: if hasattr(e, 'response') and e.response:
raise Exception(e.response) raise Exception(e.response)
else: else:
@@ -83,12 +81,7 @@ def set_ssh_status(base_url, token):
response = requests.post(ssh_url, headers=headers, json=payload, verify=False, timeout=10) response = requests.post(ssh_url, headers=headers, json=payload, verify=False, timeout=10)
response.raise_for_status() response.raise_for_status()
print(f"SSH ✅", end = " ", flush=True)
return True
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
print("SSH ❌", flush=True)
if hasattr(e, 'response') and e.response: if hasattr(e, 'response') and e.response:
raise Exception(e.response) raise Exception(e.response)
else: else:
@@ -111,8 +104,17 @@ def activate_ssh(ip_address):
if not verify_ssl: if not verify_ssl:
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
print(f"HTTPS", end=" ", flush=True)
try:
token = authenticate(url) token = authenticate(url)
if not token: print(f"", end="", flush=True)
return except Exception as e:
time.sleep(3) print(f"", flush=True)
raise
print(f"SSH", end=" ", flush=True)
try:
set_ssh_status(url, token) set_ssh_status(url, token)
print(f"", end="\n", flush=True)
except Exception as e:
print(f"", flush=True)
raise

View File

@@ -10,11 +10,11 @@ from azure.iot.hub.models import Twin, TwinProperties
load_dotenv(override=True) load_dotenv(override=True)
ip_address_prefix = "10.188.11." ip_address_prefix = "10.81.60."
ssh_command = "hostname" ssh_command = "hostname"
csv_filename = "hoohana7.csv" csv_filename = "Grandpuits_01.csv"
SITE_NAME = "HOOHANA" SITE_NAME = "Grandpuits"
ssh_username = os.getenv("DEFAULT_CUBE_LINUX_ADMIN_USER") ssh_username = os.getenv("DEFAULT_CUBE_LINUX_ADMIN_USER")
ssh_password = os.getenv("DEFAULT_CUBE_LINUX_ADMIN_PASSWORD") ssh_password = os.getenv("DEFAULT_CUBE_LINUX_ADMIN_PASSWORD")
@@ -82,7 +82,7 @@ def main():
results = [] results = []
numbers = [16, 18, 34, 35] numbers = [193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207, 208, 209, 210, 211, 212, 213, 214]
for i in numbers: for i in numbers:
ip_address = f"{ip_address_prefix}{i}" ip_address = f"{ip_address_prefix}{i}"
print(f"Activating SSH for {ip_address}:", end=" ") print(f"Activating SSH for {ip_address}:", end=" ")

View File

@@ -48,10 +48,10 @@ def update_cloud_config(ip, new_content):
def main(): def main():
ip_address_prefix = "10.84.171." ip_address_prefix = "10.81.60."
start_ip = 1 start_ip = 193
end_ip = 5 end_ip = 214
SITE_NAME = "MYRTLE" SITE_NAME = "Grandpuits"
print(f"Site: {SITE_NAME}") print(f"Site: {SITE_NAME}")
print(f"From {ip_address_prefix}{str(start_ip)} to {ip_address_prefix}{str(end_ip)}") print(f"From {ip_address_prefix}{str(start_ip)} to {ip_address_prefix}{str(end_ip)}")

View File

@@ -0,0 +1,153 @@
from paramiko import SSHClient, AutoAddPolicy
import paramiko
from cube_activate_ssh import activate_ssh
from dotenv import load_dotenv
import os
import sys
import shlex
from scp import SCPClient
import time
def resource_path(relative_path):
""" Get absolute path to resource, works for dev and for PyInstaller """
try:
# PyInstaller creates a temp folder and stores path in _MEIPASS
base_path = sys._MEIPASS
except Exception:
base_path = os.path.abspath(".")
return os.path.join(base_path, relative_path)
dotenv_path = resource_path('.env')
load_dotenv(dotenv_path=dotenv_path)
ip_address_prefix = "10.81.35." # Carling subnet
ip_address_range = list(range(65, 75)) # From 65 to 74
ip_address_range.append(85) # Add 85 after 74.
ENV_SSH = {
"DEFAULT_CUBE_LINUX_ADMIN_USER": os.getenv("DEFAULT_CUBE_LINUX_ADMIN_USER"),
"DEFAULT_CUBE_LINUX_ADMIN_PASSWORD": os.getenv("DEFAULT_CUBE_LINUX_ADMIN_PASSWORD")
}
ssh_username = ENV_SSH["DEFAULT_CUBE_LINUX_ADMIN_USER"]
ssh_password = ENV_SSH["DEFAULT_CUBE_LINUX_ADMIN_PASSWORD"]
def execute_ssh_command(ip, command, client):
try:
stdin, stdout, stderr = client.exec_command(command, timeout=180)
exit_status = stdout.channel.recv_exit_status()
stdout_line = [line for line in stdout]
for output in stdout_line:
print(output.strip())
except Exception as e:
print(f"SSH error: {str(e)}", flush=True)
raise
finally:
client.close()
def execute_sudo_ssh_command(ip, command, client):
try:
quoted_command = f"bash -c {shlex.quote(command)}"
sudo_command = f"sudo -S -p '' {quoted_command}"
stdin, stdout, stderr = client.exec_command(sudo_command, timeout=180)
time.sleep(3)
stdin.write(ssh_password + '\n')
stdin.flush()
exit_status = stdout.channel.recv_exit_status()
output = stdout.read().decode('utf-8')
error = stderr.read().decode('utf-8')
if exit_status == 0:
print(f"{output}")
else:
print(f"")
print(f"{error}")
raise Exception("Error during SSH sudo command.")
result = stdout.read().decode().lower().strip()
return result
except Exception as e:
raise
finally:
client.close()
def scp_get_file(ip, remote_path, local_path):
client = SSHClient()
client.set_missing_host_key_policy(AutoAddPolicy())
local_path = os.path.expanduser(local_path)
local_path = os.path.abspath(local_path)
local_dir = os.path.dirname(local_path)
if local_dir:
os.makedirs(local_dir, exist_ok=True)
try:
client.connect(
ip,
port=11022,
username=ssh_username,
password=ssh_password,
allow_agent=False,
look_for_keys=False,
timeout=180
)
client.get_transport().set_keepalive(5)
with SCPClient(client.get_transport()) as scp:
scp.get(remote_path, local_path)
except Exception as e:
raise
finally:
client.close()
def main():
print(f"Starting...\n", flush=True)
for i in ip_address_range:
ip_address = f"{ip_address_prefix}{i}"
print(f"[{time.ctime(time.time())}] {str(i)} ({ip_address})", end=" ", flush=True)
try:
activate_ssh(ip_address)
except Exception as e:
print(f"SSH activation failed for {ip_address}:", flush=True)
print(f"{e}", flush=True)
print(f"Skipping CUBE...", flush=True)
continue
cube_id = "NA"
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(ip_address, port=11022, username=ssh_username, password=ssh_password, allow_agent=False, look_for_keys=False, timeout=180, banner_timeout=180)
client.get_transport().set_keepalive(5)
try:
cube_id = execute_ssh_command(ip_address, "hostname", client)
print(f"{cube_id}", flush=True)
except Exception as e:
print(f"cube-xxxxx ❌", flush=True)
print(f"Error getting hostname for {ip_address}:", flush=True)
print(f"{e}", flush=True)
print(f"Skipping CUBE...", flush=True)
continue
try:
#result = execute_ssh_command(ip_address, "cat /etc/cube/config-azure.properties", client)
print(f"coucou", flush=True)
except Exception as e:
print(f"Error getting Cloud settings for {cube_id}.", flush=True)
print(f"{e}", flush=True)
print(f"Skipping CUBE...", flush=True)
continue
if __name__ == "__main__":
main()

View File

@@ -31,6 +31,9 @@ class iSightDevice:
def getNumber(self): def getNumber(self):
return f"{self.number}" return f"{self.number}"
def getVersion(self):
return f"{self.cloudVersion}"
class CubeDevice: class CubeDevice:
def __init__(self, deviceId: str): def __init__(self, deviceId: str):
if not isinstance(deviceId, str): if not isinstance(deviceId, str):

View File

@@ -6,3 +6,8 @@ requests
pandas pandas
openpyxl openpyxl
scp scp
textual
fabric
ruamel.yaml
netmiko
pexpect

651
Python/ssh_fabric_batch.py Normal file
View File

@@ -0,0 +1,651 @@
import io
from fabric import Connection
from getpass import getpass
from dotenv import load_dotenv
import io
import os
from cube_activate_ssh import activate_ssh
from ruamel.yaml import YAML
from ruamel.yaml.scalarstring import DoubleQuotedScalarString
import shlex
import base64
load_dotenv(override=True)
def execute_command(c, command):
"""Executes a simple command on the remote device."""
try:
result = c.run(command, hide=True)
return result.stdout
except Exception as e:
print(f"Error executing {command}: {e}")
def execute_sudo_command(c, command, sudo_pass):
"""Executes a command with sudo on the remote device."""
# print(f"\n--- [{c.host}] Executing with sudo: {command} ---")
try:
result = c.sudo(command, password=sudo_pass, pty=True, hide=True)
return result.stdout
# print("STDOUT:")
# print(result.stdout)
# print("STDERR:")
# print(result.stderr)
except Exception as e:
print(f"Error executing {command} as administrator: {e}")
def read_remote_config_sudo(c, remote_path, sudo_pass):
"""
Reads a remote file with sudo and returns its content as a string.
"""
# print(f"\n--- [{c.host}] Reading remote file with sudo: {remote_path} ---")
try:
# Use sudo to cat the file and capture its output
result = c.sudo(f"cat {remote_path}", password=sudo_pass, hide=True)
return result.stdout
except Exception as e:
print(f"Error reading remote file with sudo: {e}")
return None
# Make sure to import shlex at the top of your script
def write_remote_config_sudo(c, remote_path, content, sudo_pass, user_owner, group_owner, permissions):
"""
Writes content directly to a remote file using the 'sudo sh -c' pattern.
This is the most robust and secure method for environments where SFTP is
disabled. It avoids input stream conflicts and prevents shell injection.
Args:
c (fabric.Connection): The active connection object.
remote_path (str): The absolute path to the file on the remote host.
content (str): The string content to be written to the file.
sudo_pass (str): The sudo password for the write operation.
"""
print(f"\n--- [{c.host}] Writing directly using 'sudo sh -c' method: {remote_path} ---")
try:
# Step 1: Securely escape the content for safe shell execution.
# shlex.quote() wraps the string in single quotes and handles any
# internal single quotes, making it safe to pass to a shell.
safe_content = shlex.quote(content)
# Step 2: Construct the command.
# 'sudo sh -c "..."': This runs a new shell ('sh') with root privileges.
# 'echo ... > file': This command is executed *by the root shell*.
# The redirection '>' is therefore handled by root, which has permission
# to write to the protected 'remote_path'.
command = f"sh -c \"echo {safe_content} > {remote_path}\""
print("Step 1: Writing content via root shell...")
# We run this entire command string using c.sudo(). Fabric will handle
# the password prompt correctly for the 'sudo sh' part.
c.sudo(command, password=sudo_pass, hide=True)
# Step 3: Set ownership and permissions as a separate step.
print("Step 2: Setting ownership and permissions...")
c.sudo(f"chown {user_owner}:{group_owner} {remote_path}", password=sudo_pass)
c.sudo(f"chmod {permissions} {remote_path}", password=sudo_pass)
print(f"✅ Successfully wrote content to {remote_path}")
except Exception as e:
print(f"❌ Error writing directly to remote file: {e}")
# Re-raise the exception so the calling function can handle it.
raise
def set_config_field(config_content, option, new_value, add_if_missing=False):
"""
Replaces a configuration value, or optionally adds it if it's missing.
This function is idempotent: running it multiple times with the same
parameters will result in the same configuration state.
Args:
config_content (str): The multi-line string of the configuration file.
option (str): The configuration key to find (e.g., "PermitRootLogin").
new_value (str): The new value to set for the option.
add_if_missing (bool): If True, adds the 'option=new_value' to the end
of the content if it's not found. Defaults to False.
Returns:
str: The modified configuration content.
"""
lines = config_content.splitlines()
new_lines = []
found = False
# First, iterate through the existing lines to find and replace the option.
for line in lines:
# Check if the line is a non-commented definition of our option.
if line.strip().startswith(option + '=') and not line.strip().startswith('#'):
new_lines.append(f"{option}={new_value}")
found = True
else:
# Keep the original line if it's not a match.
new_lines.append(line)
# After checking all lines, if the option was not found AND we are
# instructed to add it, we append it to the end of the configuration.
if not found and add_if_missing:
print(f"Info: Option '{option}' not found. Appending it to the configuration.")
# Ensure the new line is actually on a new line.
if new_lines and new_lines[-1] != '':
new_lines.append('') # Add a blank line for separation if needed
new_lines.append(f"{option}={new_value}")
elif not found:
# This is the original behavior: do nothing if missing and not told to add.
print(f"Warning: Option '{option}' not found. No changes made.")
return "\n".join(new_lines)
def find_config_value(config_content, option):
"""
Finds a specific option in a configuration string and returns its value.
Args:
config_content (str): The multi-line string of the configuration file.
option (str): The configuration key to find (e.g., "PermitRootLogin").
Returns:
str: The value of the option if found.
None: If the option is not found or is commented out.
"""
try:
# Go through each line in the configuration
for line in config_content.splitlines():
# Clean up the line by removing leading/trailing whitespace
clean_line = line.strip()
# Check if the line is not a comment and starts with our option
# The '=' is important to avoid matching partial keys (e.g., 'Port' matching 'Ports')
if not clean_line.startswith('#') and clean_line.startswith(option + '='):
# Split the line only on the first equals sign
# This handles cases where the value itself might contain an '='
parts = clean_line.split('=', 1)
# The value is the second part, stripped of any whitespace
value = parts[1].strip()
return value
except Exception as e:
# In case of any unexpected error (e.g., malformed content), return None
print(f"An error occurred while parsing config: {e}")
return None
# If the loop finishes without finding the option, return None
return None
def cloud_configuration_check(hostname, result, iot_hub, proxy_host, proxy_port):
print(f"\tLight telemetry:", end=" ", flush=True)
status = find_config_value(result, "light-telemetry")
if status == "false":
print(f"", end="\n", flush=True)
else:
print(f"")
print(f"\tTelemetry:", end=" ", flush=True)
status = find_config_value(result, "telemetry-on")
if status == "true":
print(f"", end="\n", flush=True)
else:
print(f"")
print(f"\tCompression:", end=" ", flush=True)
status = find_config_value(result, "compression-enabled")
if status == "true":
print(f"", end="\n", flush=True)
else:
print(f"", end="\n", flush=True)
print(f"\tRemote update:", end=" ", flush=True)
status = find_config_value(result, "remote-update-on")
if status == "true":
print(f"", end="\n", flush=True)
else:
print(f"", end="\n", flush=True)
print(f"\tConnection string:", end="\n", flush=True)
status = find_config_value(result, "connection-string")
parsed_data = parse_connection_string(status)
print(f"\t\tIoT Hub:", end=" ", flush=True)
print(f"{parsed_data.get('HostName')}", end=" ", flush=True)
if parsed_data.get('HostName').strip() == iot_hub.strip():
print(f"", end="\n", flush=True)
else:
print(f"", end="\n", flush=True)
print(f"\t\tDevice ID:", end=" ", flush=True)
print(f"{parsed_data.get('DeviceId')}", end=" ", flush=True)
if parsed_data.get('DeviceId').strip() == hostname.strip():
print(f"", end="\n", flush=True)
else:
print(f"", end="\n", flush=True)
print(f"\tProxy:", end="\n", flush=True)
print(f"\t\tHost:", end=" ", flush=True)
status = find_config_value(result, "proxy-host")
if status == None:
print(f"NA ❌", end="\n", flush=True)
elif status == proxy_host:
print(f"", end="\n", flush=True)
else:
print(f"", end="\n", flush=True)
print(f"\t\tPort:", end=" ", flush=True)
status = find_config_value(result, "proxy-port")
if status == None:
print(f"NA ❌", end="\n", flush=True)
elif status == proxy_port:
print(f"", end="\n", flush=True)
else:
print(f"", end="\n", flush=True)
def parse_connection_string(connection_string):
"""
Parses a semicolon-separated connection string into a dictionary,
handling keys that have an escaped equals sign (e.g., 'Key\\=Value').
"""
parsed_data = {}
# 1. Split the entire string by the semicolon to get each pair
parts = connection_string.split(';')
for part in parts:
# 2. IMPORTANT: Replace the escaped separator '\\=' with a plain '='.
# This is the only replacement needed and uses the correct syntax.
cleaned_part = part.replace('\\=', '=')
# 3. Now, split the cleaned part by the first equals sign
if '=' in cleaned_part:
key, value = cleaned_part.split('=', 1)
# Add the key-value pair to our dictionary, stripping any extra whitespace
parsed_data[key.strip()] = value.strip()
return parsed_data
def find_yaml_value(yaml_content, key_path):
"""
Finds a value in a YAML string using a dot-separated path.
Args:
yaml_content (str): The string content of the YAML file.
key_path (str): A dot-separated path to the key (e.g., "cubeProcess.cyber_check").
Returns:
The value if found, otherwise None.
"""
try:
yaml = YAML()
data = yaml.load(yaml_content)
# Traverse the path
keys = key_path.split('.')
current_level = data
for key in keys:
current_level = current_level[key]
return current_level
except (KeyError, TypeError):
# KeyError if a key is not found, TypeError if trying to index a non-dict
# print(f"Warning: Key path '{key_path}' not found in YAML content.")
return None
def set_yaml_value(yaml_content, key_path, new_value):
"""
Sets a value in a YAML string using a dot-separated path.
Preserves comments, formatting, and quotes thanks to ruamel.yaml.
This version correctly traverses nested keys.
Args:
yaml_content (str): The string content of the YAML file.
key_path (str): A dot-separated path to the key (e.g., "cubeProcess.cyber_check").
new_value: The new value to set.
Returns:
str: The modified YAML content as a string, or the original content on error.
"""
try:
# --- FIX 1: Configure the YAML object to preserve quotes ---
yaml = YAML()
yaml.preserve_quotes = True
yaml.indent(mapping=2, sequence=4, offset=2) # Optional: ensures consistent indentation
data = yaml.load(yaml_content)
# --- FIX 2: Correct traversal logic ---
keys = key_path.split('.')
current_level = data
# Traverse down to the final key's parent dictionary
for key in keys[:-1]:
current_level = current_level[key]
final_key = keys[-1]
# Check if the key exists before setting it
if final_key not in current_level:
print(f"❌ Error: Final key '{final_key}' not found in the structure. Aborting.")
return yaml_content # Return original content
# Set the new value
current_level[final_key] = new_value
# Dump the modified data back to a string
string_stream = io.StringIO()
yaml.dump(data, string_stream)
return string_stream.getvalue()
except (KeyError, TypeError) as e:
print(f"❌ Error: Key path '{key_path}' is invalid or part of the path does not exist. Error: {e}")
return yaml_content # Return original content on failure
def ensure_iptables_port_rule(config_content, target_port, template_port):
"""
Ensures that iptables rules for a target port exist in the configuration.
If rules for the target port are not found, it finds rules for a
template port and replaces the port number.
Args:
config_content (str): The multi-line string of the iptables rules file.
target_port (int or str): The port number that should exist (e.g., 8080).
template_port (int or str): The port number to use as a template (e.g., 443).
Returns:
str: The modified (or original) configuration content.
"""
target_port_str = str(target_port)
template_port_str = str(template_port)
lines = config_content.splitlines()
target_port_found = False
# --- PASS 1: Check if the target port rule already exists ---
for line in lines:
# Check for the target port in an active rule line
# The spaces around the port string prevent accidentally matching '8080' in '18080'
if line.strip().startswith('-A') and (f"--dport {target_port_str}" in line or f"--sport {target_port_str}" in line):
print(f"✅ Info: Rule for target port {target_port_str} already exists. No changes needed.")
target_port_found = True
break
# If the rule was found, return the original content without any changes.
if target_port_found:
return config_content
# --- PASS 2: If we get here, the rule was not found. We must replace the template. ---
print(f"Info: Rule for target port {target_port_str} not found. Searching for template port {template_port_str} to replace.")
new_lines = []
changes_made = False
for line in lines:
# Check for the template port in an active rule line
if line.strip().startswith('-A') and (f"--dport {template_port_str}" in line or f"--sport {template_port_str}" in line):
# This is a line we need to modify
modified_line = line.replace(template_port_str, target_port_str)
new_lines.append(modified_line)
print(f" - Replacing: '{line}'")
print(f" + With: '{modified_line}'")
changes_made = True
else:
# This line doesn't need changing, add it as is.
new_lines.append(line)
if not changes_made:
print(f"❌ Warning: Target port {target_port_str} was not found, AND template port {template_port_str} was also not found. No changes made.")
return config_content # Return original if template wasn't found either
return "\n".join(new_lines)
def write_remote_config_base64_sudo(c, remote_path, content, sudo_pass, user_owner, group_owner, permissions):
"""
Writes content directly to a remote file by passing it as a Base64 string.
This is the most robust method for no-SFTP environments, as it completely
avoids all shell quoting and parsing issues for complex, multi-line content.
Args:
c (fabric.Connection): The active connection object.
remote_path (str): The absolute path to the file on the remote host.
content (str): The string content to be written to the file.
sudo_pass (str): The sudo password for the write operation.
"""
print(f"\n--- [{c.host}] Writing content via Base64 to: {remote_path} ---")
try:
# Step 1: Encode the string content into Base64.
# base64.b64encode requires bytes, so we encode the string to utf-8.
# The result is bytes, so we decode it back to a simple ascii string to use in our command.
base64_content = base64.b64encode(content.encode('utf-8')).decode('ascii')
# Step 2: Construct the command.
# 'echo ... | base64 --decode > file': This pipeline decodes the content
# and redirects the output to the destination file.
# We wrap the entire pipeline in 'sudo sh -c "..."' so that the
# redirection ('>') is performed by a shell running as root.
command = f"sh -c \"echo '{base64_content}' | base64 --decode > {remote_path}\""
print("Step 1: Writing Base64 content via root shell...")
c.sudo(command, password=sudo_pass, hide=True)
# Step 3: Set ownership and permissions.
print("Step 2: Setting ownership and permissions...")
c.sudo(f"chown {user_owner}:{group_owner} {remote_path}", password=sudo_pass)
c.sudo(f"chmod {permissions} {remote_path}", password=sudo_pass)
print(f"✅ Successfully wrote content to {remote_path}")
except Exception as e:
print(f"❌ Error writing Base64 content to remote file: {e}")
# Re-raise the exception for the main loop.
raise
def main():
"""Main function to parse arguments and orchestrate tasks."""
ip_address_prefix = "10.81.60." # Grandpuits subnet
ip_address_range = list(range(193, 215)) # From 193 to 214
# ip_address_range.append(85) # Add 85 after 74.
hosts = [f"{ip_address_prefix}{suffix}" for suffix in ip_address_range]
ssh_port = 11022
ssh_user = os.getenv("DEFAULT_CUBE_LINUX_ADMIN_USER")
ssh_password = os.getenv("DEFAULT_CUBE_LINUX_ADMIN_PASSWORD")
connect_args = {}
connect_args["password"] = ssh_password
connect_args["banner_timeout"] = 3
connect_args["auth_timeout"] = 60
connect_args["channel_timeout"] = 60
connect_args["look_for_keys"] = False
connect_args["allow_agent"] = False
for host in hosts:
print(f"{host}", end=" - ", flush=True)
hostname = ""
result = ""
try:
activate_ssh(host)
except Exception as e:
print(f"Exception: {e}")
continue
with Connection(host=host, user=ssh_user, port=ssh_port, connect_timeout=60, connect_kwargs=connect_args) as c:
try:
print(f"Hostname:", end=" ", flush=True)
result = execute_command(c, "hostname")
print(f"{result.strip()}", end="\n", flush=True)
hostname = str.lower(result)
except Exception as e:
print(f"[Hostname] Exception: {e}")
continue
try:
print(f"Checking Cloud configuration:", end=" ", flush=True)
result = read_remote_config_sudo(c, "/etc/cube/config-azure.properties", ssh_password)
print(f"", end="\n", flush=True)
except Exception as e:
print(f"", end="\n", flush=True)
print(f"[Cloud configuration check] Exception: {e}")
continue
cloud_configuration_check(hostname, result, "iot-ingest-ess-prod.azure-devices.net", "10.81.35.126", "8080")
print(f"Setting proxy configuration:", end="\n", flush=True)
result_proxy_host = set_config_field(result, "proxy-host", "10.81.35.126", True)
result_proxy_host_port = set_config_field(result_proxy_host, "proxy-port", "8080", True)
result = result_proxy_host_port
cloud_configuration_check(hostname, result, "iot-ingest-ess-prod.azure-devices.net", "10.81.35.126", "8080")
response = input(f"Apply the change on {hostname.strip()}? (y)es or (n)o, anything else to cancel - ").lower()
if response in ['y']:
print(f"Applying changes:", end=" ", flush=True)
try:
write_remote_config_sudo(c, "/etc/cube/config-azure.properties", result, ssh_password, "cube", "root", "644")
print(f"", end="\n", flush=True)
except Exception as e:
print(f"", end="\n", flush=True)
print(f"[Proxy configuration] Exception: {e}")
continue
print(f"Checking Cloud configuration:", end=" ", flush=True)
try:
result = read_remote_config_sudo(c, "/etc/cube/config-azure.properties", ssh_password)
print(f"", end="\n", flush=True)
except Exception as e:
print(f"", end="\n", flush=True)
print(f"[Proxy verification] Exception: {e}")
continue
cloud_configuration_check(hostname, result, "iot-ingest-ess-prod.azure-devices.net", "10.81.35.126", "8080")
elif response in ['n']:
print(f"Not applying configuration...")
else:
print(f"Not applying configuration...")
continue
print(f"Disabling Cyber Check:", end=" ", flush=True)
try:
execute_sudo_command(c, "systemctl stop cube-monit.service", ssh_password)
execute_sudo_command(c, "mount -o remount,rw /", ssh_password)
print(f"", end="\n", flush=True)
except Exception as e:
print(f"", end="\n", flush=True)
print(f"[Disabling Cyber Check] Exception: {e}")
continue
print(f"Reading Cyber Check configuration:", end=" ", flush=True)
try:
result = read_remote_config_sudo(c, "/etc/cube-default/configfile_monit.yaml", ssh_password)
print(f"", end="\n", flush=True)
except Exception as e:
print(f"", end="\n", flush=True)
print(f"[Cyber Check configuration] Exception: {e}")
continue
print(f"Checking cyber_check:", end=" ", flush=True)
try:
status = find_yaml_value(result, "cubeProcess.cyber_check")
if status == False:
print(f"", end="\n", flush=True)
else:
print(f"", end="\n", flush=True)
except Exception as e:
print(f"", end="\n", flush=True)
print(f"[cyber_check value] Exception: {e}")
continue
print(f"Modifying cyber_check:", end=" ", flush=True)
modified_result = ""
try:
modified_result = set_yaml_value(result, "cubeProcess.cyber_check", False)
print(f"", end="\n", flush=True)
except Exception as e:
print(f"", end="\n", flush=True)
print(f"[cyber_check modification] Exception: {e}")
continue
print(f"Checking modified cyber_check:", end=" ", flush=True)
try:
status = find_yaml_value(modified_result, "cubeProcess.cyber_check")
if status == False:
print(f"", end="\n", flush=True)
else:
print(f"", end="\n", flush=True)
except Exception as e:
print(f"", end="\n", flush=True)
print(f"[Modified cyber_check value] Exception: {e}")
continue
response = input(f"Apply the change on {hostname.strip()}? (y)es or (n)o, anything else to cancel - ").lower()
if response in ['y']:
print(f"Applying changes:", end=" ", flush=True)
try:
write_remote_config_base64_sudo(c, "/etc/cube-default/configfile_monit.yaml", modified_result, ssh_password, "root", "root", "644")
print(f"", end="\n", flush=True)
except Exception as e:
print(f"", end="\n", flush=True)
print(f"[cyber_check configuration] Exception: {e}")
continue
print(f"Checking cyber_check configuration:", end=" ", flush=True)
try:
result = read_remote_config_sudo(c, "/etc/cube-default/configfile_monit.yaml", ssh_password)
print(f"", end="\n", flush=True)
except Exception as e:
print(f"", end="\n", flush=True)
print(f"[cyber_check configuration] Exception: {e}")
continue
try:
status = find_yaml_value(result, "cubeProcess.cyber_check")
if status == False:
print(f"", end="\n", flush=True)
else:
print(f"", end="\n", flush=True)
except Exception as e:
print(f"", end="\n", flush=True)
print(f"[Modified cyber_check configuration verification] Exception: {e}")
continue
elif response in ['n']:
print(f"Not applying configuration...")
else:
print(f"Not applying configuration...")
continue
print(f"Firewall check:", end="\n", flush=True)
modified_result = ""
try:
result = read_remote_config_sudo(c, "/etc/iptables/iptables-cube.rules", ssh_password)
except Exception as e:
print(f"[Firewall reading] Exception: {e}")
continue
try:
modified_result = ensure_iptables_port_rule(result, 8080, 443)
except Exception as e:
print(f"[Firewall changes] Exception: {e}")
continue
response = input(f"Apply the change on {hostname.strip()}? (y)es or (n)o, anything else to cancel - ").lower()
if response in ['y']:
try:
write_remote_config_base64_sudo(c, "/etc/iptables/iptables-cube.rules", modified_result, ssh_password, "root", "root", 600)
except Exception as e:
print(f"[Firewall configuration] Exception: {e}")
continue
elif response in ['n']:
print(f"Not applying configuration...")
else:
print(f"Not applying configuration...")
continue
print(f"Restarting Cyber Check:", end=" ", flush=True)
try:
execute_sudo_command(c, "mount -o remount,ro /", ssh_password)
execute_sudo_command(c, "systemctl start cube-monit.service", ssh_password)
print(f"", end="\n", flush=True)
except Exception as e:
print(f"", end="\n", flush=True)
print(f"[Restarting Cyber Check] Exception: {e}")
continue
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,106 @@
import pexpect
import getpass
import sys
import time
import os
from dotenv import load_dotenv
load_dotenv(override=True)
# --- Firewall Rule Definitions are correct as is ---
FIREWALL_RULE_1_CMDS = [
"firewall 3", "action accept", "interface LAN_ETH1_CUBE WAN", "protocol TCP",
"mode ip", "src-ip all", "src-port all", "dst-ip all", "dst-port single 8080",
"logging severity 6", "name proxy_retour", "exit",
]
FIREWALL_RULE_2_CMDS = [
"firewall 4", "action accept", "interface WAN LAN_ETH1_CUBE", "protocol TCP",
"mode ip", "src-ip all", "src-port single 8080", "dst-ip all", "dst-port all",
"logging severity 6", "name proxy_aller", "exit",
]
def configure_moxa_firewall_pexpect(router_ip):
ip_address = router_ip
username = os.getenv("DEFAULT_EDR_810_USER")
password = os.getenv("DEFAULT_EDR_810_PASSWORD")
try:
command = f"ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null {username}@{ip_address}"
print(f"--- Starting SSH session...")
child = pexpect.spawn(command, encoding='utf-8', timeout=20)
child.logfile_read = sys.stdout
child.expect("[Pp]assword:")
child.sendline(password)
child.expect(r"[\#\>\$]\s*$")
base_prompt = child.after.strip()
print(f"\n>>> SUCCESSFULLY CONNECTED! Base prompt is: '{base_prompt}'")
# --- Check for existing rules ---
# print("\n--- Checking for existing rules...")
# child.sendline("terminal length 0")
# child.expect(base_prompt)
# child.sendline("show running-config")
# child.expect(base_prompt)
# running_config = child.before
# if "name proxy_retour" in running_config and "name proxy_aller" in running_config:
# print("--- Firewall rules already exist. No action needed.")
# child.sendline("exit")
# child.close()
# return
# print("--- Rules not found. Proceeding with configuration.")
# --- Enter Configuration Mode ---
# print("\n>>> Entering configuration mode...")
# child.sendline("configure")
# config_prompt_re = r"\(config\)#\s*$"
# child.expect(config_prompt_re)
# --- Apply Rule 1 ---
# print("\n>>> Applying Rule 1: proxy_retour")
# for cmd in FIREWALL_RULE_1_CMDS:
# child.sendline(cmd)
# child.expect([r"\(config-firewall\)#\s*$", config_prompt_re])
# time.sleep(0.2)
# --- Apply Rule 2 ---
# print("\n>>> Applying Rule 2: proxy_aller")
# for cmd in FIREWALL_RULE_2_CMDS:
# child.sendline(cmd)
# child.expect([r"\(config-firewall\)#\s*$", config_prompt_re])
# time.sleep(0.2)
# --- THE CORRECTED SAVE LOGIC ---
# 1. Exit from configuration mode to return to the base prompt
# print("\n>>> Exiting configuration mode to save...")
# child.sendline("exit")
# child.expect(base_prompt)
# 2. Now, from the base prompt, issue the save command
print("\n>>> Saving configuration...")
child.sendline("save")
child.expect(base_prompt)
print("\n--- Configuration complete. Closing session.")
child.sendline("exit") # Log out
child.close()
except pexpect.exceptions.TIMEOUT:
print("\n!!! CRITICAL: Connection timed out.")
print(f"--- Last output seen (child.before): {child.before}")
except pexpect.exceptions.EOF:
print("\n!!! CRITICAL: Connection closed unexpectedly.")
print(f"--- Last output seen (child.before): {child.before}")
except Exception as e:
print(f"\n!!! An unexpected error occurred: {e}")
if __name__ == "__main__":
ip_prefix = "10.81.60."
ip_suffixes = list(range(194, 215))
for suffix in ip_suffixes:
router_ip = ip_prefix + str(suffix)
configure_moxa_firewall_pexpect(router_ip)