diff --git a/Python/isight_device.py b/Python/isight_device.py index 68b502e..5f4414a 100644 --- a/Python/isight_device.py +++ b/Python/isight_device.py @@ -21,6 +21,15 @@ class iSightDevice: def __str__(self): return f"{self.deviceId} {self.site} {self.number} {self.cloudVersion}" + + def getDeviceId(self): + return f"{self.deviceId}" + + def getSite(self): + return f"{self.site}" + + def getNumber(self): + return f"{self.number}" class CubeDevice: def __init__(self, deviceId: str): diff --git a/Python/on_event_topics.py b/Python/on_event_topics.py new file mode 100644 index 0000000..a45c7ac --- /dev/null +++ b/Python/on_event_topics.py @@ -0,0 +1,207 @@ +from azure.iot.hub import IoTHubRegistryManager +from azure.iot.hub.protocol.models import QuerySpecification, Module +from azure.iot.hub.models import CloudToDeviceMethod, CloudToDeviceMethodResult +from dotenv import load_dotenv +from isight_device import iSightDevice +from collections import defaultdict + +import json +import os +import logging +import sys + +def generate_oe(device_id, modbus_tags): + return { + 'description': '', + 'outputTopic': 'REL_OE_PCS', + 'properties': [ + {'key': 'cdid', 'value': device_id}, + {'key': 'deviceType', 'value': 'AC_GATEWAY'} + ], + 'tags': { + 'modbus_tcp_master': modbus_tags + }, + 'sendOutThreshold': { + 'mode': 'immediately', + 'size': 4096, + 'time': 60, + 'sizeIdleTimer': { + 'enable': True, + 'time': 60 + } + }, + 'minPublishInterval': 0, + 'samplingMode': 'allChangedValues', + 'customSamplingRate': False, + 'pollingInterval': 0, + 'onChange': True, + 'enable': True, + 'format': ( + '{version:3,deviceId:("' + device_id + + '"),emissionDate:(now|todateiso8601),deviceType:("AC_GATEWAY"),' + 'metrics:[{idAsset:(.srcName),name:(.tagName),value:.dataValue,' + 'timestamp:((.ts/1000000)|todateiso8601)}]}' + ) +} + +def generate_12h(device_id, modbus_tags): + return { + 'description': '', + 'outputTopic': 'REL_OE_PCS_12H', + 'properties': [ + {'key': 'cdid', 'value': device_id}, + {'key': 'deviceType', 'value': 'AC_GATEWAY'} + ], + 'tags': { + 'modbus_tcp_master': modbus_tags + }, + 'sendOutThreshold': { + 'mode': 'byTime', + 'size': 4096, + 'time': 43200, + 'sizeIdleTimer': { + 'enable': True, + 'time': 60 + } + }, + 'minPublishInterval': 0, + 'samplingMode': 'latestValues', + 'customSamplingRate': False, + 'pollingInterval': 43200, + 'onChange': False, + 'enable': True, + 'format': ( + '{version:3,deviceId:("' + device_id + + '"),emissionDate:(now|todateiso8601),deviceType:("AC_GATEWAY"),' + 'metrics:[{idAsset:(.srcName),name:(.tagName),value:.dataValue,' + 'timestamp:((.ts/1000000)|todateiso8601)}]}' + ) +} + +# Configure logger to use stderr +logging.basicConfig( + level=logging.ERROR, + format='%(asctime)s - %(levelname)s - %(message)s', + stream=sys.stderr # <- Key line +) +logger = logging.getLogger(__name__) + +load_dotenv() + +module_id = "thingspro-agent" +method_name = "thingspro-api-v1" + +CONNECTION_STRING = str(os.getenv("CONNECTION_STRING_INOX_PROD")) +if CONNECTION_STRING == "": + print("Provide a connection string for the Iot Hub before running the script!") + exit(13) + +registry_manager = IoTHubRegistryManager.from_connection_string(CONNECTION_STRING) +query_spec = QuerySpecification(query="SELECT * FROM devices WHERE capabilities.iotEdge = true ") + +query_result = registry_manager.query_iot_hub(query_spec) + +devices = [] +for item in query_result.items: + currentDevice = iSightDevice(str(item.device_id), str(item.tags['site']), int(item.tags['number']), str(item.tags['version'])) + devices.append(iSightDevice(str(item.device_id), str(item.tags['site']), int(item.tags['number']), str(item.tags['version']))) + + devices.sort(key = lambda d: (d.site, d.number)) + +for device in devices: + current_device_modules = registry_manager.get_modules(device.deviceId) + for module in current_device_modules: + if (module.module_id == module_id): + #print(f"{module.module_id}") + device.setModule(module) + payload = '{"method":"GET", "path":"/azure-iotedge/messages"}' + try: + direct_method = CloudToDeviceMethod(method_name=method_name, payload=json.loads(payload)) + response = registry_manager.invoke_device_module_method(device_id=device.deviceId, module_id=module_id, direct_method_request=direct_method) + device_messages = json.dumps(response.payload, indent = 2) + parsed_messages = json.loads(device_messages) + #print("Parsed messages") + message_ids_with_tags = {} + tags_to_on_event = defaultdict(set) + for message in parsed_messages["data"]: + + message_id = message["id"] + topic_name = message["outputTopic"] + message_content = message + + tags = message.get("tags", {}).get("modbus_tcp_master", {}) + + for asset, tag_list in tags.items(): + if "InvSt" in tag_list and "OE" in topic_name: + message_ids_with_tags.setdefault(message_id, message) + tags_to_on_event[asset].add("InvSt") + print(f"{device.getDeviceId()} {device.getSite()} {device.getNumber()} {message_id} {topic_name} {asset} InvSt") + if "Fault_Id" in tag_list and "OE" in topic_name: + message_ids_with_tags.setdefault(message_id, message) + tags_to_on_event[asset].add("Fault_Id") + print(f"{device.getDeviceId()} {device.getSite()} {device.getNumber()} {message_id} {topic_name} {asset} Fault_Id") + # print(f"{device.getDeviceId()} {device.getSite()} {device.getNumber()}") + # print(f"Messages to update: {', '.join(str(key) for key in message_ids_with_tags.keys())}") + # for asset, tag in tags_to_on_event.items(): + # print(f"Alarms to make on event:") + # print(f"{asset}: {', '.join(tag)}") + # print(f"Finishing on {device}") + + # oe_exists = False + # twelve_exists = False + + # for message in message_ids_with_tags.values(): + # if message['outputTopic'] == "REL_OE_PCS": + # oe_exists = True + # if message['outputTopic'] == "REL_OE_PCS_12H": + # twelve_exists = True + + # if (oe_exists == False or twelve_exists == False): + # first_message = next(iter(message_ids_with_tags.values())) + # properties = first_message['properties'] + # cdid = next((item['value'] for item in properties if item['key'] == 'cdid')) + # print(f"{cdid}") + + # modbus_tags = { + # asset: sorted(list(tags)) + # for asset, tags in tags_to_on_event.items() + # } + # new_payload = "" + # if (oe_exists == False): + # new_payload = '{"method":"POST", "path":"/azure-iotedge/messages", "requestBody":' + json.dumps(generate_oe(cdid, modbus_tags)) + '}' + # new_direct_method = CloudToDeviceMethod(method_name=method_name, payload=json.loads(new_payload)) + # new_response = registry_manager.invoke_device_module_method(device_id=device.deviceId, module_id=module_id, direct_method_request=new_direct_method) + # print(json.dumps(new_response.payload, indent = 2)) + + # if (twelve_exists == False): + # new_payload = '{"method":"POST", "path":"/azure-iotedge/messages", "requestBody":' + json.dumps(generate_12h(cdid, modbus_tags)) + '}' + # new_direct_method = CloudToDeviceMethod(method_name=method_name, payload=json.loads(new_payload)) + # new_response = registry_manager.invoke_device_module_method(device_id=device.deviceId, module_id=module_id, direct_method_request=new_direct_method) + # print(json.dumps(new_response.payload, indent = 2)) + + # for id, message in message_ids_with_tags.items(): + # if "OE" in message['outputTopic']: + # continue + # tags_to_remove = {"InvSt", "Fault_Id"} + # for protocol, versions in message['tags'].items(): + # for version, tag_list in versions.items(): + # message['tags'][protocol][version] = [ + # tag for tag in tag_list if tag not in tags_to_remove + # ] + # print("\n\n\nREMOVING\n\n\n") + # new_payload = '{"method":"PUT", "path":"/azure-iotedge/messages/' + str(id) + '", "requestBody":' + json.dumps(message) + '}' + # print(new_payload) + # new_direct_method = CloudToDeviceMethod(method_name=method_name, payload=json.loads(new_payload)) + # new_response = registry_manager.invoke_device_module_method(device_id=device.deviceId, module_id=module_id, direct_method_request=new_direct_method) + # print(json.dumps(new_response.payload, indent = 2)) + # print(message['tags']) + + + # print("\n\n\nREMOVING\n\n\n") + # new_payload = '{"method":"PUT", "path":"/azure-iotedge/messages/' + str(id) + '", "requestBody":' + json.dumps(message) + '}' + # print(new_payload) + # new_direct_method = CloudToDeviceMethod(method_name=method_name, payload=json.loads(new_payload)) + # new_response = registry_manager.invoke_device_module_method(device_id=device.deviceId, module_id=module_id, direct_method_request=new_direct_method) + # print(json.dumps(new_response.payload, indent = 2)) + except Exception as e: + logger.error(str(e)) \ No newline at end of file