import { env } from "@config/env";
import { PhysicsGatewayService } from "@services/physics-gateway.service";
import { PhysicsCommandSchema, PhysicsCommandType } from "@typez/physics";
import { randomUUID } from "node:crypto";
/**
 * Setup physics handlers for a WebSocket client.
 *
 * A dedicated PhysicsGatewayService instance is created for the connection.
 * Every "physics:command" message is validated with PhysicsCommandSchema,
 * forwarded through the gateway and answered with a "physics:response"
 * message. A successful START subscribes the connection to state updates for
 * that investigation (relayed to the client as "physics:state"); a successful
 * STOP or DESTROY removes that subscription. When the underlying socket
 * closes, all subscriptions are dropped immediately and the gateway is closed,
 * either right away or after env.PHYSICS_GATEWAY_CLEANUP_DELAY_MS milliseconds.
 *
 * Commands whose gateway round-trip finishes after the socket has closed are
 * dropped: no response is sent and no subscription is created, so nothing is
 * left registered on the gateway for a connection that no longer exists.
 *
 * @param {WebSocketClient} client - WebSocket client
 * @param {IPhysicsHandlerOptions} [options] - Optional overrides (e.g. physics host for dev-stage)
 * @returns {void}
 */
export function setupPhysicsHandlers(client, options) {
    // Create a dedicated physics gateway service instance for this websocket connection
    const physicsGatewayService = new PhysicsGatewayService(client.connectionId, options?.physicsEngineHost);
    // Investigation ids this connection is currently subscribed to
    const subscriptions = new Set();
    // Flipped by the "close" handler; in-flight commands check it after awaiting
    let isClosed = false;

    // Forward only state.data to Unity (same shape as ZeroMQ before RabbitMQ wrapper)
    const callback = (state) => client.send("physics:state", state.data);

    // Report a schema validation failure back to the client.
    const rejectInvalidCommand = (data, validationError, correlationId) => {
        const error = validationError.errors.map((e) => e.message).join(", ");
        client.logger.warn({ correlationId, error, command: data }, "Invalid physics command");
        client.send("physics:response", {
            success: false,
            type: PhysicsCommandType.GENERIC,
            investigation_id: "",
            data: null,
            error,
            error_code: "INVALID_COMMAND",
            error_details: { receivedCommand: data, correlationId },
        });
    };

    // Keep the subscription set in step with a command the gateway has answered.
    const syncSubscription = (command, response) => {
        const { type, investigationId } = command;
        const isStart = type === PhysicsCommandType.START;
        const isStop = type === PhysicsCommandType.STOP || type === PhysicsCommandType.DESTROY;
        if (!isStart && !isStop) {
            return;
        }
        if (!response.success) {
            return;
        }
        const isSubscribed = subscriptions.has(investigationId);
        if (isStart && !isSubscribed) {
            // Subscribe on START
            physicsGatewayService.subscribe(investigationId, callback);
            subscriptions.add(investigationId);
        }
        else if (isStop && isSubscribed) {
            // Unsubscribe on STOP/DESTROY
            physicsGatewayService.unsubscribe(investigationId, callback);
            subscriptions.delete(investigationId);
        }
    };

    // Validate, forward and answer a single raw command received from the client.
    const handleCommand = async (data) => {
        // Generate correlation ID for this command
        const correlationId = randomUUID();
        // Log received raw command
        client.logger.info({ correlationId, command: data }, "Received physics command");
        const result = PhysicsCommandSchema.safeParse(data);
        if (!result.success) {
            rejectInvalidCommand(data, result.error, correlationId);
            return;
        }
        const command = result.data;
        const logContext = {
            correlationId,
            command,
            commandType: command.type,
            investigationId: command.investigationId,
        };
        try {
            client.logger.info(logContext, "Processing physics command");
            const response = await physicsGatewayService.sendCommand(command);
            if (isClosed) {
                // The socket went away while the command was in flight. Sending would
                // target a closed socket, and subscribing now would register a callback
                // that the close handler has no chance to remove any more.
                client.logger.info(logContext, "Physics websocket closed before response, dropping response");
                return;
            }
            client.logger.info({ ...logContext, response }, "Sending physics response");
            client.send("physics:response", response);
            syncSubscription(command, response);
        }
        catch (error) {
            client.logger.error({ correlationId, error, command, commandType: command.type }, "Error processing physics command");
            if (isClosed) {
                // Nobody left to notify
                return;
            }
            client.send("physics:response", {
                success: false,
                type: command.type,
                investigation_id: command.investigationId,
                data: null,
                error: error instanceof Error ? error.message : "Internal error",
                error_code: "INTERNAL_ERROR",
                error_details: { command, correlationId },
            });
        }
    };

    client.on("physics:command", (data) => {
        // Fire-and-forget, but never let a failure escape as an unhandled rejection
        handleCommand(data).catch((error) => {
            client.logger.error({ error, connectionId: client.connectionId }, "Unhandled error in physics command handler");
        });
    });

    client.ws.on("close", (code, reason) => {
        isClosed = true;
        client.logger.info({
            code,
            reason: reason?.toString("utf8"),
            connectionId: client.connectionId,
            activeSubscriptions: subscriptions.size,
        }, "Physics websocket closed");
        // Unsubscribe from state immediately so we don't send to a closed socket
        for (const investigationId of subscriptions) {
            physicsGatewayService.unsubscribe(investigationId, callback);
        }
        subscriptions.clear();
        const delayMs = env.PHYSICS_GATEWAY_CLEANUP_DELAY_MS;
        const doClose = () => {
            try {
                physicsGatewayService.close();
                client.logger.info({ connectionId: client.connectionId, delayMs }, "Physics gateway closed after delay");
            }
            catch (error) {
                client.logger.error({ error, connectionId: client.connectionId }, "Error closing physics gateway");
            }
        };
        // A non-positive delay means "close right now"; otherwise defer the gateway teardown
        if (delayMs <= 0) {
            doClose();
        }
        else {
            setTimeout(doClose, delayMs);
        }
    });
}
Source