"""Print the firmware version reported by every site-tagged IoT Edge device.

The script queries an Azure IoT Hub for IoT Edge devices that carry a
``tags.site`` value, sorts them by site and number, and then invokes the
``thingspro-api-v1`` direct method on each device's ``thingspro-agent``
module to read ``/device/general``.  One line is printed per device: the
device description, a tab, and either the firmware version or ``ERROR``.

Requires the Azure IoT Hub SDK: ``pip install azure-iot-hub``.
The hub connection string is read from the environment (optionally via a
``.env`` file loaded with python-dotenv).
"""
import json
import os
import sys

from azure.iot.hub import IoTHubRegistryManager
from azure.iot.hub.models import CloudToDeviceMethod, CloudToDeviceMethodResult
from azure.iot.hub.protocol.models import QuerySpecification, Module
from dotenv import load_dotenv

from isight_device import iSightDevice

# Make variables from a local .env file (if present) visible to os.getenv.
load_dotenv()

module_id = "thingspro-agent"
method_name = "thingspro-api-v1"
payload = '{"method":"GET", "path":"/device/general"}'

# The request body never changes, so parse it once instead of once per device.
_method_payload = json.loads(payload)

# Authenticate to the IoT Hub.
# Alternative variable name: CONNECTION_STRING_SAFT_PROD (swap it in below).
# Default to "" rather than wrapping in str(): str(None) is the non-empty
# string "None", which would slip past the emptiness check.
CONNECTION_STRING = os.getenv("CONNECTION_STRING_INOX_PROD", "")
if not CONNECTION_STRING:
    print("Provide a connection string for the Iot Hub before running the script!")
    # sys.exit is always available; the bare exit() helper is injected by the
    # site module and is missing in some run modes.
    sys.exit(13)

registry_manager = IoTHubRegistryManager.from_connection_string(CONNECTION_STRING)

# Only IoT Edge devices that have a site tag are part of the report.
query_spec = QuerySpecification(
    query="SELECT * FROM devices WHERE IS_DEFINED(tags.site) AND capabilities.iotEdge = true "
)
query_result = registry_manager.query_iot_hub(query_spec)

# NOTE(review): the query only guarantees tags.site; a device without
# 'number' or 'version' tags raises KeyError here - confirm all devices
# carry these tags.
devices = [
    iSightDevice(
        str(item.device_id),
        str(item.tags['site']),
        int(item.tags['number']),
        str(item.tags['version']),
    )
    for item in query_result.items
]
devices.sort(key=lambda d: (d.site, d.number))

for device in devices:
    # The device description and its firmware version share one output line.
    print(device, end="\t")

    agent_module = next(
        (
            module
            for module in registry_manager.get_modules(device.deviceId)
            if module.module_id == module_id
        ),
        None,
    )
    if agent_module is None:
        # Without the agent module the version cannot be read; close the row
        # with the failure marker so the next device starts on a fresh line.
        print("ERROR")
        continue

    device.setModule(agent_module)
    try:
        direct_method = CloudToDeviceMethod(
            method_name=method_name,
            payload=_method_payload,
        )
        response = registry_manager.invoke_device_module_method(
            device_id=device.deviceId,
            module_id=module_id,
            direct_method_request=direct_method,
        )
        device_version = str(response.payload['data']['firmwareVersion'])
    except Exception:
        # Best effort: an unreachable device or an unexpected response shape
        # must not abort the report.  Catching Exception (not a bare except)
        # keeps Ctrl-C and sys.exit working.
        print("ERROR")
    else:
        print(device_version)