Files
ess-moxa-configuration-tools/Python/create_iot_hub_devices_from_excel.py

157 lines
6.8 KiB
Python

import os
import pandas as pd
from dotenv import load_dotenv
from azure.iot.hub import IoTHubRegistryManager
from azure.iot.hub.models import Twin, Device
from azure.core.exceptions import ResourceNotFoundError, ResourceExistsError
from msrest.exceptions import HttpOperationError
# Load environment variables (python-dotenv; typically from a local .env file)
# before the configuration below reads them.
load_dotenv()
# --- Configuration ---
# IoT Hub connection string, read from the CONNECTION_STRING_INOX_PROD
# environment variable. os.getenv returns None when the variable is unset;
# main() checks for that before doing any work.
IOTHUB_CONNECTION_STRING = os.getenv("CONNECTION_STRING_INOX_PROD")
# Spreadsheet read by main(); it is accessed by the columns 'deviceId',
# 'deviceType', 'site', 'number' and (optionally) 'version'.
INPUT_EXCEL_FILE = 'SALINAS_DEVICE_LIST.xlsx'
# Spreadsheet written by main(): the input rows plus a 'connectionString' column.
OUTPUT_EXCEL_FILE = 'SALINAS_DEVICE_LIST_CS.xlsx'
def get_connection_string(device: Device, host_name: str) -> str:
    """
    Constructs the primary connection string for a given device object.

    :param device: The IoTHubRegistryManager Device object. Only
        ``device.device_id`` and
        ``device.authentication.symmetric_key.primary_key`` are read.
    :param host_name: The IoT Hub hostname (e.g., 'your-hub.azure-devices.net').
    :return: The device connection string in the form
        ``HostName=<host>;DeviceId=<id>;SharedAccessKey=<primary key>``, or the
        text ``"Error: Could not retrieve symmetric key."`` when the device,
        its authentication block, its symmetric key or the primary key itself
        is missing or empty.
    """
    # Walk the attribute chain defensively: getattr on None (or on an object
    # lacking the attribute) yields None instead of raising AttributeError.
    authentication = getattr(device, "authentication", None)
    symmetric_key = getattr(authentication, "symmetric_key", None)
    primary_key = getattr(symmetric_key, "primary_key", None)

    # A symmetric-key object can exist with no key material in it; without
    # this check the result would contain the text "SharedAccessKey=None".
    if not primary_key:
        return "Error: Could not retrieve symmetric key."

    return (
        f"HostName={host_name};"
        f"DeviceId={device.device_id};"
        f"SharedAccessKey={primary_key}"
    )
def _parse_host_name(connection_string):
    """Returns the HostName value of a ';'-separated connection string, or None if absent."""
    for segment in connection_string.split(';'):
        key, separator, value = segment.partition('=')
        # Match on the key name instead of assuming HostName is the first segment.
        if separator and key.strip().lower() == 'hostname':
            return value.strip()
    return None


def _provision_device(registry_manager, device_id, device_type, site, number, version):
    """Ensures one device exists with the expected IoT Edge flag and tags; returns it, or None on failure."""
    # 1. CHECK IF DEVICE EXISTS
    try:
        existing_device = registry_manager.get_device(device_id)
    except HttpOperationError as http_err:
        # Compare against None explicitly: a requests-style response object is
        # falsy for 4xx/5xx statuses, so a truthiness test would hide the details.
        response = http_err.response
        if response is not None and response.status_code == 404:
            print(f"Device '{device_id}' does not exist. Creating...")
            return create_device_from_spec(registry_manager, device_id, device_type, site, number, version)
        # It's a different HTTP error (e.g., 401 Unauthorized, 503 Service Unavailable)
        print(f" [ERROR] An unexpected HTTP error occurred for {device_id}:")
        if response is not None:
            # The response body often has the most detailed error message
            print(f" Status Code: {response.status_code}")
            print(f" Response Text: {response.text}")
        else:
            print(f" Full Error: {http_err}")
        return None

    print(f"Device '{device_id}' already exists. Validating configuration...")
    # 2. VALIDATE IOT EDGE STATUS
    # Coerce to bool so an unset (None) flag is treated as "not Edge" rather
    # than as a mismatch, which would needlessly delete the device.
    capabilities = getattr(existing_device, 'capabilities', None)
    is_edge_in_hub = bool(getattr(capabilities, 'iot_edge', False))
    is_edge_in_file = (device_type == 'AC_GATEWAY')
    if is_edge_in_hub == is_edge_in_file:
        print(" [OK] IoT Edge status is correct. Updating tags...")
        update_device_tags(registry_manager, device_id, device_type, site, number, version)
        return existing_device

    # 3. The Edge flag differs from the spreadsheet, so the device is rebuilt.
    print(" [MISMATCH] IoT Edge status is incorrect. Deleting and recreating.")
    registry_manager.delete_device(device_id)
    return create_device_from_spec(registry_manager, device_id, device_type, site, number, version)


