"""AEye reborn — single-shot camera blob recognition.

Grabs one frame from a camera on a background thread, detects dark regions
on a coarse occupancy grid, classifies blob shapes, and estimates movement
relative to the previous pass. Results are published via module-level lists.
"""
import threading
import time
from typing import List, Tuple, Optional

import cv2
import numpy as np


# -----------------------------------------
# RECTANGLE CLASS (IMAGE COORDS)
# -----------------------------------------
class Rectangle:
    """Axis-aligned rectangle in image pixel coordinates.

    Corners are (x1, y1) top-left and (x2, y2) bottom-right; width/height
    clamp to zero for degenerate (inverted) corner pairs.
    """

    def __init__(self, x1: int, y1: int, x2: int, y2: int):
        self.x1: int = x1
        self.y1: int = y1
        self.x2: int = x2
        self.y2: int = y2

    def width(self) -> int:
        # Clamp so an inverted box reports 0 rather than a negative width.
        dx = self.x2 - self.x1
        return dx if dx > 0 else 0

    def height(self) -> int:
        dy = self.y2 - self.y1
        return dy if dy > 0 else 0

    def area(self) -> int:
        """Pixel area of the (clamped) box."""
        return self.width() * self.height()

    def center(self) -> Tuple[float, float]:
        """Midpoint of the raw corners as floats."""
        cx = (self.x1 + self.x2) / 2.0
        cy = (self.y1 + self.y2) / 2.0
        return cx, cy

    def __repr__(self) -> str:
        return f"Rectangle({self.x1}, {self.y1}, {self.x2}, {self.y2})"


# -----------------------------------------
# GLOBAL STATIC RESULTS
# -----------------------------------------
# Latest detections, rebound (not mutated) by process_frame() each pass and
# read by callers such as the demo loop. The three lists are parallel:
# index i describes the same blob in all of them.
latest_blob_rects: List[Rectangle] = []
latest_blob_shapes: List[str] = []
latest_blob_movements: List[str] = []

# Previous-pass state consumed by process_frame() for movement estimation.
_prev_rects: List[Rectangle] = []
_prev_centers: List[Tuple[float, float]] = []
_prev_areas: List[float] = []

# control flags
recognition_running: bool = False  # True while a capture thread is in flight
recognition_done: bool = False  # set True when the capture thread finishes


# -----------------------------------------
# DYNAMIC BLACK PIXEL DETECTION
# -----------------------------------------
def is_black_mask(gray: np.ndarray) -> np.ndarray:
    """Boolean mask of the darkest pixels in *gray*.

    The threshold adapts per-frame: anything strictly below the 25th
    percentile of the image's intensities counts as "black".
    """
    cutoff = np.percentile(gray, 25)
    return np.less(gray, cutoff)


# -----------------------------------------
# GRID BUILDING (VECTORIZED)
# -----------------------------------------
def build_grid(mask: np.ndarray, grid_w: int, grid_h: int) -> np.ndarray:
    """Downsample a boolean mask to a (grid_h, grid_w) occupancy grid.

    The mask is split into equal cells (trailing rows/cols that don't divide
    evenly are dropped); a grid cell is True when any pixel in it is True.
    """
    img_h, img_w = mask.shape
    cell_h, cell_w = img_h // grid_h, img_w // grid_w

    # Trim the remainder so the reshape into whole cells is exact.
    trimmed = mask[: cell_h * grid_h, : cell_w * grid_w]
    cells = trimmed.reshape(grid_h, cell_h, grid_w, cell_w)

    # Collapse each cell's pixels with a logical OR.
    return cells.any(axis=3).any(axis=1)


# -----------------------------------------
# CONNECTED COMPONENTS ON GRID (BLOBS)
# -----------------------------------------
def find_grid_blobs(grid: np.ndarray) -> List[List[Tuple[int, int]]]:
    """Return the 4-connected components of True cells in *grid*.

    Each blob is a list of (x, y) grid coordinates, collected by an
    iterative depth-first flood fill (no recursion, so no depth limit).
    """
    rows, cols = grid.shape
    seen = np.zeros_like(grid, dtype=bool)
    components: List[List[Tuple[int, int]]] = []

    for row in range(rows):
        for col in range(cols):
            if seen[row, col] or not grid[row, col]:
                continue
            # New component: flood-fill from this seed cell.
            seen[row, col] = True
            stack = [(col, row)]
            component: List[Tuple[int, int]] = []
            while stack:
                x, y = stack.pop()
                component.append((x, y))
                # Same neighbor order as the 4-neighborhood: E, W, S, N.
                for nx, ny in ((x + 1, y), (x - 1, y), (x, y + 1), (x, y - 1)):
                    if 0 <= nx < cols and 0 <= ny < rows:
                        if grid[ny, nx] and not seen[ny, nx]:
                            seen[ny, nx] = True
                            stack.append((nx, ny))
            components.append(component)

    return components


