Streaming

Streaming workflows apply the same calibration, preprocessing, and model patterns line by line as data arrives from a camera or simulated camera.

Two streaming patterns appear across the examples below:

  • cam.stream() — direct frame loop on a bare camera. Each frame is an eager NumPy array. Use this when you need raw frame-by-frame control with no processing pipeline.
  • img.stream() — frame loop on a processed Image pipeline. Build calibration, dtype conversion, band selection, or model prediction on top of cam.to_hs_image() first, then stream the result. The processing is applied lazily to each frame as it arrives.

Use the workflows like this:

| Workflow | API | Data you receive | Use when |
| --- | --- | --- | --- |
| Finite cube capture | cam.to_hs_image()[:n_lines, :, :] | Lazy Image consumed by to_numpy(), resolve(), or hs.write() | You want a bounded datacube. |
| Direct frame loop | cam.stream() | Eager 2D NumPy frame plus metadata | You need raw frame-by-frame control or dropped-frame checks. |
| Processed line stream | cam.to_hs_image() plus SDK operations, then .stream() | Processed line output from the lazy pipeline | You want calibration, preprocessing, or model prediction line by line. |
| Custom operation pipeline | @operation on cam.to_hs_image() | Lazy Image with custom per-line output | You need custom model logic, compact multi-output lines, or preview/logging taps. |
| Simulated camera | hs.control.Camera(file_img) | Same APIs as camera examples | You want to test without camera hardware. |
| Lab scanner / simulated stage | hs.control.StageController(camera_ip) or hs.control.SimulatedStageController(file_img) | Lazy Image from to_hs_image() plus stage/lights control methods | You want to coordinate camera capture with scanner motion, or test that workflow without lab scanner hardware. |

See Choosing a camera workflow in the usage guide for more detail.

Lazy Capture Triggers

For live camera and lab scanner sources, terminal operations such as hs.write(image, output_path), image.resolve(), and image.to_numpy() trigger acquisition. Use one terminal operation for each live capture. If you write one lazy branch and resolve another lazy branch from the same camera image, you may trigger two consecutive acquisitions.

Resolve once and reuse the resolved image when the capture fits in memory, or write once and reopen the saved file for lower-memory offline inspection.
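
The cost of a second terminal operation can be illustrated without the SDK. The sketch below uses a hypothetical ToyLazyCapture class (an assumption for illustration, not an SDK type) whose resolve() counts how often the "camera" is triggered. It only models the rule described above; real SDK images may behave differently in detail.

```python
import numpy as np


class ToyLazyCapture:
    """Hypothetical stand-in for a lazy live-camera image (not an SDK class)."""

    def __init__(self, n_lines, n_bands, n_samples):
        self.shape = (n_lines, n_bands, n_samples)
        self.acquisitions = 0  # how many times the "camera" was triggered

    def resolve(self):
        # Every terminal call starts a fresh acquisition.
        self.acquisitions += 1
        return np.zeros(self.shape, dtype=np.float32)


# Two terminal calls on the same live source: two acquisitions.
source = ToyLazyCapture(4, 3, 5)
cube_a = source.resolve()
band_a = source.resolve()[:, 1, :]
two_call_count = source.acquisitions

# Resolve once, then slice the in-memory result: one acquisition.
source = ToyLazyCapture(4, 3, 5)
cube_b = source.resolve()
band_b = cube_b[:, 1, :]
one_call_count = source.acquisitions

print(two_call_count, one_call_count)
```

The second pattern is the one recommended above: the band view is taken from data that is already in memory, so the camera is not asked to scan again.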

Lab Scanner Workflows

For lab scanner stage-controller workflows, use hs.control.StageController with hardware or hs.control.SimulatedStageController with a saved SDK image while developing without hardware.

In examples that build a lazy pipeline from cam.to_hs_image(), the camera object can usually be replaced by a stage controller object and the rest of the pipeline kept the same. Direct cam.stream() examples are different: use stage.to_hs_image().stream() for a processed image stream, or stage.hs_camera.stream() when you specifically need the underlying camera stream. The simulated controller accepts motion and light commands as no-ops.

See the Lab Scanner interface section in the usage guide for the setup pattern.

Stream lifetime
  • Direct camera streams: cam.stream() is open-ended by default. A live camera runs until stopped, and a simulated camera backed by a file currently loops back to the first line after the last line. BSQ source cubes are converted to a line-first stream layout. This conversion is only needed for BSQ, because push-broom camera streaming requires lines as the first dimension; BIL and BIP are already line-first for this purpose. Use cam.stream(n_frames=...) when you want a bounded direct frame stream.
  • Processed Image streams: cam.to_hs_image().stream() and processed.stream() stream the lazy SDK image pipeline. With a simulated camera backed by a finite file, the processed stream stops when the file is exhausted.

Both stream types are Python iterators, so the examples iterate with a plain loop: for meta, frame in stream. If you instead request direct camera frames manually with stream.get_frame(), a bounded direct camera stream returns None once it is exhausted.
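
The two ways of ending a bounded stream can be sketched with a small stand-in. ToyBoundedStream below is hypothetical and not part of the SDK; it yields an integer sequence number where the real stream yields a metadata object. It only mirrors the behavior described above: iteration stops by itself, while manual get_frame() calls must check for None.

```python
import numpy as np


class ToyBoundedStream:
    """Hypothetical stand-in for a bounded frame stream (not an SDK class)."""

    def __init__(self, n_frames, bands=3, samples=4):
        self._remaining = n_frames
        self._seq = 0
        self._shape = (bands, samples)

    def get_frame(self):
        # Return None once the requested number of frames has been delivered.
        if self._remaining == 0:
            return None
        self._remaining -= 1
        seq = self._seq
        self._seq += 1
        return seq, np.full(self._shape, seq, dtype=np.uint16)

    def __iter__(self):
        return self

    def __next__(self):
        item = self.get_frame()
        if item is None:
            raise StopIteration
        return item


# Iterator style: the loop ends by itself.
iterated = [seq for seq, _frame in ToyBoundedStream(3)]

# Manual style: check for None explicitly.
manual = []
stream = ToyBoundedStream(3)
while (item := stream.get_frame()) is not None:
    manual.append(item[0])

print(iterated, manual)
```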

Downloaded scripts

The downloadable .py files can be run without editing paths by setting environment variables such as HSI_EXAMPLE_BASE_DIR. See Running Downloaded Scripts for the full list of supported overrides.

Camera Pipelines

A camera can also be used as an Image pipeline source with cam.to_hs_image(). This is useful when you want live camera data to pass through the same lazy SDK operations as file-backed datacubes: calibration, dtype conversion, band selection, preprocessing, or model prediction can be composed before any frames are captured.

For real cameras, set exposure, framerate, spatial crop, and spectral bands before creating the Image pipeline or starting a direct stream. The downloadable script supports optional environment variables such as HSI_EXAMPLE_EXPOSURE_US, HSI_EXAMPLE_FRAMERATE, HSI_EXAMPLE_HORIZONTAL_CROP, and HSI_EXAMPLE_BANDS for this.

def configure_camera(cam):
    if EXPOSURE_US:
        cam.set_exposure(int(EXPOSURE_US))
    if FRAMERATE:
        cam.set_framerate(float(FRAMERATE))
    if HORIZONTAL_CROP:
        start, end = parse_range(HORIZONTAL_CROP)
        cam.set_horizontal_crop(start, end)
    if BANDS:
        cam.set_bands([parse_range(part) for part in BANDS.split(";")])

# To use a real camera on the default ETH-B address:
# export HSI_EXAMPLE_CAMERA_IP=10.100.10.100
# When this variable is not set, the script uses a simulated camera backed by
# HSI_EXAMPLE_TEST_CUBE. That makes it possible to test the same pipeline
# without camera hardware.
def make_camera():
    if CAMERA_IP:
        cam = hs.control.Camera(CAMERA_IP)
        configure_camera(cam)
        return cam

    test_img = open_test_cube()
    return hs.control.Camera(test_img)


cam = make_camera()

wavelengths = np.array(cam.get_wavelengths())
band_650 = int(np.argmin(np.abs(wavelengths - 650.0)))
print(f"Closest band to 650 nm: {band_650}, wavelength={wavelengths[band_650]:.1f} nm")

# For real cameras, configure exposure, framerate, crop, and bands before
# creating the Image pipeline or starting a direct stream.
img = cam.to_hs_image()
processed = img.ensure_dtype(hs.float32) / 255.0

# The camera pipeline is executed when data is consumed.
# Slice the line dimension first; a live camera stream is otherwise open-ended.
first_lines = processed[:N_LINES, :, :].resolve()
first_band = first_lines[:, :, band_650]

print(f"{first_lines.shape=}")
print(f"{first_band.shape=}")

Download this script

For Hypervision push-broom cameras, the streaming dimension is lines, so slice the first dimension before resolving or exporting a finite capture.

Use cam.stream() when you want explicit frame-by-frame control; use cam.to_hs_image() when you want to build a reusable lazy processing pipeline and trigger capture only when the result is consumed. See the official Image capture guide for the underlying camera pipeline behavior.

Stream Individual Frames

The camera stream yields metadata and a frame. With a simulated camera, the source is an existing datacube. With hardware, replace the simulated camera creation with the camera address or configuration used in your setup. The downloadable example requests frames manually and stops when get_frame() returns None.

# Real camera
# cam = hs.control.Camera("10.100.10.100")

# Simulated camera
test_img = open_test_cube()
cam = hs.control.Camera(test_img)

with cam.stream(n_frames=10) as stream:
    while True:
        item = stream.get_frame()
        if item is None:
            break

        meta, frame = item

        if meta.dropped:
            print(f"Dropped frame {meta.seq} at {meta.timestamp}")
            continue

        print(f"Frame {meta.seq}: shape={frame.shape}")

Download this script

info

frame is one streamed line. For an HV-style push-broom stream, BIL is a good layout because each frame naturally contains bands and samples for one line.
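
The relationship between streamed frames and cube layouts can be sketched in plain NumPy. This assumes the conventional axis orders (BIL as lines, bands, samples; BIP as lines, samples, bands; BSQ as bands, lines, samples) and does not use SDK calls, so treat it as an illustration of the layouts rather than of the SDK's internal conversion.

```python
import numpy as np

n_lines, n_bands, n_samples = 4, 3, 5

# Each streamed frame is one line with shape (bands, samples).
frames = [
    np.full((n_bands, n_samples), line, dtype=np.uint16)
    for line in range(n_lines)
]

# Stacking frames along a new first axis gives a BIL cube: (lines, bands, samples).
bil = np.stack(frames, axis=0)

# BIP puts the spectrum last: (lines, samples, bands).
bip = bil.transpose(0, 2, 1)

# BSQ puts bands first: (bands, lines, samples). A full band plane needs every
# line, which is why BSQ is not line-first.
bsq = bil.transpose(1, 0, 2)

print(bil.shape, bip.shape, bsq.shape)
```

Because a push-broom frame already has the (bands, samples) shape, appending frames to a BIL cube needs no per-frame reshuffling.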

Calibrate Streamed Frames

To classify or analyze streamed data, calibrate each frame using the same dark and white references used offline.

dark_ref, white_ref = make_references()

# Simulated camera
test_img = open_test_cube()
cam = hs.control.Camera(test_img)

# Create a lazy Image pipeline from the camera stream.
img = cam.to_hs_image()

# Limit the stream to a finite number of frames for this example.
n_frames = 10
img = img[:n_frames, :, :]

# Prepare calibration references for the camera pipeline.
# References interleave must match camera output.
dark_ref = dark_ref.to_interleave(hs.bil).resolve()
white_ref = white_ref.to_interleave(hs.bil).resolve()
img = reflectance_calibration(img, white_ref, dark_ref, clip=True)

with img.stream() as stream:
    for meta, calibrated in stream:
        print(f"Frame {meta.seq}: calibrated frame shape={calibrated.shape}")

Download this script
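
For intuition about what the calibration step computes per line, the following NumPy sketch applies the conventional reflectance formula, (raw - dark) / (white - dark), to one BIL line. It is an assumption that the SDK's reflectance_calibration follows this formula and that clip=True clamps to the range 0 to 1; calibrate_line is a hypothetical helper, not an SDK function.

```python
import numpy as np


def calibrate_line(raw, dark, white, clip=True):
    """Reflectance for one BIL line of shape (bands, samples)."""
    # Convert first so unsigned integer subtraction cannot wrap around.
    raw = raw.astype(np.float32)
    dark = dark.astype(np.float32)
    white = white.astype(np.float32)
    span = white - dark
    # Avoid division by zero where white and dark references coincide.
    span = np.where(span == 0, np.float32(1.0), span)
    reflectance = (raw - dark) / span
    if clip:
        reflectance = np.clip(reflectance, 0.0, 1.0)
    return reflectance


dark = np.full((2, 3), 10, dtype=np.uint16)
white = np.full((2, 3), 210, dtype=np.uint16)
raw = np.array([[10, 110, 210], [0, 60, 410]], dtype=np.uint16)

line = calibrate_line(raw, dark, white)
print(line)
```

Values below the dark level or above the white level fall outside 0 to 1 before clipping, which is why the streamed examples pass clip=True.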

Stream With a Saved Classifier

This combines the previous pieces: load a model, calibrate each streamed frame, and classify the line. If the classifier was trained with string labels, wrap it so the SDK pipeline produces numeric image data.

numeric_clf = NumericLabelClassifier(load_classifier_model())

dark_ref, white_ref = make_references()

# Simulated camera
test_img = open_test_cube()
cam = hs.control.Camera(test_img)

# Create a lazy Image pipeline from the camera stream.
img = cam.to_hs_image()

# Limit the stream to a finite number of frames for this example.
n_frames = 10
img = img[:n_frames, :, :]

# Prepare calibration references for the camera pipeline.
# References interleave must match camera output.
dark_ref = dark_ref.to_interleave(hs.bil).resolve()
white_ref = white_ref.to_interleave(hs.bil).resolve()
img = reflectance_calibration(img, white_ref, dark_ref, clip=True)

# Set up a predictor for the camera pipeline.
pred = predictor(numeric_clf)
img = pred(img)

with img.stream() as stream:
    for meta, predicted in stream:
        predicted = predicted.astype(np.uint8).ravel()
        classes, counts = np.unique(predicted, return_counts=True)
        print(f"Frame {meta.seq}: {dict(zip(classes, counts.astype(int)))}")

Download this script
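
The NumericLabelClassifier helper used above is defined in the downloadable script and is not shown here. As a rough idea of what such a wrapper can look like, the sketch below maps string predictions to integer class ids. Both classes are hypothetical: ToyStringClassifier stands in for a trained model, and NumericLabelWrapper is an assumed design, not the script's actual implementation.

```python
import numpy as np


class ToyStringClassifier:
    """Hypothetical model that predicts string labels from the first band."""

    classes_ = np.array(["background", "leaf", "stem"])

    def predict(self, X):
        idx = np.digitize(X[:, 0], [0.3, 0.6])
        return self.classes_[idx]


class NumericLabelWrapper:
    """Map a classifier's string predictions to integer class ids."""

    def __init__(self, model):
        self.model = model
        self.classes_ = np.asarray(model.classes_)
        self._ids = {label: i for i, label in enumerate(self.classes_)}

    def predict(self, X):
        labels = self.model.predict(X)
        return np.array([self._ids[label] for label in labels], dtype=np.uint8)


wrapped = NumericLabelWrapper(ToyStringClassifier())
pixels = np.array([[0.1, 0.0], [0.5, 0.0], [0.9, 0.0]], dtype=np.float32)
ids = wrapped.predict(pixels)
print(ids)
```

Keeping classes_ on the wrapper lets later code, such as a color map, recover the original label for each id.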

Stream With a Saved Regressor

The same camera pipeline pattern works for continuous predictions. Load the saved regression model from the regression examples, build the same absorbance preprocessing used during training, and stream one predicted value per spatial pixel. This example uses the milk-fat dataset, so set HSI_EXAMPLE_BASE_DIR to the folder containing the milk cube and matching dark/white references.

from qtec_hv_sdk.util import predictor


reg = load_regression_model()

# Real camera
# cam = hs.control.Camera("10.100.10.100")

# Simulated camera
test_img = add_camera_wavelengths(
    hs.open(str(required_data_path(MILK_TEST_CUBE, "milk datacube")))
)
cam = hs.control.Camera(test_img)

dark_ref, white_ref = make_references()

# Prepare calibration references for the camera pipeline.
# References interleave must match camera output.
dark_ref = dark_ref.to_interleave(hs.bil).resolve()
white_ref = white_ref.to_interleave(hs.bil).resolve()

# Create a lazy Image pipeline from the camera stream.
camera_img = cam.to_hs_image()

# Limit the stream to a finite number of frames for this example.
n_frames = 10
camera_img = camera_img[:n_frames, :, :]
camera_reflectance = reflectance_calibration(
    camera_img,
    white_ref,
    dark_ref,
    clip=True,
)
camera_reflectance = camera_reflectance.ensure_dtype(hs.float32).clip(1e-6, 1.0)
camera_absorbance = camera_reflectance.ufunc(lambda meta, plane: -np.log10(np.clip(plane, 1e-6, 1.0)))

hs_regressor = predictor(reg)
camera_prediction = hs_regressor(camera_absorbance)

with camera_prediction.stream() as stream:
    for meta, prediction_line in stream:
        values = prediction_line.ravel()
        print(
            f"Frame {meta.seq}: "
            f"{TARGET_PROPERTY} {values.min():.2f} to {values.max():.2f}"
        )

Download this script
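
The absorbance preprocessing in the example is the transform A = -log10(R) applied to clamped reflectance. The NumPy sketch below reproduces that arithmetic on a small array, outside the SDK pipeline; reflectance_to_absorbance is a hypothetical helper name, and the 1e-6 floor is taken from the example above.

```python
import numpy as np


def reflectance_to_absorbance(reflectance, floor=1e-6):
    """Apparent absorbance A = -log10(R), with R clamped to [floor, 1]."""
    clamped = np.clip(reflectance.astype(np.float32), floor, 1.0)
    return -np.log10(clamped)


reflectance = np.array([[1.0, 0.1, 0.01], [0.0, 0.5, 2.0]], dtype=np.float32)
absorbance = reflectance_to_absorbance(reflectance)
print(absorbance)
```

The floor keeps zero reflectance from producing infinity: with 1e-6 the largest possible absorbance is 6.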

Visualize Streamed Classifier Output

For demos and debugging, you can render predictions line by line into a rolling preview image. This example uses OpenCV to display classifier labels from a processed camera pipeline. For production systems, prefer a dedicated UI or viewer rather than blocking the processing loop with display code.

OpenCV display

This script requires opencv-python and a graphical display. It is intentionally separate from the headless streaming examples above.

numeric_clf = NumericLabelClassifier(load_classifier_model())
color_map = make_color_map(len(numeric_clf.classes_))

dark_ref, white_ref = make_references()

# Simulated camera
test_img = open_test_cube()
cam = hs.control.Camera(test_img)

camera_img = cam.to_hs_image()
dark_ref = dark_ref.to_interleave(hs.bil).resolve()
white_ref = white_ref.to_interleave(hs.bil).resolve()
camera_reflectance = reflectance_calibration(
    camera_img,
    white_ref,
    dark_ref,
    clip=True,
)

classified = predictor(numeric_clf)(camera_reflectance)

preview_lines = 300
preview = np.zeros((preview_lines, classified.shape.samples, 3), dtype=np.uint8)
row = 0

with classified.stream() as stream:
    for meta, labels_line in stream:
        if meta.dropped:
            print(f"Dropped frame {meta.seq} at {meta.timestamp}")
            continue

        labels_line = labels_line.astype(np.uint8).ravel()
        preview[row % preview_lines] = color_map[labels_line]
        display = np.roll(preview, -(row + 1), axis=0)

        cv2.imshow("Streamed classifier output", display)
        if cv2.waitKey(1) == 27:
            break

        row += 1

cv2.destroyAllWindows()

Download this script
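
The rolling preview in the example combines a ring buffer with np.roll. The sketch below isolates that indexing with small integer lines and no OpenCV, so the ordering of the displayed rows can be checked directly. The buffer size and line values are made up for illustration.

```python
import numpy as np

preview_lines, n_samples = 4, 3
preview = np.zeros((preview_lines, n_samples), dtype=np.uint8)

displays = []
for row in range(6):
    # Stand-in for one classified line: every sample carries the line number + 1.
    labels_line = np.full(n_samples, row + 1, dtype=np.uint8)

    # Overwrite the oldest slot of the ring buffer.
    preview[row % preview_lines] = labels_line

    # Rotate so the oldest line is at the top and the newest at the bottom.
    display = np.roll(preview, -(row + 1), axis=0)
    displays.append(display[:, 0].tolist())

print(displays[0])   # [0, 0, 0, 1]
print(displays[-1])  # [3, 4, 5, 6]
```

The buffer itself is never shifted; only the displayed copy is rotated, so each new line costs one row write plus one roll.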

Operation-Based Streamed Classification

The examples above use predictor() because the model prediction is the only output needed from each streamed line. For many applications that is enough: the model turns an input spectrum into one output value, and the SDK handles the streaming.

When the prediction is only part of what should happen to each line, a custom @operation workflow can be used instead. It keeps the same lazy, camera-compatible Image -> Image workflow, but lets you open the streamed line and write the NumPy code yourself. That might mean cleaning values before prediction, combining several model outputs, adding counters or diagnostic signals, or changing the shape of the data that continues through the pipeline.

The operation in the example below uses that extra control for one concrete purpose: it classifies each calibrated reflectance line and returns a compact two-band line with both class id and visual intensity, so the same streamed output can drive a preview and be stored for later use. The slice_transform argument is used because the operation may be asked for only one output band, but classification still needs the full input spectrum for that line.

The terminal stream reads that compact output once, blends the class colors over the visual band for an OpenCV preview, and stores the same compact rows for later use. The preview uses a fixed visual range instead of stretching each line independently, so low-signal background lines are not amplified into a noisy image. The collected output is (lines, 2, samples), not the full spectral cube, so the file, simulated camera, and hardware camera workflows stay close to each other while still allowing live preview.

Keep side effects at the terminal

The classifier itself is an Image -> Image operation. The OpenCV display is kept in the final stream loop, because lazy operations may be evaluated more than once depending on how downstream data is consumed. This avoids duplicate preview rows while still streaming only the compact operation output.

model = NumericLabelClassifier(load_classifier_model())
color_map = make_color_map(len(model.classes_))

dark_ref, white_ref = make_references()
dark_ref = dark_ref.to_interleave(hs.bil).resolve()
white_ref = white_ref.to_interleave(hs.bil).resolve()

# Simulated camera. Replace with hs.control.Camera("10.100.10.100") for
# hardware, and keep the rest of the Image pipeline unchanged.
test_img = open_test_cube()
cam = hs.control.Camera(test_img)

camera_img = cam.to_hs_image()
camera_reflectance = reflectance_calibration(
    camera_img,
    white_ref,
    dark_ref,
    clip=True,
)


# The saved classifier from the classification tutorial is trained on calibrated
# reflectance spectra with all camera bands. Keep the same preprocessing here.
preprocessed = camera_reflectance
if VISUAL_BAND >= preprocessed.shape.bands:
    raise SystemExit(
        f"HSI_EXAMPLE_VISUAL_BAND={VISUAL_BAND} is outside the "
        f"preprocessed band range 0..{preprocessed.shape.bands - 1}."
    )


def make_line_classifier(classifier, visual_band):
    # Downstream may request only one output band, but classification needs all
    # input bands. Keep the full input band slice and forward the sample slice.
    @operation(hs.bil, slice_transform=lambda out_slice: (slice(None), out_slice[1]))
    def classify_line(meta, line, out_slice):
        # BIL plane order is (bands, samples). The model expects
        # (n_pixels, n_bands), so transpose the line before prediction.
        np.nan_to_num(line, copy=False)
        class_ids = classifier.predict(line.T.astype(np.float32))
        visual = line[visual_band, :].astype(np.float32)
        output = np.stack([class_ids, visual], axis=0)
        return output[out_slice[0], :]

    return classify_line


classified = make_line_classifier(model, VISUAL_BAND)(preprocessed)
classified = classified[:N_LINES, :, :]

preview = RollingPreview(
    n_lines=N_LINES,
    n_samples=classified.shape.samples,
    color_map=color_map,
    visual_range=(PREVIEW_MIN, PREVIEW_MAX),
)


output_rows = []
try:
    with classified.stream() as stream:
        for meta, line in stream:
            if meta.dropped:
                print(f"Dropped frame {meta.seq} at {meta.timestamp}")
                continue

            line = np.asarray(line)
            output_rows.append(line.copy())
            preview.append(line[0].astype(int), line[1].astype(np.float32))
            cv2.imshow("Operation-based streamed classification", preview.image())
            if cv2.waitKey(1) == 27:
                break
finally:
    if HOLD_PREVIEW and preview.row:
        cv2.imshow("Operation-based streamed classification", preview.image())
        cv2.waitKey(0)
    cv2.destroyAllWindows()

if not output_rows:
    raise SystemExit("No lines were produced by the streamed classifier.")

# The stream terminal drives the simulated scan once. It materializes the
# compact two-band result, not the full spectral cube:
# output[:, 0, :] = class ids
# output[:, 1, :] = visual band
output = np.stack(output_rows, axis=0)

label_map = output[:, 0, :].astype(np.uint8)
visual_band = output[:, 1, :]
print(f"Classified {label_map.shape[0]} lines with {label_map.shape[1]} samples each.")
print(f"Visual band range: {visual_band.min():.3f} to {visual_band.max():.3f}")
print(f"Preview visual range: {preview.visual_min:.3f} to {preview.visual_max:.3f}")

Download this script
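
The role of slice_transform can be traced by hand in plain NumPy. The sketch below assumes, based on the example and its comments, that the pipeline passes an (output band slice, sample slice) pair, uses slice_transform to decide which part of the input line to read, and then lets the operation pick the requested output bands. The toy threshold classifier and the function names are hypothetical; the real decorator may differ in detail.

```python
import numpy as np


def slice_transform(out_slice):
    # Always read every input band; forward only the requested samples.
    return (slice(None), out_slice[1])


def classify_line(line, out_slice, visual_band=1):
    """Toy two-band output: row 0 = class id, row 1 = visual band."""
    # Toy "classifier": class 1 where the mean over bands exceeds 0.5.
    class_ids = (line.mean(axis=0) > 0.5).astype(np.float32)
    visual = line[visual_band, :].astype(np.float32)
    output = np.stack([class_ids, visual], axis=0)
    return output[out_slice[0], :]


# One BIL line: 3 bands x 6 samples.
full_line = np.linspace(0.0, 1.0, 18, dtype=np.float32).reshape(3, 6)

# Downstream asks for output band 0 only, samples 2..4.
out_slice = (slice(0, 1), slice(2, 5))
in_slice = slice_transform(out_slice)
line_in = full_line[in_slice]  # all 3 bands kept, 3 samples selected
result = classify_line(line_in, out_slice)

print(line_in.shape, result.shape)
```

Without the transform, a request for one output band would be forwarded as a request for one input band, and the classifier would see a single value per pixel instead of the full spectrum.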

Lab Scanner Stage Controller

Use hs.control.StageController when a lab scanner is available, or hs.control.SimulatedStageController with a saved SDK image while developing without hardware. The simulated controller accepts motion and light commands as no-ops and exposes the same high-level to_hs_image() workflow as scanner hardware.

Lab Scanner Capture Lifecycle

With real hardware, consuming the lazy scanner image switches on lights, moves the conveyor, captures lines, then returns the scanner to its initial state when the stream is complete. The same one-terminal-operation rule from Lazy Capture Triggers applies here.

Wavelength Metadata

This example also tags metadata-less sample files with a wavelength axis before creating the simulated controller, so downstream camera code can use scanner.hs_camera.get_wavelengths() just like it would with hardware.

# To use real lab scanner hardware:
# export HSI_EXAMPLE_STAGE_CONTROLLER_IP=10.100.10.100
# When this variable is not set, the script uses a simulated stage controller
# backed by HSI_EXAMPLE_TEST_CUBE.
scanner = make_stage_controller()

scanner.set_fps(FPS)
scanner.distance = DISTANCE_MM
scanner.oversampling = OVERSAMPLING

wavelengths = np.array(scanner.hs_camera.get_wavelengths())
band_650 = int(np.argmin(np.abs(wavelengths - 650.0)))
print(f"Closest band to 650 nm: {band_650}, wavelength={wavelengths[band_650]:.1f} nm")

image = scanner.to_hs_image()
datacube = image[:N_LINES, :, :]
band_image = datacube[:, :, band_650]

# Consuming the lazy image starts the scanner workflow. With real hardware,
# hs.write(), resolve(), and to_numpy() all trigger capture. Use one terminal
# operation for the live scan; this example writes the scan to disk.
hs.write(datacube, str(OUTPUT_PATH))

# This would trigger a second capture
# array = band_image.to_numpy()

print(f"Saved lab scanner capture to {OUTPUT_PATH} with shape: {datacube.shape}")

Download this script

qamlib Capture With SDK Processing

SDK (via localhost) vs qamlib

When running directly on the camera device, prefer the normal SDK camera interface with localhost / 127.0.0.1 if it works for your application. The following qamlib example is a temporary workaround for cases where direct local V4L2 capture is required today. Direct qamlib/V4L2 capture support inside the SDK is planned for a future release, so this bridge should become unnecessary for most workflows over time.

This example shows how to capture frames with qamlib and hand each one to the HV SDK for calibration and classification. Unlike the hs.control.Camera examples, qamlib controls the physical camera directly and returns eager NumPy frames, not an SDK camera pipeline source. The example therefore wraps each line as a small Image before applying SDK calibration or model inference. Dark and white references are resolved once before the loop so per-frame calibration is cheap.

Device requirement

qamlib runs on the camera device. This script is intended to be executed on the camera itself or on a machine with qamlib installed and a camera connected.

Use HSI_EXAMPLE_QAMLIB_CROP to set the direct V4L2 crop as left,top,width,height, for example 0,20,1296,900. Optional HSI_EXAMPLE_DARK_REF, HSI_EXAMPLE_WHITE_REF, and HSI_EXAMPLE_CLASSIFIER_MODEL values enable SDK calibration and classification inside the processing function.

def make_line_processor():
    dark_ref = None
    white_ref = None
    if DARK_REF and WHITE_REF:
        dark_ref = make_reference(hs.open(str(data_path(DARK_REF)))).to_interleave(hs.bil).resolve()
        white_ref = make_reference(hs.open(str(data_path(WHITE_REF)))).to_interleave(hs.bil).resolve()

    hs_classifier = None
    if CLASSIFIER_MODEL:
        from joblib import load

        hs_classifier = predictor(load(data_path(CLASSIFIER_MODEL)))

    def process_frame(frame):
        line = normalize_qamlib_frame(frame)
        # qamlib returns a NumPy frame, not an SDK camera pipeline source.
        # Wrapping each line is the temporary bridge until direct local
        # capture can be used as an Image source inside the SDK.
        img = hs.Image.from_numpy(line, hs.bil)

        if dark_ref is not None and white_ref is not None:
            img = reflectance_calibration(img, white_ref, dark_ref)

        if hs_classifier is not None:
            img = hs_classifier(img)

        return img.to_numpy_with_interleave(hs.bip)[0]

    return process_frame


# This example is intended for on-camera execution. qamlib captures directly
# from the local V4L2 device, while the HV SDK performs HSI processing.
cam = open_qamlib_camera()
process_frame = make_line_processor()

with cam:
    for _ in range(N_LINES):
        meta, frame = cam.get_frame(buffered=True)

        if getattr(meta, "dropped", False):
            print(f"Dropped frame {meta.sequence}")
            continue

        processed_line = process_frame(frame)
        print(f"Frame {meta.sequence}: processed shape={processed_line.shape}")

Download this script
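
The HSI_EXAMPLE_QAMLIB_CROP value described above has the form left,top,width,height. The downloadable script's own parsing is not shown, so the following is only a sketch of how such a value could be validated before it is handed to the camera; parse_crop is a hypothetical helper, and the range checks are assumptions.

```python
def parse_crop(value):
    """Parse 'left,top,width,height' into four integers."""
    parts = [part.strip() for part in value.split(",")]
    if len(parts) != 4:
        raise ValueError(f"Expected left,top,width,height but got {value!r}")
    left, top, width, height = (int(part) for part in parts)
    # Assumed sanity checks: offsets are non-negative, sizes are positive.
    if left < 0 or top < 0 or width <= 0 or height <= 0:
        raise ValueError(f"Crop values out of range: {value!r}")
    return left, top, width, height


crop = parse_crop("0,20,1296,900")
print(crop)
```

Failing early on a malformed value gives a clearer error than letting the capture device reject the crop later.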