Skip to content

feat: add multistream http download #1710

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions node/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,10 @@ func ConfigBoost(cfg *config.Boost) Option {
return Error(fmt.Errorf("Detected custom DAG store path %s. The DAG store must be at $BOOST_PATH/dagstore", cfg.DAGStore.RootDir))
}

if cfg.HttpDownload.NChunks < 1 || cfg.HttpDownload.NChunks > 16 {
return Error(errors.New("HttpDownload.NChunks should be between 1 and 16"))
}

legacyFees := cfg.LotusFees.Legacy()

return Options(
Expand Down
3 changes: 3 additions & 0 deletions node/config/def.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,9 @@ func DefaultBoost() *Boost {
Port: 3104,
},
},
HttpDownload: HttpDownloadConfig{
NChunks: 5,
},
}
return cfg
}
Expand Down
16 changes: 16 additions & 0 deletions node/config/doc_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions node/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type Boost struct {
Tracing TracingConfig
LocalIndexDirectory LocalIndexDirectoryConfig
ContractDeals ContractDealsConfig
HttpDownload HttpDownloadConfig

// Lotus configs
LotusDealmaking lotus_config.DealmakingConfig
Expand Down Expand Up @@ -421,3 +422,10 @@ type LocalIndexDirectoryConfig struct {
// LocalIndexDirectoryLeveldbConfig holds the leveldb-related settings of the
// local index directory (referenced from LocalIndexDirectoryConfig).
// NOTE(review): semantics inferred from the name — confirm against the
// LocalIndexDirectoryConfig definition, which is truncated in this diff.
type LocalIndexDirectoryLeveldbConfig struct {
	// Enabled toggles this leveldb-backed component on or off.
	Enabled bool
}

// HttpDownloadConfig configures how HTTP transfers are downloaded.
type HttpDownloadConfig struct {
	// NChunks is the number of chunks to split HTTP downloads into. Each chunk is downloaded in a goroutine of
	// its own, which improves the overall download speed. NChunks is always equal to 1 for the libp2p transport
	// because the libp2p server doesn't support range requests yet. NChunks must be between 1 and 16 inclusive
	// (config validation rejects values below 1 or above 16), with the default of 5.
	NChunks int
}
2 changes: 1 addition & 1 deletion node/modules/storageminer.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ func NewStorageMarketProvider(provAddr address.Address, cfg *config.Boost) func(
SealingPipelineCacheTimeout: time.Duration(cfg.Dealmaking.SealingPipelineCacheTimeout),
}
dl := logs.NewDealLogger(logsDB)
tspt := httptransport.New(h, dl)
tspt := httptransport.New(h, dl, httptransport.NChunksOpt(cfg.HttpDownload.NChunks))
prov, err := storagemarket.NewProvider(prvCfg, sqldb, dealsDB, fundMgr, storageMgr, a, dp, provAddr, secb, commpc,
sps, cdm, df, logsSqlDB.db, logsDB, piecedirectory, ip, lp, &signatureVerifier{a}, dl, tspt)
if err != nil {
Expand Down
167 changes: 100 additions & 67 deletions testutil/httptestfileservers.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ func HttpTestUnstartedFileServer(t *testing.T, dir string) *HttpTestServer {
return tsrv
}

// addContentLengthHeader sets a Content-Length header announcing a body of l
// bytes and commits a 200 OK status. It must be called before any body bytes
// are written: once WriteHeader runs, later header changes are ignored.
func addContentLengthHeader(w http.ResponseWriter, l int) {
	// Set (not Add) so any pre-existing Content-Length is replaced instead of
	// producing a duplicate header line.
	w.Header().Set("Content-Length", strconv.Itoa(l))
	w.WriteHeader(http.StatusOK)
}

type unblockInfo struct {
ch chan struct{}
closeOnce sync.Once
Expand All @@ -73,22 +78,37 @@ func NewBlockingHttpTestServer(t *testing.T, dir string) *BlockingHttpTestServer
}

svc := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// wait till serving the file is unblocked
name := path.Clean(strings.TrimPrefix(r.URL.Path, "/"))
b.mu.Lock()
ubi := b.unblock[name]
b.mu.Unlock()
<-ubi.ch

// serve the file
upath := r.URL.Path
if !strings.HasPrefix(upath, "/") {
upath = "/" + upath
r.URL.Path = upath
}
fp := path.Clean(r.URL.Path)
absPath := filepath.Join(dir, fp)
http.ServeFile(w, r, absPath)

switch r.Method {
case http.MethodGet:
// wait till serving the file is unblocked
name := path.Clean(strings.TrimPrefix(r.URL.Path, "/"))
b.mu.Lock()
ubi := b.unblock[name]
b.mu.Unlock()
<-ubi.ch

// serve the file
upath := r.URL.Path
if !strings.HasPrefix(upath, "/") {
upath = "/" + upath
r.URL.Path = upath
}
http.ServeFile(w, r, absPath)
case http.MethodHead:
fp := path.Clean(r.URL.Path)
absPath := filepath.Join(dir, fp)
stat, err := os.Stat(absPath)
if err != nil {
t.Logf("failed to get file stat: %s", err.Error())
w.WriteHeader(500)
return
}
addContentLengthHeader(w, int(stat.Size()))
}

}))

