Skip to content

Commit 92e26d0

Browse files
feat: handle thread interrupts in the core HTTP client
1 parent 88a4a35 commit 92e26d0

File tree

2 files changed

+53
-24
lines changed

2 files changed

+53
-24
lines changed

lib/openai/internal/transport/pooled_net_requester.rb

Lines changed: 30 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -128,40 +128,48 @@ def execute(request)
128128
url, deadline = request.fetch_values(:url, :deadline)
129129

130130
req = nil
131-
eof = false
132131
finished = false
133-
closing = nil
134132

135133
# rubocop:disable Metrics/BlockLength
136134
enum = Enumerator.new do |y|
137135
next if finished
138136

139137
with_pool(url, deadline: deadline) do |conn|
140-
req, closing = self.class.build_request(request) do
141-
self.class.calibrate_socket_timeout(conn, deadline)
142-
end
143-
144-
self.class.calibrate_socket_timeout(conn, deadline)
145-
unless conn.started?
146-
conn.keep_alive_timeout = self.class::KEEP_ALIVE_TIMEOUT
147-
conn.start
148-
end
138+
eof = false
139+
closing = nil
140+
::Thread.handle_interrupt(Object => :never) do
141+
::Thread.handle_interrupt(Object => :immediate) do
142+
req, closing = self.class.build_request(request) do
143+
self.class.calibrate_socket_timeout(conn, deadline)
144+
end
149145

150-
self.class.calibrate_socket_timeout(conn, deadline)
151-
conn.request(req) do |rsp|
152-
y << [req, rsp]
153-
break if finished
154-
155-
rsp.read_body do |bytes|
156-
y << bytes.force_encoding(Encoding::BINARY)
157-
break if finished
146+
self.class.calibrate_socket_timeout(conn, deadline)
147+
unless conn.started?
148+
conn.keep_alive_timeout = self.class::KEEP_ALIVE_TIMEOUT
149+
conn.start
150+
end
158151

159152
self.class.calibrate_socket_timeout(conn, deadline)
153+
conn.request(req) do |rsp|
154+
y << [req, rsp]
155+
break if finished
156+
157+
rsp.read_body do |bytes|
158+
y << bytes.force_encoding(Encoding::BINARY)
159+
break if finished
160+
161+
self.class.calibrate_socket_timeout(conn, deadline)
162+
end
163+
eof = true
164+
end
165+
end
166+
ensure
167+
begin
168+
conn.finish if !eof && conn&.started?
169+
ensure
170+
closing&.call
160171
end
161-
eof = true
162172
end
163-
ensure
164-
conn.finish if !eof && conn&.started?
165173
end
166174
rescue Timeout::Error
167175
raise OpenAI::Errors::APITimeoutError.new(url: url, request: req)
@@ -174,8 +182,6 @@ def execute(request)
174182
body = OpenAI::Internal::Util.fused_enum(enum, external: true) do
175183
finished = true
176184
loop { enum.next }
177-
ensure
178-
closing&.call
179185
end
180186
[Integer(response.code), response, body]
181187
end

test/openai/internal/util_test.rb

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,29 @@ def test_rewind_closing
343343
assert_equal(0, steps)
344344
end
345345

346+
def test_thread_interrupts
347+
once = 0
348+
que = Queue.new
349+
enum = Enumerator.new do |y|
350+
10.times { y << _1 }
351+
ensure
352+
once = once.succ
353+
end
354+
355+
fused_1 = OpenAI::Internal::Util.fused_enum(enum, external: true) { loop { enum.next } }
356+
fused_2 = OpenAI::Internal::Util.chain_fused(fused_1) { fused_1.each(&_1) }
357+
fused_3 = OpenAI::Internal::Util.chain_fused(fused_2) { fused_2.each(&_1) }
358+
359+
th = ::Thread.new do
360+
que << "🐶"
361+
fused_3.each { sleep(10) }
362+
end
363+
364+
assert_equal("🐶", que.pop)
365+
th.kill.join
366+
assert_equal(1, once)
367+
end
368+
346369
def test_closing
347370
arr = [1, 2, 3]
348371
once = 0

0 commit comments

Comments
 (0)