Skip to content

Commit f623bca

Browse files
feat(client)!: extract auto pagination to shared classes
refactor(client)!: refactor async auto-pagination refactor(client)!: rename `getNextPage{,Params}` to `nextPage{,Params}` refactor(client)!: swap `nextPage{,Params}` to return non-optional # Migration - If you were referencing the `AutoPager` class on a specific `*Page` or `*PageAsync` type, then you should instead reference the shared `AutoPager` and `AutoPagerAsync` types, under the `core` package - `AutoPagerAsync` now has different usage. You can call `.subscribe(...)` on the returned object instead to get called back each page item. You can also call `onCompleteFuture()` to get a future that completes when all items have been processed. Finally, you can call `.close()` on the returned object to stop auto-paginating early - If you were referencing `getNextPage` or `getNextPageParams`: - Swap to `nextPage()` and `nextPageParams()` - Note that these both now return non-optional types (use `hasNextPage()` before calling these, since they will throw if it's impossible to get another page) There are examples and further information about pagination in the readme.
1 parent 4bf868a commit f623bca

File tree

70 files changed

+1289
-1752
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

70 files changed

+1289
-1752
lines changed

README.md

+65-21
Original file line numberDiff line numberDiff line change
@@ -525,53 +525,101 @@ The SDK throws custom unchecked exception types:
525525

526526
## Pagination
527527

528-
For methods that return a paginated list of results, this library provides convenient ways access the results either one page at a time, or item-by-item across all pages.
528+
The SDK defines methods that return a paginated lists of results. It provides convenient ways to access the results either one page at a time or item-by-item across all pages.
529529

530530
### Auto-pagination
531531

532-
To iterate through all results across all pages, you can use `autoPager`, which automatically handles fetching more pages for you:
532+
To iterate through all results across all pages, use the `autoPager()` method, which automatically fetches more pages as needed.
533533

