from azure.iot.hub import IoTHubRegistryManager
from azure.iot.hub.protocol.models import QuerySpecification, Module
from azure.iot.hub.models import CloudToDeviceMethod, CloudToDeviceMethodResult
from dotenv import load_dotenv
from isight_device import iSightDevice
from collections import defaultdict
import copy
import json
import os
import logging
import sys
import re

# from 1 to 5 (Modbus)
# PCS
# AC_Resistance
# DC_Resistance
# Hz
# DCV
# VL1L2
# VL2L3
# VL3L1

# Values of the 'outputTopic' field identifying the two telemetry topics
# this script looks for on every device.
ON_EVENT_TOPIC = "REL_OE_PCS"
TWELVE_HOUR_TOPIC = "REL_OE_PCS_12H"

# Tag provider ('prvdName') whose tags are inspected / moved between topics.
MODBUS_PROVIDER = "modbus_tcp_master"

# Set to True to ask "Continue? Y/n" after each device; off by default.
CONFIRM_EACH_DEVICE = False

# Captures the expression between parentheses after "idAsset:" in a topic's
# 'format' string.
ID_ASSET_PATTERN = re.compile(r"idAsset\s*:\s*\((.*?)\)")


def resolve_missing_topic(topic_name, available_list):
    """
    Prompt the user to select an existing topic or to create a new one.

    Args:
        topic_name: Name of the topic that was not found (used in prompts).
        available_list: Topic dictionaries offered as alternatives; each one
            must provide the 'outputTopic' and 'id' keys.

    Returns:
        The 'id' of the selected topic, or 0 if creation is requested.
    """
    print(f"Topic {topic_name} not found!")
    print(f"0. Create a new {topic_name} topic")

    # Display available options (1-based, 0 is reserved for "create")
    for idx, item in enumerate(available_list):
        print(f"{idx + 1}. {item['outputTopic']}")

    while True:
        try:
            choice = int(input(f"Select an option for {topic_name}: "))
        except ValueError:
            print("Please enter a valid number.")
            continue

        if choice == 0:
            return 0
        if 1 <= choice <= len(available_list):
            return available_list[choice - 1]['id']
        print("Invalid number, please select from the list.")


def _build_topic(device_id, modbus_tags, *, output_topic, threshold_mode,
                 threshold_time, sampling_mode, polling_interval, on_change):
    """Build a telemetry topic definition; shared by generate_oe/generate_12h."""
    return {
        'description': '',
        'outputTopic': output_topic,
        'properties': [
            {'key': 'cdid', 'value': device_id},
            {'key': 'deviceType', 'value': 'AC_GATEWAY'}
        ],
        'tags': {
            'modbus_tcp_master': modbus_tags
        },
        'sendOutThreshold': {
            'mode': threshold_mode,
            'size': 4096,
            'time': threshold_time,
            'sizeIdleTimer': {
                'enable': True,
                'time': 60
            }
        },
        'minPublishInterval': 0,
        'samplingMode': sampling_mode,
        'customSamplingRate': False,
        'pollingInterval': polling_interval,
        'onChange': on_change,
        'enable': True,
        'format': (
            '{deviceId:("' + device_id + '"),emissionDate:(now|todateiso8601),deviceType:("AC_GATEWAY"),'
            'metrics:[{idAsset:(.srcName),name:(.tagName),value:.dataValue,'
            'timestamp:((.ts/1000000)|todateiso8601)}]}'
        )
    }


def generate_oe(device_id, modbus_tags):
    """
    Return the definition of the 'REL_OE_PCS' topic for a device.

    The topic is sent out immediately, samples all changed values and has
    'onChange' enabled.

    Args:
        device_id: Device identifier (string) embedded in the properties and
            in the output format.
        modbus_tags: Value stored under tags['modbus_tcp_master'].
    """
    return _build_topic(
        device_id,
        modbus_tags,
        output_topic='REL_OE_PCS',
        threshold_mode='immediately',
        threshold_time=60,
        sampling_mode='allChangedValues',
        polling_interval=0,
        on_change=True,
    )


def generate_12h(device_id, modbus_tags):
    """
    Return the definition of the 'REL_OE_PCS_12H' topic for a device.

    The topic is sent out by time and polls the latest values with an
    interval of 43200 (presumably seconds, i.e. 12 hours - to be confirmed
    against the ThingsPro documentation).

    Args:
        device_id: Device identifier (string) embedded in the properties and
            in the output format.
        modbus_tags: Value stored under tags['modbus_tcp_master'].
    """
    return _build_topic(
        device_id,
        modbus_tags,
        output_topic='REL_OE_PCS_12H',
        threshold_mode='byTime',
        threshold_time=43200,
        sampling_mode='latestValues',
        polling_interval=43200,
        on_change=False,
    )


def _thingspro_get(manager, device_id, path):
    """Invoke a GET on `path` through the module's direct method; return the payload."""
    request = CloudToDeviceMethod(
        method_name=method_name,
        payload={"method": "GET", "path": path},
    )
    response = manager.invoke_device_module_method(
        device_id=device_id,
        module_id=module_id,
        direct_method_request=request,
    )
    # Round-trip through JSON (as the script always did) so the result only
    # contains plain dict / list / scalar values.
    return json.loads(json.dumps(response.payload))


