Skip to content

Commit 1b89e05

Browse files
committed
feat: generate API layer for streaming endpoints
feat: streaming list objects support
1 parent 623b191 commit 1b89e05

18 files changed

+4417
-2127
lines changed

.openapi-generator/FILES

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ docs/RelationshipCondition.md
6868
docs/SourceInfo.md
6969
docs/Status.md
7070
docs/Store.md
71+
docs/StreamResultOfStreamedListObjectsResponse.md
72+
docs/StreamedListObjectsResponse.md
7173
docs/Tuple.md
7274
docs/TupleChange.md
7375
docs/TupleKey.md
@@ -157,6 +159,8 @@ model_relationship_condition.go
157159
model_source_info.go
158160
model_status.go
159161
model_store.go
162+
model_stream_result_of_streamed_list_objects_response.go
163+
model_streamed_list_objects_response.go
160164
model_tuple.go
161165
model_tuple_change.go
162166
model_tuple_key.go

CHANGELOG.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,12 @@
33
## [Unreleased](https://github.com/openfga/go-sdk/compare/v0.7.3...HEAD)
44
- feat: add generic `ToPtr[T any](v T) *T` function for creating pointers to any type
55
- deprecation: `PtrBool`, `PtrInt`, `PtrInt32`, `PtrInt64`, `PtrFloat32`, `PtrFloat64`, `PtrString`, and `PtrTime` are now deprecated in favor of the generic `ToPtr` function
6+
- feat: add support for StreamedListObjects endpoint
7+
- feat: add configurable buffer size for streaming responses via `ClientStreamedListObjectsOptions.StreamBufferSize`
68

79
## v0.7.3
810

9-
### [0.7.3](https://github.com/openfga/go-sdk/compare/v0.7.2...v0.7.3)
11+
### [0.7.3](https://github.com/openfga/go-sdk/compare/v0.7.2...v0.7.3) (2025-10-08)
1012

