# This file was extracted from the HV SDK Docusaurus examples.
# It is intended as a downloadable, runnable companion to the documentation.
# Set HSI_EXAMPLE_BASE_DIR and related env vars to use your own data.
# Source page: /hsi/hv_sdk/examples/streaming#visualize-streamed-classifier-output

# region: setup
import os
from pathlib import Path

import joblib
import numpy as np
import qtec_hv_sdk as hs
from qtec_hv_sdk.preprocessing import make_reference
from qtec_hv_sdk.preprocessing import reflectance_calibration
from qtec_hv_sdk.util import predictor

try:
    import cv2
except ImportError as exc:
    raise SystemExit("Install opencv-python to run this live preview example.") from exc

BASE_DIR = Path(os.environ.get("HSI_EXAMPLE_BASE_DIR", "/path/to/HSI_data/nuts"))
if not BASE_DIR.exists():
    raise SystemExit(
        "Run: 'export HSI_EXAMPLE_BASE_DIR=/path/to/HSI_data/' to setup the "
        "folder containing the example datacubes."
    )

# File names are resolved relative to BASE_DIR; each can be overridden through
# its environment variable.
TEST_CUBE = os.environ.get("HSI_EXAMPLE_TEST_CUBE", "mix2.pam")
DARK_REF = os.environ.get("HSI_EXAMPLE_DARK_REF", "dark_ref.pam")
WHITE_REF = os.environ.get("HSI_EXAMPLE_WHITE_REF", "white_ref.pam")
CLASSIFIER_MODEL_PATH = Path(
    os.environ.get("HSI_EXAMPLE_CLASSIFIER_MODEL", "pixel_classifier.joblib")
)
CAMERA_WAVELENGTH_START_NM = float(
    os.environ.get("HSI_EXAMPLE_WAVELENGTH_START_NM", "430")
)
CAMERA_WAVELENGTH_END_NM = float(
    os.environ.get("HSI_EXAMPLE_WAVELENGTH_END_NM", "1700")
)


def add_camera_wavelengths(img):
    """Return ``img`` with evenly spaced wavelength metadata attached.

    The wavelengths are ``np.linspace`` values from
    ``CAMERA_WAVELENGTH_START_NM`` to ``CAMERA_WAVELENGTH_END_NM`` (inclusive),
    one per band of ``img``, tagged as nanometers.
    """
    # Example PAM cubes may not include wavelengths; attach them so the
    # simulated camera exposes the same wavelength metadata as real hardware.
    meta = img.meta
    meta.spectral = hs.SpectralMeta(
        wavelengths=hs.WavelengthMeta(
            np.linspace(
                CAMERA_WAVELENGTH_START_NM,
                CAMERA_WAVELENGTH_END_NM,
                img.meta.shape.bands,
            ),
            hs.WavelengthUnit.Nanometer,
        )
    )
    return img.with_meta(meta)


def open_test_cube():
    """Open the test cube from ``BASE_DIR`` and attach camera wavelengths."""
    return add_camera_wavelengths(hs.open(str(BASE_DIR / TEST_CUBE)))


def make_references():
    """Open the dark and white cubes and return ``(dark, white)`` references.

    Each cube is passed through ``make_reference`` before being returned.
    """
    dark = hs.open(str(BASE_DIR / DARK_REF))
    white = hs.open(str(BASE_DIR / WHITE_REF))
    return make_reference(dark), make_reference(white)


def load_classifier_model():
    """Load the saved classifier from ``CLASSIFIER_MODEL_PATH``.

    Raises:
        SystemExit: If the model file does not exist.
    """
    if not CLASSIFIER_MODEL_PATH.exists():
        raise SystemExit(
            "Set HSI_EXAMPLE_CLASSIFIER_MODEL to the saved classifier .joblib file."
        )
    # SECURITY: joblib files are pickle-based, so loading one can execute
    # arbitrary code. Only point HSI_EXAMPLE_CLASSIFIER_MODEL at files you trust.
    return joblib.load(CLASSIFIER_MODEL_PATH)


