image recog

fukurou

the supreme coder
ADMIN
pip install torch torchvision pillow

Create a new file.

Create image_recog_threaded.py.

Python:
import threading
import time
import torch
from torchvision import models, transforms
from PIL import Image
import cv2

# -----------------------------
# GLOBAL STATIC RESULT VARIABLE
# -----------------------------
# Written by the worker thread, read by the main thread.
# None until the first recognition pass completes; then a label string
# (or "Camera error" if the frame could not be read).
latest_recognition = None

# -----------------------------
# DEVICE + MODEL (CUDA)
# -----------------------------
# Prefer the GPU when available, otherwise fall back to CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# ImageNet-pretrained ResNet-50, switched to inference mode.
model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
model = model.to(device)
model.eval()

# Human-readable category names matching the model's output indices.
labels = models.ResNet50_Weights.DEFAULT.meta["categories"]

# -----------------------------
# PREPROCESSING
# -----------------------------
# Standard ImageNet evaluation transform: resize the short side to 256,
# take a 224x224 center crop, convert to tensor, normalize with the
# ImageNet channel means / stds.
preprocess = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225]
    )
])

# -----------------------------
# THREAD FUNCTION (RUNS ONCE)
# -----------------------------
def recognition_once_from_camera():
    """Grab one frame from the default camera, classify it with ResNet-50,
    and publish the predicted label via the global `latest_recognition`.

    Runs once and returns; intended as a thread target.  Sets
    `latest_recognition` to "Camera error" if no frame could be read.
    """
    global latest_recognition

    cap = cv2.VideoCapture(0)
    try:
        ret, frame = cap.read()
    finally:
        cap.release()  # always free the camera, even if read() raises

    if not ret:
        latest_recognition = "Camera error"
        return

    # OpenCV delivers BGR; PIL / torchvision expect RGB.
    img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    tensor = preprocess(img).unsqueeze(0).to(device)

    with torch.no_grad():
        outputs = model(tensor)
        _, predicted = outputs.max(1)

    # .item() converts the 1-element prediction tensor to a plain int;
    # indexing the label list with a tensor only works by accident for
    # single-element tensors.
    latest_recognition = labels[predicted.item()]

# -----------------------------
# WRAPPER FUNCTION (YOU CALL THIS)
# -----------------------------
def start_image_recognition():
    """Launch the one-shot recognition worker on a daemon thread.

    Returns the started Thread so callers can poll is_alive() or join().
    """
    worker = threading.Thread(target=recognition_once_from_camera, daemon=True)
    worker.start()
    return worker

# -----------------------------
# MAIN
# -----------------------------
if __name__ == "__main__":
    print("Starting recognition…")
    worker = start_image_recognition()

    # Poll until the one-shot worker thread has finished.
    while worker.is_alive():
        print("Waiting…")
        time.sleep(0.1)

    print("Recognition result:", latest_recognition)
 
Last edited:

fukurou

the supreme coder
ADMIN
with image detection:
Python:
import threading
import time
import torch
from torchvision import models, transforms
from PIL import Image
import cv2
from ultralytics import YOLO
from typing import List


# -----------------------------------------
# RECTANGLE CLASS
# -----------------------------------------
class Rectangle:
    """Axis-aligned bounding box defined by two corner points."""

    def __init__(self, x1: int, y1: int, x2: int, y2: int):
        # Top-left (x1, y1) and bottom-right (x2, y2) corners.
        self.x1: int = x1
        self.y1: int = y1
        self.x2: int = x2
        self.y2: int = y2

    def __repr__(self) -> str:
        coords = (self.x1, self.y1, self.x2, self.y2)
        return "Rectangle({}, {}, {}, {})".format(*coords)


# -----------------------------------------
# GLOBAL STATIC VARIABLES (TYPE ANNOTATED)
# -----------------------------------------
# Written by the worker thread, read by the main thread.
latest_recognition: str = ""                 # always a string
latest_detections: List[Rectangle] = []      # list of Rectangle objects


# -----------------------------------------
# CUDA DEVICE
# -----------------------------------------
# Prefer the GPU when available, otherwise fall back to CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


# -----------------------------------------
# CLASSIFICATION MODEL (ResNet50)
# -----------------------------------------
# ImageNet-pretrained ResNet-50 in inference mode; clf_labels maps the
# model's output indices to human-readable category names.
clf_model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
clf_model = clf_model.to(device)
clf_model.eval()
clf_labels = models.ResNet50_Weights.DEFAULT.meta["categories"]


