Python
Deprecated
These are legacy examples from before the HV SDK was developed.
We strongly recommend using the SDK's dedicated functions instead.
Reading PAM datacubes
See PAM for more details on the file format itself.
import numpy as np
def read_pam_file(file_path, chan_rmv=None):
try:
# Open the .pam file for reading
with open(file_path, "rb") as file:
# Function to parse the PAM header until the "ENDHDR" marker
def parse_pam_header(file):
header = {}
while True:
line = file.readline().decode("utf-8").strip()
if not line:
break
if line == "ENDHDR":
break
parts = line.split(" ", 1)
if len(parts) == 2:
key, value = parts
header[key] = value
elif len(parts) == 1:
# Handle lines with no key (just a value)
header[line] = None
return header
# Parse the PAM header until "ENDHDR"
header = parse_pam_header(file)
# Get image properties from the header
width = int(header["WIDTH"]) # Actual width of image
channels = int(header["DEPTH"]) # Corresponds to the number of spectral channels
maxval = int(header["MAXVAL"])
height = int(header["HEIGHT"]) # Corresponds to the number of scan lines
interleave = header.get("TUPLTYPE")
# Read the binary data with the number of channels determined by depth
data = np.fromfile(file, dtype=np.uint8, count=width * height * channels)
# image_data = data.reshape(width, height, channels, order='F') # Fortran-style
if interleave == "BIL":
image_data = data.reshape(height, width, channels, order='C') # C-style
else: #BSQ
image_data = data.reshape(channels, height, width, order='C') # C-style
print("Shape of cube: ", image_data.shape)
if chan_rmv != None:
image_data = image_data[chan_rmv:-chan_rmv]
return image_data
except FileNotFoundError:
print(f"File not found: {file_path}")
return None
except Exception as e:
print(f"An error occurred: {e}")
return None
PAM interleave
The above example expects PAM files captured with the Buteo Hyperspectral Imaging System, which have a BIL or BSQ interleave type.
Writing PNM files and PAM datacubes
See PAM for more details on the file format itself.
from datetime import datetime
import sys
# PNM/PAM
def needs_byteswap(frame):
    """Tell whether *frame* must be byte-swapped before a big-endian write.

    Returns ``False`` for single-byte samples and for data that is already
    big-endian (explicitly, or natively on a big-endian host); ``True`` in
    every other case.
    """
    # Single-byte samples have no byte order to fix.
    if frame.itemsize == 1:
        return False

    order = frame.dtype.byteorder
    already_big_endian = order == ">" or (
        order == "=" and sys.byteorder == "big"
    )
    return not already_big_endian
def write_pnm(name, frame):
    """Write *frame* to the file *name* as a binary PNM image.

    Args:
        name: Destination path; the file is created or overwritten.
        frame: Array shaped ``(height, width, channels)``. A 2-D
            ``(height, width)`` array is accepted as a single-channel image.

    A single-channel frame is written with the ``P5`` magic, anything else
    with ``P6``. MAXVAL is 255 for ``uint8`` data and 65535 otherwise.
    Multi-byte samples are written big-endian, as PNM requires.
    """
    with open(name, "wb") as f:
        if frame.ndim == 2:
            (height, width), channels = frame.shape, 1
        else:
            height, width, channels = frame.shape

        magic = "P5" if channels == 1 else "P6"
        max_val = 255 if frame.dtype == "uint8" else 65535

        pnm_header = f"{magic} {width} {height} {max_val}\n"
        f.write(bytearray(pnm_header, "ascii"))

        if needs_byteswap(frame):
            # byteswap() returns a swapped copy, so the caller's frame keeps
            # its byte order. An in-place swap would need
            # frame.byteswap(True).newbyteorder() to keep values readable.
            frame.byteswap().tofile(f)
        else:
            frame.tofile(f)
# write PAM header for BIL interleave (Band-interleaved-by-line)
def write_pam_header(name, frame, gain, exposure, n_imgs):
    """Create the file *name* and write a PAM (P7) header tagged as BIL.

    Args:
        name: Destination path; the file is created or overwritten.
        frame: Three-dimensional sample frame. Its first two shape entries
            are written as WIDTH and DEPTH; the third is not used.
        gain: Value stored in the ``#Gain:`` comment line.
        exposure: Value stored in the ``#Exposure:`` comment line.
        n_imgs: Value written as HEIGHT.

    MAXVAL is 255 for ``uint8`` frames and 65535 otherwise. The current
    local date and time are recorded as comment lines.
    """
    with open(name, "wb") as f:
        width, channels, _ = frame.shape
        max_val = 255 if frame.dtype == "uint8" else 65535

        # Timestamp taken once so date and time agree
        stamp = datetime.now()

        header_lines = [
            "P7",
            f"#Date: {stamp.strftime('%Y-%m-%d')}",
            f"#Time: {stamp.strftime('%H:%M:%S')}",
            f"#Gain: {gain}",
            f"#Exposure: {exposure}",
            f"WIDTH {width}",
            f"HEIGHT {n_imgs}",
            f"DEPTH {channels}",
            f"MAXVAL {max_val}",
            "TUPLTYPE BIL",
            "ENDHDR",
        ]
        pam_header = "".join(f"{entry}\n" for entry in header_lines)
        f.write(pam_header.encode("ascii"))
