
feat: use 🛻 pickup from api (#2310)
Pickup pulls DAGs into E-IPFS, so this PR updates the API to reflect that.

- Update the pinning service add and replace API routes to record
`ElasticIpfs` as the service the pin is queued on.
- Update `GET /pins/:req-id` to check pickup for a status change and
update our DB, so we can tell users sooner when a pin request is
pinned or failed (sketched below).
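
A condensed sketch of that refresh-on-read flow, taken from the `pins-get.js` change below (same names as that file):

```js
// If our DB still thinks the pin is in flight, ask pickup for the
// current status and sync any change before responding.
const status = upload.content.pin[0].status
if (status === 'Pinning' || status === 'PinQueued') {
  const res = await cluster.status(cid.sourceCid) // GET /pins/:cid on pickup
  const newStatus = cluster.toDBPinStatus(res)
  if (status !== newStatus) {
    await ctx.db.updatePinStatus(upload.content_cid, {
      service: 'ElasticIpfs',
      status: newStatus,
    })
  }
}
```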

Cron changes are in #2339 

Fixes #2309 

License: MIT
Signed-off-by: Oli Evans <[email protected]>
olizilla authored Mar 22, 2023
1 parent 8ec3568 commit e327bb3
Showing 15 changed files with 91 additions and 62 deletions.
7 changes: 3 additions & 4 deletions .env.tpl
@@ -32,10 +32,9 @@ DATABASE_TOKEN=eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpc3MiOiJzdXBhYmFzZSIsImlh
# Postgres Database
DATABASE_CONNECTION=postgresql://postgres:postgres@localhost:5432/postgres

# Cluster
CLUSTER_BASIC_AUTH_TOKEN = dGVzdDp0ZXN0
CLUSTER_SERVICE =
CLUSTER_API_URL = http://127.0.0.1:9094
# Pickup (can be mocked with ipfs-cluster for local dev)
PICKUP_BASIC_AUTH_TOKEN = dGVzdDp0ZXN0
PICKUP_URL = http://127.0.0.1:9094

# Maintenance Mode
MAINTENANCE_MODE = rw
12 changes: 10 additions & 2 deletions packages/api/README.md
@@ -66,8 +66,7 @@ wrangler secret put MAGIC_SECRET_KEY --env production # Get from magic.link account
wrangler secret put SALT --env production # open `https://csprng.xyz/v1/api` in the browser and use the value of `Data`
wrangler secret put SENTRY_DSN --env USER # Get from Sentry
wrangler secret put DATABASE_TOKEN --env production # Get from database account
wrangler secret put CLUSTER_BASIC_AUTH_TOKEN --env production # Get from nft.storage vault in 1password
wrangler secret put CLUSTER_SERVICE --env production # Which cluster should be used. Options 'IpfsCluster' / 'IpfsCluster2' / 'IpfsCluster3'
wrangler secret put PICKUP_BASIC_AUTH_TOKEN --env production # Get from nft.storage vault in 1password
wrangler secret put MAILCHIMP_API_KEY --env production # Get from mailchimp
wrangler secret put LOGTAIL_TOKEN --env production # Get from Logtail
wrangler secret put METAPLEX_AUTH_TOKEN --env production # User ID metaplex endpoint should use (not required for dev)
@@ -128,3 +127,12 @@ see: https://github.com/web3-storage/linkdex-api
We write uploaded CARs to both S3 and R2 in parallel. The R2 bucket is bound to the worker as `env.CARPARK`. The API docs for an R2Bucket instance are here: https://developers.cloudflare.com/r2/runtime-apis/#bucket-method-definitions

We key our R2 uploads by CAR CID and record them in the DB under `upload.backup_urls`. The URL prefix for CARs in R2 is set by `env.CARPARK_URL`. This currently points to a subdomain on web3.storage that we could configure when we need direct HTTP access to the bucket, but it does not exist at the time of writing.
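
A sketch of that write path inside the worker (the key shape and URL join here are illustrative assumptions, not the exact implementation):

```js
// Write the uploaded CAR to the R2 bucket bound as env.CARPARK,
// keyed by CAR CID, then record where the backup lives.
const key = `${carCid}/${carCid}.car` // assumed key shape
await env.CARPARK.put(key, carBytes)
const backupUrl = `${env.CARPARK_URL}/${key}` // recorded in upload.backup_urls
```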

## Pickup

We use [pickup](https://github.com/web3-storage/pickup) to fetch DAGs from IPFS and save them to a bucket where E-IPFS can index them. It provides a subset of the ipfs-cluster API for `GET /pins` and `POST /pins`, which we use as the backend for the [pinning service](https://ipfs.github.io/pinning-services-api-spec/) implementation.

- `PICKUP_URL` defines the service endpoint to use, and is set in `wrangler.toml`.
- `PICKUP_BASIC_AUTH_TOKEN` must be set as a secret in the env.

For local dev, we run a local ipfs-cluster container as a stand-in for the same service.
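
A minimal sketch of the integration (mirroring `src/cluster.js`; the CID and top-level `await` are illustrative):

```js
import { Cluster } from '@nftstorage/ipfs-cluster'

// PICKUP_URL and PICKUP_BASIC_AUTH_TOKEN come from the service config.
const pickup = new Cluster(PICKUP_URL, {
  headers: { Authorization: `Basic ${PICKUP_BASIC_AUTH_TOKEN}` },
})

const cid = 'bafybeig...' // placeholder CID
await pickup.pin(cid) // POST /pins: ask pickup to fetch the DAG from IPFS
const status = await pickup.status(cid) // GET /pins/:cid, cluster-style status
```

The cluster-style status response is mapped onto PSA and DB pin statuses by `toPSAStatus` / `toDBPinStatus` in `src/cluster.js`.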
4 changes: 2 additions & 2 deletions packages/api/docker/run-with-dependencies.sh
@@ -82,15 +82,15 @@ export DATABASE_CONNECTION="postgres://postgres:postgres@$DB_HOST_PORT/postgres"

# The vars below are used to configure the service
export DATABASE_URL="http://$POSTGREST_HOST_PORT"
export CLUSTER_API_URL="http://$CLUSTER_HOST_PORT"
export PICKUP_URL="http://$CLUSTER_HOST_PORT"
export S3_ENDPOINT="http://$MINIO_HOST_PORT"

echo "services started."
echo "environment overrides:"
echo "MINIO_API_PORT=${MINIO_API_PORT}"
echo "DATABASE_CONNECTION=${DATABASE_CONNECTION}"
echo "DATABASE_URL=${DATABASE_URL}"
echo "CLUSTER_API_URL=${CLUSTER_API_URL}"
echo "PICKUP_URL=${PICKUP_URL}"
echo "S3_ENDPOINT=${S3_ENDPOINT}"
echo

8 changes: 4 additions & 4 deletions packages/api/src/bindings.d.ts
@@ -55,11 +55,11 @@ export interface ServiceConfiguration {
/** UCAN private signing key */
PRIVATE_KEY: string

/** API url for active IPFS cluster endpoint */
CLUSTER_API_URL: string
/** API url for pickup endpoint */
PICKUP_URL: string

/** Auth token for IPFS culster */
CLUSTER_BASIC_AUTH_TOKEN: string
/** Auth token for pickup pinning service */
PICKUP_BASIC_AUTH_TOKEN: string

/** Postgrest endpoint URL */
DATABASE_URL: string
27 changes: 24 additions & 3 deletions packages/api/src/cluster.js
@@ -2,11 +2,12 @@ import { Cluster } from '@nftstorage/ipfs-cluster'
import { getServiceConfig } from './config.js'
import { HTTPError } from './errors.js'

const { CLUSTER_API_URL, CLUSTER_BASIC_AUTH_TOKEN } = getServiceConfig()
// pickup provides a cluster-compatible API for GET /pins & POST /pins
const { PICKUP_URL, PICKUP_BASIC_AUTH_TOKEN } = getServiceConfig()

const client = new Cluster(CLUSTER_API_URL, {
const client = new Cluster(PICKUP_URL, {
headers: {
Authorization: `Basic ${CLUSTER_BASIC_AUTH_TOKEN}`,
Authorization: `Basic ${PICKUP_BASIC_AUTH_TOKEN}`,
},
})

@@ -113,3 +114,23 @@ export function toPSAStatus(status) {
if (pinInfos.some((i) => i.status === 'pin_queued')) return 'queued'
return 'failed'
}

/**
* @param {import('@nftstorage/ipfs-cluster').API.StatusResponse} status
* @returns {import('./utils/db-client.js').definitions["pin"]["status"]} status
*/
export function toDBPinStatus(status) {
const pinInfos = Object.values(status.peerMap)
if (pinInfos.some((i) => i.status === 'pinned')) return 'Pinned'
if (pinInfos.some((i) => i.status === 'pinning')) return 'Pinning'
if (pinInfos.some((i) => i.status === 'pin_queued')) return 'PinQueued'
return 'PinError'
}

/**
* @param {string} cid
* @param {import("@nftstorage/ipfs-cluster").API.StatusOptions} [options]
*/
export function status(cid, options) {
return client.status(cid, options)
}
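
For example (hypothetical status responses with a single `peerMap` entry, roughly the shape pickup returns):

```js
toDBPinStatus({ peerMap: { peer: { status: 'pinned' } } }) // => 'Pinned'
toDBPinStatus({ peerMap: { peer: { status: 'pin_queued' } } }) // => 'PinQueued'
toDBPinStatus({ peerMap: { peer: { status: 'unpinned' } } }) // => 'PinError'
```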
35 changes: 4 additions & 31 deletions packages/api/src/config.js
@@ -9,16 +9,6 @@ import {
* @typedef {import('./bindings').RuntimeEnvironmentName} RuntimeEnvironmentName
*/

/**
* If the CLUSTER_SERVICE variable is set, the service URL will be resolved from here.
*
* @type Record<string, string> */
const CLUSTER_SERVICE_URLS = {
IpfsCluster: 'https://nft.storage.ipfscluster.io/api/',
IpfsCluster2: 'https://nft2.storage.ipfscluster.io/api/',
IpfsCluster3: 'https://nft3.storage.ipfscluster.io/api/',
}

/**
* Load a {@link ServiceConfiguration} from the global environment.
* @returns {ServiceConfiguration}
@@ -37,22 +27,6 @@ export const getServiceConfig = () => {
* @returns {ServiceConfiguration}
*/
export function serviceConfigFromVariables(vars) {
let clusterUrl
if (vars.CLUSTER_SERVICE) {
clusterUrl = CLUSTER_SERVICE_URLS[vars.CLUSTER_SERVICE]
if (!clusterUrl) {
throw new Error(`unknown cluster service: ${vars.CLUSTER_SERVICE}`)
}
}
if (vars.CLUSTER_API_URL) {
clusterUrl = vars.CLUSTER_API_URL
}
if (!clusterUrl || (vars.CLUSTER_SERVICE && vars.CLUSTER_API_URL)) {
throw new Error(
`One of CLUSTER_SERVICE or CLUSTER_API_URL must be set in ENV`
)
}

return {
ENV: parseRuntimeEnv(vars.ENV),
DEBUG: boolValue(vars.DEBUG),
@@ -65,8 +39,8 @@ export function serviceConfigFromVariables(vars) {
CARPARK_URL: vars.CARPARK_URL,
DATABASE_URL: vars.DATABASE_URL,
DATABASE_TOKEN: vars.DATABASE_TOKEN,
CLUSTER_API_URL: clusterUrl,
CLUSTER_BASIC_AUTH_TOKEN: vars.CLUSTER_BASIC_AUTH_TOKEN,
PICKUP_URL: vars.PICKUP_URL,
PICKUP_BASIC_AUTH_TOKEN: vars.PICKUP_BASIC_AUTH_TOKEN,
MAGIC_SECRET_KEY: vars.MAGIC_SECRET_KEY,
SENTRY_DSN: vars.SENTRY_DSN,
METAPLEX_AUTH_TOKEN: vars.METAPLEX_AUTH_TOKEN,
@@ -114,6 +88,8 @@ export function loadConfigVariables() {
'DUDEWHERE',
'CARPARK',
'CARPARK_URL',
'PICKUP_URL',
'PICKUP_BASIC_AUTH_TOKEN',
'DATABASE_URL',
'DATABASE_TOKEN',
'MAGIC_SECRET_KEY',
@@ -122,7 +98,6 @@
'LOGTAIL_TOKEN',
'PRIVATE_KEY',
'SENTRY_DSN',
'CLUSTER_BASIC_AUTH_TOKEN',
'MAINTENANCE_MODE',
'S3_REGION',
'S3_ACCESS_KEY_ID',
@@ -145,8 +120,6 @@
}

const optional = [
'CLUSTER_SERVICE',
'CLUSTER_API_URL',
'LINKDEX_URL',
'S3_ENDPOINT',
'SLACK_USER_REQUEST_WEBHOOK_URL',
6 changes: 6 additions & 0 deletions packages/api/src/routes/pins-add.js
@@ -51,6 +51,12 @@ export async function pinsAdd(event, ctx) {
})

const upload = await db.createUpload({
pins: [
{
status: 'PinQueued',
service: 'ElasticIpfs', // via pickup
},
],
type: 'Remote',
content_cid: cid.contentCid,
source_cid: cid.sourceCid,
19 changes: 17 additions & 2 deletions packages/api/src/routes/pins-get.js
@@ -1,4 +1,5 @@
import { checkAuth, validate } from '../utils/auth.js'
import * as cluster from '../cluster.js'
import { checkAuth } from '../utils/auth.js'
import { toPinsResponse } from '../utils/db-transforms.js'
import { JSONResponse } from '../utils/json-response.js'
import { parseCidPinning } from '../utils/utils.js'
@@ -21,7 +22,7 @@ export async function pinsGet(event, ctx) {
)
}

const upload = await db.getUpload(cid.sourceCid, user.id)
let upload = await db.getUpload(cid.sourceCid, user.id)

if (!upload) {
return new JSONResponse(
@@ -30,5 +31,19 @@
)
}

// check if the status has changed upstream
const status = upload.content.pin[0].status
if (status === 'Pinning' || status === 'PinQueued') {
const res = await cluster.status(cid.sourceCid)
const newStatus = cluster.toDBPinStatus(res)
if (status !== newStatus) {
await ctx.db.updatePinStatus(upload.content_cid, {
service: 'ElasticIpfs',
status: newStatus,
})
upload = (await db.getUpload(cid.sourceCid, user.id)) ?? upload
}
}

return new JSONResponse(toPinsResponse(upload))
}
6 changes: 6 additions & 0 deletions packages/api/src/routes/pins-replace.js
@@ -73,6 +73,12 @@ export async function pinsReplace(event, ctx) {
})

const upload = await db.createUpload({
pins: [
{
status: 'PinQueued',
service: 'ElasticIpfs', // via pickup
},
],
type: 'Remote',
content_cid: cid.contentCid,
source_cid: cid.sourceCid,
4 changes: 2 additions & 2 deletions packages/api/src/utils/router.js
@@ -182,8 +182,8 @@ class Router {
listen(event) {
const url = new URL(event.request.url)
// Add more if needed for other backends
const { DATABASE_URL, CLUSTER_API_URL } = getServiceConfig()
const passThrough = [DATABASE_URL, CLUSTER_API_URL]
const { DATABASE_URL, PICKUP_URL } = getServiceConfig()
const passThrough = [DATABASE_URL, PICKUP_URL]

// Ignore http requests from the passthrough list above
if (!passThrough.includes(`${url.protocol}//${url.host}`)) {
9 changes: 4 additions & 5 deletions packages/api/test/config.spec.js
@@ -26,14 +26,13 @@ const BASE_CONFIG = {
DATABASE_TOKEN:
'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpc3MiOiJzdXBhYmFzZSIsImlhdCI6MTYwMzk2ODgzNCwiZXhwIjoyNTUwNjUzNjM0LCJyb2xlIjoic2VydmljZV9yb2xlIn0.necIJaiP7X2T2QjGeV-FhpkizcNTX8HjDDBAxpgQTEI',
DATABASE_CONNECTION: 'postgresql://postgres:postgres@localhost:5432/postgres',
CLUSTER_BASIC_AUTH_TOKEN: 'dGVzdDp0ZXN0',
MAINTENANCE_MODE: 'rw',
S3_REGION: 'us-east-1',
S3_ACCESS_KEY_ID: 'minioadmin',
S3_SECRET_ACCESS_KEY: 'minioadmin',
S3_BUCKET_NAME: 'dotstorage-dev-0',
CLUSTER_SERVICE: '',
CLUSTER_API_URL: 'http://127.0.0.1:9094',
PICKUP_URL: 'http://127.0.0.1:9094',
PICKUP_BASIC_AUTH_TOKEN: 'dGVzdDp0ZXN0',
S3_ENDPOINT: 'http://127.0.0.1:9000',
SLACK_USER_REQUEST_WEBHOOK_URL: '',
SATNAV: '?',
@@ -167,8 +166,8 @@ test.serial(
'SALT',
'METAPLEX_AUTH_TOKEN',
'PRIVATE_KEY',
'CLUSTER_API_URL',
'CLUSTER_BASIC_AUTH_TOKEN',
'PICKUP_URL',
'PICKUP_BASIC_AUTH_TOKEN',
'DATABASE_URL',
'DATABASE_TOKEN',
'S3_ENDPOINT',
5 changes: 2 additions & 3 deletions packages/api/test/scripts/globals.js
@@ -13,10 +13,9 @@ globalThis.PRIVATE_KEY = 'xmbtWjE9eYuAxae9G65lQSkw36HV6H+0LSFq2aKqVwY='
globalThis.SENTRY_DSN = 'https://[email protected]/0000000'
globalThis.SLACK_USER_REQUEST_WEBHOOK_URL = 'test'

globalThis.CLUSTER_API_URL = 'http://127.0.0.1:9094'
globalThis.PICKUP_URL = 'http://127.0.0.1:9094'
// will be used when we can activate auth in cluster; base64 of test:test
globalThis.CLUSTER_BASIC_AUTH_TOKEN = 'dGVzdDp0ZXN0'
globalThis.CLUSTER_SERVICE = ''
globalThis.PICKUP_BASIC_AUTH_TOKEN = 'dGVzdDp0ZXN0'

globalThis.MAINTENANCE_MODE = 'rw'

4 changes: 2 additions & 2 deletions packages/api/test/scripts/helpers.js
@@ -13,8 +13,8 @@ import { getMiniflareContext, getTestServiceConfig } from './test-context.js'
* @returns {Cluster}
*/
export const getCluster = (config) => {
return new Cluster(config.CLUSTER_API_URL, {
headers: { Authorization: `Basic ${config.CLUSTER_BASIC_AUTH_TOKEN}` },
return new Cluster(config.PICKUP_URL, {
headers: { Authorization: `Basic ${config.PICKUP_BASIC_AUTH_TOKEN}` },
})
}

4 changes: 2 additions & 2 deletions packages/api/test/scripts/test-context.js
@@ -22,7 +22,7 @@ const pkg = JSON.parse(
export function makeMiniflare(bindings, fetchMock) {
const envPath = path.join(__dirname, '../../../../.env')

const { DATABASE_URL, CLUSTER_API_URL, S3_ENDPOINT } = process.env
const { DATABASE_URL, PICKUP_URL, S3_ENDPOINT } = process.env

return new Miniflare({
// Autoload configuration from `.env`, `package.json` and `wrangler.toml`
@@ -36,7 +36,7 @@ export function makeMiniflare(bindings, fetchMock) {
bindings: {
...bindings,
DATABASE_URL,
CLUSTER_API_URL,
PICKUP_URL,
S3_ENDPOINT,
},
fetchMock,
3 changes: 3 additions & 0 deletions packages/api/wrangler.toml
@@ -23,6 +23,7 @@ ENV = "dev"
DEBUG = "true"
DATABASE_URL = "http://localhost:3000"
CARPARK_URL = "https://carpark-dev.web3.storage"
PICKUP_URL = "https://staging.pickup.dag.haus"

[build]
command = "scripts/cli.js build"
@@ -44,6 +45,7 @@ ENV = "staging"
DEBUG = "true"
DATABASE_URL = "https://nft-storage-pgrest-staging.herokuapp.com"
CARPARK_URL = "https://carpark-staging.web3.storage"
PICKUP_URL = "https://staging.pickup.dag.haus"

[env.staging.build]
command = "scripts/cli.js build --env staging"
@@ -69,6 +71,7 @@ ENV = "production"
DEBUG = "false"
DATABASE_URL = "https://nft-storage-pgrest-prod.herokuapp.com"
CARPARK_URL = "https://carpark.web3.storage"
PICKUP_URL = "https://pickup.dag.haus"

[env.production.build]
command = "scripts/cli.js build --env production"
