πŸ‘¨β€πŸ’» dev vrm

development

fukurou

the supreme coder
ADMIN
Python:
import asyncio
import threading
import json
import websockets
from flask import Flask, send_from_directory
import os

from Skill import Skill


class DiVrmController(Skill):
    def __init__(self):
        super().__init__()
        self.set_skill_type(1)   # regular skill
        self.set_skill_lobe(2)   # hardware skill

        # runtime state
        self._clients = set()
        self._ws_server = None
        self._flask_thread = None
        self._loop = None
        self._running = False

        # file root (folder where vrm_viewer.html and mixamo/ live)
        self._ROOT = os.path.dirname(os.path.abspath(__file__))

        # flask app
        self._app = Flask(__name__)
        self._app.add_url_rule("/", "index", self._serve_index)
        self._app.add_url_rule("/<path:filename>", "file", self._serve_file)

    # ───────────────────────────────────────────────
    # FLASK HANDLERS
    # ───────────────────────────────────────────────
    def _serve_index(self):
        return send_from_directory(self._ROOT, "vrm_viewer.html")

    def _serve_file(self, filename):
        return send_from_directory(self._ROOT, filename)

    def _run_flask(self):
        self._app.run(port=5500, debug=False, use_reloader=False)

    # ───────────────────────────────────────────────
    # WEBSOCKET HANDLER
    # ───────────────────────────────────────────────
    async def _ws_handler(self, websocket):
        self._clients.add(websocket)
        try:
            await websocket.wait_closed()
        finally:
            self._clients.discard(websocket)

    async def _send_command(self, action, path=None):
        if not self._clients:
            return

        msg = {"action": action}
        if path:
            msg["path"] = path.replace("\\", "/")

        data = json.dumps(msg)

        dead = []
        for ws in list(self._clients):
            try:
                await ws.send(data)
            except:
                dead.append(ws)

        for ws in dead:
            self._clients.discard(ws)

    # ───────────────────────────────────────────────
    # SKILL LIFECYCLE
    # ───────────────────────────────────────────────
    def manifest(self):
        """Start Flask + WebSocket servers."""
        if self._running:
            return

        self._running = True

        # Start Flask in background thread
        self._flask_thread = threading.Thread(
            target=self._run_flask,
            daemon=True
        )
        self._flask_thread.start()

        # Start asyncio loop in background thread
        def start_loop():
            self._loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self._loop)
            self._loop.run_until_complete(self._start_ws_server())
            self._loop.run_forever()

        threading.Thread(target=start_loop, daemon=True).start()

    async def _start_ws_server(self):
        self._ws_server = await websockets.serve(
            self._ws_handler, "localhost", 8765
        )

    def ghost(self):
        """Shutdown servers cleanly."""
        self._running = False

        if self._loop:
            self._loop.call_soon_threadsafe(self._loop.stop)

    # ───────────────────────────────────────────────
    # INPUT: HARDWARE DRIVER BEHAVIOR
    # ───────────────────────────────────────────────
    def input(self, ear: str, skin: str, eye: str):
        # ear is either:
        # - "stop"
        # - "quit"/"exit"
        # - "<file>.fbx" (inside mixamo/)
        if not ear:
            return

        ear = ear.strip()

        # stop command
        if ear == "stop":
            if self._loop:
                asyncio.run_coroutine_threadsafe(
                    self._send_command("stop"),
                    self._loop
                )
            return

        # quit command
        if ear in ("quit", "exit"):
            self.ghost()
            return

        # otherwise treat ear as a filename under mixamo/
        # e.g. "Talking.fbx" β†’ mixamo/Talking.fbx
        mixamo_path = os.path.join(self._ROOT, "mixamo", ear)

        if os.path.isfile(mixamo_path):
            # send play command with relative path as browser expects
            if self._loop:
                asyncio.run_coroutine_threadsafe(
                    self._send_command("play", f"mixamo/{ear}"),
                    self._loop
                )
        else:
            # hardware skill: silently ignore invalid input
            pass

    # ───────────────────────────────────────────────
    def skillNotes(self, param: str) -> str:
        return "VRM animation controller hardware skill (plays mixamo/*.fbx via WebSocket)"
 

fukurou

the supreme coder
ADMIN
Python:
import asyncio
import threading
import json
import websockets
from flask import Flask, send_from_directory
import os
import time

# ───────────────────────────────────────────────
# HARDWARE SKILL
# ───────────────────────────────────────────────

