"""Grid-based blob detection: finds dark blobs in a camera frame, classifies their shape, and tracks movement between frames."""
import threading
import time
from typing import List, Tuple, Optional
import cv2
import numpy as np
# -----------------------------------------
# RECTANGLE CLASS (IMAGE COORDS)
# -----------------------------------------
class Rectangle:
    """Axis-aligned rectangle in image pixel coordinates.

    (x1, y1) is the top-left corner, (x2, y2) the bottom-right.
    Inverted corners are treated as zero extent rather than negative.
    """

    def __init__(self, x1: int, y1: int, x2: int, y2: int):
        self.x1: int = x1
        self.y1: int = y1
        self.x2: int = x2
        self.y2: int = y2

    def width(self) -> int:
        """Horizontal extent in pixels (0 when x2 <= x1)."""
        dx = self.x2 - self.x1
        return dx if dx > 0 else 0

    def height(self) -> int:
        """Vertical extent in pixels (0 when y2 <= y1)."""
        dy = self.y2 - self.y1
        return dy if dy > 0 else 0

    def area(self) -> int:
        """Pixel area of the clamped rectangle."""
        return self.width() * self.height()

    def center(self) -> Tuple[float, float]:
        """Midpoint as (cx, cy) floats."""
        return (self.x1 + self.x2) * 0.5, (self.y1 + self.y2) * 0.5

    def __repr__(self) -> str:
        return "Rectangle({}, {}, {}, {})".format(self.x1, self.y1, self.x2, self.y2)
# -----------------------------------------
# GLOBAL STATIC RESULTS
# -----------------------------------------
# Results for the most recent processed frame, published by process_frame()
# and read by the demo loop. Each is rebound to a fresh list on every frame
# (never mutated in place).
latest_blob_rects: List[Rectangle] = []
latest_blob_shapes: List[str] = []
latest_blob_movements: List[str] = []
# Previous-frame state kept so process_frame() can derive per-blob movement
# (center delta -> direction, area delta -> closer/further).
_prev_rects: List[Rectangle] = []
_prev_centers: List[Tuple[float, float]] = []
_prev_areas: List[float] = []
# control flags
# recognition_running: True while a capture thread is in flight; start_eye()
# checks it to ignore duplicate calls. recognition_done: set by the worker
# once the frame has been handled (even if the camera read failed).
recognition_running = False
recognition_done = False
# -----------------------------------------
# DYNAMIC BLACK PIXEL DETECTION
# -----------------------------------------
def is_black_mask(gray: np.ndarray) -> np.ndarray:
    """Boolean mask of the darkest pixels in *gray*.

    The threshold adapts per frame: a pixel counts as "black" when it is
    strictly below the frame's 25th-percentile intensity, so roughly the
    darkest quarter of pixels is selected regardless of overall exposure.
    """
    cutoff = np.percentile(gray, 25)
    return np.less(gray, cutoff)
# -----------------------------------------
# GRID BUILDING (VECTORIZED)
# -----------------------------------------
def build_grid(mask: np.ndarray, grid_w: int, grid_h: int) -> np.ndarray:
    """Downsample a 2-D boolean mask onto a (grid_h, grid_w) occupancy grid.

    A grid cell is True when ANY pixel inside its (cell_h x cell_w) patch is
    True. Remainder rows/columns (when the mask dimensions are not exact
    multiples of the grid) are cropped away and ignored.

    Args:
        mask: 2-D boolean array (image-sized pixel mask).
        grid_w: number of grid columns.
        grid_h: number of grid rows.

    Returns:
        Boolean array of shape (grid_h, grid_w).

    Raises:
        ValueError: if grid dimensions are non-positive, or if the mask is
            smaller than the grid (integer cell size would be 0; the previous
            implementation then silently returned an all-False grid,
            dropping every set pixel).
    """
    if grid_w <= 0 or grid_h <= 0:
        raise ValueError(f"grid dimensions must be positive, got {grid_w}x{grid_h}")
    h, w = mask.shape
    cell_h = h // grid_h
    cell_w = w // grid_w
    if cell_h == 0 or cell_w == 0:
        raise ValueError(f"mask {w}x{h} is smaller than the {grid_w}x{grid_h} grid")
    # Crop to an exact multiple of the cell size so the mask can be viewed as
    # (grid_h, cell_h, grid_w, cell_w) blocks and reduced per block.
    h_use = cell_h * grid_h
    w_use = cell_w * grid_w
    grid_view = mask[:h_use, :w_use].reshape(grid_h, cell_h, grid_w, cell_w)
    return grid_view.any(axis=(1, 3))