def _find_enabled_topic_id(topics, output_topic):
    """Return the id of the last enabled topic named `output_topic`, or -1."""
    found_id = -1
    for topic in topics:
        if topic['enable'] and topic['outputTopic'] == output_topic:
            found_id = topic['id']
    return found_id


def _merge_tags(original_tags, tags_by_source):
    """Return a deep copy of `original_tags` with the missing tags appended."""
    merged = copy.deepcopy(original_tags)
    for source_name, tags_to_add in tags_by_source.items():
        current_list = merged.setdefault(source_name, [])
        for tag in tags_to_add:
            if tag not in current_list:
                current_list.append(tag)
    return merged


def _strip_tags(original_tags, tags_by_source):
    """Return (copy of `original_tags` without the given tags, changed flag)."""
    remaining = copy.deepcopy(original_tags)
    changed = False
    for source_name, tags_to_remove in tags_by_source.items():
        if source_name not in remaining:
            continue
        current_list = remaining[source_name]
        kept = [t for t in current_list if t not in tags_to_remove]
        if len(kept) != len(current_list):
            changed = True
            if kept:
                remaining[source_name] = kept
            else:
                # No tag left for this source: drop the empty entry.
                del remaining[source_name]
    return remaining, changed


# Configure logger to use stderr
logging.basicConfig(
    level=logging.ERROR,
    format='%(asctime)s - %(levelname)s - %(message)s',
    stream=sys.stderr  # <- Key line
)
logger = logging.getLogger(__name__)

# .env containing secrets to authorize access to IoT Hub
load_dotenv()
# Default to "" so that an unset variable is detected below; wrapping
# os.getenv() in str() would turn None into the non-empty string "None".
CONNECTION_STRING = os.getenv("CONNECTION_STRING_INOX_PROD", "")
if not CONNECTION_STRING.strip():
    print("Provide a connection string for the Iot Hub before running the script!")
    sys.exit(13)

# IoT Edge module and method that allow Moxa AIG-301 management
module_id = "thingspro-agent"
method_name = "thingspro-api-v1"

# Connection to IoT Hub and listing of IoT Edge device (so AIG-301 in the scope of SAFT)
registry_manager = IoTHubRegistryManager.from_connection_string(CONNECTION_STRING)
query_spec = QuerySpecification(query="SELECT * FROM devices WHERE capabilities.iotEdge = true ")
query_result = registry_manager.query_iot_hub(query_spec)

# Sorting devices by site
devices = []
try:
    for item in query_result.items:
        try:
            if str(item.connection_state) == "Connected":
                devices.append(iSightDevice(str(item.device_id), str(item.tags['site']), int(item.tags['number']), str(item.tags['version'])))
            else:
                print(f"Device not connected: {str(item.tags['site'])} {int(item.tags['number'])} {str(item.device_id)}")
        except (KeyError, TypeError, ValueError) as exc:
            # A device without usable site/number/version tags must not
            # abort the whole run: report it and move on.
            logger.error("Skipping device %s: unusable twin tags or device data (%r)", item.device_id, exc)
    # Sort once at the end; the sort is stable, so the final order is the
    # same as when sorting after every insertion.
    devices.sort(key=lambda d: (d.site, d.number))
except KeyboardInterrupt:
    # This block runs when user presses Ctrl+C
    print("\nProgram killed by user (Ctrl+C).")
    sys.exit(0)

# List of the tags that are a status or alert
on_event_tags = [
    "1365",
    "AC_Insulation_Status",
    "ACSw_Status",
    "ActErrNo1",
    "ActErrNo10",
    "ActErrNo2",
    "ActErrNo3",
    "ActErrNo4",
    "ActErrNo5",
    "ActErrNo6",
    "ActErrNo7",
    "ActErrNo8",
    "ActErrNo9",
    "BESS_Control_Mode",
    "BESS_Control_Mode_2",
    "Calibration_Required",
    "CO_Sensor_1",
    "CO_Sensor_2",
    "CO_Sensor_3",
    "CO_Sensor_4",
    "CO_Sensor_5",
    "Combi_Detector_1",
    "Combi_Detector_2",
    "Combi_Detector_3",
    "Combi_Detector_4",
    "Combi_Detector_5",
    "Combi_Detector_6",
    "DC_Insulation_Status",
    "DCSw_1_Status",
    "DCSw_2_Status",
    "DCSw_3_Status",
    "Digital_Input_Line_0",
    "DIL0",
    "DIL1",
    "DIL2",
    "Door_Open",
    "EMS_Control",
    "Energy_Fault",
    "ESS_Status",
    "Extinguisher_Activated",
    "Fault_Id_tmp",
    "Fault_Module_Id",
    "Fault_Status",
    "General_Fault",
    "InvSt_tmp",
    "Last_Event",
    "Loss_of_Agent",
    "Manual_mode",
    "Manual_Pull_Station",
    "MV_Transformer_Gas_Monitoring",
    "MV_Transformer_Oil_Level_Trip",
    "MV_Transformer_Pressure",
    "MV_Transformer_Pressure_Trip",
    "MV_Transformer_Temperature_Alarm",
    "MV_Transformer_Temperature_Warning",
    "NbModule",
    "NbModule_Run",
    "Reg_Request",
    "Version",
    "Warn_Id"
]

