Source

socket/handlers/physics.js

import { env } from "@config/env";
import { PhysicsGatewayService } from "@services/physics-gateway.service";
import { PhysicsCommandSchema, PhysicsCommandType } from "@typez/physics";
import { randomUUID } from "node:crypto";
/**
 * Setup physics handlers for a WebSocket client
 * @param {WebSocketClient} client - WebSocket client
 * @param {IPhysicsHandlerOptions} [options] - Optional overrides (e.g. physics host for dev-stage)
 */
export function setupPhysicsHandlers(client, options) {
    // Create a dedicated physics gateway service instance for this websocket connection
    const physicsGatewayService = new PhysicsGatewayService(client.connectionId, options?.physicsEngineHost);
    const subscriptions = new Set();
    // Forward only state.data to Unity (same shape as ZeroMQ before RabbitMQ wrapper)
    const callback = (state) => client.send("physics:state", state.data);
    client.on("physics:command", (data) => {
        void (async () => {
            // Generate correlation ID for this command
            const correlationId = randomUUID();
            // Log received raw command
            client.logger.info({ correlationId, command: data }, "Received physics command");
            const result = PhysicsCommandSchema.safeParse(data);
            if (!result.success) {
                const error = result.error.errors.map((e) => e.message).join(", ");
                client.logger.warn({ correlationId, error, command: data }, "Invalid physics command");
                client.send("physics:response", {
                    success: false,
                    type: PhysicsCommandType.GENERIC,
                    investigation_id: "",
                    data: null,
                    error,
                    error_code: "INVALID_COMMAND",
                    error_details: { receivedCommand: data, correlationId },
                });
                return;
            }
            const command = result.data;
            try {
                client.logger.info({
                    correlationId,
                    command,
                    commandType: command.type,
                    investigationId: command.investigationId,
                }, "Processing physics command");
                const response = await physicsGatewayService.sendCommand(command);
                client.logger.info({
                    correlationId,
                    command,
                    response,
                    commandType: command.type,
                    investigationId: command.investigationId,
                }, "Sending physics response");
                client.send("physics:response", response);
                // Subscribe on START
                if (command.type === PhysicsCommandType.START &&
                    response.success &&
                    !subscriptions.has(command.investigationId)) {
                    physicsGatewayService.subscribe(command.investigationId, callback);
                    subscriptions.add(command.investigationId);
                }
                // Unsubscribe on STOP/DESTROY
                if ((command.type === PhysicsCommandType.STOP ||
                    command.type === PhysicsCommandType.DESTROY) &&
                    response.success &&
                    subscriptions.has(command.investigationId)) {
                    physicsGatewayService.unsubscribe(command.investigationId, callback);
                    subscriptions.delete(command.investigationId);
                }
            }
            catch (error) {
                client.logger.error({ correlationId, error, command, commandType: command.type }, "Error processing physics command");
                client.send("physics:response", {
                    success: false,
                    type: command.type,
                    investigation_id: command.investigationId,
                    data: null,
                    error: error instanceof Error ? error.message : "Internal error",
                    error_code: "INTERNAL_ERROR",
                    error_details: { command, correlationId },
                });
            }
        })();
    });
    client.ws.on("close", (code, reason) => {
        client.logger.info({
            code,
            reason: reason?.toString("utf8"),
            connectionId: client.connectionId,
            activeSubscriptions: subscriptions.size,
        }, "Physics websocket closed");
        // Unsubscribe from state immediately so we don't send to a closed socket
        for (const investigationId of subscriptions) {
            physicsGatewayService.unsubscribe(investigationId, callback);
        }
        subscriptions.clear();
        const delayMs = env.PHYSICS_GATEWAY_CLEANUP_DELAY_MS;
        const doClose = () => {
            try {
                physicsGatewayService.close();
                client.logger.info({ connectionId: client.connectionId, delayMs }, "Physics gateway closed after delay");
            }
            catch (error) {
                client.logger.error({ error, connectionId: client.connectionId }, "Error closing physics gateway");
            }
        };
        if (delayMs <= 0) {
            doClose();
        }
        else {
            setTimeout(doClose, delayMs);
        }
    });
}