1113
- feat: add support for custom headers per request. See [documentation](https://github.com/openfga/go-sdk#custom-headers).
1214
- feat: add support for conflict options for Write operations**: (#229)

CONTRIBUTING.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ Reading and following these guidelines will help us make the contribution proces
1010
* [Getting Started](#getting-started)
1111
* [Making Changes](#making-changes)
1212
* [Opening Issues](#opening-issues)
13-
* [Submitting Pull Requests](#submitting-pull-requests) [Note: We are not accepting Pull Requests at this time!]
13+
* [Submitting Pull Requests](#submitting-pull-requests)
1414
* [Getting in Touch](#getting-in-touch)
1515
* [Have a question or problem?](#have-a-question-or-problem)
1616
* [Vulnerability Reporting](#vulnerability-reporting)

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1101,6 +1101,7 @@ Class | Method | HTTP request | Description
11011101
*OpenFgaApi* | [**ReadAuthorizationModel**](docs/OpenFgaApi.md#readauthorizationmodel) | **Get** /stores/{store_id}/authorization-models/{id} | Return a particular version of an authorization model
11021102
*OpenFgaApi* | [**ReadAuthorizationModels**](docs/OpenFgaApi.md#readauthorizationmodels) | **Get** /stores/{store_id}/authorization-models | Return all the authorization models for a particular store
11031103
*OpenFgaApi* | [**ReadChanges**](docs/OpenFgaApi.md#readchanges) | **Get** /stores/{store_id}/changes | Return a list of all the tuple changes
1104+
*OpenFgaApi* | [**StreamedListObjects**](docs/OpenFgaApi.md#streamedlistobjects) | **Post** /stores/{store_id}/streamed-list-objects | Stream all objects of the given type that the user has a relation with
11041105
*OpenFgaApi* | [**Write**](docs/OpenFgaApi.md#write) | **Post** /stores/{store_id}/write | Add or delete tuples from the store
11051106
*OpenFgaApi* | [**WriteAssertions**](docs/OpenFgaApi.md#writeassertions) | **Put** /stores/{store_id}/assertions/{authorization_model_id} | Upsert assertions for an authorization model ID
11061107
*OpenFgaApi* | [**WriteAuthorizationModel**](docs/OpenFgaApi.md#writeauthorizationmodel) | **Post** /stores/{store_id}/authorization-models | Create a new authorization model
@@ -1166,6 +1167,8 @@ Class | Method | HTTP request | Description
11661167
- [SourceInfo](docs/SourceInfo.md)
11671168
- [Status](docs/Status.md)
11681169
- [Store](docs/Store.md)
1170+
- [StreamResultOfStreamedListObjectsResponse](docs/StreamResultOfStreamedListObjectsResponse.md)
1171+
- [StreamedListObjectsResponse](docs/StreamedListObjectsResponse.md)
11691172
- [Tuple](docs/Tuple.md)
11701173
- [TupleChange](docs/TupleChange.md)
11711174
- [TupleKey](docs/TupleKey.md)

api_open_fga.go

Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -771,6 +771,24 @@ type OpenFgaApi interface {
771771
*/
772772
ReadChangesExecute(r ApiReadChangesRequest) (ReadChangesResponse, *http.Response, error)
773773

774+
/*
775+
* StreamedListObjects Stream all objects of the given type that the user has a relation with
776+
* The Streamed ListObjects API is very similar to the the ListObjects API, with two differences:
777+
1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected.
778+
2. The number of results returned is only limited by the execution timeout specified in the flag OPENFGA_LIST_OBJECTS_DEADLINE.
779+
780+
* @param ctx context.Context - for authentication, logging, cancellation, deadlines, tracing, etc. Passed from http.Request or context.Background().
781+
* @param storeId
782+
* @return ApiStreamedListObjectsRequest
783+
*/
784+
StreamedListObjects(ctx context.Context, storeId string) ApiStreamedListObjectsRequest
785+
786+
/*
787+
* StreamedListObjectsExecute executes the request
788+
* @return StreamResultOfStreamedListObjectsResponse
789+
*/
790+
StreamedListObjectsExecute(r ApiStreamedListObjectsRequest) (StreamResultOfStreamedListObjectsResponse, *http.Response, error)
791+
774792
/*
775793
* Write Add or delete tuples from the store
776794
* The Write API will transactionally update the tuples for a certain store. Tuples and type definitions allow OpenFGA to determine whether a relationship exists between an object and an user.
@@ -4170,6 +4188,202 @@ func (a *OpenFgaApiService) ReadChangesExecute(r ApiReadChangesRequest) (ReadCha
41704188
return returnValue, nil, reportError("Error not handled properly")
41714189
}
41724190

4191+
type ApiStreamedListObjectsRequest struct {
4192+
ctx context.Context
4193+
ApiService OpenFgaApi
4194+
storeId string
4195+
body *ListObjectsRequest
4196+
options RequestOptions
4197+
}
4198+
4199+
func (r ApiStreamedListObjectsRequest) Body(body ListObjectsRequest) ApiStreamedListObjectsRequest {
4200+
r.body = &body
4201+
return r
4202+
}
4203+
4204+
func (r ApiStreamedListObjectsRequest) Options(options RequestOptions) ApiStreamedListObjectsRequest {
4205+
r.options = options
4206+
return r
4207+
}
4208+
4209+
func (r ApiStreamedListObjectsRequest) Execute() (StreamResultOfStreamedListObjectsResponse, *http.Response, error) {
4210+
return r.ApiService.StreamedListObjectsExecute(r)
4211+
}
4212+
4213+
/*
4214+
- StreamedListObjects Stream all objects of the given type that the user has a relation with
4215+
- The Streamed ListObjects API is very similar to the the ListObjects API, with two differences:
4216+
4217+
1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected.
4218+
2. The number of results returned is only limited by the execution timeout specified in the flag OPENFGA_LIST_OBJECTS_DEADLINE.
4219+
4220+
- @param ctx context.Context - for authentication, logging, cancellation, deadlines, tracing, etc. Passed from http.Request or context.Background().
4221+
- @param storeId
4222+
- @return ApiStreamedListObjectsRequest
4223+
*/
4224+
func (a *OpenFgaApiService) StreamedListObjects(ctx context.Context, storeId string) ApiStreamedListObjectsRequest {
4225+
return ApiStreamedListObjectsRequest{
4226+
ApiService: a,
4227+
ctx: ctx,
4228+
storeId: storeId,
4229+
}
4230+
}
4231+
4232+
/*
4233+
* Execute executes the request
4234+
* @return StreamResultOfStreamedListObjectsResponse
4235+
*/
4236+
func (a *OpenFgaApiService) StreamedListObjectsExecute(r ApiStreamedListObjectsRequest) (StreamResultOfStreamedListObjectsResponse, *http.Response, error) {
4237+
const (
4238+
operationName = "StreamedListObjects"
4239+
httpMethod = http.MethodPost
4240+
)
4241+
var (
4242+
requestStarted = time.Now()
4243+
requestBody interface{}
4244+
returnValue StreamResultOfStreamedListObjectsResponse
4245+
)
4246+
4247+
path := "/stores/{store_id}/streamed-list-objects"
4248+
if r.storeId == "" {
4249+
return returnValue, nil, reportError("storeId is required and must be specified")
4250+
}
4251+
4252+
path = strings.ReplaceAll(path, "{"+"store_id"+"}", url.PathEscape(parameterToString(r.storeId, "")))
4253+
4254+
localVarHeaderParams := make(map[string]string)
4255+
localVarQueryParams := url.Values{}
4256+
if r.body == nil {
4257+
return returnValue, nil, reportError("body is required and must be specified")
4258+
}
4259+
4260+
// to determine the Content-Type header
4261+
localVarHTTPContentTypes := []string{"application/json"}
4262+
4263+
// set Content-Type header
4264+
localVarHTTPContentType := selectHeaderContentType(localVarHTTPContentTypes)
4265+
if localVarHTTPContentType != "" {
4266+
localVarHeaderParams["Content-Type"] = localVarHTTPContentType
4267+
}
4268+
4269+
// to determine the Accept header
4270+
localVarHTTPHeaderAccepts := []string{"application/json"}
4271+
4272+
// set Accept header
4273+
localVarHTTPHeaderAccept := selectHeaderAccept(localVarHTTPHeaderAccepts)
4274+
if localVarHTTPHeaderAccept != "" {
4275+
localVarHeaderParams["Accept"] = localVarHTTPHeaderAccept
4276+
}
4277+
// body params
4278+
requestBody = r.body
4279+
4280+
// if any override headers were in the options, set them now
4281+
for header, val := range r.options.Headers {
4282+
localVarHeaderParams[header] = val
4283+
}
4284+
4285+
retryParams := a.client.cfg.RetryParams
4286+
for i := 0; i < retryParams.MaxRetry+1; i++ {
4287+
req, err := a.client.prepareRequest(r.ctx, path, httpMethod, requestBody, localVarHeaderParams, localVarQueryParams)
4288+
if err != nil {
4289+
return returnValue, nil, err
4290+
}
4291+
4292+
httpResponse, err := a.client.callAPI(req)
4293+
if err != nil || httpResponse == nil {
4294+
if i < retryParams.MaxRetry {
4295+
timeToWait := retryutils.GetTimeToWait(i, retryParams.MaxRetry, retryParams.MinWaitInMs, http.Header{}, operationName)
4296+
if timeToWait > 0 {
4297+
if a.client.cfg.Debug {
4298+
log.Printf("\nWaiting %v to retry %v (%v %v) due to network error (error=%v) on attempt %v. Request body: %v\n", timeToWait, operationName, req.Method, req.URL, err, i, requestBody)
4299+
}
4300+
time.Sleep(timeToWait)
4301+
continue
4302+
}
4303+
}
4304+
return returnValue, httpResponse, err
4305+
}
4306+
4307+
responseBody, err := io.ReadAll(httpResponse.Body)
4308+
_ = httpResponse.Body.Close()
4309+
httpResponse.Body = io.NopCloser(bytes.NewBuffer(responseBody))
4310+
if err != nil {
4311+
if i < retryParams.MaxRetry {
4312+
timeToWait := retryutils.GetTimeToWait(i, retryParams.MaxRetry, retryParams.MinWaitInMs, httpResponse.Header, operationName)
4313+
if timeToWait > 0 {
4314+
if a.client.cfg.Debug {
4315+
log.Printf("\nWaiting %v to retry %v (%v %v) due to error parsing response body (err=%v) on attempt %v. Request body: %v\n", timeToWait, operationName, req.Method, req.URL, err, i, requestBody)
4316+
}
4317+
time.Sleep(timeToWait)
4318+
continue
4319+
}
4320+
}
4321+
return returnValue, httpResponse, err
4322+
}
4323+
4324+
if httpResponse.StatusCode >= http.StatusMultipleChoices {
4325+
err := a.client.handleAPIError(httpResponse, responseBody, requestBody, operationName, r.storeId)
4326+
if err != nil && i < retryParams.MaxRetry {
4327+
timeToWait := time.Duration(0)
4328+
var fgaApiRateLimitExceededError FgaApiRateLimitExceededError
4329+
var fgaApiInternalError FgaApiInternalError
4330+
switch {
4331+
case errors.As(err, &fgaApiRateLimitExceededError):
4332+
timeToWait = err.(FgaApiRateLimitExceededError).GetTimeToWait(i, *retryParams)
4333+
case errors.As(err, &fgaApiInternalError):
4334+
timeToWait = err.(FgaApiInternalError).GetTimeToWait(i, *retryParams)
4335+
}
4336+
4337+
if timeToWait > 0 {
4338+
if a.client.cfg.Debug {
4339+
log.Printf("\nWaiting %v to retry %v (%v %v) due to api retryable error (status code %v, error=%v) on attempt %v. Request body: %v\n", timeToWait, operationName, req.Method, req.URL, httpResponse.StatusCode, err, i, requestBody)
4340+
}
4341+
time.Sleep(timeToWait)
4342+
continue
4343+
}
4344+
}
4345+
4346+
return returnValue, httpResponse, err
4347+
}
4348+
4349+
err = a.client.decode(&returnValue, responseBody, httpResponse.Header.Get("Content-Type"))
4350+
if err != nil {
4351+
newErr := GenericOpenAPIError{
4352+
body: responseBody,
4353+
error: err.Error(),
4354+
}
4355+
return returnValue, httpResponse, newErr
4356+
}
4357+
4358+
metrics := telemetry.GetMetrics(telemetry.TelemetryFactoryParameters{Configuration: a.client.cfg.Telemetry})
4359+
4360+
var attrs, queryDuration, requestDuration, _ = metrics.BuildTelemetryAttributes(
4361+
operationName,
4362+
map[string]interface{}{
4363+
"storeId": r.storeId,
4364+
"body": requestBody,
4365+
},
4366+
req,
4367+
httpResponse,
4368+
requestStarted,
4369+
i,
4370+
)
4371+
4372+
if requestDuration > 0 {
4373+
_, _ = metrics.RequestDuration(requestDuration, attrs)
4374+
}
4375+
4376+
if queryDuration > 0 {
4377+
_, _ = metrics.QueryDuration(queryDuration, attrs)
4378+
}
4379+
4380+
return returnValue, httpResponse, nil
4381+
}
4382+
4383+
// should never have reached this
4384+
return returnValue, nil, reportError("Error not handled properly")
4385+
}
4386+
41734387
type ApiWriteRequest struct {
41744388
ctx context.Context
41754389
ApiService OpenFgaApi

0 commit comments

Comments
 (0)