# -----------------------------------------
# SHAPE RECOGNITION FROM GRID BLOB
# -----------------------------------------
def classify_blob_shape(blob: List[Tuple[int, int]]) -> str:
    """Heuristically label a grid blob by bounding-box fill and aspect ratio.

    Labels: "dot" (single cell), "block"/"wide"/"tall" (densely filled box,
    split by aspect), "line-horizontal"/"line-vertical"/"sparse" (loose fill).
    """
    if len(blob) == 1:
        return "dot"

    xs, ys = zip(*blob)
    span_w = max(xs) - min(xs) + 1
    span_h = max(ys) - min(ys) + 1

    # Fraction of the bounding box actually covered by blob cells.
    fill = len(blob) / (span_w * span_h)
    aspect = span_w / span_h  # span_h >= 1 for any non-empty blob

    if fill > 0.7:
        # Dense blobs: split purely on aspect ratio.
        if aspect > 1.2:
            return "wide"
        if aspect < 0.8:
            return "tall"
        return "block"

    # Loose blobs: extreme aspect ratios look like lines.
    if aspect > 2.0:
        return "line-horizontal"
    if aspect < 0.5:
        return "line-vertical"
    return "sparse"


# -----------------------------------------
# MAP GRID BLOB TO IMAGE RECTANGLE
# -----------------------------------------
def blob_to_rectangle(
    blob: List[Tuple[int, int]],
    img_w: int,
    img_h: int,
    grid_w: int,
    grid_h: int,
) -> Rectangle:
    """Map a grid blob's bounding box back to image pixel coordinates.

    Each grid cell spans (img_w / grid_w) x (img_h / grid_h) pixels; the
    returned rectangle covers the blob's full cell extent (max cell + 1).
    """
    xs, ys = zip(*blob)

    scale_x = img_w / grid_w
    scale_y = img_h / grid_h

    left = int(min(xs) * scale_x)
    top = int(min(ys) * scale_y)
    right = int((max(xs) + 1) * scale_x)
    bottom = int((max(ys) + 1) * scale_y)

    return Rectangle(left, top, right, bottom)


# -----------------------------------------
# MOVEMENT DETECTION
# -----------------------------------------
def movement_from_delta(
    old_center: Tuple[float, float],
    new_center: Tuple[float, float],
    old_area: float,
    new_area: float,
    pos_eps: float = 1.0,
    area_eps: float = 1.0,
) -> str:
    """Describe a blob's motion between two frames as a space-joined label.

    Horizontal ("left"/"right"), vertical ("up"/"down"), and depth
    ("closer"/"further", from area growth/shrink) components are emitted in
    that order; deltas within the epsilons are ignored. With no component
    exceeding its epsilon, returns "static".
    """
    shift_x = new_center[0] - old_center[0]
    shift_y = new_center[1] - old_center[1]
    growth = new_area - old_area

    labels = []
    if abs(shift_x) > pos_eps:
        labels.append("left" if shift_x < 0 else "right")
    if abs(shift_y) > pos_eps:
        labels.append("up" if shift_y < 0 else "down")
    if abs(growth) > area_eps:
        labels.append("closer" if growth > 0 else "further")

    return " ".join(labels) if labels else "static"


def match_prev_to_current(
    prev_centers: List[Tuple[float, float]],
    curr_centers: List[Tuple[float, float]],
) -> List[Optional[int]]:
    """Greedily pair each current center with its nearest unused previous one.

    Returns one entry per current center: the index into prev_centers of its
    match, or None when no previous center remains. Matching is first-come
    greedy on squared distance (ties favor the lower previous index).
    """
    assigned: List[Optional[int]] = []
    taken = set()

    for cx, cy in curr_centers:
        winner: Optional[int] = None
        winner_d2 = float("inf")
        for idx, (px, py) in enumerate(prev_centers):
            if idx in taken:
                continue
            d2 = (cx - px) ** 2 + (cy - py) ** 2
            if d2 < winner_d2:
                winner_d2 = d2
                winner = idx
        assigned.append(winner)
        if winner is not None:
            taken.add(winner)

    return assigned