class DiVrmController:
    def __init__(self):
        # hardware skill: no output, no speech
        self._clients = set()
        self._ws_server = None
        self._flask_thread = None
        self._loop = None
        self._running = False

        # root folder (where vrm_viewer.html + mixamo/ live)
        self._ROOT = os.path.dirname(os.path.abspath(__file__))

        # flask app
        self._app = Flask(__name__)
        self._app.add_url_rule("/", "index", self._serve_index)
        self._app.add_url_rule("/<path:filename>", "file", self._serve_file)

    # ───────────────────────────────────────────────
    # FLASK
    # ───────────────────────────────────────────────
    def _serve_index(self):
        return send_from_directory(self._ROOT, "vrm_viewer.html")

    def _serve_file(self, filename):
        return send_from_directory(self._ROOT, filename)

    def _run_flask(self):
        self._app.run(port=5500, debug=False, use_reloader=False)

    # ───────────────────────────────────────────────
    # WEBSOCKET
    # ───────────────────────────────────────────────
    async def _ws_handler(self, websocket):
        self._clients.add(websocket)
        try:
            await websocket.wait_closed()
        finally:
            self._clients.discard(websocket)

    async def _send_command(self, action, path=None):
        if not self._clients:
            return

        msg = {"action": action}
        if path:
            msg["path"] = path.replace("\\", "/")

        data = json.dumps(msg)

        dead = []
        for ws in list(self._clients):
            try:
                await ws.send(data)
            except:
                dead.append(ws)

        for ws in dead:
            self._clients.discard(ws)

    # ───────────────────────────────────────────────
    # LIFECYCLE
    # ───────────────────────────────────────────────
    def manifest(self):
        if self._running:
            return

        self._running = True

        # start flask
        self._flask_thread = threading.Thread(
            target=self._run_flask,
            daemon=True
        )
        self._flask_thread.start()

        # start asyncio loop
        def start_loop():
            self._loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self._loop)
            self._loop.run_until_complete(self._start_ws_server())
            self._loop.run_forever()

        threading.Thread(target=start_loop, daemon=True).start()

    async def _start_ws_server(self):
        self._ws_server = await websockets.serve(
            self._ws_handler, "localhost", 8765
        )

    def ghost(self):
        self._running = False
        if self._loop:
            self._loop.call_soon_threadsafe(self._loop.stop)

    # ───────────────────────────────────────────────
    # INPUT (HARDWARE DRIVER)
    # ───────────────────────────────────────────────
    def input(self, ear: str):
        if not ear:
            return

        ear = ear.strip()

        # stop
        if ear == "stop":
            if self._loop:
                asyncio.run_coroutine_threadsafe(
                    self._send_command("stop"),
                    self._loop
                )
            return

        # quit
        if ear in ("quit", "exit"):
            self.ghost()
            return

        # full command: play mixamo/Talking.fbx
        if ear.startswith("play "):
            path = ear[5:].strip()  # remove "play "
            if self._loop:
                asyncio.run_coroutine_threadsafe(
                    self._send_command("play", path),
                    self._loop
                )
            return

        # filename only: Talking.fbx
        mixamo_path = os.path.join(self._ROOT, "mixamo", ear)
        if os.path.isfile(mixamo_path):
            if self._loop:
                asyncio.run_coroutine_threadsafe(
                    self._send_command("play", f"mixamo/{ear}"),
                    self._loop
                )
            return

        # ignore anything else silently (hardware skill)
        return


# ───────────────────────────────────────────────
# SIMPLE MAIN TEST
# ───────────────────────────────────────────────

if __name__ == "__main__":
    ctrl = DiVrmController()

    print("Starting VRM controller…")
    ctrl.manifest()

    # give servers time to start
    time.sleep(2)

    # EXACT TEST COMMAND YOU REQUESTED
    print("TEST: play mixamo/Talking.fbx")
    ctrl.input("play mixamo/Talking.fbx")

    time.sleep(5)

    print("Stopping…")
    ctrl.input("stop")

    time.sleep(1)

    print("Shutting down…")
    ctrl.ghost()

    print("Done.")
 

fukurou

the supreme coder
ADMIN
skill stage, delamein(DLC/main.py)
Python:
import asyncio
import threading
import json
import websockets
from flask import Flask, send_from_directory
import os
import time

