Skip to content

Commit

Permalink
feat: add s3 ops metrics (#317)
Browse files Browse the repository at this point in the history
* feat: add s3 ops metrics

* changes

* fix: add event emitter and change metric to Counter
  • Loading branch information
octo-gone authored and belopash committed Jul 23, 2024
1 parent 00d1d69 commit 1f6a573
Show file tree
Hide file tree
Showing 13 changed files with 135 additions and 14 deletions.
2 changes: 2 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,5 @@ docker-compose.yml
/ops/docker-publish.sh

**/.DS_Store

*.temp
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ common/autoinstallers/*/.npmrc

# IDE
.idea
.vscode

# Built js libs
/*/*/lib
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@subsquid/util-internal-dump-cli",
"comment": "add prometheus metrics for S3 file system handler",
"type": "patch"
}
],
"packageName": "@subsquid/util-internal-dump-cli"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@subsquid/util-internal-fs",
"comment": "add metrics for S3 file system handler",
"type": "patch"
}
],
"packageName": "@subsquid/util-internal-fs"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@subsquid/util-internal-ingest-cli",
"comment": "add prometheus metrics",
"type": "minor"
}
],
"packageName": "@subsquid/util-internal-ingest-cli"
}
3 changes: 2 additions & 1 deletion common/config/rush/pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 10 additions & 2 deletions util/util-internal-dump-cli/src/dumper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {createFs, Fs} from '@subsquid/util-internal-fs'
import {assertRange, printRange, Range, rangeEnd} from '@subsquid/util-internal-range'
import {Command} from 'commander'
import {PrometheusServer} from './prometheus'
import {EventEmitter} from 'events'


export interface DumperOptions {
Expand Down Expand Up @@ -93,7 +94,7 @@ export abstract class Dumper<B extends {hash: string, height: number}, O extends
@def
protected destination(): Fs {
let dest = assertNotNull(this.options().dest)
return createFs(dest)
return createFs(dest, this.eventEmitter())
}

@def
Expand All @@ -118,14 +119,21 @@ export abstract class Dumper<B extends {hash: string, height: number}, O extends
return true
}

@def
protected eventEmitter(): EventEmitter {
return new EventEmitter()
}

@def
protected prometheus() {
return new PrometheusServer(
let server = new PrometheusServer(
this.options().metrics ?? 0,
() => this.getFinalizedHeight(),
this.rpc(),
this.log().child('prometheus')
)
this.eventEmitter().on('S3FsOperation', (op: string) => server.incS3Requests(op))
return server
}

private async *ingest(from?: number, prevHash?: string): AsyncIterable<B[]> {
Expand Down
20 changes: 16 additions & 4 deletions util/util-internal-dump-cli/src/prometheus.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import {Logger} from '@subsquid/logger'
import {RpcClient} from '@subsquid/rpc-client'
import {createPrometheusServer, ListeningServer} from '@subsquid/util-internal-prometheus-server'
import {collectDefaultMetrics, Gauge, Registry} from 'prom-client'
import {collectDefaultMetrics, Gauge, Counter, Registry} from 'prom-client'


export class PrometheusServer {
private registry = new Registry()
private chainHeightGauge: Gauge
private lastWrittenBlockGauge: Gauge
private rpcRequestsGauge: Gauge
private s3RequestsCounter: Counter

constructor(
private port: number,
Expand All @@ -30,13 +31,13 @@ export class PrometheusServer {
}
this.set(chainHeight)
}
});
})

this.lastWrittenBlockGauge = new Gauge({
name: 'sqd_dump_last_written_block',
help: 'Last saved block',
registers: [this.registry]
});
})

this.rpcRequestsGauge = new Gauge({
name: 'sqd_rpc_request_count',
Expand All @@ -56,7 +57,14 @@ export class PrometheusServer {
kind: 'failure'
}, metrics.connectionErrors)
}
});
})

