# This file was extracted from the HV SDK Docusaurus examples. # It is intended as a downloadable, runnable companion to the documentation. # Set HSI_EXAMPLE_BASE_DIR and related env vars to use your own data. # Source page: /hsi/hv_sdk/examples/streaming#operation-based-streamed-classification # region: setup import os from pathlib import Path import joblib import numpy as np import qtec_hv_sdk as hs from qtec_hv_sdk.preprocessing import make_reference from qtec_hv_sdk.preprocessing import reflectance_calibration from qtec_hv_sdk.util import operation try: import cv2 except ImportError as exc: raise SystemExit("Install opencv-python to run this live preview example.") from exc BASE_DIR = Path(os.environ.get("HSI_EXAMPLE_BASE_DIR", "/path/to/HSI_data/nuts")) if not BASE_DIR.exists(): raise SystemExit( "Run: 'export HSI_EXAMPLE_BASE_DIR=/path/to/HSI_data/' to setup the " "folder containing the example datacubes." ) TEST_CUBE = os.environ.get("HSI_EXAMPLE_TEST_CUBE", "mix2.pam") DARK_REF = os.environ.get("HSI_EXAMPLE_DARK_REF", "dark_ref.pam") WHITE_REF = os.environ.get("HSI_EXAMPLE_WHITE_REF", "white_ref.pam") CLASSIFIER_MODEL_PATH = Path(os.environ.get("HSI_EXAMPLE_CLASSIFIER_MODEL", "pixel_classifier.joblib")) N_LINES = int(os.environ.get("HSI_EXAMPLE_N_LINES", "300")) VISUAL_BAND = int(os.environ.get("HSI_EXAMPLE_VISUAL_BAND", "164")) PREVIEW_MIN = float(os.environ.get("HSI_EXAMPLE_PREVIEW_MIN", "0.0")) PREVIEW_MAX = float(os.environ.get("HSI_EXAMPLE_PREVIEW_MAX", "0.7")) HOLD_PREVIEW = os.environ.get("HSI_EXAMPLE_HOLD_PREVIEW", "1") == "1" CAMERA_WAVELENGTH_START_NM = float( os.environ.get("HSI_EXAMPLE_WAVELENGTH_START_NM", "430") ) CAMERA_WAVELENGTH_END_NM = float( os.environ.get("HSI_EXAMPLE_WAVELENGTH_END_NM", "1700") ) def add_camera_wavelengths(img): # Example PAM cubes may not include wavelengths; attach them so the # simulated camera exposes the same wavelength metadata as real hardware. if img.meta.spectral.wavelengths is not None: return img meta = img.meta meta.spectral = hs.SpectralMeta( wavelengths=hs.WavelengthMeta( np.linspace( CAMERA_WAVELENGTH_START_NM, CAMERA_WAVELENGTH_END_NM, img.meta.shape.bands, ), hs.WavelengthUnit.Nanometer, ) ) return img.with_meta(meta) def open_test_cube(): return add_camera_wavelengths(hs.open(str(BASE_DIR / TEST_CUBE))) def make_references(): dark = hs.open(str(BASE_DIR / DARK_REF)) white = hs.open(str(BASE_DIR / WHITE_REF)) return make_reference(dark), make_reference(white) def load_classifier_model(): if not CLASSIFIER_MODEL_PATH.exists(): raise SystemExit( "Set HSI_EXAMPLE_CLASSIFIER_MODEL to the saved classifier .joblib file." ) return joblib.load(CLASSIFIER_MODEL_PATH) class NumericLabelClassifier: # Operation outputs are image data, so string labels must be mapped to # numeric class ids before being stacked into the streamed output line. def __init__(self, clf): self.clf = clf self.classes_ = np.array(clf.classes_) self.class_to_id_ = { class_name: class_id for class_id, class_name in enumerate(self.classes_) } def predict(self, pixels): labels = self.clf.predict(pixels) return np.array([self.class_to_id_[label] for label in labels], dtype=np.uint8) def make_color_map(n_classes): color_map = np.zeros((n_classes, 3), dtype=np.uint8) for class_id in range(n_classes): hue = int(179 * class_id / max(n_classes, 1)) hsv = np.array([[[hue, 220, 255]]], dtype=np.uint8) color_map[class_id] = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)[0, 0] return color_map class RollingPreview: def __init__(self, n_lines, n_samples, color_map, visual_range): self.rows = np.zeros((n_lines, n_samples, 3), dtype=np.uint8) self.color_map = color_map self.visual_range = visual_range self.row = 0 self.visual_min = np.inf self.visual_max = -np.inf def append(self, class_ids, visual): visual = np.asarray(visual, dtype=np.float32) finite = visual[np.isfinite(visual)] if finite.size: self.visual_min = min(self.visual_min, float(finite.min())) self.visual_max = max(self.visual_max, float(finite.max())) lo, hi = self.visual_range if hi <= lo: gray = np.full(visual.shape, 80, dtype=np.uint8) else: gray = np.clip((visual - lo) / (hi - lo) * 255, 0, 255).astype(np.uint8) bgr = np.repeat(gray[:, None], 3, axis=1) class_ids = np.asarray(class_ids, dtype=int) colors = self.color_map[np.clip(class_ids, 0, len(self.color_map) - 1)] bgr = (0.45 * bgr + 0.55 * colors).astype(np.uint8) self.rows[self.row % len(self.rows)] = bgr self.row += 1 def image(self): if self.row < len(self.rows): return self.rows return np.roll(self.rows, -(self.row % len(self.rows)), axis=0) # end region # region: example model = NumericLabelClassifier(load_classifier_model()) color_map = make_color_map(len(model.classes_)) dark_ref, white_ref = make_references() dark_ref = dark_ref.to_interleave(hs.bil).resolve() white_ref = white_ref.to_interleave(hs.bil).resolve() # Simulated camera. Replace with hs.control.Camera("10.100.10.100") for # hardware, and keep the rest of the Image pipeline unchanged. test_img = open_test_cube() cam = hs.control.Camera(test_img) camera_img = cam.to_hs_image() camera_reflectance = reflectance_calibration( camera_img, white_ref, dark_ref, clip=True, ) # The saved classifier from the classification tutorial is trained on calibrated # reflectance spectra with all camera bands. Keep the same preprocessing here. preprocessed = camera_reflectance if VISUAL_BAND >= preprocessed.shape.bands: raise SystemExit( f"HSI_EXAMPLE_VISUAL_BAND={VISUAL_BAND} is outside the " f"preprocessed band range 0..{preprocessed.shape.bands - 1}." ) def make_line_classifier(classifier, visual_band): # Downstream may request only one output band, but classification needs all # input bands. Keep the full input band slice and forward the sample slice. @operation(hs.bil, slice_transform=lambda out_slice: (slice(None), out_slice[1])) def classify_line(meta, line, out_slice): # BIL plane order is (bands, samples). The model expects # (n_pixels, n_bands), so transpose the line before prediction. np.nan_to_num(line, copy=False) class_ids = classifier.predict(line.T.astype(np.float32)) visual = line[visual_band, :].astype(np.float32) output = np.stack([class_ids, visual], axis=0) return output[out_slice[0], :] return classify_line classified = make_line_classifier(model, VISUAL_BAND)(preprocessed) classified = classified[:N_LINES, :, :] preview = RollingPreview( n_lines=N_LINES, n_samples=classified.shape.samples, color_map=color_map, visual_range=(PREVIEW_MIN, PREVIEW_MAX), ) output_rows = [] try: with classified.stream() as stream: for meta, line in stream: if meta.dropped: print(f"Dropped frame {meta.seq} at {meta.timestamp}") continue line = np.asarray(line) output_rows.append(line.copy()) preview.append(line[0].astype(int), line[1].astype(np.float32)) cv2.imshow("Operation-based streamed classification", preview.image()) if cv2.waitKey(1) == 27: break finally: if HOLD_PREVIEW and preview.row: cv2.imshow("Operation-based streamed classification", preview.image()) cv2.waitKey(0) cv2.destroyAllWindows() if not output_rows: raise SystemExit("No lines were produced by the streamed classifier.") # The stream terminal drives the simulated scan once. It materializes the # compact two-band result, not the full spectral cube: # output[:, 0, :] = class ids # output[:, 1, :] = visual band output = np.stack(output_rows, axis=0) label_map = output[:, 0, :].astype(np.uint8) visual_band = output[:, 1, :] print(f"Classified {label_map.shape[0]} lines with {label_map.shape[1]} samples each.") print(f"Visual band range: {visual_band.min():.3f} to {visual_band.max():.3f}") print(f"Preview visual range: {preview.visual_min:.3f} to {preview.visual_max:.3f}") # end region