Publishers are the mechanism by which Nominal Instrumentation handles data from instruments. They define what happens to measurements and commands - whether that’s sending data to Nominal Core, streaming to Nominal Connect, writing to files, or implementing custom behavior.Every instrument can have multiple publishers attached, and data flows automatically: Instrument → Publishers → Destinations.
Here’s how simple it is to go from collecting data locally to streaming it to Nominal Core:Without a publisher:
from instro.daq.drivers.labjack import LabJackTSeriesDriverfrom instro.daq import InstroDAQdaq = InstroDAQ( name="myDAQ", driver=LabJackTSeriesDriver(device_id="440020473"),)# Data is collected but not persisted anywhere
With a publisher (just add one line):
from instro.daq.drivers.labjack import LabJackTSeriesDriverfrom instro.daq import InstroDAQfrom instro.lib.publishers import NominalCorePublisherdaq = InstroDAQ( name="myDAQ", driver=LabJackTSeriesDriver(device_id="440020473"), publishers=[NominalCorePublisher(dataset_rid="ri.catalog.main.dataset.abc123...")],)# All data automatically streams to Nominal Core
By adding a NominalCorePublisher to the publishers list, every measurement and command is now recorded in your Nominal Core dataset.
Important Note about PublishersData is published as a direct result of an instrument method being called.For example, when you call InstroPSU.get_voltage(), this not only queries the instrument for the voltage but also causes all attached Publishers to publish the measurement response automatically.This makes it easy to ensure measurements and state changes are consistently recorded or streamed without explicit publishing logic in your workflow.
All publishers implement a simple protocol with two required methods:
from typing import Protocolfrom instro.lib.types import Measurement, Commandclass Publisher(Protocol): def publish(self, data: Measurement | Command, **kwargs): """Called each time the instrument generates data""" ... def close(self): """Called when the instrument is closed""" ...
Measurement: Contains data read from the instrument (e.g., analog input readings, voltage measurements). Includes channel data, timestamps, and optional tags.
Command: Represents a command sent to the instrument (e.g., setting voltage, enabling output). Includes channel data, a single timestamp, and optional tags.
Streams real-time data to Nominal Connect for live visualization and monitoring during tests that use the Nominal Connect Desktop Application.Constructor Parameters:
from connect_python import ClientNominalConnectPublisher( client: Client, # Nominal Connect client instance stream_id: str # Stream identifier for Connect)
Writes measurements and commands to a local file in Avro, CSV, or JSON format. Useful for data capture in offline or air-gapped environments, for post-run upload to Nominal Core, and for inspecting raw data while debugging.When to use:
You’re running tests on a system without access to Nominal Core or Connect
You want a durable local copy of raw data for post-processing or later upload
You want to inspect measurements in a human-readable format while debugging
Constructor Parameters:
FilePublisher( directory: str | Path, # Directory where the file will be written format: Literal["json", "csv", "avro"] = "avro", # Output file format custom_file_name: str | None = None, # Optional base name (no extension))
If custom_file_name is not provided, the output file is named measurements-YYYY-MM-DD-HH-MM-SS.<format> using the time FilePublisher was constructed.Format tradeoffs:
avro (default): Compact binary format with snappy compression. Uses the same schema as Nominal Core ingest, so captured files can be uploaded after the fact without transformation. Recommended for production captures.
csv: One row per (timestamp, channel, value, tags) tuple. Good for quick inspection in spreadsheet tools.
json: Appends each publish to a JSON array. Convenient for debugging small runs, but not recommended for high-rate data, as the file is re-read and rewritten on every publish call.
Pairing with other publishers:FilePublisher can be attached alongside a NominalCorePublisher to keep a local copy of data while streaming to the cloud:
If you only need a local file as a fallback for when network connectivity to Nominal Core is lost, use the file_fallback parameter on NominalCorePublisher instead of attaching a separate FilePublisher.
A shorthand method to add publishing to Nominal Core is to directly pass in a dataset_rid kwarg. This requires a default profile to have been configured on the system.
Publisher wrappers modify the behavior of other publishers by adding buffering or asynchronous processing. They’re composable - wrap any publisher to add new capabilities.
Offloads publishing to a background thread, making publish calls non-blocking and preventing slow publishers from impacting instrument operations.When to use:
Publishing is slow (network latency, disk I/O) and blocking your test loop
You want instrument operations to proceed immediately without waiting for publish
You need guaranteed throughput for time-critical measurements
Parameters:
QueuedPublisher( publisher: Publisher, # The publisher to wrap max_queue_size: int = 1000, # Maximum queue size wait_for_queue: bool = False # Wait for queue to empty on close)
Example:
import timefrom instro.psu.drivers import SimulatedPSUfrom instro.psu import InstroPSUfrom instro.lib.publishers import QueuedPublisherfrom instro.lib.types import Measurement, Command# Custom publisher with artificial delay to simulate slow I/Oclass SlowPublisher: def publish(self, data: Measurement | Command, **kwargs): time.sleep(0.1) # Simulate slow network or disk I/O print(f"Published {len(data.channel_data)} channels") def close(self): print("Publisher closed")# Wrap the slow publisher with queuingslow_publisher = SlowPublisher()queued_publisher = QueuedPublisher( slow_publisher, max_queue_size=100, wait_for_queue=True # Ensure all data is sent before closing)psu = InstroPSU( name="myPSU", driver=SimulatedPSU("TCPIP0::192.168.1.100::5025::SOCKET"), num_channels=2,)psu.add_publisher(queued_publisher)psu.open()psu.output_enable(True, channel=1)# Publishing happens in background - no blocking!for v in range(10): start = time.time() psu.set_voltage(v, channel=1) psu.get_current(channel=1) print(f"Loop iteration: {time.time() - start:.4f}s") # Very fast!psu.close() # Waits for queue to empty before closing
Using QueuedPublisher can dramatically improve test performance when publishing is a bottleneck. The background thread handles all I/O while your test continues uninterrupted.