Skip to content

Commit 9c12637

Browse files
committed
🔥 feat: Add StreamResponseBody support for the Client
1 parent 64a7113 commit 9c12637

File tree

6 files changed

+311
-8
lines changed

6 files changed

+311
-8
lines changed

client/client.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,10 @@ type Client struct {
5555
userResponseHooks []ResponseHook
5656
builtinResponseHooks []ResponseHook
5757

58-
timeout time.Duration
59-
mu sync.RWMutex
60-
debug bool
58+
timeout time.Duration
59+
mu sync.RWMutex
60+
debug bool
61+
streamResponseBody bool
6162
}
6263

6364
// R creates a new Request associated with the client.
@@ -435,6 +436,20 @@ func (c *Client) DisableDebug() *Client {
435436
return c
436437
}
437438

439+
// StreamResponseBody returns the current StreamResponseBody setting.
440+
func (c *Client) StreamResponseBody() bool {
441+
return c.streamResponseBody
442+
}
443+
444+
// SetStreamResponseBody enables or disables response body streaming.
445+
// When enabled, the response body can be read as a stream using BodyStream()
446+
// instead of being fully loaded into memory. This is useful for large responses
447+
// or server-sent events.
448+
func (c *Client) SetStreamResponseBody(enable bool) *Client {
449+
c.streamResponseBody = enable
450+
return c
451+
}
452+
438453
// SetCookieJar sets the cookie jar for the client.
439454
func (c *Client) SetCookieJar(cookieJar *CookieJar) *Client {
440455
c.cookieJar = cookieJar

client/client_test.go

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1733,3 +1733,151 @@ func Benchmark_Client_Request_Parallel(b *testing.B) {
17331733
require.NoError(b, err)
17341734
})
17351735
}
1736+
1737+
func Test_Client_StreamResponseBody(t *testing.T) {
1738+
t.Parallel()
1739+
client := New()
1740+
require.False(t, client.StreamResponseBody())
1741+
client.SetStreamResponseBody(true)
1742+
require.True(t, client.StreamResponseBody())
1743+
client.SetStreamResponseBody(false)
1744+
require.False(t, client.StreamResponseBody())
1745+
}
1746+
1747+
func Test_Client_StreamResponseBody_ServerSentEvents(t *testing.T) {
1748+
t.Parallel()
1749+
1750+
app, addr := startTestServerWithPort(t, func(app *fiber.App) {
1751+
app.Get("/sse", func(c fiber.Ctx) error {
1752+
c.Set("Content-Type", "text/event-stream")
1753+
c.Set("Cache-Control", "no-cache")
1754+
c.Set("Connection", "keep-alive")
1755+
1756+
messages := []string{
1757+
"data: message 1\n\n",
1758+
"data: message 2\n\n",
1759+
"data: message 3\n\n",
1760+
}
1761+
1762+
for _, msg := range messages {
1763+
if _, err := c.WriteString(msg); err != nil {
1764+
return err
1765+
}
1766+
}
1767+
1768+
return nil
1769+
})
1770+
})
1771+
defer func() { require.NoError(t, app.Shutdown()) }()
1772+
1773+
client := New().SetStreamResponseBody(true)
1774+
resp, err := client.Get("http://" + addr + "/sse")
1775+
require.NoError(t, err)
1776+
defer resp.Close()
1777+
1778+
bodyStream := resp.BodyStream()
1779+
require.NotNil(t, bodyStream)
1780+
1781+
buffer := make([]byte, 1024)
1782+
n, err := bodyStream.Read(buffer)
1783+
require.NoError(t, err)
1784+
require.Greater(t, n, 0)
1785+
1786+
content := string(buffer[:n])
1787+
require.Contains(t, content, "data: message 1")
1788+
}
1789+
1790+
func Test_Client_StreamResponseBody_LargeResponse(t *testing.T) {
1791+
t.Parallel()
1792+
1793+
largeData := make([]byte, 1024*1024)
1794+
for i := range largeData {
1795+
largeData[i] = byte(i % 256)
1796+
}
1797+
1798+
app, addr := startTestServerWithPort(t, func(app *fiber.App) {
1799+
app.Get("/large", func(c fiber.Ctx) error {
1800+
return c.Send(largeData)
1801+
})
1802+
})
1803+
defer func() { require.NoError(t, app.Shutdown()) }()
1804+
client := New().SetStreamResponseBody(true)
1805+
resp, err := client.Get("http://" + addr + "/large")
1806+
require.NoError(t, err)
1807+
defer resp.Close()
1808+
bodyStream := resp.BodyStream()
1809+
require.NotNil(t, bodyStream)
1810+
streamedData, err := io.ReadAll(bodyStream)
1811+
require.NoError(t, err)
1812+
require.Equal(t, largeData, streamedData)
1813+
client2 := New()
1814+
resp2, err := client2.Get("http://" + addr + "/large")
1815+
require.NoError(t, err)
1816+
defer resp2.Close()
1817+
body := resp2.Body()
1818+
require.Equal(t, largeData, body)
1819+
}
1820+
1821+
func Test_Client_StreamResponseBody_Disabled_Default(t *testing.T) {
1822+
t.Parallel()
1823+
1824+
app, addr := startTestServerWithPort(t, func(app *fiber.App) {
1825+
app.Get("/test", func(c fiber.Ctx) error {
1826+
return c.SendString("Hello, World!")
1827+
})
1828+
})
1829+
defer func() { require.NoError(t, app.Shutdown()) }()
1830+
1831+
client := New()
1832+
resp, err := client.Get("http://" + addr + "/test")
1833+
require.NoError(t, err)
1834+
defer resp.Close()
1835+
1836+
body := resp.Body()
1837+
require.Equal(t, "Hello, World!", string(body))
1838+
1839+
bodyStream := resp.BodyStream()
1840+
require.NotNil(t, bodyStream)
1841+
}
1842+
1843+
func Test_Client_StreamResponseBody_ChainableMethods(t *testing.T) {
1844+
t.Parallel()
1845+
1846+
client := New().
1847+
SetStreamResponseBody(true).
1848+
SetTimeout(time.Second * 5).
1849+
SetStreamResponseBody(false)
1850+
1851+
require.False(t, client.StreamResponseBody())
1852+
}
1853+
1854+
func Test_Request_StreamResponseBody(t *testing.T) {
1855+
t.Parallel()
1856+
1857+
app, addr := startTestServerWithPort(t, func(app *fiber.App) {
1858+
app.Get("/test", func(c fiber.Ctx) error {
1859+
return c.SendString("Hello, World!")
1860+
})
1861+
})
1862+
defer func() { require.NoError(t, app.Shutdown()) }()
1863+
1864+
client := New().SetStreamResponseBody(false) // client has streaming disabled
1865+
req := client.R().SetStreamResponseBody(true)
1866+
require.True(t, req.StreamResponseBody())
1867+
1868+
resp, err := req.Get("http://" + addr + "/test")
1869+
require.NoError(t, err)
1870+
defer resp.Close()
1871+
bodyStream := resp.BodyStream()
1872+
require.NotNil(t, bodyStream)
1873+
req2 := client.R().SetStreamResponseBody(false)
1874+
require.False(t, req2.StreamResponseBody())
1875+
clientWithStreaming := New().SetStreamResponseBody(true)
1876+
req3 := clientWithStreaming.R()
1877+
require.True(t, req3.StreamResponseBody()) // Should inherit from client
1878+
req4 := client.R().
1879+
SetStreamResponseBody(true).
1880+
SetTimeout(time.Second * 5).
1881+
SetStreamResponseBody(false)
1882+
require.False(t, req4.StreamResponseBody())
1883+
}