b.svc = svc
Expand Down Expand Up @@ -146,64 +166,77 @@ func GetConn(r *http.Request) net.Conn {
// starting at the start offset mentioned in the Range request.
func HttpTestDisconnectingServer(t *testing.T, dir string, afterEvery int64) *httptest.Server {
svr := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// process the start offset
offset := r.Header.Get("Range")
finalOffset := strings.TrimSuffix(strings.TrimPrefix(offset, "bytes="), "-")
start, _ := strconv.ParseInt(finalOffset, 10, 64)
// only send `afterEvery` bytes and then disconnect
end := start + afterEvery

// open the file to serve
upath := r.URL.Path
if !strings.HasPrefix(upath, "/") {
upath = "/" + upath
r.URL.Path = upath
}
fp := path.Clean(r.URL.Path)
absPath := filepath.Join(dir, fp)
f, err := os.Open(absPath)
if err != nil {
t.Logf("failed to open file to serve: %s", err)
w.WriteHeader(500)
return
}
defer f.Close()

// prevent buffer overflow
fi, err := f.Stat()
if err != nil {
t.Logf("failed to stat file: %s", err)
w.WriteHeader(500)
return
}
if end > fi.Size() {
end = fi.Size()
}

// read (end-start) bytes from the file starting at the given offset and write them to the response
bz := make([]byte, end-start)
n, err := f.ReadAt(bz, start)
if err != nil {
t.Logf("failed to read file: %s", err)
w.WriteHeader(500)
return
}
if int64(n) != (end - start) {
w.WriteHeader(500)
return
}

w.WriteHeader(200)
_, err = w.Write(bz)
if err != nil {
t.Logf("failed to write file: %s", err)
w.WriteHeader(500)
return
switch r.Method {
case http.MethodGet:
// process the start offset
offset := r.Header.Get("Range")
startend := strings.Split(strings.TrimPrefix(offset, "bytes="), "-")
start, _ := strconv.ParseInt(startend[0], 10, 64)
// only send `afterEvery` bytes and then disconnect
end := start + afterEvery

// open the file to serve
upath := r.URL.Path
if !strings.HasPrefix(upath, "/") {
upath = "/" + upath
r.URL.Path = upath
}
f, err := os.Open(absPath)
if err != nil {
t.Logf("failed to open file to serve: %s", err)
w.WriteHeader(500)
return
}
defer f.Close()

// prevent buffer overflow
fi, err := f.Stat()
if err != nil {
t.Logf("failed to stat file: %s", err)
w.WriteHeader(500)
return
}
if end > fi.Size() {
end = fi.Size()
}

// read (end-start) bytes from the file starting at the given offset and write them to the response
bz := make([]byte, end-start)
n, err := f.ReadAt(bz, start)
if err != nil {
t.Logf("failed to read file: %s", err)
w.WriteHeader(500)
return
}
if int64(n) != (end - start) {
w.WriteHeader(500)
return
}

w.WriteHeader(200)
_, err = w.Write(bz)
if err != nil {
t.Logf("failed to write file: %s", err)
w.WriteHeader(500)
return
}

// close the connection so client sees an error while reading the response
c := GetConn(r)
c.Close() //nolint:errcheck
case http.MethodHead:
stat, err := os.Stat(absPath)
if err != nil {
t.Logf("failed to get file stat: %s", err)
w.WriteHeader(500)
return
}
addContentLengthHeader(w, int(stat.Size()))
}

// close the connection so client sees an error while reading the response
c := GetConn(r)
c.Close() //nolint:errcheck
}))
svr.Config.ConnContext = SaveConnInContext

Expand Down
Loading