Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add traceFunction call to metrics #2898

Merged
merged 3 commits into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions packages/interface/src/content-routing/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { AbortOptions, RoutingOptions } from '../index.js'
import type { RoutingOptions } from '../index.js'
import type { PeerInfo } from '../peer-info/index.js'
import type { CID } from 'multiformats/cid'

Expand Down Expand Up @@ -50,7 +50,7 @@ export interface ContentRouting {
* provide content corresponding to the passed CID, call this function to no
* longer remind them.
*/
cancelReprovide (key: CID, options?: AbortOptions): Promise<void>
cancelReprovide (key: CID, options?: RoutingOptions): Promise<void>

/**
* Find the providers of the passed CID.
Expand Down
11 changes: 10 additions & 1 deletion packages/interface/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -753,13 +753,22 @@ export interface LoggerOptions {
log: Logger
}

/**
* An object that includes a trace object that is passed onwards.
*
* This is used by metrics method tracing to link function calls together.
*/
export interface TraceOptions {
trace?: any
}

/**
* When a routing operation involves reading values, these options allow
* controlling where the values are read from. By default libp2p will check
* local caches but may not use the network if a valid local value is found,
* these options allow tuning that behaviour.
*/
export interface RoutingOptions extends AbortOptions, ProgressOptions {
export interface RoutingOptions extends AbortOptions, ProgressOptions, TraceOptions {
/**
* Pass `false` to not use the network
*
Expand Down
57 changes: 57 additions & 0 deletions packages/interface/src/metrics/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -488,4 +488,61 @@ export interface Metrics {
* method on the returned summary group object
*/
registerSummaryGroup: ((name: string, options?: SummaryOptions) => SummaryGroup) & ((name: string, options: CalculatedSummaryOptions<Record<string, number>>) => void)

/**
* Wrap a function for tracing purposes.
*
* All functions wrapped like this should accept a final optional options arg.
*
* In order to pass an execution context along to create a multi-layered
* trace, the index of the options arg must be specified.
*/
traceFunction <F extends (...args: any[]) => AsyncIterator<any>> (name: string, fn: F, options?: TraceGeneratorFunctionOptions<Parameters<F>, ReturnType<F>, YieldType<ReturnType<F>>>): F
traceFunction <F extends (...args: any[]) => Iterator<any>> (name: string, fn: F, options?: TraceGeneratorFunctionOptions<Parameters<F>, ReturnType<F>, YieldType<ReturnType<F>>>): F
traceFunction <F extends (...args: any[]) => any = (...args: any[]) => any> (name: string, fn: F, options?: TraceFunctionOptions<Parameters<F>, ReturnType<F>>): F

/**
* Creates a tracing context that can be used to trace a method call
*/
createTrace(): any
}

/**
* Infer the yielded type of an (async)iterable
*/
type YieldType<T extends AsyncIterator<any> | Iterator<any>> = T extends AsyncIterator<infer Y> ? Y : T extends Iterator<infer Y, any, any> ? Y : never

export type TraceAttributes = Record<string, number | string | boolean | number[] | string[] | boolean[]>

export interface TraceFunctionOptions<A, B> {
/**
* To construct a trace that spans multiple method invocations, it's necessary
* to pass the trace context onwards as part of the options object.
*
* Specify the index of the options object in the args array here.
*
* @default 0
*/
optionsIndex?: number

/**
* Set attributes on the trace by modifying the passed attributes object.
*/
getAttributesFromArgs?(args: A, attributes: TraceAttributes): TraceAttributes

/**
* Set attributes on the trace by modifying the passed attributes object. The
* object will have previously been passed to `appendAttributesFromArgs`
* and/or `appendAttributesFromYieldedValue` (if defined)
*/
getAttributesFromReturnValue?(value: B, attributes: TraceAttributes): TraceAttributes
}

export interface TraceGeneratorFunctionOptions<A, B, C = any> extends TraceFunctionOptions<A, B> {
/**
* Set attributes on the trace by modifying the passed attributes object. The
* object will have previously been passed to `appendAttributesFromArgs` (if
* defined)
*/
getAttributesFromYieldedValue? (value: C, attributes: TraceAttributes, index: number): TraceAttributes
}
17 changes: 15 additions & 2 deletions packages/kad-dht/src/content-fetching/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ export class ContentFetching {
this.peerRouting = peerRouting
this.queryManager = queryManager
this.network = network

this.get = components.metrics?.traceFunction('libp2p.kadDHT.get', this.get.bind(this), {
optionsIndex: 1
}) ?? this.get
this.put = components.metrics?.traceFunction('libp2p.kadDHT.put', this.put.bind(this), {
optionsIndex: 2
}) ?? this.put
}

/**
Expand Down Expand Up @@ -145,7 +152,10 @@ export class ContentFetching {

// put record to the closest peers
yield * pipe(
this.peerRouting.getClosestPeers(key, { signal: options.signal }),
this.peerRouting.getClosestPeers(key, {
...options,
signal: options.signal
}),
(source) => map(source, (event) => {
return async () => {
if (event.name !== 'FINAL_PEER') {
Expand Down Expand Up @@ -252,7 +262,10 @@ export class ContentFetching {
const self = this // eslint-disable-line @typescript-eslint/no-this-alias

const getValueQuery: QueryFunc = async function * ({ peer, signal }) {
for await (const event of self.peerRouting.getValueOrPeers(peer, key, { signal })) {
for await (const event of self.peerRouting.getValueOrPeers(peer, key, {
...options,
signal
})) {
yield event

if (event.name === 'PEER_RESPONSE' && (event.record != null)) {
Expand Down
23 changes: 23 additions & 0 deletions packages/kad-dht/src/content-routing/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,29 @@
this.queryManager = queryManager
this.routingTable = routingTable
this.providers = providers

this.findProviders = components.metrics?.traceFunction('libp2p.kadDHT.findProviders', this.findProviders.bind(this), {
optionsIndex: 1,
getAttributesFromYieldedValue: (event, attrs: { providers?: string[] }) => {
if (event.name === 'PROVIDER') {
attrs.providers ??= []
attrs.providers.push(...event.providers.map(info => info.id.toString()))
}

return attrs
}

Check warning on line 63 in packages/kad-dht/src/content-routing/index.ts

View check run for this annotation

Codecov / codecov/patch

packages/kad-dht/src/content-routing/index.ts#L57-L63

Added lines #L57 - L63 were not covered by tests
}) ?? this.findProviders
this.provide = components.metrics?.traceFunction('libp2p.kadDHT.provide', this.provide.bind(this), {
optionsIndex: 1,
getAttributesFromYieldedValue: (event, attrs: { providers?: string[] }) => {
if (event.name === 'PEER_RESPONSE' && event.messageName === 'ADD_PROVIDER') {
attrs.providers ??= []
attrs.providers.push(event.from.toString())
}

return attrs
}

Check warning on line 74 in packages/kad-dht/src/content-routing/index.ts

View check run for this annotation

Codecov / codecov/patch

packages/kad-dht/src/content-routing/index.ts#L68-L74

Added lines #L68 - L74 were not covered by tests
}) ?? this.provide
}

/**
Expand Down
55 changes: 55 additions & 0 deletions packages/kad-dht/src/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,61 @@
operations: components.metrics?.registerCounterGroup(`${init.metricsPrefix}_outbound_rpc_requests_total`),
errors: components.metrics?.registerCounterGroup(`${init.metricsPrefix}_outbound_rpc_errors_total`)
}

this.sendRequest = components.metrics?.traceFunction('libp2p.kadDHT.sendRequest', this.sendRequest.bind(this), {
optionsIndex: 2,
getAttributesFromArgs ([to, message], attrs) {
return {
...attrs,
to: to.toString(),
'message type': `${message.type}`
}

Check warning on line 68 in packages/kad-dht/src/network.ts

View check run for this annotation

Codecov / codecov/patch

packages/kad-dht/src/network.ts#L64-L68

Added lines #L64 - L68 were not covered by tests
},
getAttributesFromYieldedValue: (event, attrs) => {
if (event.name === 'PEER_RESPONSE') {
if (event.providers.length > 0) {
event.providers.forEach((value, index) => {
attrs[`providers-${index}`] = value.id.toString()
})
}

if (event.closer.length > 0) {
event.closer.forEach((value, index) => {
attrs[`closer-${index}`] = value.id.toString()
})
}
}

return attrs
}

Check warning on line 86 in packages/kad-dht/src/network.ts

View check run for this annotation

Codecov / codecov/patch

packages/kad-dht/src/network.ts#L71-L86

Added lines #L71 - L86 were not covered by tests
}) ?? this.sendRequest
this.sendMessage = components.metrics?.traceFunction('libp2p.kadDHT.sendMessage', this.sendMessage.bind(this), {
optionsIndex: 2,
getAttributesFromArgs ([to, message], attrs) {
return {
...attrs,
to: to.toString(),
'message type': `${message.type}`
}

Check warning on line 95 in packages/kad-dht/src/network.ts

View check run for this annotation

Codecov / codecov/patch

packages/kad-dht/src/network.ts#L91-L95

Added lines #L91 - L95 were not covered by tests
},
getAttributesFromYieldedValue: (event, attrs) => {
if (event.name === 'PEER_RESPONSE') {
if (event.providers.length > 0) {
event.providers.forEach((value, index) => {
attrs[`providers-${index}`] = value.id.toString()
})
}

if (event.closer.length > 0) {
event.closer.forEach((value, index) => {
attrs[`closer-${index}`] = value.id.toString()
})
}
}

return attrs
}

Check warning on line 113 in packages/kad-dht/src/network.ts

View check run for this annotation

Codecov / codecov/patch

packages/kad-dht/src/network.ts#L98-L113

Added lines #L98 - L113 were not covered by tests
}) ?? this.sendMessage
}

/**
Expand Down
10 changes: 9 additions & 1 deletion packages/kad-dht/src/peer-routing/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ import type { Network } from '../network.js'
import type { QueryManager, QueryOptions } from '../query/manager.js'
import type { QueryFunc } from '../query/types.js'
import type { RoutingTable } from '../routing-table/index.js'
import type { ComponentLogger, Logger, PeerId, PeerInfo, PeerStore, RoutingOptions } from '@libp2p/interface'
import type { ComponentLogger, Logger, Metrics, PeerId, PeerInfo, PeerStore, RoutingOptions } from '@libp2p/interface'

export interface PeerRoutingComponents {
peerId: PeerId
peerStore: PeerStore
logger: ComponentLogger
metrics?: Metrics
}

export interface PeerRoutingInit {
Expand Down Expand Up @@ -55,6 +56,13 @@ export class PeerRouting {
this.peerStore = components.peerStore
this.peerId = components.peerId
this.log = components.logger.forComponent(`${init.logPrefix}:peer-routing`)

this.findPeer = components.metrics?.traceFunction('libp2p.kadDHT.findPeer', this.findPeer.bind(this), {
optionsIndex: 1
}) ?? this.findPeer
this.getClosestPeers = components.metrics?.traceFunction('libp2p.kadDHT.getClosestPeers', this.getClosestPeers.bind(this), {
optionsIndex: 1
}) ?? this.getClosestPeers
}

/**
Expand Down
4 changes: 2 additions & 2 deletions packages/kad-dht/src/query-self.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ import { timeOperationMethod } from './utils.js'
import type { OperationMetrics } from './kad-dht.js'
import type { PeerRouting } from './peer-routing/index.js'
import type { RoutingTable } from './routing-table/index.js'
import type { ComponentLogger, Logger, PeerId, Startable } from '@libp2p/interface'
import type { ComponentLogger, Logger, Metrics, PeerId, Startable } from '@libp2p/interface'
import type { DeferredPromise } from 'p-defer'

export interface QuerySelfInit {
logPrefix: string
peerRouting: PeerRouting
Expand All @@ -28,6 +27,7 @@ export interface QuerySelfInit {
export interface QuerySelfComponents {
peerId: PeerId
logger: ComponentLogger
metrics?: Metrics
}

/**
Expand Down
1 change: 1 addition & 0 deletions packages/kad-dht/src/query/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ export class QueryManager implements Startable {
// Create query paths from the starting peers
const paths = peersToQuery.map((peer, index) => {
return queryPath({
...options,
key,
startingPeer: peer,
ourPeerId: this.peerId,
Expand Down
1 change: 1 addition & 0 deletions packages/kad-dht/src/query/query-path.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ export async function * queryPath (options: QueryPathOptions): AsyncGenerator<Qu

try {
for await (const event of query({
...options,
key,
peer,
signal: compoundSignal,
Expand Down
56 changes: 54 additions & 2 deletions packages/libp2p/src/content-routing.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { NotStartedError } from '@libp2p/interface'
import { PeerSet } from '@libp2p/peer-collections'
import merge from 'it-merge'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import { NoContentRoutersError } from './errors.js'
import type { AbortOptions, ComponentLogger, ContentRouting, PeerInfo, PeerRouting, PeerStore, RoutingOptions, Startable } from '@libp2p/interface'
import type { AbortOptions, ComponentLogger, ContentRouting, Metrics, PeerInfo, PeerRouting, PeerStore, RoutingOptions, Startable } from '@libp2p/interface'
import type { CID } from 'multiformats/cid'

export interface CompoundContentRoutingInit {
Expand All @@ -13,6 +14,7 @@
peerStore: PeerStore
peerRouting: PeerRouting
logger: ComponentLogger
metrics?: Metrics
}

export class CompoundContentRouting implements ContentRouting, Startable {
Expand All @@ -24,6 +26,56 @@
this.routers = init.routers ?? []
this.started = false
this.components = components

this.findProviders = components.metrics?.traceFunction('libp2p.contentRouting.findProviders', this.findProviders.bind(this), {
optionsIndex: 1,
getAttributesFromArgs: ([cid], attrs) => {
return {
...attrs,
cid: cid.toString()
}

Check warning on line 36 in packages/libp2p/src/content-routing.ts

View check run for this annotation

Codecov / codecov/patch

packages/libp2p/src/content-routing.ts#L33-L36

Added lines #L33 - L36 were not covered by tests
},
getAttributesFromYieldedValue: (value, attrs: { providers?: string[] }) => {
return {
...attrs,
providers: [...(Array.isArray(attrs.providers) ? attrs.providers : []), value.id.toString()]
}
}

Check warning on line 43 in packages/libp2p/src/content-routing.ts

View check run for this annotation

Codecov / codecov/patch

packages/libp2p/src/content-routing.ts#L39-L43

Added lines #L39 - L43 were not covered by tests
}) ?? this.findProviders
this.provide = components.metrics?.traceFunction('libp2p.contentRouting.provide', this.provide.bind(this), {
optionsIndex: 1,
getAttributesFromArgs: ([cid], attrs) => {
return {
...attrs,
cid: cid.toString()
}
}

Check warning on line 52 in packages/libp2p/src/content-routing.ts

View check run for this annotation

Codecov / codecov/patch

packages/libp2p/src/content-routing.ts#L48-L52

Added lines #L48 - L52 were not covered by tests
}) ?? this.provide
this.cancelReprovide = components.metrics?.traceFunction('libp2p.contentRouting.cancelReprovide', this.cancelReprovide.bind(this), {
optionsIndex: 1,
getAttributesFromArgs: ([cid], attrs) => {
return {
...attrs,
cid: cid.toString()
}
}

Check warning on line 61 in packages/libp2p/src/content-routing.ts

View check run for this annotation

Codecov / codecov/patch

packages/libp2p/src/content-routing.ts#L57-L61

Added lines #L57 - L61 were not covered by tests
}) ?? this.cancelReprovide
this.put = components.metrics?.traceFunction('libp2p.contentRouting.put', this.put.bind(this), {
optionsIndex: 2,
getAttributesFromArgs: ([key]) => {
return {
key: uint8ArrayToString(key, 'base36')
}
}

Check warning on line 69 in packages/libp2p/src/content-routing.ts

View check run for this annotation

Codecov / codecov/patch

packages/libp2p/src/content-routing.ts#L66-L69

Added lines #L66 - L69 were not covered by tests
}) ?? this.put
this.get = components.metrics?.traceFunction('libp2p.contentRouting.get', this.get.bind(this), {
optionsIndex: 1,
getAttributesFromArgs: ([key]) => {
return {
key: uint8ArrayToString(key, 'base36')
}
}

Check warning on line 77 in packages/libp2p/src/content-routing.ts

View check run for this annotation

Codecov / codecov/patch

packages/libp2p/src/content-routing.ts#L74-L77

Added lines #L74 - L77 were not covered by tests
}) ?? this.get
}

readonly [Symbol.toStringTag] = '@libp2p/content-routing'
Expand All @@ -43,7 +95,7 @@
/**
* Iterates over all content routers in parallel to find providers of the given key
*/
async * findProviders (key: CID, options: RoutingOptions = {}): AsyncIterable<PeerInfo> {
async * findProviders (key: CID, options: RoutingOptions = {}): AsyncGenerator<PeerInfo> {
if (this.routers.length === 0) {
throw new NoContentRoutersError('No content routers available')
}
Expand Down
Loading
Loading