def main():
    """
    Main function to provision devices from an Excel file and save results.

    Reads INPUT_EXCEL_FILE, and for every row makes sure a matching device
    exists in the IoT Hub (creating it, or deleting and recreating it when its
    IoT Edge flag disagrees with the row's deviceType), updates its twin tags,
    and records the device connection string in a 'connectionString' column.
    The resulting table is written to OUTPUT_EXCEL_FILE.

    Errors are reported with print(); nothing is raised to the caller. A
    failure on one device is logged and the loop moves on to the next row.

    NOTE: the output file contains device shared access keys in plain text.
    """
    if not IOTHUB_CONNECTION_STRING:
        # Name the variable that is actually read with os.getenv at module level.
        print("Error: The CONNECTION_STRING_INOX_PROD environment variable is not set.")
        return
    try:
        registry_manager = IoTHubRegistryManager.from_connection_string(IOTHUB_CONNECTION_STRING)
        # Extract the hub hostname once for use in connection strings
        host_name = _parse_host_name(IOTHUB_CONNECTION_STRING)
        if not host_name:
            print("Error: Could not find a HostName entry in the IoT Hub connection string.")
            return

        df = pd.read_excel(INPUT_EXCEL_FILE)
        # Fail early with a readable message instead of a bare KeyError mid-loop.
        required_columns = ('deviceId', 'deviceType', 'site', 'number')
        missing_columns = [name for name in required_columns if name not in df.columns]
        if missing_columns:
            print(
                f"Error: The input file '{INPUT_EXCEL_FILE}' is missing required "
                f"column(s): {', '.join(missing_columns)}"
            )
            return

        # Add the new column to store connection strings, initialized as empty
        df['connectionString'] = ''
        for index, row in df.iterrows():
            device_id = row['deviceId']
            device_type = row['deviceType']
            site = row['site']
            number = row['number']
            version = row.get('version')  # optional column
            print(f"\n--- Processing Device: {device_id} ---")
            try:
                final_device_obj = _provision_device(
                    registry_manager, device_id, device_type, site, number, version
                )
            except Exception as e:
                # Keep going with the remaining rows; report type and message.
                print(f" [ERROR] Unexpected error while processing '{device_id}': {type(e).__name__}: {e}")
                final_device_obj = None

            # 4. IF A DEVICE WAS CREATED OR VALIDATED, GET AND SAVE ITS CONNECTION STRING
            if final_device_obj is not None:
                connection_string = get_connection_string(final_device_obj, host_name)
                # Use .at for efficient, label-based assignment
                df.at[index, 'connectionString'] = connection_string
                print(f" > Connection string recorded for '{device_id}'.")

        # 5. SAVE THE UPDATED DATAFRAME TO A NEW EXCEL FILE
        df.to_excel(OUTPUT_EXCEL_FILE, index=False)
        print("\n--- Processing Complete ---")
        print(f"Results, including connection strings, have been saved to '{OUTPUT_EXCEL_FILE}'.")
    except FileNotFoundError:
        print(f"Error: The input file '{INPUT_EXCEL_FILE}' was not found.")
    except Exception as e:
        print(f"A general error occurred: {e}")