# ───────────────────────────────────────────────
# HARDWARE SKILL
# ───────────────────────────────────────────────

class DiVrmController:
    def __init__(self):
        # hardware skill: no output, no speech
        self._clients = set()
        self._ws_server = None
        self._flask_thread = None
        self._loop = None
        self._running = False

        # ROOT = DLC/ folder (where this script lives)
        self._ROOT = os.path.dirname(os.path.abspath(__file__))

        # flask app
        self._app = Flask(__name__)
        self._app.add_url_rule("/", "index", self._serve_index)
        self._app.add_url_rule("/<path:filename>", "file", self._serve_file)

    # ───────────────────────────────────────────────
    # FLASK
    # ───────────────────────────────────────────────
    def _serve_index(self):
        return send_from_directory(self._ROOT, "vrm_viewer.html")

    def _serve_file(self, filename):
        return send_from_directory(self._ROOT, filename)

    def _run_flask(self):
        self._app.run(port=5500, debug=False, use_reloader=False)

    # ───────────────────────────────────────────────
    # WEBSOCKET
    # ───────────────────────────────────────────────
    async def _ws_handler(self, websocket):
        self._clients.add(websocket)
        try:
            await websocket.wait_closed()
        finally:
            self._clients.discard(websocket)

    async def _send_command(self, action, path=None):
        if not self._clients:
            return

        msg = {"action": action}
        if path:
            msg["path"] = path.replace("\\", "/")

        data = json.dumps(msg)

        dead = []
        for ws in list(self._clients):
            try:
                await ws.send(data)
            except:
                dead.append(ws)

        for ws in dead:
            self._clients.discard(ws)

    # ───────────────────────────────────────────────
    # LIFECYCLE
    # ───────────────────────────────────────────────
    def manifest(self):
        if self._running:
            return

        self._running = True

        # start flask
        self._flask_thread = threading.Thread(
            target=self._run_flask,
            daemon=True
        )
        self._flask_thread.start()

        # start asyncio loop
        def start_loop():
            self._loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self._loop)
            self._loop.run_until_complete(self._start_ws_server())
            self._loop.run_forever()

        threading.Thread(target=start_loop, daemon=True).start()

    async def _start_ws_server(self):
        self._ws_server = await websockets.serve(
            self._ws_handler, "localhost", 8765
        )

    def ghost(self):
        self._running = False
        if self._loop:
            self._loop.call_soon_threadsafe(self._loop.stop)

    # ───────────────────────────────────────────────
    # INPUT (HARDWARE DRIVER)
    # ───────────────────────────────────────────────
    def input(self, ear: str):
        if not ear:
            return

        cmd = ear.strip()

        # stop
        if cmd == "stop":
            if self._loop:
                asyncio.run_coroutine_threadsafe(
                    self._send_command("stop"),
                    self._loop
                )
            return

        # full command: play mixamo/Talking.fbx
        if cmd.startswith("play "):
            path = ear[5:].strip()  # remove "play "
            if self._loop:
                asyncio.run_coroutine_threadsafe(
                    self._send_command("play", path),
                    self._loop
                )
            return

        # looping command: loopplay mixamo/Idle.fbx
        if cmd.startswith("loopplay "):
            path = ear[9:].strip()  # remove "loopplay "
            if self._loop:
                asyncio.run_coroutine_threadsafe(
                    self._send_command("loopplay", path),
                    self._loop
                )
            return

        # filename only: Talking.fbx  β†’  play once
        mixamo_path = os.path.join(self._ROOT, "mixamo", cmd)
        if os.path.isfile(mixamo_path):
            if self._loop:
                asyncio.run_coroutine_threadsafe(
                    self._send_command("play", f"mixamo/{cmd}"),
                    self._loop
                )
            return

        # loop:Talking.fbx  β†’  loop
        if cmd.startswith("loop:"):
            filename = cmd[5:]
            mixamo_loop_path = os.path.join(self._ROOT, "mixamo", filename)
            if os.path.isfile(mixamo_loop_path):
                if self._loop:
                    asyncio.run_coroutine_threadsafe(
                        self._send_command("loopplay", f"mixamo/{filename}"),
                        self._loop
                    )
            return

        # ignore anything else silently (hardware skill)
        return


# ───────────────────────────────────────────────
# SIMPLE MAIN TEST
# ───────────────────────────────────────────────

