Skip to content

Commit

Permalink
Fix bufferstream thread-safety with shim for now
Browse files Browse the repository at this point in the history
  • Loading branch information
quinnj committed Jan 31, 2025
1 parent adc7a03 commit b08c9bd
Showing 1 changed file with 22 additions and 2 deletions.
24 changes: 22 additions & 2 deletions src/client/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,26 @@ function c_on_response_body(aws_stream_ptr, data::Ptr{aws_byte_cursor}, stream_p
if stream.decompress
if stream.gzipstream === nothing
stream.bufferstream = b = Base.BufferStream()
Core.println("gzip BufferStream: len = $(length(b.buffer.data)), size = $(b.buffer.size)")
stream.gzipstream = g = CodecZlib.GzipDecompressorStream(b)
# @info "decompress 1" ptr=data len=bc.len
unsafe_write(g, bc.ptr, bc.len)
Core.println("gzip BufferStream after write: len = $(length(b.buffer.data)), size = $(b.buffer.size)")
else
# @info "decompress 2" ptr=data len=bc.len
unsafe_write(stream.gzipstream, bc.ptr, bc.len)
end
else
if stream.bufferstream === nothing
stream.bufferstream = b = Base.BufferStream()
Core.println("BufferStream: len = $(length(b.buffer.data)), size = $(b.buffer.size)")
# @info "writebuf 1" ptr=data len=bc.len
unsafe_write(b, bc.ptr, bc.len)
Core.println("BufferStream after write: len = $(length(b.buffer.data)), size = $(b.buffer.size)")
else
# @info "writebuf 2" ptr=data len=bc.len
unsafe_write(stream.bufferstream, bc.ptr, bc.len)
Core.println("after write: len = $(length(stream.bufferstream.buffer.data)), size = $(stream.bufferstream.buffer.size)")
end
end
return Cint(0)
Expand Down Expand Up @@ -192,15 +201,15 @@ function with_stream(conn::Ptr{aws_http_connection}, req::Request, chunkedbody,
if on_stream_response_body !== nothing
try
while !eof(stream.bufferstream)
on_stream_response_body(resp, readavailable(stream.bufferstream))
on_stream_response_body(resp, _readavailable(stream.bufferstream))
end
catch e
rethrow(DontRetry(e))
end
else
wait(stream.fut)
if stream.bufferstream !== nothing
resp.body = readavailable(stream.bufferstream)
resp.body = _readavailable(stream.bufferstream)
else
resp.body = UInt8[]
end
Expand All @@ -210,4 +219,15 @@ function with_stream(conn::Ptr{aws_http_connection}, req::Request, chunkedbody,
aws_http_stream_release(stream_ptr)
end
end # GC.@preserve
end

# can be removed once https://github.com/JuliaLang/julia/pull/57211 is fully released
function _readavailable(this::Base.BufferStream)
bytes = lock(this.cond) do
Base.wait_readnb(this, 1)
buf = this.buffer
@assert buf.seekable == false
take!(buf)
end
return bytes
end

0 comments on commit b08c9bd

Please sign in to comment.