this.s3RequestsCounter = new Counter({
name: 'sqd_s3_request_count',
help: 'Number of s3 requests made',
labelNames: ['kind'],
registers: [this.registry],
})

collectDefaultMetrics({register: this.registry})
}
Expand All @@ -65,6 +73,10 @@ export class PrometheusServer {
this.lastWrittenBlockGauge.set(block)
}

incS3Requests(kind: string, value?: number) {
this.s3RequestsCounter.inc({kind}, value)
}

serve(): Promise<ListeningServer> {
return createPrometheusServer(this.registry, this.port)
}
Expand Down
10 changes: 6 additions & 4 deletions util/util-internal-fs/src/factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ import {S3Client} from '@aws-sdk/client-s3'
import {Fs} from './interface'
import {LocalFs} from './local'
import {S3Fs} from './s3'
import {EventEmitter} from 'events'


export function createFs(url: string): Fs {
export function createFs(url: string, eventEmitter?: EventEmitter): Fs {
if (url.includes('://')) {
let protocol = new URL(url).protocol
switch(protocol) {
case 's3:':
return createS3Fs(url.slice('s3://'.length))
return createS3Fs(url.slice('s3://'.length), eventEmitter)
default:
throw new Error(`Unsupported protocol: ${protocol}`)
}
Expand All @@ -19,12 +20,13 @@ export function createFs(url: string): Fs {
}


function createS3Fs(root: string): S3Fs {
function createS3Fs(root: string, eventEmitter?: EventEmitter): S3Fs {
let client = new S3Client({
endpoint: process.env.AWS_S3_ENDPOINT
})
return new S3Fs({
root,
client
client,
eventEmitter
})
}
12 changes: 11 additions & 1 deletion util/util-internal-fs/src/s3.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,25 @@ import assert from 'assert'
import {Readable} from 'stream'
import Upath from 'upath'
import {Fs} from './interface'
import {EventEmitter} from 'events'


export interface S3FsOptions {
root: string
client: S3Client
eventEmitter?: EventEmitter
}


export class S3Fs implements Fs {
public readonly client: S3Client
private root: string
private eventEmitter?: EventEmitter

constructor(options: S3FsOptions) {
this.client = options.client
this.root = Upath.normalizeTrim(options.root)
this.eventEmitter = options.eventEmitter
splitPath(this.root)
}

Expand Down Expand Up @@ -52,7 +56,8 @@ export class S3Fs implements Fs {
cd(...path: string[]): S3Fs {
return new S3Fs({
client: this.client,
root: this.resolve(path)
root: this.resolve(path),
eventEmitter: this.eventEmitter
})
}

Expand All @@ -74,6 +79,7 @@ export class S3Fs implements Fs {
ContinuationToken
})
)
this.eventEmitter?.emit('S3FsOperation', 'ListObjectsV2')

// process folder names
if (res.CommonPrefixes) {
Expand Down Expand Up @@ -116,6 +122,7 @@ export class S3Fs implements Fs {
Key,
Body: content
}))
this.eventEmitter?.emit('S3FsOperation', 'PutObject')
}

async delete(path: string): Promise<void> {
Expand All @@ -129,6 +136,7 @@ export class S3Fs implements Fs {
ContinuationToken
})
)
this.eventEmitter?.emit('S3FsOperation', 'ListObjectsV2')

if (list.Contents) {
let Objects: ObjectIdentifier[] = []
Expand All @@ -144,6 +152,7 @@ export class S3Fs implements Fs {
Objects
}
}))
this.eventEmitter?.emit('S3FsOperation', 'DeleteObjects')
}

