207 lines
9.6 KiB
Python
207 lines
9.6 KiB
Python
from azure.iot.hub import IoTHubRegistryManager
|
|
from azure.iot.hub.protocol.models import QuerySpecification, Module
|
|
from azure.iot.hub.models import CloudToDeviceMethod, CloudToDeviceMethodResult
|
|
from dotenv import load_dotenv
|
|
from isight_device import iSightDevice
|
|
from collections import defaultdict
|
|
|
|
import json
|
|
import os
|
|
import logging
|
|
import sys
|
|
|
|
def generate_oe(device_id, modbus_tags):
|
|
return {
|
|
'description': '',
|
|
'outputTopic': 'REL_OE_PCS',
|
|
'properties': [
|
|
{'key': 'cdid', 'value': device_id},
|
|
{'key': 'deviceType', 'value': 'AC_GATEWAY'}
|
|
],
|
|
'tags': {
|
|
'modbus_tcp_master': modbus_tags
|
|
},
|
|
'sendOutThreshold': {
|
|
'mode': 'immediately',
|
|
'size': 4096,
|
|
'time': 60,
|
|
'sizeIdleTimer': {
|
|
'enable': True,
|
|
'time': 60
|
|
}
|
|
},
|
|
'minPublishInterval': 0,
|
|
'samplingMode': 'allChangedValues',
|
|
'customSamplingRate': False,
|
|
'pollingInterval': 0,
|
|
'onChange': True,
|
|
'enable': True,
|
|
'format': (
|
|
'{version:3,deviceId:("' + device_id +
|
|
'"),emissionDate:(now|todateiso8601),deviceType:("AC_GATEWAY"),'
|
|
'metrics:[{idAsset:(.srcName),name:(.tagName),value:.dataValue,'
|
|
'timestamp:((.ts/1000000)|todateiso8601)}]}'
|
|
)
|
|
}
|
|
|
|
def generate_12h(device_id, modbus_tags):
|
|
return {
|
|
'description': '',
|
|
'outputTopic': 'REL_OE_PCS_12H',
|
|
'properties': [
|
|
{'key': 'cdid', 'value': device_id},
|
|
{'key': 'deviceType', 'value': 'AC_GATEWAY'}
|
|
],
|
|
'tags': {
|
|
'modbus_tcp_master': modbus_tags
|
|
},
|
|
'sendOutThreshold': {
|
|
'mode': 'byTime',
|
|
'size': 4096,
|
|
'time': 43200,
|
|
'sizeIdleTimer': {
|
|
'enable': True,
|
|
'time': 60
|
|
}
|
|
},
|
|
'minPublishInterval': 0,
|
|
'samplingMode': 'latestValues',
|
|
'customSamplingRate': False,
|
|
'pollingInterval': 43200,
|
|
'onChange': False,
|
|
'enable': True,
|
|
'format': (
|
|
'{version:3,deviceId:("' + device_id +
|
|
'"),emissionDate:(now|todateiso8601),deviceType:("AC_GATEWAY"),'
|
|
'metrics:[{idAsset:(.srcName),name:(.tagName),value:.dataValue,'
|
|
'timestamp:((.ts/1000000)|todateiso8601)}]}'
|
|
)
|
|
}
|
|
|
|
# Configure logger to use stderr
|
|
logging.basicConfig(
|
|
level=logging.ERROR,
|
|
format='%(asctime)s - %(levelname)s - %(message)s',
|
|
stream=sys.stderr # <- Key line
|
|
)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
load_dotenv()
|
|
|
|
module_id = "thingspro-agent"
|
|
method_name = "thingspro-api-v1"
|
|
|
|
CONNECTION_STRING = str(os.getenv("CONNECTION_STRING_INOX_PROD"))
|
|
if CONNECTION_STRING == "":
|
|
print("Provide a connection string for the Iot Hub before running the script!")
|
|
exit(13)
|
|
|
|
registry_manager = IoTHubRegistryManager.from_connection_string(CONNECTION_STRING)
|
|
query_spec = QuerySpecification(query="SELECT * FROM devices WHERE capabilities.iotEdge = true ")
|
|
|
|
query_result = registry_manager.query_iot_hub(query_spec)
|
|
|
|
devices = []
|
|
for item in query_result.items:
|
|
currentDevice = iSightDevice(str(item.device_id), str(item.tags['site']), int(item.tags['number']), str(item.tags['version']))
|
|
devices.append(iSightDevice(str(item.device_id), str(item.tags['site']), int(item.tags['number']), str(item.tags['version'])))
|
|
|
|
devices.sort(key = lambda d: (d.site, d.number))
|
|
|
|
for device in devices:
|
|
current_device_modules = registry_manager.get_modules(device.deviceId)
|
|
for module in current_device_modules:
|
|
if (module.module_id == module_id):
|
|
#print(f"{module.module_id}")
|
|
device.setModule(module)
|
|
payload = '{"method":"GET", "path":"/azure-iotedge/messages"}'
|
|
try:
|
|
direct_method = CloudToDeviceMethod(method_name=method_name, payload=json.loads(payload))
|
|
response = registry_manager.invoke_device_module_method(device_id=device.deviceId, module_id=module_id, direct_method_request=direct_method)
|
|
device_messages = json.dumps(response.payload, indent = 2)
|
|
parsed_messages = json.loads(device_messages)
|
|
#print("Parsed messages")
|
|
message_ids_with_tags = {}
|
|
tags_to_on_event = defaultdict(set)
|
|
for message in parsed_messages["data"]:
|
|
|
|
message_id = message["id"]
|
|
topic_name = message["outputTopic"]
|
|
message_content = message
|
|
|
|
tags = message.get("tags", {}).get("modbus_tcp_master", {})
|
|
|
|
for asset, tag_list in tags.items():
|
|
if "InvSt" in tag_list and "OE" in topic_name:
|
|
message_ids_with_tags.setdefault(message_id, message)
|
|
tags_to_on_event[asset].add("InvSt")
|
|
print(f"{device.getDeviceId()} {device.getSite()} {device.getNumber()} {message_id} {topic_name} {asset} InvSt")
|
|
if "Fault_Id" in tag_list and "OE" in topic_name:
|
|
message_ids_with_tags.setdefault(message_id, message)
|
|
tags_to_on_event[asset].add("Fault_Id")
|
|
print(f"{device.getDeviceId()} {device.getSite()} {device.getNumber()} {message_id} {topic_name} {asset} Fault_Id")
|
|
# print(f"{device.getDeviceId()} {device.getSite()} {device.getNumber()}")
|
|
# print(f"Messages to update: {', '.join(str(key) for key in message_ids_with_tags.keys())}")
|
|
# for asset, tag in tags_to_on_event.items():
|
|
# print(f"Alarms to make on event:")
|
|
# print(f"{asset}: {', '.join(tag)}")
|
|
# print(f"Finishing on {device}")
|
|
|
|
# oe_exists = False
|
|
# twelve_exists = False
|
|
|
|
# for message in message_ids_with_tags.values():
|
|
# if message['outputTopic'] == "REL_OE_PCS":
|
|
# oe_exists = True
|
|
# if message['outputTopic'] == "REL_OE_PCS_12H":
|
|
# twelve_exists = True
|
|
|
|
# if (oe_exists == False or twelve_exists == False):
|
|
# first_message = next(iter(message_ids_with_tags.values()))
|
|
# properties = first_message['properties']
|
|
# cdid = next((item['value'] for item in properties if item['key'] == 'cdid'))
|
|
# print(f"{cdid}")
|
|
|
|
# modbus_tags = {
|
|
# asset: sorted(list(tags))
|
|
# for asset, tags in tags_to_on_event.items()
|
|
# }
|
|
# new_payload = ""
|
|
# if (oe_exists == False):
|
|
# new_payload = '{"method":"POST", "path":"/azure-iotedge/messages", "requestBody":' + json.dumps(generate_oe(cdid, modbus_tags)) + '}'
|
|
# new_direct_method = CloudToDeviceMethod(method_name=method_name, payload=json.loads(new_payload))
|
|
# new_response = registry_manager.invoke_device_module_method(device_id=device.deviceId, module_id=module_id, direct_method_request=new_direct_method)
|
|
# print(json.dumps(new_response.payload, indent = 2))
|
|
|
|
# if (twelve_exists == False):
|
|
# new_payload = '{"method":"POST", "path":"/azure-iotedge/messages", "requestBody":' + json.dumps(generate_12h(cdid, modbus_tags)) + '}'
|
|
# new_direct_method = CloudToDeviceMethod(method_name=method_name, payload=json.loads(new_payload))
|
|
# new_response = registry_manager.invoke_device_module_method(device_id=device.deviceId, module_id=module_id, direct_method_request=new_direct_method)
|
|
# print(json.dumps(new_response.payload, indent = 2))
|
|
|
|
# for id, message in message_ids_with_tags.items():
|
|
# if "OE" in message['outputTopic']:
|
|
# continue
|
|
# tags_to_remove = {"InvSt", "Fault_Id"}
|
|
# for protocol, versions in message['tags'].items():
|
|
# for version, tag_list in versions.items():
|
|
# message['tags'][protocol][version] = [
|
|
# tag for tag in tag_list if tag not in tags_to_remove
|
|
# ]
|
|
# print("\n\n\nREMOVING\n\n\n")
|
|
# new_payload = '{"method":"PUT", "path":"/azure-iotedge/messages/' + str(id) + '", "requestBody":' + json.dumps(message) + '}'
|
|
# print(new_payload)
|
|
# new_direct_method = CloudToDeviceMethod(method_name=method_name, payload=json.loads(new_payload))
|
|
# new_response = registry_manager.invoke_device_module_method(device_id=device.deviceId, module_id=module_id, direct_method_request=new_direct_method)
|
|
# print(json.dumps(new_response.payload, indent = 2))
|
|
# print(message['tags'])
|
|
|
|
|
|
# print("\n\n\nREMOVING\n\n\n")
|
|
# new_payload = '{"method":"PUT", "path":"/azure-iotedge/messages/' + str(id) + '", "requestBody":' + json.dumps(message) + '}'
|
|
# print(new_payload)
|
|
# new_direct_method = CloudToDeviceMethod(method_name=method_name, payload=json.loads(new_payload))
|
|
# new_response = registry_manager.invoke_device_module_method(device_id=device.deviceId, module_id=module_id, direct_method_request=new_direct_method)
|
|
# print(json.dumps(new_response.payload, indent = 2))
|
|
except Exception as e:
|
|
logger.error(str(e)) |