# -----------------------------------------
# DETECTION MODEL (YOLOv8n)
# -----------------------------------------
# YOLOv8 nano checkpoint (ultralytics resolves/downloads the weights
# file if it is not present locally).
det_model = YOLO("yolov8n.pt")
det_model.to(device)


# -----------------------------------------
# PREPROCESSING FOR CLASSIFICATION
# -----------------------------------------
# Standard ImageNet eval transform: short side to 256, 224x224 center
# crop, tensor conversion, ImageNet mean/std normalization.
preprocess = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225]
    )
])


# -----------------------------------------
# THREAD FUNCTION (RUNS ONCE, THEN DIES)
# -----------------------------------------
def recognition_once_from_camera(camera_index: int = 0) -> None:
    """Capture one frame, run YOLO detection and whole-frame classification.

    Results are published through module-level globals:
    `latest_detections` (bounding boxes) and `latest_recognition`
    (classification label, or "Camera error" on a failed read).

    Args:
        camera_index: OpenCV camera device index.
    """
    global latest_recognition, latest_detections

    cap = cv2.VideoCapture(camera_index)
    try:
        ret, frame = cap.read()
    finally:
        cap.release()  # always free the camera, even if read() raises

    if not ret:
        latest_recognition = "Camera error"
        latest_detections = []
        return

    # -------------------------------------
    # OBJECT DETECTION (YOLO)
    # -------------------------------------
    results = det_model(frame, verbose=False)[0]

    rects: List[Rectangle] = []
    for box in results.boxes:
        x1, y1, x2, y2 = box.xyxy[0].cpu().numpy().astype(int)
        rects.append(Rectangle(x1, y1, x2, y2))

    latest_detections = rects

    # -------------------------------------
    # CLASSIFICATION (ResNet50)
    # -------------------------------------
    # OpenCV frames are BGR; PIL / torchvision expect RGB.
    img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    tensor = preprocess(img).unsqueeze(0).to(device)

    with torch.no_grad():
        outputs = clf_model(tensor)
        _, predicted = outputs.max(1)

    # .item(): convert the 1-element prediction tensor to a plain int
    # before using it as a list index.
    latest_recognition = clf_labels[predicted.item()]


# -----------------------------------------
# WRAPPER FUNCTION (YOU CALL THIS)
# -----------------------------------------
def start_image_recognition(camera_index: int = 0) -> threading.Thread:
    """Run one recognition pass on a background daemon thread.

    Args:
        camera_index: OpenCV camera device index to capture from.

    Returns:
        The started Thread; poll is_alive() or join() to wait for it.
    """
    worker = threading.Thread(
        target=recognition_once_from_camera,
        args=(camera_index,),
        daemon=True,
    )
    worker.start()
    return worker


# -----------------------------------------
# MAIN (EXAMPLE)
# -----------------------------------------
if __name__ == "__main__":
    print("Starting recognition…")

    worker = start_image_recognition()

    # Poll until the one-shot worker thread has finished.
    while worker.is_alive():
        print("Waiting…")
        time.sleep(0.1)

    print("Classification:", latest_recognition)
    print("Detections:", latest_detections)
 

fukurou

the supreme coder
ADMIN
full version with per-object lists of recognitions and detections:
Python:
import threading
import time
import torch
from torchvision import models, transforms
from PIL import Image
import cv2
from ultralytics import YOLO
from typing import List


# -----------------------------------------
# RECTANGLE CLASS
# -----------------------------------------
class Rectangle:
    """Axis-aligned bounding box defined by two corner points."""

    def __init__(self, x1: int, y1: int, x2: int, y2: int):
        # Top-left (x1, y1) and bottom-right (x2, y2) corners.
        self.x1: int = x1
        self.y1: int = y1
        self.x2: int = x2
        self.y2: int = y2

    def __repr__(self) -> str:
        corners = (self.x1, self.y1, self.x2, self.y2)
        return "Rectangle({}, {}, {}, {})".format(*corners)


# -----------------------------------------
# GLOBAL STATIC VARIABLES (TYPE ANNOTATED)
# -----------------------------------------
# Written by the worker thread, read by the main thread.
# NOTE(review): these two lists look intended to be index-aligned, but
# an empty crop in the worker skips its label entry — confirm.
latest_detections: List[Rectangle] = []   # rectangles only
latest_recognitions: List[str] = []       # list of labels only


