Skip to content
4 changes: 4 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { deployService } from './utils/k8s';
import { webServer } from '@/web/index';
import { Manager } from './manager';
import { ResourceMonitor } from './manager/resource-monitor';
import { SystemRequiredDeployments } from './deployment/system';
import { defaultFilter, FileControllerManager } from './manager/file-manager';
import { DomainManager } from './manager/domain-manager';
Expand Down Expand Up @@ -28,6 +29,9 @@ try {
// Initialize the Server Manager
Manager.getInstance();

// Start collecting pod resource metrics in the background.
ResourceMonitor.getInstance();

// Initialize the File Controller Manager
await FileControllerManager.initialize('', {
filter: defaultFilter,
Expand Down
89 changes: 89 additions & 0 deletions src/manager/resource-monitor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import { Namespace } from '@/utils/config';
import { type PodData, coreV1Api, k8sMetrics } from '@/utils/k8s';

export class ResourceMonitor {
private static instance: ResourceMonitor | null = null;
private podData: Map<string, PodData> = new Map();

private cronJob: NodeJS.Timeout | null = null;

constructor(interval: number = 1_000 * 2) {
this.cronJob = setInterval(async () => {
try {
const allServerPod = await coreV1Api.listNamespacedPod({
namespace: Namespace,
labelSelector: 'app=minecraft-server',
});
const nextPodData: Map<string, PodData> = new Map();
const podNameToServerName: Map<string, string> = new Map();

allServerPod.items.forEach((pod) => {
const podName = pod.metadata?.name;
const serverName = pod.metadata?.labels?.name || podName;
if (!podName || !serverName) {
return;
}
const allocatedCpu =
pod.spec?.containers?.[0]?.resources?.requests?.cpu || '0';
const allocatedMemory =
pod.spec?.containers?.[0]?.resources?.requests?.memory || '0';
podNameToServerName.set(podName, serverName);
nextPodData.set(serverName, {
name: serverName,
allocatedCpu,
allocatedMemory,
cpu: this.podData.get(serverName)?.cpu || '0',
memory: this.podData.get(serverName)?.memory || '0',
});
});

const currentPodsMetrics = await k8sMetrics.getPodMetrics(Namespace);
currentPodsMetrics.items.forEach((podMetric) => {
const podName = podMetric.metadata?.name;
if (!podName) {
return;
}
const serverName =
podNameToServerName.get(podName) ||
podMetric.metadata?.labels?.name ||
podName;
const existingData = nextPodData.get(serverName);
if (!existingData) {
return;
}
const cpu = podMetric.containers?.[0]?.usage?.cpu || '0';
const memory = podMetric.containers?.[0]?.usage?.memory || '0';
nextPodData.set(serverName, {
...existingData,
cpu,
memory,
});
});
this.podData = nextPodData;
} catch (error) {
console.error('Failed to update resource monitor data:', error);
}
}, interval);
}

public static getInstance(interval?: number) {
if (!this.instance) {
this.instance = new ResourceMonitor(interval);
}
return this.instance;
}

public getPodData() {
return Array.from(this.podData.values());
}

public getPodDataByName(name: string) {
return this.podData.get(name);
}

public clean() {
this.podData.clear();
clearInterval(this.cronJob!);
ResourceMonitor.instance = null;
}
}
50 changes: 50 additions & 0 deletions src/manager/server-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ export class ServerController {
private rconClient: Rcon;
private host: string;
private port: number;
private isEnded: boolean = false;
private isConnected: boolean = false;
private retryCount: number = 0;
private maxRetries: number = 5;
private shouldReconnect: boolean = true;
private isConnectionListenerRegistered: boolean = false;

constructor(host: string, port: number, log = false) {
this.host = host;
Expand All @@ -31,15 +37,59 @@ export class ServerController {
port: this.port,
password: RCONPassword,
});
this.registerConnectionListeners();
}

private registerConnectionListeners() {
if (this.isConnectionListenerRegistered) {
return;
}

this.isConnectionListenerRegistered = true;
this.rconClient.on('end', () => {
this.isEnded = true;
this.isConnected = false;
if (!this.shouldReconnect) {
return;
}
if (this.retryCount >= this.maxRetries) {
console.error(
`RCON reconnection aborted after ${this.maxRetries} retries.`,
);
this.shouldReconnect = false;
return;
}
this.retryCount++;
setTimeout(
() => {
this.connect().catch((e) =>
console.error('Failed to reconnect RCON:', (e as Error).message),
);
},
1000 * Math.min(2 ** this.retryCount, 30),
); // Exponential backoff with max delay of 30 seconds
});
this.rconClient.on('connect', () => {
this.isConnected = true;
this.isEnded = false;
this.retryCount = 0;
});
}

public connect() {
this.shouldReconnect = true;
return this.rconClient.connect();
}

public disconnect() {
this.isEnded = true;
this.shouldReconnect = false;
return this.rconClient.end();
}
public async sendCommand(command: string) {
if (this.isEnded) {
throw new Error('RCON connection has ended');
}
return await this.rconClient.send(command);
}
}
91 changes: 90 additions & 1 deletion src/manager/server-mamager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ import gateClient from '@/utils/gate';
import {
PhaseEnum,
Watcher,
coreV1Api,
deleteService,
deployService,
getConfigMapData,
getDeploymentData,
k8sApiEndpoint,
k8sLogger,
patchConfigMap,
patchDeployment,
stopDeploymentRollout,
Expand All @@ -27,6 +29,7 @@ import { minecraftServerDeployment } from '@/deployment/minecraft-server';
import { FileController, FileControllerManager } from './file-manager';
import { DomainManager, getTopLevelDomain } from './domain-manager';
import { TaskQueue } from '@/utils/taskQueue';
import { PassThrough } from 'node:stream';

export enum ServerStatusEnum {
RUNNING = 'running',
Expand Down Expand Up @@ -89,7 +92,7 @@ export class Manager {
const existingServer = this.servers.get(serverName);
if (existingServer) {
if (
existingServer.servicesUpdateResourceVersion <=
existingServer.servicesUpdateResourceVersion >=
parseInt(currentResourceVersion)
) {
return;
Expand Down Expand Up @@ -510,6 +513,24 @@ export class Manager {
return server.nameTemplate.replace('@PlaceHolder@', target);
}

private static async getCurrentServerPodName(
serverName: string,
): Promise<string> {
const pods = await coreV1Api.listNamespacedPod({
namespace: Namespace,
labelSelector: `app=minecraft-server,name=${serverName}`,
});
const podName =
pods.items.find((pod) => pod.status?.phase === 'Running')?.metadata
?.name || pods.items[0]?.metadata?.name;

if (!podName) {
throw new Error(`No pod found for server ${serverName}.`);
}

return podName;
}

public static async stopServer(serverName: string): Promise<void> {
if (!this.servers.has(serverName)) {
throw new Error(`Server ${serverName} not found.`);
Expand Down Expand Up @@ -560,6 +581,74 @@ export class Manager {
return Array.from(Manager.servers.values());
}

public static async executeServerPod(
serverName: string,
command: string,
): Promise<string> {
const server = Manager.getServerInfoByName(serverName);
if (!server) {
throw new Error(`Server ${serverName} not found.`);
}
const podName = await this.getCurrentServerPodName(serverName);
const executeResponse =
await coreV1Api.connectPostNamespacedPodExecWithHttpInfo({
namespace: Namespace,
name: podName,
command,
stderr: true,
stdin: true,
stdout: true,
});
if (executeResponse.httpStatusCode !== 101) {
throw new Error(
`Failed to execute command on server ${serverName}. HTTP status code: ${executeResponse.httpStatusCode}`,
);
}

const executeResultText = await executeResponse.body.text();
return executeResultText;
Comment on lines +584 to +609
}

public static async readServerLogs(
serverName: string,
lines: number = 100,
): Promise<string> {
const server = Manager.getServerInfoByName(serverName);
if (!server) {
throw new Error(`Server ${serverName} not found.`);
}
const podName = await this.getCurrentServerPodName(serverName);
const logsResponse = await coreV1Api.readNamespacedPodLogWithHttpInfo({
namespace: Namespace,
name: podName,
tailLines: lines,
});
const logs = await logsResponse.body.text();
return logs;
Comment on lines +612 to +627
}

public static async getFollowedServerLogs(
serverName: string,
): Promise<PassThrough> {
const server = Manager.getServerInfoByName(serverName);
if (!server) {
throw new Error(`Server ${serverName} not found.`);
}
const podName = await this.getCurrentServerPodName(serverName);
const logStream = new PassThrough();
k8sLogger.log(
Namespace,
podName,
'minecraft-server',
logStream,
{
follow: true,
pretty: true,
},
);
Comment on lines +630 to +648
return logStream;
}

public static getServerStatus(
serverName: string,
): ServerStatusEnum | undefined {
Expand Down
71 changes: 71 additions & 0 deletions src/manager/stream-manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import { PassThrough } from 'node:stream';
import { Manager } from './server-mamager';

export class LogStreamManager {
private static instance: LogStreamManager | null = null;
private logStreams: Map<string, PassThrough> = new Map();
private sendFn: ((subscriptionId: string, data: string) => void) | null =
null;

private constructor(sendFn: (subscriptionId: string, data: string) => void) {
this.sendFn = sendFn;
}

public static getInstance(
sendFn?: (subscriptionId: string, data: string) => void,
) {
if (!this.instance) {
if (!sendFn) {
throw new Error(
'sendFn is required for the first initialization of LogStreamManager',
);
}
this.instance = new LogStreamManager(sendFn);
}
return this.instance;
}

private registerLogStream(serverName: string, logStream: PassThrough) {
this.logStreams.set(serverName, logStream);
}

private unregisterLogStream(serverName: string) {
this.logStreams.delete(serverName);
}

public async createLogStream(serverName: string, subscriptionId: string) {
const logStream = await Manager.getFollowedServerLogs(serverName);
this.registerLogStream(subscriptionId, logStream);

logStream.on('data', (chunk) => {
const data = chunk.toString();
if (this.sendFn) {
this.sendFn(subscriptionId, data);
}
});

logStream.on('error', (error) => {
console.error(`Error in log stream for server ${serverName}:`, error);
if (this.sendFn) {
this.sendFn(subscriptionId, `Error: ${error.message}`);
}
this.unregisterLogStream(subscriptionId);
});

logStream.on('end', () => {
console.log(`Log stream for server ${serverName} ended.`);
if (this.sendFn) {
this.sendFn(subscriptionId, 'Log stream ended.');
}
this.unregisterLogStream(subscriptionId);
});
}

public async closeLogStream(subscriptionId: string) {
const logStream = this.logStreams.get(subscriptionId);
if (logStream) {
logStream.end();
this.unregisterLogStream(subscriptionId);
}
}
}
Loading
Loading