"""Check the firmware version of IoT Edge devices through the thingspro-agent module.

The script queries the IoT Hub for IoT Edge devices tagged with site
'PIERREFONDS', orders them by site and number, and invokes the
``thingspro-api-v1`` direct method on each device's ``thingspro-agent``
module to read ``/device/general``. Devices that do not answer (or whose
answer cannot be parsed) are reported as UNREACHABLE on a ';'-separated line.

Requirements:
    pip install azure-iot-hub

The IoT Hub connection string is read from the CONNECTION_STRING_INOX_PROD
environment variable (a ``.env`` file is loaded first via ``load_dotenv``).
"""

from azure.iot.hub import IoTHubRegistryManager
from azure.iot.hub.protocol.models import QuerySpecification
from azure.iot.hub.models import CloudToDeviceMethod, CloudToDeviceMethodResult
from dotenv import load_dotenv
import json
import os

load_dotenv()

module_id = "thingspro-agent"
method_name = "thingspro-api-v1"
# Request body used to read the general device information (firmware version).
payload = '{"method":"GET", "path":"/device/general"}'
# Request body describing the firmware upgrade. It is kept in its own name so
# that it can never overwrite the GET payload used for the following devices.
upgrade_payload = (
    '{"deleteFileAfterInstallComplete": true, "install": true, '
    '"url": "https://files.thingsprocloud.com/package/'
    'Upgrade_AIG-301_2.5.0-4404_IMG_1.5_to_1.6.0.yaml"}'
)
# Devices reporting this firmware version need no upgrade.
target_firmware_version = "1.6.0"

# Authenticate to your Azure account.
# Alternative hub: os.getenv("CONNECTION_STRING_SAFT_PROD")
# Default to "" so that an unset variable is detected below; wrapping a missing
# value in str() would yield the non-empty string "None".
CONNECTION_STRING = os.getenv("CONNECTION_STRING_INOX_PROD", "")

if not CONNECTION_STRING:
    print("Provide a connection string for the Iot Hub before running the script!")
    raise SystemExit(13)

registry_manager = IoTHubRegistryManager.from_connection_string(CONNECTION_STRING)

query_spec = QuerySpecification(
    query="SELECT * FROM devices WHERE IS_DEFINED(tags.site) "
          "AND capabilities.iotEdge = true AND tags.site = 'PIERREFONDS'"
)
query_result = registry_manager.query_iot_hub(query_spec)

# One (device id, site, number, cloud version) record per device twin.
# NOTE(review): the query only guarantees tags.site; a twin without the
# 'number' or 'version' tag raises here, as it did before - confirm that
# every device carries both tags.
devices = []
for item in query_result.items:
    devices.append((
        str(item.device_id),
        str(item.tags['site']),
        int(item.tags['number']),
        str(item.tags['version']),
    ))

ordered_devices = sorted(devices, key=lambda record: (record[1], record[2]))

for device_id, site, number, cloud_version in ordered_devices:
    # Look the module up afresh for every device so that a match found on a
    # previous device can never be reused by mistake.
    thingspro_module = None
    for module in registry_manager.get_modules(device_id):
        if module.module_id == module_id:
            thingspro_module = module
            break

    if thingspro_module is None:
        # 'number' is an int, so it is converted explicitly before joining.
        print("No thingspro-agent available for " + device_id
              + " (" + site + " " + str(number) + ")")
        continue

    print(thingspro_module)

    try:
        direct_method = CloudToDeviceMethod(
            method_name=method_name, payload=json.loads(payload)
        )
        response = registry_manager.invoke_device_module_method(
            device_id=device_id,
            module_id=module_id,
            direct_method_request=direct_method,
        )
        device_version = str(response.payload['data']['firmwareVersion'])
    except Exception:
        # Any failure to call the device or to read its answer is reported as
        # unreachable; KeyboardInterrupt/SystemExit still stop the script.
        print(str(device_id), str(site), str(number), "UNREACHABLE",
              "device version: UNREACHABLE",
              "cloud version: " + str(cloud_version), sep=";")
        continue

    # Reporting for reachable devices is currently disabled; the original line was:
    # print(device_id, site, number, response.payload['data']['description'],
    #       "device version: " + device_version,
    #       "cloud version: " + cloud_version, sep=";")

    if device_version != target_firmware_version:
        # NOTE(review): the upgrade request is only built, never sent - exactly
        # as before. Confirm whether invoking it is intended before adding a
        # call, since that would start a firmware upgrade on the device.
        upgrade_method = CloudToDeviceMethod(
            method_name=method_name, payload=json.loads(upgrade_payload)
        )