Upgrade to 1.5.2, tooling for VPN API requests
This commit is contained in:
@@ -1,83 +1,83 @@
|
||||
#!/usr/bin/env python3
|
||||
import asyncio
|
||||
import logging
|
||||
|
||||
import server_async
|
||||
import time
|
||||
import math
|
||||
|
||||
from pymodbus.payload import BinaryPayloadBuilder
|
||||
from pymodbus.constants import Endian
|
||||
|
||||
from pymodbus.datastore import (
|
||||
ModbusSequentialDataBlock,
|
||||
ModbusServerContext,
|
||||
ModbusSlaveContext,
|
||||
)
|
||||
|
||||
|
||||
_logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def updating_task(context):
|
||||
# parameters
|
||||
fc_as_hex = 3
|
||||
slave_id = 0x00
|
||||
address = 0x00
|
||||
count = 100
|
||||
|
||||
# initialization: set values to zero
|
||||
values = context[slave_id].getValues(fc_as_hex, address, count=count)
|
||||
values = [0 for v in values]
|
||||
|
||||
# infinite loop updating every register to its sine value (period 1 hour, amplitude according to register number)
|
||||
while True:
|
||||
await asyncio.sleep(1)
|
||||
current_time = time.time()
|
||||
sine_value = sin_wave(current_time)
|
||||
# BinaryPayloadBuilder is used to convert a float to two Modbus registers
|
||||
builder = BinaryPayloadBuilder(byteorder=Endian.LITTLE)
|
||||
for i in range(address, count):
|
||||
builder.add_32bit_float((i+1) * sine_value)
|
||||
payload = builder.to_registers()
|
||||
# once the registers of all the sine waves are created, update the values in the simulator
|
||||
context[slave_id].setValues(fc_as_hex, address, payload)
|
||||
|
||||
# sin wave value calculator
|
||||
period = 3600
|
||||
def sin_wave(time_elapsed):
|
||||
return math.sin(2 * math.pi * time_elapsed / period)
|
||||
|
||||
|
||||
# server start setup function
|
||||
def setup_updating_server(cmdline=None):
|
||||
# define the registers
|
||||
num_floats = 100
|
||||
float_values = [float(i + 0.5) for i in range(num_floats)]
|
||||
builder = BinaryPayloadBuilder(byteorder=Endian.LITTLE)
|
||||
for value in float_values:
|
||||
builder.add_32bit_float(value)
|
||||
payload = builder.to_registers()
|
||||
|
||||
# create a Modbus simulator with the previously generated registers
|
||||
datablock = ModbusSequentialDataBlock(0x01, payload)
|
||||
context = ModbusSlaveContext(di=datablock, co=datablock, hr=datablock, ir=datablock)
|
||||
context = ModbusServerContext(slaves=context, single=True)
|
||||
return server_async.setup_server(
|
||||
description="Run asynchronous server.", context=context, cmdline=cmdline
|
||||
)
|
||||
|
||||
# asynchronous function that will call the values updating function with the arguments (modbus registers layout) of the created server
|
||||
async def run_updating_server(args):
|
||||
task = asyncio.create_task(updating_task(args.context))
|
||||
await server_async.run_async_server(args) # start the server
|
||||
task.cancel()
|
||||
|
||||
# main function that will setup the server, start it and run the function what will update the values of the registers
|
||||
async def main(cmdline=None):
|
||||
run_args = setup_updating_server(cmdline=cmdline)
|
||||
await run_updating_server(run_args)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main(), debug=True)
|
||||
#!/usr/bin/env python3
|
||||
import asyncio
|
||||
import logging
|
||||
|
||||
import server_async
|
||||
import time
|
||||
import math
|
||||
|
||||
from pymodbus.payload import BinaryPayloadBuilder
|
||||
from pymodbus.constants import Endian
|
||||
|
||||
from pymodbus.datastore import (
|
||||
ModbusSequentialDataBlock,
|
||||
ModbusServerContext,
|
||||
ModbusSlaveContext,
|
||||
)
|
||||
|
||||
|
||||
_logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def updating_task(context):
|
||||
# parameters
|
||||
fc_as_hex = 3
|
||||
slave_id = 0x00
|
||||
address = 0x00
|
||||
count = 100
|
||||
|
||||
# initialization: set values to zero
|
||||
values = context[slave_id].getValues(fc_as_hex, address, count=count)
|
||||
values = [0 for v in values]
|
||||
|
||||
# infinite loop updating every register to its sine value (period 1 hour, amplitude according to register number)
|
||||
while True:
|
||||
await asyncio.sleep(1)
|
||||
current_time = time.time()
|
||||
sine_value = sin_wave(current_time)
|
||||
# BinaryPayloadBuilder is used to convert a float to two Modbus registers
|
||||
builder = BinaryPayloadBuilder(byteorder=Endian.LITTLE)
|
||||
for i in range(address, count):
|
||||
builder.add_32bit_float((i+1) * sine_value)
|
||||
payload = builder.to_registers()
|
||||
# once the registers of all the sine waves are created, update the values in the simulator
|
||||
context[slave_id].setValues(fc_as_hex, address, payload)
|
||||
|
||||
# sin wave value calculator
|
||||
period = 3600
|
||||
def sin_wave(time_elapsed):
|
||||
return math.sin(2 * math.pi * time_elapsed / period)
|
||||
|
||||
|
||||
# server start setup function
|
||||
def setup_updating_server(cmdline=None):
|
||||
# define the registers
|
||||
num_floats = 100
|
||||
float_values = [float(i + 0.5) for i in range(num_floats)]
|
||||
builder = BinaryPayloadBuilder(byteorder=Endian.LITTLE)
|
||||
for value in float_values:
|
||||
builder.add_32bit_float(value)
|
||||
payload = builder.to_registers()
|
||||
|
||||
# create a Modbus simulator with the previously generated registers
|
||||
datablock = ModbusSequentialDataBlock(0x01, payload)
|
||||
context = ModbusSlaveContext(di=datablock, co=datablock, hr=datablock, ir=datablock)
|
||||
context = ModbusServerContext(slaves=context, single=True)
|
||||
return server_async.setup_server(
|
||||
description="Run asynchronous server.", context=context, cmdline=cmdline
|
||||
)
|
||||
|
||||
# asynchronous function that will call the values updating function with the arguments (modbus registers layout) of the created server
|
||||
async def run_updating_server(args):
|
||||
task = asyncio.create_task(updating_task(args.context))
|
||||
await server_async.run_async_server(args) # start the server
|
||||
task.cancel()
|
||||
|
||||
# main function that will setup the server, start it and run the function what will update the values of the registers
|
||||
async def main(cmdline=None):
|
||||
run_args = setup_updating_server(cmdline=cmdline)
|
||||
await run_updating_server(run_args)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main(), debug=True)
|
||||
|
||||
Reference in New Issue
Block a user