-
Notifications
You must be signed in to change notification settings - Fork 16
perf: Select Storage Provider by a ping race #390
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -23,15 +23,15 @@ | |||||||||||||||||
| */ | ||||||||||||||||||
|
|
||||||||||||||||||
| import * as SP from '@filoz/synapse-core/sp' | ||||||||||||||||||
| import { randIndex, randU256 } from '@filoz/synapse-core/utils' | ||||||||||||||||||
| import { randU256 } from '@filoz/synapse-core/utils' | ||||||||||||||||||
| import type { ethers } from 'ethers' | ||||||||||||||||||
| import type { Hex } from 'viem' | ||||||||||||||||||
| import type { PaymentsService } from '../payments/index.ts' | ||||||||||||||||||
| import { PDPAuthHelper, PDPServer } from '../pdp/index.ts' | ||||||||||||||||||
| import { PDPVerifier } from '../pdp/verifier.ts' | ||||||||||||||||||
| import { asPieceCID } from '../piece/index.ts' | ||||||||||||||||||
| import { SPRegistryService } from '../sp-registry/index.ts' | ||||||||||||||||||
| import type { ProviderInfo } from '../sp-registry/types.ts' | ||||||||||||||||||
| import type { ProviderInfo, ServiceProduct } from '../sp-registry/types.ts' | ||||||||||||||||||
| import type { Synapse } from '../synapse.ts' | ||||||||||||||||||
| import type { | ||||||||||||||||||
| CreateContextsOptions, | ||||||||||||||||||
|
|
@@ -605,7 +605,7 @@ export class StorageContext { | |||||||||||||||||
|
|
||||||||||||||||||
| const skipProviderIds = new Set<number>(excludeProviderIds) | ||||||||||||||||||
| // Filter for managed data sets with matching metadata | ||||||||||||||||||
| const managedDataSets = dataSets.filter( | ||||||||||||||||||
| const managedDataSets: EnhancedDataSetInfo[] = dataSets.filter( | ||||||||||||||||||
| (ps) => | ||||||||||||||||||
| ps.isLive && | ||||||||||||||||||
| ps.isManaged && | ||||||||||||||||||
|
|
@@ -615,48 +615,38 @@ export class StorageContext { | |||||||||||||||||
| ) | ||||||||||||||||||
|
|
||||||||||||||||||
| if (managedDataSets.length > 0 && !forceCreateDataSet) { | ||||||||||||||||||
| // Prefer data sets with pieces, sort by ID (older first) | ||||||||||||||||||
| const sorted = managedDataSets.sort((a, b) => { | ||||||||||||||||||
| if (a.currentPieceCount > 0 && b.currentPieceCount === 0) return -1 | ||||||||||||||||||
| if (b.currentPieceCount > 0 && a.currentPieceCount === 0) return 1 | ||||||||||||||||||
| return a.pdpVerifierDataSetId - b.pdpVerifierDataSetId | ||||||||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
No, the current tie breaker is dataset ID |
||||||||||||||||||
| }) | ||||||||||||||||||
|
|
||||||||||||||||||
| // Create async generator that yields providers lazily | ||||||||||||||||||
| async function* generateProviders(): AsyncGenerator<ProviderInfo> { | ||||||||||||||||||
| // First, yield providers from existing data sets (in sorted order) | ||||||||||||||||||
| for (const dataSet of sorted) { | ||||||||||||||||||
| if (skipProviderIds.has(dataSet.providerId)) { | ||||||||||||||||||
| continue | ||||||||||||||||||
| } | ||||||||||||||||||
| skipProviderIds.add(dataSet.providerId) | ||||||||||||||||||
| const provider = await spRegistry.getProvider(dataSet.providerId) | ||||||||||||||||||
|
|
||||||||||||||||||
| if (provider == null) { | ||||||||||||||||||
| console.warn( | ||||||||||||||||||
| `Provider ID ${dataSet.providerId} for data set ${dataSet.pdpVerifierDataSetId} is not currently approved` | ||||||||||||||||||
| ) | ||||||||||||||||||
| continue | ||||||||||||||||||
| } | ||||||||||||||||||
| // Prefer data sets with pieces | ||||||||||||||||||
| const [hasNoPieces, hasPieces] = managedDataSets | ||||||||||||||||||
| .reduce<[Set<EnhancedDataSetInfo>, Set<EnhancedDataSetInfo>]>( | ||||||||||||||||||
| (results: [Set<EnhancedDataSetInfo>, Set<EnhancedDataSetInfo>], managedDataSet: EnhancedDataSetInfo) => { | ||||||||||||||||||
| results[managedDataSet.currentPieceCount > 0 ? 1 : 0].add(managedDataSet) | ||||||||||||||||||
| return results | ||||||||||||||||||
| }, | ||||||||||||||||||
| [new Set(), new Set()] | ||||||||||||||||||
| ) | ||||||||||||||||||
| .map((deduped) => [...deduped]) | ||||||||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. does There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These are the client data sets, and while we don't expect there to be multiple data sets with the same provider, there could be many, and we want to dedupe because the subsequent code does assume the providerId are unique. It will also be important to dedupe when changing this to a method that returns multiple providers; otherwise it might pick the same provider multiple times. We currently dedupe these in the iterative code with the skipProviderIds. |
||||||||||||||||||
|
|
||||||||||||||||||
| if (withIpni && provider.products.PDP?.data.ipniIpfs === false) { | ||||||||||||||||||
| continue | ||||||||||||||||||
| } | ||||||||||||||||||
| for (const managedDataSets of [hasPieces, hasNoPieces]) { | ||||||||||||||||||
| const providers: ProviderInfo[] = ( | ||||||||||||||||||
| await Promise.all( | ||||||||||||||||||
| managedDataSets.map((dataSet: EnhancedDataSetInfo) => spRegistry.getProvider(dataSet.providerId)) | ||||||||||||||||||
|
Comment on lines
+629
to
+632
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
shadowing There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||||||||||||||||||
| ) | ||||||||||||||||||
| ).filter<ProviderInfo>( | ||||||||||||||||||
| (provider: ProviderInfo | null): provider is ProviderInfo => | ||||||||||||||||||
| provider !== null && | ||||||||||||||||||
| (!withIpni || provider.products.PDP?.data.ipniIpfs !== false) && | ||||||||||||||||||
| (dev || provider.products.PDP?.capabilities?.dev == null) | ||||||||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||||||||||||||||
| ) | ||||||||||||||||||
|
|
||||||||||||||||||
| if (!dev && provider.products.PDP?.capabilities?.dev != null) { | ||||||||||||||||||
| continue | ||||||||||||||||||
| } | ||||||||||||||||||
| const selectedProvider = await StorageContext.selectProviderWithPing(providers) | ||||||||||||||||||
|
|
||||||||||||||||||
| yield provider | ||||||||||||||||||
| if (selectedProvider == null) { | ||||||||||||||||||
| continue | ||||||||||||||||||
| } | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| try { | ||||||||||||||||||
| const selectedProvider = await StorageContext.selectProviderWithPing(generateProviders()) | ||||||||||||||||||
|
|
||||||||||||||||||
| // Find the first matching data set ID for this provider | ||||||||||||||||||
| // Match by provider ID (stable identifier in the registry) | ||||||||||||||||||
| const matchingDataSet = sorted.find((ps) => ps.providerId === selectedProvider.id) | ||||||||||||||||||
| const matchingDataSet = managedDataSets.find((ps: EnhancedDataSetInfo) => ps.providerId === selectedProvider.id) | ||||||||||||||||||
|
|
||||||||||||||||||
| if (matchingDataSet == null) { | ||||||||||||||||||
| console.warn( | ||||||||||||||||||
|
|
@@ -675,10 +665,8 @@ export class StorageContext { | |||||||||||||||||
| dataSetMetadata, | ||||||||||||||||||
| } | ||||||||||||||||||
| } | ||||||||||||||||||
| } catch (_error) { | ||||||||||||||||||
| console.warn('All providers from existing data sets failed health check. Falling back to all providers.') | ||||||||||||||||||
| // Fall through to select from all approved providers below | ||||||||||||||||||
| } | ||||||||||||||||||
| console.warn('All providers from existing data sets failed health check. Falling back to all providers.') | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| // No existing data sets - select from all approved providers. First we get approved IDs from | ||||||||||||||||||
|
|
@@ -696,8 +684,16 @@ export class StorageContext { | |||||||||||||||||
| throw createError('StorageContext', 'smartSelectProvider', NO_REMAINING_PROVIDERS_ERROR_MESSAGE) | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| // Random selection from all providers | ||||||||||||||||||
| const provider = await StorageContext.selectRandomProvider(allProviders) | ||||||||||||||||||
| // Select from all providers | ||||||||||||||||||
| const provider = await StorageContext.selectProviderWithPing(allProviders) | ||||||||||||||||||
|
|
||||||||||||||||||
| if (provider == null) { | ||||||||||||||||||
| throw createError( | ||||||||||||||||||
| 'StorageContext', | ||||||||||||||||||
| 'selectProviderWithPing', | ||||||||||||||||||
| `All ${allProviders.length} providers failed health check. Storage may be temporarily unavailable.` | ||||||||||||||||||
| ) | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| return { | ||||||||||||||||||
| provider, | ||||||||||||||||||
|
|
@@ -708,72 +704,41 @@ export class StorageContext { | |||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| /** | ||||||||||||||||||
| * Select a random provider from a list with ping validation | ||||||||||||||||||
| * @param providers - Array of providers to select from | ||||||||||||||||||
| * @param withIpni - Filter for IPNI support | ||||||||||||||||||
| * @param dev - Include dev providers | ||||||||||||||||||
| * @returns Selected provider | ||||||||||||||||||
| * Select a provider with ping validation. | ||||||||||||||||||
| * @param providers - providers to try | ||||||||||||||||||
| * @returns The first provider that responds, or null if none do | ||||||||||||||||||
| */ | ||||||||||||||||||
| private static async selectRandomProvider(providers: ProviderInfo[]): Promise<ProviderInfo> { | ||||||||||||||||||
| if (providers.length === 0) { | ||||||||||||||||||
| throw createError('StorageContext', 'selectRandomProvider', 'No providers available') | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| // Create async generator that yields providers in random order | ||||||||||||||||||
| async function* generateRandomProviders(): AsyncGenerator<ProviderInfo> { | ||||||||||||||||||
| const remaining = [...providers] | ||||||||||||||||||
|
|
||||||||||||||||||
| while (remaining.length > 0) { | ||||||||||||||||||
| // Remove and yield the selected provider | ||||||||||||||||||
| const selected = remaining.splice(randIndex(remaining.length), 1)[0] | ||||||||||||||||||
| yield selected | ||||||||||||||||||
| private static async selectProviderWithPing(providers: ProviderInfo[]): Promise<ProviderInfo | null> { | ||||||||||||||||||
| type ProviderWithPDP = ProviderInfo & { | ||||||||||||||||||
| products: { | ||||||||||||||||||
| PDP: ServiceProduct | ||||||||||||||||||
| } | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| return await StorageContext.selectProviderWithPing(generateRandomProviders()) | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| /** | ||||||||||||||||||
| * Select a provider from an async iterator with ping validation. | ||||||||||||||||||
| * This is shared logic used by both smart selection and random selection. | ||||||||||||||||||
| * @param providers - Async iterable of providers to try | ||||||||||||||||||
| * @returns The first provider that responds | ||||||||||||||||||
| * @throws If all providers fail | ||||||||||||||||||
| */ | ||||||||||||||||||
| private static async selectProviderWithPing(providers: AsyncIterable<ProviderInfo>): Promise<ProviderInfo> { | ||||||||||||||||||
| let providerCount = 0 | ||||||||||||||||||
|
|
||||||||||||||||||
| // Try providers in order until we find one that responds to ping | ||||||||||||||||||
| for await (const provider of providers) { | ||||||||||||||||||
| providerCount++ | ||||||||||||||||||
| function hasPDP(provider: ProviderInfo): provider is ProviderWithPDP { | ||||||||||||||||||
| return provider.products.PDP != null | ||||||||||||||||||
| } | ||||||||||||||||||
| // Ping all providers | ||||||||||||||||||
| const pings = providers.filter(hasPDP).map((provider, index) => | ||||||||||||||||||
| new PDPServer(null, provider.products.PDP.data.serviceURL).ping().then( | ||||||||||||||||||
| () => Promise.resolve(provider), | ||||||||||||||||||
| (error) => Promise.reject({ error, index, provider }) | ||||||||||||||||||
| ) | ||||||||||||||||||
| ) | ||||||||||||||||||
|
Comment on lines
+722
to
+727
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this construct feels unnecessarily complex, can't we just There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. then does simplify this code. The only difference from making pdpProviders a local would be the ability to recalculate the provider from the index. Both resolve and reject need the provider though, so the code is simpler if you nest it like this. |
||||||||||||||||||
| let remaining = pings.length | ||||||||||||||||||
| while (remaining-- > 0) { | ||||||||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I like to write this like |
||||||||||||||||||
| try { | ||||||||||||||||||
| // Create a temporary PDPServer for this specific provider's endpoint | ||||||||||||||||||
| if (!provider.products.PDP?.data.serviceURL) { | ||||||||||||||||||
| // Skip providers without PDP products | ||||||||||||||||||
| continue | ||||||||||||||||||
| } | ||||||||||||||||||
| const providerPdpServer = new PDPServer(null, provider.products.PDP.data.serviceURL) | ||||||||||||||||||
| await providerPdpServer.ping() | ||||||||||||||||||
| return provider | ||||||||||||||||||
| } catch (error) { | ||||||||||||||||||
| return await Promise.race(pings) | ||||||||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This whole loop could just be replaced with a then use Also, see in the retriever code how There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Promise.any would be good. Would have to move the failure logging into the .then() reject block, but could eliminate index and remaining. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Moving the logging into the reject block would actually be noisy if we abort though. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The rule is something like: |
||||||||||||||||||
| } catch (err: any) { | ||||||||||||||||||
| const { error, index, provider } = err | ||||||||||||||||||
| console.warn( | ||||||||||||||||||
| `Provider ${provider.serviceProvider} failed ping test:`, | ||||||||||||||||||
| error instanceof Error ? error.message : String(error) | ||||||||||||||||||
| ) | ||||||||||||||||||
| // Continue to next provider | ||||||||||||||||||
| pings[index] = new Promise(() => undefined) | ||||||||||||||||||
| } | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| // All providers failed ping test | ||||||||||||||||||
| if (providerCount === 0) { | ||||||||||||||||||
| throw createError('StorageContext', 'selectProviderWithPing', 'No providers available to select from') | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| throw createError( | ||||||||||||||||||
| 'StorageContext', | ||||||||||||||||||
| 'selectProviderWithPing', | ||||||||||||||||||
| `All ${providerCount} providers failed health check. Storage may be temporarily unavailable.` | ||||||||||||||||||
| ) | ||||||||||||||||||
| return null | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| /** | ||||||||||||||||||
|
|
||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't delete
fallbackRandIndexbecause it is still used byfallbackRandU256.