client/core.go

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,42 @@ func (c *core) execFunc() (*Response, error) {
8181
c.req.RawRequest.CopyTo(reqv)
8282
cfg := c.getRetryConfig()
8383

84+
// Determine which client to use - create a new one if StreamResponseBody differs
85+
var fastHttpClient *fasthttp.Client
86+
requestStreamResponseBody := c.req.StreamResponseBody()
87+
88+
if requestStreamResponseBody != c.client.streamResponseBody {
89+
// Request setting differs from client setting, create a temporary client
90+
c.client.mu.RLock()
91+
original := c.client.fasthttp
92+
fastHttpClient = &fasthttp.Client{
93+
Dial: original.Dial,
94+
DialDualStack: original.DialDualStack,
95+
TLSConfig: original.TLSConfig,
96+
MaxConnsPerHost: original.MaxConnsPerHost,
97+
MaxIdleConnDuration: original.MaxIdleConnDuration,
98+
MaxConnDuration: original.MaxConnDuration,
99+
ReadTimeout: original.ReadTimeout,
100+
WriteTimeout: original.WriteTimeout,
101+
ReadBufferSize: original.ReadBufferSize,
102+
WriteBufferSize: original.WriteBufferSize,
103+
MaxResponseBodySize: original.MaxResponseBodySize,
104+
NoDefaultUserAgentHeader: original.NoDefaultUserAgentHeader,
105+
DisableHeaderNamesNormalizing: original.DisableHeaderNamesNormalizing,
106+
DisablePathNormalizing: original.DisablePathNormalizing,
107+
MaxIdemponentCallAttempts: original.MaxIdemponentCallAttempts,
108+
Name: original.Name,
109+
ConfigureClient: original.ConfigureClient,
110+
111+
// Request-specific override
112+
StreamResponseBody: requestStreamResponseBody,
113+
}
114+
c.client.mu.RUnlock()
115+
} else {
116+
// Use the client's fasthttp client directly
117+
fastHttpClient = c.client.fasthttp
118+
}
119+
84120
var err error
85121
go func() {
86122
respv := fasthttp.AcquireResponse()
@@ -93,15 +129,15 @@ func (c *core) execFunc() (*Response, error) {
93129
// Use an exponential backoff retry strategy.
94130
err = retry.NewExponentialBackoff(*cfg).Retry(func() error {
95131
if c.req.maxRedirects > 0 && (string(reqv.Header.Method()) == fiber.MethodGet || string(reqv.Header.Method()) == fiber.MethodHead) {
96-
return c.client.fasthttp.DoRedirects(reqv, respv, c.req.maxRedirects)
132+
return fastHttpClient.DoRedirects(reqv, respv, c.req.maxRedirects)
97133
}
98-
return c.client.fasthttp.Do(reqv, respv)
134+
return fastHttpClient.Do(reqv, respv)
99135
})
100136
} else {
101137
if c.req.maxRedirects > 0 && (string(reqv.Header.Method()) == fiber.MethodGet || string(reqv.Header.Method()) == fiber.MethodHead) {
102-
err = c.client.fasthttp.DoRedirects(reqv, respv, c.req.maxRedirects)
138+
err = fastHttpClient.DoRedirects(reqv, respv, c.req.maxRedirects)
103139
} else {
104-
err = c.client.fasthttp.Do(reqv, respv)
140+
err = fastHttpClient.Do(reqv, respv)
105141
}
106142
}
107143

client/request.go

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,8 @@ type Request struct {
6767
timeout time.Duration
6868
maxRedirects int
6969

70-
bodyType bodyType
70+
bodyType bodyType
71+
streamResponseBody *bool // nil means use client setting
7172
}
7273

7374
// Method returns the HTTP method set in the Request.
@@ -590,6 +591,25 @@ func (r *Request) SetMaxRedirects(count int) *Request {
590591
return r
591592
}
592593

594+
// StreamResponseBody returns the StreamResponseBody setting for this request.
595+
// Returns the client's setting if not explicitly set at the request level.
596+
func (r *Request) StreamResponseBody() bool {
597+
if r.streamResponseBody != nil {
598+
return *r.streamResponseBody
599+
}
600+
if r.client != nil {
601+
return r.client.streamResponseBody
602+
}
603+
return false
604+
}
605+
606+
// SetStreamResponseBody sets the StreamResponseBody option for this specific request,
607+
// overriding the client-level setting.
608+
func (r *Request) SetStreamResponseBody(enable bool) *Request {
609+
r.streamResponseBody = &enable
610+
return r
611+
}
612+
593613
// checkClient ensures that a Client is set. If none is set, it defaults to the global defaultClient.
594614
func (r *Request) checkClient() {
595615
if r.client == nil {
@@ -656,6 +676,7 @@ func (r *Request) Reset() {
656676
r.maxRedirects = 0
657677
r.bodyType = noBody
658678
r.boundary = boundary
679+
r.streamResponseBody = nil
659680

660681
for len(r.files) != 0 {
661682
t := r.files[0]

client/response.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,17 @@ func (r *Response) Body() []byte {
8989
return r.RawResponse.Body()
9090
}
9191

92+
// BodyStream returns the response body as a stream reader.
93+
// Note: When using BodyStream(), the response body is not copied to memory,
94+
// so calling Body() afterwards may return an empty slice.
95+
func (r *Response) BodyStream() io.Reader {
96+
if stream := r.RawResponse.BodyStream(); stream != nil {
97+
return stream
98+
}
99+
// If streaming is not enabled, return a bytes.Reader from the regular body
100+
return bytes.NewReader(r.RawResponse.Body())
101+
}
102+
92103
// String returns the response body as a trimmed string.
93104
func (r *Response) String() string {
94105
return utils.Trim(string(r.Body()), ' ')

client/response_test.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -538,3 +538,75 @@ func Test_Response_Save(t *testing.T) {
538538
require.Error(t, err)
539539
})
540540
}
541+
542+
func Test_Response_BodyStream(t *testing.T) {
543+
t.Parallel()
544+
545+
server := startTestServer(t, func(app *fiber.App) {
546+
app.Get("/stream", func(c fiber.Ctx) error {
547+
return c.SendString("streaming data")
548+
})
549+
app.Get("/large", func(c fiber.Ctx) error {
550+
data := make([]byte, 1024)
551+
for i := range data {
552+
data[i] = byte('A' + i%26)
553+
}
554+
return c.Send(data)
555+
})
556+
})
557+
defer server.stop()
558+
559+
t.Run("basic streaming", func(t *testing.T) {
560+
client := New().SetDial(server.dial()).SetStreamResponseBody(true)
561+
562+
resp, err := client.Get("http://example.com/stream")
563+
require.NoError(t, err)
564+
defer resp.Close()
565+
bodyStream := resp.BodyStream()
566+
require.NotNil(t, bodyStream)
567+
data, err := io.ReadAll(bodyStream)
568+
require.NoError(t, err)
569+
require.Equal(t, "streaming data", string(data))
570+
})
571+
572+
t.Run("large response streaming", func(t *testing.T) {
573+
client := New().SetDial(server.dial()).SetStreamResponseBody(true)
574+
resp, err := client.Get("http://example.com/large")
575+
require.NoError(t, err)
576+
defer resp.Close()
577+
bodyStream := resp.BodyStream()
578+
require.NotNil(t, bodyStream)
579+
buffer := make([]byte, 256)
580+
var totalRead []byte
581+
for {
582+
n, err := bodyStream.Read(buffer)
583+
if n > 0 {
584+
totalRead = append(totalRead, buffer[:n]...)
585+
}
586+
if err == io.EOF {
587+
break
588+
}
589+
require.NoError(t, err)
590+
}
591+
require.Equal(t, 1024, len(totalRead))
592+
for i := 0; i < 1024; i++ {
593+
expected := byte('A' + i%26)
594+
require.Equal(t, expected, totalRead[i])
595+
}
596+
})
597+
598+
t.Run("compare with regular body", func(t *testing.T) {
599+
client1 := New().SetDial(server.dial())
600+
resp1, err := client1.Get("http://example.com/stream")
601+
require.NoError(t, err)
602+
defer resp1.Close()
603+
normalBody := resp1.Body()
604+
client2 := New().SetDial(server.dial()).SetStreamResponseBody(true)
605+
resp2, err := client2.Get("http://example.com/stream")
606+
require.NoError(t, err)
607+
defer resp2.Close()
608+
streamedBody, err := io.ReadAll(resp2.BodyStream())
609+
require.NoError(t, err)
610+
require.Equal(t, normalBody, streamedBody)
611+
})
612+
}

0 commit comments

Comments
 (0)