Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
## [Unreleased](https://github.com/openfga/js-sdk/compare/v0.9.0...HEAD)

- feat: add support for handling Retry-After header (#267)
- feat: add support for [StreamedListObjects](https://openfga.dev/api/service#/Relationship%20Queries/StreamedListObjects)
with streaming semantics. See [documentation](https://github.com/openfga/js-sdk/blob/main/README.md#streamed-list-objects) for more.
- feat: add support for conflict options for Write operations**: (#276)
The client now supports setting `conflict` on `ClientWriteRequestOpts` to control behavior when writing duplicate tuples or deleting non-existent tuples. This feature requires OpenFGA server [v1.10.0](https://github.com/openfga/openfga/releases/tag/v1.10.0) or later.
See [Conflict Options for Write Operations](./README.md#conflict-options-for-write-operations) for more.
Expand Down
26 changes: 26 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,32 @@ const response = await fgaClient.listObjects({
// response.objects = ["document:0192ab2a-d83f-756d-9397-c5ed9f3cb69a"]
```

##### Streamed List Objects

The Streamed ListObjects API is very similar to the ListObjects API, with two differences:

1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected.
2. The number of results returned is only limited by the execution timeout specified in the flag `OPENFGA_LIST_OBJECTS_DEADLINE`.

[API Documentation](https://openfga.dev/api/service#/Relationship%20Queries/StreamedListObjects)

```javascript
const options = {};

// To override the authorization model id for this request
options.authorizationModelId = "01GXSA8YR785C4FYS3C0RTG7B1";

const objects = [];
for await (const response of fgaClient.streamedListObjects(
{ user: "user:anne", relation: "can_read", type: "document" },
{ consistency: ConsistencyPreference.HigherConsistency }
)) {
objects.push(response.object);
}

// objects = ["document:0192ab2a-d83f-756d-9397-c5ed9f3cb69a"]
```

##### List Relations

List the relations a user has with an object. This wraps around [BatchCheck](#batchcheck) to allow checking multiple relationships at once.
Expand Down
83 changes: 83 additions & 0 deletions api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
serializeDataIfNeeded,
toPathString,
createRequestFunction,
createStreamingRequestFunction,
RequestArgs,
CallResult,
PromiseResult
Expand Down Expand Up @@ -383,6 +384,45 @@ export const OpenFgaApiAxiosParamCreator = function (configuration: Configuratio
options: localVarRequestOptions,
};
},
/**
* The Streamed ListObjects API is very similar to the ListObjects API, with two differences:
* 1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected.
* 2. The number of results returned is only limited by the execution timeout specified in the flag OPENFGA_LIST_OBJECTS_DEADLINE.
* @summary Stream all objects of the given type that the user has a relation with
* @param {string} storeId
* @param {ListObjectsRequest} body
* @param {*} [options] Override http request option.
* @throws { FgaError }
*/
streamedListObjects: (storeId: string, body: ListObjectsRequest, options: any = {}): RequestArgs => {
// verify required parameter 'storeId' is not null or undefined
assertParamExists("streamedListObjects", "storeId", storeId);
// verify required parameter 'body' is not null or undefined
assertParamExists("streamedListObjects", "body", body);
const localVarPath = "/stores/{store_id}/streamed-list-objects"
.replace(`{${"store_id"}}`, encodeURIComponent(String(storeId)));
// use dummy base URL string because the URL constructor only accepts absolute URLs.
const localVarUrlObj = new URL(localVarPath, DUMMY_BASE_URL);
let baseOptions;
if (configuration) {
baseOptions = configuration.baseOptions;
}

const localVarRequestOptions = { method: "POST", ...baseOptions, ...options };
const localVarHeaderParameter = {} as any;
const localVarQueryParameter = {} as any;

localVarHeaderParameter["Content-Type"] = "application/json";

setSearchParams(localVarUrlObj, localVarQueryParameter, options.query);
localVarRequestOptions.headers = { ...localVarHeaderParameter, ...options.headers };
localVarRequestOptions.data = serializeDataIfNeeded(body, localVarRequestOptions);

return {
url: toPathString(localVarUrlObj),
options: localVarRequestOptions,
};
},
/**
* Returns a paginated list of OpenFGA stores and a continuation token to get additional stores. The continuation token will be empty if there are no more stores.
* @summary List all stores
Expand Down Expand Up @@ -912,6 +952,22 @@ export const OpenFgaApiFp = function(configuration: Configuration, credentials:
...TelemetryAttributes.fromRequestBody(body)
});
},
/**
* The Streamed ListObjects API is very similar to the ListObjects API, with two differences:
* 1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected.
* 2. The number of results returned is only limited by the execution timeout specified in the flag OPENFGA_LIST_OBJECTS_DEADLINE.
* @summary Stream all objects of the given type that the user has a relation with
* @param {string} storeId
* @param {ListObjectsRequest} body
* @param {*} [options] Override http request option.
* @throws { FgaError }
*/
async streamedListObjects(storeId: string, body: ListObjectsRequest, options?: any): Promise<(axios?: AxiosInstance) => Promise<any>> {
const localVarAxiosArgs = localVarAxiosParamCreator.streamedListObjects(storeId, body, options);
return createStreamingRequestFunction(localVarAxiosArgs, globalAxios, configuration, credentials, {
[TelemetryAttribute.FgaClientRequestMethod]: "StreamedListObjects"
});
},
/**
* Returns a paginated list of OpenFGA stores and a continuation token to get additional stores. The continuation token will be empty if there are no more stores.
* @summary List all stores
Expand Down Expand Up @@ -1156,6 +1212,19 @@ export const OpenFgaApiFactory = function (configuration: Configuration, credent
listObjects(storeId: string, body: ListObjectsRequest, options?: any): PromiseResult<ListObjectsResponse> {
return localVarFp.listObjects(storeId, body, options).then((request) => request(axios));
},
/**
* The Streamed ListObjects API is very similar to the ListObjects API, with two differences:
* 1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected.
* 2. The number of results returned is only limited by the execution timeout specified in the flag OPENFGA_LIST_OBJECTS_DEADLINE.
* @summary Stream all objects of the given type that the user has a relation with
* @param {string} storeId
* @param {ListObjectsRequest} body
* @param {*} [options] Override http request option.
* @throws { FgaError }
*/
streamedListObjects(storeId: string, body: ListObjectsRequest, options?: any): Promise<any> {
return localVarFp.streamedListObjects(storeId, body, options).then((request) => request(axios));
},
/**
* Returns a paginated list of OpenFGA stores and a continuation token to get additional stores. The continuation token will be empty if there are no more stores.
* @summary List all stores
Expand Down Expand Up @@ -1370,6 +1439,20 @@ export class OpenFgaApi extends BaseAPI {
return OpenFgaApiFp(this.configuration, this.credentials).listObjects(storeId, body, options).then((request) => request(this.axios));
}

/**
* The Streamed ListObjects API is very similar to the ListObjects API, with two differences:
* 1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected.
* 2. The number of results returned is only limited by the execution timeout specified in the flag OPENFGA_LIST_OBJECTS_DEADLINE.
* @summary Stream all objects of the given type that the user has a relation with
* @param {string} storeId
* @param {ListObjectsRequest} body
* @param {*} [options] Override http request option.
* @throws { FgaError }
*/
public streamedListObjects(storeId: string, body: ListObjectsRequest, options?: any): Promise<any> {
return OpenFgaApiFp(this.configuration, this.credentials).streamedListObjects(storeId, body, options).then((request) => request(this.axios));
}

/**
* Returns a paginated list of OpenFGA stores and a continuation token to get additional stores. The continuation token will be empty if there are no more stores.
* @summary List all stores
Expand Down
15 changes: 15 additions & 0 deletions apiModel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -860,6 +860,21 @@ export interface ListObjectsResponse {
*/
objects: Array<string>;
}

/**
* The response for a StreamedListObjects RPC.
* @export
* @interface StreamedListObjectsResponse
*/
export interface StreamedListObjectsResponse {
/**
*
* @type {string}
* @memberof StreamedListObjectsResponse
*/
object: string;
}

/**
*
* @export
Expand Down
47 changes: 47 additions & 0 deletions client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import {
GetStoreResponse,
ListObjectsRequest,
ListObjectsResponse,
StreamedListObjectsResponse,
ListStoresResponse,
ListUsersRequest,
ListUsersResponse,
Expand Down Expand Up @@ -50,6 +51,7 @@ import {
} from "./utils";
import { isWellFormedUlidString } from "./validation";
import SdkConstants from "./constants";
import { parseNDJSONStream } from "./streaming";

export type UserClientConfigurationParams = UserConfigurationParams & {
storeId?: string;
Expand Down Expand Up @@ -847,6 +849,51 @@ export class OpenFgaClient extends BaseAPI {
}, options);
}

/**
* StreamedListObjects - Stream all objects of a particular type that the user has a certain relation to (evaluates)
*
* Note: This method is Node.js only. Streams are supported via the axios API layer.
* The response will be streamed as newline-delimited JSON objects.
*
* @param {ClientListObjectsRequest} body
* @param {ClientRequestOptsWithConsistency} [options]
* @param {string} [options.authorizationModelId] - Overrides the authorization model id in the configuration
* @param {object} [options.headers] - Custom headers to send alongside the request
* @param {ConsistencyPreference} [options.consistency] - The consistency preference to use
* @param {object} [options.retryParams] - Override the retry parameters for this request
* @param {number} [options.retryParams.maxRetry] - Override the max number of retries on each API request
* @param {number} [options.retryParams.minWaitInMs] - Override the minimum wait before a retry is initiated
* @returns {AsyncGenerator<StreamedListObjectsResponse>} An async generator that yields objects as they are received
*/
async *streamedListObjects(body: ClientListObjectsRequest, options: ClientRequestOptsWithConsistency = {}): AsyncGenerator<StreamedListObjectsResponse> {
const stream = await this.api.streamedListObjects(this.getStoreId(options)!, {
authorization_model_id: this.getAuthorizationModelId(options),
user: body.user,
relation: body.relation,
type: body.type,
context: body.context,
contextual_tuples: { tuple_keys: body.contextualTuples || [] },
consistency: options.consistency
}, options);

// Unwrap axios CallResult to get the raw Node.js stream when needed
const source = stream?.$response?.data ?? stream;

// Parse the Node.js stream
try {
for await (const item of parseNDJSONStream(source as any)) {
if (item && item.result && item.result.object) {
yield { object: item.result.object } as StreamedListObjectsResponse;
}
}
} finally {
// Ensure underlying HTTP connection closes if consumer stops early
if (source && typeof source.destroy === "function") {
try { source.destroy(); } catch { }
}
}
}

/**
* ListRelations - List all the relations a user has with an object (evaluates)
* @param {object} listRelationsRequest
Expand Down
71 changes: 71 additions & 0 deletions common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,77 @@ export const createRequestFunction = function (axiosArgs: RequestArgs, axiosInst
);
}

return result;
};
};

/**
* creates an axios streaming request function that returns the raw response stream
* for incremental parsing (used by streamedListObjects)
*/
export const createStreamingRequestFunction = function (axiosArgs: RequestArgs, axiosInstance: AxiosInstance, configuration: Configuration, credentials: Credentials, methodAttributes: Record<string, string | number> = {}) {
configuration.isValid();

const retryParams = axiosArgs.options?.retryParams ? axiosArgs.options?.retryParams : configuration.retryParams;
const maxRetry: number = retryParams ? retryParams.maxRetry : 0;
const minWaitInMs: number = retryParams ? retryParams.minWaitInMs : 0;

const start = performance.now();

return async (axios: AxiosInstance = axiosInstance): Promise<any> => {
await setBearerAuthToObject(axiosArgs.options.headers, credentials!);

const url = configuration.getBasePath() + axiosArgs.url;

const axiosRequestArgs = { ...axiosArgs.options, responseType: "stream", url: url };
const wrappedResponse = await attemptHttpRequest(axiosRequestArgs, {
maxRetry,
minWaitInMs,
}, axios);
const response = wrappedResponse?.response;

const result: any = response?.data; // raw stream

let attributes: StringIndexable = {};

attributes = TelemetryAttributes.fromRequest({
userAgent: configuration.baseOptions?.headers["User-Agent"],
httpMethod: axiosArgs.options?.method,
url,
resendCount: wrappedResponse?.retries,
start: start,
credentials: credentials,
attributes: methodAttributes,
});

attributes = TelemetryAttributes.fromResponse({
response,
attributes,
});

const serverRequestDuration = attributes[TelemetryAttribute.HttpServerRequestDuration];
if (configuration.telemetry?.metrics?.histogramQueryDuration && typeof serverRequestDuration !== "undefined") {
configuration.telemetry.recorder.histogram(
TelemetryHistograms.queryDuration,
parseInt(attributes[TelemetryAttribute.HttpServerRequestDuration] as string, 10),
TelemetryAttributes.prepare(
attributes,
configuration.telemetry.metrics.histogramQueryDuration.attributes
)
);
}

if (configuration.telemetry?.metrics?.histogramRequestDuration) {
configuration.telemetry.recorder.histogram(
TelemetryHistograms.requestDuration,
attributes[TelemetryAttribute.HttpClientRequestDuration],
TelemetryAttributes.prepare(
attributes,
configuration.telemetry.metrics.histogramRequestDuration.attributes
)
);
}

return result;
};
};
Loading