diff --git a/langchain4j-kotlin/src/main/kotlin/me/kpavlov/langchain4j/kotlin/model/chat/StreamingChatLanguageModelExtensions.kt b/langchain4j-kotlin/src/main/kotlin/me/kpavlov/langchain4j/kotlin/model/chat/StreamingChatLanguageModelExtensions.kt index e0ecb54..9f8ae18 100644 --- a/langchain4j-kotlin/src/main/kotlin/me/kpavlov/langchain4j/kotlin/model/chat/StreamingChatLanguageModelExtensions.kt +++ b/langchain4j-kotlin/src/main/kotlin/me/kpavlov/langchain4j/kotlin/model/chat/StreamingChatLanguageModelExtensions.kt @@ -10,7 +10,6 @@ import kotlinx.coroutines.channels.trySendBlocking import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.buffer import kotlinx.coroutines.flow.callbackFlow -import kotlinx.coroutines.flow.channelFlow import kotlinx.coroutines.flow.flow import me.kpavlov.langchain4j.kotlin.model.chat.request.ChatRequestBuilder import me.kpavlov.langchain4j.kotlin.model.chat.request.chatRequest @@ -50,19 +49,6 @@ sealed interface StreamingChatLanguageModelReply { data class CompleteResponse( val response: ChatResponse, ) : StreamingChatLanguageModelReply - - /** - * Represents an error that occurred during the streaming process - * when generating a reply from the AI language model. This type - * of reply is used to indicate a failure in the operation and - * provides details about the cause of the error. - * - * @property cause The underlying exception or error that caused the failure. - * @see StreamingChatResponseHandler.onError - */ - data class Error( - val cause: Throwable, - ) : StreamingChatLanguageModelReply } /** @@ -83,7 +69,7 @@ sealed interface StreamingChatLanguageModelReply { */ fun StreamingChatLanguageModel.chatFlow( block: ChatRequestBuilder.() -> Unit, -): Flow = +): Flow = flow { callbackFlow { val model = this@chatFlow val chatRequest = chatRequest(block) @@ -114,7 +100,6 @@ fun StreamingChatLanguageModel.chatFlow( error.message, error, ) - trySend(StreamingChatLanguageModelReply.Error(error)) close(error) } } @@ -127,7 +112,8 @@ fun StreamingChatLanguageModel.chatFlow( // cleanup logger.info("Flow is canceled") } - } + }.buffer(Channel.UNLIMITED).collect(this) +} fun TokenStream.asFlow(): Flow = flow { callbackFlow { diff --git a/langchain4j-kotlin/src/test/kotlin/me/kpavlov/langchain4j/kotlin/model/chat/StreamingChatLanguageModelIT.kt b/langchain4j-kotlin/src/test/kotlin/me/kpavlov/langchain4j/kotlin/model/chat/StreamingChatLanguageModelIT.kt index e2d207c..63f3826 100644 --- a/langchain4j-kotlin/src/test/kotlin/me/kpavlov/langchain4j/kotlin/model/chat/StreamingChatLanguageModelIT.kt +++ b/langchain4j-kotlin/src/test/kotlin/me/kpavlov/langchain4j/kotlin/model/chat/StreamingChatLanguageModelIT.kt @@ -74,7 +74,6 @@ internal class StreamingChatLanguageModelIT { } is CompleteResponse -> responseRef.set(it.response) - is StreamingChatLanguageModelReply.Error -> fail("Error", it.cause) else -> fail("Unsupported event: $it") } }