if __name__ == "__main__":
    ctrl = DiVrmController()

    print("Starting VRM controller…")
    ctrl.manifest()

    # give servers time to start
    time.sleep(2)

    print("TEST: loopplay mixamo/Idle.fbx")
    ctrl.input("loopplay mixamo/Idle.fbx")

    time.sleep(4)

    print("TEST: play mixamo/Thinking.fbx")
    ctrl.input("play mixamo/Thinking.fbx")

    time.sleep(5)

    print("Stopping…")
    ctrl.input("stop")

    time.sleep(1)

    print("Shutting down…")
    ctrl.ghost()

    print("Done.")
 

fukurou

the supreme coder
ADMIN
Python:
import asyncio
import threading
import json
import websockets
from flask import Flask, send_from_directory
import os
import time

from LivinGrimoirePacket.AXPython import Responder
from LivinGrimoirePacket.LivinGrimoire import Skill, Brain


# ───────────────────────────────────────────────
# HARDWARE SKILL
# ───────────────────────────────────────────────

class DiVrmController(Skill):
    def __init__(self,brain:Brain):
        super().__init__()
        # hardware skill: no output, no speech
        self._clients = set()
        self._ws_server = None
        self._flask_thread = None
        self._loop = None
        self._running = False

        # ROOT = DLC/ folder (where this script lives)
        self._ROOT = os.path.dirname(os.path.abspath(__file__))

        # flask app
        self._app = Flask(__name__)
        self._app.add_url_rule("/", "index", self._serve_index)
        self._app.add_url_rule("/<path:filename>", "file", self._serve_file)

        self.brain = brain
        self.last_cmd = "null"
        self.last_emotion = "null"

        self.nxt_idler: Responder = Responder("loopplay mixamo/Thinking.fbx","loopplay mixamo/Idle.fbx","loopplay mixamo/Thinking.fbx")

    # ───────────────────────────────────────────────
    # FLASK
    # ───────────────────────────────────────────────
    def _serve_index(self):
        return send_from_directory(self._ROOT, "vrm_viewer.html")

    def _serve_file(self, filename):
        return send_from_directory(self._ROOT, filename)

    def _run_flask(self):
        self._app.run(port=5500, debug=False, use_reloader=False)

    # ───────────────────────────────────────────────
    # WEBSOCKET
    # ───────────────────────────────────────────────
    async def _ws_handler(self, websocket):
        self._clients.add(websocket)
        try:
            await websocket.wait_closed()
        finally:
            self._clients.discard(websocket)

    async def _send_command(self, action, path=None):
        if not self._clients:
            return

        msg = {"action": action}
        if path:
            msg["path"] = path.replace("\\", "/")

        data = json.dumps(msg)

        dead = []
        for ws in list(self._clients):
            try:
                await ws.send(data)
            except:
                dead.append(ws)

        for ws in dead:
            self._clients.discard(ws)

    # ───────────────────────────────────────────────
    # LIFECYCLE
    # ───────────────────────────────────────────────
    def manifest(self):
        if self._running:
            return

        self._running = True

        # start flask
        self._flask_thread = threading.Thread(
            target=self._run_flask,
            daemon=True
        )
        self._flask_thread.start()

        # start asyncio loop
        def start_loop():
            self._loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self._loop)
            self._loop.run_until_complete(self._start_ws_server())
            self._loop.run_forever()

        threading.Thread(target=start_loop, daemon=True).start()

    async def _start_ws_server(self):
        self._ws_server = await websockets.serve(
            self._ws_handler, "localhost", 8765
        )

    def ghost(self):
        self._running = False
        if self._loop:
            self._loop.call_soon_threadsafe(self._loop.stop)

    def cmd_extract(self,ear)->str:
        ap = self.brain.getEmotion()

        match ap:
            case "APHappy":
                # TODO add animations
                self.last_emotion = ap
                self.last_cmd = "happy"
                return ""
        # match self.getKokoro().toHeart[self.skill_name]:
        #     # TODO add animations
        #     case "dance1":
        #         self.last_cmd = "dance1"
        #         return ""
        if len(ear)>0:
            # TODO expand per last emotion
            self.last_cmd = "play mixamo/Talking.fbx"
            return self.last_cmd
        nxt = self.nxt_idler.getAResponse()
        if not self.last_cmd == nxt:
            # TODO expand with more animations
            self.last_cmd = nxt
            return self.last_cmd
        return ""



    # ───────────────────────────────────────────────
    # INPUT (HARDWARE DRIVER)
    # ───────────────────────────────────────────────
    def input(self, ear: str, skin: str, eye: str):

        cmd = self.cmd_extract(ear)

        # stop
        if cmd == "stop":
            if self._loop:
                asyncio.run_coroutine_threadsafe(
                    self._send_command("stop"),
                    self._loop
                )
            return

        # full command: play mixamo/Talking.fbx
        if cmd.startswith("play "):
            path = cmd[5:].strip()  # remove "play "
            if self._loop:
                asyncio.run_coroutine_threadsafe(
                    self._send_command("play", path),
                    self._loop
                )
            return

        # looping command: loopplay mixamo/Idle.fbx
        if cmd.startswith("loopplay "):
            path = cmd[9:].strip()  # remove "loopplay "
            if self._loop:
                asyncio.run_coroutine_threadsafe(
                    self._send_command("loopplay", path),
                    self._loop
                )
            return

        # filename only: Talking.fbx  β†’  play once
        mixamo_path = os.path.join(self._ROOT, "mixamo", cmd)
        if os.path.isfile(mixamo_path):
            if self._loop:
                asyncio.run_coroutine_threadsafe(
                    self._send_command("play", f"mixamo/{cmd}"),
                    self._loop
                )
            return

        # loop:Talking.fbx  β†’  loop
        if cmd.startswith("loop:"):
            filename = cmd[5:]
            mixamo_loop_path = os.path.join(self._ROOT, "mixamo", filename)
            if os.path.isfile(mixamo_loop_path):
                if self._loop:
                    asyncio.run_coroutine_threadsafe(
                        self._send_command("loopplay", f"mixamo/{filename}"),
                        self._loop
                    )
            return

        # ignore anything else silently (hardware skill)
        return


