Initial commit

This commit is contained in:
Quentin WEPHRE
2024-06-18 14:59:46 +02:00
parent 13549fe46e
commit fe82b3962f
23 changed files with 1832 additions and 1 deletions

144
Python/danish_D1_api.py Normal file
View File

@@ -0,0 +1,144 @@
import datetime
import time
import pandas as pd
import requests
from urllib3.exceptions import InsecureRequestWarning
import jq
import json
# Function to authenticate and get token
def authenticate(device_ip, payload):
auth_url = f"https://{device_ip}:8443/api/v1/auth"
response = requests.post(auth_url, json=payload, verify=False)
if response.status_code == 200:
token = response.json()["data"]["token"]
#print(f"Authentication successful. Token received: {token}")
print(" authenticated!")
return token
else:
print(f"Authentication failed. Status code: {response.status_code}")
return None
# Function to send PATCH request
def send_patch_request(device_ip, token, connection_string):
headers = {
"mx-api-token": token
}
payload = {
"provisioning": {
"source": "manual",
"connectionString": connection_string,
"enable": True
}
}
patch_url = f"https://{device_ip}:8443/api/v1/azure-iotedge"
response = requests.patch(patch_url, json=payload, headers=headers, verify=False)
if response.status_code == 200:
print(f"PATCH request successful for device {device_ip}")
else:
print(f"Failed to send PATCH request to device {device_ip}. Status code: {response.status_code}")
# Function to send UPGRADE request
def send_upgrade_request(device_ip, token, upgrade_url):
headers = {
"mx-api-token": token
}
payload = {
"deleteFileAfterInstallComplete": True,
"install": True,
"url": upgrade_url
}
patch_url = f"https://{device_ip}:8443/api/v1/upgrades"
response = requests.post(patch_url, json=payload, headers=headers, verify=False)
if response.status_code == 200:
print(f"POST request successful for device {device_ip}")
else:
print(f"Failed to send POST request to device {device_ip}. Status code: {response.status_code}")
print(response.content)
# Function to send UPGRADE request
def get_upgrade_jobs(device_ip, token):
headers = {
"mx-api-token": token
}
payload = {
}
patch_url = f"https://{device_ip}:8443/api/v1/upgrades/7"
response = requests.get(patch_url, json=payload, headers=headers, verify=False)
if response.status_code == 200:
#print(f"GET request successful for device {device_ip}")
json_data = json.loads(response.content.decode())
json_str = json.dumps(json_data, indent=4)
#print(json_str)
print(jq.compile('.data.tasks[0].progress').input(json.loads(json_str)).first(), end="% speed: ")
print(jq.compile('.data.tasks[0].speed').input(json.loads(json_str)).first())
else:
print(f"Failed to send GET request to device {device_ip}. Status code: {response.status_code}")
print(response.content)
# Function to send UPGRADE request
def get_API_1(device_ip, token):
headers = {
"mx-api-token": token
}
payload = {
}
patch_url = f"https://{device_ip}:8443/api/v1/azure-iotedge/messages"
response = requests.get(patch_url, json=payload, headers=headers, verify=False)
if response.status_code == 200:
#print(f"GET request successful for device {device_ip}")
#json_data = json.loads(response.content.decode())
#json_str = json.dumps(json_data)
#print(jq.compile('.data.completedAt').input(json.loads(json_str)).first())
parsed = json.loads(response.content.decode())
print(json.dumps(parsed, indent=4))
else:
print(f"Failed to send GET request to device {device_ip}. Status code: {response.status_code}")
print(response.content.decode())
# Read the Excel file
# df = pd.read_excel("")
# df = df[df["device_name"].notnull()]
# Iterate over each row in the DataFrame
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
payload_auth = {
"acceptEULA": True,
"name": "",
"password": ""
}
if payload_auth["name"] == "" or payload_auth["password"] == "":
print("Provide the credentials before running the script!")
exit(10)
ip_address = "10.197.35.116"
token = authenticate(ip_address, payload_auth)
upgrade_url = ""#"https://files.thingsprocloud.com/package/moxa-aig-301-series-includes-security-patch-firmware-v1.0.deb.yaml"
if upgrade_url == "":
print("Provide upgrade URL before running the script!")
exit(12)
if token:
while True:
print(datetime.datetime.now().strftime("%H:%M:%S"), end=" ")
get_upgrade_jobs(ip_address, token)
time.sleep(60)
# for index, row in df.iterrows():
# device_name = row['device_name']
# device_ip_address_https = row['device_ip_address_http']
# connection_string = row['connection_string']
# upgrade_url = "https://files.thingsprocloud.com/package/moxa-aig-301-series-includes-security-patch-firmware-v1.0.deb.yaml"
# # Authenticate and get token
# payload_auth = {
# "acceptEULA": True,
# "name": "",
# "password": ""
# }
# print(device_name, end="")
# token = authenticate(device_ip_address_https, payload_auth)
# if token:
# get_API(device_ip_address_https, token)
# print("\n")