Skip to content

Commit 59b90c7

Browse files
authored
feat: Add parser (#142)
1 parent c68ed7d commit 59b90c7

File tree

10 files changed

+107
-54
lines changed

10 files changed

+107
-54
lines changed

README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,11 @@ documentation][docs]
6060
- `parseJson` (boolean, default is `true`). When set to true, uses `JSON.parse`
6161
to parse the data sent through PubSub.
6262

63+
- `parser` (function, optional). This option can be used to parse data coming
64+
from PubSub. The function must return data (this data is passed to the
65+
handler, as `data`). You can optionally throw an exception if the data could
66+
not be parsed, which in turn can be caught by `onError` if you'd like.
67+
6368
## Contributing
6469

6570
We love contributions! 🙏 Bug reports and pull requests are welcome on [GitHub].

examples/fastify-plugin.ts

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,26 @@
11
import { TypeBoxTypeProvider } from '@fastify/type-provider-typebox';
22
import Fastify from 'fastify';
3-
import { PubSubHandler, pubSubFastifyPlugin } from '../src';
3+
import { z } from 'zod';
4+
import { pubSubFastifyPlugin } from '../src';
45

5-
interface HandlerArguments {
6-
name: string;
7-
party: {
8-
name: string;
9-
};
10-
bookingId: string;
11-
}
6+
const inputData = z.object({
7+
name: z.string(),
8+
party: z.object({
9+
name: z.string(),
10+
}),
11+
bookingId: z.string(),
12+
});
1213

1314
const server = () => {
1415
const fastify = Fastify().withTypeProvider<TypeBoxTypeProvider>();
1516

16-
const handler: PubSubHandler<HandlerArguments> = ({ data, log }) => {
17-
const { name, party, bookingId } = data;
18-
log.info(`${name} from ${party.name} had a booking with id ${bookingId}`);
19-
};
20-
21-
fastify.register(pubSubFastifyPlugin, { handler });
17+
fastify.register(pubSubFastifyPlugin, {
18+
parser: d => inputData.parse(d),
19+
handler: ({ data, log }) => {
20+
const { name, party, bookingId } = data;
21+
log.info(`${name} from ${party.name} had a booking with id ${bookingId}`);
22+
},
23+
});
2224

2325
fastify.server.listen(8000);
2426
};

package.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
"@sinclair/typebox": "^0.24.0",
4646
"@types/express": "^4.17.13",
4747
"fastify": "^4.0.1",
48+
"fastify-plugin": "^4.2.0",
4849
"pino": "^8.0.0",
4950
"pino-cloud-logging": "^1.0.3"
5051
},
@@ -64,6 +65,7 @@
6465
"ts-jest": "^28.0.4",
6566
"typedoc": "^0.23.7",
6667
"typedoc-plugin-markdown": "^3.1.1",
67-
"typescript": "^4.3"
68+
"typescript": "^4.3",
69+
"zod": "^3.18.0"
6870
}
6971
}

src/__tests__/common.test.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,4 +65,27 @@ describe('common', () => {
6565
log: {} as pino.Logger,
6666
});
6767
});
68+
69+
it('should run parser', async () => {
70+
const message = createPubSubdata({}, true);
71+
const handler = jest.fn();
72+
const parser = jest.fn(_ => '');
73+
74+
await handlePubSubMessage({
75+
message,
76+
handler,
77+
parser: data => parser(data),
78+
context: { hello: 'there' },
79+
log: {} as pino.Logger,
80+
});
81+
82+
expect(handler).toHaveBeenCalledWith({
83+
message,
84+
data: '',
85+
context: { hello: 'there' },
86+
log: {} as pino.Logger,
87+
});
88+
89+
expect(parser).toHaveBeenCalledWith({});
90+
});
6891
});

src/common.ts

Lines changed: 9 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,24 @@
11
import pino from 'pino';
22
import { gcpLogOptions } from 'pino-cloud-logging';
3-
import {
4-
PubSubHandler,
5-
PubSubHandlerResponse,
6-
PubSubMessageType,
7-
} from './types';
3+
import { HandlePubSubMessageArgs, PubSubHandlerResponse } from './types';
84

