Source

services/physics-gateway.service.js

/**
 * Physics Gateway Service
 *
 * Serialization (one command at a time) is at RabbitMQ level: Chrono must consume from
 * physics_engine_commands with prefetch(1) or a single worker. Then "waiting" = messages in the
 * queue; "running" = the one unacked message Chrono is processing. No Node-side queue needed.
 * @module services/physics-gateway
 */
import { env } from "@config/env";
import { encode, decode } from "@msgpack/msgpack";
import { PhysicsCommandType } from "@typez/physics";
import { getLogger } from "@utils/asyncLocalStorage";
import amqp from "amqplib";
import { randomUUID } from "crypto";
/**
 * Tell whether a caught value is an Error carrying Node-style system fields.
 * @param {unknown} error - Value caught from a failing operation
 * @returns {boolean} True when it is an Error with a string `code`, a numeric `errno`,
 *   or a string `syscall`
 */
function isSystemError(error) {
    if (!(error instanceof Error)) {
        return false;
    }
    // Any one of the three system fields is enough.
    if (typeof error.code === "string") {
        return true;
    }
    if (typeof error.errno === "number") {
        return true;
    }
    return typeof error.syscall === "string";
}
/**
 * Chrono / physics engine contract:
 * - Commands: consume from physics_engine_commands with prefetch(1) so only one command runs at a time (waiting = queued, running = unacked). Reply to replyTo with correlationId; include connection_id in payload.
 * - State: publish to exchange physics_engine_broadcast (topic) with routing key state.{connection_id}.{investigation_id}; backend forwards to WebSocket (physics:state).
 */
/** Topic prefix for state broadcasts (routing key: state.{connection_id}.{investigation_id}) */
const STATE_TOPIC_PREFIX = "state.";
/** Receive timeout in ms */
const RECEIVE_TIMEOUT_MS = 30000;
/**
 * Locate the first entry of a batch command that is not a non-null object.
 *
 * Only payloads shaped like `{ command: { op: "batch", commands: [...] } }` are inspected;
 * anything else yields null.
 * @param {object} payload - Physics command payload
 * @returns {{index: number, value: unknown} | null} Position and value of the first invalid
 *   entry, or null when there is none (or the payload is not a batch)
 */
function findInvalidBatchCommandEntry(payload) {
    if (!payload || typeof payload !== "object") {
        return null;
    }
    const batch = payload.command;
    const isBatch = Boolean(batch) && batch.op === "batch" && Array.isArray(batch.commands);
    if (!isBatch) {
        return null;
    }
    const badIndex = batch.commands.findIndex((item) => !item || typeof item !== "object");
    if (badIndex === -1) {
        return null;
    }
    return { index: badIndex, value: batch.commands[badIndex] };
}
/**
 * Drop entries that are not non-null objects from a batch payload, in place.
 *
 * When at least one entry is removed, `payload.command` is replaced by a shallow copy whose
 * `commands` array holds only the surviving entries; the original command object is left
 * untouched. Payloads that are not batches are not modified.
 * @param {object} payload - Physics command payload (mutated when entries are removed)
 * @returns {{removedIndices: number[], originalCount: number | null, sanitizedCount: number | null}}
 *   Sanitization metadata; both counts are null when the payload is not a batch
 */
function sanitizeBatchPayload(payload) {
    const notABatch = () => ({ removedIndices: [], originalCount: null, sanitizedCount: null });
    if (!payload || typeof payload !== "object") {
        return notABatch();
    }
    const command = payload.command;
    if (!command || command.op !== "batch" || !Array.isArray(command.commands)) {
        return notABatch();
    }
    const originalCount = command.commands.length;
    const kept = [];
    const removedIndices = [];
    // Single pass: partition entries into survivors and removed positions.
    command.commands.forEach((entry, index) => {
        if (entry && typeof entry === "object") {
            kept.push(entry);
        }
        else {
            removedIndices.push(index);
        }
    });
    if (removedIndices.length === 0) {
        // Nothing to strip: leave the payload exactly as it was.
        return { removedIndices: [], originalCount, sanitizedCount: originalCount };
    }
    payload.command = { ...command, commands: kept };
    return { removedIndices, originalCount, sanitizedCount: kept.length };
}
/**
 * Physics Gateway Service (RabbitMQ)
 */
