Skip to main content

Python

Deprecated

These are legacy examples from before the HV SDK was developed.

We strongly recommend using the SDK's dedicated functions instead.

Reading PAM datacubes

See PAM for more details on the file format itself.

import numpy as np

def read_pam_file(file_path, chan_rmv=None):
    """Read a PAM datacube from *file_path* into a numpy array.

    The header is parsed up to the "ENDHDR" marker, then the binary
    payload is reshaped according to the TUPLTYPE (interleave) field:
      - "BIL": (height, width, channels)
      - anything else (assumed BSQ): (channels, height, width)

    chan_rmv: optionally trim that many entries from both ends of the
        first axis. For a BSQ cube that removes spectral channels;
        NOTE(review): for a BIL cube the first axis is scan lines —
        confirm intended use with callers.

    Returns the datacube as a numpy uint8 array, or None on error.
    NOTE(review): data is always read as uint8 even when MAXVAL is
    65535 — confirm against the files actually produced.
    """
    try:
        # Open the .pam file for reading
        with open(file_path, "rb") as file:
            # Parse key/value pairs until the "ENDHDR" marker;
            # an unexpected blank line also terminates the header.
            def parse_pam_header(file):
                header = {}
                while True:
                    line = file.readline().decode("utf-8").strip()
                    if not line:
                        break
                    if line == "ENDHDR":
                        break
                    parts = line.split(" ", 1)
                    if len(parts) == 2:
                        key, value = parts
                        header[key] = value
                    elif len(parts) == 1:
                        # Lines with no key (just a value), e.g. the
                        # "P7" magic number
                        header[line] = None
                return header

            # Parse the PAM header until "ENDHDR"
            header = parse_pam_header(file)

            # Get image properties from the header
            width = int(header["WIDTH"])     # actual width of image
            channels = int(header["DEPTH"])  # number of spectral channels
            maxval = int(header["MAXVAL"])   # declared sample maximum (unused below)
            height = int(header["HEIGHT"])   # number of scan lines
            interleave = header.get("TUPLTYPE")

            # Read the binary data; the file position is now just past ENDHDR
            data = np.fromfile(file, dtype=np.uint8, count=width * height * channels)

            if interleave == "BIL":
                image_data = data.reshape(height, width, channels, order='C')  # C-style
            else:  # BSQ
                image_data = data.reshape(channels, height, width, order='C')  # C-style
            print("Shape of cube: ", image_data.shape)

            # BUG FIX: the original tested "chan_rmv != None", so
            # chan_rmv=0 produced an empty [0:-0] slice. Truthiness
            # makes 0 (and None) a no-op, as intended.
            if chan_rmv:
                image_data = image_data[chan_rmv:-chan_rmv]
            return image_data

    except FileNotFoundError:
        print(f"File not found: {file_path}")
        return None
    except Exception as e:
        # best-effort reader for a legacy example: report and return None
        print(f"An error occurred: {e}")
        return None
PAM interleave

The above example expects PAM files captured using the Buteo Hyperspectral Imaging System which have a BIL or BSQ interleave type.

Note that the PAM format native interleave is BIP.

Writing PNM files and PAM datacubes

See PAM for more details on the file format itself.

from datetime import datetime
import sys

# PNM/PAM
def needs_byteswap(frame):
    """Return True when *frame* must be byteswapped before being written
    to a big-endian file format (PNM/PAM).

    Single-byte samples have no byte order; explicitly big-endian data is
    already in file order; native order only matches on big-endian hosts.
    """
    order = frame.dtype.byteorder
    already_big = order == ">" or (order == "=" and sys.byteorder == "big")
    return frame.itemsize > 1 and not already_big