# -----------------------------------------
# CONNECTED COMPONENTS ON GRID (BLOBS)
# -----------------------------------------
def find_grid_blobs(grid: np.ndarray) -> List[List[Tuple[int, int]]]:
    """Group the True cells of *grid* into 4-connected components.

    Returns one list of (x, y) grid coordinates per component, discovered
    in row-major scan order via an iterative depth-first flood fill.
    """
    rows, cols = grid.shape
    seen = np.zeros(grid.shape, dtype=bool)
    components: List[List[Tuple[int, int]]] = []
    offsets = ((1, 0), (-1, 0), (0, 1), (0, -1))
    # argwhere yields (row, col) pairs in row-major order — the same scan
    # order as a nested gy/gx loop, but only over set cells.
    for sy, sx in np.argwhere(grid):
        if seen[sy, sx]:
            continue
        seen[sy, sx] = True
        frontier = [(int(sx), int(sy))]
        component: List[Tuple[int, int]] = []
        while frontier:
            x, y = frontier.pop()
            component.append((x, y))
            for dx, dy in offsets:
                ax, ay = x + dx, y + dy
                if 0 <= ax < cols and 0 <= ay < rows and grid[ay, ax] and not seen[ay, ax]:
                    seen[ay, ax] = True
                    frontier.append((ax, ay))
        components.append(component)
    return components
# -----------------------------------------
# SHAPE RECOGNITION FROM GRID BLOB
# -----------------------------------------
def classify_blob_shape(blob: List[Tuple[int, int]]) -> str:
    """Classify a grid blob by bounding-box fill ratio and aspect ratio.

    Returns one of: "dot", "block", "wide", "tall" (dense blobs), or
    "line-horizontal", "line-vertical", "sparse" (fill ratio <= 0.7).
    """
    if len(blob) == 1:
        return "dot"
    xs, ys = zip(*blob)
    span_w = max(xs) - min(xs) + 1
    span_h = max(ys) - min(ys) + 1
    bbox = span_w * span_h
    density = len(blob) / bbox if bbox > 0 else 0.0
    aspect = span_w / span_h if span_h != 0 else 0
    if density > 0.7:
        # Dense blob: classify purely on aspect ratio.
        if aspect > 1.2:
            return "wide"
        if aspect < 0.8:
            return "tall"
        return "block"
    # Sparse blob: only strongly elongated ones count as lines.
    if aspect > 2.0:
        return "line-horizontal"
    if aspect < 0.5:
        return "line-vertical"
    return "sparse"
# -----------------------------------------
# MAP GRID BLOB TO IMAGE RECTANGLE
# -----------------------------------------
def blob_to_rectangle(
    blob: List[Tuple[int, int]],
    img_w: int,
    img_h: int,
    grid_w: int,
    grid_h: int,
) -> Rectangle:
    """Map a blob's grid-cell bounding box back to image pixel coordinates.

    The rectangle spans from the top-left of the blob's minimum cell to the
    bottom-right of its maximum cell (hence the +1 on the max indices),
    scaled by the per-cell pixel size.
    """
    cols = [c for c, _ in blob]
    rows = [r for _, r in blob]
    scale_x = img_w / grid_w
    scale_y = img_h / grid_h
    left = int(min(cols) * scale_x)
    top = int(min(rows) * scale_y)
    right = int((max(cols) + 1) * scale_x)
    bottom = int((max(rows) + 1) * scale_y)
    return Rectangle(left, top, right, bottom)
# -----------------------------------------
# MOVEMENT DETECTION
# -----------------------------------------
def movement_from_delta(
    old_center: Tuple[float, float],
    new_center: Tuple[float, float],
    old_area: float,
    new_area: float,
    pos_eps: float = 1.0,
    area_eps: float = 1.0,
) -> str:
    """Describe a blob's movement between two frames as a short label.

    Combines up to three words — horizontal ("left"/"right"), vertical
    ("up"/"down"), and depth ("closer"/"further", from area change) — in
    that order, separated by spaces. Deltas within the epsilon dead-zones
    are ignored; if nothing moved, returns "static".
    """
    dx = new_center[0] - old_center[0]
    dy = new_center[1] - old_center[1]
    da = new_area - old_area
    labels = []
    if abs(dx) > pos_eps:
        labels.append("left" if dx < 0 else "right")
    if abs(dy) > pos_eps:
        labels.append("up" if dy < 0 else "down")
    if abs(da) > area_eps:
        labels.append("closer" if da > 0 else "further")
    return " ".join(labels) or "static"
