tagging cube type
This commit is contained in:
45
Python/azure_iot_hub_cube_set_tags.py
Normal file
45
Python/azure_iot_hub_cube_set_tags.py
Normal file
@@ -0,0 +1,45 @@
|
||||
from azure.iot.hub import IoTHubRegistryManager
|
||||
from azure.iot.hub.protocol.models import QuerySpecification, Module
|
||||
from azure.iot.hub.models import CloudToDeviceMethod, CloudToDeviceMethodResult
|
||||
from dotenv import load_dotenv
|
||||
from isight_device import iSightDevice, CubeDevice
|
||||
import json
|
||||
import os
|
||||
from azure.iot.hub.models import Twin, TwinProperties
|
||||
|
||||
load_dotenv()
|
||||
|
||||
# Install the Azure IoT Hub SDK:
|
||||
# pip install azure-iot-hub
|
||||
|
||||
# Authenticate to your Azure account
|
||||
CONNECTION_STRING = str(os.getenv("CONNECTION_STRING_SAFT_PROD"))
|
||||
# CONNECTION_STRING = str(os.getenv("CONNECTION_STRING_INOX_PROD"))
|
||||
if CONNECTION_STRING == "":
|
||||
print("Provide a connection string for the Iot Hub before running the script!")
|
||||
exit(13)
|
||||
|
||||
|
||||
registry_manager = IoTHubRegistryManager.from_connection_string(CONNECTION_STRING)
|
||||
query_spec = QuerySpecification(query="SELECT * FROM devices")
|
||||
|
||||
query_result = registry_manager.query_iot_hub(query_spec)
|
||||
|
||||
devices = []
|
||||
i = 1
|
||||
for item in query_result.items:
|
||||
item_name = str(item.device_id)
|
||||
if "cube-" in item_name:
|
||||
print(str(i), end="\t")
|
||||
i = i + 1
|
||||
print(item_name, end="\t")
|
||||
try:
|
||||
twin = registry_manager.get_twin(item_name)
|
||||
twin_patch = Twin(properties=TwinProperties(desired={}), tags={
|
||||
"deviceType": "CUBE"
|
||||
})
|
||||
registry_manager.update_twin(item_name, twin_patch, twin.etag)
|
||||
print("tagged!")
|
||||
except Exception as e:
|
||||
print("error!")
|
||||
continue
|
||||
@@ -21,3 +21,13 @@ class iSightDevice:
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.deviceId} {self.site} {self.number} {self.cloudVersion}"
|
||||
|
||||
class CubeDevice:
|
||||
def __init__(self, deviceId: str):
|
||||
if not isinstance(deviceId, str):
|
||||
raise TypeError("deviceId must be a str")
|
||||
|
||||
self.deviceId = deviceId
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.deviceId}"
|
||||
Reference in New Issue
Block a user