From b8c18689bf3c56168b8631b73a127662e8d43592 Mon Sep 17 00:00:00 2001 From: Quentin WEPHRE Date: Thu, 26 Mar 2026 08:46:57 +0100 Subject: [PATCH] added on event topics --- Moulinette/data_path_config.py | 78 +++++++++++++++++++++++++++++++++- 1 file changed, 76 insertions(+), 2 deletions(-) diff --git a/Moulinette/data_path_config.py b/Moulinette/data_path_config.py index 7bcadd5..0ef6476 100644 --- a/Moulinette/data_path_config.py +++ b/Moulinette/data_path_config.py @@ -40,7 +40,7 @@ allowed_name_characters.append('.') logging.basicConfig(format='%(asctime)s %(levelname)-8s %(message)s', filename='data_config_debug.log', filemode='w', level=logging.DEBUG, datefmt='%Y%m%d%H%M%S') dir_name = 'I-Sight_Generated_Files' -input_datamodel = 'DATAMODEL_1.0.6_CW.xlsx' +input_datamodel = 'DATAMODEL_Ruakaka_QWE1.xlsx' shell_script_name = dir_name + '/I-Sight_Configuration_' global_shell_script_name = dir_name + '/I-Sight_Global_Configuration.sh' @@ -88,7 +88,7 @@ def excel_parser(sheet_det, slave_no, slave_device, allowed_name_characters, dsh logging.debug("Starting registration of Modbus commands for " + str(assets_name[slave_no]) + ".") for index, row in filtered_df_prop.iterrows(): step2_data = {} - logging.debug("Registering command " + row['metric_name'] + "...") + logging.debug("Registering command " + row['metric_name'] + " on address " + str(row['modbus_address'])) if allowed_data_sizes[allowed_data_types.index(row['type'])] != 1 and int(row['modbus_quantity']) % allowed_data_sizes[allowed_data_types.index(row['type'])] != 0: logging.debug("Wrong quantity (" + str(int(row['modbus_quantity'])) + ") for data type " + str(row['type']) + ", using default size " + str(allowed_data_sizes[allowed_data_types.index(row['type'])]) + ".") step2_data["readQuantity"] = allowed_data_sizes[allowed_data_types.index(row['type'])] @@ -246,6 +246,80 @@ def jq_filter(current_device, dsh, dsh_global, row_device): dsh_global.write("echo -e \"\\n\" >> global_config.log\n\n") dsh_global.write("echo \"Finished work on " + str(row_device['device_name']) + "\"") + if filter == "generic_metrics": + logging.debug("Creating OnEvent telemetry topic.") + jq_data = {} + jq_data["enable"] = False + jq_data["properties"] = [{"key": "deviceType", "value": "AC_GATEWAY"}, {"key": "cdid", "value": current_device}] + jq_data["outputTopic"] = "PCS_OnEvent" + jq_data["sendOutThreshold"] = {"mode": "immediately", "size": int(4096), "time": int(60), "sizeIdleTimer": {"enable": True, "time": int(60)}} + jq_data["minPublishInterval"] = int(0) + jq_data["samplingMode"] = "allChangedValues" + jq_data["customSamplingRate"] = False + jq_data["pollingInterval"] = int(0) + jq_data["onChange"] = True + final_filter = row["jq_filter"].replace("***device_name***", current_device) + cmd_list = {} + jq_data["format"] = final_filter + jq_data["tags"]= {"modbus_tcp_master": cmd_list} + json_object = json.dumps(jq_data) + dsh.write("# [STEP 4] Creating " + filter + " OnEvent Azure telemetry topic\n\n") + dsh.write( + inspect.cleandoc( + """sudo curl -X POST https://127.0.0.1:8443/api/v1/azure-iotedge/messages \\ + -H "Content-Type: application/json" \\ + -H "mx-api-token:$(sudo cat /var/thingspro/data/mx-api-token)" \\ + -d """) + "'" + str(json_object) +"'" + """ -k | jq + \n""" + ) + dsh.write('printf "\\n \\n" >> data_shell_script.log\n') + dsh.write("\n\n") + + dsh_global.write("### Creating OnEvent Azure messages " + "" + " for " + str(row_device['device_name']) + "\n") + dsh_global.write("curl -s -X POST -k https://" + row_device['device_ip_address_http'] + ":8443/api/v1/azure-iotedge/messages \\\n") + dsh_global.write("\t-H \"Content-Type: application/json\" \\\n") + dsh_global.write("\t-H \"mx-api-token: ${token}\" \\\n") + dsh_global.write("\t-d '" + str(json_object) + "' >> global_config.log\n\n") + dsh_global.write("echo -e \"\\n\" >> global_config.log\n\n") + dsh_global.write("echo \"Finished work on " + str(row_device['device_name']) + "\"") + + logging.debug("Creating 12H telemetry topic.") + jq_data = {} + jq_data["enable"] = False + jq_data["properties"] = [{"key": "deviceType", "value": "AC_GATEWAY"}, {"key": "cdid", "value": current_device}] + jq_data["outputTopic"] = "PCS_12H" + jq_data["sendOutThreshold"] = {"mode": "byTime", "size": int(4096), "time": int(43200), "sizeIdleTimer": {"enable": True, "time": int(60)}} + jq_data["minPublishInterval"] = int(0) + jq_data["samplingMode"] = "latestValues" + jq_data["customSamplingRate"] = False + jq_data["pollingInterval"] = int(43200) + jq_data["onChange"] = False + final_filter = row["jq_filter"].replace("***device_name***", current_device) + cmd_list = {} + jq_data["format"] = final_filter + jq_data["tags"]= {"modbus_tcp_master": cmd_list} + json_object = json.dumps(jq_data) + dsh.write("# [STEP 4] Creating " + filter + " 12H Azure telemetry topic\n\n") + dsh.write( + inspect.cleandoc( + """sudo curl -X POST https://127.0.0.1:8443/api/v1/azure-iotedge/messages \\ + -H "Content-Type: application/json" \\ + -H "mx-api-token:$(sudo cat /var/thingspro/data/mx-api-token)" \\ + -d """) + "'" + str(json_object) +"'" + """ -k | jq + \n""" + ) + dsh.write('printf "\\n \\n" >> data_shell_script.log\n') + dsh.write("\n\n") + + dsh_global.write("### Creating 12H Azure messages " + "" + " for " + str(row_device['device_name']) + "\n") + dsh_global.write("curl -s -X POST -k https://" + row_device['device_ip_address_http'] + ":8443/api/v1/azure-iotedge/messages \\\n") + dsh_global.write("\t-H \"Content-Type: application/json\" \\\n") + dsh_global.write("\t-H \"mx-api-token: ${token}\" \\\n") + dsh_global.write("\t-d '" + str(json_object) + "' >> global_config.log\n\n") + dsh_global.write("echo -e \"\\n\" >> global_config.log\n\n") + dsh_global.write("echo \"Finished work on " + str(row_device['device_name']) + "\"") + + STATIC_JQ_FILTER_EMISSIONDATE = "(now|todateiso8601)" def bitfield_jq_filter(current_device):