Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 100 additions & 4 deletions packages/rxjs/src/internal/observable/dom/fetch.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { operate, Observable, from } from '@rxjs/observable';
import type { ObservableInput } from '../../types.js';
import { ServerSentEventItem, handleServerSentEvents } from './sse.js';

export function fromFetch<T>(
input: string | Request,
Expand All @@ -9,7 +10,30 @@ export function fromFetch<T>(
): Observable<T>;

export function fromFetch(input: string | Request, init?: RequestInit): Observable<Response>;

export function fromFetch(
input: string | Request,
init: RequestInit & {
selector: 'sse' | 'server-sent-event';
}
): Observable<ServerSentEventItem>;
export function fromFetch(
input: string | Request,
init: RequestInit & {
selector: 'text';
}
): Observable<string>;
export function fromFetch(
input: string | Request,
init: RequestInit & {
selector: 'formdata' | 'form-data';
}
): Observable<FormData>;
export function fromFetch(
input: string | Request,
init: RequestInit & {
selector: 'arraybuffer' | 'array-buffer';
}
): Observable<ArrayBuffer>;
/**
* Uses [the Fetch API](https://developer.mozilla.org/en-US/docs/Web/API/Fetch_API) to
* make an HTTP request.
Expand All @@ -26,7 +50,7 @@ export function fromFetch(input: string | Request, init?: RequestInit): Observab
*
* ## Examples
*
* Basic use
* ### Basic use
*
* ```ts
* import { fromFetch } from 'rxjs/fetch';
Expand Down Expand Up @@ -84,6 +108,35 @@ export function fromFetch(input: string | Request, init?: RequestInit): Observab
* });
* ```
*
* ### Server-Sent Events
*
* To receive response in stream mode (Server-Sent Events).
*
* ```ts
* import { of } from 'rxjs';
* import { fromFetch } from 'rxjs/fetch';
*
* // URL_WITH_STREAM_MODE = "…";
* // REQUEST_BODY = {…};
*
* const data$ = fromFetch(URL_WITH_STREAM_MODE, {
* selector: 'sse'
* method: "POST",
* body: JSON.stringify(REQUEST_BODY),
* mode: "cors",
* credentials: "include",
* headers: {
* "Content-Type": "application/json",
* "Accept": "text/event-stream, application/json"
* },
* });
*
* data$.subscribe({
* next: result => console.log(result),
* complete: () => console.log('done')
* });
* ```
*
* @param input The resource you would like to fetch. Can be a url or a request object.
* @param initWithSelector A configuration object for the fetch.
* [See MDN for more details](https://developer.mozilla.org/en-US/docs/Web/API/WindowOrWorkerGlobalScope/fetch#Parameters)
Expand All @@ -93,10 +146,11 @@ export function fromFetch(input: string | Request, init?: RequestInit): Observab
export function fromFetch<T>(
input: string | Request,
initWithSelector: RequestInit & {
selector?: (response: Response) => ObservableInput<T>;
selector?: 'json' | 'text' | 'formdata' | 'form-data' | 'arraybuffer' | 'array-buffer' | 'sse' | 'server-sent-event' | ((response: Response) => ObservableInput<T>);
} = {}
): Observable<Response | T> {
const { selector, ...init } = initWithSelector;
const { selector: originalSelector, ...init } = initWithSelector;
let selector = originalSelector;
return new Observable<Response | T>((destination) => {
// Our controller for aborting this fetch.
// Any externally provided AbortSignal will have to call
Expand Down Expand Up @@ -148,6 +202,48 @@ export function fromFetch<T>(
// If we have a selector function, use it to project our response.
// Note that any error that comes from our selector will be
// sent to the promise `catch` below and handled.

if (typeof selector === 'string') {
// The field `selector` can be an enum to return
// in the specific type.
switch (selector.toLowerCase()) {
case 'json':
selector = response => response.json();
break;
case 'text':
selector = response => response.text() as Promise<T>;
break;
case 'formdata':
case 'form-data':
selector = response => response.formData() as Promise<T>;
break;
case 'blob':
selector = response => response.blob() as Promise<T>;
break;
case 'arraybuffer':
case 'array-buffer':
selector = response => response.arrayBuffer() as Promise<T>;
break;
case 'sse':
case 'server-sent-event':
// Handle Server-Sent Events.
handleServerSentEvents(response, null, item => {
// Notify the SSE item.
destination.next(item as T);
}).then(arr => {
abortable = false;
destination.complete();
return arr;
}, handleError);
return;
default:
abortable = false;
destination.error('selector does not support');
return;
}
}

// Otherwise, `selector` is the function to project response.
from(selector(response)).subscribe(
operate({
destination,
Expand Down
140 changes: 140 additions & 0 deletions packages/rxjs/src/internal/observable/dom/sse.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/**
* The record info of Server-Sent Events item.
*/
export class ServerSentEventItem {
private source: Record<string, string>;
private dataParsedInJson: Record<string, unknown> | undefined;

/**
* Initializes a new instance of the ServerSentEventItem class.
* @param source
*/
constructor(source: string) {
this.source = {};

// Parse each field.
(source || "").split("\n").forEach(line => {
const pos = line.indexOf(":");
if (pos < 0) return;
const key = line.substring(0, pos);
const value = line.substring(pos + 1);
if (!this.source[key] || (key !== "data" && key !== "")) this.source[key] = value;
else this.source[key] += value;
});
}

/**
* Gets the event name.
*/
get event() {
return this.source.event || "message";
}

/**
* Gets the data in string.
*/
get data() {
return this.source.data;
}

/**
* Gets the event identifier.
*/
get id() {
return this.source.id;
}

/**
* Gets the comment.
*/
get comment() {
return this.source[""];
}

/**
* Gets the retry in millisecond.
*/
get retry() {
return this.source.retry ? parseInt(this.source.retry, 10) : undefined;
}

/**
* Gets the data in JSON.
* @returns The data object parsed by JSON.
*/
dataJson<T = Record<string, unknown>>() {
const data = this.source.data;
if (!data) return undefined;
if (!this.dataParsedInJson) this.dataParsedInJson = JSON.parse(data);
return this.dataParsedInJson as T;
}

/**
* Gets a specific field.
* @param key The field key.
* @returns The raw value of the field.
*/
get(key: string) {
return this.source[key];
}
}

/**
* Handles the response of Server-Sent Events.
* @param response The response of `fetch`.
* @param decoder The decoder. Default is to use UTF-8 text decoder.
* @param callback The callback for each item.
* @returns The `ServerSentEventItem` array parsed.
*/
export async function handleServerSentEvents(response: Response, decoder: TextDecoder | null, callback: ((item: ServerSentEventItem) => void)) {
// Response body is required.
if (!response.body) return Promise.reject("no response body");

// Get reader to read content partially into buffer.
// Then parse buffer to `ServerSentEventItem` instances.
const reader = response.body.getReader();
let buffer = "";
const arr: ServerSentEventItem[] = [];
if (!decoder) decoder = new TextDecoder('utf-8');

// Read in loop.
while (true) {
const { done, value } = await reader.read();
if (done) { // Ending handler.
convertSse(buffer, arr, callback);
return arr;
}

// Decode the array buffer and append to buffer.
buffer += decoder.decode(value, { stream: true });

// Check the double new line (`\n\n`) seperator.
const messages = buffer.split("\n\n");
if (messages.length < 2) continue;

// Remove the last one becuase it may not complete.
buffer = messages.pop() || "";

// Parse each content into `ServerSentEventItem` instances.
messages.forEach(msg => {
convertSse(msg, arr, callback);
});
}
}

/**
* Parses and raise callback for each.
* @param msg The item content in string.
* @param arr The `ServerSentEventItem` to push.
* @param callback The callback handler.
* @returns The instance parsed.
*/
function convertSse(msg: string, arr: ServerSentEventItem[], callback: ((item: ServerSentEventItem) => void)) {
if (!msg) return;

// Parse and raise callback for each.
const sse = new ServerSentEventItem(msg);
arr.push(sse);
callback(sse);
return sse;
}