def write_pnm(name, frame):
    """Write *frame* (height x width x channels) as a binary PNM image.

    One channel produces a P5 (greyscale) file, anything else P6
    (colour). Multi-byte samples are written big-endian, as the PNM
    formats require.
    """
    with open(name, "wb") as out:
        height, width, channels = frame.shape

        # P5 = greyscale PGM, P6 = colour PPM
        magic = "P6" if channels != 1 else "P5"

        # One byte per sample means 8-bit data (MAXVAL 255); anything
        # else is declared as 16-bit.
        is_8bit = frame.nbytes == width * height * channels and frame.dtype == "uint8"
        max_val = 255 if is_8bit else 65535

        out.write(bytearray(f"{magic} {width} {height} {max_val}\n", "ascii"))

        if needs_byteswap(frame):
            # PNM sample data is Big Endian by definition; swap a copy
            # rather than in place so the caller's frame keeps its
            # native byte order (an in-place swap would require
            # frame.byteswap(True).newbyteorder() to stay consistent).
            frame.byteswap().tofile(out)
        else:
            frame.tofile(out)

# write PAM header for BIL interleave (Band-interleaved-by-line)
def write_pam_header(name, frame, gain, exposure, n_imgs):
    """Create/truncate *name* and write a PAM header for a BIL datacube.

    The cube HEIGHT is the number of frames that will be appended
    (*n_imgs*); gain, exposure and the current date/time are recorded as
    header comment lines.

    NOTE(review): the shape is unpacked as (width, channels, height),
    unlike write_pnm's (height, width, channels) — confirm the expected
    frame layout with callers.
    """
    with open(name, "wb") as out:
        width, channels, height = frame.shape

        # One byte per sample means 8-bit data (MAXVAL 255);
        # anything else is declared as 16-bit.
        is_8bit = frame.nbytes == width * height * channels and frame.dtype == "uint8"
        max_val = 255 if is_8bit else 65535

        # each appended frame contributes one scan line to the cube
        height = n_imgs

        # Get today's date and time for the header comments
        now = datetime.now()
        date_str = now.strftime("%Y-%m-%d")
        time_str = now.strftime("%H:%M:%S")

        fields = (
            "P7",
            f"#Date: {date_str}",
            f"#Time: {time_str}",
            f"#Gain: {gain}",
            f"#Exposure: {exposure}",
            f"WIDTH {width}",
            f"HEIGHT {height}",
            f"DEPTH {channels}",
            f"MAXVAL {max_val}",
            "TUPLTYPE BIL",
            "ENDHDR",
        )
        out.write(bytearray("\n".join(fields) + "\n", "ascii"))

def append_pam_image(name, frame):
    """Append one frame's raw sample data to the PAM file *name*.

    PAM sample data is Big Endian by definition, so multi-byte frames in
    little-endian order are written through a byteswapped copy. Swapping
    a copy keeps the in-memory frame readable; an in-place swap would
    require frame.byteswap(True).newbyteorder() to stay consistent.
    """
    with open(name, "ab") as out:
        payload = frame.byteswap() if needs_byteswap(frame) else frame
        payload.tofile(out)
PAM interleave

The above example generates PAM files which have a BIL interleave type.

Note that the PAM format native interleave is BIP.

Reflectance transformation

def refl_trans(fl, white_avg, dark_avg, clip=False, nan_replacement=None):
    """Performs reflectance transformation on a hyperspectral image cube.

    fl: path of the PAM datacube, loaded with read_pam_file
    white_avg / dark_avg: white and dark reference averages
    clip: clip reflectance values into [0.0, 1.0]
    nan_replacement: value substituted for NaNs produced by 0/0 where
        white_avg == dark_avg (also normalizes any infinities)

    Returns the reflectance cube reoriented to (H, W, C).
    """
    # Return reflectance cube
    cube = read_pam_file(fl)
    cube = (cube - dark_avg) / (white_avg - dark_avg)

    # The transpose gives the correct orientation of the real image with
    # format of (H, W, C), assuming the Buteo conveyor belt moves toward
    # the right
    cube = np.transpose(cube, axes=(2, 1, 0))[::-1, ::-1, :]

    # handle values out of range
    if clip:
        cube = cube.clip(0.0, 1.0)

    # handle NaN because of possible division by zero on (white - dark)
    # BUG FIX: ndarray has no nan_to_num() method (the original
    # cube.nan_to_num(...) raised AttributeError) — use the numpy
    # module-level function instead.
    if nan_replacement is not None:
        cube = np.nan_to_num(cube, nan=nan_replacement)

    return cube

