[WiP] Carling Cloud connectivity

This commit is contained in:
Quentin WEPHRE
2025-10-07 13:36:29 +02:00
parent 013d9c820f
commit 46a3c444dd
4 changed files with 360 additions and 1 deletions

View File

@@ -0,0 +1,196 @@
from azure.iot.hub import IoTHubRegistryManager
from azure.iot.hub.protocol.models import QuerySpecification, Module
from azure.iot.hub.models import CloudToDeviceMethod, CloudToDeviceMethodResult
from dotenv import load_dotenv
from isight_device import iSightDevice
from textual.app import App, ComposeResult
from textual.widgets import Header, Footer, ListView, ListItem, Label, Input, Button, Static, DataTable
from textual.containers import Vertical, VerticalScroll
from textual.binding import Binding
from textual.screen import Screen
from textual import on
from datetime import datetime
import json
import os
load_dotenv()
class ObjectListItem(ListItem):
"""A widget to display an iSightDevice object in the ListView."""
def __init__(self, device: iSightDevice) -> None:
super().__init__()
self.device = device
def compose(self) -> ComposeResult:
"""Create the displayable content of the list item."""
# This line is customized to display the attributes of your iSightDevice object.
# It shows site, number, device_id, and version.
yield Label(f"{self.device.getDeviceId()} | {self.device.getSite()} | {self.device.getNumber()} ({self.device.getVersion()})")
class ModuleScreen(Screen):
BINDINGS = [Binding("escape", "app.pop_screen", "Go Back")]
def __init__(self, device: iSightDevice, device_modules: list[Module]):
super().__init__()
self.device = device
self.device_modules = device_modules
def compose(self) -> ComposeResult:
yield Header(f"IoT Edge modules of {self.device.getDeviceId}")
yield DataTable()
yield Footer()
def on_mount(self) -> None:
table = self.query_one(DataTable)
table.add_columns("Module", "Status", "Since", "Last activity")
for module in self.device_modules:
local_connection_time = datetime.fromisoformat(str(module.connection_state_updated_time)).astimezone().replace(microsecond=0).isoformat()
local_activity_time = datetime.fromisoformat(str(module.last_activity_time)).astimezone().replace(microsecond=0).isoformat()
table.add_row(f"{module.module_id}", f"{module.connection_state}", f"{local_connection_time}", f"{local_activity_time}")
## NEW: The Screen for showing device details and actions
class DetailScreen(Screen):
"""A screen to display details and actions for a single device."""
BINDINGS = [
Binding("escape", "app.pop_screen", "Go Back"),
]
def __init__(self, device: iSightDevice, registry_manager: IoTHubRegistryManager) -> None:
super().__init__()
self.device = device
self.registry_manager = registry_manager
def compose(self) -> ComposeResult:
yield Header(name=f"Details for {self.device.getDeviceId()}")
# Use VerticalScroll so the layout works on small terminals
with VerticalScroll(id="details-container"):
# Static, non-interactive information section
yield Static(f"[b]Site:[/b] {self.device.getSite()}", markup=True)
yield Static(f"[b]Number:[/b] {self.device.getNumber()}", markup=True)
yield Static(f"[b]Version:[/b] {self.device.getVersion()}", markup=True)
yield Static(f"[b]Device ID:[/b] {self.device.getDeviceId()}", markup=True)
# Interactive action buttons
yield Static("\n[b]Actions:[/b]", markup=True)
yield Button("Check IoT Edge modules", variant="primary", id="modules")
yield Footer()
## NEW: Handlers for button presses
@on(Button.Pressed, "#modules")
def check_modules(self) -> None:
device_modules = self.registry_manager.get_modules(self.device.getDeviceId())
device_modules.sort(key=lambda m: m.module_id)
self.app.push_screen(ModuleScreen(self.device, device_modules))
# @on(Button.Pressed, "#logs")
# def check_logs(self) -> None:
# # Placeholder for your logging logic
# self.app.notify(f"Fetching logs for {self.device.getDeviceId()}...")
# @on(Button.Pressed, "#diag")
# def run_diagnostics(self) -> None:
# # Placeholder for your diagnostics logic
# self.app.notify(f"Running diagnostics on {self.device.getDeviceId()}...")
# @on(Button.Pressed, "#delete")
# def delete_device(self) -> None:
# # Placeholder for delete logic
# self.app.notify(f"Deleting {self.device.getDeviceId()} is not yet implemented.", severity="error")
# # You would likely want a confirmation dialog here in a real app.
class DeviceTUI(App):
"""An interactive TUI to list and filter iSightDevice objects."""
BINDINGS = [
Binding("q", "quit", "Quit"),
]
def __init__(self, devices: list[iSightDevice], registry_manager: IoTHubRegistryManager):
super().__init__()
self.all_devices = devices
self.filtered_devices = self.all_devices[:]
self.registry_manager = registry_manager
def compose(self) -> ComposeResult:
"""Create child widgets for the app."""
yield Header(name="iSight Device Viewer")
yield Input(placeholder="Filter by site...")
with Vertical(id="list-container"):
yield ListView(*[ObjectListItem(dev) for dev in self.filtered_devices], id="device-list")
yield Footer()
def on_mount(self) -> None:
"""Called when the app is first mounted."""
self.query_one(Input).focus()
def on_input_changed(self, event: Input.Changed) -> None:
"""Handle changes to the input field and filter the list."""
filter_text = event.value.lower()
self.filter_devices(filter_text)
def filter_devices(self, filter_text: str):
"""Filter the list of devices based on the site."""
if filter_text:
self.filtered_devices = [
dev for dev in self.all_devices if filter_text in dev.getSite().lower()
]
else:
self.filtered_devices = self.all_devices[:]
list_view = self.query_one(ListView)
list_view.clear()
for dev in self.filtered_devices:
list_view.append(ObjectListItem(dev))
def on_list_view_selected(self, event: ListView.Selected) -> None:
"""Handle item selection in the ListView."""
selected_item = event.item
if isinstance(selected_item, ObjectListItem):
selected_device = selected_item.device
# Instead of a notification, we push the new detail screen
self.push_screen(DetailScreen(device=selected_device, registry_manager=registry_manager))
if __name__ == "__main__":
CONNECTION_STRING = str(os.getenv("CONNECTION_STRING_INOX_PROD"))
if CONNECTION_STRING == "":
print("Provide a connection string for the Iot Hub before running the script!")
exit(13)
print(f"Connecting to IoT Hub ", end="")
try:
registry_manager = IoTHubRegistryManager.from_connection_string(CONNECTION_STRING)
except Exception as e:
print(f"")
print(f"Error {e}")
exit
print(f"")
print(f"Getting list of IoT Edge devices ", end="")
try:
query_spec = QuerySpecification(query="SELECT * FROM devices WHERE capabilities.iotEdge = true ")
query_result = registry_manager.query_iot_hub(query_spec)
except Exception as e:
print(f"")
print(f"Error {e}")
exit
print(f"")
print(f"Sorting {len(query_result.items)} devices ", end="")
devices = []
for item in query_result.items:
devices.append(iSightDevice(str(item.device_id), str(item.tags['site']), int(item.tags['number']), str(item.tags['version'])))
devices.sort(key = lambda d: (d.site, d.number))
print(f"")
app = DeviceTUI(devices=devices, registry_manager=registry_manager)
app.run()