# For every connected device: list its Modbus status/alert tags, locate the
# two target topics and print which topic changes would be needed.  The code
# visible in this file only reports the planned actions; it never writes a
# topic back to the device.
try:
    for device in devices:
        print(f"{device.getSite()} {device.getNumber()} {device.getDeviceId()}")
        modbus_tags_to_set_on_event = {}
        id_on_event_topic = -1
        id_12h_topic = -1

        current_device_modules = registry_manager.get_modules(device.deviceId)
        for module in current_device_modules:
            if module.module_id == module_id:
                device.setModule(module)

        # This block gets all the Modbus tags available and cross-checks them
        # with the list of tags that require changes
        try:
            parsed_tags = _thingspro_get(registry_manager, device.deviceId, "/tags/list")
            for item in parsed_tags['data']:
                if item['prvdName'] == MODBUS_PROVIDER and item['tagName'] in on_event_tags:
                    modbus_tags_to_set_on_event.setdefault(item['srcName'], []).append(item['tagName'])
        except Exception as e:
            # Per-device boundary: report the failure and keep going.
            logger.error("Exception when getting list of device's Modbus tags that can be set On Event:\n%s", e)

        try:
            parsed_topics = _thingspro_get(registry_manager, device.deviceId, "/azure-iotedge/messages")
            topics = parsed_topics["data"]

            id_on_event_topic = _find_enabled_topic_id(topics, ON_EVENT_TOPIC)
            id_12h_topic = _find_enabled_topic_id(topics, TWELVE_HOUR_TOPIC)

            # Enabled topics that could stand in for a missing target topic
            available_topics = [
                topic for topic in topics
                if topic['enable'] and topic['id'] not in (id_on_event_topic, id_12h_topic)
            ]

            if id_on_event_topic == -1:
                id_on_event_topic = resolve_missing_topic(ON_EVENT_TOPIC, available_topics)
                # A topic just chosen as the On Event topic must not be
                # offered again as the 12h topic.
                available_topics = [
                    topic for topic in available_topics
                    if topic['id'] != id_on_event_topic
                ]

            if id_12h_topic == -1:
                id_12h_topic = resolve_missing_topic(TWELVE_HOUR_TOPIC, available_topics)

            targets = (
                ("OnEvent", id_on_event_topic),
                ("12h", id_12h_topic),
            )

            for target_name, target_id in targets:
                if target_id > 0:
                    topic_item = next((topic for topic in topics if topic["id"] == target_id), None)
                    if not topic_item:
                        # Pure error handling, detected ID does not exist
                        print(f"Error: ID {target_id} not found. Skipping.")
                        continue
                    original_tags = topic_item.get("tags", {}).get("modbus_tcp_master", {})
                else:
                    # Not error handling, target_id == 0 means creation of new topic
                    original_tags = {}

                final_tag_list = _merge_tags(original_tags, modbus_tags_to_set_on_event)

                if target_id == 0:
                    print(f"[{target_name}] ID is 0 -> ACTION: CREATE NEW TOPIC")
                elif final_tag_list != original_tags:
                    print(f"[{target_name}] ID {target_id} -> ACTION: UPDATE EXISTING")
                else:
                    print(f"[{target_name}] ID {target_id} -> ACTION: NONE (Already up to date)")

            # Removing tags from existing topics: every other enabled topic
            # must no longer carry the tags handled by the two target topics.
            protected_ids = [id_on_event_topic, id_12h_topic]

            for item in topics:
                if item['id'] in protected_ids or not item['enable']:
                    continue

                original_tags = item.get("tags", {}).get("modbus_tcp_master", {})

                topic_format = item.get("format", "")
                match = ID_ASSET_PATTERN.search(topic_format) if isinstance(topic_format, str) else None
                if match is None:
                    print(f"Exception while getting idAsset: {item}")
                    continue
                idAsset = match.group(1).strip()

                if not original_tags:
                    continue

                final_tag_list, modification_needed = _strip_tags(original_tags, modbus_tags_to_set_on_event)

                if modification_needed:
                    print(f"-> Applying update for topic: {item['outputTopic']} (ID: {item['id']} & {idAsset})")

        except Exception as e:
            # Per-device boundary: report the failure and keep going.
            logger.error("Exception when getting all Telemetry Topics:\n%s", e)

        # Optional pause between devices (disabled unless CONFIRM_EACH_DEVICE)
        user_input = input("Continue? Y/n ").lower().strip() if CONFIRM_EACH_DEVICE else 'Y'
        if user_input == 'n':
            print("Stopping loop.")
            break

except KeyboardInterrupt:
    # This block runs when user presses Ctrl+C
    print("\nProgram killed by user (Ctrl+C).")
    sys.exit(0)