def match_prev_to_current(
    prev_centers: List[Tuple[float, float]],
    curr_centers: List[Tuple[float, float]],
) -> List[Optional[int]]:
    """Greedily pair each current blob center with its nearest unused
    previous center.

    Returns, for each current index, the matched previous index or None
    when no previous centers remain. Matching is greedy in current-blob
    order; ties go to the lowest previous index.
    """
    assigned: List[Optional[int]] = [None] * len(curr_centers)
    taken = set()
    for idx, (cx, cy) in enumerate(curr_centers):
        candidates = [
            (j, (cx - px) ** 2 + (cy - py) ** 2)
            for j, (px, py) in enumerate(prev_centers)
            if j not in taken
        ]
        if not candidates:
            continue
        # min() returns the first minimal element, matching the original
        # strict-< scan's lowest-index tie-break.
        nearest = min(candidates, key=lambda pair: pair[1])[0]
        assigned[idx] = nearest
        taken.add(nearest)
    return assigned
# -----------------------------------------
# PROCESS SINGLE FRAME
# -----------------------------------------
def process_frame(
    frame: np.ndarray,
    grid_w: int = 30,
    grid_h: int = 30,
    resize_w: int = 320,
    resize_h: int = 240,
) -> None:
    """Run the full pipeline on one BGR frame and publish results globally.

    Pipeline: resize -> grayscale -> adaptive black mask -> occupancy grid
    -> connected blobs -> rectangles/shapes/movements. Results are written
    to the latest_blob_* globals; per-frame state for movement detection is
    carried in the _prev_* globals.
    """
    global latest_blob_rects, latest_blob_shapes, latest_blob_movements
    global _prev_rects, _prev_centers, _prev_areas

    small = cv2.resize(frame, (resize_w, resize_h), interpolation=cv2.INTER_AREA)
    img_h, img_w, _ = small.shape
    dark = is_black_mask(cv2.cvtColor(small, cv2.COLOR_BGR2GRAY))
    blobs = find_grid_blobs(build_grid(dark, grid_w, grid_h))

    rects = [blob_to_rectangle(b, img_w, img_h, grid_w, grid_h) for b in blobs]
    shapes = [classify_blob_shape(b) for b in blobs]
    centers = [r.center() for r in rects]
    areas = [float(r.area()) for r in rects]

    if _prev_centers:
        movements: List[str] = []
        for i, j in enumerate(match_prev_to_current(_prev_centers, centers)):
            if j is None:
                # No previous blob left to pair with — treat as static.
                movements.append("static")
            else:
                movements.append(
                    movement_from_delta(_prev_centers[j], centers[i], _prev_areas[j], areas[i])
                )
    else:
        # First frame: nothing to compare against.
        movements = ["static"] * len(rects)

    latest_blob_rects = rects
    latest_blob_shapes = shapes
    latest_blob_movements = movements
    _prev_rects = rects
    _prev_centers = centers
    _prev_areas = areas
# -----------------------------------------
# THREAD TARGET — RUNS ONCE
# -----------------------------------------
def _camera_once(camera_index: int = 0):
    """Grab one frame from the camera, process it, and signal completion.

    Runs as a daemon-thread target started by start_eye(). The flag updates
    and the capture release sit in a finally block: previously, an exception
    from cap.read() or process_frame() leaked the capture handle and left
    recognition_running stuck at True, permanently blocking start_eye()
    from ever launching another capture.
    """
    global recognition_running, recognition_done
    cap = cv2.VideoCapture(camera_index)
    try:
        ret, frame = cap.read()
        if ret:
            process_frame(frame)
        # On a failed read we still fall through: "done" means the attempt
        # finished, not that it produced fresh results.
    finally:
        cap.release()
        recognition_done = True
        recognition_running = False
# -----------------------------------------
# SAFE start_eye() — RUNS ONCE ONLY
# -----------------------------------------
def start_eye(camera_index: int = 0) -> Optional[threading.Thread]:
    """Launch one single-shot camera recognition pass in a daemon thread.

    Returns the started Thread, or None when a pass is already in flight
    (duplicate calls are ignored until the worker clears the flag).
    """
    global recognition_running, recognition_done
    if recognition_running:
        return None  # ignore duplicate calls
    recognition_done = False
    recognition_running = True
    worker = threading.Thread(
        target=_camera_once,
        args=(camera_index,),
        daemon=True,
    )
    worker.start()
    return worker
# -----------------------------------------
# DEMO
# -----------------------------------------
if __name__ == "__main__":
    print("Starting optimized AEye…")
    while True:
        start_eye()  # duplicate calls are ignored while a capture is in flight
        # Poll until the worker thread signals that the frame was handled.
        while not recognition_done:
            time.sleep(0.01)
        for label, payload in (
            ("Rects:", latest_blob_rects),
            ("Shapes:", latest_blob_shapes),
            ("Movements:", latest_blob_movements),
        ):
            print(label, payload)
        print("-" * 40)
        time.sleep(0.5)