# ───────────────────────────────────────────────
# SIMPLE MAIN TEST
# ───────────────────────────────────────────────

if __name__ == "__main__":
    ctrl = DiVrmController(Brain())

    print("Starting VRM controller…")
    ctrl.manifest()
    print("plays idle:")
    ctrl.input("", "", "")

    # give servers time to start
    time.sleep(4)
    print("should play talking")
    ctrl.input("mic chk 1 2","","")

    time.sleep(8)

    print("plays idle")
    ctrl.input("", "", "")

    time.sleep(8)
    print("should play talking")
    ctrl.input("mic chk 1 2","","")
    time.sleep(3)

    # print("Stopping…")
    # ctrl.input("stop","","")

    time.sleep(1)

    print("Shutting down…")
    ctrl.ghost()

    print("Done.")
 

fukurou

the supreme coder
ADMIN
thread save digivolved code:
Python:
import asyncio
import threading
import json
import websockets
from flask import Flask, send_from_directory
import os
import time

from LivinGrimoirePacket.AXPython import Responder
from LivinGrimoirePacket.LivinGrimoire import Skill, Brain


class DiVrmController(Skill):
    def __init__(self, brain: Brain):
        super().__init__()
        self._clients = set()
        self._clients_lock = threading.Lock()
        self._ws_server = None
        self._flask_thread = None
        self._loop = None
        self._running = False
        self._ROOT = os.path.dirname(os.path.abspath(__file__))

        self._app = Flask(__name__)
        self._app.add_url_rule("/", "index", self._serve_index)
        self._app.add_url_rule("/<path:filename>", "file", self._serve_file)

        self.brain = brain
        self.last_cmd = "null"
        self.last_emotion = "null"

        self.nxt_idler = Responder(
            "loopplay mixamo/Thinking.fbx",
            "loopplay mixamo/Idle.fbx",
            "loopplay mixamo/Thinking.fbx"
        )

    def _serve_index(self):
        return send_from_directory(self._ROOT, "vrm_viewer.html")

    def _serve_file(self, filename):
        return send_from_directory(self._ROOT, filename)

    def _run_flask(self):
        self._app.run(port=5500, debug=False, use_reloader=False)

    async def _ws_handler(self, websocket):
        with self._clients_lock:
            self._clients.add(websocket)
        try:
            await websocket.wait_closed()
        finally:
            with self._clients_lock:
                self._clients.discard(websocket)

    async def _send_command(self, action, path=None):
        with self._clients_lock:
            if not self._clients:
                return

        msg = {"action": action}
        if path:
            msg["path"] = path.replace("\\", "/")

        data = json.dumps(msg)

        with self._clients_lock:
            dead = []
            for ws in list(self._clients):
                try:
                    await ws.send(data)
                except:
                    dead.append(ws)

            for ws in dead:
                self._clients.discard(ws)

    def manifest(self):
        if self._running:
            return

        self._running = True

        self._flask_thread = threading.Thread(target=self._run_flask, daemon=True)
        self._flask_thread.start()

        def start_loop():
            self._loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self._loop)
            self._loop.run_until_complete(self._start_ws_server())
            self._loop.run_forever()

        threading.Thread(target=start_loop, daemon=True).start()

    async def _start_ws_server(self):
        self._ws_server = await websockets.serve(self._ws_handler, "localhost", 8765)

    def ghost(self):
        self._running = False
        if self._loop:
            self._loop.call_soon_threadsafe(self._loop.stop)

    def cmd_extract(self, ear) -> str:
        ap = self.brain.getEmotion()

        match ap:
            case "APHappy":
                self.last_emotion = ap
                self.last_cmd = "happy"
                return ""

        # Your toHeart scaffolding preserved
        # if self.getKokoro().toHeart[self.skill_name]:
        #     match self.getKokoro().toHeart[self.skill_name]:
        #         case "dance1":
        #             self.last_cmd = "dance1"
        #             return ""

        if len(ear) > 0:
            self.last_cmd = "play mixamo/Talking.fbx"
            return self.last_cmd

        nxt = self.nxt_idler.getAResponse()
        if not self.last_cmd == nxt:
            self.last_cmd = nxt
            return self.last_cmd

        return ""

    def input(self, ear: str, skin: str, eye: str):
        cmd = self.cmd_extract(ear)

        if cmd == "stop":
            if self._loop:
                asyncio.run_coroutine_threadsafe(
                    self._send_command("stop"),
                    self._loop
                )
            return

        if cmd.startswith("play "):
            path = cmd[5:].strip()
            if self._loop:
                asyncio.run_coroutine_threadsafe(
                    self._send_command("play", path),
                    self._loop
                )
            return

        if cmd.startswith("loopplay "):
            path = cmd[9:].strip()
            if self._loop:
                asyncio.run_coroutine_threadsafe(
                    self._send_command("loopplay", path),
                    self._loop
                )
            return

        mixamo_path = os.path.join(self._ROOT, "mixamo", cmd)
        if os.path.isfile(mixamo_path):
            if self._loop:
                asyncio.run_coroutine_threadsafe(
                    self._send_command("play", f"mixamo/{cmd}"),
                    self._loop
                )
            return

        if cmd.startswith("loop:"):
            filename = cmd[5:]
            mixamo_loop_path = os.path.join(self._ROOT, "mixamo", filename)
            if os.path.isfile(mixamo_loop_path):
                if self._loop:
                    asyncio.run_coroutine_threadsafe(
                        self._send_command("loopplay", f"mixamo/{filename}"),
                        self._loop
                    )
            return


