Streaming
Streaming workflows apply the same calibration, preprocessing, and model patterns line by line as data arrives from a camera or simulated camera.
Two streaming patterns appear across the examples below:
- cam.stream(): direct frame loop on a bare camera. Each frame is an eager NumPy array. Use this when you need raw frame-by-frame control with no processing pipeline.
- img.stream(): frame loop on a processed Image pipeline. Build calibration, dtype conversion, band selection, or model prediction on top of cam.to_hs_image() first, then stream the result. The processing is applied lazily to each frame as it arrives.
The table below summarizes which workflow to use in each situation:
| Workflow | API | Data you receive | Use when |
|---|---|---|---|
| Finite cube capture | cam.to_hs_image()[:n_lines, :, :] | Lazy Image consumed by to_numpy(), resolve(), or hs.write() | You want a bounded datacube. |
| Direct frame loop | cam.stream() | Eager 2D NumPy frame plus metadata | You need raw frame-by-frame control or dropped-frame checks. |
| Processed line stream | cam.to_hs_image() plus SDK operations, then .stream() | Processed line output from the lazy pipeline | You want calibration, preprocessing, or model prediction line by line. |
| Custom operation pipeline | @operation on cam.to_hs_image() | Lazy Image with custom per-line output | You need custom model logic, compact multi-output lines, or preview/logging taps. |
| Simulated camera | hs.control.Camera(file_img) | Same APIs as camera examples | You want to test without camera hardware. |
| Lab scanner / simulated stage | hs.control.StageController(camera_ip) or hs.control.SimulatedStageController(file_img) | Lazy Image from to_hs_image() plus stage/lights control methods | You want to coordinate camera capture with scanner motion, or test that workflow without lab scanner hardware. |
See Choosing a camera workflow in the usage guide for more detail.
For live camera and lab scanner sources, terminal operations such as
hs.write(image, output_path), image.resolve(), and image.to_numpy()
trigger acquisition.
Use one terminal operation for each live capture.
If you write one lazy branch and resolve another lazy branch from the same
camera image, you may trigger two consecutive acquisitions.
Resolve once and reuse the resolved image when the capture fits in memory, or write once and reopen the saved file for lower-memory offline inspection.
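The one-terminal-operation rule can be illustrated without any hardware. The sketch below uses a hypothetical FakeLiveImage class (not part of the SDK) whose resolve() counts acquisitions; it only mimics the behaviour described above and makes no claim about how the SDK implements capture.

```python
class FakeLiveImage:
    """Hypothetical stand-in for a lazy image backed by a live source."""

    acquisitions = 0  # shared counter across all branches of one camera

    def __init__(self, n_lines, scale=1.0):
        self.n_lines = n_lines
        self.scale = scale

    def scaled(self, factor):
        # Building a new lazy branch does not capture anything.
        return FakeLiveImage(self.n_lines, self.scale * factor)

    def resolve(self):
        # Terminal operation: every call starts a fresh acquisition.
        FakeLiveImage.acquisitions += 1
        return [self.scale * line for line in range(self.n_lines)]


source = FakeLiveImage(n_lines=4)
half_branch = source.scaled(0.5)

# Anti-pattern: two terminal operations, so two consecutive acquisitions.
source.resolve()
half_branch.resolve()
wasteful = FakeLiveImage.acquisitions

# Preferred: resolve once, then derive everything from the in-memory result.
FakeLiveImage.acquisitions = 0
resolved = source.resolve()
half = [0.5 * value for value in resolved]
frugal = FakeLiveImage.acquisitions

print(f"two branches resolved separately: {wasteful} acquisitions")
print(f"resolved once and reused: {frugal} acquisition")
```

With a real camera the two acquisitions would also see different scene content, which is why reusing one resolved capture (or one written file) is the safer pattern.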
For lab scanner stage-controller workflows, use hs.control.StageController
with hardware or hs.control.SimulatedStageController with a saved SDK image
while developing without hardware.
In examples that build a lazy pipeline from cam.to_hs_image(), the camera
object can usually be replaced by a stage controller object and the rest of the
pipeline kept the same.
Direct cam.stream() examples are different: use stage.to_hs_image().stream()
for a processed image stream, or stage.hs_camera.stream() when you
specifically need the underlying camera stream. The simulated controller
accepts motion and light commands as no-ops.
See the Lab Scanner interface section in the usage guide for the setup pattern.
- Direct camera streams: cam.stream() is open-ended by default. A live camera runs until stopped, and a simulated camera backed by a file currently loops back to the first line after the last line. BSQ source cubes are converted to a line-first stream layout. This conversion is only needed for BSQ, because push-broom camera streaming requires lines as the first dimension; BIL and BIP are already line-first for this purpose. Use cam.stream(n_frames=...) when you want a bounded direct frame stream.
- Processed Image streams: cam.to_hs_image().stream() and processed.stream() stream the lazy SDK image pipeline. With a simulated camera backed by a finite file, the processed stream stops when the file is exhausted.
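The layout point can be made concrete with plain NumPy. This is a minimal sketch assuming the conventional axis orders BSQ = (bands, lines, samples) and BIL = (lines, bands, samples); looping_frame is a hypothetical helper that imitates the wrap-around of a file-backed direct stream, and none of this reflects how the SDK performs the conversion internally.

```python
import numpy as np

n_bands, n_lines, n_samples = 4, 3, 5

# BSQ (band sequential) stores one full image per band: (bands, lines, samples).
bsq = np.arange(n_bands * n_lines * n_samples).reshape(n_bands, n_lines, n_samples)

# Line-first BIL layout: (lines, bands, samples). Each bil[i] is one frame.
bil = np.transpose(bsq, (1, 0, 2))


def looping_frame(cube, index):
    """Hypothetical helper: wrap around to the first line after the last one."""
    return cube[index % cube.shape[0]]


first = looping_frame(bil, 0)
wrapped = looping_frame(bil, n_lines)  # one past the last line wraps to line 0

print(bil.shape, first.shape)
```

BIL and BIP cubes already index lines on the first axis, so no transpose is needed before taking frames from them.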
Both stream types are Python iterators, so examples can use
for meta, frame in stream: for normal iteration. If you request direct
camera frames manually with stream.get_frame(), a bounded direct camera
stream returns None when it is exhausted.
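The two consumption styles are equivalent for a bounded stream. The sketch below uses a hypothetical FakeFrameStream class that reproduces only the behaviour described above (iteration, get_frame() returning None when exhausted, context-manager use); metadata is a plain dict here rather than the SDK's metadata object.

```python
class FakeFrameStream:
    """Hypothetical bounded stream; mimics only the behaviour described above."""

    def __init__(self, n_frames):
        self._remaining = n_frames
        self._seq = 0

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        return False  # do not swallow exceptions

    def get_frame(self):
        # Manual request: None signals that the bounded stream is exhausted.
        if self._remaining == 0:
            return None
        self._remaining -= 1
        self._seq += 1
        meta = {"seq": self._seq}
        frame = [self._seq] * 3  # placeholder for one line of data
        return meta, frame

    def __iter__(self):
        return self

    def __next__(self):
        item = self.get_frame()
        if item is None:
            raise StopIteration
        return item


# Style 1: normal iteration.
with FakeFrameStream(n_frames=3) as stream:
    iterated = [meta["seq"] for meta, frame in stream]

# Style 2: manual requests, stopping on None.
manual = []
with FakeFrameStream(n_frames=3) as stream:
    while (item := stream.get_frame()) is not None:
        meta, frame = item
        manual.append(meta["seq"])

print(iterated, manual)
```

Manual requests are mainly useful when the loop needs to interleave other work between frames; otherwise the for loop is shorter and harder to get wrong.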
The downloadable .py files can be run without editing paths by setting
environment variables such as HSI_EXAMPLE_BASE_DIR. See
Running Downloaded Scripts
for the full list of supported overrides.
Camera Pipelines
A camera can also be used as an Image pipeline source with cam.to_hs_image().
This is useful when you want live camera data to pass through the same lazy SDK
operations as file-backed datacubes: calibration, dtype conversion, band
selection, preprocessing, or model prediction can be composed before any frames
are captured.
For real cameras, set exposure, framerate, spatial crop, and spectral bands
before creating the Image pipeline or starting a direct stream. The
downloadable script supports optional environment variables such as
HSI_EXAMPLE_EXPOSURE_US, HSI_EXAMPLE_FRAMERATE,
HSI_EXAMPLE_HORIZONTAL_CROP, and HSI_EXAMPLE_BANDS for this.
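How the downloadable script reads these variables is not shown here. One plausible approach, sketched under that assumption, treats unset and blank values alike so that checks such as `if EXPOSURE_US:` skip the setting. The optional_env helper and the example_env dictionary are hypothetical.

```python
import os


def optional_env(name, environ=os.environ):
    """Return the stripped value of an environment variable, or None if unset or blank."""
    value = environ.get(name, "").strip()
    return value or None


# Hypothetical environment for illustration; a real run would use os.environ.
example_env = {"HSI_EXAMPLE_EXPOSURE_US": "2000", "HSI_EXAMPLE_FRAMERATE": " "}

EXPOSURE_US = optional_env("HSI_EXAMPLE_EXPOSURE_US", example_env)
FRAMERATE = optional_env("HSI_EXAMPLE_FRAMERATE", example_env)

print(f"{EXPOSURE_US=}, {FRAMERATE=}")
```

Keeping the raw strings and converting them at the point of use (int(...), float(...)) means a malformed value fails next to the camera call it affects.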
def configure_camera(cam):
    if EXPOSURE_US:
        cam.set_exposure(int(EXPOSURE_US))
    if FRAMERATE:
        cam.set_framerate(float(FRAMERATE))
    if HORIZONTAL_CROP:
        start, end = parse_range(HORIZONTAL_CROP)
        cam.set_horizontal_crop(start, end)
    if BANDS:
        cam.set_bands([parse_range(part) for part in BANDS.split(";")])
# To use a real camera on the default ETH-B address:
# export HSI_EXAMPLE_CAMERA_IP=10.100.10.100
# When this variable is not set, the script uses a simulated camera backed by
# HSI_EXAMPLE_TEST_CUBE. That makes it possible to test the same pipeline
# without camera hardware.
def make_camera():
    if CAMERA_IP:
        cam = hs.control.Camera(CAMERA_IP)
        configure_camera(cam)
        return cam
    test_img = open_test_cube()
    return hs.control.Camera(test_img)
cam = make_camera()
wavelengths = np.array(cam.get_wavelengths())
band_650 = int(np.argmin(np.abs(wavelengths - 650.0)))
print(f"Closest band to 650 nm: {band_650}, wavelength={wavelengths[band_650]:.1f} nm")
# For real cameras, configure exposure, framerate, crop, and bands before
# creating the Image pipeline or starting a direct stream.
img = cam.to_hs_image()
processed = img.ensure_dtype(hs.float32) / 255.0
# The camera pipeline is executed when data is consumed.
# Slice the line dimension first; a live camera stream is otherwise open-ended.
first_lines = processed[:N_LINES, :, :].resolve()
first_band = first_lines[:, :, band_650]
print(f"{first_lines.shape=}")
print(f"{first_band.shape=}")
For Hypervision push-broom cameras, the streaming dimension is lines, so slice
the first dimension before resolving or exporting a finite capture.
Use cam.stream() when you want explicit frame-by-frame control; use
cam.to_hs_image() when you want to build a reusable lazy processing pipeline
and trigger capture only when the result is consumed. See the official
Image capture
guide for the underlying camera pipeline behavior.
Stream Individual Frames
The camera stream yields metadata and a frame. With a simulated camera, the
source is an existing datacube. With hardware, replace the simulated camera
creation with the camera address or configuration used in your setup. The
downloadable example requests frames manually and stops when get_frame()
returns None.
# Real camera
# cam = hs.control.Camera("10.100.10.100")
# Simulated camera
test_img = open_test_cube()
cam = hs.control.Camera(test_img)
with cam.stream(n_frames=10) as stream:
    while True:
        item = stream.get_frame()
        if item is None:
            break
        meta, frame = item
        if meta.dropped:
            print(f"Dropped frame {meta.seq} at {meta.timestamp}")
            continue
        print(f"Frame {meta.seq}: shape={frame.shape}")
Each frame is one streamed line. For an HV-style push-broom stream, BIL is a good
layout because each frame naturally contains bands and samples for one line.
Calibrate Streamed Frames
To classify or analyze streamed data, calibrate each frame using the same dark and white references used offline.
dark_ref, white_ref = make_references()
# Simulated camera
test_img = open_test_cube()
cam = hs.control.Camera(test_img)
# Create a lazy Image pipeline from the camera stream.
img = cam.to_hs_image()
# Limit the stream to a finite number of frames for this example.
n_frames = 10
img = img[:n_frames, :, :]
# Prepare calibration references for the camera pipeline.
# The reference interleave must match the camera output.
dark_ref = dark_ref.to_interleave(hs.bil).resolve()
white_ref = white_ref.to_interleave(hs.bil).resolve()
img = reflectance_calibration(img, white_ref, dark_ref, clip=True)
with img.stream() as stream:
    for meta, calibrated in stream:
        print(f"Frame {meta.seq}: calibrated frame shape={calibrated.shape}")
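For intuition, the per-frame arithmetic behind reflectance calibration can be written in plain NumPy. This is a minimal sketch of the standard flat-field formula, (raw - dark) / (white - dark), applied to one BIL frame; the reflectance function and the division guard are illustrative assumptions, and the SDK's reflectance_calibration may differ in details.

```python
import numpy as np


def reflectance(raw, white, dark, clip=True):
    """Flat-field correction: (raw - dark) / (white - dark), per band and sample."""
    raw = raw.astype(np.float32)
    white = white.astype(np.float32)
    dark = dark.astype(np.float32)
    denom = white - dark
    # Guard against dead pixels where white == dark (an assumption of this sketch).
    denom = np.where(denom == 0, np.float32(1.0), denom)
    out = (raw - dark) / denom
    return np.clip(out, 0.0, 1.0) if clip else out


# One BIL frame is (bands, samples); references use the same layout.
bands, samples = 3, 4
dark = np.full((bands, samples), 10, dtype=np.uint16)
white = np.full((bands, samples), 210, dtype=np.uint16)
raw = np.full((bands, samples), 110, dtype=np.uint16)
raw[0, 0] = 5    # below the dark level, clips to 0
raw[0, 1] = 400  # above the white level, clips to 1

calibrated = reflectance(raw, white, dark)
print(calibrated)
```

Converting to float before subtracting matters: with unsigned integer frames, raw - dark would wrap around for pixels below the dark level.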
Stream With a Saved Classifier
This combines the previous pieces: load a model, calibrate each streamed frame, and classify the line. If the classifier was trained with string labels, wrap it so the SDK pipeline produces numeric image data.
numeric_clf = NumericLabelClassifier(load_classifier_model())
dark_ref, white_ref = make_references()
# Simulated camera
test_img = open_test_cube()
cam = hs.control.Camera(test_img)
# Create a lazy Image pipeline from the camera stream.
img = cam.to_hs_image()
# Limit the stream to a finite number of frames for this example.
n_frames = 10
img = img[:n_frames, :, :]
# Prepare calibration references for the camera pipeline.
# The reference interleave must match the camera output.
dark_ref = dark_ref.to_interleave(hs.bil).resolve()
white_ref = white_ref.to_interleave(hs.bil).resolve()
img = reflectance_calibration(img, white_ref, dark_ref, clip=True)
# Set up a predictor for the camera pipeline.
pred = predictor(numeric_clf)
img = pred(img)
with img.stream() as stream:
    for meta, predicted in stream:
        predicted = predicted.astype(np.uint8).ravel()
        classes, counts = np.unique(predicted, return_counts=True)
        print(f"Frame {meta.seq}: {dict(zip(classes, counts.astype(int)))}")
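The NumericLabelClassifier helper is not shown on this page. A wrapper of that kind could look roughly like the sketch below, which maps string predictions to integer class ids. Both LabelIndexWrapper and the toy StringLabelModel are hypothetical names for illustration, not the example's actual implementation.

```python
import numpy as np


class StringLabelModel:
    """Toy stand-in for a trained classifier that returns string labels."""

    classes_ = np.array(["background", "leaf", "stem"])

    def predict(self, X):
        # Threshold the first feature; a real model would be far richer.
        idx = np.digitize(X[:, 0], [0.3, 0.6])
        return self.classes_[idx]


class LabelIndexWrapper:
    """Hypothetical wrapper: maps string predictions to integer class ids."""

    def __init__(self, model):
        self.model = model
        # Sorted classes let searchsorted act as a label -> index lookup.
        self.classes_ = np.sort(np.asarray(model.classes_))

    def predict(self, X):
        labels = self.model.predict(X)
        return np.searchsorted(self.classes_, labels).astype(np.uint8)


wrapped = LabelIndexWrapper(StringLabelModel())
spectra = np.array([[0.1, 0.2], [0.4, 0.5], [0.9, 0.8]], dtype=np.float32)
class_ids = wrapped.predict(spectra)

print(class_ids, wrapped.classes_[class_ids])
```

Indexing classes_ with the numeric ids recovers the original labels, so nothing is lost by streaming integers through the pipeline.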
Stream With a Saved Regressor
The same camera pipeline pattern works for continuous predictions. Load the
saved regression model from the regression examples, build the same absorbance
preprocessing used during training, and stream one predicted value per spatial
pixel. This example uses the milk-fat dataset, so set HSI_EXAMPLE_BASE_DIR to
the folder containing the milk cube and matching dark/white references.
from qtec_hv_sdk.util import predictor
reg = load_regression_model()
# Real camera
# cam = hs.control.Camera("10.100.10.100")
# Simulated camera
test_img = add_camera_wavelengths(
    hs.open(str(required_data_path(MILK_TEST_CUBE, "milk datacube")))
)
cam = hs.control.Camera(test_img)
dark_ref, white_ref = make_references()
# Prepare calibration references for the camera pipeline.
# The reference interleave must match the camera output.
dark_ref = dark_ref.to_interleave(hs.bil).resolve()
white_ref = white_ref.to_interleave(hs.bil).resolve()
# Create a lazy Image pipeline from the camera stream.
camera_img = cam.to_hs_image()
# Limit the stream to a finite number of frames for this example.
n_frames = 10
camera_img = camera_img[:n_frames, :, :]
camera_reflectance = reflectance_calibration(
    camera_img,
    white_ref,
    dark_ref,
    clip=True,
)
camera_reflectance = camera_reflectance.ensure_dtype(hs.float32).clip(1e-6, 1.0)
camera_absorbance = camera_reflectance.ufunc(lambda meta, plane: -np.log10(np.clip(plane, 1e-6, 1.0)))
hs_regressor = predictor(reg)
camera_prediction = hs_regressor(camera_absorbance)
with camera_prediction.stream() as stream:
    for meta, prediction_line in stream:
        values = prediction_line.ravel()
        print(
            f"Frame {meta.seq}: "
            f"{TARGET_PROPERTY} {values.min():.2f} to {values.max():.2f}"
        )
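The absorbance step in the pipeline above is the usual A = -log10(R) transform. A standalone NumPy sketch shows why the clip is there: a reflectance of exactly zero would otherwise produce infinity. The absorbance function name and the sample values are illustrative only.

```python
import numpy as np


def absorbance(reflectance, floor=1e-6):
    """Pseudo-absorbance A = -log10(R), with R clipped into [floor, 1]."""
    r = np.clip(np.asarray(reflectance, dtype=np.float32), floor, 1.0)
    return -np.log10(r)


# One line in (bands, samples) order with a zero and an out-of-range value.
line = np.array([[1.0, 0.1, 0.0],
                 [0.5, 0.01, 2.0]], dtype=np.float32)
a = absorbance(line)

print(np.round(a, 3))
```

With a floor of 1e-6 the largest possible absorbance is 6, which keeps downstream regression inputs finite.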
Visualize Streamed Classifier Output
For demos and debugging, you can render predictions line by line into a rolling preview image. This example uses OpenCV to display classifier labels from a processed camera pipeline. For production systems, prefer a dedicated UI or viewer rather than blocking the processing loop with display code.
This script requires opencv-python and a graphical display. It is intentionally
separate from the headless streaming examples above.
numeric_clf = NumericLabelClassifier(load_classifier_model())
color_map = make_color_map(len(numeric_clf.classes_))
dark_ref, white_ref = make_references()
# Simulated camera
test_img = open_test_cube()
cam = hs.control.Camera(test_img)
camera_img = cam.to_hs_image()
dark_ref = dark_ref.to_interleave(hs.bil).resolve()
white_ref = white_ref.to_interleave(hs.bil).resolve()
camera_reflectance = reflectance_calibration(
    camera_img,
    white_ref,
    dark_ref,
    clip=True,
)
classified = predictor(numeric_clf)(camera_reflectance)
preview_lines = 300
preview = np.zeros((preview_lines, classified.shape.samples, 3), dtype=np.uint8)
row = 0
with classified.stream() as stream:
    for meta, labels_line in stream:
        if meta.dropped:
            print(f"Dropped frame {meta.seq} at {meta.timestamp}")
            continue
        labels_line = labels_line.astype(np.uint8).ravel()
        preview[row % preview_lines] = color_map[labels_line]
        display = np.roll(preview, -(row + 1), axis=0)
        cv2.imshow("Streamed classifier output", display)
        if cv2.waitKey(1) == 27:
            break
        row += 1
cv2.destroyAllWindows()
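The rolling preview is a ring buffer plus a rotation. The small NumPy sketch below reproduces the same indexing with a four-line buffer and integer stand-ins for the colored lines, so the ordering can be checked without OpenCV or a display.

```python
import numpy as np

preview_lines, n_samples = 4, 2
ring = np.zeros((preview_lines, n_samples), dtype=np.int32)

displays = []
for row in range(6):
    line = np.full(n_samples, row + 1)  # stand-in for one labelled line
    ring[row % preview_lines] = line    # overwrite the oldest slot
    # Rotate so the slot after the newest line (the oldest data) moves to the top.
    display = np.roll(ring, -(row + 1), axis=0)
    displays.append(display[:, 0].tolist())

for row, shown in enumerate(displays):
    print(row, shown)
```

The newest line always ends up in the bottom row and older lines scroll upward; before the buffer has filled, the top rows still hold the initial zeros.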
Operation-Based Streamed Classification
The examples above use predictor() because the model prediction is the only
output needed from each streamed line. For many applications that is enough:
the model turns an input spectrum into one output value, and the SDK handles the
streaming.
When the prediction is only part of what should happen to each line, a custom
@operation workflow can be used instead.
It keeps the same lazy, camera-compatible Image -> Image workflow, but lets
you open the streamed line and write the NumPy code yourself.
That might mean cleaning values before prediction, combining several model
outputs, adding counters or diagnostic signals, or changing the shape of the
data that continues through the pipeline.
The operation in the example below uses that extra control for one concrete
purpose: it classifies each reflectance line and returns a compact two-band line
with both class id and visual intensity, so the same streamed output can drive
a preview and be stored for later use. The slice_transform argument is used
because the operation may be asked for only one output band, but classification
still needs the full input spectrum for that line.
The terminal stream reads that compact output once, blends the class colors over
the visual band for an OpenCV preview, and stores the same compact rows for
later use. The preview uses a fixed visual range instead of stretching each line
independently, so low-signal background lines are not amplified into a noisy
image. The collected output is (lines, 2, samples), not the full spectral cube,
so the file, simulated camera, and hardware camera workflows stay close to each
other while still allowing live preview.
The classifier itself is an Image -> Image operation. The OpenCV display is
kept in the final stream loop, because lazy operations may be evaluated more
than once depending on how downstream data is consumed. This avoids duplicate
preview rows while still streaming only the compact operation output.
model = NumericLabelClassifier(load_classifier_model())
color_map = make_color_map(len(model.classes_))
dark_ref, white_ref = make_references()
dark_ref = dark_ref.to_interleave(hs.bil).resolve()
white_ref = white_ref.to_interleave(hs.bil).resolve()
# Simulated camera. Replace with hs.control.Camera("10.100.10.100") for
# hardware, and keep the rest of the Image pipeline unchanged.
test_img = open_test_cube()
cam = hs.control.Camera(test_img)
camera_img = cam.to_hs_image()
camera_reflectance = reflectance_calibration(
    camera_img,
    white_ref,
    dark_ref,
    clip=True,
)
# The saved classifier from the classification tutorial is trained on calibrated
# reflectance spectra with all camera bands. Keep the same preprocessing here.
preprocessed = camera_reflectance
if VISUAL_BAND >= preprocessed.shape.bands:
    raise SystemExit(
        f"HSI_EXAMPLE_VISUAL_BAND={VISUAL_BAND} is outside the "
        f"preprocessed band range 0..{preprocessed.shape.bands - 1}."
    )
def make_line_classifier(classifier, visual_band):
    # Downstream may request only one output band, but classification needs all
    # input bands. Keep the full input band slice and forward the sample slice.
    @operation(hs.bil, slice_transform=lambda out_slice: (slice(None), out_slice[1]))
    def classify_line(meta, line, out_slice):
        # BIL plane order is (bands, samples). The model expects
        # (n_pixels, n_bands), so transpose the line before prediction.
        np.nan_to_num(line, copy=False)
        class_ids = classifier.predict(line.T.astype(np.float32))
        visual = line[visual_band, :].astype(np.float32)
        output = np.stack([class_ids, visual], axis=0)
        return output[out_slice[0], :]
    return classify_line
classified = make_line_classifier(model, VISUAL_BAND)(preprocessed)
classified = classified[:N_LINES, :, :]
preview = RollingPreview(
    n_lines=N_LINES,
    n_samples=classified.shape.samples,
    color_map=color_map,
    visual_range=(PREVIEW_MIN, PREVIEW_MAX),
)
output_rows = []
try:
    with classified.stream() as stream:
        for meta, line in stream:
            if meta.dropped:
                print(f"Dropped frame {meta.seq} at {meta.timestamp}")
                continue
            line = np.asarray(line)
            output_rows.append(line.copy())
            preview.append(line[0].astype(int), line[1].astype(np.float32))
            cv2.imshow("Operation-based streamed classification", preview.image())
            if cv2.waitKey(1) == 27:
                break
finally:
    if HOLD_PREVIEW and preview.row:
        cv2.imshow("Operation-based streamed classification", preview.image())
        cv2.waitKey(0)
    cv2.destroyAllWindows()
if not output_rows:
    raise SystemExit("No lines were produced by the streamed classifier.")
# The stream terminal drives the simulated scan once. It materializes the
# compact two-band result, not the full spectral cube:
# output[:, 0, :] = class ids
# output[:, 1, :] = visual band
output = np.stack(output_rows, axis=0)
label_map = output[:, 0, :].astype(np.uint8)
visual_band = output[:, 1, :]
print(f"Classified {label_map.shape[0]} lines with {label_map.shape[1]} samples each.")
print(f"Visual band range: {visual_band.min():.3f} to {visual_band.max():.3f}")
print(f"Preview visual range: {preview.visual_min:.3f} to {preview.visual_max:.3f}")
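The role of slice_transform can be seen in isolation with plain NumPy. In the sketch below, request is a hypothetical driver that imitates how a lazy pipeline might map a requested output slice back to the input it needs. It assumes out_slice is a pair of slice objects in (bands, samples) order, which may not match the SDK's exact calling convention, and the toy argmax "classifier" stands in for a real model.

```python
import numpy as np


def slice_transform(out_slice):
    # The output has two bands, but any requested output band needs every
    # input band. Only the sample slice is forwarded unchanged.
    return (slice(None), out_slice[1])


def classify_line(line, out_slice, visual_band=1):
    # Toy classifier: the class id is the index of the brightest band per sample.
    class_ids = np.argmax(line, axis=0).astype(np.float32)
    visual = line[visual_band, :].astype(np.float32)
    output = np.stack([class_ids, visual], axis=0)
    return output[out_slice[0], :]


def request(full_line, out_slice):
    """Hypothetical driver: fetch the input slice, then run the operation."""
    in_slice = slice_transform(out_slice)
    return classify_line(full_line[in_slice], out_slice)


full_line = np.array([[0.9, 0.1, 0.2, 0.3],
                      [0.1, 0.8, 0.1, 0.2],
                      [0.0, 0.1, 0.7, 0.9]], dtype=np.float32)  # (bands, samples)

# Ask for output band 0 (class ids) of samples 1..2 only.
ids_only = request(full_line, (slice(0, 1), slice(1, 3)))
# Ask for both output bands of every sample.
both = request(full_line, (slice(None), slice(None)))

print(ids_only, both.shape)
```

Without the transform, a request for one output band would hand the operation a single input band, and the classifier would receive a truncated spectrum.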
Lab Scanner Stage Controller
Use hs.control.StageController when a lab scanner is available, or
hs.control.SimulatedStageController with a saved SDK image while developing
without hardware. The simulated controller accepts motion and light commands as
no-ops and exposes the same high-level to_hs_image() workflow as scanner
hardware.
With real hardware, consuming the lazy scanner image switches on lights, moves
the conveyor, captures lines, then returns the scanner to its initial state when
the stream is complete.
The same one-terminal-operation rule from Lazy Capture Triggers applies here.
This example also tags metadata-less sample files with a wavelength axis before
creating the simulated controller, so downstream camera code can use
scanner.hs_camera.get_wavelengths() just like it would with hardware.
# To use real lab scanner hardware:
# export HSI_EXAMPLE_STAGE_CONTROLLER_IP=10.100.10.100
# When this variable is not set, the script uses a simulated stage controller
# backed by HSI_EXAMPLE_TEST_CUBE.
scanner = make_stage_controller()
scanner.set_fps(FPS)
scanner.distance = DISTANCE_MM
scanner.oversampling = OVERSAMPLING
wavelengths = np.array(scanner.hs_camera.get_wavelengths())
band_650 = int(np.argmin(np.abs(wavelengths - 650.0)))
print(f"Closest band to 650 nm: {band_650}, wavelength={wavelengths[band_650]:.1f} nm")
image = scanner.to_hs_image()
datacube = image[:N_LINES, :, :]
band_image = datacube[:, :, band_650]
# Consuming the lazy image starts the scanner workflow. With real hardware,
# hs.write(), resolve(), and to_numpy() all trigger capture. Use one terminal
# operation for the live scan; this example writes the scan to disk.
hs.write(datacube, str(OUTPUT_PATH))
# This would trigger a second capture
# array = band_image.to_numpy()
print(f"Saved lab scanner capture to {OUTPUT_PATH} with shape: {datacube.shape}")
qamlib Capture With SDK Processing
When running directly on the camera device, prefer the normal SDK camera
interface with localhost / 127.0.0.1 if it works for your application. The
following qamlib example is a temporary workaround for cases where direct local
V4L2 capture is required today. Direct qamlib/V4L2 capture support inside the
SDK is planned for a future release, so this bridge should become unnecessary
for most workflows over time.
This example shows how to capture frames with qamlib and hand each one to the
HV SDK for calibration and classification. Unlike the hs.control.Camera examples,
qamlib controls the physical camera directly and returns eager NumPy frames, not
an SDK camera pipeline source. The example therefore wraps each line as a small
Image before applying SDK calibration or model inference. Dark and white
references are resolved once before the loop so per-frame calibration is cheap.
qamlib runs on the camera device. This script is intended to be executed
on the camera itself or on a machine with qamlib installed and a camera
connected.
Use HSI_EXAMPLE_QAMLIB_CROP to set the direct V4L2 crop as
left,top,width,height, for example 0,20,1296,900. Optional
HSI_EXAMPLE_DARK_REF, HSI_EXAMPLE_WHITE_REF, and
HSI_EXAMPLE_CLASSIFIER_MODEL values enable SDK calibration and classification
inside the processing function.
def make_line_processor():
    dark_ref = None
    white_ref = None
    if DARK_REF and WHITE_REF:
        dark_ref = make_reference(hs.open(str(data_path(DARK_REF)))).to_interleave(hs.bil).resolve()
        white_ref = make_reference(hs.open(str(data_path(WHITE_REF)))).to_interleave(hs.bil).resolve()
    hs_classifier = None
    if CLASSIFIER_MODEL:
        from joblib import load
        hs_classifier = predictor(load(data_path(CLASSIFIER_MODEL)))
    def process_frame(frame):
        line = normalize_qamlib_frame(frame)
        # qamlib returns a NumPy frame, not an SDK camera pipeline source.
        # Wrapping each line is the temporary bridge until direct local
        # capture can be used as an Image source inside the SDK.
        img = hs.Image.from_numpy(line, hs.bil)
        if dark_ref is not None and white_ref is not None:
            img = reflectance_calibration(img, white_ref, dark_ref)
        if hs_classifier is not None:
            img = hs_classifier(img)
        return img.to_numpy_with_interleave(hs.bip)[0]
    return process_frame
# This example is intended for on-camera execution. qamlib captures directly
# from the local V4L2 device, while the HV SDK performs HSI processing.
cam = open_qamlib_camera()
process_frame = make_line_processor()
with cam:
    for _ in range(N_LINES):
        meta, frame = cam.get_frame(buffered=True)
        if getattr(meta, "dropped", False):
            print(f"Dropped frame {meta.sequence}")
            continue
        processed_line = process_frame(frame)
        print(f"Frame {meta.sequence}: processed shape={processed_line.shape}")
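The HSI_EXAMPLE_QAMLIB_CROP value described above is a left,top,width,height string. One way to parse and validate it is sketched below. Crop and parse_crop are hypothetical names, and the validation rules (non-negative origin, positive size) are assumptions of this sketch rather than documented constraints of qamlib or V4L2.

```python
from typing import NamedTuple


class Crop(NamedTuple):
    left: int
    top: int
    width: int
    height: int


def parse_crop(text):
    """Parse 'left,top,width,height' into a Crop, validating the values."""
    parts = [p.strip() for p in text.split(",")]
    if len(parts) != 4:
        raise ValueError(f"expected 4 comma-separated integers, got {text!r}")
    crop = Crop(*(int(p) for p in parts))
    if crop.left < 0 or crop.top < 0 or crop.width <= 0 or crop.height <= 0:
        raise ValueError(f"crop needs a non-negative origin and positive size: {crop}")
    return crop


crop = parse_crop("0,20,1296,900")
print(crop)
```

Failing early on a malformed crop string is cheaper than discovering the problem after the capture loop has started.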