Capturing datacubes

Deprecated

This is a legacy example from before the HV SDK had a camera interface available.

We strongly recommend using the SDK's dedicated hypervision camera API instead.

info
  • Requires qamlib and should therefore be run inside the camera (over ssh or with a screen and keyboard).
  • This example assumes a belt running at a constant speed
  • This example generates datacubes with BIL interleave type
  • The datacube files are saved to /tmp (RAM) and will therefore be gone after a reboot
    • copy them over to a different machine using the scp command or WinSCP
      • from the host PC: scp root@<cam_ip>:/tmp/<file_name> <destination>
        • for example: scp root@10.100.10.100:/tmp/_HSI_* .
    • Note that saving directly to the camera CFAST (for permanent storage) might be too slow (slow IO operations)
      • consider connecting a SSD to the camera via the USB ports if fast permanent storage is necessary
    • Also note that the camera has 4GB of RAM, therefore there will be a limit to how many images can be saved
      • For example the full datacube below (with 1000 images) takes 1.1GB of space
import qamlib
import time
import math
import numpy as np
from datetime import datetime
import sys


def current_datetime_filename():
    """Return the current local date/time formatted for use in file names."""
    stamp_format = "%Y-%m-%d_%H-%M-%S"
    return datetime.now().strftime(stamp_format)

SAVE_PAM = True   # accumulate all frames into one PAM datacube file
SAVE_PNM = False  # additionally dump every frame as an individual PNM image
pam_filename = f"/tmp/_HSI_{current_datetime_filename()}.pam"

# Adjust these as necessary
N_IMGS = 1000            # number of frames (scan lines) in the datacube
EXP_TIME = 900 #us       (exposure time)
GAIN = 1.0 #x            (linear sensor gain factor)
LEFT = 0                 # horizontal crop offset in pixels
WIDTH = 1296             # crop width in pixels
FIRST_BAND = 430.0 #nm   (first spectral band to capture)
LAST_BAND = 1700.0 #nm   (last spectral band to capture)
MIN_FPS = 10             # lower bound on the configured framerate
MM_PER_PX = 0.35 #mm/px  (spatial resolution on the belt)
BELT_SPEED = 100 #mm/s   (assumed constant belt speed)
#PX_SIZE*HEIGHT/FOCAL_LENGTH = BELT_WIDTH/1280

# converts gain from dB to x
def dB2x(dB):
    """Convert a gain value in dB to a linear factor (x)."""
    exponent = dB/20.0
    return math.pow(10, exponent)

def x2dB(x):
    """Convert a linear gain factor (x) to dB."""
    return math.log10(x)*20.0

# spectrograph calibration
START_WL = 430.0 #nm   (wavelength calibrated to sensor line CALIB_TOP)
END_WL = 1700.0 #nm    (wavelength at the bottom of the calibrated window)
CALIB_HEIGHT = 920     # sensor lines spanned by the calibrated range
CALIB_TOP = 20         # first calibrated sensor line
NM2LINE = (END_WL - START_WL)/CALIB_HEIGHT  # nanometers covered per sensor line
SLIT_SIZE = 20.0 #um   (spectrograph slit size)
PX_SIZE = 5.0 #um      (sensor pixel pitch)

# calculate framerate based on belt_speed (in mm/s)
def calc_fps(belt_speed, force_aspect_ratio=True):
    """Framerate needed to sample a belt moving at *belt_speed* mm/s.

    With force_aspect_ratio=True the two spatial directions of the
    resulting image get equal sampling; otherwise scan lines are sampled
    1-to-1 with the slit projection on the belt.
    """
    if force_aspect_ratio:
        # aspect ratio of the resulting image is equal in the two spatial directions
        return 1/(MM_PER_PX/belt_speed)
    # 1 to 1 sampling of the slit projection
    return 1/((SLIT_SIZE/PX_SIZE)*MM_PER_PX/belt_speed)

