Skip to content

Commit bbb9d0d

Browse files
committed
implement encode service
1 parent 2c39707 commit bbb9d0d

13 files changed

+162
-31
lines changed

config.example.json

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,14 @@
3232
"API_TOKEN": "example-token"
3333
}
3434
],
35+
"ENCODE_SERVICE": [
36+
{
37+
"NAME": "encode-service",
38+
"HOSTS": ["localhost"],
39+
"API_HOST": "http://encode-service:80",
40+
"API_TOKEN": "example-token"
41+
}
42+
],
3543
"JWT": {
3644
"SECRET": null
3745
}

src/config.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ export const NODE_MEDIA_SERVER: IWorkerConfig[] = env.NODE_MEDIA_SERVER;
2525

2626
export const ADOBE_MEDIA_SERVER: IWorkerConfig[] = env.ADOBE_MEDIA_SERVER;
2727

28+
export const ENCODE_SERVICE: IWorkerConfig[] = env.ENCODE_SERVICE;
29+
2830
export const JWT = {
2931
SECRET: env.JWT.SECRET,
3032
};

src/controllers/channels.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ export async function channels(ctx: Router.IRouterContext, next: Next) {
159159
}
160160

161161
export async function list(ctx: Router.IRouterContext, next: Next) {
162-
const liveChannels: { app: string; channel: string }[] = [];
162+
const liveChannels: { app: string; channel: string; protocol: string }[] = [];
163163

164164
const channels = (
165165
await channelService.getChannelsByType(ChannelTypeEnum.PUBLIC)
@@ -173,6 +173,7 @@ export async function list(ctx: Router.IRouterContext, next: Next) {
173173
liveChannels.push({
174174
app: channelObj.publisher.app,
175175
channel: channelObj.publisher.channel,
176+
protocol: channelObj.publisher.protocol,
176177
});
177178
}
178179
}

src/env.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,17 @@ convict.addFormat({
1616
},
1717
HOSTS: {
1818
format: (value) => {
19+
if (!_.isArray(value)) {
20+
throw new Error('not_array');
21+
}
22+
1923
if (!_.every(value, _.isString)) {
2024
throw new Error('not_valid_array');
2125
}
26+
27+
if (value.length === 0) {
28+
throw new Error('no_hosts');
29+
}
2230
},
2331
default: null,
2432
},
@@ -81,6 +89,10 @@ const config = convict({
8189
format: 'stream-server-config',
8290
default: null,
8391
},
92+
ENCODE_SERVICE: {
93+
format: 'stream-server-config',
94+
default: null,
95+
},
8496
JWT: {
8597
SECRET: {
8698
format: String,

src/helpers/hide-fields.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,12 @@ import * as _ from 'lodash';
33
import { shouldHideFields } from './should-hide-fields';
44
import { IUserModel } from '../models/user';
55

6-
const paths = [['ip'], ['location', 'ip'], ['location', 'api', 'query']];
6+
const paths = [
7+
['ip'],
8+
['location', 'ip'],
9+
['location', 'api', 'query'],
10+
['apiResponse', 'ip'],
11+
];
712

813
export function hideFields(user: IUserModel, obj) {
914
if (!shouldHideFields(user)) {

src/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { API } from './config';
77
import { runUpdate as runUpdate_kms } from './workers/klpq-media-server';
88
import { runUpdate as runUpdate_ams } from './workers/adobe-media-server';
99
import { runUpdate as runUpdate_nms } from './workers/node-media-server';
10+
import { runUpdate as runUpdate_encode } from './workers/encode-service';
1011
import { logger } from './helpers/logger';
1112

1213
process.on('unhandledRejection', (error, p) => {
@@ -47,4 +48,5 @@ if (typeof API.PORT === 'string') {
4748
runUpdate_kms();
4849
runUpdate_ams();
4950
runUpdate_nms();
51+
runUpdate_encode();
5052
})();

src/models/stream.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ export enum ApiSourceEnum {
99
KLPQ_MEDIA_SERVER = 'klpq_media_server',
1010
NODE_MEDIA_SERVER = 'node_media_server',
1111
ADOBE_MEDIA_SERVER = 'adobe_media_server',
12+
ENCODE_SERVICE = 'encode_service',
1213
}
1314

1415
export interface IStreamModel extends Document {

src/workers/_base.ts

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,7 @@ export interface IGenericStreamsResponse {
4242
export abstract class BaseWorker {
4343
abstract apiSource: ApiSourceEnum;
4444

45-
abstract getStats(
46-
API_HOST: string,
47-
API_TOKEN: string,
48-
): Promise<IGenericStreamsResponse[]>;
45+
abstract getStats(config: IWorkerConfig): Promise<IGenericStreamsResponse[]>;
4946

5047
public async runUpdate(servers: IWorkerConfig[]) {
5148
await Promise.all(
@@ -89,9 +86,9 @@ export abstract class BaseWorker {
8986
}
9087

9188
private async readStats(config: IWorkerConfig) {
92-
const { NAME, API_HOST, API_TOKEN } = config;
89+
const { NAME } = config;
9390

94-
const data = await this.getStats(API_HOST, API_TOKEN);
91+
const data = await this.getStats(config);
9592

9693
const stats: ILiveStats[0] = {};
9794

src/workers/adobe-media-server.ts

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import axios from 'axios';
22
import * as _ from 'lodash';
33

4-
import { ADOBE_MEDIA_SERVER } from '../config';
4+
import { ADOBE_MEDIA_SERVER, IWorkerConfig } from '../config';
55
import { ApiSourceEnum } from '../models/stream';
66
import { BaseWorker, IGenericStreamsResponse } from './_base';
77

@@ -37,15 +37,12 @@ interface IApiResponse {
3737
class MediaServerWorker extends BaseWorker {
3838
apiSource = ApiSourceEnum.ADOBE_MEDIA_SERVER;
3939

40-
async getStats(
41-
host: string,
42-
token: string,
43-
): Promise<IGenericStreamsResponse[]> {
40+
async getStats(config: IWorkerConfig): Promise<IGenericStreamsResponse[]> {
4441
const {
4542
data: { stats: data },
46-
} = await axios.get<IApiResponse>(`${host}/v1/streams`, {
43+
} = await axios.get<IApiResponse>(`${config.API_HOST}/v1/streams`, {
4744
headers: {
48-
token,
45+
token: config.API_TOKEN,
4946
},
5047
});
5148

src/workers/encode-service.ts

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
import axios from 'axios';
2+
import { IncomingMessage } from 'http';
3+
import * as _ from 'lodash';
4+
5+
import { ENCODE_SERVICE, IWorkerConfig, KLPQ_MEDIA_SERVER } from '../config';
6+
import { ApiSourceEnum } from '../models/stream';
7+
8+
import { BaseWorker, IGenericStreamsResponse } from './_base';
9+
10+
interface IApiResponse {
11+
stats: {
12+
app: string;
13+
channels: {
14+
channel: string;
15+
publisher: {
16+
connectId: string;
17+
connectCreated: Date;
18+
connectUpdated: Date;
19+
bytes: number;
20+
protocol: string;
21+
};
22+
subscribers: {
23+
connectId: string;
24+
connectCreated: Date;
25+
connectUpdated: Date;
26+
bytes: number;
27+
ip: string;
28+
protocol: string;
29+
}[];
30+
}[];
31+
}[];
32+
}
33+
34+
class MediaServerWorker extends BaseWorker {
35+
apiSource = ApiSourceEnum.ENCODE_SERVICE;
36+
37+
constructor(private host: string) {
38+
super();
39+
}
40+
41+
async getStats(config: IWorkerConfig): Promise<IGenericStreamsResponse[]> {
42+
const {
43+
request,
44+
data: { stats: data },
45+
} = await axios.get<IApiResponse>(
46+
`${config.API_HOST}/api/stats/${this.host}`,
47+
);
48+
49+
const stats: IGenericStreamsResponse[] = [];
50+
51+
_.forEach(data, (appStats) => {
52+
const { app } = appStats;
53+
54+
const liveApp: IGenericStreamsResponse = {
55+
app,
56+
channels: [],
57+
};
58+
59+
_.forEach(appStats.channels, (channelStats) => {
60+
const { channel } = channelStats;
61+
62+
const liveChannel: IGenericStreamsResponse['channels'][0] = {
63+
channel,
64+
publisher: null,
65+
subscribers: [],
66+
};
67+
68+
if (channelStats.publisher) {
69+
liveChannel.publisher = {
70+
...channelStats.publisher,
71+
app,
72+
channel: channelStats.channel,
73+
ip: (request as IncomingMessage).socket.remoteAddress,
74+
userId: null,
75+
};
76+
}
77+
78+
liveChannel.subscribers = channelStats.subscribers.map(
79+
(subscriber) => ({
80+
...subscriber,
81+
app,
82+
channel: channelStats.channel,
83+
userId: null,
84+
}),
85+
);
86+
87+
liveApp.channels.push(liveChannel);
88+
});
89+
90+
stats.push(liveApp);
91+
});
92+
93+
return stats;
94+
}
95+
}
96+
97+
export async function runUpdate() {
98+
await Promise.all(
99+
KLPQ_MEDIA_SERVER.map(async (config) => {
100+
const mediaServerWorker = new MediaServerWorker(config.HOSTS[0]);
101+
102+
await mediaServerWorker.run(ENCODE_SERVICE);
103+
}),
104+
);
105+
}

src/workers/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import {
44
KLPQ_MEDIA_SERVER,
55
NODE_MEDIA_SERVER,
66
ADOBE_MEDIA_SERVER,
7+
ENCODE_SERVICE,
78
} from '../config';
89
import { IStreamModel } from '../models/stream';
910
import { ISubscriberModel } from '../models/subscriber';
@@ -25,6 +26,7 @@ export const STREAM_SERVERS = [
2526
...KLPQ_MEDIA_SERVER,
2627
...NODE_MEDIA_SERVER,
2728
...ADOBE_MEDIA_SERVER,
29+
...ENCODE_SERVICE,
2830
];
2931

3032
export const liveStats: ILiveStats = {};

src/workers/klpq-media-server.ts

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import axios from 'axios';
22
import * as _ from 'lodash';
33
import { ObjectId } from 'mongodb';
44

5-
import { KLPQ_MEDIA_SERVER } from '../config';
5+
import { IWorkerConfig, KLPQ_MEDIA_SERVER } from '../config';
66
import { ApiSourceEnum } from '../models/stream';
77

88
import { BaseWorker, IGenericStreamsResponse } from './_base';
@@ -58,15 +58,12 @@ interface IApiResponse {
5858
class MediaServerWorker extends BaseWorker {
5959
apiSource = ApiSourceEnum.KLPQ_MEDIA_SERVER;
6060

61-
async getStats(
62-
host: string,
63-
token: string,
64-
): Promise<IGenericStreamsResponse[]> {
61+
async getStats(config: IWorkerConfig): Promise<IGenericStreamsResponse[]> {
6562
const {
6663
data: { stats: data },
67-
} = await axios.get<IApiResponse>(`${host}/api/streams`, {
64+
} = await axios.get<IApiResponse>(`${config.API_HOST}/api/streams`, {
6865
headers: {
69-
token,
66+
token: config.API_TOKEN,
7067
},
7168
});
7269

src/workers/node-media-server.ts

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import axios from 'axios';
22
import * as _ from 'lodash';
33

4-
import { NODE_MEDIA_SERVER } from '../config';
4+
import { IWorkerConfig, NODE_MEDIA_SERVER } from '../config';
55
import { ApiSourceEnum } from '../models/stream';
66

77
import { BaseWorker, IGenericStreamsResponse } from './_base';
@@ -47,15 +47,17 @@ interface IApiResponse {
4747
class MediaServerWorker extends BaseWorker {
4848
apiSource = ApiSourceEnum.NODE_MEDIA_SERVER;
4949

50-
async getStats(
51-
host: string,
52-
token: string,
53-
): Promise<IGenericStreamsResponse[]> {
54-
const { data } = await axios.get<IApiResponse>(`${host}/api/streams`, {
55-
headers: {
56-
Authorization: `Basic ${Buffer.from(token).toString('base64')}`,
50+
async getStats(config: IWorkerConfig): Promise<IGenericStreamsResponse[]> {
51+
const { data } = await axios.get<IApiResponse>(
52+
`${config.API_HOST}/api/streams`,
53+
{
54+
headers: {
55+
Authorization: `Basic ${Buffer.from(config.API_TOKEN).toString(
56+
'base64',
57+
)}`,
58+
},
5759
},
58-
});
60+
);
5961

6062
const connectUpdated = new Date();
6163

0 commit comments

Comments
 (0)