from azure.iot.hub import IoTHubRegistryManager from azure.iot.hub.protocol.models import QuerySpecification, Module from azure.iot.hub.models import CloudToDeviceMethod, CloudToDeviceMethodResult from dotenv import load_dotenv from isight_device import iSightDevice from collections import defaultdict import json import os import logging import sys # from 1 to 5 (Modbus) # PCS # AC_Resistance # DC_Resistance # Hz # DCV # VL1L2 # VL2L3 # VL3L1 # OnEvent # ACSw_Status # ActErrNoX # DCSw_3_Status # Fault_Id_tmp # Fault_Status # InvSt_tmp # MV_Transformer_Gas_Monitoring # MV_Transformer_Oil_Level_Trip # MV_Transformer_Pressure # MV_Transformer_Pressure_Trip def generate_oe(device_id, modbus_tags): return { 'description': '', 'outputTopic': 'REL_OE_PCS', 'properties': [ {'key': 'cdid', 'value': device_id}, {'key': 'deviceType', 'value': 'AC_GATEWAY'} ], 'tags': { 'modbus_tcp_master': modbus_tags }, 'sendOutThreshold': { 'mode': 'immediately', 'size': 4096, 'time': 60, 'sizeIdleTimer': { 'enable': True, 'time': 60 } }, 'minPublishInterval': 0, 'samplingMode': 'allChangedValues', 'customSamplingRate': False, 'pollingInterval': 0, 'onChange': True, 'enable': True, 'format': ( '{version:3,deviceId:("' + device_id + '"),emissionDate:(now|todateiso8601),deviceType:("AC_GATEWAY"),' 'metrics:[{idAsset:(.srcName),name:(.tagName),value:.dataValue,' 'timestamp:((.ts/1000000)|todateiso8601)}]}' ) } def generate_12h(device_id, modbus_tags): return { 'description': '', 'outputTopic': 'REL_OE_PCS_12H', 'properties': [ {'key': 'cdid', 'value': device_id}, {'key': 'deviceType', 'value': 'AC_GATEWAY'} ], 'tags': { 'modbus_tcp_master': modbus_tags }, 'sendOutThreshold': { 'mode': 'byTime', 'size': 4096, 'time': 43200, 'sizeIdleTimer': { 'enable': True, 'time': 60 } }, 'minPublishInterval': 0, 'samplingMode': 'latestValues', 'customSamplingRate': False, 'pollingInterval': 43200, 'onChange': False, 'enable': True, 'format': ( '{version:3,deviceId:("' + device_id + '"),emissionDate:(now|todateiso8601),deviceType:("AC_GATEWAY"),' 'metrics:[{idAsset:(.srcName),name:(.tagName),value:.dataValue,' 'timestamp:((.ts/1000000)|todateiso8601)}]}' ) } # Configure logger to use stderr logging.basicConfig( level=logging.ERROR, format='%(asctime)s - %(levelname)s - %(message)s', stream=sys.stderr # <- Key line ) logger = logging.getLogger(__name__) load_dotenv() module_id = "thingspro-agent" method_name = "thingspro-api-v1" CONNECTION_STRING = str(os.getenv("CONNECTION_STRING_INOX_PROD")) if CONNECTION_STRING == "": print("Provide a connection string for the Iot Hub before running the script!") exit(13) registry_manager = IoTHubRegistryManager.from_connection_string(CONNECTION_STRING) query_spec = QuerySpecification(query="SELECT * FROM devices WHERE capabilities.iotEdge = true ") query_result = registry_manager.query_iot_hub(query_spec) devices = [] for item in query_result.items: currentDevice = iSightDevice(str(item.device_id), str(item.tags['site']), int(item.tags['number']), str(item.tags['version'])) devices.append(iSightDevice(str(item.device_id), str(item.tags['site']), int(item.tags['number']), str(item.tags['version']))) devices.sort(key = lambda d: (d.site, d.number)) for device in devices: current_device_modules = registry_manager.get_modules(device.deviceId) for module in current_device_modules: if (module.module_id == module_id): device.setModule(module) payload = '{"method":"GET", "path":"/azure-iotedge/messages"}' try: direct_method = CloudToDeviceMethod(method_name=method_name, payload=json.loads(payload)) response = registry_manager.invoke_device_module_method(device_id=device.deviceId, module_id=module_id, direct_method_request=direct_method) device_messages = json.dumps(response.payload, indent = 2) parsed_messages = json.loads(device_messages) message_ids_with_tags = {} tags_to_on_event = defaultdict(set) for message in parsed_messages["data"]: message_id = message["id"] topic_name = message["outputTopic"] message_content = message tags = message.get("tags", {}).get("modbus_tcp_master", {}) # This block seems list existing OE topics and their content. for asset, tag_list in tags.items(): if "InvSt" in tag_list and "OE" in topic_name: message_ids_with_tags.setdefault(message_id, message) tags_to_on_event[asset].add("InvSt") print(f"{device.getDeviceId()} {device.getSite()} {device.getNumber()} {message_id} {topic_name} {asset} InvSt") if "Fault_Id" in tag_list and "OE" in topic_name: message_ids_with_tags.setdefault(message_id, message) tags_to_on_event[asset].add("Fault_Id") print(f"{device.getDeviceId()} {device.getSite()} {device.getNumber()} {message_id} {topic_name} {asset} Fault_Id") print(f"{device.getDeviceId()} {device.getSite()} {device.getNumber()}") print(f"Messages to update: {', '.join(str(key) for key in message_ids_with_tags.keys())}") for asset, tag in tags_to_on_event.items(): print(f"Alarms to make on event:") print(f"{asset}: {', '.join(tag)}") print(f"Finishing on {device}") oe_exists = False twelve_exists = False for message in message_ids_with_tags.values(): if message['outputTopic'] == "REL_OE_PCS": oe_exists = True if message['outputTopic'] == "REL_OE_PCS_12H": twelve_exists = True if (oe_exists == False or twelve_exists == False): first_message = next(iter(message_ids_with_tags.values())) properties = first_message['properties'] cdid = next((item['value'] for item in properties if item['key'] == 'cdid')) print(f"{cdid}") modbus_tags = { asset: sorted(list(tags)) for asset, tags in tags_to_on_event.items() } new_payload = "" if (oe_exists == False): new_payload = '{"method":"POST", "path":"/azure-iotedge/messages", "requestBody":' + json.dumps(generate_oe(cdid, modbus_tags)) + '}' new_direct_method = CloudToDeviceMethod(method_name=method_name, payload=json.loads(new_payload)) new_response = registry_manager.invoke_device_module_method(device_id=device.deviceId, module_id=module_id, direct_method_request=new_direct_method) print(json.dumps(new_response.payload, indent = 2)) if (twelve_exists == False): new_payload = '{"method":"POST", "path":"/azure-iotedge/messages", "requestBody":' + json.dumps(generate_12h(cdid, modbus_tags)) + '}' new_direct_method = CloudToDeviceMethod(method_name=method_name, payload=json.loads(new_payload)) new_response = registry_manager.invoke_device_module_method(device_id=device.deviceId, module_id=module_id, direct_method_request=new_direct_method) print(json.dumps(new_response.payload, indent = 2)) for id, message in message_ids_with_tags.items(): if "OE" in message['outputTopic']: continue tags_to_remove = {"InvSt", "Fault_Id"} for protocol, versions in message['tags'].items(): for version, tag_list in versions.items(): message['tags'][protocol][version] = [ tag for tag in tag_list if tag not in tags_to_remove ] print("\n\n\nREMOVING\n\n\n") new_payload = '{"method":"PUT", "path":"/azure-iotedge/messages/' + str(id) + '", "requestBody":' + json.dumps(message) + '}' print(new_payload) new_direct_method = CloudToDeviceMethod(method_name=method_name, payload=json.loads(new_payload)) new_response = registry_manager.invoke_device_module_method(device_id=device.deviceId, module_id=module_id, direct_method_request=new_direct_method) print(json.dumps(new_response.payload, indent = 2)) print(message['tags']) print("\n\n\nREMOVING\n\n\n") new_payload = '{"method":"PUT", "path":"/azure-iotedge/messages/' + str(id) + '", "requestBody":' + json.dumps(message) + '}' print(new_payload) new_direct_method = CloudToDeviceMethod(method_name=method_name, payload=json.loads(new_payload)) new_response = registry_manager.invoke_device_module_method(device_id=device.deviceId, module_id=module_id, direct_method_request=new_direct_method) print(json.dumps(new_response.payload, indent = 2)) except Exception as e: logger.error(str(e))