View File

@@ -0,0 +1,159 @@
from paramiko import SSHClient, AutoAddPolicy
import paramiko
from cube_activate_ssh import activate_ssh
from dotenv import load_dotenv
import os
import sys
import shlex
from scp import SCPClient
import time
def resource_path(relative_path):
""" Get absolute path to resource, works for dev and for PyInstaller """
try:
# PyInstaller creates a temp folder and stores path in _MEIPASS
base_path = sys._MEIPASS
except Exception:
base_path = os.path.abspath(".")
return os.path.join(base_path, relative_path)
dotenv_path = resource_path('.env')
load_dotenv(dotenv_path=dotenv_path)
ip_address_prefix = "10.81.35." # Carling subnet
ip_address_range = list(range(65, 75)) # From 65 to 74
ip_address_range.append(85) # Add 85 after 74.
ENV_SSH = {
"DEFAULT_CUBE_LINUX_ADMIN_USER": os.getenv("DEFAULT_CUBE_LINUX_ADMIN_USER"),
"DEFAULT_CUBE_LINUX_ADMIN_PASSWORD": os.getenv("DEFAULT_CUBE_LINUX_ADMIN_PASSWORD")
}
ssh_username = ENV_SSH["DEFAULT_CUBE_LINUX_ADMIN_USER"]
ssh_password = ENV_SSH["DEFAULT_CUBE_LINUX_ADMIN_PASSWORD"]
def execute_ssh_command(ip, command, client):
try:
stdin, stdout, stderr = client.exec_command(command, timeout=180)
exit_status = stdout.channel.recv_exit_status()
result = stdout.read().decode().lower().strip()
error = stderr.read().decode('utf-8')
if exit_status == 0:
print(f"")
else:
print(f"")
print(f"{error}")
raise Exception(f"{str(error)}")
return result
except Exception as e:
print(f"SSH error: {str(e)} --- {str(error)}", flush=True)
raise
finally:
client.close()
def execute_sudo_ssh_command(ip, command, client):
try:
quoted_command = f"bash -c {shlex.quote(command)}"
sudo_command = f"sudo -S -p '' {quoted_command}"
stdin, stdout, stderr = client.exec_command(sudo_command, timeout=180)
time.sleep(3)
stdin.write(ssh_password + '\n')
stdin.flush()
exit_status = stdout.channel.recv_exit_status()
output = stdout.read().decode('utf-8')
error = stderr.read().decode('utf-8')
if exit_status == 0:
print(f"{output}")
else:
print(f"")
print(f"{error}")
raise Exception("Error during SSH sudo command.")
result = stdout.read().decode().lower().strip()
return result
except Exception as e:
raise
finally:
client.close()
def scp_get_file(ip, remote_path, local_path):
client = SSHClient()
client.set_missing_host_key_policy(AutoAddPolicy())
local_path = os.path.expanduser(local_path)
local_path = os.path.abspath(local_path)
local_dir = os.path.dirname(local_path)
if local_dir:
os.makedirs(local_dir, exist_ok=True)
try:
client.connect(
ip,
port=11022,
username=ssh_username,
password=ssh_password,
allow_agent=False,
look_for_keys=False,
timeout=180
)
client.get_transport().set_keepalive(5)
with SCPClient(client.get_transport()) as scp:
scp.get(remote_path, local_path)
except Exception as e:
raise
finally:
client.close()
def main():
print(f"Starting...\n", flush=True)
for i in ip_address_range:
ip_address = f"{ip_address_prefix}{i}"
print(f"[{time.ctime(time.time())}] {str(i)} ({ip_address})", end=" ", flush=True)
try:
activate_ssh(ip_address)
except Exception as e:
print(f"SSH activation failed for {ip_address}:", flush=True)
print(f"{e}", flush=True)
print(f"Skipping CUBE...", flush=True)
continue
cube_id = "NA"
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(ip_address, port=11022, username=ssh_username, password=ssh_password, allow_agent=False, look_for_keys=False, timeout=180, banner_timeout=180)
client.get_transport().set_keepalive(5)
try:
cube_id = execute_ssh_command(ip_address, "hostname", client)
print(f"{cube_id}", flush=True)
except Exception as e:
print(f"cube-xxxxx ❌", flush=True)
print(f"Error getting hostname for {ip_address}:", flush=True)
print(f"{e}", flush=True)
print(f"Skipping CUBE...", flush=True)
continue
try:
result = execute_ssh_command(ip_address, "cat /etc/cube", client)
print(f"{result}", flush=True)
except Exception as e:
print(f"Error getting Cloud settings for {cube_id}.", flush=True)
print(f"{e}", flush=True)
print(f"Skipping CUBE...", flush=True)
continue
if __name__ == "__main__":
main()

View File

@@ -31,6 +31,9 @@ class iSightDevice:
def getNumber(self): def getNumber(self):
return f"{self.number}" return f"{self.number}"
def getVersion(self):
return f"{self.cloudVersion}"
class CubeDevice: class CubeDevice:
def __init__(self, deviceId: str): def __init__(self, deviceId: str):
if not isinstance(deviceId, str): if not isinstance(deviceId, str):

View File

@@ -6,3 +6,4 @@ requests
pandas pandas
openpyxl openpyxl
scp scp
textual