# -----------------------------------------
# CUDA DEVICE
# -----------------------------------------
# Prefer the GPU when available, otherwise fall back to CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


# -----------------------------------------
# CLASSIFICATION MODEL (ResNet50)
# -----------------------------------------
# ImageNet-pretrained ResNet-50 in inference mode; clf_labels maps the
# model's output indices to human-readable category names.
clf_model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
clf_model = clf_model.to(device)
clf_model.eval()
clf_labels = models.ResNet50_Weights.DEFAULT.meta["categories"]


# -----------------------------------------
# DETECTION MODEL (YOLOv8n)
# -----------------------------------------
# YOLOv8 nano checkpoint (ultralytics resolves/downloads the weights
# file if it is not present locally).
det_model = YOLO("yolov8n.pt")
det_model.to(device)


# -----------------------------------------
# PREPROCESSING FOR CLASSIFICATION
# -----------------------------------------
# ImageNet eval transform for per-object crops: short side to 224,
# 224x224 center crop, tensor conversion, ImageNet mean/std normalization.
preprocess = transforms.Compose([
    transforms.Resize(224),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225]
    )
])


# -----------------------------------------
# THREAD FUNCTION (RUNS ONCE, THEN DIES)
# -----------------------------------------
def recognition_once_from_camera(camera_index: int = 0) -> None:
    """Capture one frame, detect objects with YOLO, classify each crop.

    Publishes results via the module-level globals `latest_detections`
    and `latest_recognitions`, kept index-aligned (entry i of each list
    describes the same object; an unusable crop gets a "" label).  Both
    are reset to [] on entry and stay empty if the camera read fails.

    Args:
        camera_index: OpenCV camera device index.
    """
    global latest_detections, latest_recognitions

    latest_detections = []
    latest_recognitions = []

    cap = cv2.VideoCapture(camera_index)
    try:
        ret, frame = cap.read()
    finally:
        cap.release()  # always free the camera, even if read() raises

    if not ret:
        return

    # -------------------------------------
    # OBJECT DETECTION (YOLO)
    # -------------------------------------
    results = det_model(frame, verbose=False)[0]

    rects: List[Rectangle] = []
    recog_list: List[str] = []

    for box in results.boxes:
        x1, y1, x2, y2 = box.xyxy[0].cpu().numpy().astype(int)
        rects.append(Rectangle(x1, y1, x2, y2))

        # Crop object
        crop = frame[y1:y2, x1:x2]
        if crop.size == 0:
            # A rect was already appended, so append a blank label to
            # keep the two lists index-aligned (a bare `continue` here
            # used to desync them).
            recog_list.append("")
            continue

        img = Image.fromarray(cv2.cvtColor(crop, cv2.COLOR_BGR2RGB))
        tensor = preprocess(img).unsqueeze(0).to(device)

        # Classify crop
        with torch.no_grad():
            outputs = clf_model(tensor)
            _, predicted = outputs.max(1)

        # .item(): convert the 1-element prediction tensor to a plain
        # int before using it as a list index.
        recog_list.append(clf_labels[predicted.item()])

    latest_detections = rects
    latest_recognitions = recog_list


# -----------------------------------------
# WRAPPER FUNCTION (YOU CALL THIS)
# -----------------------------------------
def start_image_recognition(camera_index: int = 0) -> threading.Thread:
    """Run one recognition pass on a background daemon thread.

    Args:
        camera_index: OpenCV camera device index to capture from.

    Returns:
        The started Thread; poll is_alive() or join() to wait for it.
    """
    worker = threading.Thread(
        target=recognition_once_from_camera,
        args=(camera_index,),
        daemon=True,
    )
    worker.start()
    return worker


# -----------------------------------------
# MAIN (EXAMPLE)
# -----------------------------------------
if __name__ == "__main__":
    print("Starting recognition…")

    worker = start_image_recognition()

    # Poll until the one-shot worker thread has finished.
    while worker.is_alive():
        print("Waiting…")
        time.sleep(0.1)

    print("Detections:", latest_detections)
    print("Recognitions:", latest_recognitions)
 

fukurou

the supreme coder
ADMIN
with movement-direction detection

Python:
import threading
import time
import torch
from torchvision import models, transforms
from PIL import Image
import cv2
from ultralytics import YOLO
from typing import List, Optional


