From fcc8c811cc9ee60d1c65d2d02d0fca47eb216ab8 Mon Sep 17 00:00:00 2001 From: Quentin WEPHRE Date: Thu, 26 Mar 2026 08:48:53 +0100 Subject: [PATCH] more on event topic for less cloud usage --- Python/on_event_topics.py | 144 ++++++++------ Python/on_event_topics_2.py | 366 ++++++++++++++++++++++++++++++++++++ 2 files changed, 449 insertions(+), 61 deletions(-) create mode 100644 Python/on_event_topics_2.py diff --git a/Python/on_event_topics.py b/Python/on_event_topics.py index a45c7ac..113353d 100644 --- a/Python/on_event_topics.py +++ b/Python/on_event_topics.py @@ -10,6 +10,28 @@ import os import logging import sys +# from 1 to 5 (Modbus) +# PCS +# AC_Resistance +# DC_Resistance +# Hz +# DCV +# VL1L2 +# VL2L3 +# VL3L1 + +# OnEvent +# ACSw_Status +# ActErrNoX +# DCSw_3_Status +# Fault_Id_tmp +# Fault_Status +# InvSt_tmp +# MV_Transformer_Gas_Monitoring +# MV_Transformer_Oil_Level_Trip +# MV_Transformer_Pressure +# MV_Transformer_Pressure_Trip + def generate_oe(device_id, modbus_tags): return { 'description': '', @@ -112,7 +134,6 @@ for device in devices: current_device_modules = registry_manager.get_modules(device.deviceId) for module in current_device_modules: if (module.module_id == module_id): - #print(f"{module.module_id}") device.setModule(module) payload = '{"method":"GET", "path":"/azure-iotedge/messages"}' try: @@ -120,7 +141,6 @@ for device in devices: response = registry_manager.invoke_device_module_method(device_id=device.deviceId, module_id=module_id, direct_method_request=direct_method) device_messages = json.dumps(response.payload, indent = 2) parsed_messages = json.loads(device_messages) - #print("Parsed messages") message_ids_with_tags = {} tags_to_on_event = defaultdict(set) for message in parsed_messages["data"]: @@ -131,6 +151,7 @@ for device in devices: tags = message.get("tags", {}).get("modbus_tcp_master", {}) + # This block seems list existing OE topics and their content. for asset, tag_list in tags.items(): if "InvSt" in tag_list and "OE" in topic_name: message_ids_with_tags.setdefault(message_id, message) @@ -140,68 +161,69 @@ for device in devices: message_ids_with_tags.setdefault(message_id, message) tags_to_on_event[asset].add("Fault_Id") print(f"{device.getDeviceId()} {device.getSite()} {device.getNumber()} {message_id} {topic_name} {asset} Fault_Id") - # print(f"{device.getDeviceId()} {device.getSite()} {device.getNumber()}") - # print(f"Messages to update: {', '.join(str(key) for key in message_ids_with_tags.keys())}") - # for asset, tag in tags_to_on_event.items(): - # print(f"Alarms to make on event:") - # print(f"{asset}: {', '.join(tag)}") - # print(f"Finishing on {device}") - - # oe_exists = False - # twelve_exists = False - - # for message in message_ids_with_tags.values(): - # if message['outputTopic'] == "REL_OE_PCS": - # oe_exists = True - # if message['outputTopic'] == "REL_OE_PCS_12H": - # twelve_exists = True - - # if (oe_exists == False or twelve_exists == False): - # first_message = next(iter(message_ids_with_tags.values())) - # properties = first_message['properties'] - # cdid = next((item['value'] for item in properties if item['key'] == 'cdid')) - # print(f"{cdid}") - - # modbus_tags = { - # asset: sorted(list(tags)) - # for asset, tags in tags_to_on_event.items() - # } - # new_payload = "" - # if (oe_exists == False): - # new_payload = '{"method":"POST", "path":"/azure-iotedge/messages", "requestBody":' + json.dumps(generate_oe(cdid, modbus_tags)) + '}' - # new_direct_method = CloudToDeviceMethod(method_name=method_name, payload=json.loads(new_payload)) - # new_response = registry_manager.invoke_device_module_method(device_id=device.deviceId, module_id=module_id, direct_method_request=new_direct_method) - # print(json.dumps(new_response.payload, indent = 2)) - # if (twelve_exists == False): - # new_payload = '{"method":"POST", "path":"/azure-iotedge/messages", "requestBody":' + json.dumps(generate_12h(cdid, modbus_tags)) + '}' - # new_direct_method = CloudToDeviceMethod(method_name=method_name, payload=json.loads(new_payload)) - # new_response = registry_manager.invoke_device_module_method(device_id=device.deviceId, module_id=module_id, direct_method_request=new_direct_method) - # print(json.dumps(new_response.payload, indent = 2)) + print(f"{device.getDeviceId()} {device.getSite()} {device.getNumber()}") + print(f"Messages to update: {', '.join(str(key) for key in message_ids_with_tags.keys())}") + for asset, tag in tags_to_on_event.items(): + print(f"Alarms to make on event:") + print(f"{asset}: {', '.join(tag)}") + print(f"Finishing on {device}") - # for id, message in message_ids_with_tags.items(): - # if "OE" in message['outputTopic']: - # continue - # tags_to_remove = {"InvSt", "Fault_Id"} - # for protocol, versions in message['tags'].items(): - # for version, tag_list in versions.items(): - # message['tags'][protocol][version] = [ - # tag for tag in tag_list if tag not in tags_to_remove - # ] - # print("\n\n\nREMOVING\n\n\n") - # new_payload = '{"method":"PUT", "path":"/azure-iotedge/messages/' + str(id) + '", "requestBody":' + json.dumps(message) + '}' - # print(new_payload) - # new_direct_method = CloudToDeviceMethod(method_name=method_name, payload=json.loads(new_payload)) - # new_response = registry_manager.invoke_device_module_method(device_id=device.deviceId, module_id=module_id, direct_method_request=new_direct_method) - # print(json.dumps(new_response.payload, indent = 2)) - # print(message['tags']) + oe_exists = False + twelve_exists = False + + for message in message_ids_with_tags.values(): + if message['outputTopic'] == "REL_OE_PCS": + oe_exists = True + if message['outputTopic'] == "REL_OE_PCS_12H": + twelve_exists = True + + if (oe_exists == False or twelve_exists == False): + first_message = next(iter(message_ids_with_tags.values())) + properties = first_message['properties'] + cdid = next((item['value'] for item in properties if item['key'] == 'cdid')) + print(f"{cdid}") + + modbus_tags = { + asset: sorted(list(tags)) + for asset, tags in tags_to_on_event.items() + } + new_payload = "" + if (oe_exists == False): + new_payload = '{"method":"POST", "path":"/azure-iotedge/messages", "requestBody":' + json.dumps(generate_oe(cdid, modbus_tags)) + '}' + new_direct_method = CloudToDeviceMethod(method_name=method_name, payload=json.loads(new_payload)) + new_response = registry_manager.invoke_device_module_method(device_id=device.deviceId, module_id=module_id, direct_method_request=new_direct_method) + print(json.dumps(new_response.payload, indent = 2)) + + if (twelve_exists == False): + new_payload = '{"method":"POST", "path":"/azure-iotedge/messages", "requestBody":' + json.dumps(generate_12h(cdid, modbus_tags)) + '}' + new_direct_method = CloudToDeviceMethod(method_name=method_name, payload=json.loads(new_payload)) + new_response = registry_manager.invoke_device_module_method(device_id=device.deviceId, module_id=module_id, direct_method_request=new_direct_method) + print(json.dumps(new_response.payload, indent = 2)) + + for id, message in message_ids_with_tags.items(): + if "OE" in message['outputTopic']: + continue + tags_to_remove = {"InvSt", "Fault_Id"} + for protocol, versions in message['tags'].items(): + for version, tag_list in versions.items(): + message['tags'][protocol][version] = [ + tag for tag in tag_list if tag not in tags_to_remove + ] + print("\n\n\nREMOVING\n\n\n") + new_payload = '{"method":"PUT", "path":"/azure-iotedge/messages/' + str(id) + '", "requestBody":' + json.dumps(message) + '}' + print(new_payload) + new_direct_method = CloudToDeviceMethod(method_name=method_name, payload=json.loads(new_payload)) + new_response = registry_manager.invoke_device_module_method(device_id=device.deviceId, module_id=module_id, direct_method_request=new_direct_method) + print(json.dumps(new_response.payload, indent = 2)) + print(message['tags']) - # print("\n\n\nREMOVING\n\n\n") - # new_payload = '{"method":"PUT", "path":"/azure-iotedge/messages/' + str(id) + '", "requestBody":' + json.dumps(message) + '}' - # print(new_payload) - # new_direct_method = CloudToDeviceMethod(method_name=method_name, payload=json.loads(new_payload)) - # new_response = registry_manager.invoke_device_module_method(device_id=device.deviceId, module_id=module_id, direct_method_request=new_direct_method) - # print(json.dumps(new_response.payload, indent = 2)) + print("\n\n\nREMOVING\n\n\n") + new_payload = '{"method":"PUT", "path":"/azure-iotedge/messages/' + str(id) + '", "requestBody":' + json.dumps(message) + '}' + print(new_payload) + new_direct_method = CloudToDeviceMethod(method_name=method_name, payload=json.loads(new_payload)) + new_response = registry_manager.invoke_device_module_method(device_id=device.deviceId, module_id=module_id, direct_method_request=new_direct_method) + print(json.dumps(new_response.payload, indent = 2)) except Exception as e: logger.error(str(e)) \ No newline at end of file diff --git a/Python/on_event_topics_2.py b/Python/on_event_topics_2.py new file mode 100644 index 0000000..17e9dd2 --- /dev/null +++ b/Python/on_event_topics_2.py @@ -0,0 +1,366 @@ +from azure.iot.hub import IoTHubRegistryManager +from azure.iot.hub.protocol.models import QuerySpecification, Module +from azure.iot.hub.models import CloudToDeviceMethod, CloudToDeviceMethodResult +from dotenv import load_dotenv +from isight_device import iSightDevice +from collections import defaultdict + +import copy +import json +import os +import logging +import sys +import re + +# from 1 to 5 (Modbus) +# PCS +# AC_Resistance +# DC_Resistance +# Hz +# DCV +# VL1L2 +# VL2L3 +# VL3L1 + +def resolve_missing_topic(topic_name, available_list): + """ + Prompts user to select a topic or create a new one. + Returns the selected ID or 0 if creation is requested. + """ + print(f"Topic {topic_name} not found!") + print(f"0. Create a new {topic_name} topic") + + # Display available options + for idx, item in enumerate(available_list): + print(f"{idx + 1}. {item['outputTopic']}") + + while True: + try: + choice = int(input(f"Select an option for {topic_name}: ")) + + if choice == 0: + return 0 + + if 1 <= choice <= len(available_list): + selected = available_list[choice - 1] + return selected['id'] + + print("Invalid number, please select from the list.") + except ValueError: + print("Please enter a valid number.") + +def generate_oe(device_id, modbus_tags): + return { + 'description': '', + 'outputTopic': 'REL_OE_PCS', + 'properties': [ + {'key': 'cdid', 'value': device_id}, + {'key': 'deviceType', 'value': 'AC_GATEWAY'} + ], + 'tags': { + 'modbus_tcp_master': modbus_tags + }, + 'sendOutThreshold': { + 'mode': 'immediately', + 'size': 4096, + 'time': 60, + 'sizeIdleTimer': { + 'enable': True, + 'time': 60 + } + }, + 'minPublishInterval': 0, + 'samplingMode': 'allChangedValues', + 'customSamplingRate': False, + 'pollingInterval': 0, + 'onChange': True, + 'enable': True, + 'format': ( + '{deviceId:("' + device_id + + '"),emissionDate:(now|todateiso8601),deviceType:("AC_GATEWAY"),' + 'metrics:[{idAsset:(.srcName),name:(.tagName),value:.dataValue,' + 'timestamp:((.ts/1000000)|todateiso8601)}]}' + ) +} + +def generate_12h(device_id, modbus_tags): + return { + 'description': '', + 'outputTopic': 'REL_OE_PCS_12H', + 'properties': [ + {'key': 'cdid', 'value': device_id}, + {'key': 'deviceType', 'value': 'AC_GATEWAY'} + ], + 'tags': { + 'modbus_tcp_master': modbus_tags + }, + 'sendOutThreshold': { + 'mode': 'byTime', + 'size': 4096, + 'time': 43200, + 'sizeIdleTimer': { + 'enable': True, + 'time': 60 + } + }, + 'minPublishInterval': 0, + 'samplingMode': 'latestValues', + 'customSamplingRate': False, + 'pollingInterval': 43200, + 'onChange': False, + 'enable': True, + 'format': ( + '{deviceId:("' + device_id + + '"),emissionDate:(now|todateiso8601),deviceType:("AC_GATEWAY"),' + 'metrics:[{idAsset:(.srcName),name:(.tagName),value:.dataValue,' + 'timestamp:((.ts/1000000)|todateiso8601)}]}' + ) +} + +# Configure logger to use stderr +logging.basicConfig( + level=logging.ERROR, + format='%(asctime)s - %(levelname)s - %(message)s', + stream=sys.stderr # <- Key line +) +logger = logging.getLogger(__name__) + +# .env containing secrets to authorize access to IoT Hub +load_dotenv() +CONNECTION_STRING = str(os.getenv("CONNECTION_STRING_INOX_PROD")) +if CONNECTION_STRING == "": + print("Provide a connection string for the Iot Hub before running the script!") + exit(13) + +# IoT Edge module and method that allow Moxa AIG-301 management +module_id = "thingspro-agent" +method_name = "thingspro-api-v1" + +# Connection to IoT Hub and listing of IoT Edge device (so AIG-301 in the scope of SAFT) +registry_manager = IoTHubRegistryManager.from_connection_string(CONNECTION_STRING) +query_spec = QuerySpecification(query="SELECT * FROM devices WHERE capabilities.iotEdge = true ") +query_result = registry_manager.query_iot_hub(query_spec) + +# Sorting devices by site +devices = [] +try: + for item in query_result.items: + if str(item.connection_state) == "Connected": + currentDevice = iSightDevice(str(item.device_id), str(item.tags['site']), int(item.tags['number']), str(item.tags['version'])) + devices.append(iSightDevice(str(item.device_id), str(item.tags['site']), int(item.tags['number']), str(item.tags['version']))) + devices.sort(key = lambda d: (d.site, d.number)) + else: + print(f"Device not connected: {str(item.tags['site'])} {int(item.tags['number'])} {str(item.device_id)}") +except KeyboardInterrupt: + # This block runs when user presses Ctrl+C + print("\nProgram killed by user (Ctrl+C).") + sys.exit(0) + +# List of the tags that are a status or alert +on_event_tags = [ + "1365", + "AC_Insulation_Status", + "ACSw_Status", + "ActErrNo1", + "ActErrNo10", + "ActErrNo2", + "ActErrNo3", + "ActErrNo4", + "ActErrNo5", + "ActErrNo6", + "ActErrNo7", + "ActErrNo8", + "ActErrNo9", + "BESS_Control_Mode", + "BESS_Control_Mode_2", + "Calibration_Required", + "CO_Sensor_1", + "CO_Sensor_2", + "CO_Sensor_3", + "CO_Sensor_4", + "CO_Sensor_5", + "Combi_Detector_1", + "Combi_Detector_2", + "Combi_Detector_3", + "Combi_Detector_4", + "Combi_Detector_5", + "Combi_Detector_6", + "DC_Insulation_Status", + "DCSw_1_Status", + "DCSw_2_Status", + "DCSw_3_Status", + "Digital_Input_Line_0", + "DIL0", + "DIL1", + "DIL2", + "Door_Open", + "EMS_Control", + "Energy_Fault", + "ESS_Status", + "Extinguisher_Activated", + "Fault_Id_tmp", + "Fault_Module_Id", + "Fault_Status", + "General_Fault", + "InvSt_tmp", + "Last_Event", + "Loss_of_Agent", + "Manual_mode", + "Manual_Pull_Station", + "MV_Transformer_Gas_Monitoring", + "MV_Transformer_Oil_Level_Trip", + "MV_Transformer_Pressure", + "MV_Transformer_Pressure_Trip", + "MV_Transformer_Temperature_Alarm", + "MV_Transformer_Temperature_Warning", + "NbModule", + "NbModule_Run", + "Reg_Request", + "Version", + "Warn_Id" +] + + +try: + for device in devices: + + print(f"{device.getSite()} {device.getNumber()} {device.getDeviceId()}") + + modbus_tags_to_set_on_event = {} + id_on_event_topic = -1 + id_12h_topic = -1 + + current_device_modules = registry_manager.get_modules(device.deviceId) + for module in current_device_modules: + if (module.module_id == module_id): + device.setModule(module) + + # This block get all the Modbus tags available and cross check them with the list of tags that require changes + payload = '{"method":"GET", "path":"/tags/list"}' + try: + direct_method = CloudToDeviceMethod(method_name=method_name, payload=json.loads(payload)) + response = registry_manager.invoke_device_module_method(device_id=device.deviceId, module_id=module_id, direct_method_request=direct_method) + device_tags = json.dumps(response.payload, indent = 2) + parsed_tags = json.loads(device_tags) + + for item in parsed_tags['data']: + if item['prvdName'] == "modbus_tcp_master" and item['tagName'] in on_event_tags: + modbus_tags_to_set_on_event.setdefault(item['srcName'], []).append(item['tagName']) + + except Exception as e: + logger.error(f"Exception when getting list of device's Modbus tags that can be set On Event:\n{str(e)}") + + + payload = '{"method":"GET", "path":"/azure-iotedge/messages"}' + try: + direct_method = CloudToDeviceMethod(method_name=method_name, payload=json.loads(payload)) + response = registry_manager.invoke_device_module_method(device_id=device.deviceId, module_id=module_id, direct_method_request=direct_method) + device_topics = json.dumps(response.payload, indent = 2) + parsed_topics = json.loads(device_topics) + + for item in parsed_topics["data"]: + if item['enable'] and item["outputTopic"] == "REL_OE_PCS": + id_on_event_topic = item['id'] + if item['enable'] and item["outputTopic"] == "REL_OE_PCS_12H": + id_12h_topic = item['id'] + + available_topics = [ + item for item in parsed_topics["data"] + if item['enable'] + and item['id'] != id_on_event_topic + and item['id'] != id_12h_topic + ] + + if id_on_event_topic == -1: + id_on_event_topic = resolve_missing_topic("REL_OE_PCS", available_topics) + + if id_12h_topic == -1: + id_12h_topic = resolve_missing_topic("REL_OE_PCS_12H", available_topics) + + + targets = [ + {"id": id_on_event_topic, "name": "OnEvent"}, + {"id": id_12h_topic, "name": "12h"} + ] + + for target in targets: + target_id = target["id"] + target_name = target["name"] + + if target_id > 0: + topic_item = next((item for item in parsed_topics["data"] if item["id"] == target_id), None) + if topic_item: + original_tags = topic_item.get("tags", {}).get("modbus_tcp_master", {}) + else: # Pure error handling, detected ID does not exist + print(f"Error: ID {target_id} not found. Skipping.") + continue + else: # Not error handling, target_id == 0 means creation of new topic + original_tags = {} + + final_tag_list = copy.deepcopy(original_tags) + for device_name, tags_to_add in modbus_tags_to_set_on_event.items(): + if device_name not in final_tag_list: + final_tag_list[device_name] = [] + current_list = final_tag_list[device_name] + for tag in tags_to_add: + if tag not in current_list: + current_list.append(tag) + # CASE A: CREATE NEW + if target_id == 0: + print(f"[{target_name}] ID is 0 -> ACTION: CREATE NEW TOPIC") + elif final_tag_list != original_tags: + print(f"[{target_name}] ID {target_id} -> ACTION: UPDATE EXISTING") + else: + print(f"[{target_name}] ID {target_id} -> ACTION: NONE (Already up to date)") + + # Removing tags from existing topics: cleaned_topic + protected_ids = [id_on_event_topic, id_12h_topic] + + + for item in parsed_topics["data"]: + if item['id'] in protected_ids or item['enable'] == False: + continue + + original_tags = item.get("tags", {}).get("modbus_tcp_master", {}) + + try: + idAsset = re.search(r"idAsset\s*:\s*\((.*?)\)", item.get("format", "")).group(1).strip() + except Exception as e: + print(f"Exception while getting idAsset: {item}") + continue + + if not original_tags: + continue + + final_tag_list = copy.deepcopy(original_tags) + modification_needed = False + + for device_name, tags_to_remove in modbus_tags_to_set_on_event.items(): + + if device_name in final_tag_list: + current_list = final_tag_list[device_name] + new_list = [t for t in current_list if t not in tags_to_remove] + + if len(new_list) != len(current_list): + final_tag_list[device_name] = new_list + modification_needed = True + + if not final_tag_list[device_name]: + del final_tag_list[device_name] + + if modification_needed: + print(f"-> Applying update for topic: {item['outputTopic']} (ID: {item['id']} & {idAsset})") + + except Exception as e: + logger.error(f"Exception when getting all Telemetry Topics:\n{str(e)}") + + user_input = 'Y' # input("Continue? Y/n ").lower().strip() + if user_input == 'n': + print("Stopping loop.") + break +except KeyboardInterrupt: + # This block runs when user presses Ctrl+C + print("\nProgram killed by user (Ctrl+C).") + sys.exit(0) + +