# simple_temp_controller.py
"""A minimal custom `Instrument`, a simulated temperature controller whose reading lags the commanded setpoint."""

import random
import time

from instro.lib import Command, Instrument, Measurement
from instro.lib.instrument import publish_command, publish_measurement


class SimpleTempController(Instrument):
    """Simulated temperature controller whose reading lags its setpoint.

    The instance keeps two pieces of state: the current simulated
    temperature and the commanded setpoint. `read_temperature` is registered
    as a background daemon function during construction, and each call to it
    nudges the temperature toward the setpoint.
    """

    def __init__(self, name: str, **kwargs):
        """Initialise the base instrument and the simulation state.

        Args:
            name: Forwarded to `Instrument.__init__`.
            **kwargs: Forwarded unchanged to `Instrument.__init__`.
        """
        super().__init__(name, **kwargs)

        # Both the reading and the target begin at room temperature.
        room_temp_c = 20.0
        self._temperature_c = room_temp_c  # current simulated reading
        self._setpoint_c = room_temp_c  # commanded target

        # NOTE(review): presumably the daemon period in seconds -- confirm
        # against the `Instrument` base class.
        self.background_interval = 0.5
        self.add_background_daemon_function(self.read_temperature)

    def open(self) -> None:
        """Open the device connection; a no-op for this simulated controller."""
        # A real driver would open its socket, VISA session, etc. at this point.
        return None

    def close(self) -> None:
        """Shut the instrument down by delegating to the base class."""
        # Per the base-class contract noted by the author, `super().close()`
        # stops the daemon and closes attached publishers. Any device-specific
        # teardown belongs after this call.
        super().close()

    @publish_measurement
    def read_temperature(self, **kwargs) -> Measurement:
        """Advance the simulation one tick and package the new reading.

        Each call moves the temperature 20% of the remaining distance to the
        setpoint and adds uniform noise in [-0.1, 0.1]. That lag is the
        "physics" that makes a setpoint change visibly take effect over time.

        Args:
            **kwargs: Forwarded unchanged to `_package_measurement`.

        Returns:
            The result of `_package_measurement` for channel
            ``"temperature_c"``, stamped with `time.time_ns()`.
        """
        gap = self._setpoint_c - self._temperature_c
        step = gap * 0.2
        jitter = random.uniform(-0.1, 0.1)
        # Keep (step + jitter) grouped so the float arithmetic is unchanged.
        self._temperature_c = self._temperature_c + (step + jitter)

        return self._package_measurement(
            "temperature_c",
            self._temperature_c,
            time.time_ns(),
            **kwargs,
        )

    @publish_command
    def set_target_temperature(self, value: float, **kwargs) -> Command:
        """Store a new setpoint and package it as a command.

        Args:
            value: New target temperature; stored as the setpoint that
                `read_temperature` drifts toward.
            **kwargs: Forwarded unchanged to `_package_command`.

        Returns:
            The result of `_package_command` for channel
            ``"setpoint_c.cmd"``, stamped with `time.time_ns()`.
        """
        self._setpoint_c = value
        return self._package_command(
            "setpoint_c.cmd",
            value,
            time.time_ns(),
            **kwargs,
        )


def main() -> None:
    """Run the demo: step the controller through three setpoints.

    The controller is used as a context manager; per the author's note,
    `with` calls open() on entry and close() on exit, and close() stops the
    daemon and closes publishers even if an exception escapes the block.
    """
    setpoints = (25, 50, 75)
    dwell_seconds = 5  # pause after each command so the lagging reading can follow

    with SimpleTempController(name="controller") as controller:
        controller.start()

        for setpoint in setpoints:
            controller.set_target_temperature(setpoint)
            time.sleep(dwell_seconds)


# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()