Skip to main content
publish_custom.py
"""Example: Publishers: publish custom."""

import time

from instro.lib.types import Command, Measurement
from instro.psu import InstroPSU
from instro.psu.drivers import SimulatedPSU

VISA_RESOURCE = "TCPIP0::127.0.0.1::5025::SOCKET"


class PrintChannelDelegate:
    def __init__(self, channel_name: str):
        self.channel_name = channel_name

    def publish(self, data: Measurement | Command, **kwargs):
        """Handle the data - return True on success, False on failure."""
        try:
            if isinstance(data, Measurement):
                if data.channel_data.get(self.channel_name, None):
                    print(f"{self.channel_name}: {data.latest}")
        except Exception as e:
            print(f"Error publishing data: {e}")

    def close(self):
        pass


# Create instrument instances
psu = InstroPSU(name="myPSU", driver=SimulatedPSU(VISA_RESOURCE), num_channels=2)
psu.add_publisher(PrintChannelDelegate(channel_name="myPSU.ch2.current"))

try:
    # Set up initial state of test
    psu.open()

    psu.output_enable(False, channel=2)
    psu.set_current_limit(0.2, channel=2)
    psu.set_voltage(0, channel=2)

    psu.get_current(channel=2)
    psu.get_voltage(channel=2)

    # Start
    psu.output_enable(True, channel=2)

    for v in range(10):
        psu.set_voltage(v, channel=2)
        time.sleep(1)  # Simulate some delay
        psu.get_current(channel=2)  # This value will be printed
        psu.get_voltage(channel=2)  # This value will not be printed

    psu.output_enable(False, channel=2)

finally:
    # Shut it down
    psu.close()