if __name__ == "__main__":
    ctrl = DiVrmController(Brain())

    print("Starting VRM controller…")
    ctrl.manifest()
    print("plays idle:")
    ctrl.input("", "", "")

    time.sleep(4)
    print("should play talking")
    ctrl.input("mic chk 1 2", "", "")

    time.sleep(8)
    print("plays idle")
    ctrl.input("", "", "")

    time.sleep(8)
    print("should play talking")
    ctrl.input("mic chk 1 2", "", "")
    time.sleep(3)

    time.sleep(1)
    print("Shutting down…")
    ctrl.ghost()
    print("Done.")
 

fukurou

the supreme coder
ADMIN
Python:
import asyncio
import threading
import json
import websockets
from flask import Flask, send_from_directory
import os
import time

from LivinGrimoirePacket.AXPython import Responder
from LivinGrimoirePacket.LivinGrimoire import Skill, Brain


class DiVrmController(Skill):
    def __init__(self, brain: Brain):
        super().__init__()
        self._clients = set()
        self._clients_lock = threading.Lock()
        self._ws_server = None
        self._flask_thread = None
        self._loop = None
        self._running = False
        self._ROOT = os.path.dirname(os.path.abspath(__file__))

        self._app = Flask(__name__)
        self._app.add_url_rule("/", "index", self._serve_index)
        self._app.add_url_rule("/<path:filename>", "file", self._serve_file)

        self.brain = brain
        self.last_cmd = "null"

        self.nxt_idler = Responder(
            "loopplay mixamo/Thinking.fbx",
            "loopplay mixamo/Idle.fbx",
            "loopplay mixamo/Thinking.fbx"
        )

    def _serve_index(self):
        return send_from_directory(self._ROOT, "vrm_viewer.html")

    def _serve_file(self, filename):
        return send_from_directory(self._ROOT, filename)

    def _run_flask(self):
        self._app.run(port=5500, debug=False, use_reloader=False)

    async def _ws_handler(self, websocket):
        with self._clients_lock:
            self._clients.add(websocket)
        try:
            await websocket.wait_closed()
        finally:
            with self._clients_lock:
                self._clients.discard(websocket)

    async def _send_command(self, action, path=None):
        with self._clients_lock:
            if not self._clients:
                return

        msg = {"action": action}
        if path:
            msg["path"] = path.replace("\\", "/")

        data = json.dumps(msg)

        with self._clients_lock:
            dead = []
            for ws in list(self._clients):
                try:
                    await ws.send(data)
                except:
                    dead.append(ws)

            for ws in dead:
                self._clients.discard(ws)

    def manifest(self):
        if self._running:
            return

        self._running = True

        self._flask_thread = threading.Thread(target=self._run_flask, daemon=True)
        self._flask_thread.start()

        def start_loop():
            self._loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self._loop)
            self._loop.run_until_complete(self._start_ws_server())
            self._loop.run_forever()

        threading.Thread(target=start_loop, daemon=True).start()

    async def _start_ws_server(self):
        self._ws_server = await websockets.serve(self._ws_handler, "localhost", 8765)

    def ghost(self):
        self._running = False
        if self._loop:
            self._loop.call_soon_threadsafe(self._loop.stop)

    def cmd_extract(self, ear) -> str:
        ap = self.brain.getEmotion()
        # custom animation
        # you can rename alg part object class name to return animation commands
        if not ap == "APVerbatim" and len(ap)>0:
            self.last_cmd = ap
            return ap
        # talk animation
        if len(ear) > 0:
            self.last_cmd = "play mixamo/Talking.fbx"
            return self.last_cmd
        # rnd idle animations
        nxt = self.nxt_idler.getAResponse()
        if not self.last_cmd == nxt:
            self.last_cmd = nxt
            return self.last_cmd

        return ""

    def input(self, ear: str, skin: str, eye: str):
        cmd = self.cmd_extract(ear)

        if cmd == "stop":
            if self._loop:
                asyncio.run_coroutine_threadsafe(
                    self._send_command("stop"),
                    self._loop
                )
            return

        if cmd.startswith("play "):
            path = cmd[5:].strip()
            if self._loop:
                asyncio.run_coroutine_threadsafe(
                    self._send_command("play", path),
                    self._loop
                )
            return

        if cmd.startswith("loopplay "):
            path = cmd[9:].strip()
            if self._loop:
                asyncio.run_coroutine_threadsafe(
                    self._send_command("loopplay", path),
                    self._loop
                )
            return

        mixamo_path = os.path.join(self._ROOT, "mixamo", cmd)
        if os.path.isfile(mixamo_path):
            if self._loop:
                asyncio.run_coroutine_threadsafe(
                    self._send_command("play", f"mixamo/{cmd}"),
                    self._loop
                )
            return

        if cmd.startswith("loop:"):
            filename = cmd[5:]
            mixamo_loop_path = os.path.join(self._ROOT, "mixamo", filename)
            if os.path.isfile(mixamo_loop_path):
                if self._loop:
                    asyncio.run_coroutine_threadsafe(
                        self._send_command("loopplay", f"mixamo/{filename}"),
                        self._loop
                    )
            return


if __name__ == "__main__":
    ctrl = DiVrmController(Brain())

    print("Starting VRM controller…")
    ctrl.manifest()
    print("plays idle:")
    ctrl.input("", "", "")

    time.sleep(4)
    print("should play talking")
    ctrl.input("mic chk 1 2", "", "")

    time.sleep(8)
    print("plays idle")
    ctrl.input("", "", "")
    time.sleep(5)
    print("plays idle")
    ctrl.input("", "", "")

    time.sleep(8)
    print("should play talking")
    ctrl.input("mic chk 1 2", "", "")
    time.sleep(3)

    time.sleep(1)
    print("Shutting down…")
    ctrl.ghost()
    print("Done.")
 
Top