# Wavelength to line number conversion
def band2line(band):
    """Map a wavelength in nm to the nearest sensor line number."""
    offset = (band - START_WL)/NM2LINE
    return round(CALIB_TOP + offset)

# Line number to wavelength conversion
def line2band(line):
    """Map a sensor line number back to its wavelength in nm."""
    offset = line - CALIB_TOP
    return offset*NM2LINE + START_WL

TOP = band2line(FIRST_BAND)    # first sensor line of the crop window
BOTTOM = band2line(LAST_BAND)  # last sensor line of the crop window
HEIGHT = BOTTOM - TOP          # crop height = number of spectral lines captured


# Program start: open the camera and configure format, exposure, gain,
# crop and framerate; any failure here is fatal for the script.
try:
    #opens camera at '/dev/qtec/video0'
    cam = qamlib.Camera()

    #cam.set_format("Y16") #16-bit
    #cam.set_control("Sensor Bit Mode", 2) #12-bit
    cam.set_format("GREY")
    cam.set_control("Exposure Time, Absolute", EXP_TIME)
    # the Gain control is in milli-dB; GAIN is a linear factor
    cam.set_control("Gain", round(x2dB(GAIN)*1000)) #mili dB
    # binning: (adjust crop if binning is used)
    #cam.set_resolution(WIDTH, HEIGHT/2)
    #cam.set_control("Vertical Binning", 2)
    # crop to the calibrated spectral window (TOP/HEIGHT from band2line)
    cam.set_crop(LEFT, TOP, WIDTH, HEIGHT)

    # clamp the framerate required by the belt speed to the camera limits
    fps_limits = cam.get_framerates()
    min_fps = max(fps_limits.min, MIN_FPS)
    fps = calc_fps(BELT_SPEED)
    if fps > fps_limits.max:
        print(f"Warning required fps ({fps}) is over max: {fps_limits.max}")
        fps = fps_limits.max
    if fps < min_fps:
        print(f"Warning required fps ({fps}) is under min: {min_fps}")
        fps = min_fps
    cam.set_framerate(fps)

    # Read camera controls/parameters back for logging
    img_format = cam.get_format()
    px_format = img_format.pixelformat
    exposure = cam.get_control("Exposure Time, Absolute")
    gain = cam.get_control("Gain")
    h_bin = cam.get_control('Horizontal Binning')
    v_bin = cam.get_control('Vertical Binning')

    print(f"FPS: {cam.get_framerate()} [{fps_limits}]")
    print(f"Frame Size: {cam.get_resolution()}")
    print(f"Crop: {cam.get_crop()}")
    print(f"Binning: {h_bin}x{v_bin}")
    print(f"Pixel format: {img_format}")
    print(f"Exposure: {exposure}")
    # gain is reported in milli-dB; convert back to a linear factor
    print(f"Gain: {dB2x(gain/1000)}")
    print("\n")

except Exception as e:
    # report the failure and abort the script
    template = "An exception of type {0} occurred. Arguments:\n{1!r}"
    message = template.format(type(e).__name__, e.args)
    print(message)
    exit(-1)

# Start and stop streaming with context manager
print("Starting frame capture")
try:
    with cam:
        if SAVE_PAM:
            # grab one frame first so the PAM header can be derived
            # from its shape and dtype
            meta, frame = cam.get_frame(buffered=True)
            write_pam_header(pam_filename, frame, gain, exposure, N_IMGS)

        #while True:
        for i in range(N_IMGS):
            meta, frame = cam.get_frame(buffered=True)

            #print(f"{meta.sequence=}")

            # save datacube (append this frame as one BIL scan line)
            if SAVE_PAM:
                append_pam_image(pam_filename, frame)

            # save individual images
            if SAVE_PNM:
                write_pnm(f"/tmp/_HSI_{i}.pnm", frame)

except Exception as e:
    # report capture failures; anything saved so far stays on disk
    template = "An exception of type {0} occurred. Arguments:\n{1!r}"
    message = template.format(type(e).__name__, e.args)
    print(message)

print("Done with frame capture")

Encoder read-out

