Streaming PDP piece upload flow #51

@rvagg

Description

After chatting with @ZenGround0, I thought I'd try something new and gather my thoughts with Claude, along with its understanding of both this code and Curio. So here are some notes about proposed changes to Curio to integrate a streaming piece upload flow on top of the existing CommP-first flow:


PDP Streaming Upload Flow Analysis

Problem Statement

The current PDP upload flow requires clients to calculate CommP (Piece Commitment) before initiating the upload. This creates significant challenges for JavaScript/browser environments:

Memory Constraints

  • Large file buffering: Clients must load entire files (up to 256 MiB, potentially much larger in future) into memory before calculating CommP
  • Browser limitations: Mobile devices and resource-constrained environments struggle with large buffer allocations
  • Memory pressure: Multiple concurrent uploads can exhaust available memory

Performance Issues

  • Sequential processing: Current flow requires CommP calculation → network upload (two separate phases)
  • JavaScript CommP overhead: CommP calculation in JavaScript is computationally expensive
  • Blocking operations: Large files block the UI thread during CommP calculation
  • Network inefficiency: Cannot start upload until CommP completes

User Experience Problems

  • Upload delays: Users must wait for full CommP calculation before seeing progress
  • Progress reporting: No progress indication during CommP phase
  • Memory crashes: Large files can crash browser tabs or mobile apps

Current Flow Limitations

❌ Client must buffer entire file (0-256 MiB)
❌ Calculate CommP completely before upload starts
❌ No progress feedback during CommP calculation
❌ Memory usage spikes to file_size + buffers
❌ Sequential: CommP_time + Upload_time = Total_time

Proposed Streaming Benefits

✅ Stream data directly without full buffering
✅ Parallel CommP calculation during upload
✅ Constant memory usage regardless of file size
✅ Progressive upload feedback
✅ Parallel: max(CommP_time, Upload_time) = Total_time

The streaming approach enables memory-efficient uploads with parallel processing, making large file uploads feasible in resource-constrained JavaScript environments while providing a better user experience through immediate upload progress.

Current Flow Recap

1. POST /pdp/piece → {check: {name, hash, size}, notify?}
   ├─ Known CommP? → 200 + pieceCID  
   └─ Unknown CommP? → 201 + Location: /piece/upload/{uuid}
   
2. PUT /piece/upload/{uuid} → binary data
   ├─ Validate against expected CommP/size
   ├─ Calculate actual CommP during write
   ├─ Compare & commit to parked_pieces
   └─ 204 No Content
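
To make the buffering cost concrete, a client of the current flow has to do roughly the following. Go is used for illustration to match the server-side sketches below; it assumes the go-fil-commp-hashhash and go-fil-commcid packages, and postPieceCheck is a hypothetical helper wrapping step 1 of the recap:

// Schematic client for the existing CommP-first flow. The whole piece sits in
// memory so that CommP can be computed before the first request is sent.
// imports: bytes, net/http
//          commp "github.com/filecoin-project/go-fil-commp-hashhash"
//          commcid "github.com/filecoin-project/go-fil-commcid"
func uploadCommPFirst(api string, piece []byte) (string, error) {
    // Phase 1: hash the fully buffered piece.
    cp := &commp.Calc{}
    if _, err := cp.Write(piece); err != nil {
        return "", err
    }
    digest, _, err := cp.Digest()
    if err != nil {
        return "", err
    }
    pieceCID, err := commcid.DataCommitmentV1ToCID(digest)
    if err != nil {
        return "", err
    }

    // Phase 2: POST /pdp/piece with {check: {name, hash, size}} built from the
    // values above (shape as in the recap; exact field encoding per the real
    // schema). A 200 means the piece is already known; a 201 carries
    // Location: /pdp/piece/upload/{uuid}.
    uploadURL, alreadyKnown, err := postPieceCheck(api, pieceCID, int64(len(piece))) // hypothetical helper
    if err != nil || alreadyKnown {
        return pieceCID.String(), err
    }

    // Phase 3: only now does the upload start, so CommP time and upload time
    // add up instead of overlapping.
    req, err := http.NewRequest(http.MethodPut, api+uploadURL, bytes.NewReader(piece))
    if err != nil {
        return "", err
    }
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return "", err
    }
    defer resp.Body.Close()
    return pieceCID.String(), nil
}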

Proposed Streaming Flow

1. PUT /piece/upload → binary data (no UUID, no prior CommP)
   ├─ Stream data while calculating CommP
   ├─ Store in "pending" state with generated UUID
   └─ 201 + Location: /piece/commit/{uuid}
   
2. POST /piece/commit/{uuid} → {commp, authBlob}
   ├─ Verify client CommP == server CommP  
   ├─ Store authBlob for smart contract
   ├─ Transition to "committed" state
   └─ 200 + final pieceCID
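
From the client's perspective the two steps compose like this. Again Go is used for illustration (the real consumer would be the JS SDK described below); the endpoint paths and JSON field names come from the flow above, and the go-fil-commp-hashhash / go-fil-commcid packages are assumed for the client-side CommP:

// Rough sketch of a client driving the proposed streaming flow.
// imports: bytes, context, encoding/base64, encoding/json, io, net/http
//          commp "github.com/filecoin-project/go-fil-commp-hashhash"
//          commcid "github.com/filecoin-project/go-fil-commcid"
func streamingUpload(ctx context.Context, api string, piece io.Reader, size int64, authBlob []byte) (string, error) {
    // Hash while uploading: every byte the HTTP client reads from the body is
    // also fed to the CommP calculator, so no full-file buffer is needed.
    cp := &commp.Calc{}
    body := io.TeeReader(piece, cp)

    req, err := http.NewRequestWithContext(ctx, http.MethodPut, api+"/pdp/piece/upload", body)
    if err != nil {
        return "", err
    }
    req.ContentLength = size
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return "", err
    }
    defer resp.Body.Close()
    commitURL := resp.Header.Get("Location") // e.g. /pdp/piece/commit/{uuid}

    // Client-side CommP, completed by the time the upload finishes.
    digest, _, err := cp.Digest()
    if err != nil {
        return "", err
    }
    pieceCID, err := commcid.DataCommitmentV1ToCID(digest)
    if err != nil {
        return "", err
    }

    // Commit: the server compares this CommP against the one it calculated.
    commitBody, err := json.Marshal(map[string]string{
        "commp":    pieceCID.String(),
        "authBlob": base64.StdEncoding.EncodeToString(authBlob),
    })
    if err != nil {
        return "", err
    }
    commitResp, err := http.Post(api+commitURL, "application/json", bytes.NewReader(commitBody))
    if err != nil {
        return "", err
    }
    defer commitResp.Body.Close()
    return pieceCID.String(), nil
}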

Technical Design

Database Schema Changes

-- Add state tracking to pdp_piece_uploads
ALTER TABLE pdp_piece_uploads ADD COLUMN state VARCHAR(20) DEFAULT 'pending';
-- 'pending' | 'committed' | 'abandoned'

ALTER TABLE pdp_piece_uploads ADD COLUMN server_commp TEXT;
ALTER TABLE pdp_piece_uploads ADD COLUMN auth_blob BYTEA;
ALTER TABLE pdp_piece_uploads ADD COLUMN created_at TIMESTAMP DEFAULT NOW();

Code Reuse Strategy

Shared Components:

  • CommP calculation (commp.Calc)
  • Stash storage operations
  • Database transaction patterns
  • Hash validation logic

Method Modifications:

  1. handlePieceUpload becomes dual-purpose:

    func (p *PDPService) handlePieceUpload(w http.ResponseWriter, r *http.Request) {
        uploadUUIDStr := chi.URLParam(r, "uploadUUID")
        
        if uploadUUIDStr == "" || uploadUUIDStr == "stream" {
            // NEW: Streaming flow - no UUID provided
            p.handleStreamingUpload(w, r)
            return
        }
        
        // EXISTING: Traditional flow with known UUID
        p.handleKnownUpload(w, r, uploadUUIDStr)
    }
  2. New handleStreamingUpload:

    func (p *PDPService) handleStreamingUpload(w http.ResponseWriter, r *http.Request) {
        ctx := r.Context()

        // Generate new UUID for this upload
        uploadUUID := uuid.New()

        // Stream data with size limit (256 MiB) while hashing
        const maxSize = 256 << 20
        cp := &commp.Calc{}
        hasher := sha256.New()

        var readSize int64
        writeFunc := func(f *os.File) error {
            limitedReader := io.LimitReader(r.Body, maxSize+1)
            multiWriter := io.MultiWriter(cp, hasher, f)

            n, err := io.Copy(multiWriter, limitedReader)
            if err != nil {
                return err
            }
            if n > maxSize {
                return fmt.Errorf("piece exceeds 256MiB limit")
            }
            readSize = n
            return nil
        }

        // Store in stash
        stashID, err := p.storage.StashCreate(ctx, maxSize, writeFunc)
        if err != nil {
            http.Error(w, "Failed to store piece data", http.StatusInternalServerError)
            return
        }
        _ = stashID // would be associated with the upload record (piece_ref) at commit time

        // Calculate CommP from the streamed bytes
        digest, _, err := cp.Digest()
        if err != nil {
            http.Error(w, "Failed to compute CommP", http.StatusInternalServerError)
            return
        }
        pieceCID, err := commcid.DataCommitmentV1ToCID(digest)
        if err != nil {
            http.Error(w, "Failed to encode piece CID", http.StatusInternalServerError)
            return
        }

        // Store in pending state
        _, err = p.db.Exec(`
            INSERT INTO pdp_piece_uploads
            (id, state, server_commp, piece_cid, check_size, check_hash, check_hash_codec, created_at)
            VALUES ($1, 'pending', $2, $3, $4, $5, $6, NOW())
        `, uploadUUID.String(), pieceCID.String(), pieceCID.String(), readSize, hasher.Sum(nil), "sha2-256")
        if err != nil {
            http.Error(w, "Failed to record upload", http.StatusInternalServerError)
            return
        }

        // Return commit URL
        commitURL := path.Join(PDPRoutePath, "/piece/commit", uploadUUID.String())
        w.Header().Set("Location", commitURL)
        w.WriteHeader(http.StatusCreated)
    }
  3. New handlePieceCommit:

    func (p *PDPService) handlePieceCommit(w http.ResponseWriter, r *http.Request) {
        uploadUUID := chi.URLParam(r, "uploadUUID")

        var req struct {
            CommP    string `json:"commp"`
            AuthBlob string `json:"authBlob"` // base64 encoded
        }
        if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
            http.Error(w, "Invalid JSON body", http.StatusBadRequest)
            return
        }

        // Verify client CommP matches server CommP
        var serverCommP string
        var state string
        err := p.db.QueryRow(`
            SELECT server_commp, state FROM pdp_piece_uploads WHERE id = $1
        `, uploadUUID).Scan(&serverCommP, &state)
        if err != nil {
            http.Error(w, "Upload not found", http.StatusNotFound)
            return
        }

        if state != "pending" {
            http.Error(w, "Upload not in pending state", http.StatusConflict)
            return
        }

        if req.CommP != serverCommP {
            http.Error(w, "CommP mismatch", http.StatusBadRequest)
            return
        }

        // Decode and store auth blob
        authBlob, err := base64.StdEncoding.DecodeString(req.AuthBlob)
        if err != nil {
            http.Error(w, "Invalid authBlob encoding", http.StatusBadRequest)
            return
        }

        // Transition to committed state and proceed with existing logic
        // (create parked_pieces, parked_piece_refs, etc.)

        // Update state
        _, err = p.db.Exec(`
            UPDATE pdp_piece_uploads
            SET state = 'committed', auth_blob = $1
            WHERE id = $2
        `, authBlob, uploadUUID)
        if err != nil {
            http.Error(w, "Failed to commit upload", http.StatusInternalServerError)
            return
        }

        w.Header().Set("Content-Type", "application/json")
        json.NewEncoder(w).Encode(map[string]string{
            "pieceCID": serverCommP,
            "status":   "committed",
        })
    }

Routing Changes

r.Route("/piece", func(r chi.Router) {
    r.Post("/", p.handlePiecePost)                    // Existing
    r.Put("/upload", p.handlePieceUpload)             // NEW: streaming
    r.Put("/upload/{uploadUUID}", p.handlePieceUpload) // Existing
    r.Post("/commit/{uploadUUID}", p.handlePieceCommit) // NEW: commit
})

SDK Integration

The SDK would need new methods:

// New streaming upload method
async uploadStreaming(data: Uint8Array | ReadableStream): Promise<{
  commitUrl: string
  uploadId: string
}> {
  const response = await fetch(`${this.apiEndpoint}/pdp/piece/upload`, {
    method: 'PUT',
    body: data
  })
  const location = response.headers.get('Location')
  if (!location) {
    throw new Error('Missing Location header in upload response')
  }
  return {
    commitUrl: location,
    uploadId: extractUUID(location) // helper to pull the UUID out of the commit path
  }
}

// Commit the upload with client-calculated CommP
async commitUpload(uploadId: string, clientCommP: CommP, authBlob: string): Promise<void> {
  await fetch(`${this.apiEndpoint}/pdp/piece/commit/${uploadId}`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      commp: clientCommP.toString(),
      authBlob
    })
  })
}

Garbage Collection Strategy

Problem: Abandoned uploads in "pending" state consume storage.

Solution: Background cleanup job:

func (p *PDPService) cleanupAbandonedUploads() {
    // Clean uploads older than 24 hours in pending state
    rows, err := p.db.Query(`
        SELECT id, piece_ref FROM pdp_piece_uploads 
        WHERE state = 'pending' AND created_at < NOW() - INTERVAL '24 hours'
    `)
    if err != nil {
        return // log and retry in the real implementation
    }
    defer rows.Close()

    for rows.Next() {
        var uploadID string
        var pieceRef sql.NullInt64
        if err := rows.Scan(&uploadID, &pieceRef); err != nil {
            continue
        }

        // Remove from stash if a stored piece exists
        if pieceRef.Valid {
            // Get stash ID from piece_ref and clean up
        }

        // Mark as abandoned
        p.db.Exec(`UPDATE pdp_piece_uploads SET state = 'abandoned' WHERE id = $1`, uploadID)
    }
}
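
How this job gets scheduled is an open question; a plain ticker loop is the minimal version (in practice it would more likely hang off Curio's existing background-task machinery, and the interval below is an arbitrary choice):

// Illustrative only: run the cleanup periodically until the service shuts down.
// imports: context, time
func (p *PDPService) startCleanupLoop(ctx context.Context) {
    ticker := time.NewTicker(time.Hour)
    go func() {
        defer ticker.Stop()
        for {
            select {
            case <-ctx.Done():
                return
            case <-ticker.C:
                p.cleanupAbandonedUploads()
            }
        }
    }()
}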

Key Benefits

  1. Memory efficiency: No need to buffer entire piece before upload
  2. Progressive validation: Both client and server calculate CommP
  3. Flexible auth: Auth blob can include CommP for smart contract verification
  4. Backwards compatibility: Existing flow unchanged

Potential Issues

  1. DoS risk: Unlimited streaming without auth check (a possible mitigation is sketched after this list)
  2. Storage pressure: Pending pieces consume space before commitment
  3. Complexity: Two-phase upload increases failure modes
  4. Race conditions: Client must commit before GC cleanup
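
For the first item, one option is to refuse to read any body bytes until the request is authenticated, and to cap the body size at the HTTP layer as well. verifyServiceAuth below is a hypothetical stand-in for whatever JWT/service auth the existing PDP handlers already apply:

// Sketch: guard the streaming endpoint before any data is accepted.
func (p *PDPService) handleStreamingUploadGuarded(w http.ResponseWriter, r *http.Request) {
    // Hypothetical auth check mirroring the existing PDP handlers.
    if err := p.verifyServiceAuth(r); err != nil {
        http.Error(w, "Unauthorized", http.StatusUnauthorized)
        return
    }

    // Belt-and-braces size cap: oversized bodies fail fast here in addition
    // to the io.LimitReader check inside writeFunc.
    r.Body = http.MaxBytesReader(w, r.Body, 256<<20)

    p.handleStreamingUpload(w, r)
}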

Recommendation

This design is practical and achievable with moderate code reuse. The dual-purpose handlers and shared CommP calculation logic minimize duplication. The main complexity is state management and GC, but both are solvable with proper database design and background jobs.

Implementation Priority

  1. Phase 1: Database schema changes and basic streaming upload
  2. Phase 2: Commit endpoint and state management
  3. Phase 3: SDK integration and streaming CommP utilities
  4. Phase 4: Garbage collection and monitoring