if (list.IsTruncated) {
Expand All @@ -160,6 +169,7 @@ export class S3Fs implements Fs {
Bucket,
Key
}))
this.eventEmitter?.emit('S3FsOperation', 'GetObject')
assert(res.Body instanceof Readable)
return res.Body
}
Expand Down
4 changes: 3 additions & 1 deletion util/util-internal-ingest-cli/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
"@subsquid/util-internal-archive-layout": "^0.4.0",
"@subsquid/util-internal-commander": "^1.4.0",
"@subsquid/util-internal-fs": "^0.1.2",
"@subsquid/util-internal-prometheus-server": "^1.3.0",
"@subsquid/util-internal-http-server": "^2.0.0",
"@subsquid/util-internal-range": "^0.3.0",
"commander": "^11.1.0"
"commander": "^11.1.0",
"prom-client": "^14.2.0"
},
"devDependencies": {
"@types/node": "^18.18.14",
Expand Down
26 changes: 25 additions & 1 deletion util/util-internal-ingest-cli/src/ingest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import {HttpApp, HttpContext, HttpError, waitForInterruption} from '@subsquid/ut
import {assertRange, isRange, Range} from '@subsquid/util-internal-range'
import {Command} from 'commander'
import {Writable} from 'stream'
import {PrometheusServer} from './prometheus'
import {EventEmitter} from 'events'


export interface IngestOptions {
Expand All @@ -19,6 +21,7 @@ export interface IngestOptions {
endpointCapacity?: number
endpointRateLimit?: number
endpointMaxBatchCallSize?: number
metrics?: number
}


Expand Down Expand Up @@ -55,6 +58,7 @@ export class Ingest<O extends IngestOptions = IngestOptions> {
program.option('--first-block <number>', 'Height of the first block to ingest', nat)
program.option('--last-block <number>', 'Height of the last block to ingest', nat)
program.option('--service <port>', 'Run as HTTP data service', nat)
program.option('--metrics <port>', 'Enable prometheus metrics server', nat)
return program
}

Expand Down Expand Up @@ -95,10 +99,24 @@ export class Ingest<O extends IngestOptions = IngestOptions> {
@def
protected archive(): ArchiveLayout {
let url = assertNotNull(this.options().rawArchive, 'archive is not specified')
let fs = createFs(url)
let fs = createFs(url, this.eventEmitter())
return new ArchiveLayout(fs)
}

@def
protected eventEmitter(): EventEmitter {
return new EventEmitter()
}

@def
protected prometheus() {
let server = new PrometheusServer(
this.options().metrics ?? 0,
)
this.eventEmitter().on('S3FsOperation', (op: string) => server.incS3Requests(op))
return server
}

private async ingest(range: Range, writable: Writable): Promise<void> {
for await (let blocks of this.getBlocks(range)) {
await waitDrain(writable)
Expand All @@ -113,6 +131,7 @@ export class Ingest<O extends IngestOptions = IngestOptions> {
let log = this.log().child('service')
let app = new HttpApp()
let self = this
let prometheus = this.prometheus()

app.setMaxRequestBody(1024)
app.setLogger(log)
Expand All @@ -138,6 +157,11 @@ export class Ingest<O extends IngestOptions = IngestOptions> {
}
})

if (this.options().metrics != null) {
let server = await prometheus.serve()
this.log().info(`prometheus metrics are available on port ${server.port}`)
}

let server = await app.listen(port)
log.info(
`Data service is listening on port ${server.port}. ` +
Expand Down
29 changes: 29 additions & 0 deletions util/util-internal-ingest-cli/src/prometheus.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import {createPrometheusServer, ListeningServer} from '@subsquid/util-internal-prometheus-server'
import {collectDefaultMetrics, Counter, Registry} from 'prom-client'


export class PrometheusServer {
private registry = new Registry()
private s3RequestsCounter: Counter

constructor(
private port: number,
) {
this.s3RequestsCounter = new Counter({
name: 'sqd_s3_request_count',
help: 'Number of s3 requests made',
labelNames: ['kind'],
registers: [this.registry],
})

collectDefaultMetrics({register: this.registry})
}

incS3Requests(kind: string, value?: number) {
this.s3RequestsCounter.inc({kind}, value)
}

serve(): Promise<ListeningServer> {
return createPrometheusServer(this.registry, this.port)
}
}

0 comments on commit 1f6a573

Please sign in to comment.