534-
### Synchronous
534+
When using the synchronous client, the method returns an [`Iterable`](https://docs.oracle.com/javase/8/docs/api/java/lang/Iterable.html)
535535

536536
```java
537537
import com.openai.models.finetuning.jobs.FineTuningJob;
538538
import com.openai.models.finetuning.jobs.JobListPage;
539539

540-
// As an Iterable:
541-
JobListPage page = client.fineTuning().jobs().list(params);
540+
JobListPage page = client.fineTuning().jobs().list();
541+
542+
// Process as an Iterable
542543
for (FineTuningJob job : page.autoPager()) {
543544
System.out.println(job);
544-
};
545+
}
545546

546-
// As a Stream:
547-
client.fineTuning().jobs().list(params).autoPager().stream()
547+
// Process as a Stream
548+
page.autoPager()
549+
.stream()
548550
.limit(50)
549551
.forEach(job -> System.out.println(job));
550552
```
551553

552-
### Asynchronous
554+
When using the asynchronous client, the method returns an [`AsyncStreamResponse`](openai-java-core/src/main/kotlin/com/openai/core/http/AsyncStreamResponse.kt):
553555

554556
```java
555-
// Using forEach, which returns CompletableFuture<Void>:
556-
asyncClient.fineTuning().jobs().list(params).autoPager()
557-
.forEach(job -> System.out.println(job), executor);
557+
import com.openai.core.http.AsyncStreamResponse;
558+
import com.openai.models.finetuning.jobs.FineTuningJob;
559+
import com.openai.models.finetuning.jobs.JobListPageAsync;
560+
import java.util.Optional;
561+
import java.util.concurrent.CompletableFuture;
562+
563+
CompletableFuture<JobListPageAsync> pageFuture = client.async().fineTuning().jobs().list();
564+
565+
pageFuture.thenRun(page -> page.autoPager().subscribe(job -> {
566+
System.out.println(job);
567+
}));
568+
569+
// If you need to handle errors or completion of the stream
570+
pageFuture.thenRun(page -> page.autoPager().subscribe(new AsyncStreamResponse.Handler<>() {
571+
@Override
572+
public void onNext(FineTuningJob job) {
573+
System.out.println(job);
574+
}
575+
576+
@Override
577+
public void onComplete(Optional<Throwable> error) {
578+
if (error.isPresent()) {
579+
System.out.println("Something went wrong!");
580+
throw new RuntimeException(error.get());
581+
} else {
582+
System.out.println("No more!");
583+
}
584+
}
585+
}));
586+
587+
// Or use futures
588+
pageFuture.thenRun(page -> page.autoPager()
589+
.subscribe(job -> {
590+
System.out.println(job);
591+
})
592+
.onCompleteFuture()
593+
.whenComplete((unused, error) -> {
594+
if (error != null) {
595+
System.out.println("Something went wrong!");
596+
throw new RuntimeException(error);
597+
} else {
598+
System.out.println("No more!");
599+
}
600+
}));
558601
```
559602

560603
### Manual pagination
561604

562-
If none of the above helpers meet your needs, you can also manually request pages one-by-one. A page of results has a `data()` method to fetch the list of objects, as well as top-level `response` and other methods to fetch top-level data about the page. It also has methods `hasNextPage`, `getNextPage`, and `getNextPageParams` methods to help with pagination.
605+
To access individual page items and manually request the next page, use the `items()`,
606+
`hasNextPage()`, and `nextPage()` methods:
563607

564608
```java
565609
import com.openai.models.finetuning.jobs.FineTuningJob;
566610
import com.openai.models.finetuning.jobs.JobListPage;
567611

568-
JobListPage page = client.fineTuning().jobs().list(params);
569-
while (page != null) {
570-
for (FineTuningJob job : page.data()) {
612+
JobListPage page = client.fineTuning().jobs().list();
613+
while (true) {
614+
for (FineTuningJob job : page.items()) {
571615
System.out.println(job);
572616
}
573617

574-
page = page.getNextPage().orElse(null);
618+
if (!page.hasNextPage()) {
619+
break;
620+
}
621+
622+
page = page.nextPage();
575623
}
576624
```
577625

@@ -654,9 +702,7 @@ Requests time out after 10 minutes by default.
654702
To set a custom timeout, configure the method call using the `timeout` method:
655703

656704
```java
657-
import com.openai.models.ChatModel;
658705
import com.openai.models.chat.completions.ChatCompletion;
659-
import com.openai.models.chat.completions.ChatCompletionCreateParams;
660706

661707
ChatCompletion chatCompletion = client.chat().completions().create(
662708
params, RequestOptions.builder().timeout(Duration.ofSeconds(30)).build()
@@ -907,9 +953,7 @@ ChatCompletion chatCompletion = client.chat().completions().create(params).valid
907953
Or configure the method call to validate the response using the `responseValidation` method:
908954

909955
```java
910-
import com.openai.models.ChatModel;
911956
import com.openai.models.chat.completions.ChatCompletion;
912-
import com.openai.models.chat.completions.ChatCompletionCreateParams;
913957

914958
ChatCompletion chatCompletion = client.chat().completions().create(
915959
params, RequestOptions.builder().responseValidation(true).build()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
// File generated from our OpenAPI spec by Stainless.
2+
3+
package com.openai.core
4+
5+
import java.util.stream.Stream
6+
import java.util.stream.StreamSupport
7+
8+
class AutoPager<T> private constructor(private val firstPage: Page<T>) : Iterable<T> {
9+
10+
companion object {
11+
12+
fun <T> from(firstPage: Page<T>): AutoPager<T> = AutoPager(firstPage)
13+
}
14+
15+
override fun iterator(): Iterator<T> =
16+
generateSequence(firstPage) { if (it.hasNextPage()) it.nextPage() else null }
17+
.flatMap { it.items() }
18+
.iterator()
19+
20+
fun stream(): Stream<T> = StreamSupport.stream(spliterator(), false)
21+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
// File generated from our OpenAPI spec by Stainless.
2+
3+
package com.openai.core
4+
5+
import com.openai.core.http.AsyncStreamResponse
6+
import java.util.Optional
7+
import java.util.concurrent.CompletableFuture
8+
import java.util.concurrent.CompletionException
9+
import java.util.concurrent.Executor
10+
import java.util.concurrent.atomic.AtomicReference
11+
12+
class AutoPagerAsync<T>
13+
private constructor(private val firstPage: PageAsync<T>, private val defaultExecutor: Executor) :
14+
AsyncStreamResponse<T> {
15+
16+
companion object {
17+
18+
fun <T> from(firstPage: PageAsync<T>, defaultExecutor: Executor): AutoPagerAsync<T> =
19+
AutoPagerAsync(firstPage, defaultExecutor)
20+
}
21+
22+
private val onCompleteFuture = CompletableFuture<Void?>()
23+
private val state = AtomicReference(State.NEW)
24+
25+
override fun subscribe(handler: AsyncStreamResponse.Handler<T>): AsyncStreamResponse<T> =
26+
subscribe(handler, defaultExecutor)
27+
28+
override fun subscribe(
29+
handler: AsyncStreamResponse.Handler<T>,
30+
executor: Executor,
31+
): AsyncStreamResponse<T> = apply {
32+
// TODO(JDK): Use `compareAndExchange` once targeting JDK 9.
33+
check(state.compareAndSet(State.NEW, State.SUBSCRIBED)) {
34+
if (state.get() == State.SUBSCRIBED) "Cannot subscribe more than once"
35+
else "Cannot subscribe after the response is closed"
36+
}
37+
38+
fun PageAsync<T>.handle(): CompletableFuture<Void?> {
39+
if (state.get() == State.CLOSED) {
40+
return CompletableFuture.completedFuture(null)
41+
}
42+
43+
items().forEach { handler.onNext(it) }
44+
return if (hasNextPage()) nextPage().thenCompose { it.handle() }
45+
else CompletableFuture.completedFuture(null)
46+
}
47+
48+
executor.execute {
49+
firstPage.handle().whenComplete { _, error ->
50+
val actualError =
51+
if (error is CompletionException && error.cause != null) error.cause else error
52+
try {
53+
handler.onComplete(Optional.ofNullable(actualError))
54+
} finally {
55+
try {
56+
if (actualError == null) {
57+
onCompleteFuture.complete(null)
58+
} else {
59+
onCompleteFuture.completeExceptionally(actualError)
60+
}
61+
} finally {
62+
close()
63+
}
64+
}
65+
}
66+
}
67+
}
68+
69+
override fun onCompleteFuture(): CompletableFuture<Void?> = onCompleteFuture
70+
71+
override fun close() {
72+
val previousState = state.getAndSet(State.CLOSED)
73+
if (previousState == State.CLOSED) {
74+
return
75+
}
76+
77+
// When the stream is closed, we should always consider it closed. If it closed due
78+
// to an error, then we will have already completed the future earlier, and this
79+
// will be a no-op.
80+
onCompleteFuture.complete(null)
81+
}
82+
}
83+
84+
private enum class State {
85+
NEW,
86+
SUBSCRIBED,
87+
CLOSED,
88+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
// File generated from our OpenAPI spec by Stainless.
2+
3+
package com.openai.core
4+
5+
/**
6+
* An interface representing a single page, with items of type [T], from a paginated endpoint
7+
* response.
8+
*
9+
* Implementations of this interface are expected to request additional pages synchronously. For
10+
* asynchronous pagination, see the [PageAsync] interface.
11+
*/
12+
interface Page<T> {
13+
14+
/**
15+
* Returns whether there's another page after this one.
16+
*
17+
* The method generally doesn't make requests so the result depends entirely on the data in this
18+
* page. If a significant amount of time has passed between requesting this page and calling
19+
* this method, then the result could be stale.
20+
*/
21+
fun hasNextPage(): Boolean
22+
23+
/**
24+
* Returns the page after this one by making another request.
25+
*
26+
* @throws IllegalStateException if it's impossible to get the next page. This exception is
27+
* avoidable by calling [hasNextPage] first.
28+
*/
29+
fun nextPage(): Page<T>
30+
31+
/** Returns the items in this page. */
32+
fun items(): List<T>
33+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
// File generated from our OpenAPI spec by Stainless.
2+
3+
package com.openai.core
4+
5+
import java.util.concurrent.CompletableFuture
6+
7+
/**
8+
* An interface representing a single page, with items of type [T], from a paginated endpoint
9+
* response.
10+
*
11+
* Implementations of this interface are expected to request additional pages asynchronously. For
12+
* synchronous pagination, see the [Page] interface.
13+
*/
14+
interface PageAsync<T> {
15+
16+
/**
17+
* Returns whether there's another page after this one.
18+
*
19+
* The method generally doesn't make requests so the result depends entirely on the data in this
20+
* page. If a significant amount of time has passed between requesting this page and calling
21+
* this method, then the result could be stale.
22+
*/
23+
fun hasNextPage(): Boolean
24+
25+
/**
26+
* Returns the page after this one by making another request.
27+
*
28+
* @throws IllegalStateException if it's impossible to get the next page. This exception is
29+
* avoidable by calling [hasNextPage] first.
30+
*/
31+
fun nextPage(): CompletableFuture<out PageAsync<T>>
32+
33+
/** Returns the items in this page. */
34+
fun items(): List<T>
35+
}

openai-java-core/src/main/kotlin/com/openai/models/batches/BatchListPage.kt

+9-31
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@
22

33
package com.openai.models.batches
44

5+
import com.openai.core.AutoPager
6+
import com.openai.core.Page
57
import com.openai.core.checkRequired
68
import com.openai.services.blocking.BatchService
79
import java.util.Objects
810
import java.util.Optional
9-
import java.util.stream.Stream
10-
import java.util.stream.StreamSupport
1111
import kotlin.jvm.optionals.getOrNull
1212

1313
/** @see [BatchService.list] */
@@ -16,7 +16,7 @@ private constructor(
1616
private val service: BatchService,
1717
private val params: BatchListParams,
1818
private val response: BatchListPageResponse,
19-
) {
19+
) : Page<Batch> {
2020

2121
/**
2222
* Delegates to [BatchListPageResponse], but gracefully handles missing data.
@@ -32,19 +32,16 @@ private constructor(
3232
*/
3333
fun hasMore(): Optional<Boolean> = response._hasMore().getOptional("has_more")
3434

35-
fun hasNextPage(): Boolean = data().isNotEmpty()
35+
override fun items(): List<Batch> = data()
3636

37-
fun getNextPageParams(): Optional<BatchListParams> {
38-
if (!hasNextPage()) {
39-
return Optional.empty()
40-
}
37+
override fun hasNextPage(): Boolean = items().isNotEmpty()
4138

42-
return Optional.of(params.toBuilder().after(data().last()._id().getOptional("id")).build())
43-
}
39+
fun nextPageParams(): BatchListParams =
40+
params.toBuilder().after(items().last()._id().getOptional("id")).build()
4441

45-
fun getNextPage(): Optional<BatchListPage> = getNextPageParams().map { service.list(it) }
42+
override fun nextPage(): BatchListPage = service.list(nextPageParams())
4643

47-
fun autoPager(): AutoPager = AutoPager(this)
44+
fun autoPager(): AutoPager<Batch> = AutoPager.from(this)
4845

4946
/** The parameters that were used to request this page. */
5047
fun params(): BatchListParams = params
@@ -113,25 +110,6 @@ private constructor(
113110
)
114111
}
115112

116-
class AutoPager(private val firstPage: BatchListPage) : Iterable<Batch> {
117-
118-
override fun iterator(): Iterator<Batch> = iterator {
119-
var page = firstPage
120-
var index = 0
121-
while (true) {
122-
while (index < page.data().size) {
123-
yield(page.data()[index++])
124-
}
125-
page = page.getNextPage().getOrNull() ?: break
126-
index = 0
127-
}
128-
}
129-
130-
fun stream(): Stream<Batch> {
131-
return StreamSupport.stream(spliterator(), false)
132-
}
133-
}
134-
135113
override fun equals(other: Any?): Boolean {
136114
if (this === other) {
137115
return true

0 commit comments

Comments
 (0)