Publishers are the mechanism by which Nominal Instrumentation handles data from instruments. They define what happens to measurements and commands - whether that’s sending data to Nominal Core, streaming to Nominal Connect, writing to files, or implementing custom behavior. Every instrument can have multiple publishers attached, and data flows automatically: Instrument → Publishers → Destinations.

The Power of One Line

Here’s how little it takes to go from collecting data locally to streaming it to Nominal Core.
Without a publisher:
from instro.daq.drivers.labjack import LabJackTSeriesDriver
from instro.daq import InstroDAQ

daq = InstroDAQ(
    name="myDAQ",
    driver=LabJackTSeriesDriver(device_id="440020473"),
)
# Data is collected but not persisted anywhere
With a publisher (just add one line):
from instro.daq.drivers.labjack import LabJackTSeriesDriver
from instro.daq import InstroDAQ
from instro.lib.publishers import NominalCorePublisher

daq = InstroDAQ(
    name="myDAQ",
    driver=LabJackTSeriesDriver(device_id="440020473"),
    publishers=[NominalCorePublisher(dataset_rid="ri.catalog.main.dataset.abc123...")],
)
# All data automatically streams to Nominal Core
By adding a NominalCorePublisher to the publishers list, every measurement and command is now recorded in your Nominal Core dataset.
Important Note about Publishers
Data is published as a direct result of an instrument method being called. For example, when you call InstroPSU.get_voltage(), this not only queries the instrument for the voltage but also causes all attached Publishers to publish the measurement response automatically. This makes it easy to ensure measurements and state changes are consistently recorded or streamed without explicit publishing logic in your workflow.
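The fan-out described in this note can be pictured with a small stand-in. This is only a sketch of the idea, not how Nominal Instrumentation implements it: ToyPSU, ToyMeasurement, and ListPublisher are hypothetical names invented for illustration, and the hard-coded voltage stands in for a real hardware read.

```python
from dataclasses import dataclass, field
import time


@dataclass
class ToyMeasurement:
    """Hypothetical stand-in for a measurement; not the instro type."""
    channel: str
    value: float
    timestamp: float = field(default_factory=time.time)


class ListPublisher:
    """Hypothetical publisher that just remembers what it was given."""

    def __init__(self):
        self.received = []

    def publish(self, data, **kwargs):
        self.received.append(data)

    def close(self):
        pass


class ToyPSU:
    """Hypothetical instrument: every read is handed to all publishers."""

    def __init__(self, name, publishers=None):
        self.name = name
        self.publishers = list(publishers or [])

    def get_voltage(self, channel=1):
        value = 3.3  # pretend this came from hardware
        measurement = ToyMeasurement(f"{self.name}.ch{channel}.voltage", value)
        for publisher in self.publishers:  # Instrument -> Publishers
            publisher.publish(measurement)
        return value


sink_a, sink_b = ListPublisher(), ListPublisher()
psu = ToyPSU("myPSU", publishers=[sink_a, sink_b])
voltage = psu.get_voltage(channel=1)  # one call, both sinks receive the reading
```

The caller only asks for the voltage; recording it is a side effect of the call, which is why no explicit publishing logic appears in the workflow.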

Publisher Protocol

All publishers implement a simple protocol with two required methods:
from typing import Protocol
from instro.lib.types import Measurement, Command

class Publisher(Protocol):
    def publish(self, data: Measurement | Command, **kwargs):
        """Called each time the instrument generates data"""
        ...

    def close(self):
        """Called when the instrument is closed"""
        ...
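Because the protocol is structural, a class does not need to inherit from anything to count as a publisher; it only needs the two methods. The sketch below uses a local stand-in protocol, PublisherLike, so that it runs without the library installed. Whether the library's own Publisher is decorated with runtime_checkable is not stated here, so the isinstance check is shown against the stand-in only.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class PublisherLike(Protocol):
    """Local stand-in mirroring the two-method shape shown above."""

    def publish(self, data: Any, **kwargs) -> None: ...

    def close(self) -> None: ...


class ConsolePublisher:
    """Satisfies the protocol without inheriting from it."""

    def publish(self, data: Any, **kwargs) -> None:
        print(data)

    def close(self) -> None:
        pass


class NotAPublisher:
    """Missing close(), so it does not satisfy the protocol."""

    def publish(self, data: Any, **kwargs) -> None:
        pass


# runtime_checkable protocols only check that the methods exist,
# not that their signatures match.
is_publisher = isinstance(ConsolePublisher(), PublisherLike)
is_not_publisher = isinstance(NotAPublisher(), PublisherLike)
```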

Data Types

  • Measurement: Contains data read from the instrument (e.g., analog input readings, voltage measurements). Includes channel data, timestamps, and optional tags.
  • Command: Represents a command sent to the instrument (e.g., setting voltage, enabling output). Includes channel data, a single timestamp, and optional tags.

Built-in Publishers

Nominal Instrumentation provides built-in publishers for sending data to Nominal’s platform and for writing to local files.

NominalCorePublisher

Sends measurement and command data to Nominal Core datasets for long-term storage, analysis, and collaboration. Constructor Parameters:
NominalCorePublisher(
    dataset_rid: str,              # Required: RID of your Nominal Core dataset
    batch_size: int | None = None, # Optional: Batch size before writing
    max_wait: timedelta | None = None,  # Optional: Max time before flush
    file_fallback: pathlib.Path | None = None,  # Optional: Path of fallback file (.avro) used if network connectivity is lost.
    profile: str | None = None,    # Optional: Nominal profile name
    api_key: str | None = None     # Optional: API key for authentication
)
Example:
from instro.daq.drivers.labjack import LabJackTSeriesDriver
from instro.daq import InstroDAQ
from instro.lib.publishers import NominalCorePublisher

DATASET_RID = "ri.catalog.main.dataset.abc123..."

daq = InstroDAQ(
    name="myDAQ",
    driver=LabJackTSeriesDriver(device_id="440020473"),
    publishers=[NominalCorePublisher(dataset_rid=DATASET_RID)],
)
The NominalCorePublisher includes built-in batching via the Nominal Core Python SDK, which optimizes upload performance automatically.
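To build intuition for how batch_size and max_wait might interact, here is a minimal sketch of a batcher that flushes when either limit is reached. It is an assumption about the general technique, not the Nominal Core SDK's implementation: ToyBatcher and its clock argument are hypothetical, and a real batcher would typically also flush from a timer so that a quiet stream does not hold data indefinitely.

```python
from datetime import timedelta
import time


class ToyBatcher:
    """Hypothetical batcher: flushes on size or age, whichever comes first."""

    def __init__(self, sink, batch_size=3, max_wait=timedelta(seconds=1),
                 clock=time.monotonic):
        self.sink = sink                  # callable that receives a list of items
        self.batch_size = batch_size
        self.max_wait = max_wait.total_seconds()
        self.clock = clock                # injectable so the demo is deterministic
        self.pending = []
        self.oldest = None                # arrival time of the oldest pending item

    def add(self, item):
        if not self.pending:
            self.oldest = self.clock()
        self.pending.append(item)
        too_many = len(self.pending) >= self.batch_size
        too_old = self.clock() - self.oldest >= self.max_wait
        if too_many or too_old:
            self.flush()

    def flush(self):
        if self.pending:
            self.sink(self.pending)
            self.pending = []


# Fake clock so the example does not depend on wall time.
now = [0.0]
batches = []
batcher = ToyBatcher(batches.append, batch_size=3,
                     max_wait=timedelta(seconds=1), clock=lambda: now[0])

for i in range(3):
    batcher.add(i)    # the third add reaches batch_size and flushes

batcher.add(3)        # starts a new batch at t = 0
now[0] = 1.5
batcher.add(4)        # the batch is now older than max_wait and flushes
```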

NominalConnectPublisher

Streams real-time data to Nominal Connect for live visualization and monitoring during tests that use the Nominal Connect Desktop Application. Constructor Parameters:
from connect_python import Client

NominalConnectPublisher(
    client: Client,     # Nominal Connect client instance
    stream_id: str      # Stream identifier for Connect
)
Example, from a typical Connect app:
from instro.daq.drivers.labjack import LabJackTSeriesDriver
from instro.daq import InstroDAQ
from instro.lib.publishers.nominal_connect import NominalConnectPublisher

import connect_python

@connect_python.main
def main(client: connect_python.Client):
    stream_id = "myDAQ-stream"

    daq = InstroDAQ(
        name="myDAQ",
        driver=LabJackTSeriesDriver(device_id="440020473"),
        publishers=[NominalConnectPublisher(client=client, stream_id=stream_id)],
    )

FilePublisher

Writes measurements and commands to a local file in Avro, CSV, or JSON format. Useful for data capture in offline or air-gapped environments, for post-run upload to Nominal Core, and for inspecting raw data while debugging. When to use:
  • You’re running tests on a system without access to Nominal Core or Connect
  • You want a durable local copy of raw data for post-processing or later upload
  • You want to inspect measurements in a human-readable format while debugging
Constructor Parameters:
FilePublisher(
    directory: str | Path,                            # Directory where the file will be written
    format: Literal["json", "csv", "avro"] = "avro",  # Output file format
    custom_file_name: str | None = None,              # Optional base name (no extension)
)
If custom_file_name is not provided, the output file is named measurements-YYYY-MM-DD-HH-MM-SS.<format> using the time FilePublisher was constructed. Format tradeoffs:
  • avro (default): Compact binary format with snappy compression. Uses the same schema as Nominal Core ingest, so captured files can be uploaded after the fact without transformation. Recommended for production captures.
  • csv: One row per (timestamp, channel, value, tags) tuple. Good for quick inspection in spreadsheet tools.
  • json: Appends each publish to a JSON array. Convenient for debugging small runs, but not recommended for high-rate data, as the file is re-read and rewritten on every publish call.
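The cost difference between the json and csv formats comes from how each file is appended to. The sketch below illustrates the general pattern with the standard library only; it is not FilePublisher's code, and the helper names and column names (timestamp, channel, value) are illustrative assumptions.

```python
import csv
import json
import tempfile
from pathlib import Path


def append_json(path: Path, record: dict) -> None:
    """Append to a JSON array: the whole file is parsed and rewritten."""
    records = json.loads(path.read_text()) if path.exists() else []
    records.append(record)
    path.write_text(json.dumps(records))


def append_csv(path: Path, record: dict) -> None:
    """Append to a CSV file: only the new row is written."""
    is_new = not path.exists()
    with path.open("a", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["timestamp", "channel", "value"])
        if is_new:
            writer.writeheader()
        writer.writerow(record)


with tempfile.TemporaryDirectory() as tmp:
    json_path = Path(tmp) / "capture.json"
    csv_path = Path(tmp) / "capture.csv"
    for i in range(3):
        row = {"timestamp": i, "channel": "ai0", "value": i * 0.5}
        append_json(json_path, row)   # work grows with the file size
        append_csv(csv_path, row)     # work stays constant per row
    json_records = json.loads(json_path.read_text())
    csv_lines = csv_path.read_text().splitlines()
```

Each JSON append does work proportional to everything already in the file, so the total cost grows quadratically with the number of publish calls, while each CSV append writes a single row.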
Example:
from instro.daq.drivers.labjack import LabJackTSeriesDriver
from instro.daq import InstroDAQ
from instro.lib.publishers import FilePublisher

daq = InstroDAQ(
    name="myDAQ",
    driver=LabJackTSeriesDriver(device_id="440020473"),
    publishers=[FilePublisher(directory="./captures", format="avro")],
)
Pairing with other publishers: FilePublisher can be attached alongside a NominalCorePublisher to keep a local copy of data while streaming to the cloud:
from instro.daq.drivers.labjack import LabJackTSeriesDriver
from instro.daq import InstroDAQ
from instro.lib.publishers import FilePublisher, NominalCorePublisher

daq = InstroDAQ(
    name="myDAQ",
    driver=LabJackTSeriesDriver(device_id="440020473"),
    publishers=[
        NominalCorePublisher(dataset_rid="ri.catalog.main.dataset.abc123..."),
        FilePublisher(directory="./captures", format="avro"),
    ],
)
If you only need a local file as a fallback for when network connectivity to Nominal Core is lost, use the file_fallback parameter on NominalCorePublisher instead of attaching a separate FilePublisher.

Attaching Publishers

During Creation (via publishers parameter)

from instro.daq.drivers.labjack import LabJackTSeriesDriver
from instro.daq import InstroDAQ
from instro.lib.publishers import NominalCorePublisher

daq = InstroDAQ(
    name="myDAQ",
    driver=LabJackTSeriesDriver(device_id="440020473"),
    publishers=[NominalCorePublisher(dataset_rid="ri.catalog...")],
)

After Creation

from instro.daq.drivers.labjack import LabJackTSeriesDriver
from instro.daq import InstroDAQ
from instro.lib.publishers import NominalCorePublisher

daq = InstroDAQ(name="myDAQ", driver=LabJackTSeriesDriver(device_id="440020473"))
daq.add_publisher(NominalCorePublisher(dataset_rid="ri.catalog..."))

Core Publisher Shorthand

As a shorthand, you can pass a dataset_rid keyword argument directly to the instrument constructor to publish to Nominal Core. This requires a default profile to be configured on the system.
from instro.daq.drivers.labjack import LabJackTSeriesDriver
from instro.daq import InstroDAQ

daq = InstroDAQ(
    name="myDAQ",
    driver=LabJackTSeriesDriver(device_id="440020473"),
    dataset_rid="ri.catalog...",  # Automatically creates NominalCorePublisher
)

Custom Publisher

To create a custom publisher, write a class that implements the following protocol, i.e. one that has a publish method and a close method.
class Publisher(Protocol):
    def publish(self, data: Measurement | Command, **kwargs): ...

    def close(self): ...

Example

This publisher prints to the console each time a published Measurement contains data for a predefined channel name.
from instro.lib.types import Measurement, Command

class PrintChannelPublisher:
    def __init__(self, channel_name: str):
        self.channel_name = channel_name

    def publish(self, data: Measurement | Command, **kwargs):
        """Print the latest value for the configured channel; errors are caught and reported"""
        try:
            if isinstance(data, Measurement):
                if data.channel_data.get(self.channel_name, None):
                    print(f"{self.channel_name}: {data.latest}")
        except Exception as e:
            print(f"Error publishing data: {e}")

    def close(self):
        pass
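A second sketch of the same pattern, this time a publisher that keeps state instead of printing. It runs without the library by using a hypothetical FakeMeasurement stand-in, and it assumes channel_data maps a channel name to a list of samples; the real Measurement type may be shaped differently, so adapt the loop to the actual structure.

```python
from dataclasses import dataclass, field


@dataclass
class FakeMeasurement:
    """Hypothetical stand-in; assumes channel_data maps name -> list of samples."""
    channel_data: dict = field(default_factory=dict)


class LatestValuePublisher:
    """Keeps the most recent sample seen on each channel."""

    def __init__(self):
        self.latest = {}
        self.closed = False

    def publish(self, data, **kwargs):
        # Works on anything exposing channel_data; other objects are ignored.
        channel_data = getattr(data, "channel_data", None) or {}
        for name, samples in channel_data.items():
            if samples:  # skip channels that carry no samples
                self.latest[name] = samples[-1]

    def close(self):
        self.closed = True


publisher = LatestValuePublisher()
publisher.publish(FakeMeasurement({"myPSU.ch1.current": [0.10, 0.12]}))
publisher.publish(FakeMeasurement({"myPSU.ch1.current": [0.15],
                                   "myPSU.ch1.voltage": [5.0]}))
publisher.close()
```

A publisher like this can back a status display or a limit check, since the latest dictionary always reflects the last value published for each channel.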

Publisher Wrappers

Publisher wrappers modify the behavior of other publishers by adding buffering or asynchronous processing. They’re composable - wrap any publisher to add new capabilities.

BufferedPublisher

Collects data in memory and publishes in batches, reducing the overhead of frequent publish calls. The class is exposed as BasicBufferedPublisher. When to use:
  • You’re making many small publish calls and want to reduce overhead
  • You want to batch data before sending to a remote service
  • Network or I/O latency is impacting performance
Parameters:
BasicBufferedPublisher(
    publisher: Publisher,    # The publisher to wrap
    buffer_size: int = 1000  # Flush when buffer reaches this size
)
Example:
import time
from instro.psu.drivers import SimulatedPSU
from instro.psu import InstroPSU
from instro.lib.publishers import BasicBufferedPublisher
from instro.lib.types import Measurement, Command

# Custom publisher that prints to console
class PrintChannelPublisher:
    def __init__(self, channel_name: str):
        self.channel_name = channel_name

    def publish(self, data: Measurement | Command, **kwargs):
        if isinstance(data, Measurement):
            if data.channel_data.get(self.channel_name, None):
                print(f"{self.channel_name}: {data.latest}")

    def close(self):
        print("Publisher closed")

# Wrap the custom publisher with buffering
print_publisher = PrintChannelPublisher(channel_name="myPSU.ch1.current")
buffered_publisher = BasicBufferedPublisher(print_publisher, buffer_size=5)

psu = InstroPSU(
    name="myPSU",
    driver=SimulatedPSU("TCPIP0::192.168.1.100::5025::SOCKET"),
    num_channels=2,
)
psu.add_publisher(buffered_publisher)

psu.open()
psu.output_enable(True, channel=1)

# Console output appears in bursts, only when the 5-item buffer flushes
for v in range(12):
    psu.set_voltage(v * 0.1, channel=1)
    psu.get_current(channel=1)
    time.sleep(0.1)

psu.close()  # Remaining buffer is automatically flushed
The buffer is automatically flushed when it reaches capacity or when close() is called, ensuring no data is lost.
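The flush-on-capacity and flush-on-close behavior can be sketched in a few lines. This is a minimal illustration of the technique under stated assumptions, not BasicBufferedPublisher's source: ToyBufferedPublisher and RecordingPublisher are hypothetical, and forwarding items one by one on flush is a choice made for this sketch.

```python
class ToyBufferedPublisher:
    """Hypothetical wrapper: holds items until buffer_size, then forwards them."""

    def __init__(self, publisher, buffer_size=1000):
        self.publisher = publisher
        self.buffer_size = buffer_size
        self.buffer = []

    def publish(self, data, **kwargs):
        self.buffer.append((data, kwargs))
        if len(self.buffer) >= self.buffer_size:
            self.flush()

    def flush(self):
        pending, self.buffer = self.buffer, []
        for data, kwargs in pending:
            self.publisher.publish(data, **kwargs)

    def close(self):
        self.flush()              # forward whatever is left
        self.publisher.close()


class RecordingPublisher:
    """Hypothetical inner publisher that records what reaches it."""

    def __init__(self):
        self.items = []
        self.closed = False

    def publish(self, data, **kwargs):
        self.items.append(data)

    def close(self):
        self.closed = True


inner = RecordingPublisher()
buffered = ToyBufferedPublisher(inner, buffer_size=5)

for i in range(12):
    buffered.publish(i)
seen_before_close = len(inner.items)  # two full buffers have been forwarded

buffered.close()
seen_after_close = len(inner.items)   # the remainder is forwarded on close
```

Because the wrapper exposes the same publish and close methods as the publisher it wraps, it can itself be wrapped or attached anywhere a publisher is accepted.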

QueuedPublisher

Offloads publishing to a background thread, making publish calls non-blocking and preventing slow publishers from impacting instrument operations. When to use:
  • Publishing is slow (network latency, disk I/O) and blocking your test loop
  • You want instrument operations to proceed immediately without waiting for publish
  • You need guaranteed throughput for time-critical measurements
Parameters:
QueuedPublisher(
    publisher: Publisher,           # The publisher to wrap
    max_queue_size: int = 1000,     # Maximum queue size
    wait_for_queue: bool = False    # Wait for queue to empty on close
)
Example:
import time
from instro.psu.drivers import SimulatedPSU
from instro.psu import InstroPSU
from instro.lib.publishers import QueuedPublisher
from instro.lib.types import Measurement, Command

# Custom publisher with artificial delay to simulate slow I/O
class SlowPublisher:
    def publish(self, data: Measurement | Command, **kwargs):
        time.sleep(0.1)  # Simulate slow network or disk I/O
        print(f"Published {len(data.channel_data)} channels")

    def close(self):
        print("Publisher closed")

# Wrap the slow publisher with queuing
slow_publisher = SlowPublisher()
queued_publisher = QueuedPublisher(
    slow_publisher,
    max_queue_size=100,
    wait_for_queue=True  # Ensure all data is sent before closing
)

psu = InstroPSU(
    name="myPSU",
    driver=SimulatedPSU("TCPIP0::192.168.1.100::5025::SOCKET"),
    num_channels=2,
)
psu.add_publisher(queued_publisher)

psu.open()
psu.output_enable(True, channel=1)

# Publishing happens in background - no blocking!
for v in range(10):
    start = time.time()
    psu.set_voltage(v, channel=1)
    psu.get_current(channel=1)
    print(f"Loop iteration: {time.time() - start:.4f}s")  # Very fast!

psu.close()  # Waits for queue to empty before closing
Using QueuedPublisher can dramatically improve test performance when publishing is a bottleneck. The background thread handles all I/O while your test continues uninterrupted.
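The background-thread pattern behind this kind of wrapper can be sketched with the standard library's queue and threading modules. This is an illustration of the general technique, not QueuedPublisher's implementation: ToyQueuedPublisher and SlowRecorder are hypothetical, and the sketch always drains the queue on close and blocks publish when the queue is full, which may differ from the library's behavior.

```python
import queue
import threading
import time


class ToyQueuedPublisher:
    """Hypothetical wrapper: publish() enqueues, a worker thread does the I/O."""

    _STOP = object()  # sentinel telling the worker to exit

    def __init__(self, publisher, max_queue_size=1000):
        self.publisher = publisher
        self._queue = queue.Queue(maxsize=max_queue_size)
        self._worker = threading.Thread(target=self._drain, daemon=True)
        self._worker.start()

    def publish(self, data, **kwargs):
        # Returns as soon as the item is queued; blocks only if the queue is full.
        self._queue.put((data, kwargs))

    def _drain(self):
        while True:
            item = self._queue.get()
            if item is self._STOP:
                break
            data, kwargs = item
            try:
                self.publisher.publish(data, **kwargs)
            except Exception as exc:  # keep the worker alive on errors
                print(f"Error publishing data: {exc}")

    def close(self):
        self._queue.put(self._STOP)   # queued items are processed first (FIFO)
        self._worker.join()
        self.publisher.close()


class SlowRecorder:
    """Hypothetical slow publisher used to exercise the wrapper."""

    def __init__(self):
        self.items = []

    def publish(self, data, **kwargs):
        time.sleep(0.01)  # simulate slow I/O
        self.items.append(data)

    def close(self):
        pass


inner = SlowRecorder()
queued = ToyQueuedPublisher(inner, max_queue_size=100)
for i in range(10):
    queued.publish(i)     # does not wait for the slow publish
queued.close()            # waits for the worker to finish the backlog
```

A single worker thread preserves the order of published items, which matters when the destination expects timestamps to arrive in sequence.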