Creating a Modbus TCP Client and server in Python for Industrial Automation

Home » Blog » instrumentation » Creating a Modbus TCP Client and server in Python for Industrial Automation

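Here’s a server program with support for writing holding registers and coils, as well as exception handling. The code targets pymodbus 2.5.2; the import paths used below changed in pymodbus 3.x, so pin that version if you want to run the examples unmodified.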

from pymodbus.server.asynchronous import StartTcpServer
from pymodbus.datastore import ModbusSequentialDataBlock, ModbusSlaveContext, ModbusServerContext
from pymodbus.transaction import ModbusRtuFramer
from pymodbus.client.sync import ModbusTcpClient
from pymodbus.exceptions import ModbusIOException, ParameterException

# Define the Modbus slave context.
# Each data block holds ten values covering addresses 0-9.
holding_registers = ModbusSequentialDataBlock(0, [0] * 10)
input_registers = ModbusSequentialDataBlock(0, [0] * 10)
coils = ModbusSequentialDataBlock(0, [False] * 10)
discrete_inputs = ModbusSequentialDataBlock(0, [False] * 10)
# zero_mode=True maps protocol address 0 straight onto block address 0.
# Without it pymodbus 2.x adds 1 to every incoming address, so a request for
# 10 values starting at address 0 would run past the end of these blocks and
# be rejected with an "illegal data address" exception response.
slave_context = ModbusSlaveContext(
    hr=holding_registers,
    ir=input_registers,
    co=coils,
    di=discrete_inputs,
    zero_mode=True,
)
# single=True: this one slave context answers for every unit id.
# With single=False the `slaves` argument must be a {unit_id: context} dict,
# not a bare ModbusSlaveContext.
context = ModbusServerContext(slaves=slave_context, single=True)

# Define the Modbus server and start it
def run_server():
    """Run the Modbus TCP server on localhost:5020 until it is shut down.

    This call blocks for the lifetime of the server. Any exception raised
    while starting or running the server is reported on stdout instead of
    being propagated.
    """
    try:
        # StartTcpServer runs the server loop itself and returns nothing, so
        # there is no server object to call serve_forever() on. Announce the
        # start-up first because the call below only returns at shutdown.
        print("Modbus server started")
        StartTcpServer(context, address=("localhost", 5020))
    except Exception as e:
        print(f"Error starting server: {str(e)}")

# Read holding registers
def read_holding_registers(start_address, count):
    """Read holding registers from the server at localhost:5020 (unit 0x01).

    A new client connection is created for the request and always closed
    afterwards, whether or not the request succeeds.

    Args:
        start_address: Address of the first register to read.
        count: Number of registers to read.

    Returns:
        The list of register values, or None if the request failed.
    """
    client = ModbusTcpClient('localhost', port=5020)
    try:
        response = client.read_holding_registers(start_address, count, unit=0x01)
        # A failed request can come back as an error object instead of being
        # raised; such an object has no `.registers` attribute.
        if response.isError():
            print(f"Modbus error response: {response}")
            return None
        return response.registers
    except ModbusIOException as e:
        print(f"Modbus IO error: {str(e)}")
        return None
    except ParameterException as e:
        print(f"Parameter exception: {str(e)}")
        return None
    finally:
        # Release the socket on every path, including exceptions.
        client.close()

# Write holding registers
def write_holding_registers(start_address, values):
    """Write holding registers on the server at localhost:5020 (unit 0x01).

    A new client connection is created for the request and always closed
    afterwards, whether or not the request succeeds.

    Args:
        start_address: Address of the first register to write.
        values: Sequence of register values to write.

    Returns:
        The server's response object on success, or None if the request
        failed.
    """
    client = ModbusTcpClient('localhost', port=5020)
    try:
        response = client.write_registers(start_address, values, unit=0x01)
        # A rejected write comes back as an error object rather than an
        # exception; report it the same way as the other failures.
        if response.isError():
            print(f"Modbus error response: {response}")
            return None
        return response
    except ModbusIOException as e:
        print(f"Modbus IO error: {str(e)}")
        return None
    except ParameterException as e:
        print(f"Parameter exception: {str(e)}")
        return None
    finally:
        # Release the socket on every path, including exceptions.
        client.close()

# Read coils
def read_coils(start_address, count):
    """Read coils from the server at localhost:5020 (unit 0x01).

    A new client connection is created for the request and always closed
    afterwards, whether or not the request succeeds.

    Args:
        start_address: Address of the first coil to read.
        count: Number of coils to read.

    Returns:
        A list of `count` booleans, or None if the request failed.
    """
    client = ModbusTcpClient('localhost', port=5020)
    try:
        response = client.read_coils(start_address, count, unit=0x01)
        # A failed request can come back as an error object instead of being
        # raised; such an object has no `.bits` attribute.
        if response.isError():
            print(f"Modbus error response: {response}")
            return None
        # Coil data travels in whole bytes, so `.bits` is padded up to a
        # multiple of eight; keep only the coils that were asked for.
        return response.bits[:count]
    except ModbusIOException as e:
        print(f"Modbus IO error: {str(e)}")
        return None
    except ParameterException as e:
        print(f"Parameter exception: {str(e)}")
        return None
    finally:
        # Release the socket on every path, including exceptions.
        client.close()

# Write coils
def write_coils(start_address, values):
    """Write coils on the server at localhost:5020 (unit 0x01).

    A new client connection is created for the request and always closed
    afterwards, whether or not the request succeeds.

    Args:
        start_address: Address of the first coil to write.
        values: Sequence of boolean coil states to write.

    Returns:
        The server's response object on success, or None if the request
        failed.
    """
    client = ModbusTcpClient('localhost', port=5020)
    try:
        response = client.write_coils(start_address, values, unit=0x01)
        # A rejected write comes back as an error object rather than an
        # exception; report it the same way as the other failures.
        if response.isError():
            print(f"Modbus error response: {response}")
            return None
        return response
    except ModbusIOException as e:
        print(f"Modbus IO error: {str(e)}")
        return None
    except ParameterException as e:
        print(f"Parameter exception: {str(e)}")
        return None
    finally:
        # Release the socket on every path, including exceptions.
        client.close()

# Start the Modbus server in a new thread so that run_server() does not
# block the rest of this script. The thread's daemon flag is left at its
# default, and the thread is started immediately at import/run time.
import threading
server_thread = threading.Thread(target=run_server)
server_thread.start()

The above code is an example Python program that demonstrates how to create a Modbus TCP server with support for reading and writing holding registers and coils. The program uses the pymodbus library to define the Modbus slave context, which includes the holding registers, input registers, coils, and discrete inputs that the server will provide. The context is then passed to the StartTcpServer function to start the server and listen for incoming requests.

The program defines four functions for reading data from and writing data to the Modbus server: read_holding_registers, write_holding_registers, read_coils, and write_coils. Each of these functions uses the ModbusTcpClient class to establish a connection to the Modbus server and perform the specified read or write operation. The functions also include exception handling code to catch and handle any errors that may occur during the read or write operation.

Finally, the program starts the Modbus server in a new thread using the threading module to allow the program to continue running while the server is running in the background. This enables the program to read and write data from the server while still performing other tasks.

Here’s an example Python program for a Modbus TCP client that can read data from a Modbus server:

from pymodbus.client.sync import ModbusTcpClient
from pymodbus.exceptions import ModbusIOException, ParameterException

class ModbusClient:
    """Read-only convenience wrapper around pymodbus' ModbusTcpClient.

    All requests are addressed to unit 0x01. Every read method returns the
    requested values on success and None on failure, printing a short
    description of the failure to stdout.

    The class is a context manager: leaving the `with` block closes the
    underlying client.
    """

    def __init__(self, host='localhost', port=5020):
        """Create the underlying TCP client for `host`:`port`."""
        self.host = host
        self.port = port
        self.client = ModbusTcpClient(self.host, self.port)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.client.close()

    def _read(self, request, start_address, count, field):
        """Issue one read request and return the `field` list of the response.

        `request` is the bound client method to call and `field` is the name
        of the response attribute holding the data ("registers" or "bits").
        Returns None if the request raised or the server answered with an
        error.
        """
        try:
            response = request(start_address, count, unit=0x01)
        except ModbusIOException as e:
            print(f"Modbus IO error: {str(e)}")
            return None
        except ParameterException as e:
            print(f"Parameter exception: {str(e)}")
            return None
        # A failed request can come back as an error object instead of being
        # raised; such an object carries no data attribute.
        if response.isError():
            print(f"Modbus error response: {response}")
            return None
        # Bit responses are padded up to a multiple of eight; slicing keeps
        # only what was asked for and is a no-op for register responses.
        return getattr(response, field)[:count]

    def read_holding_registers(self, start_address, count):
        """Return `count` holding registers from `start_address`, or None."""
        return self._read(
            self.client.read_holding_registers, start_address, count, "registers"
        )

    def read_input_registers(self, start_address, count):
        """Return `count` input registers from `start_address`, or None."""
        return self._read(
            self.client.read_input_registers, start_address, count, "registers"
        )

    def read_coils(self, start_address, count):
        """Return `count` coil states from `start_address`, or None."""
        return self._read(self.client.read_coils, start_address, count, "bits")

    def read_discrete_inputs(self, start_address, count):
        """Return `count` discrete input states from `start_address`, or None."""
        return self._read(
            self.client.read_discrete_inputs, start_address, count, "bits"
        )

This program defines a ModbusClient class that uses the ModbusTcpClient class from the pymodbus library to read data from a Modbus TCP server. The __init__ method sets the host and port of the Modbus TCP server and creates a new Modbus TCP client.

The __enter__ and __exit__ methods are used to create a context manager for the Modbus TCP client. This allows the client to be used in a with block, ensuring that the client is properly closed after use.

The read_holding_registers, read_input_registers, read_coils, and read_discrete_inputs methods are used to read data from the Modbus TCP server using the ModbusTcpClient class. Each method takes a start address and count as arguments and returns the requested data as a list of integers or booleans. The methods also include exception handling code to catch and handle any errors that may occur during the read operation.

To use this library, save the class above in a file named modbus_client.py, then import the ModbusClient class and create a new instance with the desired host and port parameters. Then call the appropriate method to read data from the Modbus TCP server. Here’s an example:

from modbus_client import ModbusClient

with ModbusClient() as client:
    # Holding registers: ten values beginning at address 0.
    holding_registers = client.read_holding_registers(0, 10)
    if holding_registers is None:
        print("Error reading holding registers")
    else:
        print(f"Holding registers: {holding_registers}")

    # Input registers: ten values beginning at address 0.
    input_registers = client.read_input_registers(0, 10)
    if input_registers is None:
        print("Error reading input registers")
    else:
        print(f"Input registers: {input_registers}")

    # Coils: ten values beginning at address 0.
    coils = client.read_coils(0, 10)
    if coils is None:
        print("Error reading coils")
    else:
        print(f"Coils: {coils}")

    # Discrete inputs: ten values beginning at address 0.
    discrete_inputs = client.read_discrete_inputs(0, 10)
    if discrete_inputs is None:
        print("Error reading discrete inputs")
    else:
        print(f"Discrete inputs: {discrete_inputs}")

In this example, we create a new instance of the ModbusClient class and use it in a with block to ensure that the client is properly closed after use. We then call the read_holding_registers, read_input_registers, read_coils, and read_discrete_inputs methods to read data from the Modbus TCP server, passing in the appropriate start address and count parameters.

For each read operation, we check if the returned data is None, indicating that there was an error during the read operation. If the data is not None, we print it to the console. If there was an error during the read operation, we print an error message to the console.

Leave a Comment

Your email address will not be published. Required fields are marked *