# -----------------------------------------
# PROCESS SINGLE FRAME
# -----------------------------------------
def process_frame(
    frame: np.ndarray,
    grid_w: int = 30,
    grid_h: int = 30,
    resize_w: int = 320,
    resize_h: int = 240,
) -> None:
    """Analyze one frame: detect dark blobs, classify shapes, infer movement.

    Publishes results to the module-level ``latest_blob_*`` lists (parallel
    by blob index) and stores the current detections as the "previous" state
    for the next call's movement estimation.

    Args:
        frame: BGR image as produced by OpenCV; a single-channel grayscale
            image is also accepted (the BGR->gray conversion is skipped).
        grid_w, grid_h: resolution of the coarse occupancy grid.
        resize_w, resize_h: working resolution the frame is downscaled to.
    """
    global latest_blob_rects, latest_blob_shapes, latest_blob_movements
    global _prev_rects, _prev_centers, _prev_areas

    frame_small = cv2.resize(frame, (resize_w, resize_h), interpolation=cv2.INTER_AREA)
    # shape[:2] works for both 3-channel color and 2-D grayscale frames,
    # where unpacking three values would raise on grayscale input.
    h, w = frame_small.shape[:2]

    # Only convert when there is actually a channel axis to collapse.
    if frame_small.ndim == 3:
        gray = cv2.cvtColor(frame_small, cv2.COLOR_BGR2GRAY)
    else:
        gray = frame_small

    black = is_black_mask(gray)
    grid = build_grid(black, grid_w, grid_h)
    blobs = find_grid_blobs(grid)

    rects: List[Rectangle] = []
    shapes: List[str] = []
    centers: List[Tuple[float, float]] = []
    areas: List[float] = []

    for blob in blobs:
        rect = blob_to_rectangle(blob, w, h, grid_w, grid_h)
        rects.append(rect)
        shapes.append(classify_blob_shape(blob))
        centers.append(rect.center())
        areas.append(float(rect.area()))

    movements: List[str] = []
    if _prev_centers:
        matches = match_prev_to_current(_prev_centers, centers)
        for i, match_idx in enumerate(matches):
            if match_idx is None:
                # Newly appeared blob: nothing to compare against.
                movements.append("static")
            else:
                movements.append(
                    movement_from_delta(
                        _prev_centers[match_idx],
                        centers[i],
                        _prev_areas[match_idx],
                        areas[i],
                    )
                )
    else:
        # First pass: every blob starts out "static".
        movements = ["static"] * len(rects)

    # Publish results, then roll the current state into the "previous" slots.
    latest_blob_rects = rects
    latest_blob_shapes = shapes
    latest_blob_movements = movements

    _prev_rects = rects
    _prev_centers = centers
    _prev_areas = areas


# -----------------------------------------
# THREAD TARGET — RUNS ONCE
# -----------------------------------------
def _camera_once(camera_index: int = 0):
    """Thread target: grab a single frame from the camera and process it.

    The flag resets run in a ``finally`` so that an exception in capture or
    processing cannot leave ``recognition_running`` stuck True, which would
    permanently disable start_eye(). The capture device is likewise always
    released.
    """
    global recognition_running, recognition_done

    try:
        cap = cv2.VideoCapture(camera_index)
        try:
            ret, frame = cap.read()
        finally:
            cap.release()

        if ret:
            process_frame(frame)
    finally:
        # Same order as before: mark done, then allow a new pass to start.
        recognition_done = True
        recognition_running = False


# -----------------------------------------
# SAFE start_eye() — RUNS ONCE ONLY
# -----------------------------------------
def start_eye(camera_index: int = 0) -> Optional[threading.Thread]:
    """Kick off one recognition pass on a background daemon thread.

    Safe to call repeatedly: while a pass is already in flight the call is
    a no-op and returns None; otherwise the started Thread is returned.
    """
    global recognition_running, recognition_done

    if recognition_running:
        return None  # ignore duplicate calls

    # Arm the flags before starting so a racing caller sees "running".
    recognition_running = True
    recognition_done = False

    worker = threading.Thread(target=_camera_once, args=(camera_index,), daemon=True)
    worker.start()
    return worker


# -----------------------------------------
# DEMO
# -----------------------------------------
if __name__ == "__main__":
    print("Starting optimized AEye…")

    # Demo loop: run one capture pass, print the published results, repeat.
    while True:
        start_eye()  # safe to call every loop

        # Busy-wait (with a short sleep) until the capture thread flips the
        # recognition_done flag.
        while not recognition_done:
            time.sleep(0.01)

        print("Rects:", latest_blob_rects)
        print("Shapes:", latest_blob_shapes)
        print("Movements:", latest_blob_movements)
        print("-" * 40)

        time.sleep(0.5)