def make_color_map(n_classes: int) -> np.ndarray:
    """Build an ``(n_classes, 3)`` uint8 BGR lookup table, one color per class.

    Hues are spread evenly over ``[0, 179)`` with fixed saturation (220) and
    value (255), then converted from HSV to BGR with OpenCV. ``n_classes == 0``
    yields an empty table.
    """
    color_map = np.zeros((n_classes, 3), dtype=np.uint8)
    for class_id in range(n_classes):
        hue = int(179 * class_id / max(n_classes, 1))
        # cvtColor needs an image-shaped array, hence the 1x1 pixel wrapper.
        hsv = np.array([[[hue, 220, 255]]], dtype=np.uint8)
        color_map[class_id] = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)[0, 0]
    return color_map


class NumericLabelClassifier:
    """Adapter that makes a classifier emit uint8 class ids instead of labels.

    The wrapped classifier must expose ``classes_`` and ``predict``. Each entry
    of ``classes_`` is assigned its index as a numeric id.
    """

    # hs.util.predictor() returns an Image, so the prediction output must be
    # numeric image data. This adapter maps class names to uint8 ids.

    # Number of distinct ids representable in the uint8 output (256).
    _MAX_CLASSES = int(np.iinfo(np.uint8).max) + 1

    def __init__(self, clf):
        """Wrap ``clf`` and index its ``classes_``.

        Raises:
            ValueError: If ``clf`` has more classes than fit in uint8 ids.
        """
        self.clf = clf
        self.classes_ = np.array(clf.classes_)
        if len(self.classes_) > self._MAX_CLASSES:
            # Fail early with a clear message rather than wrapping or
            # overflowing ids later inside predict().
            raise ValueError(
                f"NumericLabelClassifier supports at most {self._MAX_CLASSES} "
                f"classes (uint8 ids); got {len(self.classes_)}."
            )
        self.class_to_id_ = {
            class_name: class_id for class_id, class_name in enumerate(self.classes_)
        }

    def predict(self, pixels) -> np.ndarray:
        """Predict with the wrapped classifier and return uint8 class ids.

        Raises:
            KeyError: If the classifier returns a label not in ``classes_``.
        """
        labels = self.clf.predict(pixels)
        return np.array([self.class_to_id_[label] for label in labels], dtype=np.uint8)
# end region

# region: example
numeric_clf = NumericLabelClassifier(load_classifier_model())
color_map = make_color_map(len(numeric_clf.classes_))
dark_ref, white_ref = make_references()

# Simulated camera
test_img = open_test_cube()
cam = hs.control.Camera(test_img)
camera_img = cam.to_hs_image()

dark_ref = dark_ref.to_interleave(hs.bil).resolve()
white_ref = white_ref.to_interleave(hs.bil).resolve()

camera_reflectance = reflectance_calibration(
    camera_img,
    white_ref,
    dark_ref,
    clip=True,
)
classified = predictor(numeric_clf)(camera_reflectance)

# Rolling preview buffer: the newest line overwrites the oldest slot, and the
# buffer is rolled for display so the newest line is always the bottom row.
preview_lines = 300
preview = np.zeros((preview_lines, classified.shape.samples, 3), dtype=np.uint8)
row = 0

try:
    with classified.stream() as stream:
        for meta, labels_line in stream:
            if meta.dropped:
                print(f"Dropped frame {meta.seq} at {meta.timestamp}")
                continue

            labels_line = labels_line.astype(np.uint8).ravel()
            preview[row % preview_lines] = color_map[labels_line]
            display = np.roll(preview, -(row + 1), axis=0)

            cv2.imshow("Streamed classifier output", display)
            # Esc (27) stops the preview. Mask to the low byte so the check
            # also holds if the returned key code carries extra high bits.
            if cv2.waitKey(1) & 0xFF == 27:
                break
            row += 1
finally:
    # Close the preview window even if streaming or display raised.
    cv2.destroyAllWindows()
# end region