def append_pam_image(name, frame):
    """Append the raw samples of *frame* to the existing file *name*.

    Multi-byte samples that are not already big-endian are byte-swapped
    first, because PAM is big-endian by definition.
    """
    with open(name, "ab") as f:
        # byteswap() yields a swapped copy, so the caller's frame is left
        # untouched. An in-place swap would instead require
        # frame.byteswap(True).newbyteorder() to keep values readable.
        payload = frame.byteswap() if needs_byteswap(frame) else frame
        payload.tofile(f)
PAM interleave
The above example generates PAM files which have a BIL interleave type.
Reflectance transformation
def refl_trans(fl, white_avg, dark_avg, clip=False, nan_replacement=None):
    """Perform reflectance transformation on a hyperspectral image cube.

    Args:
        fl: Path of the PAM file, loaded with ``read_pam_file``.
        white_avg: White reference subtracted by ``dark_avg`` to form the
            denominator.
        dark_avg: Dark reference subtracted from the raw cube.
        clip: When true, clip the result to the range [0.0, 1.0].
        nan_replacement: When not ``None``, value substituted for NaN
            entries (which arise where ``white_avg == dark_avg``).

    Returns:
        ``(cube - dark_avg) / (white_avg - dark_avg)`` with its axes
        reversed and its first two resulting axes flipped.
    """
    cube = read_pam_file(fl)
    cube = (cube - dark_avg) / (white_avg - dark_avg)

    # The transpose gives the correct orientation of the real image with
    # format (H, W, C), assuming the Buteo conveyor belt moves toward the
    # right.
    cube = np.transpose(cube, axes=(2, 1, 0))[::-1, ::-1, :]

    # handle values out of range
    if clip:
        cube = cube.clip(0.0, 1.0)

    # handle NaN because of possible division by zero on (white - dark);
    # nan_to_num is a NumPy function, not an ndarray method.
    if nan_replacement is not None:
        cube = np.nan_to_num(cube, nan=nan_replacement)
    return cube
Capturing datacubes
Deprecated
This is a legacy example from before the HV SDK had a camera interface available.
We strongly recommend using the SDK's dedicated hypervision camera API instead.
info
- Requires qamlib and should therefore be run inside the camera (over ssh or with a screen and keyboard).
- This example assumes a belt running at a constant speed
- This example generates datacubes with BIL interleave type
- The datacube files are saved to /tmp (RAM) and will therefore be gone after a reboot
  - copy them over to a different machine using the scp command or WinSCP
    - from the host PC: scp root@<cam_ip>:/tmp/<file_name> <destination>
      - for example: scp root@10.100.10.100:/tmp/_HSI_* .
- Note that saving directly to the camera CFAST (for permanent storage) might be too slow (slow IO operations)
  - consider connecting an SSD to the camera via the USB ports if fast permanent storage is necessary
- Also note that the camera has 4GB of RAM, therefore there will be a limit to how many images can be saved
  - For example the full datacube below (with 1000 images) takes 1.1GB of space
import qamlib
import time
import math
import numpy as np
from datetime import datetime
import sys
def current_datetime_filename():
    """Return the current local time as ``YYYY-MM-DD_HH-MM-SS``."""
    return f"{datetime.now():%Y-%m-%d_%H-%M-%S}"
# Output selection: one PAM datacube and/or one PNM file per frame
SAVE_PAM = True
SAVE_PNM = False
# Datacube path under /tmp, stamped with the script start time
pam_filename = f"/tmp/_HSI_{current_datetime_filename()}.pam"
# Adjust these as necessary
N_IMGS = 1000  # number of frames captured in the loop (written as PAM HEIGHT)
EXP_TIME = 900  # exposure time, microseconds
GAIN = 1.0  # linear gain factor (x); converted to dB before being applied
LEFT = 0  # left edge of the sensor crop
WIDTH = 1296  # width of the sensor crop
FIRST_BAND = 430.0  # first wavelength to keep, nm
LAST_BAND = 1700.0  # last wavelength to keep, nm
MIN_FPS = 10  # lower bound enforced on the requested framerate
MM_PER_PX = 0.35  # mm/px
BELT_SPEED = 100  # mm/s
# PX_SIZE*HEIGHT/FOCAL_LENGTH = BELT_WIDTH/1280
# converts gain from dB to x
def dB2x(dB):
    """Convert a gain in decibels to a linear factor (20 dB per decade)."""
    exponent = dB / 20.0
    return math.pow(10, exponent)
