Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: simplify go-libp2p and https implementation #201

Merged
merged 2 commits into from
Jun 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 36 additions & 44 deletions perf/impl/go-libp2p/v0.27/perf.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ import (
var log = logging.Logger("perf")

const (
BlockSize = 64 << 10

ID = "/perf/1.0.0"
ID = "/perf/1.0.0"
blockSize = 64 << 10
)

type PerfService struct {
Expand All @@ -32,62 +31,28 @@ func NewPerfService(h host.Host) *PerfService {
return ps
}

func (p *PerfService) PerfHandler(s network.Stream) {
func (ps *PerfService) PerfHandler(s network.Stream) {
u64Buf := make([]byte, 8)
_, err := io.ReadFull(s, u64Buf)
if err != nil {
if _, err := io.ReadFull(s, u64Buf); err != nil {
log.Errorw("err", err)
s.Reset()
return
}

bytesToSend := binary.BigEndian.Uint64(u64Buf)

_, err = p.drainStream(context.Background(), s)
if err != nil {
if _, err := drainStream(s); err != nil {
log.Errorw("err", err)
s.Reset()
return
}

err = p.sendBytes(context.Background(), s, bytesToSend)
if err != nil {
if err := sendBytes(s, bytesToSend); err != nil {
log.Errorw("err", err)
s.Reset()
return
}

}

func (ps *PerfService) sendBytes(ctx context.Context, s network.Stream, bytesToSend uint64) error {
buf := pool.Get(BlockSize)
defer pool.Put(buf)

for bytesToSend > 0 {
toSend := buf
if bytesToSend < BlockSize {
toSend = buf[:bytesToSend]
}

n, err := s.Write(toSend)
if err != nil {
return err
}
bytesToSend -= uint64(n)
}
s.CloseWrite()

return nil
}

func (ps *PerfService) drainStream(ctx context.Context, s network.Stream) (uint64, error) {
var recvd int64
recvd, err := io.Copy(io.Discard, s)
if err != nil && err != io.EOF {
s.Reset()
return uint64(recvd), err
}
return uint64(recvd), nil
}

func (ps *PerfService) RunPerf(ctx context.Context, p peer.ID, bytesToSend uint64, bytesToRecv uint64) (time.Duration, time.Duration, error) {
Expand All @@ -105,14 +70,13 @@ func (ps *PerfService) RunPerf(ctx context.Context, p peer.ID, bytesToSend uint6
}

sendStart := time.Now()
err = ps.sendBytes(ctx, s, bytesToSend)
if err != nil {
if err := sendBytes(s, bytesToSend); err != nil {
return 0, 0, err
}
sendDuration := time.Since(sendStart)

recvStart := time.Now()
recvd, err := ps.drainStream(ctx, s)
recvd, err := drainStream(s)
if err != nil {
return sendDuration, 0, err
}
Expand All @@ -124,3 +88,31 @@ func (ps *PerfService) RunPerf(ctx context.Context, p peer.ID, bytesToSend uint6

return sendDuration, recvDuration, nil
}

func sendBytes(s io.Writer, bytesToSend uint64) error {
buf := pool.Get(blockSize)
defer pool.Put(buf)

for bytesToSend > 0 {
toSend := buf
if bytesToSend < blockSize {
toSend = buf[:bytesToSend]
}

n, err := s.Write(toSend)
if err != nil {
return err
}
bytesToSend -= uint64(n)
}
return nil
}

func drainStream(s io.Reader) (uint64, error) {
var recvd int64
recvd, err := io.Copy(io.Discard, s)
if err != nil && err != io.EOF {
return uint64(recvd), err
}
return uint64(recvd), nil
}
133 changes: 72 additions & 61 deletions perf/impl/https/v0.1/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"bytes"
"crypto/ecdsa"
"crypto/elliptic"
"crypto/rand"
Expand All @@ -21,88 +22,71 @@ import (
"time"
)

const (
BlockSize = 64 << 10
)
const blockSize = 64 << 10

func handleRequest(w http.ResponseWriter, r *http.Request) {
// Read the big-endian bytesToSend value
var bytesToSend uint64
if err := binary.Read(r.Body, binary.BigEndian, &bytesToSend); err != nil {
http.Error(w, "failed to read uint64 value", http.StatusBadRequest)
u64Buf := make([]byte, 8)
if _, err := io.ReadFull(r.Body, u64Buf); err != nil {
log.Printf("reading upload size failed: %s", err)
w.WriteHeader(http.StatusBadRequest)
return
}

// Read and discard the remaining bytes in the request
io.Copy(io.Discard, r.Body)

// Set content type and length headers
w.Header().Set("Content-Type", "application/octet-stream")
w.Header().Set("Content-Length", strconv.FormatUint(bytesToSend, 10))

// Write the status code explicitly
w.WriteHeader(http.StatusOK)
bytesToSend := binary.BigEndian.Uint64(u64Buf)

buf := make([]byte, BlockSize)
if _, err := drainStream(r.Body); err != nil {
log.Printf("draining stream failed: %s", err)
w.WriteHeader(http.StatusBadRequest)
return
}

for bytesToSend > 0 {
toSend := buf
if bytesToSend < BlockSize {
toSend = buf[:bytesToSend]
}
r.Header.Set("Content-Type", "application/octet-stream")
r.Header.Set("Content-Length", strconv.FormatUint(bytesToSend, 10))

n, err := w.Write(toSend)
if err != nil {
http.Error(w, "Failed write", http.StatusInternalServerError)
return
}
bytesToSend -= uint64(n)
if err := sendBytes(w, bytesToSend); err != nil {
log.Printf("sending response failed: %s", err)
return
}
}

var zeroSlice = make([]byte, BlockSize) // Pre-made zero-filled slice

type customReader struct {
downloadBytes uint64
uploadBytes uint64
position uint64
type nullReader struct {
N uint64
read uint64
}

func (c *customReader) Read(p []byte) (int, error) {
if c.position < 8 {
binary.BigEndian.PutUint64(p, c.downloadBytes)
c.position += 8
return 8, nil
} else if c.position-8 < c.uploadBytes {
bytesToWrite := min(min(len(p), int(c.uploadBytes-(c.position-8))), len(zeroSlice))
copy(p[:bytesToWrite], zeroSlice[:bytesToWrite]) // zero the slice
c.position += uint64(bytesToWrite)
return bytesToWrite, nil
}

return 0, io.EOF
}
var _ io.Reader = &nullReader{}

// Helper function to get minimum of two integers
func min(a, b int) int {
if a < b {
return a
func (r *nullReader) Read(b []byte) (int, error) {
remaining := r.N - r.read
l := uint64(len(b))
if uint64(len(b)) > remaining {
l = remaining
}
return b
r.read += l
if r.read == r.N {
return int(l), io.EOF
}
return int(l), nil
}
Comment on lines +59 to 70
Copy link
Member

@mxinden mxinden Jun 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am assuming that it is safe in Golang to return "uninitialized" bytes, i.e. the original bytes in b to a remote machine. E.g. there is no risk of b containing deallocated secret key material.

Just for others, returning non-zero bytes is just fine according to the perf specification:

The bytes themselves should be a predetermined arbitrary set of bytes. Zero is fine, but so is random bytes (as long as it's not a different set of random bytes, because then you may be limited by how fast you can generate random bytes).

https://github.com/libp2p/specs/blob/master/perf/perf.md


func runClient(serverAddr string, uploadBytes, downloadBytes uint64) (time.Duration, time.Duration, error) {
client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
},
}

reqBody := &customReader{downloadBytes: downloadBytes, uploadBytes: uploadBytes}

req, err := http.NewRequest("POST", fmt.Sprintf("https://%s/", serverAddr), reqBody)
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, uploadBytes)

req, err := http.NewRequest(
http.MethodPost,
fmt.Sprintf("https://%s/", serverAddr),
io.MultiReader(
bytes.NewReader(b),
&nullReader{N: uploadBytes},
),
)
if err != nil {
return 0, 0, err
}
Expand All @@ -121,11 +105,11 @@ func runClient(serverAddr string, uploadBytes, downloadBytes uint64) (time.Durat
uploadDoneTime := time.Now()
defer resp.Body.Close()

n, err := io.Copy(io.Discard, resp.Body)
n, err := drainStream(resp.Body)
if err != nil {
return 0, 0, fmt.Errorf("error reading response: %w", err)
}
if n != int64(downloadBytes) {
if n != downloadBytes {
return 0, 0, fmt.Errorf("expected %d bytes in response, but received %d", downloadBytes, n)
}

Expand Down Expand Up @@ -250,3 +234,30 @@ func main() {
fmt.Println(string(jsonB))
}
}

func sendBytes(s io.Writer, bytesToSend uint64) error {
buf := make([]byte, blockSize)

for bytesToSend > 0 {
toSend := buf
if bytesToSend < blockSize {
toSend = buf[:bytesToSend]
}

n, err := s.Write(toSend)
if err != nil {
return err
}
bytesToSend -= uint64(n)
}
return nil
}

func drainStream(s io.Reader) (uint64, error) {
var recvd int64
recvd, err := io.Copy(io.Discard, s)
if err != nil && err != io.EOF {
return uint64(recvd), err
}
return uint64(recvd), nil
}