Skip to content

[WIP] feat: market 2.0 #508

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,11 @@ go-generate:
gen: gensimple
.PHONY: gen

gensimple: api-gen go-generate cfgdoc-gen docsgen docsgen-cli
marketgen:
$(GOCC) run ./market/mk20/mk20gen -pkg ./market/mk20 -output ./market/mk20/info.md
.PHONY: marketgen

gensimple: api-gen go-generate cfgdoc-gen docsgen marketgen docsgen-cli
$(GOCC) run ./scripts/fiximports
go mod tidy
.PHONY: gen
Expand Down
13 changes: 7 additions & 6 deletions cmd/curio/tasks/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps, shutdownChan chan
machine := dependencies.ListenAddr
prover := dependencies.Prover
iStore := dependencies.IndexStore
pp := dependencies.SectorReader

var activeTasks []harmonytask.TaskInterface

Expand Down Expand Up @@ -244,16 +243,19 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps, shutdownChan chan
}

{
var sdeps cuhttp.ServiceDeps
// Market tasks
var dm *storage_market.CurioStorageDealMarket
if cfg.Subsystems.EnableDealMarket {
// Main market poller should run on all nodes
dm = storage_market.NewCurioStorageDealMarket(miners, db, cfg, si, full, as)
dm = storage_market.NewCurioStorageDealMarket(miners, db, cfg, must.One(dependencies.EthClient.Val()), si, full, as, lstor)
err := dm.StartMarket(ctx)
if err != nil {
return nil, err
}

sdeps.DealMarket = dm

if cfg.Subsystems.EnableCommP {
commpTask := storage_market.NewCommpTask(dm, db, must.One(slrLazy.Val()), full, cfg.Subsystems.CommPMaxTasks)
activeTasks = append(activeTasks, commpTask)
Expand All @@ -275,7 +277,6 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps, shutdownChan chan
if err != nil {
return nil, err
}
var sdeps cuhttp.ServiceDeps

if cfg.Subsystems.EnablePDP {
es := getSenderEth()
Expand All @@ -293,12 +294,12 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps, shutdownChan chan

idxMax := taskhelp.Max(cfg.Subsystems.IndexingMaxTasks)

indexingTask := indexing.NewIndexingTask(db, sc, iStore, pp, cfg, idxMax)
ipniTask := indexing.NewIPNITask(db, sc, iStore, pp, cfg, idxMax)
indexingTask := indexing.NewIndexingTask(db, sc, iStore, dependencies.CachedPieceReader, cfg, idxMax)
ipniTask := indexing.NewIPNITask(db, sc, iStore, dependencies.CachedPieceReader, cfg, idxMax)
activeTasks = append(activeTasks, ipniTask, indexingTask)

if cfg.HTTP.Enable {
err = cuhttp.StartHTTPServer(ctx, dependencies, &sdeps, dm)
err = cuhttp.StartHTTPServer(ctx, dependencies, &sdeps)
if err != nil {
return nil, xerrors.Errorf("failed to start the HTTP server: %w", err)
}
Expand Down
1 change: 1 addition & 0 deletions cmd/sptool/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,5 +101,6 @@ var toolboxCmd = &cli.Command{
Subcommands: []*cli.Command{
sparkCmd,
mk12Clientcmd,
mk20Clientcmd,
},
}
252 changes: 252 additions & 0 deletions cmd/sptool/toolbox_deal_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/filecoin-project/curio/lib/keystore"
mk12_libp2p "github.com/filecoin-project/curio/market/libp2p"
"github.com/filecoin-project/curio/market/mk12"
"github.com/filecoin-project/curio/market/mk20"

"github.com/filecoin-project/lotus/api"
chain_types "github.com/filecoin-project/lotus/chain/types"
Expand Down Expand Up @@ -1561,3 +1562,254 @@ var dealStatusCmd = &cli.Command{
return nil
},
}

var mk20Clientcmd = &cli.Command{
Name: "mk20-client",
Usage: "mk20 client for Curio",
Flags: []cli.Flag{
mk12_client_repo,
},
Subcommands: []*cli.Command{
initCmd,
mk20DealCmd,
},
}

var mk20DealCmd = &cli.Command{
Name: "deal",
Usage: "Make a mk20 deal with Curio",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "http-url",
Usage: "http url to CAR file",
Required: true,
},
&cli.StringSliceFlag{
Name: "http-headers",
Usage: "http headers to be passed with the request (e.g key=value)",
},
&cli.Uint64Flag{
Name: "car-size",
Usage: "size of the CAR file: required for online deals",
Required: true,
},
&cli.StringFlag{
Name: "provider",
Usage: "storage provider on-chain address",
Required: true,
},
&cli.StringFlag{
Name: "commp",
Usage: "commp of the CAR file",
Required: true,
},
&cli.Uint64Flag{
Name: "piece-size",
Usage: "size of the CAR file as a padded piece",
Required: true,
},
&cli.IntFlag{
Name: "duration",
Usage: "duration of the deal in epochs",
Value: 518400, // default is 2880 * 180 == 180 days
},
&cli.BoolFlag{
Name: "verified",
Usage: "whether the deal funds should come from verified client data-cap",
Value: false,
},
&cli.BoolFlag{
Name: "indexing",
Usage: "indicates that an deal should be indexed",
Value: true,
},
&cli.StringFlag{
Name: "wallet",
Usage: "wallet address to be used to initiate the deal",
},
&cli.BoolFlag{
Name: "announce",
Usage: "indicates that deal should be announced to the IPNI(Network Indexer)",
Value: true,
},
},
Action: func(cctx *cli.Context) error {
ctx := cctx.Context
n, err := Setup(cctx.String(mk12_client_repo.Name))
if err != nil {
return err
}

api, closer, err := lcli.GetGatewayAPIV1(cctx)
if err != nil {
return fmt.Errorf("cant setup gateway connection: %w", err)
}
defer closer()
if err != nil {
return xerrors.Errorf("cant setup gateway connection: %w", err)
}
defer closer()

walletAddr, err := n.GetProvidedOrDefaultWallet(ctx, cctx.String("wallet"))
if err != nil {
return err
}

log.Debugw("selected wallet", "wallet", walletAddr)

maddr, err := address.NewFromString(cctx.String("provider"))
if err != nil {
return err
}

minfo, err := api.StateMinerInfo(ctx, maddr, chain_types.EmptyTSK)
if err != nil {
return err
}
if minfo.PeerId == nil {
return xerrors.Errorf("storage provider %s has no peer ID set on-chain", maddr)
}

var maddrs []multiaddr.Multiaddr
for _, mma := range minfo.Multiaddrs {
ma, err := multiaddr.NewMultiaddrBytes(mma)
if err != nil {
return xerrors.Errorf("storage provider %s had invalid multiaddrs in their info: %w", maddr, err)
}
maddrs = append(maddrs, ma)
}
if len(maddrs) == 0 {
return xerrors.Errorf("storage provider %s has no multiaddrs set on-chain", maddr)
}

addrInfo := &peer.AddrInfo{
ID: *minfo.PeerId,
Addrs: maddrs,
}

log.Debugw("found storage provider", "id", addrInfo.ID, "multiaddrs", addrInfo.Addrs, "addr", maddr)

var hurls []*url.URL

for _, ma := range addrInfo.Addrs {
hurl, err := maurl.ToURL(ma)
if err != nil {
return xerrors.Errorf("failed to convert multiaddr %s to URL: %w", ma, err)
}
if hurl.Scheme == "ws" {
hurl.Scheme = "http"
}
if hurl.Scheme == "wss" {
hurl.Scheme = "https"
}
log.Debugw("converted multiaddr to URL", "url", hurl, "multiaddr", ma.String())
hurls = append(hurls, hurl)
}

commp := cctx.String("commp")
pieceCid, err := cid.Parse(commp)
if err != nil {
return xerrors.Errorf("parsing commp '%s': %w", commp, err)
}

pieceSize := cctx.Uint64("piece-size")
if pieceSize == 0 {
return xerrors.Errorf("must provide piece-size parameter for CAR url")
}

carFileSize := cctx.Uint64("car-size")
if carFileSize == 0 {
return xerrors.Errorf("size of car file cannot be 0")
}

url, err := url.Parse(cctx.String("http-url"))
if err != nil {
return xerrors.Errorf("parsing http url: %w", err)
}

var headers http.Header

for _, header := range cctx.StringSlice("http-headers") {
sp := strings.Split(header, "=")
if len(sp) != 2 {
return xerrors.Errorf("malformed http header: %s", header)
}
headers.Add(sp[0], sp[1])
}

d := mk20.DataSource{
PieceCID: pieceCid,
Size: abi.PaddedPieceSize(pieceSize),
Format: mk20.PieceDataFormat{
Car: &mk20.FormatCar{},
},
SourceHTTP: &mk20.DataSourceHTTP{
RawSize: carFileSize,
URLs: []mk20.HttpUrl{
{
URL: url.String(),
Headers: headers,
Priority: 0,
Fallback: true,
},
},
},
}

p := mk20.Products{
DDOV1: &mk20.DDOV1{
Provider: maddr,
Client: walletAddr,
PieceManager: walletAddr,
Duration: abi.ChainEpoch(cctx.Int64("duration")),
ContractAddress: cctx.String("contract-address"),
ContractVerifyMethod: cctx.String("contract-verify-method"),
ContractVerifyMethodParams: []byte("test bytes"),
Indexing: cctx.Bool("indexing"),
AnnounceToIPNI: cctx.Bool("announce"),
},
}

id, err := mk20.NewULID()
if err != nil {
return err
}
log.Debugw("generated deal id", "id", id)

deal := mk20.Deal{
Identifier: id,
Data: d,
Products: p,
}

log.Debugw("deal", "deal", deal)

body, err := json.Marshal(deal)
if err != nil {
return err
}

// Try to request all URLs one by one and exit after first success
for _, u := range hurls {
s := u.String() + "/market/mk20/store"
log.Debugw("trying to send request to", "url", u.String())
req, err := http.NewRequest("POST", s, bytes.NewReader(body))
if err != nil {
return xerrors.Errorf("failed to create request: %w", err)
}
req.Header.Set("Content-Type", "application/cbor")
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Warnw("failed to send request", "url", s, "error", err)
continue
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
log.Warnw("failed to send request", "url", s, "status", resp.StatusCode, "body", resp.Body)
continue
}
return nil
}
return xerrors.Errorf("failed to send request to any of the URLs")
},
}
22 changes: 12 additions & 10 deletions cuhttp/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
ipni_provider "github.com/filecoin-project/curio/market/ipni/ipni-provider"
"github.com/filecoin-project/curio/market/libp2p"
"github.com/filecoin-project/curio/market/retrieval"
"github.com/filecoin-project/curio/pdp"
"github.com/filecoin-project/curio/tasks/message"
storage_market "github.com/filecoin-project/curio/tasks/storage-market"
)
Expand Down Expand Up @@ -120,10 +119,11 @@ func isWebSocketUpgrade(r *http.Request) bool {
}

