From 0305a00f2e46072eb8d55c207653be3107a1248a Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Wed, 21 Jun 2023 19:52:21 +0200 Subject: [PATCH 1/2] perf/go-libp2p: minor simplifications --- perf/impl/go-libp2p/v0.27/perf.go | 80 ++++++++++++++----------------- 1 file changed, 36 insertions(+), 44 deletions(-) diff --git a/perf/impl/go-libp2p/v0.27/perf.go b/perf/impl/go-libp2p/v0.27/perf.go index c748dda4a..33d89835d 100644 --- a/perf/impl/go-libp2p/v0.27/perf.go +++ b/perf/impl/go-libp2p/v0.27/perf.go @@ -17,9 +17,8 @@ import ( var log = logging.Logger("perf") const ( - BlockSize = 64 << 10 - - ID = "/perf/1.0.0" + ID = "/perf/1.0.0" + blockSize = 64 << 10 ) type PerfService struct { @@ -32,10 +31,9 @@ func NewPerfService(h host.Host) *PerfService { return ps } -func (p *PerfService) PerfHandler(s network.Stream) { +func (ps *PerfService) PerfHandler(s network.Stream) { u64Buf := make([]byte, 8) - _, err := io.ReadFull(s, u64Buf) - if err != nil { + if _, err := io.ReadFull(s, u64Buf); err != nil { log.Errorw("err", err) s.Reset() return @@ -43,51 +41,18 @@ func (p *PerfService) PerfHandler(s network.Stream) { bytesToSend := binary.BigEndian.Uint64(u64Buf) - _, err = p.drainStream(context.Background(), s) - if err != nil { + if _, err := drainStream(s); err != nil { log.Errorw("err", err) s.Reset() return } - err = p.sendBytes(context.Background(), s, bytesToSend) - if err != nil { + if err := sendBytes(s, bytesToSend); err != nil { log.Errorw("err", err) s.Reset() return } - -} - -func (ps *PerfService) sendBytes(ctx context.Context, s network.Stream, bytesToSend uint64) error { - buf := pool.Get(BlockSize) - defer pool.Put(buf) - - for bytesToSend > 0 { - toSend := buf - if bytesToSend < BlockSize { - toSend = buf[:bytesToSend] - } - - n, err := s.Write(toSend) - if err != nil { - return err - } - bytesToSend -= uint64(n) - } s.CloseWrite() - - return nil -} - -func (ps *PerfService) drainStream(ctx context.Context, s network.Stream) (uint64, error) { - var recvd int64 - recvd, err := io.Copy(io.Discard, s) - if err != nil && err != io.EOF { - s.Reset() - return uint64(recvd), err - } - return uint64(recvd), nil } func (ps *PerfService) RunPerf(ctx context.Context, p peer.ID, bytesToSend uint64, bytesToRecv uint64) (time.Duration, time.Duration, error) { @@ -105,14 +70,13 @@ func (ps *PerfService) RunPerf(ctx context.Context, p peer.ID, bytesToSend uint6 } sendStart := time.Now() - err = ps.sendBytes(ctx, s, bytesToSend) - if err != nil { + if err := sendBytes(s, bytesToSend); err != nil { return 0, 0, err } sendDuration := time.Since(sendStart) recvStart := time.Now() - recvd, err := ps.drainStream(ctx, s) + recvd, err := drainStream(s) if err != nil { return sendDuration, 0, err } @@ -124,3 +88,31 @@ func (ps *PerfService) RunPerf(ctx context.Context, p peer.ID, bytesToSend uint6 return sendDuration, recvDuration, nil } + +func sendBytes(s io.Writer, bytesToSend uint64) error { + buf := pool.Get(blockSize) + defer pool.Put(buf) + + for bytesToSend > 0 { + toSend := buf + if bytesToSend < blockSize { + toSend = buf[:bytesToSend] + } + + n, err := s.Write(toSend) + if err != nil { + return err + } + bytesToSend -= uint64(n) + } + return nil +} + +func drainStream(s io.Reader) (uint64, error) { + var recvd int64 + recvd, err := io.Copy(io.Discard, s) + if err != nil && err != io.EOF { + return uint64(recvd), err + } + return uint64(recvd), nil +} From 92c9d078a39eea88cf6658e795dc7c3f25abaa37 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Wed, 21 Jun 2023 21:02:46 +0200 Subject: [PATCH 2/2] perf/https: simplifications --- perf/impl/https/v0.1/main.go | 133 +++++++++++++++++++---------------- 1 file changed, 72 insertions(+), 61 deletions(-) diff --git a/perf/impl/https/v0.1/main.go b/perf/impl/https/v0.1/main.go index 24ed4befb..30373b75a 100644 --- a/perf/impl/https/v0.1/main.go +++ b/perf/impl/https/v0.1/main.go @@ -1,6 +1,7 @@ package main import ( + "bytes" "crypto/ecdsa" "crypto/elliptic" "crypto/rand" @@ -21,88 +22,71 @@ import ( "time" ) -const ( - BlockSize = 64 << 10 -) +const blockSize = 64 << 10 func handleRequest(w http.ResponseWriter, r *http.Request) { - // Read the big-endian bytesToSend value - var bytesToSend uint64 - if err := binary.Read(r.Body, binary.BigEndian, &bytesToSend); err != nil { - http.Error(w, "failed to read uint64 value", http.StatusBadRequest) + u64Buf := make([]byte, 8) + if _, err := io.ReadFull(r.Body, u64Buf); err != nil { + log.Printf("reading upload size failed: %s", err) + w.WriteHeader(http.StatusBadRequest) return } - // Read and discard the remaining bytes in the request - io.Copy(io.Discard, r.Body) - - // Set content type and length headers - w.Header().Set("Content-Type", "application/octet-stream") - w.Header().Set("Content-Length", strconv.FormatUint(bytesToSend, 10)) - - // Write the status code explicitly - w.WriteHeader(http.StatusOK) + bytesToSend := binary.BigEndian.Uint64(u64Buf) - buf := make([]byte, BlockSize) + if _, err := drainStream(r.Body); err != nil { + log.Printf("draining stream failed: %s", err) + w.WriteHeader(http.StatusBadRequest) + return + } - for bytesToSend > 0 { - toSend := buf - if bytesToSend < BlockSize { - toSend = buf[:bytesToSend] - } + r.Header.Set("Content-Type", "application/octet-stream") + r.Header.Set("Content-Length", strconv.FormatUint(bytesToSend, 10)) - n, err := w.Write(toSend) - if err != nil { - http.Error(w, "Failed write", http.StatusInternalServerError) - return - } - bytesToSend -= uint64(n) + if err := sendBytes(w, bytesToSend); err != nil { + log.Printf("sending response failed: %s", err) + return } } -var zeroSlice = make([]byte, BlockSize) // Pre-made zero-filled slice - -type customReader struct { - downloadBytes uint64 - uploadBytes uint64 - position uint64 +type nullReader struct { + N uint64 + read uint64 } -func (c *customReader) Read(p []byte) (int, error) { - if c.position < 8 { - binary.BigEndian.PutUint64(p, c.downloadBytes) - c.position += 8 - return 8, nil - } else if c.position-8 < c.uploadBytes { - bytesToWrite := min(min(len(p), int(c.uploadBytes-(c.position-8))), len(zeroSlice)) - copy(p[:bytesToWrite], zeroSlice[:bytesToWrite]) // zero the slice - c.position += uint64(bytesToWrite) - return bytesToWrite, nil - } - - return 0, io.EOF -} +var _ io.Reader = &nullReader{} -// Helper function to get minimum of two integers -func min(a, b int) int { - if a < b { - return a +func (r *nullReader) Read(b []byte) (int, error) { + remaining := r.N - r.read + l := uint64(len(b)) + if uint64(len(b)) > remaining { + l = remaining } - return b + r.read += l + if r.read == r.N { + return int(l), io.EOF + } + return int(l), nil } func runClient(serverAddr string, uploadBytes, downloadBytes uint64) (time.Duration, time.Duration, error) { client := &http.Client{ Transport: &http.Transport{ - TLSClientConfig: &tls.Config{ - InsecureSkipVerify: true, - }, + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, }, } - reqBody := &customReader{downloadBytes: downloadBytes, uploadBytes: uploadBytes} - - req, err := http.NewRequest("POST", fmt.Sprintf("https://%s/", serverAddr), reqBody) + b := make([]byte, 8) + binary.BigEndian.PutUint64(b, uploadBytes) + + req, err := http.NewRequest( + http.MethodPost, + fmt.Sprintf("https://%s/", serverAddr), + io.MultiReader( + bytes.NewReader(b), + &nullReader{N: uploadBytes}, + ), + ) if err != nil { return 0, 0, err } @@ -121,11 +105,11 @@ func runClient(serverAddr string, uploadBytes, downloadBytes uint64) (time.Durat uploadDoneTime := time.Now() defer resp.Body.Close() - n, err := io.Copy(io.Discard, resp.Body) + n, err := drainStream(resp.Body) if err != nil { return 0, 0, fmt.Errorf("error reading response: %w", err) } - if n != int64(downloadBytes) { + if n != downloadBytes { return 0, 0, fmt.Errorf("expected %d bytes in response, but received %d", downloadBytes, n) } @@ -250,3 +234,30 @@ func main() { fmt.Println(string(jsonB)) } } + +func sendBytes(s io.Writer, bytesToSend uint64) error { + buf := make([]byte, blockSize) + + for bytesToSend > 0 { + toSend := buf + if bytesToSend < blockSize { + toSend = buf[:bytesToSend] + } + + n, err := s.Write(toSend) + if err != nil { + return err + } + bytesToSend -= uint64(n) + } + return nil +} + +func drainStream(s io.Reader) (uint64, error) { + var recvd int64 + recvd, err := io.Copy(io.Discard, s) + if err != nil && err != io.EOF { + return uint64(recvd), err + } + return uint64(recvd), nil +}