from azure.iot.hub import IoTHubRegistryManager from azure.iot.hub.protocol.models import QuerySpecification, Module from azure.iot.hub.models import CloudToDeviceMethod, CloudToDeviceMethodResult from dotenv import load_dotenv from isight_device import iSightDevice from collections import defaultdict import json import os import logging import sys def generate_oe(device_id, modbus_tags): return { 'description': '', 'outputTopic': 'REL_OE_PCS', 'properties': [ {'key': 'cdid', 'value': device_id}, {'key': 'deviceType', 'value': 'AC_GATEWAY'} ], 'tags': { 'modbus_tcp_master': modbus_tags }, 'sendOutThreshold': { 'mode': 'immediately', 'size': 4096, 'time': 60, 'sizeIdleTimer': { 'enable': True, 'time': 60 } }, 'minPublishInterval': 0, 'samplingMode': 'allChangedValues', 'customSamplingRate': False, 'pollingInterval': 0, 'onChange': True, 'enable': True, 'format': ( '{version:3,deviceId:("' + device_id + '"),emissionDate:(now|todateiso8601),deviceType:("AC_GATEWAY"),' 'metrics:[{idAsset:(.srcName),name:(.tagName),value:.dataValue,' 'timestamp:((.ts/1000000)|todateiso8601)}]}' ) } def generate_12h(device_id, modbus_tags): return { 'description': '', 'outputTopic': 'REL_OE_PCS_12H', 'properties': [ {'key': 'cdid', 'value': device_id}, {'key': 'deviceType', 'value': 'AC_GATEWAY'} ], 'tags': { 'modbus_tcp_master': modbus_tags }, 'sendOutThreshold': { 'mode': 'byTime', 'size': 4096, 'time': 43200, 'sizeIdleTimer': { 'enable': True, 'time': 60 } }, 'minPublishInterval': 0, 'samplingMode': 'latestValues', 'customSamplingRate': False, 'pollingInterval': 43200, 'onChange': False, 'enable': True, 'format': ( '{version:3,deviceId:("' + device_id + '"),emissionDate:(now|todateiso8601),deviceType:("AC_GATEWAY"),' 'metrics:[{idAsset:(.srcName),name:(.tagName),value:.dataValue,' 'timestamp:((.ts/1000000)|todateiso8601)}]}' ) } # Configure logger to use stderr logging.basicConfig( level=logging.ERROR, format='%(asctime)s - %(levelname)s - %(message)s', stream=sys.stderr # <- Key line ) logger = logging.getLogger(__name__) load_dotenv() module_id = "thingspro-agent" method_name = "thingspro-api-v1" CONNECTION_STRING = str(os.getenv("CONNECTION_STRING_INOX_PROD")) if CONNECTION_STRING == "": print("Provide a connection string for the Iot Hub before running the script!") exit(13) registry_manager = IoTHubRegistryManager.from_connection_string(CONNECTION_STRING) query_spec = QuerySpecification(query="SELECT * FROM devices WHERE capabilities.iotEdge = true ") query_result = registry_manager.query_iot_hub(query_spec) devices = [] for item in query_result.items: currentDevice = iSightDevice(str(item.device_id), str(item.tags['site']), int(item.tags['number']), str(item.tags['version'])) devices.append(iSightDevice(str(item.device_id), str(item.tags['site']), int(item.tags['number']), str(item.tags['version']))) devices.sort(key = lambda d: (d.site, d.number)) for device in devices: current_device_modules = registry_manager.get_modules(device.deviceId) for module in current_device_modules: if (module.module_id == module_id): #print(f"{module.module_id}") device.setModule(module) payload = '{"method":"GET", "path":"/azure-iotedge/messages"}' try: direct_method = CloudToDeviceMethod(method_name=method_name, payload=json.loads(payload)) response = registry_manager.invoke_device_module_method(device_id=device.deviceId, module_id=module_id, direct_method_request=direct_method) device_messages = json.dumps(response.payload, indent = 2) parsed_messages = json.loads(device_messages) #print("Parsed messages") message_ids_with_tags = {} tags_to_on_event = defaultdict(set) for message in parsed_messages["data"]: message_id = message["id"] topic_name = message["outputTopic"] message_content = message tags = message.get("tags", {}).get("modbus_tcp_master", {}) for asset, tag_list in tags.items(): if "InvSt" in tag_list and "OE" in topic_name: message_ids_with_tags.setdefault(message_id, message) tags_to_on_event[asset].add("InvSt") print(f"{device.getDeviceId()} {device.getSite()} {device.getNumber()} {message_id} {topic_name} {asset} InvSt") if "Fault_Id" in tag_list and "OE" in topic_name: message_ids_with_tags.setdefault(message_id, message) tags_to_on_event[asset].add("Fault_Id") print(f"{device.getDeviceId()} {device.getSite()} {device.getNumber()} {message_id} {topic_name} {asset} Fault_Id") # print(f"{device.getDeviceId()} {device.getSite()} {device.getNumber()}") # print(f"Messages to update: {', '.join(str(key) for key in message_ids_with_tags.keys())}") # for asset, tag in tags_to_on_event.items(): # print(f"Alarms to make on event:") # print(f"{asset}: {', '.join(tag)}") # print(f"Finishing on {device}") # oe_exists = False # twelve_exists = False # for message in message_ids_with_tags.values(): # if message['outputTopic'] == "REL_OE_PCS": # oe_exists = True # if message['outputTopic'] == "REL_OE_PCS_12H": # twelve_exists = True # if (oe_exists == False or twelve_exists == False): # first_message = next(iter(message_ids_with_tags.values())) # properties = first_message['properties'] # cdid = next((item['value'] for item in properties if item['key'] == 'cdid')) # print(f"{cdid}") # modbus_tags = { # asset: sorted(list(tags)) # for asset, tags in tags_to_on_event.items() # } # new_payload = "" # if (oe_exists == False): # new_payload = '{"method":"POST", "path":"/azure-iotedge/messages", "requestBody":' + json.dumps(generate_oe(cdid, modbus_tags)) + '}' # new_direct_method = CloudToDeviceMethod(method_name=method_name, payload=json.loads(new_payload)) # new_response = registry_manager.invoke_device_module_method(device_id=device.deviceId, module_id=module_id, direct_method_request=new_direct_method) # print(json.dumps(new_response.payload, indent = 2)) # if (twelve_exists == False): # new_payload = '{"method":"POST", "path":"/azure-iotedge/messages", "requestBody":' + json.dumps(generate_12h(cdid, modbus_tags)) + '}' # new_direct_method = CloudToDeviceMethod(method_name=method_name, payload=json.loads(new_payload)) # new_response = registry_manager.invoke_device_module_method(device_id=device.deviceId, module_id=module_id, direct_method_request=new_direct_method) # print(json.dumps(new_response.payload, indent = 2)) # for id, message in message_ids_with_tags.items(): # if "OE" in message['outputTopic']: # continue # tags_to_remove = {"InvSt", "Fault_Id"} # for protocol, versions in message['tags'].items(): # for version, tag_list in versions.items(): # message['tags'][protocol][version] = [ # tag for tag in tag_list if tag not in tags_to_remove # ] # print("\n\n\nREMOVING\n\n\n") # new_payload = '{"method":"PUT", "path":"/azure-iotedge/messages/' + str(id) + '", "requestBody":' + json.dumps(message) + '}' # print(new_payload) # new_direct_method = CloudToDeviceMethod(method_name=method_name, payload=json.loads(new_payload)) # new_response = registry_manager.invoke_device_module_method(device_id=device.deviceId, module_id=module_id, direct_method_request=new_direct_method) # print(json.dumps(new_response.payload, indent = 2)) # print(message['tags']) # print("\n\n\nREMOVING\n\n\n") # new_payload = '{"method":"PUT", "path":"/azure-iotedge/messages/' + str(id) + '", "requestBody":' + json.dumps(message) + '}' # print(new_payload) # new_direct_method = CloudToDeviceMethod(method_name=method_name, payload=json.loads(new_payload)) # new_response = registry_manager.invoke_device_module_method(device_id=device.deviceId, module_id=module_id, direct_method_request=new_direct_method) # print(json.dumps(new_response.payload, indent = 2)) except Exception as e: logger.error(str(e))