def x2dB(x):
    """Convert a linear gain factor to decibels (20 dB per decade)."""
    decades = math.log10(x)
    return 20.0 * decades
# spectrograph calibration
START_WL = 430.0  # nm, wavelength that maps to sensor line CALIB_TOP
END_WL = 1700.0  # nm
CALIB_HEIGHT = 920  # number of sensor lines spanned by START_WL..END_WL
CALIB_TOP = 20  # sensor line corresponding to START_WL
# Despite the name, this is nanometres per sensor line: band2line divides a
# wavelength offset by it and line2band multiplies a line offset by it.
NM2LINE = (END_WL - START_WL)/CALIB_HEIGHT
SLIT_SIZE = 20.0  # um
PX_SIZE = 5.0  # um
# calculate framerate based on belt_speed (in mm/s)
def calc_fps(belt_speed, force_aspect_ratio=True):
    """Compute the framerate matching *belt_speed* (in mm/s).

    With ``force_aspect_ratio`` set, one frame is taken per ``MM_PER_PX`` of
    belt travel, so the resulting image has the same scale in both spatial
    directions. Otherwise the travel per frame is additionally scaled by
    ``SLIT_SIZE / PX_SIZE``.
    """
    if force_aspect_ratio:
        seconds_per_frame = MM_PER_PX/belt_speed
    else:
        seconds_per_frame = (SLIT_SIZE/PX_SIZE)*MM_PER_PX/belt_speed
    return 1/seconds_per_frame
# Wavelength to line number conversion
def band2line(band):
    """Map a wavelength (nm) to the nearest sensor line number."""
    line_offset = (band - START_WL)/NM2LINE
    return round(CALIB_TOP + line_offset)
# Line number to wavelength conversion
def line2band(line):
    """Map a sensor line number to its wavelength (nm)."""
    line_offset = line - CALIB_TOP
    return line_offset*NM2LINE + START_WL
# Sensor lines covering the requested wavelength range
TOP = band2line(FIRST_BAND)
BOTTOM = band2line(LAST_BAND)
HEIGHT = BOTTOM - TOP

# Program start: configure the camera, abort the script on any failure
try:
    # opens camera at '/dev/qtec/video0'
    cam = qamlib.Camera()

    #cam.set_format("Y16") #16-bit
    #cam.set_control("Sensor Bit Mode", 2) #12-bit
    cam.set_format("GREY")

    cam.set_control("Exposure Time, Absolute", EXP_TIME)
    cam.set_control("Gain", round(x2dB(GAIN)*1000))  # milli dB

    # binning: (adjust crop if binning is used)
    #cam.set_resolution(WIDTH, HEIGHT/2)
    #cam.set_control("Vertical Binning", 2)

    cam.set_crop(LEFT, TOP, WIDTH, HEIGHT)

    # Clamp the belt-derived framerate to what the camera supports
    fps_limits = cam.get_framerates()
    min_fps = max(fps_limits.min, MIN_FPS)

    fps = calc_fps(BELT_SPEED)
    if fps > fps_limits.max:
        print(f"Warning required fps ({fps}) is over max: {fps_limits.max}")
        fps = fps_limits.max
    if fps < min_fps:
        print(f"Warning required fps ({fps}) is under min: {min_fps}")
        fps = min_fps

    cam.set_framerate(fps)

    # Read camera controls/parameters
    img_format = cam.get_format()
    px_format = img_format.pixelformat
    exposure = cam.get_control("Exposure Time, Absolute")
    gain = cam.get_control("Gain")
    h_bin = cam.get_control('Horizontal Binning')
    v_bin = cam.get_control('Vertical Binning')

    print(f"FPS: {cam.get_framerate()} [{fps_limits}]")
    print(f"Frame Size: {cam.get_resolution()}")
    print(f"Crop: {cam.get_crop()}")
    print(f"Binning: {h_bin}x{v_bin}")
    print(f"Pixel format: {img_format}")
    print(f"Exposure: {exposure}")
    print(f"Gain: {dB2x(gain/1000)}")
    print("\n")