type ServiceDeps struct {
EthSender *message.SenderETH
EthSender *message.SenderETH
DealMarket *storage_market.CurioStorageDealMarket
}

func StartHTTPServer(ctx context.Context, d *deps.Deps, sd *ServiceDeps, dm *storage_market.CurioStorageDealMarket) error {
func StartHTTPServer(ctx context.Context, d *deps.Deps, sd *ServiceDeps) error {
cfg := d.Cfg.HTTP

// Setup the Chi router for more complex routing (if needed in the future)
Expand Down Expand Up @@ -165,7 +165,9 @@ func StartHTTPServer(ctx context.Context, d *deps.Deps, sd *ServiceDeps, dm *sto
fmt.Fprintf(w, "Service is up and running")
})

chiRouter, err = attachRouters(ctx, chiRouter, d, sd, dm)
// TODO: Attach a info page here with details about all the service and endpoints

chiRouter, err = attachRouters(ctx, chiRouter, d, sd)
if err != nil {
return xerrors.Errorf("failed to attach routers: %w", err)
}
Expand Down Expand Up @@ -259,7 +261,7 @@ func (c cache) Delete(ctx context.Context, key string) error {

var _ autocert.Cache = cache{}

func attachRouters(ctx context.Context, r *chi.Mux, d *deps.Deps, sd *ServiceDeps, dm *storage_market.CurioStorageDealMarket) (*chi.Mux, error) {
func attachRouters(ctx context.Context, r *chi.Mux, d *deps.Deps, sd *ServiceDeps) (*chi.Mux, error) {
// Attach retrievals
rp := retrieval.NewRetrievalProvider(ctx, d.DB, d.IndexStore, d.CachedPieceReader)
retrieval.Router(r, rp)
Expand All @@ -277,13 +279,13 @@ func attachRouters(ctx context.Context, r *chi.Mux, d *deps.Deps, sd *ServiceDep
rd := libp2p.NewRedirector(d.DB)
libp2p.Router(r, rd)

if sd.EthSender != nil {
pdsvc := pdp.NewPDPService(d.DB, d.LocalStore, must.One(d.EthClient.Get()), d.Chain, sd.EthSender)
pdp.Routes(r, pdsvc)
}
//if sd.EthSender != nil {
// pdsvc := pdp.NewPDPService(d.DB, d.LocalStore, must.One(d.EthClient.Get()), d.Chain, sd.EthSender)
// pdp.Routes(r, pdsvc)
//}

// Attach the market handler
dh, err := mhttp.NewMarketHandler(d.DB, d.Cfg, dm)
dh, err := mhttp.NewMarketHandler(d.DB, d.Cfg, sd.DealMarket, must.One(d.EthClient.Get()), d.Chain, sd.EthSender, d.LocalStore)
if err != nil {
return nil, xerrors.Errorf("failed to create new market handler: %w", err)
}
Expand Down
Loading