Read-out of the encoder input in order to adjust the framerate to the belt speed.

# encoder geometry
TICKS_PER_ROUND = 2048  # encoder resolution (ticks per revolution)
SHAFT_RAD = 25 #mm      (radius of the shaft the encoder is mounted on)
# belt travel per encoder tick = shaft circumference / ticks per revolution
MM_PER_TICK = SHAFT_RAD*(2*math.pi)/TICKS_PER_ROUND

# Adjust camera settings/controls as before
...

# Start and stop streaming with context manager.
# Reads the encoder to measure belt speed and adjusts the framerate to it.
print("Starting frame capture")
try:
    with cam:
        # encoder/speed tracking state
        ticks_sec = None          # measured encoder rate (ticks/s)
        speed_mm_per_sec = None   # derived belt speed (mm/s)
        last_time = None          # timestamp of the previous frame
        last_pos = None           # encoder position at the previous frame
        time_diff = 0
        pos_diff = 0
        acc_time = 0              # time accumulated since last speed update
        acc_pos = 0               # ticks accumulated since last speed update

        if SAVE_PAM:
            # grab one frame first so the PAM header can be derived
            # from its shape and dtype
            meta, frame = cam.get_frame(buffered=True)
            write_pam_header(pam_filename, frame, gain, exposure, N_IMGS)

        #while True:
        for i in range(N_IMGS):
            meta, frame = cam.get_frame(buffered=True)
            enc_pos = cam.get_ext_control('Encoder Position').value
            now = time.time()

            #print(f"{meta.sequence=}")

            # encoder bookkeeping
            # BUG FIX: compare against None explicitly — a previous
            # encoder position of 0 is valid and truthiness would
            # silently skip the delta update.
            if last_time is not None:
                time_diff = now - last_time
            if last_pos is not None:
                pos_diff = enc_pos - last_pos
                if pos_diff < 0:
                    # counter wrapped around its maximum.
                    # NOTE(review): assumes enc_pos is a numpy integer
                    # so that .dtype exists — confirm with qamlib.
                    pos_diff += np.iinfo(enc_pos.dtype).max
            acc_pos += pos_diff
            acc_time += time_diff

            # calc speed once a second
            if acc_time >= 1.0:
                ticks_sec = acc_pos/acc_time
                speed_mm_per_sec = ticks_sec * MM_PER_TICK
                # sanity check: recover the shaft radius from the constants
                shaft_rad = (MM_PER_TICK * TICKS_PER_ROUND)/(2*3.14)
                print(f"{ticks_sec=} {speed_mm_per_sec=} {shaft_rad=}")
                acc_time = 0
                acc_pos = 0

                # adjust fps to the measured speed (once per update).
                # BUG FIX: guard against None — speed_mm_per_sec is unset
                # until the first measurement, and "None > 1" raises
                # TypeError on Python 3.
                if speed_mm_per_sec is not None and speed_mm_per_sec > 1:
                    fps = calc_fps(speed_mm_per_sec)
                    if fps > fps_limits.max:
                        print(f"Warning required fps ({fps}) is over max: {fps_limits.max}")
                        fps = fps_limits.max
                    if fps < min_fps:
                        print(f"Warning required fps ({fps}) is under min: {min_fps}")
                        fps = min_fps
                    cam.set_framerate(fps)
                    print(f"New FPS: {cam.get_framerate()} [{fps_limits}]")

            last_time = now
            last_pos = enc_pos

            #print(f"{enc_pos=} {pos_diff=} {time_diff=} {ticks_sec=} {speed_mm_per_sec=}")

            # save datacube (append this frame as one BIL scan line)
            if SAVE_PAM:
                append_pam_image(pam_filename, frame)

            # save individual images
            if SAVE_PNM:
                write_pnm(f"/tmp/_HSI_{i}.pnm", frame)

except Exception as e:
    # report capture failures; anything saved so far stays on disk
    template = "An exception of type {0} occurred. Arguments:\n{1!r}"
    message = template.format(type(e).__name__, e.args)
    print(message)

print("Done with frame capture")