# -----------------------------------------
# RECTANGLE CLASS
# -----------------------------------------
class Rectangle:
    """Axis-aligned bounding box with area support."""

    def __init__(self, x1: int, y1: int, x2: int, y2: int):
        # Top-left (x1, y1) and bottom-right (x2, y2) corners.
        self.x1: int = x1
        self.y1: int = y1
        self.x2: int = x2
        self.y2: int = y2

    def area(self) -> int:
        """Pixel area; degenerate (inverted) boxes count as zero."""
        width = self.x2 - self.x1
        height = self.y2 - self.y1
        return (width if width > 0 else 0) * (height if height > 0 else 0)

    def __repr__(self) -> str:
        corners = (self.x1, self.y1, self.x2, self.y2)
        return "Rectangle({}, {}, {}, {})".format(*corners)


# -----------------------------------------
# GLOBAL STATIC VARIABLES
# -----------------------------------------
# Written by the worker thread, read by the main thread.
latest_detections: List[Rectangle] = []
latest_recognitions: List[str] = []

# Movement-tracking state: the first box seen on the previous pass and
# the direction string computed from it.
prev_rect: Optional[Rectangle] = None
movement_direction: str = ""


# -----------------------------------------
# CUDA DEVICE
# -----------------------------------------
# Prefer the GPU when available, otherwise fall back to CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


# -----------------------------------------
# CLASSIFICATION MODEL
# -----------------------------------------
# ImageNet-pretrained ResNet-50 in inference mode; clf_labels maps the
# model's output indices to human-readable category names.
clf_model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
clf_model = clf_model.to(device)
clf_model.eval()
clf_labels = models.ResNet50_Weights.DEFAULT.meta["categories"]


# -----------------------------------------
# DETECTION MODEL
# -----------------------------------------
# YOLOv8 nano checkpoint (ultralytics resolves/downloads the weights
# file if it is not present locally).
det_model = YOLO("yolov8n.pt")
det_model.to(device)


# -----------------------------------------
# PREPROCESSING
# -----------------------------------------
# ImageNet eval transform for per-object crops: short side to 224,
# 224x224 center crop, tensor conversion, ImageNet mean/std normalization.
preprocess = transforms.Compose([
    transforms.Resize(224),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225]
    )
])


# -----------------------------------------
# MOVEMENT DETECTION
# -----------------------------------------
def detect_movement(old: Rectangle, new: Rectangle) -> str:
    """Describe how `new` moved relative to `old`.

    Compares top-left corners for left/right and up/down motion and the
    box areas for closer/further (a growing box reads as nearer).
    Returns a space-joined string such as "right down closer", or ""
    when nothing changed.
    """
    dx = new.x1 - old.x1
    dy = new.y1 - old.y1
    da = new.area() - old.area()

    words: List[str] = []
    if dx:
        words.append("left" if dx < 0 else "right")
    if dy:
        words.append("up" if dy < 0 else "down")
    if da:
        words.append("closer" if da > 0 else "further")

    return " ".join(words)


# -----------------------------------------
# THREAD FUNCTION (RUNS ONCE)
# -----------------------------------------
def recognition_once_from_camera(camera_index: int = 0) -> None:
    """Capture one frame, detect + classify objects, and track movement.

    Updates the globals `latest_detections` / `latest_recognitions`
    (kept index-aligned; unusable crops get a "" label) and
    `movement_direction`, which compares the first box of this frame
    with the first box of the previous call (naive positional matching,
    no identity tracking).

    Args:
        camera_index: OpenCV camera device index.
    """
    global latest_detections, latest_recognitions
    global prev_rect, movement_direction

    latest_detections = []
    latest_recognitions = []
    movement_direction = ""

    cap = cv2.VideoCapture(camera_index)
    try:
        ret, frame = cap.read()
    finally:
        cap.release()  # always free the camera, even if read() raises

    if not ret:
        return

    # DETECTION
    results = det_model(frame, verbose=False)[0]

    rects: List[Rectangle] = []
    recog_list: List[str] = []

    for box in results.boxes:
        x1, y1, x2, y2 = box.xyxy[0].cpu().numpy().astype(int)
        rects.append(Rectangle(x1, y1, x2, y2))

        crop = frame[y1:y2, x1:x2]
        if crop.size == 0:
            # A rect was already appended, so append a blank label to
            # keep the two lists index-aligned (a bare `continue` here
            # used to desync them).
            recog_list.append("")
            continue

        img = Image.fromarray(cv2.cvtColor(crop, cv2.COLOR_BGR2RGB))
        tensor = preprocess(img).unsqueeze(0).to(device)

        with torch.no_grad():
            outputs = clf_model(tensor)
            _, predicted = outputs.max(1)

        # .item(): convert the 1-element prediction tensor to a plain
        # int before using it as a list index.
        recog_list.append(clf_labels[predicted.item()])

    latest_detections = rects
    latest_recognitions = recog_list

    # MOVEMENT DETECTION — tracks only the first detected object,
    # matched purely by list position.
    if rects:
        new_rect = rects[0]  # track first object
        if prev_rect is not None:
            movement_direction = detect_movement(prev_rect, new_rect)
        prev_rect = new_rect