def create_device_from_spec(registry_manager, device_id, device_type, site, number, version) -> Device:
    """Creates a device, sets its tags, and returns the created device object.

    :param registry_manager: IoTHubRegistryManager used for all hub calls.
    :param device_id: Identifier of the device to create.
    :param device_type: Device type from the spreadsheet; 'AC_GATEWAY' creates
        an IoT Edge device, anything else a non-Edge device.
    :param site: Passed through to update_device_tags.
    :param number: Passed through to update_device_tags.
    :param version: Passed through to update_device_tags.
    :return: The created device; the existing device if the hub reports that
        the ID is already taken (tags are not touched in that case); or None
        if creation or retrieval failed. Failures are printed, not raised.
    """
    is_edge = (device_type == 'AC_GATEWAY')
    try:
        # Empty key strings are passed as in the original call; presumably the
        # service generates the keys itself - TODO confirm.
        device = registry_manager.create_device_with_sas(
            device_id, "", "", status="enabled", iot_edge=is_edge
        )
    except (ResourceExistsError, HttpOperationError) as create_err:
        # A conflict can arrive either as azure-core's ResourceExistsError or
        # as an msrest HttpOperationError carrying status 409.
        response = getattr(create_err, 'response', None)
        status_code = getattr(response, 'status_code', None)
        already_exists = isinstance(create_err, ResourceExistsError) or status_code == 409
        if not already_exists:
            print(f" [ERROR] Failed to create device {device_id}: {create_err}")
            return None
        print(f" [WARNING] Device {device_id} already exists. Attempting to retrieve it.")
        try:
            return registry_manager.get_device(device_id)
        except Exception as fetch_err:
            # Guarded so a failed lookup honours the "None on failure" contract
            # instead of escaping from inside the except handler.
            print(f" [ERROR] Failed to retrieve existing device {device_id}: {fetch_err}")
            return None
    except Exception as e:
        print(f" [ERROR] Failed to create device {device_id}: {e}")
        return None

    print(f" > Created {'Edge' if is_edge else 'non-Edge'} device '{device_id}'.")
    # After creating, update the tags
    update_device_tags(registry_manager, device_id, device_type, site, number, version)
    return device  # Return the newly created device object
def update_device_tags(registry_manager, device_id, device_type, site, number, version):
    """Updates the device twin tags based on its type.

    Writes the tags 'site', 'number' (converted with int()) and 'deviceType'.
    For 'AC_GATEWAY' devices a 'version' tag (as a string) is added as well,
    unless no version value was supplied.

    :param registry_manager: IoTHubRegistryManager used to read and patch the twin.
    :param device_id: Identifier of the device whose twin is updated.
    :param device_type: Device type; stored as a tag and decides whether
        'version' is written.
    :param site: Value stored in the 'site' tag.
    :param number: Value stored in the 'number' tag after int() conversion.
    :param version: Version for gateways; None or NaN means "not provided".
    :return: None. Any failure (including a 'number' that cannot be converted
        to int) is printed and swallowed so the caller can continue.
    """
    try:
        tags = {
            "site": site,
            "number": int(number),
            "deviceType": device_type,
        }
        if device_type == 'AC_GATEWAY':
            # NaN is the only value that is not equal to itself; a blank cell
            # in the spreadsheet arrives here as NaN (or None when the column
            # is absent). Skip it rather than writing the text "nan"/"None".
            version_missing = version is None or version != version
            if version_missing:
                print(f" [WARNING] No version provided for '{device_id}'; 'version' tag not updated.")
            else:
                tags["version"] = str(version)
        # The current twin is fetched only for its etag, which is sent along
        # with the patch.
        twin = registry_manager.get_twin(device_id)
        twin_patch = Twin(tags=tags)
        registry_manager.update_twin(device_id, twin_patch, twin.etag)
        print(f" > Successfully updated tags for '{device_id}'.")
    except Exception as e:
        print(f" [ERROR] Failed to update tags for {device_id}: {e}")
# Run the provisioning workflow only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()