diff --git a/Python/azure_iot_hub_manage_iot_edge_devices.py b/Python/azure_iot_hub_manage_iot_edge_devices.py new file mode 100644 index 0000000..797d5f2 --- /dev/null +++ b/Python/azure_iot_hub_manage_iot_edge_devices.py @@ -0,0 +1,196 @@ +from azure.iot.hub import IoTHubRegistryManager +from azure.iot.hub.protocol.models import QuerySpecification, Module +from azure.iot.hub.models import CloudToDeviceMethod, CloudToDeviceMethodResult +from dotenv import load_dotenv +from isight_device import iSightDevice + +from textual.app import App, ComposeResult +from textual.widgets import Header, Footer, ListView, ListItem, Label, Input, Button, Static, DataTable +from textual.containers import Vertical, VerticalScroll +from textual.binding import Binding +from textual.screen import Screen +from textual import on + +from datetime import datetime + +import json +import os + +load_dotenv() + +class ObjectListItem(ListItem): + """A widget to display an iSightDevice object in the ListView.""" + + def __init__(self, device: iSightDevice) -> None: + super().__init__() + self.device = device + + def compose(self) -> ComposeResult: + """Create the displayable content of the list item.""" + # This line is customized to display the attributes of your iSightDevice object. + # It shows site, number, device_id, and version. + yield Label(f"{self.device.getDeviceId()} | {self.device.getSite()} | {self.device.getNumber()} ({self.device.getVersion()})") + +class ModuleScreen(Screen): + + BINDINGS = [Binding("escape", "app.pop_screen", "Go Back")] + + def __init__(self, device: iSightDevice, device_modules: list[Module]): + super().__init__() + self.device = device + self.device_modules = device_modules + + def compose(self) -> ComposeResult: + yield Header(f"IoT Edge modules of {self.device.getDeviceId}") + yield DataTable() + yield Footer() + + def on_mount(self) -> None: + table = self.query_one(DataTable) + table.add_columns("Module", "Status", "Since", "Last activity") + for module in self.device_modules: + local_connection_time = datetime.fromisoformat(str(module.connection_state_updated_time)).astimezone().replace(microsecond=0).isoformat() + local_activity_time = datetime.fromisoformat(str(module.last_activity_time)).astimezone().replace(microsecond=0).isoformat() + table.add_row(f"{module.module_id}", f"{module.connection_state}", f"{local_connection_time}", f"{local_activity_time}") + + +## NEW: The Screen for showing device details and actions +class DetailScreen(Screen): + """A screen to display details and actions for a single device.""" + + BINDINGS = [ + Binding("escape", "app.pop_screen", "Go Back"), + ] + + def __init__(self, device: iSightDevice, registry_manager: IoTHubRegistryManager) -> None: + super().__init__() + self.device = device + self.registry_manager = registry_manager + + def compose(self) -> ComposeResult: + yield Header(name=f"Details for {self.device.getDeviceId()}") + + # Use VerticalScroll so the layout works on small terminals + with VerticalScroll(id="details-container"): + # Static, non-interactive information section + yield Static(f"[b]Site:[/b] {self.device.getSite()}", markup=True) + yield Static(f"[b]Number:[/b] {self.device.getNumber()}", markup=True) + yield Static(f"[b]Version:[/b] {self.device.getVersion()}", markup=True) + yield Static(f"[b]Device ID:[/b] {self.device.getDeviceId()}", markup=True) + + # Interactive action buttons + yield Static("\n[b]Actions:[/b]", markup=True) + yield Button("Check IoT Edge modules", variant="primary", id="modules") + + yield Footer() + + ## NEW: Handlers for button presses + @on(Button.Pressed, "#modules") + def check_modules(self) -> None: + device_modules = self.registry_manager.get_modules(self.device.getDeviceId()) + device_modules.sort(key=lambda m: m.module_id) + self.app.push_screen(ModuleScreen(self.device, device_modules)) + + # @on(Button.Pressed, "#logs") + # def check_logs(self) -> None: + # # Placeholder for your logging logic + # self.app.notify(f"Fetching logs for {self.device.getDeviceId()}...") + + # @on(Button.Pressed, "#diag") + # def run_diagnostics(self) -> None: + # # Placeholder for your diagnostics logic + # self.app.notify(f"Running diagnostics on {self.device.getDeviceId()}...") + + # @on(Button.Pressed, "#delete") + # def delete_device(self) -> None: + # # Placeholder for delete logic + # self.app.notify(f"Deleting {self.device.getDeviceId()} is not yet implemented.", severity="error") + # # You would likely want a confirmation dialog here in a real app. + + +class DeviceTUI(App): + """An interactive TUI to list and filter iSightDevice objects.""" + + BINDINGS = [ + Binding("q", "quit", "Quit"), + ] + + def __init__(self, devices: list[iSightDevice], registry_manager: IoTHubRegistryManager): + super().__init__() + self.all_devices = devices + self.filtered_devices = self.all_devices[:] + self.registry_manager = registry_manager + + def compose(self) -> ComposeResult: + """Create child widgets for the app.""" + yield Header(name="iSight Device Viewer") + yield Input(placeholder="Filter by site...") + with Vertical(id="list-container"): + yield ListView(*[ObjectListItem(dev) for dev in self.filtered_devices], id="device-list") + yield Footer() + + def on_mount(self) -> None: + """Called when the app is first mounted.""" + self.query_one(Input).focus() + + def on_input_changed(self, event: Input.Changed) -> None: + """Handle changes to the input field and filter the list.""" + filter_text = event.value.lower() + self.filter_devices(filter_text) + + def filter_devices(self, filter_text: str): + """Filter the list of devices based on the site.""" + if filter_text: + self.filtered_devices = [ + dev for dev in self.all_devices if filter_text in dev.getSite().lower() + ] + else: + self.filtered_devices = self.all_devices[:] + + list_view = self.query_one(ListView) + list_view.clear() + for dev in self.filtered_devices: + list_view.append(ObjectListItem(dev)) + + def on_list_view_selected(self, event: ListView.Selected) -> None: + """Handle item selection in the ListView.""" + selected_item = event.item + if isinstance(selected_item, ObjectListItem): + selected_device = selected_item.device + # Instead of a notification, we push the new detail screen + self.push_screen(DetailScreen(device=selected_device, registry_manager=registry_manager)) + +if __name__ == "__main__": + CONNECTION_STRING = str(os.getenv("CONNECTION_STRING_INOX_PROD")) + if CONNECTION_STRING == "": + print("Provide a connection string for the Iot Hub before running the script!") + exit(13) + + print(f"Connecting to IoT Hub ", end="") + try: + registry_manager = IoTHubRegistryManager.from_connection_string(CONNECTION_STRING) + except Exception as e: + print(f"❌") + print(f"Error {e}") + exit + print(f"✅") + + print(f"Getting list of IoT Edge devices ", end="") + try: + query_spec = QuerySpecification(query="SELECT * FROM devices WHERE capabilities.iotEdge = true ") + query_result = registry_manager.query_iot_hub(query_spec) + except Exception as e: + print(f"❌") + print(f"Error {e}") + exit + print(f"✅") + + print(f"Sorting {len(query_result.items)} devices ", end="") + devices = [] + for item in query_result.items: + devices.append(iSightDevice(str(item.device_id), str(item.tags['site']), int(item.tags['number']), str(item.tags['version']))) + devices.sort(key = lambda d: (d.site, d.number)) + print(f"✅") + + app = DeviceTUI(devices=devices, registry_manager=registry_manager) + app.run() \ No newline at end of file diff --git a/Python/cube_ssh_set_proxy.py b/Python/cube_ssh_set_proxy.py new file mode 100644 index 0000000..01a1b1d --- /dev/null +++ b/Python/cube_ssh_set_proxy.py @@ -0,0 +1,159 @@ +from paramiko import SSHClient, AutoAddPolicy +import paramiko +from cube_activate_ssh import activate_ssh +from dotenv import load_dotenv +import os +import sys +import shlex +from scp import SCPClient +import time + +def resource_path(relative_path): + """ Get absolute path to resource, works for dev and for PyInstaller """ + try: + # PyInstaller creates a temp folder and stores path in _MEIPASS + base_path = sys._MEIPASS + except Exception: + base_path = os.path.abspath(".") + + return os.path.join(base_path, relative_path) + +dotenv_path = resource_path('.env') +load_dotenv(dotenv_path=dotenv_path) + +ip_address_prefix = "10.81.35." # Carling subnet +ip_address_range = list(range(65, 75)) # From 65 to 74 +ip_address_range.append(85) # Add 85 after 74. + +ENV_SSH = { + "DEFAULT_CUBE_LINUX_ADMIN_USER": os.getenv("DEFAULT_CUBE_LINUX_ADMIN_USER"), + "DEFAULT_CUBE_LINUX_ADMIN_PASSWORD": os.getenv("DEFAULT_CUBE_LINUX_ADMIN_PASSWORD") +} + +ssh_username = ENV_SSH["DEFAULT_CUBE_LINUX_ADMIN_USER"] +ssh_password = ENV_SSH["DEFAULT_CUBE_LINUX_ADMIN_PASSWORD"] + +def execute_ssh_command(ip, command, client): + try: + stdin, stdout, stderr = client.exec_command(command, timeout=180) + exit_status = stdout.channel.recv_exit_status() + + result = stdout.read().decode().lower().strip() + error = stderr.read().decode('utf-8') + + if exit_status == 0: + print(f"✅") + else: + print(f"❌") + print(f"{error}") + raise Exception(f"{str(error)}") + return result + except Exception as e: + print(f"SSH error: {str(e)} --- {str(error)}", flush=True) + raise + finally: + client.close() + +def execute_sudo_ssh_command(ip, command, client): + try: + quoted_command = f"bash -c {shlex.quote(command)}" + sudo_command = f"sudo -S -p '' {quoted_command}" + stdin, stdout, stderr = client.exec_command(sudo_command, timeout=180) + time.sleep(3) + stdin.write(ssh_password + '\n') + stdin.flush() + exit_status = stdout.channel.recv_exit_status() + + output = stdout.read().decode('utf-8') + error = stderr.read().decode('utf-8') + if exit_status == 0: + + print(f"{output}") + else: + print(f"❌") + print(f"{error}") + raise Exception("Error during SSH sudo command.") + + result = stdout.read().decode().lower().strip() + return result + except Exception as e: + raise + finally: + client.close() + +def scp_get_file(ip, remote_path, local_path): + client = SSHClient() + client.set_missing_host_key_policy(AutoAddPolicy()) + + local_path = os.path.expanduser(local_path) + local_path = os.path.abspath(local_path) + + local_dir = os.path.dirname(local_path) + if local_dir: + os.makedirs(local_dir, exist_ok=True) + + + try: + client.connect( + ip, + port=11022, + username=ssh_username, + password=ssh_password, + allow_agent=False, + look_for_keys=False, + timeout=180 + ) + + client.get_transport().set_keepalive(5) + with SCPClient(client.get_transport()) as scp: + scp.get(remote_path, local_path) + + except Exception as e: + raise + finally: + client.close() + +def main(): + + print(f"Starting...\n", flush=True) + + for i in ip_address_range: + ip_address = f"{ip_address_prefix}{i}" + print(f"[{time.ctime(time.time())}] {str(i)} ({ip_address})", end=" ", flush=True) + + try: + activate_ssh(ip_address) + except Exception as e: + print(f"SSH activation failed for {ip_address}:", flush=True) + print(f"{e}", flush=True) + print(f"Skipping CUBE...", flush=True) + continue + + cube_id = "NA" + + client = paramiko.SSHClient() + client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + client.connect(ip_address, port=11022, username=ssh_username, password=ssh_password, allow_agent=False, look_for_keys=False, timeout=180, banner_timeout=180) + client.get_transport().set_keepalive(5) + + try: + cube_id = execute_ssh_command(ip_address, "hostname", client) + print(f"{cube_id} ✅", flush=True) + except Exception as e: + print(f"cube-xxxxx ❌", flush=True) + print(f"Error getting hostname for {ip_address}:", flush=True) + print(f"{e}", flush=True) + print(f"Skipping CUBE...", flush=True) + continue + + try: + result = execute_ssh_command(ip_address, "cat /etc/cube", client) + print(f"{result}", flush=True) + except Exception as e: + print(f"Error getting Cloud settings for {cube_id}.", flush=True) + print(f"{e}", flush=True) + print(f"Skipping CUBE...", flush=True) + continue + +if __name__ == "__main__": + main() diff --git a/Python/isight_device.py b/Python/isight_device.py index 5f4414a..143048b 100644 --- a/Python/isight_device.py +++ b/Python/isight_device.py @@ -31,6 +31,9 @@ class iSightDevice: def getNumber(self): return f"{self.number}" + def getVersion(self): + return f"{self.cloudVersion}" + class CubeDevice: def __init__(self, deviceId: str): if not isinstance(deviceId, str): diff --git a/Python/requirements.txt b/Python/requirements.txt index e376340..3377648 100644 --- a/Python/requirements.txt +++ b/Python/requirements.txt @@ -5,4 +5,5 @@ paramiko requests pandas openpyxl -scp \ No newline at end of file +scp +textual \ No newline at end of file