except Exception as e:
    template = "An exception of type {0} occurred. Arguments:\n{1!r}"
    message = template.format(type(e).__name__, e.args)
    print(message)
    # sys.exit is always available; the bare exit() helper is only injected
    # by the site module and may be missing in some run modes.
    sys.exit(-1)

# Start and stop streaming with context manager
print("Starting frame capture")
try:
    with cam:
        if SAVE_PAM:
            # One frame is grabbed only to size the header; it is not
            # appended to the datacube.
            # NOTE(review): `gain` is the raw control value read back from
            # the camera (set above in milli dB) - confirm that is the unit
            # wanted in the header.
            meta, frame = cam.get_frame(buffered=True)
            write_pam_header(pam_filename, frame, gain, exposure, N_IMGS)

        #while True:
        for i in range(N_IMGS):
            meta, frame = cam.get_frame(buffered=True)
            #print(f"{meta.sequence=}")

            # save datacube
            if SAVE_PAM:
                append_pam_image(pam_filename, frame)

            # save individual images
            if SAVE_PNM:
                write_pnm(f"/tmp/_HSI_{i}.pnm", frame)
except Exception as e:
    template = "An exception of type {0} occurred. Arguments:\n{1!r}"
    message = template.format(type(e).__name__, e.args)
    print(message)

print("Done with frame capture")
Encoder read-out
Read-out of the encoder input in order to adjust the framerate to the belt speed.
# encoder
TICKS_PER_ROUND = 2048
SHAFT_RAD = 25  # mm
# Belt travel per encoder tick: shaft circumference / ticks per revolution
MM_PER_TICK = SHAFT_RAD*(2*math.pi)/TICKS_PER_ROUND

# Adjust camera settings/controls as before
...

# Start and stop streaming with context manager
print("Starting frame capture")
try:
    with cam:
        ticks_sec = None
        speed_mm_per_sec = None
        last_time = None
        last_pos = None
        time_diff = 0
        pos_diff = 0
        acc_time = 0
        acc_pos = 0

        if SAVE_PAM:
            meta, frame = cam.get_frame(buffered=True)
            write_pam_header(pam_filename, frame, gain, exposure, N_IMGS)

        #while True:
        for i in range(N_IMGS):
            meta, frame = cam.get_frame(buffered=True)
            enc_pos = cam.get_ext_control('Encoder Position').value
            now = time.time()
            #print(f"{meta.sequence=}")

            # encoder
            # Compare against None explicitly: an encoder position of 0 is a
            # valid previous sample and must not be skipped.
            if last_time is not None:
                time_diff = now - last_time
            if last_pos is not None:
                pos_diff = enc_pos - last_pos
                if pos_diff < 0:
                    # NOTE(review): wrap-around correction kept as in the
                    # original; a counter wrapping at the dtype limit would
                    # normally need max + 1 - confirm against the encoder.
                    pos_diff += np.iinfo(enc_pos.dtype).max

            acc_pos += pos_diff
            acc_time += time_diff

            # calc speed once a second
            if acc_time >= 1.0:
                ticks_sec = acc_pos/acc_time
                speed_mm_per_sec = ticks_sec * MM_PER_TICK
                # Same pi as used to derive MM_PER_TICK, so this prints back
                # the configured shaft radius.
                shaft_rad = (MM_PER_TICK * TICKS_PER_ROUND)/(2*math.pi)
                print(f"{ticks_sec=} {speed_mm_per_sec=} {shaft_rad=}")
                acc_time = 0
                acc_pos = 0

                # adjust fps
                if speed_mm_per_sec > 1:
                    fps = calc_fps(speed_mm_per_sec)
                    if fps > fps_limits.max:
                        print(f"Warning required fps ({fps}) is over max: {fps_limits.max}")
                        fps = fps_limits.max
                    if fps < min_fps:
                        print(f"Warning required fps ({fps}) is under min: {min_fps}")
                        fps = min_fps
                    cam.set_framerate(fps)
                    print(f"New FPS: {cam.get_framerate()} [{fps_limits}]")

            last_time = now
            last_pos = enc_pos
            #print(f"{enc_pos=} {pos_diff=} {time_diff=} {ticks_sec=} {speed_mm_per_sec=}")

            # save datacube
            if SAVE_PAM:
                append_pam_image(pam_filename, frame)

            # save individual images
            if SAVE_PNM:
                write_pnm(f"/tmp/_HSI_{i}.pnm", frame)
except Exception as e:
    template = "An exception of type {0} occurred. Arguments:\n{1!r}"
    message = template.format(type(e).__name__, e.args)
    print(message)

print("Done with frame capture")