Here’s a Modbus TCP server program with support for reading and writing holding registers and coils, as well as exception handling. The code targets pymodbus 2.5.2:
from pymodbus.server.asynchronous import StartTcpServer
from pymodbus.datastore import ModbusSequentialDataBlock, ModbusSlaveContext, ModbusServerContext
from pymodbus.transaction import ModbusRtuFramer
from pymodbus.client.sync import ModbusTcpClient
from pymodbus.exceptions import ModbusIOException, ParameterException
# Define the Modbus slave context
# Each data block starts at address 0 and holds 10 values.
holding_registers = ModbusSequentialDataBlock(0, [0] * 10)
input_registers = ModbusSequentialDataBlock(0, [0] * 10)
coils = ModbusSequentialDataBlock(0, [False] * 10)
discrete_inputs = ModbusSequentialDataBlock(0, [False] * 10)
# zero_mode=True maps request address N directly onto block address N.
# Without it pymodbus 2.x shifts every request address by +1, so a request
# for "address 0, count 10" would run past the end of a 10-value block.
slave_context = ModbusSlaveContext(
    hr=holding_registers,
    ir=input_registers,
    co=coils,
    di=discrete_inputs,
    zero_mode=True,
)
# single=True: one slave context answers for every unit id. With
# single=False, `slaves` must be a dict of {unit_id: context}; passing a bare
# ModbusSlaveContext there breaks the per-unit lookup at request time.
context = ModbusServerContext(slaves=slave_context, single=True)
# Define the Modbus server and start it
def run_server():
    """Run the Modbus TCP server on localhost:5020 until it stops.

    StartTcpServer blocks for the lifetime of the server and does not hand
    back a server object, so there is nothing to call serve_forever() on;
    the status line is therefore printed before the blocking call.

    Any exception raised while starting or running the server is reported
    on stdout instead of propagating.
    """
    try:
        print("Modbus server starting on localhost:5020")
        StartTcpServer(context, address=("localhost", 5020))
    except Exception as e:
        print(f"Error starting server: {str(e)}")
# Read holding registers
def read_holding_registers(start_address, count):
    """Read ``count`` holding registers starting at ``start_address``.

    A new client for localhost:5020 is created for the request and is
    closed on every path, including when the request raises.

    Returns:
        The list of register values, or None when the request fails (an
        exception handled below, or an error response from the client).
    """
    client = ModbusTcpClient('localhost', port=5020)
    try:
        response = client.read_holding_registers(start_address, count, unit=0x01)
    except ModbusIOException as e:
        print(f"Modbus IO error: {str(e)}")
        return None
    except ParameterException as e:
        print(f"Parameter exception: {str(e)}")
        return None
    finally:
        # Release the socket even when the request raised.
        client.close()
    # The pymodbus 2.x sync client can hand back an error object instead of
    # raising; such objects have no `.registers`, so check before reading it.
    if response.isError():
        print(f"Modbus error response: {response}")
        return None
    return response.registers
# Write holding registers
def write_holding_registers(start_address, values):
    """Write ``values`` to consecutive holding registers from ``start_address``.

    A new client for localhost:5020 is created for the request and is
    closed on every path, including when the request raises.

    Returns:
        The response object from the client (callers should still check
        ``response.isError()``), or None when one of the exceptions handled
        below was raised.
    """
    client = ModbusTcpClient('localhost', port=5020)
    try:
        response = client.write_registers(start_address, values, unit=0x01)
    except ModbusIOException as e:
        print(f"Modbus IO error: {str(e)}")
        return None
    except ParameterException as e:
        print(f"Parameter exception: {str(e)}")
        return None
    finally:
        # Release the socket even when the request raised.
        client.close()
    # Surface error responses instead of returning them silently; the
    # response itself is still returned so existing callers keep working.
    if response.isError():
        print(f"Modbus error response: {response}")
    return response
# Read coils
def read_coils(start_address, count):
    """Read ``count`` coils starting at ``start_address``.

    A new client for localhost:5020 is created for the request and is
    closed on every path, including when the request raises.

    Returns:
        A list of exactly ``count`` booleans, or None when the request fails
        (an exception handled below, or an error response from the client).
    """
    client = ModbusTcpClient('localhost', port=5020)
    try:
        response = client.read_coils(start_address, count, unit=0x01)
    except ModbusIOException as e:
        print(f"Modbus IO error: {str(e)}")
        return None
    except ParameterException as e:
        print(f"Parameter exception: {str(e)}")
        return None
    finally:
        # Release the socket even when the request raised.
        client.close()
    # The pymodbus 2.x sync client can hand back an error object instead of
    # raising; such objects have no `.bits`, so check before reading it.
    if response.isError():
        print(f"Modbus error response: {response}")
        return None
    # Bit responses are padded to whole bytes; drop the padding so the
    # caller gets only the coils it asked for.
    return response.bits[:count]
# Write coils
def write_coils(start_address, values):
    """Write ``values`` to consecutive coils starting at ``start_address``.

    A new client for localhost:5020 is created for the request and is
    closed on every path, including when the request raises.

    Returns:
        The response object from the client (callers should still check
        ``response.isError()``), or None when one of the exceptions handled
        below was raised.
    """
    client = ModbusTcpClient('localhost', port=5020)
    try:
        response = client.write_coils(start_address, values, unit=0x01)
    except ModbusIOException as e:
        print(f"Modbus IO error: {str(e)}")
        return None
    except ParameterException as e:
        print(f"Parameter exception: {str(e)}")
        return None
    finally:
        # Release the socket even when the request raised.
        client.close()
    # Surface error responses instead of returning them silently; the
    # response itself is still returned so existing callers keep working.
    if response.isError():
        print(f"Modbus error response: {response}")
    return response
