#!/usr/bin/env python3 import asyncio import logging import server_async import time import math from pymodbus.payload import BinaryPayloadBuilder from pymodbus.constants import Endian from pymodbus.datastore import ( ModbusSequentialDataBlock, ModbusServerContext, ModbusSlaveContext, ) _logger = logging.getLogger(__name__) async def updating_task(context): # parameters fc_as_hex = 3 slave_id = 0x00 address = 0x00 count = 100 # initialization: set values to zero values = context[slave_id].getValues(fc_as_hex, address, count=count) values = [0 for v in values] # infinite loop updating every register to its sine value (period 1 hour, amplitude according to register number) while True: await asyncio.sleep(1) current_time = time.time() sine_value = sin_wave(current_time) # BinaryPayloadBuilder is used to convert a float to two Modbus registers builder = BinaryPayloadBuilder(byteorder=Endian.LITTLE) for i in range(address, count): builder.add_32bit_float((i+1) * sine_value) payload = builder.to_registers() # once the registers of all the sine waves are created, update the values in the simulator context[slave_id].setValues(fc_as_hex, address, payload) # sin wave value calculator period = 3600 def sin_wave(time_elapsed): return math.sin(2 * math.pi * time_elapsed / period) # server start setup function def setup_updating_server(cmdline=None): # define the registers num_floats = 100 float_values = [float(i + 0.5) for i in range(num_floats)] builder = BinaryPayloadBuilder(byteorder=Endian.LITTLE) for value in float_values: builder.add_32bit_float(value) payload = builder.to_registers() # create a Modbus simulator with the previously generated registers datablock = ModbusSequentialDataBlock(0x01, payload) context = ModbusSlaveContext(di=datablock, co=datablock, hr=datablock, ir=datablock) context = ModbusServerContext(slaves=context, single=True) return server_async.setup_server( description="Run asynchronous server.", context=context, cmdline=cmdline ) # asynchronous function that will call the values updating function with the arguments (modbus registers layout) of the created server async def run_updating_server(args): task = asyncio.create_task(updating_task(args.context)) await server_async.run_async_server(args) # start the server task.cancel() # main function that will setup the server, start it and run the function what will update the values of the registers async def main(cmdline=None): run_args = setup_updating_server(cmdline=cmdline) await run_updating_server(run_args) if __name__ == "__main__": asyncio.run(main(), debug=True)