pip install torch torchvision pillow
Create a new file
Create image_recog_threaded.py.
Python:
import threading
import time
import torch
from torchvision import models, transforms
from PIL import Image
import cv2
# -----------------------------
# GLOBAL STATIC RESULT VARIABLE
# -----------------------------
latest_recognition = None
# -----------------------------
# DEVICE + MODEL (CUDA)
# -----------------------------
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
model = model.to(device)
model.eval()
labels = models.ResNet50_Weights.DEFAULT.meta["categories"]
# -----------------------------
# PREPROCESSING
# -----------------------------
preprocess = transforms.Compose([
transforms.Resize(256),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Normalize(
mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225]
)
])
# -----------------------------
# THREAD FUNCTION (RUNS ONCE)
# -----------------------------
def recognition_once_from_camera():
global latest_recognition
cap = cv2.VideoCapture(0)
ret, frame = cap.read()
cap.release()
if not ret:
latest_recognition = "Camera error"
return
img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
tensor = preprocess(img).unsqueeze(0).to(device)
with torch.no_grad():
outputs = model(tensor)
_, predicted = outputs.max(1)
latest_recognition = labels[predicted]
# -----------------------------
# WRAPPER FUNCTION (YOU CALL THIS)
# -----------------------------
def start_image_recognition():
t = threading.Thread(
target=recognition_once_from_camera,
daemon=True
)
t.start()
return t
# -----------------------------
# MAIN
# -----------------------------
if __name__ == "__main__":
print("Starting recognition…")
t = start_image_recognition()
while t.is_alive():
print("Waiting…")
time.sleep(0.1)
print("Recognition result:", latest_recognition)
Last edited: