Skip to content

Commit b2f5e55

Browse files
authored
Add return result for stream send() (#21)
1 parent 468563d commit b2f5e55

File tree

14 files changed

+183
-29
lines changed

14 files changed

+183
-29
lines changed

library/src/main/kotlin/build/buf/connect/BidirectionalStreamInterface.kt

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,21 @@ interface BidirectionalStreamInterface<Input, Output> {
3131
* Send a request to the server over the stream.
3232
*
3333
* @param input The request message to send.
34+
* @return [Result.success] on send success, [Result.failure] on
35+
* any sends which are not successful.
3436
*/
35-
suspend fun send(input: Input)
37+
suspend fun send(input: Input): Result<Unit>
3638

3739
/**
38-
* Close the stream. No calls to `send()` are valid after calling `close()`.
40+
* Close the stream. No calls to [send] are valid after calling [close].
3941
*/
4042
fun close()
43+
44+
/**
45+
* Determine if the underlying stream is closed.
46+
*
47+
* @return true if the underlying stream is closed. If the stream is still open,
48+
* this will return false.
49+
*/
50+
fun isClosed(): Boolean
4151
}

library/src/main/kotlin/build/buf/connect/ClientOnlyStreamInterface.kt

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,18 @@ interface ClientOnlyStreamInterface<Input, Output> {
2424
*
2525
* @param input The request message to send.
2626
*/
27-
suspend fun send(input: Input)
27+
suspend fun send(input: Input): Result<Unit>
2828

2929
/**
30-
* Close the stream. No calls to `send()` are valid after calling `close()`.
30+
* Close the stream. No calls to [send] are valid after calling [close].
3131
*/
3232
fun close()
33+
34+
/**
35+
* Determine if the underlying stream is closed.
36+
*
37+
* @return true if the underlying stream is closed. If the stream is still open,
38+
* this will return false.
39+
*/
40+
fun isClosed(): Boolean
3341
}

library/src/main/kotlin/build/buf/connect/ConnectError.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import kotlin.reflect.KClass
1818

1919
/**
2020
* Typed error provided by Connect RPCs that may optionally wrap additional typed custom errors
21-
* using `details`.
21+
* using [details].
2222
*/
2323
data class ConnectError constructor(
2424
// The resulting status code.
@@ -35,7 +35,7 @@ data class ConnectError constructor(
3535
) : Throwable(message, exception) {
3636

3737
/**
38-
* Unpacks values from `self.details` and returns the first matching error, if any.
38+
* Unpacks values from [details] and returns the first matching error, if any.
3939
*
4040
* @return The unpacked typed error details, if available.
4141
*/
@@ -51,7 +51,7 @@ data class ConnectError constructor(
5151
}
5252

5353
/**
54-
* Creates a new ConnectError with the specified CompletionParser.
54+
* Creates a new [ConnectError] with the specified [ErrorDetailParser].
5555
*/
5656
fun setErrorParser(errorParser: ErrorDetailParser): ConnectError {
5757
return ConnectError(

library/src/main/kotlin/build/buf/connect/SerializationStrategy.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import kotlin.reflect.KClass
1919
/**
2020
* The serialization strategy for completion events from gRPC or Connect.
2121
*
22-
* A base data type will need to implement a SerializationStrategy.
22+
* A base data type will need to implement a [SerializationStrategy].
2323
*/
2424
interface SerializationStrategy {
2525

library/src/main/kotlin/build/buf/connect/ServerOnlyStreamInterface.kt

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,21 @@ interface ServerOnlyStreamInterface<Input, Output> {
3333
* Can only be called exactly one time when starting the stream.
3434
*
3535
* @param input The request message to send.
36+
* @return [Result.success] on send success, [Result.failure] on
37+
* any sends which are not successful.
3638
*/
37-
suspend fun send(input: Input)
39+
suspend fun send(input: Input): Result<Unit>
3840

3941
/**
40-
* Close the stream. No calls to `send()` are valid after calling `close()`.
42+
* Close the stream. No calls to [send] are valid after calling [close].
4143
*/
4244
fun close()
45+
46+
/**
47+
* Determine if the underlying stream is closed.
48+
*
49+
* @return true if the underlying stream is closed. If the stream is still open,
50+
* this will return false.
51+
*/
52+
fun isClosed(): Boolean
4353
}

library/src/main/kotlin/build/buf/connect/StreamResult.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ package build.buf.connect
1717
/**
1818
* Enumeration of result states that can be received over streams.
1919
*
20-
* A typical stream receives `Headers > Message > Message > Message ... > Complete`
20+
* A typical stream receives [Headers] > [Message] > [Message] > [Message] ... > [Complete]
2121
*/
2222
sealed class StreamResult<Output> constructor(
2323
val error: Throwable? = null
@@ -34,7 +34,7 @@ sealed class StreamResult<Output> constructor(
3434
/**
3535
* Get the ConnectError from the result.
3636
*
37-
* @return The ConnectError if present, null otherwise.
37+
* @return The [ConnectError] if present, null otherwise.
3838
*/
3939
fun connectError(): ConnectError? {
4040
if (error is ConnectError) {

library/src/main/kotlin/build/buf/connect/compression/CompressionPool.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@ import okio.Buffer
2020
* Conforming types provide the functionality to compress/decompress data using a specific
2121
* algorithm.
2222
*
23-
* `ProtocolClientInterface` implementations are expected to use the first compression pool with
24-
* a matching `name()` for decompressing inbound responses.
23+
* [build.buf.connect.ProtocolClientInterface] implementations are expected to use the first compression pool with
24+
* a matching [name] for decompressing inbound responses.
2525
*
2626
* Outbound request compression can be specified using additional options that specify a
27-
* `compressionName` that matches a compression pool's `name()`.
27+
* `compressionName` that matches a compression pool's [name].
2828
*/
2929
interface CompressionPool {
3030
/**

library/src/main/kotlin/build/buf/connect/http/HTTPClientInterface.kt

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package build.buf.connect.http
1616

1717
import build.buf.connect.StreamResult
1818
import okio.Buffer
19+
import java.util.concurrent.atomic.AtomicReference
1920

2021
typealias Cancelable = () -> Unit
2122

@@ -49,11 +50,23 @@ class Stream(
4950
private val onSend: (Buffer) -> Unit,
5051
private val onClose: () -> Unit
5152
) {
52-
fun send(buffer: Buffer) {
53+
private val isClosed = AtomicReference(false)
54+
55+
fun send(buffer: Buffer): Result<Unit> {
56+
if (isClosed()) {
57+
return Result.failure(IllegalStateException("cannot send. underlying stream is closed"))
58+
}
5359
onSend(buffer)
60+
return Result.success(Unit)
5461
}
5562

5663
fun close() {
57-
onClose()
64+
if (!isClosed.getAndSet(true)) {
65+
onClose()
66+
}
67+
}
68+
69+
fun isClosed(): Boolean {
70+
return isClosed.get()
5871
}
5972
}

library/src/main/kotlin/build/buf/connect/http/TracingInfo.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
package build.buf.connect.http
1616

1717
/**
18-
* Tracing metadata for HTTPClientInterface and Interceptors.
18+
* Tracing metadata for [HTTPClientInterface] and [build.buf.connect.Interceptor].
1919
*/
2020
data class TracingInfo(
2121
// The underlying http status code

library/src/main/kotlin/build/buf/connect/impl/BidirectionalStream.kt

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,19 +20,24 @@ import build.buf.connect.StreamResult
2020
import build.buf.connect.http.Stream
2121
import kotlinx.coroutines.channels.Channel
2222
import kotlinx.coroutines.channels.ReceiveChannel
23+
import java.lang.Exception
2324

2425
/**
25-
* Concrete implementation of `BidirectionalStreamInterface`.
26+
* Concrete implementation of [BidirectionalStreamInterface].
2627
*/
2728
internal class BidirectionalStream<Input, Output>(
2829
val stream: Stream,
2930
private val requestCodec: Codec<Input>,
3031
private val receiveChannel: Channel<StreamResult<Output>>
3132
) : BidirectionalStreamInterface<Input, Output> {
3233

33-
override suspend fun send(input: Input) {
34-
val msg = requestCodec.serialize(input)
35-
stream.send(msg)
34+
override suspend fun send(input: Input): Result<Unit> {
35+
val msg = try {
36+
requestCodec.serialize(input)
37+
} catch (e: Exception) {
38+
return Result.failure(e)
39+
}
40+
return stream.send(msg)
3641
}
3742

3843
override fun resultChannel(): ReceiveChannel<StreamResult<Output>> {
@@ -42,4 +47,8 @@ internal class BidirectionalStream<Input, Output>(
4247
override fun close() {
4348
stream.close()
4449
}
50+
51+
override fun isClosed(): Boolean {
52+
return stream.isClosed()
53+
}
4554
}

0 commit comments

Comments
 (0)