export class PhysicsGatewayService {
    commandChannel = null;
    subscribeChannel = null;
    replyQueueName = null;
    stateConsumerQueueName = null;
    stateCallbacks = new Map();
    pendingRequests = new Map();
    isConnected = false;
    isSubscribed = false;
    shouldStopSubscription = false;
    subscriptionReconnectAttempts = 0;
    connectionReconnectAttempts = 0;
    isClosing = false;
    reconnecting = false;
    connectionId;
    amqpConnection = null;
    amqpUrl;
    commandQueue;
    replyQueueNameBase;
    stateExchange;
    /** Base name for state queue; per-connection queue is {stateQueueNameBase}_{connectionId} (exclusive) */
    stateQueueNameBase;
    /**
     * Creates a new physics gateway service instance
     * @param {string} connectionId - Unique connection ID (from websocket)
     * @param {string} [physicsEngineHostOverride] - Optional host override (e.g. for dev-stage); when not set, uses PHYSICS_ENGINE_HOST from env
     */
    constructor(connectionId, physicsEngineHostOverride) {
        const physicsHost = physicsEngineHostOverride ?? env.PHYSICS_ENGINE_HOST;
        const user = encodeURIComponent(env.PHYSICS_ENGINE_AMQP_USER);
        const password = encodeURIComponent(env.PHYSICS_ENGINE_AMQP_PASSWORD);
        this.amqpUrl = `amqp://${user}:${password}@${physicsHost}:${env.PHYSICS_ENGINE_AMQP_PORT}`;
        this.commandQueue = env.PHYSICS_ENGINE_COMMAND_QUEUE;
        this.replyQueueNameBase = env.PHYSICS_ENGINE_REPLY_QUEUE;
        this.stateExchange = env.PHYSICS_ENGINE_STATE_EXCHANGE;
        this.stateQueueNameBase = env.PHYSICS_ENGINE_STATE_QUEUE;
        this.connectionId = connectionId;
        const logger = getLogger();
        logger.info({
            connectionId: this.connectionId,
            host: physicsHost,
            commandQueue: this.commandQueue,
            stateExchange: this.stateExchange,
        }, "Created new PhysicsGatewayService instance (RabbitMQ)");
    }
    /**
     * Build composite key from connection ID and investigation ID
     * @param {string} investigationId - Investigation ID
     * @returns {string} Composite key in format "connectionId:investigationId"
     */
    buildCompositeKey(investigationId) {
        return `${this.connectionId}:${investigationId}`;
    }
    /**
     * Build RabbitMQ routing key for state (state.{connection_id}.{investigation_id})
     * @param {string} compositeKey - connectionId:investigationId
     * @returns {string} Routing key "state.connectionId.investigationId"
     */
    stateRoutingKey(compositeKey) {
        return `${STATE_TOPIC_PREFIX}${compositeKey.replace(":", ".")}`;
    }
    /**
     * Parse routing key state.{connection_id}.{investigation_id} to compositeKey (connectionId:investigationId)
     * @param {string} routingKey - RabbitMQ routing key (e.g. state.connId.invId)
     * @returns {string} Composite key connectionId:investigationId
     */
    routingKeyToCompositeKey(routingKey) {
        const withoutPrefix = routingKey.slice(STATE_TOPIC_PREFIX.length);
        const dotIdx = withoutPrefix.indexOf(".");
        if (dotIdx === -1)
            return withoutPrefix;
        const connectionId = withoutPrefix.slice(0, dotIdx);
        const investigationId = withoutPrefix.slice(dotIdx + 1);
        return `${connectionId}:${investigationId}`;
    }
    /**
     * Attach connection lifecycle handlers (close/error -> reconnect)
     * @param {object} logger - Logger instance for connection lifecycle events
     */
    attachConnectionHandlers(logger) {
        if (!this.amqpConnection?.connection)
            return;
        const conn = this.amqpConnection.connection;
        conn.on("error", (err) => {
            logger.error({ error: err, connectionId: this.connectionId }, "RabbitMQ connection error - will attempt reconnect");
            this.connectionReconnectAttempts = 0;
            this.markConnectionLost();
            this.scheduleConnectionReconnect(logger);
        });
        conn.on("close", () => {
            logger.warn({ connectionId: this.connectionId }, "RabbitMQ connection closed - will attempt reconnect");
            this.markConnectionLost();
            this.scheduleConnectionReconnect(logger);
        });
    }
    /** Mark command path as disconnected; clear channel/queue refs, keep connection ref for now */
    markConnectionLost() {
        this.isConnected = false;
        this.commandChannel = null;
        this.replyQueueName = null;
        this.subscribeChannel = null;
        this.stateConsumerQueueName = null;
        this.isSubscribed = false;
        this.amqpConnection = null;
    }
    /**
     * Schedule a single reconnection attempt (debounced by reconnecting flag)
     * @param {object} logger - Logger instance for reconnection events
     */
    scheduleConnectionReconnect(logger) {
        if (this.isClosing || this.reconnecting)
            return;
        this.reconnecting = true;
        void this.attemptConnectionReconnect(logger);
    }
    /**
     * Reconnect after connection/channel loss (e.g. user internet drop). Retries with backoff.
     * @param {object} logger - Logger instance for reconnection attempts
     */
    async attemptConnectionReconnect(logger) {
        if (this.isClosing) {
            this.reconnecting = false;
            return;
        }
        this.connectionReconnectAttempts++;
        const maxAttempts = env.PHYSICS_GATEWAY_RECONNECT_ATTEMPTS;
        const baseDelayMs = env.PHYSICS_GATEWAY_RECONNECT_DELAY_MS;
        if (this.connectionReconnectAttempts > maxAttempts) {
            logger.error({ attempts: this.connectionReconnectAttempts, connectionId: this.connectionId }, "Max physics gateway reconnection attempts reached, giving up");
            this.reconnecting = false;
            return;
        }
        const delayMs = Math.min(baseDelayMs * Math.pow(1.5, this.connectionReconnectAttempts - 1), 30_000);
        logger.warn({
            attempt: this.connectionReconnectAttempts,
            maxAttempts,
            delayMs,
            connectionId: this.connectionId,
        }, "Attempting physics gateway reconnection");
        await new Promise((resolve) => setTimeout(resolve, delayMs));
        if (this.isClosing) {
            this.reconnecting = false;
            return;
        }
        try {
            this.amqpConnection = null;
            await this.connect();
            this.connectionReconnectAttempts = 0;
            if (this.stateCallbacks.size > 0 && !this.shouldStopSubscription) {
                await this.startSubscription(logger);
            }
            logger.info({ connectionId: this.connectionId }, "Physics gateway reconnected successfully");
        }
        catch (err) {
            logger.error({ error: err, connectionId: this.connectionId }, "Physics gateway reconnect failed, will retry");
            this.markConnectionLost();
            this.scheduleConnectionReconnect(logger);
        }
        finally {
            this.reconnecting = false;
        }
    }
    /**
     * Connect to RabbitMQ and set up command channel + reply queue.
     *
     * Returns immediately when the command path is already set up. Otherwise it opens a
     * connection (only if none is held), creates the command channel, asserts the exclusive
     * per-connection reply queue and starts a no-ack consumer that resolves pending requests
     * by correlationId.
     * @returns {Promise<void>}
     * @throws Rethrows any error raised during setup, after logging it with the password masked
     */
    async connect() {
        const logger = getLogger();
        if (this.isConnected && this.commandChannel && this.replyQueueName) {
            logger.debug("Physics gateway already connected");
            return;
        }
        try {
            logger.debug({ connectionId: this.connectionId, commandQueue: this.commandQueue }, "Connecting to physics engine (RabbitMQ)...");
            // Reuse an existing connection if one is already held.
            if (!this.amqpConnection) {
                this.amqpConnection = await amqp.connect(this.amqpUrl, {
                    heartbeat: env.PHYSICS_ENGINE_HEARTBEAT,
                });
                this.attachConnectionHandlers(logger);
            }
            this.commandChannel = await this.amqpConnection.createChannel();
            // Channel-level failures clear only the command-path state, then request a reconnect.
            this.commandChannel.on("error", (err) => {
                logger.error({ error: err, connectionId: this.connectionId }, "Physics command channel error");
                this.isConnected = false;
                this.commandChannel = null;
                this.replyQueueName = null;
                this.scheduleConnectionReconnect(logger);
            });
            this.commandChannel.on("close", () => {
                logger.warn({ connectionId: this.connectionId }, "Physics command channel closed");
                this.isConnected = false;
                this.commandChannel = null;
                this.replyQueueName = null;
                // When the connection reference is already cleared, no reconnect is scheduled from here.
                if (this.amqpConnection) {
                    this.scheduleConnectionReconnect(logger);
                }
            });
            const replyQueueName = `${this.replyQueueNameBase}_${this.connectionId}`;
            await this.commandChannel.assertQueue(replyQueueName, { exclusive: true });
            this.replyQueueName = replyQueueName;
            await this.commandChannel.consume(this.replyQueueName, (msg) => {
                if (!msg)
                    return;
                // Replies are matched to callers by correlationId; unmatched replies are dropped.
                const correlationId = msg.properties.correlationId;
                const pending = correlationId ? this.pendingRequests.get(correlationId) : undefined;
                if (pending && correlationId) {
                    clearTimeout(pending.timeoutId);
                    this.pendingRequests.delete(correlationId);
                    try {
                        const decoded = decode(msg.content);
                        pending.resolve(decoded);
                    }
                    catch (e) {
                        // A reply that cannot be decoded still settles the request, as a failure response.
                        pending.resolve({
                            success: false,
                            type: PhysicsCommandType.GENERIC,
                            investigation_id: "",
                            data: null,
                            error: e instanceof Error ? e.message : "Failed to decode physics response",
                            error_code: "INTERNAL_ERROR",
                            error_details: null,
                        });
                    }
                }
            }, { noAck: true });
            this.isConnected = true;
            logger.info({
                replyQueue: this.replyQueueName,
                connectionId: this.connectionId,
            }, "Connected to physics command (RabbitMQ)");
        }
        catch (error) {
            const errorDetails = {
                message: error instanceof Error ? error.message : String(error),
                stack: error instanceof Error ? error.stack : undefined,
                name: error instanceof Error ? error.name : undefined,
                code: isSystemError(error) ? error.code : undefined,
                errno: isSystemError(error) ? error.errno : undefined,
                syscall: isSystemError(error) ? error.syscall : undefined,
            };
            logger.error({
                error: errorDetails,
                // Mask the password portion of the URL before logging it.
                amqpUrl: this.amqpUrl.replace(/:[^:@]+@/, ":***@"),
                connectionId: this.connectionId,
            }, "Failed to connect to physics service (RabbitMQ)");
            throw error;
        }
    }
    /**
     * Subscribe to state broadcasts for a specific investigation
     * @param {string} investigationId - The investigation ID to subscribe to
     * @param {StateCallback} callback - Callback function for state updates
     */
    subscribe(investigationId, callback) {
        const logger = getLogger();
        const compositeKey = this.buildCompositeKey(investigationId);
        if (!this.stateCallbacks.has(compositeKey)) {
            this.stateCallbacks.set(compositeKey, new Set());
            logger.info({ compositeKey, connectionId: this.connectionId, investigationId }, "Created new subscription callback set");
        }
        const callbackSet = this.stateCallbacks.get(compositeKey);
        const sizeBefore = callbackSet.size;
        callbackSet.add(callback);
        logger.info({
            compositeKey,
            connectionId: this.connectionId,
            investigationId,
            callbacksBefore: sizeBefore,
            callbacksAfter: callbackSet.size,
            totalSubscriptions: this.stateCallbacks.size,
            isSubscribed: this.isSubscribed,
        }, "Added callback to physics state subscription");
        if (!this.isSubscribed) {
            logger.info({ connectionId: this.connectionId, compositeKey }, "Starting subscription channel");
            void this.startSubscription(logger).catch((err) => {
                logger.error({ err, connectionId: this.connectionId }, "Failed to start physics subscription");
            });
        }
        else if (this.subscribeChannel) {
            const routingKey = this.stateRoutingKey(compositeKey);
            logger.debug({ routingKey, compositeKey, connectionId: this.connectionId }, "Binding additional routing key to existing subscription");
            void this.subscribeChannel
                .bindQueue(this.stateConsumerQueueName, this.stateExchange, routingKey)
                .then(() => {
                logger.info({ routingKey, compositeKey, connectionId: this.connectionId, investigationId }, "Bound physics state routing key");
            })
                .catch((err) => {
                logger.error({ err, routingKey, compositeKey, connectionId: this.connectionId }, "Failed to bind state routing key");
            });
        }
        else {
            logger.warn({ connectionId: this.connectionId, compositeKey, isSubscribed: this.isSubscribed }, "isSubscribed is true but subscribeChannel is null - subscription may be broken");
        }
    }
    /**
     * Start subscription channel and consumer.
     *
     * Opens a connection if none is held, creates the subscription channel, asserts the topic
     * exchange and the exclusive per-connection state queue, binds one routing key per
     * registered composite key, and starts a no-ack consumer that decodes each message and
     * fans it out to the callbacks registered for its routing key.
     * The consume call is not awaited, so the returned promise may settle before the consumer
     * is confirmed; a consume failure is only logged.
     * @param {object} logger - Logger instance (from getLogger())
     * @returns {Promise<void>}
     * @throws Rethrows connection errors; channel/exchange/queue/bind errors propagate as rejections
     */
    async startSubscription(logger) {
        try {
            if (!this.amqpConnection) {
                this.amqpConnection = await amqp.connect(this.amqpUrl, {
                    heartbeat: env.PHYSICS_ENGINE_HEARTBEAT,
                });
                this.attachConnectionHandlers(logger);
            }
        }
        catch (err) {
            logger.error({ err, connectionId: this.connectionId }, "Failed to connect for subscription");
            throw err;
        }
        this.subscribeChannel = await this.amqpConnection.createChannel();
        // NOTE(review): both handlers below start a reconnect. If the library emits "close"
        // after "error" for the same failure, two reconnect attempts run concurrently — confirm.
        this.subscribeChannel.on("error", (err) => {
            logger.error({ error: err, connectionId: this.connectionId }, "Subscription channel error - will attempt reconnect");
            void this.attemptSubscriptionReconnect();
        });
        this.subscribeChannel.on("close", () => {
            logger.warn({ connectionId: this.connectionId, wasSubscribed: this.isSubscribed }, "Subscription channel closed - will attempt reconnect");
            this.isSubscribed = false;
            this.subscribeChannel = null;
            this.stateConsumerQueueName = null;
            void this.attemptSubscriptionReconnect();
        });
        await this.subscribeChannel.assertExchange(this.stateExchange, "topic", { durable: true });
        const stateQueueName = `${this.stateQueueNameBase}_${this.connectionId}`;
        await this.subscribeChannel.assertQueue(stateQueueName, { exclusive: true });
        this.stateConsumerQueueName = stateQueueName;
        // Bind one routing key per investigation that currently has callbacks registered.
        for (const compositeKey of this.stateCallbacks.keys()) {
            const routingKey = this.stateRoutingKey(compositeKey);
            await this.subscribeChannel.bindQueue(this.stateConsumerQueueName, this.stateExchange, routingKey);
        }
        this.isSubscribed = true;
        this.subscriptionReconnectAttempts = 0;
        this.shouldStopSubscription = false;
        logger.info({ stateExchange: this.stateExchange, queue: this.stateConsumerQueueName }, "Connected to physics state (RabbitMQ)");
        void this.subscribeChannel
            .consume(this.stateConsumerQueueName, (msg) => {
            if (!msg) {
                logger.warn({ connectionId: this.connectionId }, "Received null message in physics state consumer - channel may be closing");
                return;
            }
            if (this.shouldStopSubscription) {
                logger.debug({ connectionId: this.connectionId }, "Subscription stopped, ignoring message");
                return;
            }
            // Any delivered message counts as proof the subscription is healthy.
            this.subscriptionReconnectAttempts = 0;
            const routingKey = msg.fields.routingKey;
            if (!routingKey || !routingKey.startsWith(STATE_TOPIC_PREFIX)) {
                logger.warn({ routingKey, connectionId: this.connectionId }, "Received message with invalid routing key");
                return;
            }
            const compositeKey = this.routingKeyToCompositeKey(routingKey);
            const callbacks = this.stateCallbacks.get(compositeKey);
            if (!callbacks?.size) {
                logger.warn({
                    compositeKey,
                    routingKey,
                    connectionId: this.connectionId,
                    totalSubscriptions: this.stateCallbacks.size,
                }, "Received state message but no callbacks registered for this investigation");
                return;
            }
            try {
                const state = decode(msg.content);
                for (const cb of callbacks) {
                    // A throwing callback is logged and does not stop delivery to the others.
                    try {
                        cb(state);
                    }
                    catch (cbError) {
                        logger.error({ error: cbError, compositeKey, connectionId: this.connectionId }, "Error in state callback");
                    }
                }
            }
            catch (parseError) {
                // Callback errors are caught above, so this handles failures from decode().
                const errorMessage = parseError instanceof Error ? parseError.message : String(parseError);
                const contentLength = msg.content?.length ?? 0;
                const contentType = typeof msg.content === "string"
                    ? "string"
                    : msg.content instanceof Buffer
                        ? "buffer"
                        : typeof msg.content;
                logger.error({
                    error: errorMessage,
                    stack: parseError instanceof Error ? parseError.stack : undefined,
                    routingKey,
                    contentLength,
                    contentType,
                    connectionId: this.connectionId,
                }, "Failed to parse state broadcast");
            }
        }, { noAck: true })
            .catch((err) => {
            logger.error({ error: err, connectionId: this.connectionId }, "Failed to set up subscription consumer - subscriptions will not work");
        });
    }
    /**
     * Unsubscribe from state broadcasts for a specific investigation
     * @param {string} investigationId - The investigation ID to unsubscribe from
     * @param {StateCallback} [callback] - Optional specific callback to remove
     */
    unsubscribe(investigationId, callback) {
        const logger = getLogger();
        const compositeKey = this.buildCompositeKey(investigationId);
        const callbacks = this.stateCallbacks.get(compositeKey);
        if (!callbacks) {
            logger.warn({ compositeKey, connectionId: this.connectionId, investigationId }, "Attempted to unsubscribe from non-existent subscription");
            return;
        }
        const sizeBefore = callbacks.size;
        if (callback) {
            callbacks.delete(callback);
        }
        else {
            callbacks.clear();
        }
        logger.info({
            compositeKey,
            connectionId: this.connectionId,
            investigationId,
            callbacksBefore: sizeBefore,
            callbacksAfter: callbacks.size,
            totalSubscriptions: this.stateCallbacks.size,
        }, "Removed callback from physics state subscription");
        if (callbacks.size === 0) {
            this.stateCallbacks.delete(compositeKey);
            logger.info({ compositeKey, connectionId: this.connectionId, investigationId }, "Removed subscription callback set - no more callbacks for this investigation");
            const routingKey = this.stateRoutingKey(compositeKey);
            void this.subscribeChannel
                ?.unbindQueue(this.stateConsumerQueueName, this.stateExchange, routingKey)
                .then(() => {
                logger.info({ routingKey, compositeKey, connectionId: this.connectionId, investigationId }, "Unbound physics state routing key");
            })
                .catch((err) => {
                logger.error({ err, routingKey, compositeKey, connectionId: this.connectionId }, "Unbind state routing key failed");
            });
        }
        if (this.stateCallbacks.size === 0 && this.isSubscribed) {
            logger.info({ connectionId: this.connectionId }, "No more subscriptions, stopping subscription channel");
            this.stopSubscription();
        }
    }
    /**
     * Attempt to reconnect the subscription channel after a failure (e.g. channel closed)
     */
    async attemptSubscriptionReconnect() {
        const logger = getLogger();
        if (this.shouldStopSubscription || this.stateCallbacks.size === 0 || this.isClosing) {
            return;
        }
        this.subscriptionReconnectAttempts++;
        const maxAttempts = env.PHYSICS_GATEWAY_RECONNECT_ATTEMPTS;
        const delayMs = env.PHYSICS_GATEWAY_RECONNECT_DELAY_MS;
        if (this.subscriptionReconnectAttempts > maxAttempts) {
            logger.error({ attempts: this.subscriptionReconnectAttempts, connectionId: this.connectionId }, "Max subscription reconnect attempts reached, giving up");
            this.isSubscribed = false;
            return;
        }
        logger.warn({
            attempt: this.subscriptionReconnectAttempts,
            maxAttempts,
            connectionId: this.connectionId,
        }, "Attempting subscription channel reconnect");
        try {
            void this.subscribeChannel?.close();
        }
        catch {
            // ignore
        }
        this.subscribeChannel = null;
        this.stateConsumerQueueName = null;
        await new Promise((resolve) => setTimeout(resolve, delayMs));
        if (this.shouldStopSubscription || this.isClosing)
            return;
        try {
            await this.startSubscription(logger);
        }
        catch (err) {
            logger.error({ error: err, connectionId: this.connectionId }, "Failed to reconnect subscription");
            void this.attemptSubscriptionReconnect();
        }
    }
    /**
     * Stop the subscription channel
     */
    stopSubscription() {
        const logger = getLogger();
        if (!this.isSubscribed && !this.subscribeChannel) {
            return;
        }
        this.shouldStopSubscription = true;
        const channel = this.subscribeChannel;
        try {
            if (channel)
                void channel.close();
        }
        catch (error) {
            logger.error({ error }, "Error closing subscription channel");
        }
        this.subscribeChannel = null;
        this.stateConsumerQueueName = null;
        this.isSubscribed = false;
        logger.debug({ connectionId: this.connectionId }, "Stopped physics state subscription");
    }
    /**
     * Send a command and wait for response (RPC over RabbitMQ)
     * @param {IPhysicsCommand} command - The command to send
     * @returns {Promise<IPhysicsResponse>} The response
     */
    async sendCommand(command) {
        const logger = getLogger();
        const commandWithConnectionId = {
            ...command,
            payload: {
                ...command.payload,
                connection_id: command.connectionId || this.connectionId,
            },
        };
        const { removedIndices, originalCount, sanitizedCount } = sanitizeBatchPayload(commandWithConnectionId.payload);
        if (removedIndices.length > 0) {
            logger.warn({
                removedIndices,
                originalCount,
                sanitizedCount,
                commandType: commandWithConnectionId.type,
                investigationId: commandWithConnectionId.investigationId,
                connectionId: this.connectionId,
            }, "Removed invalid batch entries before sending physics command");
        }
        if (originalCount !== null &&
            sanitizedCount !== null &&
            sanitizedCount === 0 &&
            commandWithConnectionId.payload &&
            typeof commandWithConnectionId.payload === "object") {
            logger.warn({
                removedIndices,
                originalCount,
                sanitizedCount,
                commandType: commandWithConnectionId.type,
                investigationId: commandWithConnectionId.investigationId,
            }, "Batch command empty after sanitization - rejecting command");
            return {
                success: false,
                type: commandWithConnectionId.type,
                investigation_id: commandWithConnectionId.investigationId,
                data: null,
                error: "Batch command contains no valid entries",
                error_code: "INVALID_COMMAND",
                error_details: JSON.stringify({
                    reason: "BATCH_EMPTY_AFTER_SANITIZE",
                    removed_indices: removedIndices,
                    original_count: originalCount,
                    sanitized_count: sanitizedCount,
                }),
            };
        }
        const invalidBatchEntry = findInvalidBatchCommandEntry(commandWithConnectionId.payload);
        if (invalidBatchEntry) {
            logger.warn({
                invalidEntryIndex: invalidBatchEntry.index,
                invalidValue: invalidBatchEntry.value,
                commandType: commandWithConnectionId.type,
                investigationId: commandWithConnectionId.investigationId,
            }, "Invalid batch command entry detected - rejecting command");
            return {
                success: false,
                type: commandWithConnectionId.type,
                investigation_id: commandWithConnectionId.investigationId,
                data: null,
                error: `Batch command contains invalid entry at index ${invalidBatchEntry.index}`,
                error_code: "INVALID_COMMAND",
                error_details: JSON.stringify({
                    reason: "BATCH_INVALID_ENTRY",
                    invalid_index: invalidBatchEntry.index,
                    removed_indices: removedIndices,
                }),
            };
        }
        logger.debug({
            command: commandWithConnectionId,
            isConnected: this.isConnected,
            connectionId: this.connectionId,
        }, "Sending physics command");
        return this.sendCommandToEngine(commandWithConnectionId);
    }
    /**
     * Send one command to the engine and wait for reply.
     * @param {IPhysicsCommand} command - Command (payload.connection_id already set by sendCommand)
     * @returns {Promise<IPhysicsResponse>} Response from Chrono
     */
    async sendCommandToEngine(command) {
        const logger = getLogger();
        if (!this.isConnected || !this.commandChannel || !this.replyQueueName) {
            logger.debug("Not connected, connecting now...");
            try {
                await this.connect();
            }
            catch (connectError) {
                const errorDetails = {
                    message: connectError instanceof Error ? connectError.message : String(connectError),
                    stack: connectError instanceof Error ? connectError.stack : undefined,
                    name: connectError instanceof Error ? connectError.name : undefined,
                    code: isSystemError(connectError) ? connectError.code : undefined,
                };
                logger.error({ connectError: errorDetails, commandQueue: this.commandQueue }, "Failed to connect in sendCommand");
                throw connectError;
            }
        }
        const startTime = Date.now();
        const correlationId = randomUUID();
        return new Promise((resolve) => {
            const timeoutId = setTimeout(() => {
                if (this.pendingRequests.delete(correlationId)) {
                    logger.error({ correlationId, durationMs: Date.now() - startTime }, "Physics command timed out - physics engine may be unresponsive");
                    resolve({
                        success: false,
                        type: command.type,
                        investigation_id: command.investigationId,
                        data: null,
                        error: "Request timed out",
                        error_code: "TIMEOUT",
                        error_details: null,
                    });
                }
            }, RECEIVE_TIMEOUT_MS);
            this.pendingRequests.set(correlationId, {
                resolve,
                reject: () => { },
                timeoutId,
            });
            try {
                const encoded = encode(command);
                logger.debug({ encodedLength: encoded.byteLength, connectionId: this.connectionId }, "Sending encoded command to physics engine");
                const sent = this.commandChannel.sendToQueue(this.commandQueue, Buffer.from(encoded), {
                    replyTo: this.replyQueueName,
                    correlationId,
                    persistent: false,
                    contentType: "application/msgpack",
                });
                if (!sent) {
                    clearTimeout(timeoutId);
                    this.pendingRequests.delete(correlationId);
                    resolve({
                        success: false,
                        type: command.type,
                        investigation_id: command.investigationId,
                        data: null,
                        error: "Channel backpressure",
                        error_code: "INTERNAL_ERROR",
                        error_details: null,
                    });
                }
            }
            catch (error) {
                clearTimeout(timeoutId);
                this.pendingRequests.delete(correlationId);
                const errMsg = error instanceof Error ? error.message : "Unknown error";
                logger.error({ error: errMsg, commandType: command.type }, "Failed to send physics command");
                resolve({
                    success: false,
                    type: command.type,
                    investigation_id: command.investigationId,
                    data: null,
                    error: errMsg,
                    error_code: "INTERNAL_ERROR",
                    error_details: null,
                });
            }
        });
    }
    /**
     * Health check: sends a HEALTH command (empty investigation id and payload) via sendCommand.
     * @returns {Promise<IPhysicsResponse>} The health check response
     */
    async healthCheck() {
        return this.sendCommand({
            type: PhysicsCommandType.HEALTH,
            investigationId: "",
            payload: {},
        });
    }
    /**
     * Close all connections and cleanup resources.
     * Exclusive queues (replies, state) are deleted by RabbitMQ when the connection closes.
     */
    close() {
        const logger = getLogger();
        this.isClosing = true;
        if (!this.isConnected && !this.isSubscribed && !this.commandChannel) {
            logger.debug({ connectionId: this.connectionId }, "Physics gateway already closed");
            return;
        }
        logger.info({ connectionId: this.connectionId }, "Closing physics gateway");
        this.stopSubscription();
        for (const [, pending] of this.pendingRequests) {
            clearTimeout(pending.timeoutId);
        }
        this.pendingRequests.clear();
        try {
            void this.commandChannel?.close();
        }
        catch (error) {
            logger.error({ error, connectionId: this.connectionId }, "Error closing command channel");
        }
        this.commandChannel = null;
        this.replyQueueName = null;
        try {
            void this.amqpConnection?.close();
        }
        catch (error) {
            logger.error({ error, connectionId: this.connectionId }, "Error closing AMQP connection");
        }
        this.amqpConnection = null;
        this.subscriptionReconnectAttempts = 0;
        this.isConnected = false;
        this.stateCallbacks.clear();
        logger.info({ connectionId: this.connectionId }, "Physics gateway closed");
    }
}