diff --git a/packages/rxjs/src/internal/observable/dom/fetch.ts b/packages/rxjs/src/internal/observable/dom/fetch.ts index c0c26cadd9..3b19cff7cb 100644 --- a/packages/rxjs/src/internal/observable/dom/fetch.ts +++ b/packages/rxjs/src/internal/observable/dom/fetch.ts @@ -1,5 +1,6 @@ import { operate, Observable, from } from '@rxjs/observable'; import type { ObservableInput } from '../../types.js'; +import { ServerSentEventItem, handleServerSentEvents } from './sse.js'; export function fromFetch( input: string | Request, @@ -9,7 +10,30 @@ export function fromFetch( ): Observable; export function fromFetch(input: string | Request, init?: RequestInit): Observable; - +export function fromFetch( + input: string | Request, + init: RequestInit & { + selector: 'sse' | 'server-sent-event'; + } +): Observable; +export function fromFetch( + input: string | Request, + init: RequestInit & { + selector: 'text'; + } +): Observable; +export function fromFetch( + input: string | Request, + init: RequestInit & { + selector: 'formdata' | 'form-data'; + } +): Observable; +export function fromFetch( + input: string | Request, + init: RequestInit & { + selector: 'arraybuffer' | 'array-buffer'; + } +): Observable; /** * Uses [the Fetch API](https://developer.mozilla.org/en-US/docs/Web/API/Fetch_API) to * make an HTTP request. @@ -26,7 +50,7 @@ export function fromFetch(input: string | Request, init?: RequestInit): Observab * * ## Examples * - * Basic use + * ### Basic use * * ```ts * import { fromFetch } from 'rxjs/fetch'; @@ -84,6 +108,35 @@ export function fromFetch(input: string | Request, init?: RequestInit): Observab * }); * ``` * + * ### Server-Sent Events + * + * To receive response in stream mode (Server-Sent Events). + * + * ```ts + * import { of } from 'rxjs'; + * import { fromFetch } from 'rxjs/fetch'; + * + * // URL_WITH_STREAM_MODE = "…"; + * // REQUEST_BODY = {…}; + * + * const data$ = fromFetch(URL_WITH_STREAM_MODE, { + * selector: 'sse' + * method: "POST", + * body: JSON.stringify(REQUEST_BODY), + * mode: "cors", + * credentials: "include", + * headers: { + * "Content-Type": "application/json", + * "Accept": "text/event-stream, application/json" + * }, + * }); + * + * data$.subscribe({ + * next: result => console.log(result), + * complete: () => console.log('done') + * }); + * ``` + * * @param input The resource you would like to fetch. Can be a url or a request object. * @param initWithSelector A configuration object for the fetch. * [See MDN for more details](https://developer.mozilla.org/en-US/docs/Web/API/WindowOrWorkerGlobalScope/fetch#Parameters) @@ -93,10 +146,11 @@ export function fromFetch(input: string | Request, init?: RequestInit): Observab export function fromFetch( input: string | Request, initWithSelector: RequestInit & { - selector?: (response: Response) => ObservableInput; + selector?: 'json' | 'text' | 'formdata' | 'form-data' | 'arraybuffer' | 'array-buffer' | 'sse' | 'server-sent-event' | ((response: Response) => ObservableInput); } = {} ): Observable { - const { selector, ...init } = initWithSelector; + const { selector: originalSelector, ...init } = initWithSelector; + let selector = originalSelector; return new Observable((destination) => { // Our controller for aborting this fetch. // Any externally provided AbortSignal will have to call @@ -148,6 +202,48 @@ export function fromFetch( // If we have a selector function, use it to project our response. // Note that any error that comes from our selector will be // sent to the promise `catch` below and handled. + + if (typeof selector === 'string') { + // The field `selector` can be an enum to return + // in the specific type. + switch (selector.toLowerCase()) { + case 'json': + selector = response => response.json(); + break; + case 'text': + selector = response => response.text() as Promise; + break; + case 'formdata': + case 'form-data': + selector = response => response.formData() as Promise; + break; + case 'blob': + selector = response => response.blob() as Promise; + break; + case 'arraybuffer': + case 'array-buffer': + selector = response => response.arrayBuffer() as Promise; + break; + case 'sse': + case 'server-sent-event': + // Handle Server-Sent Events. + handleServerSentEvents(response, null, item => { + // Notify the SSE item. + destination.next(item as T); + }).then(arr => { + abortable = false; + destination.complete(); + return arr; + }, handleError); + return; + default: + abortable = false; + destination.error('selector does not support'); + return; + } + } + + // Otherwise, `selector` is the function to project response. from(selector(response)).subscribe( operate({ destination, diff --git a/packages/rxjs/src/internal/observable/dom/sse.ts b/packages/rxjs/src/internal/observable/dom/sse.ts new file mode 100644 index 0000000000..0a132b8f29 --- /dev/null +++ b/packages/rxjs/src/internal/observable/dom/sse.ts @@ -0,0 +1,140 @@ +/** + * The record info of Server-Sent Events item. + */ +export class ServerSentEventItem { + private source: Record; + private dataParsedInJson: Record | undefined; + + /** + * Initializes a new instance of the ServerSentEventItem class. + * @param source + */ + constructor(source: string) { + this.source = {}; + + // Parse each field. + (source || "").split("\n").forEach(line => { + const pos = line.indexOf(":"); + if (pos < 0) return; + const key = line.substring(0, pos); + const value = line.substring(pos + 1); + if (!this.source[key] || (key !== "data" && key !== "")) this.source[key] = value; + else this.source[key] += value; + }); + } + + /** + * Gets the event name. + */ + get event() { + return this.source.event || "message"; + } + + /** + * Gets the data in string. + */ + get data() { + return this.source.data; + } + + /** + * Gets the event identifier. + */ + get id() { + return this.source.id; + } + + /** + * Gets the comment. + */ + get comment() { + return this.source[""]; + } + + /** + * Gets the retry in millisecond. + */ + get retry() { + return this.source.retry ? parseInt(this.source.retry, 10) : undefined; + } + + /** + * Gets the data in JSON. + * @returns The data object parsed by JSON. + */ + dataJson>() { + const data = this.source.data; + if (!data) return undefined; + if (!this.dataParsedInJson) this.dataParsedInJson = JSON.parse(data); + return this.dataParsedInJson as T; + } + + /** + * Gets a specific field. + * @param key The field key. + * @returns The raw value of the field. + */ + get(key: string) { + return this.source[key]; + } +} + +/** + * Handles the response of Server-Sent Events. + * @param response The response of `fetch`. + * @param decoder The decoder. Default is to use UTF-8 text decoder. + * @param callback The callback for each item. + * @returns The `ServerSentEventItem` array parsed. + */ +export async function handleServerSentEvents(response: Response, decoder: TextDecoder | null, callback: ((item: ServerSentEventItem) => void)) { + // Response body is required. + if (!response.body) return Promise.reject("no response body"); + + // Get reader to read content partially into buffer. + // Then parse buffer to `ServerSentEventItem` instances. + const reader = response.body.getReader(); + let buffer = ""; + const arr: ServerSentEventItem[] = []; + if (!decoder) decoder = new TextDecoder('utf-8'); + + // Read in loop. + while (true) { + const { done, value } = await reader.read(); + if (done) { // Ending handler. + convertSse(buffer, arr, callback); + return arr; + } + + // Decode the array buffer and append to buffer. + buffer += decoder.decode(value, { stream: true }); + + // Check the double new line (`\n\n`) seperator. + const messages = buffer.split("\n\n"); + if (messages.length < 2) continue; + + // Remove the last one becuase it may not complete. + buffer = messages.pop() || ""; + + // Parse each content into `ServerSentEventItem` instances. + messages.forEach(msg => { + convertSse(msg, arr, callback); + }); + } +} + +/** + * Parses and raise callback for each. + * @param msg The item content in string. + * @param arr The `ServerSentEventItem` to push. + * @param callback The callback handler. + * @returns The instance parsed. + */ +function convertSse(msg: string, arr: ServerSentEventItem[], callback: ((item: ServerSentEventItem) => void)) { + if (!msg) return; + + // Parse and raise callback for each. + const sse = new ServerSentEventItem(msg); + arr.push(sse); + callback(sse); + return sse; +}