Files
ess-moxa-configuration-tools/Python/azure_iot_hub_restart_iotedge.py
2025-06-27 08:40:36 +02:00

51 lines
2.0 KiB
Python

from azure.iot.hub import IoTHubRegistryManager
from azure.iot.hub.protocol.models import QuerySpecification, Module
from azure.iot.hub.models import CloudToDeviceMethod, CloudToDeviceMethodResult
from dotenv import load_dotenv
from isight_device import iSightDevice
import json
import os
load_dotenv()
module_id = "$edgeAgent"
method_name = "RestartModule"
module = "edgeHub"
payload = '{"schemaVersion":"1.0", "id":"' + module + '"}'
# Install the Azure IoT Hub SDK:
# pip install azure-iot-hub
# Authenticate to your Azure account
# CONNECTION_STRING = str(os.getenv("CONNECTION_STRING_SAFT_PROD"))
CONNECTION_STRING = str(os.getenv("CONNECTION_STRING_INOX_PROD"))
if CONNECTION_STRING == "":
print("Provide a connection string for the Iot Hub before running the script!")
exit(13)
registry_manager = IoTHubRegistryManager.from_connection_string(CONNECTION_STRING)
query_spec = QuerySpecification(query="SELECT * FROM devices WHERE IS_DEFINED(tags.site) AND tags.site = 'DANISH' AND capabilities.iotEdge = true ")
query_result = registry_manager.query_iot_hub(query_spec)
devices = []
for item in query_result.items:
# currentDevice = iSightDevice(str(item.device_id), str(item.tags['site']), int(item.tags['number']), str(item.tags['version']))
devices.append(iSightDevice(str(item.device_id), str(item.tags['site']), int(item.tags['number']), str(item.tags['version'])))
devices.sort(key = lambda d: (d.site, d.number))
for device in devices:
print(device, end="\t")
current_device_modules = registry_manager.get_modules(device.deviceId)
for module in current_device_modules:
if (module.module_id == "thingspro-agent"):
device.setModule(module)
try:
direct_method = CloudToDeviceMethod(method_name=method_name, payload=json.loads(payload))
response = registry_manager.invoke_device_module_method(device_id=device.deviceId, module_id=module_id, direct_method_request=direct_method)
print(response)
except:
print("ERROR")