# Start the Modbus server in a new thread
# run_server is handed to a separate thread so the statements after this
# point are not held up by the server call inside it.
import threading
# No daemon flag is passed, so the thread inherits the daemon status of the
# thread that creates it.
server_thread = threading.Thread(target=run_server)
server_thread.start()
The above code is an example Python program that demonstrates how to create a Modbus TCP server with support for reading and writing holding registers and coils. The program uses the `pymodbus` library to define the Modbus slave context, which includes the holding registers, input registers, coils, and discrete inputs that the server will provide. The context is then passed to the `StartTcpServer` function to start the server and listen for incoming requests.
The program defines four functions for reading and writing data from the Modbus server: `read_holding_registers`, `write_holding_registers`, `read_coils`, and `write_coils`. Each of these functions uses the `ModbusTcpClient` class to establish a connection to the Modbus server and perform the specified read or write operation. The functions also include exception handling code to catch and handle any errors that may occur during the read or write operation.
Finally, the program starts the Modbus server in a new thread using the `threading` module to allow the program to continue running while the server is running in the background. This enables the program to read and write data from the server while still performing other tasks.
Here’s an example Python program for a Modbus TCP client that can read data from a Modbus server:
from pymodbus.client.sync import ModbusTcpClient
from pymodbus.exceptions import ModbusIOException, ParameterException
class ModbusClient:
    """Read-only Modbus TCP client wrapper usable as a context manager.

    Every read method returns the requested values as a list, or None when
    the request fails. A failure is either one of the pymodbus exceptions
    handled in ``_request`` or an error response returned by the client.
    """

    # Unit (slave) id sent with every request.
    UNIT_ID = 0x01

    def __init__(self, host='localhost', port=5020):
        """Remember the server address and create the underlying client."""
        self.host = host
        self.port = port
        self.client = ModbusTcpClient(self.host, self.port)

    def __enter__(self):
        """Return this wrapper so it can be bound by ``with ... as``."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the underlying client when the ``with`` block ends."""
        self.client.close()

    def _request(self, read, start_address, count):
        """Run one read call and return its response, or None on failure.

        ``read`` is a bound read method of the underlying client. Failures
        are reported on stdout.
        """
        try:
            response = read(start_address, count, unit=self.UNIT_ID)
        except ModbusIOException as e:
            print(f"Modbus IO error: {str(e)}")
            return None
        except ParameterException as e:
            print(f"Parameter exception: {str(e)}")
            return None
        # The pymodbus 2.x sync client can hand back an error object instead
        # of raising; it carries no `.registers` / `.bits`, so filter it here.
        if response.isError():
            print(f"Modbus error response: {response}")
            return None
        return response

    def read_holding_registers(self, start_address, count):
        """Return ``count`` holding registers from ``start_address``, or None."""
        response = self._request(
            self.client.read_holding_registers, start_address, count
        )
        return None if response is None else response.registers

    def read_input_registers(self, start_address, count):
        """Return ``count`` input registers from ``start_address``, or None."""
        response = self._request(
            self.client.read_input_registers, start_address, count
        )
        return None if response is None else response.registers

    def read_coils(self, start_address, count):
        """Return ``count`` coil states from ``start_address``, or None."""
        response = self._request(self.client.read_coils, start_address, count)
        # Bit responses are padded to whole bytes; keep only what was asked for.
        return None if response is None else response.bits[:count]

    def read_discrete_inputs(self, start_address, count):
        """Return ``count`` discrete inputs from ``start_address``, or None."""
        response = self._request(
            self.client.read_discrete_inputs, start_address, count
        )
        # Bit responses are padded to whole bytes; keep only what was asked for.
        return None if response is None else response.bits[:count]
This program defines a `ModbusClient` class that uses the `ModbusTcpClient` class from the `pymodbus` library to read data from a Modbus TCP server. The `__init__` method sets the host and port of the Modbus TCP server and creates a new Modbus TCP client.
The `__enter__` and `__exit__` methods are used to create a context manager for the Modbus TCP client. This allows the client to be used in a `with` block, ensuring that the client is properly closed after use.
The `read_holding_registers`, `read_input_registers`, `read_coils`, and `read_discrete_inputs` methods are used to read data from the Modbus TCP server using the `ModbusTcpClient` class. Each method takes a start address and count as arguments and returns the requested data as a list of integers or booleans. The methods also include exception handling code to catch and handle any errors that may occur during the read operation.
To use this library, simply import the `ModbusClient` class and create a new instance with the desired host and port parameters. Then call the appropriate method to read data from the Modbus TCP server. Here’s an example:
from modbus_client import ModbusClient
def _report(label, values):
    """Print ``values`` under ``label``, or an error line when the read gave None."""
    if values is None:
        print(f"Error reading {label.lower()}")
        return
    print(f"{label}: {values}")


# Each read below asks for 10 values starting at address 0; the client is
# closed automatically when the ``with`` block ends.
with ModbusClient() as client:
    holding_registers = client.read_holding_registers(0, 10)
    _report("Holding registers", holding_registers)

    input_registers = client.read_input_registers(0, 10)
    _report("Input registers", input_registers)

    coils = client.read_coils(0, 10)
    _report("Coils", coils)

    discrete_inputs = client.read_discrete_inputs(0, 10)
    _report("Discrete inputs", discrete_inputs)
In this example, we create a new instance of the `ModbusClient` class and use it in a `with` block to ensure that the client is properly closed after use. We then call the `read_holding_registers`, `read_input_registers`, `read_coils`, and `read_discrete_inputs` methods to read data from the Modbus TCP server, passing in the appropriate start address and count parameters.
For each read operation, we check whether the returned data is `None`, which indicates that there was an error during the read operation. If the data is not `None`, we print it to the console; otherwise, we print an error message to the console.