84 lines
2.8 KiB
Python
84 lines
2.8 KiB
Python
#!/usr/bin/env python3
|
|
import asyncio
|
|
import logging
|
|
|
|
import server_async
|
|
import time
|
|
import math
|
|
|
|
from pymodbus.payload import BinaryPayloadBuilder
|
|
from pymodbus.constants import Endian
|
|
|
|
from pymodbus.datastore import (
|
|
ModbusSequentialDataBlock,
|
|
ModbusServerContext,
|
|
ModbusSlaveContext,
|
|
)
|
|
|
|
|
|
_logger = logging.getLogger(__name__)
|
|
|
|
|
|
async def updating_task(context):
|
|
# parameters
|
|
fc_as_hex = 3
|
|
slave_id = 0x00
|
|
address = 0x00
|
|
count = 100
|
|
|
|
# initialization: set values to zero
|
|
values = context[slave_id].getValues(fc_as_hex, address, count=count)
|
|
values = [0 for v in values]
|
|
|
|
# infinite loop updating every register to its sine value (period 1 hour, amplitude according to register number)
|
|
while True:
|
|
await asyncio.sleep(1)
|
|
current_time = time.time()
|
|
sine_value = sin_wave(current_time)
|
|
# BinaryPayloadBuilder is used to convert a float to two Modbus registers
|
|
builder = BinaryPayloadBuilder(byteorder=Endian.LITTLE)
|
|
for i in range(address, count):
|
|
builder.add_32bit_float((i+1) * sine_value)
|
|
payload = builder.to_registers()
|
|
# once the registers of all the sine waves are created, update the values in the simulator
|
|
context[slave_id].setValues(fc_as_hex, address, payload)
|
|
|
|
# sin wave value calculator
|
|
period = 3600
|
|
def sin_wave(time_elapsed):
|
|
return math.sin(2 * math.pi * time_elapsed / period)
|
|
|
|
|
|
# server start setup function
|
|
def setup_updating_server(cmdline=None):
|
|
# define the registers
|
|
num_floats = 100
|
|
float_values = [float(i + 0.5) for i in range(num_floats)]
|
|
builder = BinaryPayloadBuilder(byteorder=Endian.LITTLE)
|
|
for value in float_values:
|
|
builder.add_32bit_float(value)
|
|
payload = builder.to_registers()
|
|
|
|
# create a Modbus simulator with the previously generated registers
|
|
datablock = ModbusSequentialDataBlock(0x01, payload)
|
|
context = ModbusSlaveContext(di=datablock, co=datablock, hr=datablock, ir=datablock)
|
|
context = ModbusServerContext(slaves=context, single=True)
|
|
return server_async.setup_server(
|
|
description="Run asynchronous server.", context=context, cmdline=cmdline
|
|
)
|
|
|
|
# asynchronous function that will call the values updating function with the arguments (modbus registers layout) of the created server
|
|
async def run_updating_server(args):
|
|
task = asyncio.create_task(updating_task(args.context))
|
|
await server_async.run_async_server(args) # start the server
|
|
task.cancel()
|
|
|
|
# main function that will setup the server, start it and run the function what will update the values of the registers
|
|
async def main(cmdline=None):
|
|
run_args = setup_updating_server(cmdline=cmdline)
|
|
await run_updating_server(run_args)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main(), debug=True)
|