9-
export interface HandlePubSubMessageArgs<Context = unknown> {
10-
message: PubSubMessageType;
11-
handler: PubSubHandler;
12-
parseJson?: boolean;
13-
context?: Context;
14-
log?: pino.Logger;
15-
}
16-
17-
export async function handlePubSubMessage<Context = unknown>(
18-
args: HandlePubSubMessageArgs<Context>,
5+
export async function handlePubSubMessage<Data, Context>(
6+
args: HandlePubSubMessageArgs<Data, Context>,
197
): Promise<PubSubHandlerResponse | void> {
208
const {
219
message,
2210
parseJson = true,
11+
parser,
2312
handler,
2413
context,
2514
log = pino(gcpLogOptions()),
2615
} = args;
27-
let data = Buffer.from(message.data, 'base64').toString().trim();
16+
const bufferString = Buffer.from(message.data, 'base64').toString().trim();
17+
18+
let data = parseJson ? JSON.parse(bufferString) : bufferString;
2819

29-
if (parseJson) {
30-
data = JSON.parse(data);
20+
if (parser) {
21+
data = await parser(data);
3122
}
3223

3324
return handler({

src/methods/cloud-functions.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ import { gcpLogOptions } from 'pino-cloud-logging';
44
import { handlePubSubMessage } from '../common';
55
import { PubSubConfig, PubSubHandler } from '../types';
66

7-
export interface PubSubCloudFunctionsConfig
8-
extends Omit<PubSubConfig, 'handler' | 'path'> {
7+
export interface PubSubCloudFunctionsConfig<Data, Context>
8+
extends Omit<PubSubConfig<Data, Context>, 'handler' | 'path'> {
99
logger?: pino.LoggerOptions;
1010
}
1111

@@ -14,9 +14,9 @@ export type CloudFunctionFun = (
1414
res: express.Response,
1515
) => Promise<void>;
1616

17-
export function createPubSubCloudFunctions<T = unknown>(
18-
handler: PubSubHandler<T>,
19-
options: PubSubCloudFunctionsConfig = {},
17+
export function createPubSubCloudFunctions<Data = unknown, Context = unknown>(
18+
handler: PubSubHandler<Data, Context>,
19+
options: PubSubCloudFunctionsConfig<Data, Context> = {},
2020
): CloudFunctionFun {
2121
const { parseJson, onError, logger } = options;
2222
return async (req, res): Promise<void> => {

src/methods/fastify-plugin.ts

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
1-
import { FastifyPluginAsync, FastifyRequest } from 'fastify';
1+
import { FastifyInstance } from 'fastify';
2+
import fp from 'fastify-plugin';
23
import { handlePubSubMessage } from '../common';
34
import { PubSubConfig, PubSubRequest, PubSubRequestType } from '../types';
45

5-
export const pubSubFastifyPlugin: FastifyPluginAsync<PubSubConfig> = async (
6-
fastify,
7-
options,
8-
) => {
6+
const pubSubFastifyPluginFn = async <Data, Context>(
7+
fastify: FastifyInstance,
8+
options: PubSubConfig<Data, Context>,
9+
): Promise<void> => {
910
const { path = '/', handler, parseJson, onError } = options;
1011
fastify.post<{
1112
Body: PubSubRequestType;
@@ -19,11 +20,11 @@ export const pubSubFastifyPlugin: FastifyPluginAsync<PubSubConfig> = async (
1920
},
2021
async (req, reply) => {
2122
try {
22-
const res = await handlePubSubMessage<FastifyRequest>({
23+
const res = await handlePubSubMessage({
2324
message: req.body.message,
2425
handler,
2526
parseJson,
26-
context: req,
27+
// context: req,
2728
log: req.log,
2829
});
2930

@@ -38,3 +39,8 @@ export const pubSubFastifyPlugin: FastifyPluginAsync<PubSubConfig> = async (
3839
},
3940
);
4041
};
42+
43+
export const pubSubFastifyPlugin = fp(pubSubFastifyPluginFn, {
44+
name: 'pubsub-http-handler',
45+
fastify: '4.x',
46+
});

src/methods/server.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@ import Fastify, { FastifyInstance, FastifyServerOptions } from 'fastify';
22
import { PubSubConfig, PubSubHandler } from '../types';
33
import { pubSubFastifyPlugin } from './fastify-plugin';
44

5-
export interface PubSubServerConfig extends Omit<PubSubConfig, 'handler'> {
5+
export interface PubSubServerConfig<Data, Context>
6+
extends Omit<PubSubConfig<Data, Context>, 'handler'> {
67
/**
78
* Will automatically pick up PORT environment variable.
89
* @default 8000
@@ -31,9 +32,9 @@ export interface CreatePubSubHandlerResponse {
3132
fastify: FastifyInstance;
3233
}
3334

34-
export function createPubSubServer<T = unknown>(
35-
handler: PubSubHandler<T>,
36-
config: PubSubServerConfig = {},
35+
export function createPubSubServer<Data, Context>(
36+
handler: PubSubHandler<Data, Context>,
37+
config: PubSubServerConfig<Data, Context> = {},
3738
): CreatePubSubHandlerResponse {
3839
let { host = '0.0.0.0' } = config;
3940
const {

src/types.ts

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,29 @@
11
import { Static, Type } from '@sinclair/typebox';
22
import { FastifyLoggerInstance } from 'fastify';
3+
import pino from 'pino';
34

4-
export interface PubSubConfig {
5+
export interface PubSubConfig<Data, Context> {
56
/**
67
* Handler
78
*/
8-
handler: PubSubHandler;
9+
handler: PubSubHandler<Data, Context>;
910
/**
1011
* OnError Handler
1112
*
1213
* When this is set, errors will not be
1314
* thrown.
1415
*/
1516
onError?: OnErrorHandler;
17+
18+
parser?: (data: unknown) => Data | Promise<Data>;
1619
/**
1720
* This will run JSON.parse on request data
1821
*
1922
* **Tip**: `false` when sending strings
2023
* @default true
2124
*/
2225
parseJson?: boolean; // Defaults to true
26+
2327
/**
2428
* Use this to set a different path
2529
* @default /
@@ -46,11 +50,20 @@ export class PubSubHandlerResponse {
4650
statusCode?: number;
4751
}
4852

49-
export type PubSubHandler<T = any> = (args: {
53+
export type PubSubHandler<Data, Context> = (args: {
5054
message: PubSubMessageType;
51-
data?: T;
52-
context?: unknown;
55+
data: Data;
56+
context?: Context;
5357
log: FastifyLoggerInstance;
5458
}) => Promise<PubSubHandlerResponse | void> | PubSubHandlerResponse | void;
5559

5660
export type OnErrorHandler = (error: unknown) => void | Promise<void>;
61+
62+
export interface HandlePubSubMessageArgs<Data, Context> {
63+
message: PubSubMessageType;
64+
handler: PubSubHandler<Data, Context>;
65+
parseJson?: boolean;
66+
parser?: (data: unknown) => Data;
67+
context?: Context;
68+
log?: pino.Logger;
69+
}

yarn.lock

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2022,6 +2022,11 @@ fast-uri@^2.1.0:
20222022
resolved "https://registry.yarnpkg.com/fast-uri/-/fast-uri-2.1.0.tgz#9279432d6b53675c90116b947ed2bbba582d6fb5"
20232023
integrity sha512-qKRta6N7BWEFVlyonVY/V+BMLgFqktCUV0QjT259ekAIlbVrMaFnFLxJ4s/JPl4tou56S1BzPufI60bLe29fHA==
20242024

2025+
fastify-plugin@^4.2.0:
2026+
version "4.2.0"
2027+
resolved "https://registry.yarnpkg.com/fastify-plugin/-/fastify-plugin-4.2.0.tgz#fb28ec08312777ad343059177f13277ca3e7c3b1"
2028+
integrity sha512-hovKzEXZc2YgeuXn41/2EA/IaIOdRu1pB9WKgnzDBj3lhKSdDCEsckHa7I6LiT/LhflvAQX7ZY8IQ6eBX0htTg==
2029+
20252030
fastify@^4.0.1:
20262031
version "4.2.0"
20272032
resolved "https://registry.yarnpkg.com/fastify/-/fastify-4.2.0.tgz#e8a56875083cab0fb0f1dd844b31f0fd145a6900"
@@ -4492,3 +4497,8 @@ yargs@^17.3.1:
44924497
string-width "^4.2.3"
44934498
y18n "^5.0.5"
44944499
yargs-parser "^21.0.0"
4500+
4501+
zod@^3.18.0:
4502+
version "3.18.0"
4503+
resolved "https://registry.yarnpkg.com/zod/-/zod-3.18.0.tgz#2eed58b3cafb8d9a67aa2fee69279702f584f3bc"
4504+
integrity sha512-gwTm8RfUCe8l9rDwN5r2A17DkAa8Ez4Yl4yXqc5VqeGaXaJahzYYXbTwvhroZi0SNBqTwh/bKm2N0mpCzuw4bA==

0 commit comments

Comments
 (0)