Skip to content
Open
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion pkg/scheduler/tests/restclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,13 @@ func (c *RClient) GetEventsStream(count uint64) (io.ReadCloser, error) {
return nil, err
}
req.URL.RawQuery = "count=" + strconv.FormatUint(count, 10)
resp, err := http.DefaultClient.Do(req)
tr := &http.Transport{
DisableCompression: true,
}
client := &http.Client{
Transport: tr,
}
resp, err := client.Do(req)
if err != nil {
return nil, err
}
Expand Down
84 changes: 84 additions & 0 deletions pkg/webservice/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,18 @@
package webservice

import (
"compress/gzip"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"net/http"
"net/http/httptest"
"net/url"
"reflect"
"sort"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -2602,3 +2607,82 @@ func NewResponseRecorderWithDeadline() *ResponseRecorderWithDeadline {
ResponseRecorder: httptest.NewRecorder(),
}
}

func TestCompressGetQueueApplicationAPI(t *testing.T) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we apply the compression to all restful APIs by changing the webservice directly, it seems we should add test for webservice instead of verifying all APIs here. For example: we can modify newRouter to make it accept custom routes, and then we write some dumb routes in testing to check the gzip compression.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for the dummy route

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test about webservice has been added. I don't think we should modify newRouter just for the test case, so I kept it as it is. Instead, I simulate the action in webservice and then check whether the result is correct.

schedulerContext, err := scheduler.NewClusterContext(rmID, policyGroup, []byte(configDefault))
assert.NilError(t, err, "Error when creating cluster context.")
partitionName := common.GetNormalizedPartitionName("default", rmID)
part := schedulerContext.GetPartition(partitionName)
_ = addApp(t, "app-1", part, "root.default", false)
_ = addApp(t, "app-2", part, "root.default", false)
_ = addApp(t, "app-3", part, "root.default", false)

m := NewWebApp(schedulerContext, nil)
m.StartWebApp()
defer func() {
err = m.StopWebApp()
assert.NilError(t, err, "Error when closing webapp service.")
}()

err = common.WaitFor(500*time.Millisecond, 2*time.Second, func() bool {
conn, connErr := net.DialTimeout("tcp", net.JoinHostPort("127.0.0.1", "9080"), time.Second)
if connErr == nil {
defer conn.Close()
return true
}
return false
})
assert.NilError(t, err, "Webapp failed to start in 2 seconds.")

u := &url.URL{
Host: "localhost:9080",
Scheme: "http",
Path: "/ws/v1/partition/default/queue/root.default/applications",
}

// request without gzip compression
var buf io.ReadWriter
req, err := http.NewRequest("GET", u.String(), buf)
assert.NilError(t, err, "Create new http request failed.")
req.Header.Set("Accept", "application/json")
// prevent http.DefaultClient from automatically adding gzip header
req.Header.Set("Accept-Encoding", "deflate")
resp, err := http.DefaultClient.Do(req)
assert.NilError(t, err, "Request failed to send.")
data, err := io.ReadAll(resp.Body)
assert.NilError(t, err, "Failed when reading data.")
defer resp.Body.Close()
var originalResp []*dao.ApplicationDAOInfo
err = json.Unmarshal(data, &originalResp)
assert.NilError(t, err, unmarshalError)

// request with gzip compression enabled
req.Header.Set("Accept-Encoding", "gzip")
resp2, err := http.DefaultClient.Do(req)
assert.NilError(t, err, "Request failed to send.")
gzipReader, err := gzip.NewReader(resp2.Body)
assert.NilError(t, err, "Failed to create gzip reader.")
data2, err := io.ReadAll(gzipReader)
assert.NilError(t, err, "Failed when reading data.")
defer resp2.Body.Close()
defer gzipReader.Close()
var compressedResp []*dao.ApplicationDAOInfo
err = json.Unmarshal(data2, &compressedResp)
assert.NilError(t, err, unmarshalError)

sort.Slice(originalResp, func(i, j int) bool {
return (originalResp[i].SubmissionTime < originalResp[j].SubmissionTime)
})
sort.Slice(compressedResp, func(i, j int) bool {
return (compressedResp[i].SubmissionTime < compressedResp[j].SubmissionTime)
})

for i := 0; i < 3; i++ {
assert.Equal(t, originalResp[i].ApplicationID, compressedResp[i].ApplicationID)
assert.Equal(t, originalResp[i].Partition, compressedResp[i].Partition)
assert.Equal(t, originalResp[i].QueueName, compressedResp[i].QueueName)
assert.Equal(t, originalResp[i].SubmissionTime, compressedResp[i].SubmissionTime)
assert.Equal(t, originalResp[i].User, compressedResp[i].User)
assert.DeepEqual(t, originalResp[i].Groups, compressedResp[i].Groups)
}
}
29 changes: 28 additions & 1 deletion pkg/webservice/webservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@
package webservice

import (
"compress/gzip"
"context"
"errors"
"io"
"net/http"
"strings"
"time"

"github.com/julienschmidt/httprouter"
Expand All @@ -43,7 +46,7 @@ type WebService struct {
func newRouter() *httprouter.Router {
router := httprouter.New()
for _, webRoute := range webRoutes {
handler := loggingHandler(webRoute.HandlerFunc, webRoute.Name)
handler := gzipHandler(loggingHandler(webRoute.HandlerFunc, webRoute.Name))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should unconditionally GZIP every response, only those that might be large. I'd suggest adding a "Compress" boolean to the route object in routes.go and use that to determine whether to add the gzip handler.

Copy link
Contributor

@craigcondit craigcondit Jul 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For example, none of the /debug routes should have it (many of those generate binary data which is not compressible, and/or are simple text that wouldn't benefit). It should probably not be on anything that typically results in < 10 KB of data.

router.Handler(webRoute.Method, webRoute.Pattern, handler)
}
return router
Expand Down Expand Up @@ -94,3 +97,27 @@ func (m *WebService) StopWebApp() error {

return nil
}

type gzipResponseWriter struct {
io.Writer
http.ResponseWriter
}

func (w gzipResponseWriter) Write(b []byte) (int, error) {
return w.Writer.Write(b)
}

func gzipHandler(fn http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if !strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") {
fn(w, r)
return
}
w.Header().Set("Content-Encoding", "gzip")
w.Header().Del("Content-Length")
gz := gzip.NewWriter(w)
defer gz.Close()
gzr := gzipResponseWriter{Writer: gz, ResponseWriter: w}
fn(gzr, r)
}
}