Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 164 additions & 0 deletions transport/httptransport/http_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io"
"net/http"
"os"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -217,6 +218,169 @@ type transfer struct {
client *http.Client
dl *logs.DealLogger
}
type DealLogger struct {
// ... Define your DealLogger fields and methods here
}

type DealInfo struct {
DealUuid string
DealSize int64
OutputFile string
DealURL string
}

type AtomicWriter struct {
file *os.File
mu sync.Mutex
}

func (aw *AtomicWriter) WriteAt(data []byte, offset int64) (int, error) {
aw.mu.Lock()
defer aw.mu.Unlock()
return aw.file.WriteAt(data, offset)
}

type Downloader struct {
dealURL string
contentLength int64
numStreams int
}

func NewDownloader(dealURL string, contentLength, numStreams int64) *Downloader {
return &Downloader{
dealURL: dealURL,
contentLength: contentLength,
numStreams: int(numStreams),
}
}

type DownloadStatus struct {
StreamID int
Success bool
Message string
}

func (d *Downloader) DownloadChunk(start, end int64, streamID int, writer *AtomicWriter, statusChan chan<- DownloadStatus, wg *sync.WaitGroup) {
defer wg.Done()

status := DownloadStatus{StreamID: streamID}

client := &http.Client{}
req, err := http.NewRequest("GET", d.dealURL, nil)
if err != nil {
status.Success = false
status.Message = fmt.Sprintf("Error creating request: %s", err)
statusChan <- status
return
}

req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", start, end))

for retries := 3; retries > 0; retries-- {
resp, err := client.Do(req)
if err != nil {
status.Success = false
status.Message = fmt.Sprintf("Error downloading chunk: %s", err)
time.Sleep(time.Second * 5) // Backoff
continue
}
defer resp.Body.Close()

data := make([]byte, end-start+1)
_, err = resp.Body.Read(data)
if err != nil {
status.Success = false
status.Message = fmt.Sprintf("Error reading chunk data: %s", err)
time.Sleep(time.Second * 5) // Backoff
continue
}

_, err = writer.WriteAt(data, start)
if err != nil {
status.Success = false
status.Message = fmt.Sprintf("Error writing chunk to disk: %s", err)
time.Sleep(time.Second * 5) // Backoff
continue
}
status.Success = true
statusChan <- status
break
}
}

func getContentLength(url string) (int64, error) {
resp, err := http.Get(url)
if err != nil {
return 0, err
}
defer resp.Body.Close()

contentLengthHeader := resp.Header.Get("Content-Length")
if contentLengthHeader == "" {
return 0, fmt.Errorf("content length not provided in response")
}

contentLength, err := strconv.ParseInt(contentLengthHeader, 10, 64)
if err != nil {
return 0, err
}

return contentLength, nil
}

func (t *DealLogger) Infow(uuid string, msg string, keysAndValues ...interface{}) {
// Implement your DealLogger's Info logging here
}

func (t *transfer) executeMultiHttp(ctx context.Context) error {
duuid := t.dealInfo.DealUuid
t.dl.Infow(duuid, "execute multi http transfer", "deal size", t.dealInfo.DealSize, "output file", t.dealInfo.OutputFile)

contentLength, err := getContentLength(t.tInfo.URL)
if err != nil {
return err
}

numStreams := 10

file, err := os.Create(t.dealInfo.OutputFile)
if err != nil {
return err
}
defer file.Close()

atomicWriter := &AtomicWriter{file: file}

downloader := NewDownloader(t.tInfo.URL, contentLength, int64(numStreams))

chunkSize := downloader.contentLength / int64(downloader.numStreams-1)
remainingSize := downloader.contentLength % int64(downloader.numStreams-1)

statusChan := make(chan DownloadStatus)

var wg sync.WaitGroup
for i := 0; i < downloader.numStreams-1; i++ {
wg.Add(1)
go downloader.DownloadChunk(int64(i)*chunkSize, int64(i+1)*chunkSize-1, i, atomicWriter, statusChan, &wg)
}
// Download the remaining chunk using the last stream
wg.Add(1)
go downloader.DownloadChunk(int64(downloader.numStreams-1)*chunkSize, int64(downloader.numStreams-1)*chunkSize+remainingSize-1, downloader.numStreams-1, atomicWriter, statusChan, &wg)

// Wait for all downloads to complete
go func() {
wg.Wait()
close(statusChan)
}()

// Print download statuses
for status := range statusChan {
t.dl.Infow(duuid, fmt.Sprintf("Stream %d download", status.StreamID),
"Success", status.Success, "Message", status.Message)
}

return nil
}

func (t *transfer) execute(ctx context.Context) error {
duuid := t.dealInfo.DealUuid
Expand Down