# -----------------------------------------
# WRAPPER FUNCTION
# -----------------------------------------
def start_image_recognition(camera_index: int = 0) -> threading.Thread:
    """Run one recognition pass on a background daemon thread.

    Args:
        camera_index: OpenCV camera device index to capture from.

    Returns:
        The started Thread; poll is_alive() or join() to wait for it.
    """
    worker = threading.Thread(
        target=recognition_once_from_camera,
        args=(camera_index,),
        daemon=True,
    )
    worker.start()
    return worker


# -----------------------------------------
# MAIN (EXAMPLE)
# -----------------------------------------
if __name__ == "__main__":
    print("Starting recognition…")

    worker = start_image_recognition()

    # Poll until the one-shot worker thread has finished.
    while worker.is_alive():
        print("Waiting…")
        time.sleep(0.1)

    print("Detections:", latest_detections)
    print("Recognitions:", latest_recognitions)
    print("Movement:", movement_direction)
 

fukurou

the supreme coder
ADMIN
updated Python version with per‑object movement (each bounding box tracked individually), using parallel lists

Python:
import threading
import time
import torch
from torchvision import models, transforms
from PIL import Image
import cv2
from ultralytics import YOLO
from typing import List


# -----------------------------------------
# RECTANGLE CLASS
# -----------------------------------------
class Rectangle:
    """Axis-aligned bounding box with area support."""

    def __init__(self, x1: int, y1: int, x2: int, y2: int):
        # Top-left (x1, y1) and bottom-right (x2, y2) corners.
        self.x1: int = x1
        self.y1: int = y1
        self.x2: int = x2
        self.y2: int = y2

    def area(self) -> int:
        """Pixel area; degenerate (inverted) boxes count as zero."""
        width = self.x2 - self.x1
        height = self.y2 - self.y1
        return (width if width > 0 else 0) * (height if height > 0 else 0)

    def __repr__(self) -> str:
        corners = (self.x1, self.y1, self.x2, self.y2)
        return "Rectangle({}, {}, {}, {})".format(*corners)


# -----------------------------------------
# GLOBAL STATIC VARIABLES (TYPE ANNOTATED)
# -----------------------------------------
# Written by the worker thread, read by the main thread; the three
# "latest_*" lists are index-aligned (entry i describes one object).
latest_detections: List[Rectangle] = []   # rectangles only
latest_recognitions: List[str] = []       # labels only
latest_movements: List[str] = []          # movement per object (same index as above)

# Boxes from the previous pass; used for naive index-based movement matching.
_prev_rects: List[Rectangle] = []         # internal previous frame rects


# -----------------------------------------
# CUDA DEVICE
# -----------------------------------------
# Prefer the GPU when available, otherwise fall back to CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


# -----------------------------------------
# CLASSIFICATION MODEL (ResNet50)
# -----------------------------------------
# ImageNet-pretrained ResNet-50 in inference mode; clf_labels maps the
# model's output indices to human-readable category names.
clf_model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
clf_model = clf_model.to(device)
clf_model.eval()
clf_labels = models.ResNet50_Weights.DEFAULT.meta["categories"]


# -----------------------------------------
# DETECTION MODEL (YOLOv8n)
# -----------------------------------------
# YOLOv8 nano checkpoint (ultralytics resolves/downloads the weights
# file if it is not present locally).
det_model = YOLO("yolov8n.pt")
det_model.to(device)


