From 888352d3b8a01f6d2eff1739d83c56ae0a1e105d Mon Sep 17 00:00:00 2001 From: Quentin WEPHRE Date: Mon, 17 Mar 2025 08:33:46 +0100 Subject: [PATCH] New functions using Azure SDK for Python --- Python/azure_iot_hub_create_devices.py | 56 +++++ .../azure_iot_hub_get_connection_strings.py | 41 ++++ Python/azure_iot_hub_list_devices.py | 59 +++++ Python/calc_three_phase.py | 43 ++++ Python/server_async.py | 219 ++++++++++++++++++ 5 files changed, 418 insertions(+) create mode 100644 Python/azure_iot_hub_create_devices.py create mode 100644 Python/azure_iot_hub_get_connection_strings.py create mode 100644 Python/azure_iot_hub_list_devices.py create mode 100644 Python/calc_three_phase.py create mode 100644 Python/server_async.py diff --git a/Python/azure_iot_hub_create_devices.py b/Python/azure_iot_hub_create_devices.py new file mode 100644 index 0000000..40d8acd --- /dev/null +++ b/Python/azure_iot_hub_create_devices.py @@ -0,0 +1,56 @@ +from azure.iot.hub import IoTHubRegistryManager +from azure.iot.hub.models import Twin, TwinProperties +from dotenv import load_dotenv +import pandas as pd + +import os +import json + +load_dotenv() + +INPUT_FILE = "cottonwood_devices.xlsx" # Path to your Excel file +SITE_NAME = "COTTONWOOD" # Parameterized site name +VERSION = "1.5.0" # Parameterized version + +# Authenticate to your Azure account +# CONNECTION_STRING = str(os.getenv("CONNECTION_STRING_SAFT_PROD")) +CONNECTION_STRING = str(os.getenv("CONNECTION_STRING_INOX_PROD")) +if CONNECTION_STRING == "": + print("Provide a connection string for the Iot Hub before running the script!") + exit(13) + + +df = pd.read_excel(INPUT_FILE, header=None) + +registry_manager = IoTHubRegistryManager.from_connection_string(CONNECTION_STRING) + +def create_device(device_name, number): + try: + # Create the device + device = registry_manager.create_device_with_sas( + device_name, + primary_key="", secondary_key="", + status="enabled", + iot_edge=True + ) + print(f"Created IoT Edge-enabled device: {device_name}") + + # Set tags + twin = registry_manager.get_twin(device_name) + twin_patch = Twin(properties=TwinProperties(desired={}), tags={ + "site": SITE_NAME, + "number": number, + "version": VERSION + }) + registry_manager.update_twin(device_name, twin_patch, twin.etag) + print(f"Updated tags for {device_name}") + except Exception as e: + print(f"Error processing {device_name}: {e}") + +# Loop through the Excel file and process each device +for index, row in df.iterrows(): + device_name = str(row[0]).strip() + if device_name: + create_device(device_name, index + 1) + +print("Device provisioning completed.") \ No newline at end of file diff --git a/Python/azure_iot_hub_get_connection_strings.py b/Python/azure_iot_hub_get_connection_strings.py new file mode 100644 index 0000000..1be5006 --- /dev/null +++ b/Python/azure_iot_hub_get_connection_strings.py @@ -0,0 +1,41 @@ +from azure.iot.hub import IoTHubRegistryManager +from azure.iot.hub.protocol.models import QuerySpecification +from azure.iot.hub.models import CloudToDeviceMethod, CloudToDeviceMethodResult +from dotenv import load_dotenv + +import json +import os + +load_dotenv() + +# Authenticate to your Azure account +# CONNECTION_STRING = str(os.getenv("CONNECTION_STRING_SAFT_PROD")) +CONNECTION_STRING = str(os.getenv("CONNECTION_STRING_INOX_PROD")) +if CONNECTION_STRING == "": + print("Provide a connection string for the Iot Hub before running the script!") + exit(13) + +SITE_NAME = "COTTONWOOD" + + +registry_manager = IoTHubRegistryManager.from_connection_string(CONNECTION_STRING) +query_spec = QuerySpecification(query="SELECT * FROM devices WHERE IS_DEFINED(tags.site) AND tags.site = '" + SITE_NAME + "' AND capabilities.iotEdge = true") + +query_result = registry_manager.query_iot_hub(query_spec) + +devices = [] +for item in query_result.items: + deviceId = str(item.device_id) + site = str(item.tags['site']) + number = int(item.tags['number']) + cloud_version = str(item.tags['version']) + devices.append([deviceId, site, number, cloud_version]) + +ordered_devices = sorted(devices, key = lambda x: (x[1], x[2])) + +for i in ordered_devices: + device_info = registry_manager.get_device(i[0]) + primary_key = device_info.authentication.symmetric_key.primary_key + connection_string = f"HostName={CONNECTION_STRING.split(';')[0].split('=')[1]};DeviceId={i[0]};SharedAccessKey={primary_key}" + print(i[1], i[2], i[0]) + print(connection_string) \ No newline at end of file diff --git a/Python/azure_iot_hub_list_devices.py b/Python/azure_iot_hub_list_devices.py new file mode 100644 index 0000000..fc97a1a --- /dev/null +++ b/Python/azure_iot_hub_list_devices.py @@ -0,0 +1,59 @@ +from azure.iot.hub import IoTHubRegistryManager +from azure.iot.hub.protocol.models import QuerySpecification +from azure.iot.hub.models import CloudToDeviceMethod, CloudToDeviceMethodResult +from dotenv import load_dotenv + +import json +import os + +load_dotenv() + +module_id = "thingspro-agent" +method_name = "thingspro-api-v1" +payload = '{"method":"GET", "path":"/device/general"}' + +# Install the Azure IoT Hub SDK: +# pip install azure-iot-hub + +# Authenticate to your Azure account +# CONNECTION_STRING = str(os.getenv("CONNECTION_STRING_SAFT_PROD")) +CONNECTION_STRING = str(os.getenv("CONNECTION_STRING_INOX_PROD")) +if CONNECTION_STRING == "": + print("Provide a connection string for the Iot Hub before running the script!") + exit(13) + + +registry_manager = IoTHubRegistryManager.from_connection_string(CONNECTION_STRING) +query_spec = QuerySpecification(query="SELECT * FROM devices WHERE IS_DEFINED(tags.site) AND capabilities.iotEdge = true AND tags.site = 'PIERREFONDS'") + +query_result = registry_manager.query_iot_hub(query_spec) + +devices = [] +for item in query_result.items: + deviceId = str(item.device_id) + site = str(item.tags['site']) + number = int(item.tags['number']) + cloud_version = str(item.tags['version']) + devices.append([deviceId, site, number, cloud_version]) + +ordered_devices = sorted(devices, key = lambda x: (x[1], x[2])) + +for index in range(len(ordered_devices)): + current_device_modules = registry_manager.get_modules(ordered_devices[index][0]) + for module in current_device_modules: + if module.module_id == module_id: + thingspro_module = module + print(thingspro_module) + try: + direct_method = CloudToDeviceMethod(method_name=method_name, payload=json.loads(payload)) + response = registry_manager.invoke_device_module_method(device_id=ordered_devices[index][0], module_id=module_id, direct_method_request=direct_method) + # print(str(ordered_devices[index][0]), str(ordered_devices[index][1]), str(ordered_devices[index][2]), response.payload['data']['description'], "device version: " + str(response.payload['data']['firmwareVersion']), "cloud version: " + str(ordered_devices[index][3]), sep=";") + device_version = str(response.payload['data']['firmwareVersion']) + if device_version != "1.6.0": + payload = '{"deleteFileAfterInstallComplete": true, "install": true, "url": "https://files.thingsprocloud.com/package/Upgrade_AIG-301_2.5.0-4404_IMG_1.5_to_1.6.0.yaml"}' + direct_method = CloudToDeviceMethod(method_name=method_name, payload=json.loads(payload)) + except: + print(str(ordered_devices[index][0]), str(ordered_devices[index][1]), str(ordered_devices[index][2]), "UNREACHABLE", "device version: UNREACHABLE", "cloud version: " + str(ordered_devices[index][3]), sep=";") + else: + print("No thingspro-agent available for " + ordered_devices[index][0] + " (" + ordered_devices[index][1] + " " + ordered_devices[index][2] + ")") + \ No newline at end of file diff --git a/Python/calc_three_phase.py b/Python/calc_three_phase.py new file mode 100644 index 0000000..4738e1f --- /dev/null +++ b/Python/calc_three_phase.py @@ -0,0 +1,43 @@ +import math + +# Given values +V_LL_AB = 135 # Line-to-line voltage AB in Volts +V_LL_BC = 0 # Line-to-line voltage BC in Volts +V_LL_CA = 135 # Line-to-line voltage CA in Volts +I_A = 0.1 # Current in phase A in Amperes +I_B = 0 # Current in phase B in Amperes +I_C = 0 # Current in phase C in Amperes +cos_phi_A = 0.98 # Power factor for phase A +cos_phi_B = 1 # Power factor for phase B +cos_phi_C = 1 # Power factor for phase C + +# Calculate phase voltage from line-to-line voltage +V_ph = V_LL_AB / math.sqrt(3) + +# Power calculations for each phase +# Phase A +P_A = V_ph * I_A * cos_phi_A +Q_A = V_ph * I_A * math.sqrt(1 - cos_phi_A**2) if I_A > 0 else 0 + +# Phase B +P_B = V_ph * I_B * cos_phi_B +Q_B = V_ph * I_B * math.sqrt(1 - cos_phi_B**2) if I_B > 0 else 0 + +# Phase C +P_C = V_ph * I_C * cos_phi_C +Q_C = V_ph * I_C * math.sqrt(1 - cos_phi_C**2) if I_C > 0 else 0 + +# Total power calculations +P_total = P_A + P_B + P_C +Q_total = Q_A + Q_B + Q_C + +# Output the results +print(f"Phase Voltage (V_ph): {V_ph:.2f} V") +print(f"Active Power (P_A): {P_A:.2f} W") +print(f"Reactive Power (Q_A): {Q_A:.2f} VAR") +print(f"Active Power (P_B): {P_B:.2f} W") +print(f"Reactive Power (Q_B): {Q_B:.2f} VAR") +print(f"Active Power (P_C): {P_C:.2f} W") +print(f"Reactive Power (Q_C): {Q_C:.2f} VAR") +print(f"Total Active Power (P_total): {P_total:.2f} W") +print(f"Total Reactive Power (Q_total): {Q_total:.2f} VAR") diff --git a/Python/server_async.py b/Python/server_async.py new file mode 100644 index 0000000..bec0135 --- /dev/null +++ b/Python/server_async.py @@ -0,0 +1,219 @@ +#!/usr/bin/env python3 +"""Pymodbus asynchronous Server Example. + +An example of a multi threaded asynchronous server. + +usage:: + + server_async.py [-h] [--comm {tcp,udp,serial,tls}] + [--framer {ascii,rtu,socket,tls}] + [--log {critical,error,warning,info,debug}] + [--port PORT] [--store {sequential,sparse,factory,none}] + [--slaves SLAVES] + + -h, --help + show this help message and exit + -c, --comm {tcp,udp,serial,tls} + set communication, default is tcp + -f, --framer {ascii,rtu,socket,tls} + set framer, default depends on --comm + -l, --log {critical,error,warning,info,debug} + set log level, default is info + -p, --port PORT + set port + set serial device baud rate + --store {sequential,sparse,factory,none} + set datastore type + --slaves SLAVES + set number of slaves to respond to + +The corresponding client can be started as: + + python3 client_sync.py + +""" +import asyncio +import logging +import sys +from collections.abc import Callable +from typing import Any + + +try: + import helper # type: ignore[import-not-found] +except ImportError: + print("*** ERROR --> THIS EXAMPLE needs the example directory, please see \n\ + https://pymodbus.readthedocs.io/en/latest/source/examples.html\n\ + for more information.") + sys.exit(-1) + +from pymodbus import __version__ as pymodbus_version +from pymodbus.datastore import ( + ModbusSequentialDataBlock, + ModbusServerContext, + ModbusSlaveContext, + ModbusSparseDataBlock, +) +from pymodbus.device import ModbusDeviceIdentification +from pymodbus.server import ( + StartAsyncSerialServer, + StartAsyncTcpServer, + StartAsyncTlsServer, + StartAsyncUdpServer, +) + + +_logger = logging.getLogger(__file__) +_logger.setLevel(logging.INFO) + + +def setup_server(description=None, context=None, cmdline=None): + """Run server setup.""" + args = helper.get_commandline(server=True, description=description, cmdline=cmdline) + if context: + args.context = context + datablock: Callable[[], Any] + if not args.context: + _logger.info("### Create datastore") + # The datastores only respond to the addresses that are initialized + # If you initialize a DataBlock to addresses of 0x00 to 0xFF, a request to + # 0x100 will respond with an invalid address exception. + # This is because many devices exhibit this kind of behavior (but not all) + if args.store == "sequential": + # Continuing, use a sequential block without gaps. + datablock = lambda : ModbusSequentialDataBlock(0x00, [17] * 100) # pylint: disable=unnecessary-lambda-assignment + elif args.store == "sparse": + # Continuing, or use a sparse DataBlock which can have gaps + datablock = lambda : ModbusSparseDataBlock({0x00: 0, 0x05: 1}) # pylint: disable=unnecessary-lambda-assignment + elif args.store == "factory": + # Alternately, use the factory methods to initialize the DataBlocks + # or simply do not pass them to have them initialized to 0x00 on the + # full address range:: + datablock = lambda : ModbusSequentialDataBlock.create() # pylint: disable=unnecessary-lambda-assignment,unnecessary-lambda + + if args.slaves > 1: + # The server then makes use of a server context that allows the server + # to respond with different slave contexts for different slave ids. + # By default it will return the same context for every slave id supplied + # (broadcast mode). + # However, this can be overloaded by setting the single flag to False and + # then supplying a dictionary of slave id to context mapping:: + context = {} + + for slave in range(args.slaves): + context[slave] = ModbusSlaveContext( + di=datablock(), + co=datablock(), + hr=datablock(), + ir=datablock(), + ) + + single = False + else: + context = ModbusSlaveContext( + di=datablock(), co=datablock(), hr=datablock(), ir=datablock() + ) + single = True + + # Build data storage + args.context = ModbusServerContext(slaves=context, single=single) + + # ----------------------------------------------------------------------- # + # initialize the server information + # ----------------------------------------------------------------------- # + # If you don't set this or any fields, they are defaulted to empty strings. + # ----------------------------------------------------------------------- # + args.identity = ModbusDeviceIdentification( + info_name={ + "VendorName": "Pymodbus", + "ProductCode": "PM", + "VendorUrl": "https://github.com/pymodbus-dev/pymodbus/", + "ProductName": "Pymodbus Server", + "ModelName": "Pymodbus Server", + "MajorMinorRevision": pymodbus_version, + } + ) + return args + + +async def run_async_server(args) -> None: + """Run server.""" + txt = f"### start ASYNC server, listening on {args.port} - {args.comm}" + _logger.info(txt) + if args.comm == "tcp": + address = (args.host if args.host else "", args.port if args.port else None) + await StartAsyncTcpServer( + context=args.context, # Data storage + identity=args.identity, # server identify + address=address, # listen address + # custom_functions=[], # allow custom handling + framer=args.framer, # The framer strategy to use + # ignore_missing_slaves=True, # ignore request to a missing slave + # broadcast_enable=False, # treat slave 0 as broadcast address, + # timeout=1, # waiting time for request to complete + ) + elif args.comm == "udp": + address = ( + args.host if args.host else "127.0.0.1", + args.port if args.port else None, + ) + await StartAsyncUdpServer( + context=args.context, # Data storage + identity=args.identity, # server identify + address=address, # listen address + # custom_functions=[], # allow custom handling + framer=args.framer, # The framer strategy to use + # ignore_missing_slaves=True, # ignore request to a missing slave + # broadcast_enable=False, # treat slave 0 as broadcast address, + # timeout=1, # waiting time for request to complete + ) + elif args.comm == "serial": + # socat -d -d PTY,link=/tmp/ptyp0,raw,echo=0,ispeed=9600 + # PTY,link=/tmp/ttyp0,raw,echo=0,ospeed=9600 + await StartAsyncSerialServer( + context=args.context, # Data storage + identity=args.identity, # server identify + # timeout=1, # waiting time for request to complete + port=args.port, # serial port + # custom_functions=[], # allow custom handling + framer=args.framer, # The framer strategy to use + # stopbits=1, # The number of stop bits to use + # bytesize=8, # The bytesize of the serial messages + # parity="N", # Which kind of parity to use + baudrate=args.baudrate, # The baud rate to use for the serial device + # handle_local_echo=False, # Handle local echo of the USB-to-RS485 adaptor + # ignore_missing_slaves=True, # ignore request to a missing slave + # broadcast_enable=False, # treat slave 0 as broadcast address, + ) + elif args.comm == "tls": + address = (args.host if args.host else "", args.port if args.port else None) + await StartAsyncTlsServer( + context=args.context, # Data storage + # port=port, # on which port + identity=args.identity, # server identify + # custom_functions=[], # allow custom handling + address=address, # listen address + framer=args.framer, # The framer strategy to use + certfile=helper.get_certificate( + "crt" + ), # The cert file path for TLS (used if sslctx is None) + # sslctx=sslctx, # The SSLContext to use for TLS (default None and auto create) + keyfile=helper.get_certificate( + "key" + ), # The key file path for TLS (used if sslctx is None) + # password="none", # The password for for decrypting the private key file + # ignore_missing_slaves=True, # ignore request to a missing slave + # broadcast_enable=False, # treat slave 0 as broadcast address, + # timeout=1, # waiting time for request to complete + ) + + +async def async_helper() -> None: + """Combine setup and run.""" + _logger.info("Starting...") + run_args = setup_server(description="Run asynchronous server.") + await run_async_server(run_args) + + +if __name__ == "__main__": + asyncio.run(async_helper(), debug=True)