/**
* Physics Gateway Service
*
* Serialization (one command at a time) is at RabbitMQ level: Chrono must consume from
* physics_engine_commands with prefetch(1) or a single worker. Then "waiting" = messages in the
* queue; "running" = the one unacked message Chrono is processing. No Node-side queue needed.
* @module services/physics-gateway
*/
import { env } from "@config/env";
import { encode, decode } from "@msgpack/msgpack";
import { PhysicsCommandType } from "@typez/physics";
import { getLogger } from "@utils/asyncLocalStorage";
import amqp from "amqplib";
import { randomUUID } from "crypto";
/**
* Type guard to check if error has system properties
* @param {unknown} error - The error to check
* @returns {boolean} True if error has system properties
*/
function isSystemError(error) {
return (error instanceof Error &&
(typeof error.code === "string" ||
typeof error.errno === "number" ||
typeof error.syscall === "string"));
}
/**
* Chrono / physics engine contract:
* - Commands: consume from physics_engine_commands with prefetch(1) so only one command runs at a time (waiting = queued, running = unacked). Reply to replyTo with correlationId; include connection_id in payload.
* - State: publish to exchange physics_engine_broadcast (topic) with routing key state.{connection_id}.{investigation_id}; backend forwards to WebSocket (physics:state).
*/
/** Topic prefix for state broadcasts (routing key: state.{connection_id}.{investigation_id}) */
const STATE_TOPIC_PREFIX = "state.";
/** Receive timeout in ms */
const RECEIVE_TIMEOUT_MS = 30000;
/**
* Find invalid entries (null/undefined) inside batch commands payload
* @param {object} payload - Physics command payload
* @returns {{index: number, value: unknown} | null} Invalid entry details
*/
function findInvalidBatchCommandEntry(payload) {
if (!payload || typeof payload !== "object") {
return null;
}
const batchCommand = payload?.command;
if (!batchCommand || batchCommand.op !== "batch" || !Array.isArray(batchCommand.commands)) {
return null;
}
for (let i = 0; i < batchCommand.commands.length; i++) {
const entry = batchCommand.commands[i];
if (!entry || typeof entry !== "object") {
return { index: i, value: entry };
}
}
return null;
}
/**
* Sanitize batch payload (remove null/undefined entries)
* @param {object} payload - Physics command payload
* @returns {object} Sanitization metadata (removedIndices/originalCount/sanitizedCount)
*/
function sanitizeBatchPayload(payload) {
if (!payload || typeof payload !== "object") {
return { removedIndices: [], originalCount: null, sanitizedCount: null };
}
const batchPayload = payload;
const originalCommand = batchPayload.command;
if (!originalCommand ||
originalCommand.op !== "batch" ||
!Array.isArray(originalCommand.commands)) {
return { removedIndices: [], originalCount: null, sanitizedCount: null };
}
const removedIndices = [];
const originalCount = originalCommand.commands.length;
const sanitizedCommands = originalCommand.commands.filter((entry, index) => {
const isValid = entry && typeof entry === "object";
if (!isValid) {
removedIndices.push(index);
}
return isValid;
});
if (removedIndices.length === 0) {
return { removedIndices: [], originalCount, sanitizedCount: originalCount };
}
batchPayload.command = {
...originalCommand,
commands: sanitizedCommands,
};
return {
removedIndices,
originalCount,
sanitizedCount: sanitizedCommands.length,
};
}
/**
* Physics Gateway Service (RabbitMQ)
*/
export class PhysicsGatewayService {
commandChannel = null;
subscribeChannel = null;
replyQueueName = null;
stateConsumerQueueName = null;
stateCallbacks = new Map();
pendingRequests = new Map();
isConnected = false;
isSubscribed = false;
shouldStopSubscription = false;
subscriptionReconnectAttempts = 0;
connectionReconnectAttempts = 0;
isClosing = false;
reconnecting = false;
connectionId;
amqpConnection = null;
amqpUrl;
commandQueue;
replyQueueNameBase;
stateExchange;
/** Base name for state queue; per-connection queue is {stateQueueNameBase}_{connectionId} (exclusive) */
stateQueueNameBase;
/**
* Creates a new physics gateway service instance
* @param {string} connectionId - Unique connection ID (from websocket)
* @param {string} [physicsEngineHostOverride] - Optional host override (e.g. for dev-stage); when not set, uses PHYSICS_ENGINE_HOST from env
*/
constructor(connectionId, physicsEngineHostOverride) {
const physicsHost = physicsEngineHostOverride ?? env.PHYSICS_ENGINE_HOST;
const user = encodeURIComponent(env.PHYSICS_ENGINE_AMQP_USER);
const password = encodeURIComponent(env.PHYSICS_ENGINE_AMQP_PASSWORD);
this.amqpUrl = `amqp://${user}:${password}@${physicsHost}:${env.PHYSICS_ENGINE_AMQP_PORT}`;
this.commandQueue = env.PHYSICS_ENGINE_COMMAND_QUEUE;
this.replyQueueNameBase = env.PHYSICS_ENGINE_REPLY_QUEUE;
this.stateExchange = env.PHYSICS_ENGINE_STATE_EXCHANGE;
this.stateQueueNameBase = env.PHYSICS_ENGINE_STATE_QUEUE;
this.connectionId = connectionId;
const logger = getLogger();
logger.info({
connectionId: this.connectionId,
host: physicsHost,
commandQueue: this.commandQueue,
stateExchange: this.stateExchange,
}, "Created new PhysicsGatewayService instance (RabbitMQ)");
}
/**
* Build composite key from connection ID and investigation ID
* @param {string} investigationId - Investigation ID
* @returns {string} Composite key in format "connectionId:investigationId"
*/
buildCompositeKey(investigationId) {
return `${this.connectionId}:${investigationId}`;
}
/**
* Build RabbitMQ routing key for state (state.{connection_id}.{investigation_id})
* @param {string} compositeKey - connectionId:investigationId
* @returns {string} Routing key "state.connectionId.investigationId"
*/
stateRoutingKey(compositeKey) {
return `${STATE_TOPIC_PREFIX}${compositeKey.replace(":", ".")}`;
}
/**
* Parse routing key state.{connection_id}.{investigation_id} to compositeKey (connectionId:investigationId)
* @param {string} routingKey - RabbitMQ routing key (e.g. state.connId.invId)
* @returns {string} Composite key connectionId:investigationId
*/
routingKeyToCompositeKey(routingKey) {
const withoutPrefix = routingKey.slice(STATE_TOPIC_PREFIX.length);
const dotIdx = withoutPrefix.indexOf(".");
if (dotIdx === -1)
return withoutPrefix;
const connectionId = withoutPrefix.slice(0, dotIdx);
const investigationId = withoutPrefix.slice(dotIdx + 1);
return `${connectionId}:${investigationId}`;
}
/**
* Attach connection lifecycle handlers (close/error -> reconnect)
* @param {object} logger - Logger instance for connection lifecycle events
*/
attachConnectionHandlers(logger) {
if (!this.amqpConnection?.connection)
return;
const conn = this.amqpConnection.connection;
conn.on("error", (err) => {
logger.error({ error: err, connectionId: this.connectionId }, "RabbitMQ connection error - will attempt reconnect");
this.connectionReconnectAttempts = 0;
this.markConnectionLost();
this.scheduleConnectionReconnect(logger);
});
conn.on("close", () => {
logger.warn({ connectionId: this.connectionId }, "RabbitMQ connection closed - will attempt reconnect");
this.markConnectionLost();
this.scheduleConnectionReconnect(logger);
});
}
/** Mark command path as disconnected; clear channel/queue refs, keep connection ref for now */
markConnectionLost() {
this.isConnected = false;
this.commandChannel = null;
this.replyQueueName = null;
this.subscribeChannel = null;
this.stateConsumerQueueName = null;
this.isSubscribed = false;
this.amqpConnection = null;
}
/**
* Schedule a single reconnection attempt (debounced by reconnecting flag)
* @param {object} logger - Logger instance for reconnection events
*/
scheduleConnectionReconnect(logger) {
if (this.isClosing || this.reconnecting)
return;
this.reconnecting = true;
void this.attemptConnectionReconnect(logger);
}
/**
* Reconnect after connection/channel loss (e.g. user internet drop). Retries with backoff.
* @param {object} logger - Logger instance for reconnection attempts
*/
async attemptConnectionReconnect(logger) {
if (this.isClosing) {
this.reconnecting = false;
return;
}
this.connectionReconnectAttempts++;
const maxAttempts = env.PHYSICS_GATEWAY_RECONNECT_ATTEMPTS;
const baseDelayMs = env.PHYSICS_GATEWAY_RECONNECT_DELAY_MS;
if (this.connectionReconnectAttempts > maxAttempts) {
logger.error({ attempts: this.connectionReconnectAttempts, connectionId: this.connectionId }, "Max physics gateway reconnection attempts reached, giving up");
this.reconnecting = false;
return;
}
const delayMs = Math.min(baseDelayMs * Math.pow(1.5, this.connectionReconnectAttempts - 1), 30_000);
logger.warn({
attempt: this.connectionReconnectAttempts,
maxAttempts,
delayMs,
connectionId: this.connectionId,
}, "Attempting physics gateway reconnection");
await new Promise((resolve) => setTimeout(resolve, delayMs));
if (this.isClosing) {
this.reconnecting = false;
return;
}
try {
this.amqpConnection = null;
await this.connect();
this.connectionReconnectAttempts = 0;
if (this.stateCallbacks.size > 0 && !this.shouldStopSubscription) {
await this.startSubscription(logger);
}
logger.info({ connectionId: this.connectionId }, "Physics gateway reconnected successfully");
}
catch (err) {
logger.error({ error: err, connectionId: this.connectionId }, "Physics gateway reconnect failed, will retry");
this.markConnectionLost();
this.scheduleConnectionReconnect(logger);
}
finally {
this.reconnecting = false;
}
}
/**
* Connect to RabbitMQ and set up command channel + reply queue
*/
async connect() {
const logger = getLogger();
if (this.isConnected && this.commandChannel && this.replyQueueName) {
logger.debug("Physics gateway already connected");
return;
}
try {
logger.debug({ connectionId: this.connectionId, commandQueue: this.commandQueue }, "Connecting to physics engine (RabbitMQ)...");
if (!this.amqpConnection) {
this.amqpConnection = await amqp.connect(this.amqpUrl, {
heartbeat: env.PHYSICS_ENGINE_HEARTBEAT,
});
this.attachConnectionHandlers(logger);
}
this.commandChannel = await this.amqpConnection.createChannel();
this.commandChannel.on("error", (err) => {
logger.error({ error: err, connectionId: this.connectionId }, "Physics command channel error");
this.isConnected = false;
this.commandChannel = null;
this.replyQueueName = null;
this.scheduleConnectionReconnect(logger);
});
this.commandChannel.on("close", () => {
logger.warn({ connectionId: this.connectionId }, "Physics command channel closed");
this.isConnected = false;
this.commandChannel = null;
this.replyQueueName = null;
if (this.amqpConnection) {
this.scheduleConnectionReconnect(logger);
}
});
const replyQueueName = `${this.replyQueueNameBase}_${this.connectionId}`;
await this.commandChannel.assertQueue(replyQueueName, { exclusive: true });
this.replyQueueName = replyQueueName;
await this.commandChannel.consume(this.replyQueueName, (msg) => {
if (!msg)
return;
const correlationId = msg.properties.correlationId;
const pending = correlationId ? this.pendingRequests.get(correlationId) : undefined;
if (pending && correlationId) {
clearTimeout(pending.timeoutId);
this.pendingRequests.delete(correlationId);
try {
const decoded = decode(msg.content);
pending.resolve(decoded);
}
catch (e) {
pending.resolve({
success: false,
type: PhysicsCommandType.GENERIC,
investigation_id: "",
data: null,
error: e instanceof Error ? e.message : "Failed to decode physics response",
error_code: "INTERNAL_ERROR",
error_details: null,
});
}
}
}, { noAck: true });
this.isConnected = true;
logger.info({
replyQueue: this.replyQueueName,
connectionId: this.connectionId,
}, "Connected to physics command (RabbitMQ)");
}
catch (error) {
const errorDetails = {
message: error instanceof Error ? error.message : String(error),
stack: error instanceof Error ? error.stack : undefined,
name: error instanceof Error ? error.name : undefined,
code: isSystemError(error) ? error.code : undefined,
errno: isSystemError(error) ? error.errno : undefined,
syscall: isSystemError(error) ? error.syscall : undefined,
};
logger.error({
error: errorDetails,
amqpUrl: this.amqpUrl.replace(/:[^:@]+@/, ":***@"),
connectionId: this.connectionId,
}, "Failed to connect to physics service (RabbitMQ)");
throw error;
}
}
/**
* Subscribe to state broadcasts for a specific investigation
* @param {string} investigationId - The investigation ID to subscribe to
* @param {StateCallback} callback - Callback function for state updates
*/
subscribe(investigationId, callback) {
const logger = getLogger();
const compositeKey = this.buildCompositeKey(investigationId);
if (!this.stateCallbacks.has(compositeKey)) {
this.stateCallbacks.set(compositeKey, new Set());
logger.info({ compositeKey, connectionId: this.connectionId, investigationId }, "Created new subscription callback set");
}
const callbackSet = this.stateCallbacks.get(compositeKey);
const sizeBefore = callbackSet.size;
callbackSet.add(callback);
logger.info({
compositeKey,
connectionId: this.connectionId,
investigationId,
callbacksBefore: sizeBefore,
callbacksAfter: callbackSet.size,
totalSubscriptions: this.stateCallbacks.size,
isSubscribed: this.isSubscribed,
}, "Added callback to physics state subscription");
if (!this.isSubscribed) {
logger.info({ connectionId: this.connectionId, compositeKey }, "Starting subscription channel");
void this.startSubscription(logger).catch((err) => {
logger.error({ err, connectionId: this.connectionId }, "Failed to start physics subscription");
});
}
else if (this.subscribeChannel) {
const routingKey = this.stateRoutingKey(compositeKey);
logger.debug({ routingKey, compositeKey, connectionId: this.connectionId }, "Binding additional routing key to existing subscription");
void this.subscribeChannel
.bindQueue(this.stateConsumerQueueName, this.stateExchange, routingKey)
.then(() => {
logger.info({ routingKey, compositeKey, connectionId: this.connectionId, investigationId }, "Bound physics state routing key");
})
.catch((err) => {
logger.error({ err, routingKey, compositeKey, connectionId: this.connectionId }, "Failed to bind state routing key");
});
}
else {
logger.warn({ connectionId: this.connectionId, compositeKey, isSubscribed: this.isSubscribed }, "isSubscribed is true but subscribeChannel is null - subscription may be broken");
}
}
/**
* Start subscription channel and consumer
* @param {object} logger - Logger instance (from getLogger())
* @returns {Promise<void>}
*/
async startSubscription(logger) {
try {
if (!this.amqpConnection) {
this.amqpConnection = await amqp.connect(this.amqpUrl, {
heartbeat: env.PHYSICS_ENGINE_HEARTBEAT,
});
this.attachConnectionHandlers(logger);
}
}
catch (err) {
logger.error({ err, connectionId: this.connectionId }, "Failed to connect for subscription");
throw err;
}
this.subscribeChannel = await this.amqpConnection.createChannel();
this.subscribeChannel.on("error", (err) => {
logger.error({ error: err, connectionId: this.connectionId }, "Subscription channel error - will attempt reconnect");
void this.attemptSubscriptionReconnect();
});
this.subscribeChannel.on("close", () => {
logger.warn({ connectionId: this.connectionId, wasSubscribed: this.isSubscribed }, "Subscription channel closed - will attempt reconnect");
this.isSubscribed = false;
this.subscribeChannel = null;
this.stateConsumerQueueName = null;
void this.attemptSubscriptionReconnect();
});
await this.subscribeChannel.assertExchange(this.stateExchange, "topic", { durable: true });
const stateQueueName = `${this.stateQueueNameBase}_${this.connectionId}`;
await this.subscribeChannel.assertQueue(stateQueueName, { exclusive: true });
this.stateConsumerQueueName = stateQueueName;
for (const compositeKey of this.stateCallbacks.keys()) {
const routingKey = this.stateRoutingKey(compositeKey);
await this.subscribeChannel.bindQueue(this.stateConsumerQueueName, this.stateExchange, routingKey);
}
this.isSubscribed = true;
this.subscriptionReconnectAttempts = 0;
this.shouldStopSubscription = false;
logger.info({ stateExchange: this.stateExchange, queue: this.stateConsumerQueueName }, "Connected to physics state (RabbitMQ)");
void this.subscribeChannel
.consume(this.stateConsumerQueueName, (msg) => {
if (!msg) {
logger.warn({ connectionId: this.connectionId }, "Received null message in physics state consumer - channel may be closing");
return;
}
if (this.shouldStopSubscription) {
logger.debug({ connectionId: this.connectionId }, "Subscription stopped, ignoring message");
return;
}
this.subscriptionReconnectAttempts = 0;
const routingKey = msg.fields.routingKey;
if (!routingKey || !routingKey.startsWith(STATE_TOPIC_PREFIX)) {
logger.warn({ routingKey, connectionId: this.connectionId }, "Received message with invalid routing key");
return;
}
const compositeKey = this.routingKeyToCompositeKey(routingKey);
const callbacks = this.stateCallbacks.get(compositeKey);
if (!callbacks?.size) {
logger.warn({
compositeKey,
routingKey,
connectionId: this.connectionId,
totalSubscriptions: this.stateCallbacks.size,
}, "Received state message but no callbacks registered for this investigation");
return;
}
try {
const state = decode(msg.content);
for (const cb of callbacks) {
try {
cb(state);
}
catch (cbError) {
logger.error({ error: cbError, compositeKey, connectionId: this.connectionId }, "Error in state callback");
}
}
}
catch (parseError) {
const errorMessage = parseError instanceof Error ? parseError.message : String(parseError);
const contentLength = msg.content?.length ?? 0;
const contentType = typeof msg.content === "string"
? "string"
: msg.content instanceof Buffer
? "buffer"
: typeof msg.content;
logger.error({
error: errorMessage,
stack: parseError instanceof Error ? parseError.stack : undefined,
routingKey,
contentLength,
contentType,
connectionId: this.connectionId,
}, "Failed to parse state broadcast");
}
}, { noAck: true })
.catch((err) => {
logger.error({ error: err, connectionId: this.connectionId }, "Failed to set up subscription consumer - subscriptions will not work");
});
}
/**
* Unsubscribe from state broadcasts for a specific investigation
* @param {string} investigationId - The investigation ID to unsubscribe from
* @param {StateCallback} [callback] - Optional specific callback to remove
*/
unsubscribe(investigationId, callback) {
const logger = getLogger();
const compositeKey = this.buildCompositeKey(investigationId);
const callbacks = this.stateCallbacks.get(compositeKey);
if (!callbacks) {
logger.warn({ compositeKey, connectionId: this.connectionId, investigationId }, "Attempted to unsubscribe from non-existent subscription");
return;
}
const sizeBefore = callbacks.size;
if (callback) {
callbacks.delete(callback);
}
else {
callbacks.clear();
}
logger.info({
compositeKey,
connectionId: this.connectionId,
investigationId,
callbacksBefore: sizeBefore,
callbacksAfter: callbacks.size,
totalSubscriptions: this.stateCallbacks.size,
}, "Removed callback from physics state subscription");
if (callbacks.size === 0) {
this.stateCallbacks.delete(compositeKey);
logger.info({ compositeKey, connectionId: this.connectionId, investigationId }, "Removed subscription callback set - no more callbacks for this investigation");
const routingKey = this.stateRoutingKey(compositeKey);
void this.subscribeChannel
?.unbindQueue(this.stateConsumerQueueName, this.stateExchange, routingKey)
.then(() => {
logger.info({ routingKey, compositeKey, connectionId: this.connectionId, investigationId }, "Unbound physics state routing key");
})
.catch((err) => {
logger.error({ err, routingKey, compositeKey, connectionId: this.connectionId }, "Unbind state routing key failed");
});
}
if (this.stateCallbacks.size === 0 && this.isSubscribed) {
logger.info({ connectionId: this.connectionId }, "No more subscriptions, stopping subscription channel");
this.stopSubscription();
}
}
/**
* Attempt to reconnect the subscription channel after a failure (e.g. channel closed)
*/
async attemptSubscriptionReconnect() {
const logger = getLogger();
if (this.shouldStopSubscription || this.stateCallbacks.size === 0 || this.isClosing) {
return;
}
this.subscriptionReconnectAttempts++;
const maxAttempts = env.PHYSICS_GATEWAY_RECONNECT_ATTEMPTS;
const delayMs = env.PHYSICS_GATEWAY_RECONNECT_DELAY_MS;
if (this.subscriptionReconnectAttempts > maxAttempts) {
logger.error({ attempts: this.subscriptionReconnectAttempts, connectionId: this.connectionId }, "Max subscription reconnect attempts reached, giving up");
this.isSubscribed = false;
return;
}
logger.warn({
attempt: this.subscriptionReconnectAttempts,
maxAttempts,
connectionId: this.connectionId,
}, "Attempting subscription channel reconnect");
try {
void this.subscribeChannel?.close();
}
catch {
// ignore
}
this.subscribeChannel = null;
this.stateConsumerQueueName = null;
await new Promise((resolve) => setTimeout(resolve, delayMs));
if (this.shouldStopSubscription || this.isClosing)
return;
try {
await this.startSubscription(logger);
}
catch (err) {
logger.error({ error: err, connectionId: this.connectionId }, "Failed to reconnect subscription");
void this.attemptSubscriptionReconnect();
}
}
/**
* Stop the subscription channel
*/
stopSubscription() {
const logger = getLogger();
if (!this.isSubscribed && !this.subscribeChannel) {
return;
}
this.shouldStopSubscription = true;
const channel = this.subscribeChannel;
try {
if (channel)
void channel.close();
}
catch (error) {
logger.error({ error }, "Error closing subscription channel");
}
this.subscribeChannel = null;
this.stateConsumerQueueName = null;
this.isSubscribed = false;
logger.debug({ connectionId: this.connectionId }, "Stopped physics state subscription");
}
/**
* Send a command and wait for response (RPC over RabbitMQ)
* @param {IPhysicsCommand} command - The command to send
* @returns {Promise<IPhysicsResponse>} The response
*/
async sendCommand(command) {
const logger = getLogger();
const commandWithConnectionId = {
...command,
payload: {
...command.payload,
connection_id: command.connectionId || this.connectionId,
},
};
const { removedIndices, originalCount, sanitizedCount } = sanitizeBatchPayload(commandWithConnectionId.payload);
if (removedIndices.length > 0) {
logger.warn({
removedIndices,
originalCount,
sanitizedCount,
commandType: commandWithConnectionId.type,
investigationId: commandWithConnectionId.investigationId,
connectionId: this.connectionId,
}, "Removed invalid batch entries before sending physics command");
}
if (originalCount !== null &&
sanitizedCount !== null &&
sanitizedCount === 0 &&
commandWithConnectionId.payload &&
typeof commandWithConnectionId.payload === "object") {
logger.warn({
removedIndices,
originalCount,
sanitizedCount,
commandType: commandWithConnectionId.type,
investigationId: commandWithConnectionId.investigationId,
}, "Batch command empty after sanitization - rejecting command");
return {
success: false,
type: commandWithConnectionId.type,
investigation_id: commandWithConnectionId.investigationId,
data: null,
error: "Batch command contains no valid entries",
error_code: "INVALID_COMMAND",
error_details: JSON.stringify({
reason: "BATCH_EMPTY_AFTER_SANITIZE",
removed_indices: removedIndices,
original_count: originalCount,
sanitized_count: sanitizedCount,
}),
};
}
const invalidBatchEntry = findInvalidBatchCommandEntry(commandWithConnectionId.payload);
if (invalidBatchEntry) {
logger.warn({
invalidEntryIndex: invalidBatchEntry.index,
invalidValue: invalidBatchEntry.value,
commandType: commandWithConnectionId.type,
investigationId: commandWithConnectionId.investigationId,
}, "Invalid batch command entry detected - rejecting command");
return {
success: false,
type: commandWithConnectionId.type,
investigation_id: commandWithConnectionId.investigationId,
data: null,
error: `Batch command contains invalid entry at index ${invalidBatchEntry.index}`,
error_code: "INVALID_COMMAND",
error_details: JSON.stringify({
reason: "BATCH_INVALID_ENTRY",
invalid_index: invalidBatchEntry.index,
removed_indices: removedIndices,
}),
};
}
logger.debug({
command: commandWithConnectionId,
isConnected: this.isConnected,
connectionId: this.connectionId,
}, "Sending physics command");
return this.sendCommandToEngine(commandWithConnectionId);
}
/**
* Send one command to the engine and wait for reply.
* @param {IPhysicsCommand} command - Command (payload.connection_id already set by sendCommand)
* @returns {Promise<IPhysicsResponse>} Response from Chrono
*/
async sendCommandToEngine(command) {
const logger = getLogger();
if (!this.isConnected || !this.commandChannel || !this.replyQueueName) {
logger.debug("Not connected, connecting now...");
try {
await this.connect();
}
catch (connectError) {
const errorDetails = {
message: connectError instanceof Error ? connectError.message : String(connectError),
stack: connectError instanceof Error ? connectError.stack : undefined,
name: connectError instanceof Error ? connectError.name : undefined,
code: isSystemError(connectError) ? connectError.code : undefined,
};
logger.error({ connectError: errorDetails, commandQueue: this.commandQueue }, "Failed to connect in sendCommand");
throw connectError;
}
}
const startTime = Date.now();
const correlationId = randomUUID();
return new Promise((resolve) => {
const timeoutId = setTimeout(() => {
if (this.pendingRequests.delete(correlationId)) {
logger.error({ correlationId, durationMs: Date.now() - startTime }, "Physics command timed out - physics engine may be unresponsive");
resolve({
success: false,
type: command.type,
investigation_id: command.investigationId,
data: null,
error: "Request timed out",
error_code: "TIMEOUT",
error_details: null,
});
}
}, RECEIVE_TIMEOUT_MS);
this.pendingRequests.set(correlationId, {
resolve,
reject: () => { },
timeoutId,
});
try {
const encoded = encode(command);
logger.debug({ encodedLength: encoded.byteLength, connectionId: this.connectionId }, "Sending encoded command to physics engine");
const sent = this.commandChannel.sendToQueue(this.commandQueue, Buffer.from(encoded), {
replyTo: this.replyQueueName,
correlationId,
persistent: false,
contentType: "application/msgpack",
});
if (!sent) {
clearTimeout(timeoutId);
this.pendingRequests.delete(correlationId);
resolve({
success: false,
type: command.type,
investigation_id: command.investigationId,
data: null,
error: "Channel backpressure",
error_code: "INTERNAL_ERROR",
error_details: null,
});
}
}
catch (error) {
clearTimeout(timeoutId);
this.pendingRequests.delete(correlationId);
const errMsg = error instanceof Error ? error.message : "Unknown error";
logger.error({ error: errMsg, commandType: command.type }, "Failed to send physics command");
resolve({
success: false,
type: command.type,
investigation_id: command.investigationId,
data: null,
error: errMsg,
error_code: "INTERNAL_ERROR",
error_details: null,
});
}
});
}
/**
* Health check
* @returns {Promise<IPhysicsResponse>} The health check response
*/
async healthCheck() {
return this.sendCommand({
type: PhysicsCommandType.HEALTH,
investigationId: "",
payload: {},
});
}
/**
* Close all connections and cleanup resources.
* Exclusive queues (replies, state) are deleted by RabbitMQ when the connection closes.
*/
close() {
const logger = getLogger();
this.isClosing = true;
if (!this.isConnected && !this.isSubscribed && !this.commandChannel) {
logger.debug({ connectionId: this.connectionId }, "Physics gateway already closed");
return;
}
logger.info({ connectionId: this.connectionId }, "Closing physics gateway");
this.stopSubscription();
for (const [, pending] of this.pendingRequests) {
clearTimeout(pending.timeoutId);
}
this.pendingRequests.clear();
try {
void this.commandChannel?.close();
}
catch (error) {
logger.error({ error, connectionId: this.connectionId }, "Error closing command channel");
}
this.commandChannel = null;
this.replyQueueName = null;
try {
void this.amqpConnection?.close();
}
catch (error) {
logger.error({ error, connectionId: this.connectionId }, "Error closing AMQP connection");
}
this.amqpConnection = null;
this.subscriptionReconnectAttempts = 0;
this.isConnected = false;
this.stateCallbacks.clear();
logger.info({ connectionId: this.connectionId }, "Physics gateway closed");
}
}
Source