# -----------------------------------------
# PREPROCESSING FOR CLASSIFICATION
# -----------------------------------------
# ImageNet eval transform for per-object crops: short side to 224,
# 224x224 center crop, tensor conversion, ImageNet mean/std normalization.
preprocess = transforms.Compose([
    transforms.Resize(224),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225]
    )
])


# -----------------------------------------
# MOVEMENT DETECTION (PER OBJECT)
# -----------------------------------------
def _detect_movement(old: Rectangle, new: Rectangle) -> str:
    dx = new.x1 - old.x1
    dy = new.y1 - old.y1
    da = new.area() - old.area()

    horizontal = ""
    vertical = ""
    depth = ""

    if dx < 0:
        horizontal = "left"
    elif dx > 0:
        horizontal = "right"

    if dy < 0:
        vertical = "up"
    elif dy > 0:
        vertical = "down"

    if da > 0:
        depth = "closer"
    elif da < 0:
        depth = "further"

    parts = [p for p in (horizontal, vertical, depth) if p]
    return " ".join(parts) if parts else "static"


# -----------------------------------------
# THREAD FUNCTION (RUNS ONCE, THEN DIES)
# -----------------------------------------
def recognition_once_from_camera(camera_index: int = 0) -> None:
    """Capture one frame, detect objects, classify each crop, track motion.

    Publishes three index-aligned global lists: `latest_detections`
    (boxes), `latest_recognitions` (labels, "" for unusable crops) and
    `latest_movements` (per-object direction strings).  Movement is only
    compared box-by-box when this frame has the same number of boxes as
    the previous one (naive positional matching, no identity tracking);
    otherwise every object is reported "static".

    Args:
        camera_index: OpenCV camera device index.
    """
    global latest_detections, latest_recognitions, latest_movements, _prev_rects

    latest_detections = []
    latest_recognitions = []
    latest_movements = []

    cap = cv2.VideoCapture(camera_index)
    try:
        ret, frame = cap.read()
    finally:
        cap.release()  # always free the camera, even if read() raises

    if not ret:
        _prev_rects = []
        return

    results = det_model(frame, verbose=False)[0]

    rects: List[Rectangle] = []
    recog_list: List[str] = []

    for box in results.boxes:
        x1, y1, x2, y2 = box.xyxy[0].cpu().numpy().astype(int)
        rects.append(Rectangle(x1, y1, x2, y2))

        crop = frame[y1:y2, x1:x2]
        if crop.size == 0:
            # Blank label keeps recog_list aligned with rects.
            recog_list.append("")
            continue

        img = Image.fromarray(cv2.cvtColor(crop, cv2.COLOR_BGR2RGB))
        tensor = preprocess(img).unsqueeze(0).to(device)

        with torch.no_grad():
            outputs = clf_model(tensor)
            _, predicted = outputs.max(1)

        # .item(): convert the 1-element prediction tensor to a plain
        # int before using it as a list index.
        recog_list.append(clf_labels[predicted.item()])

    latest_detections = rects
    latest_recognitions = recog_list

    # Pair boxes by index with the previous frame when the counts match;
    # otherwise there is no sensible pairing, so report all as "static".
    movements: List[str] = []
    if _prev_rects and len(_prev_rects) == len(rects):
        for old, new in zip(_prev_rects, rects):
            movements.append(_detect_movement(old, new))
    else:
        movements = ["static"] * len(rects)

    latest_movements = movements
    _prev_rects = rects


# -----------------------------------------
# WRAPPER FUNCTION (YOU CALL THIS)
# -----------------------------------------
def start_image_recognition(camera_index: int = 0) -> threading.Thread:
    """Run one recognition pass on a background daemon thread.

    Args:
        camera_index: OpenCV camera device index to capture from.

    Returns:
        The started Thread; poll is_alive() or join() to wait for it.
    """
    worker = threading.Thread(
        target=recognition_once_from_camera,
        args=(camera_index,),
        daemon=True,
    )
    worker.start()
    return worker


# -----------------------------------------
# MAIN (EXAMPLE)
# -----------------------------------------
if __name__ == "__main__":
    print("Starting recognition…")

    worker = start_image_recognition()

    # Poll until the one-shot worker thread has finished.
    while worker.is_alive():
        print("Waiting…")
        time.sleep(0.1)

    print("Detections:", latest_detections)
    print("Recognitions:", latest_recognitions)
    print("Movements:", latest_movements)
 
Top