Skip to content

Commit

Permalink
Merge pull request #947 from input-output-hk/feat/no-stability-window…
Browse files Browse the repository at this point in the history
…-buffer-til-volatile

feat: no stability window buffer til volatile
  • Loading branch information
mkazlauskas committed Oct 19, 2023
2 parents 24b3181 + b2244ea commit e8a2e87
Show file tree
Hide file tree
Showing 51 changed files with 1,352 additions and 905 deletions.
12 changes: 2 additions & 10 deletions packages/cardano-services/src/Program/programs/projector.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { Bootstrap } from '@cardano-sdk/projection';
import { Cardano } from '@cardano-sdk/core';
import { CommonProgramOptions, OgmiosProgramOptions, PosgresProgramOptions } from '../options';
import { DnsResolver, createDnsResolver } from '../utils';
Expand All @@ -12,8 +11,8 @@ import { Logger } from 'ts-log';
import { MissingProgramOption, UnknownServiceName } from '../errors';
import { ProjectionHttpService, ProjectionName, createTypeormProjection, storeOperators } from '../../Projection';
import { SrvRecord } from 'dns';
import { TypeormStabilityWindowBuffer, createStorePoolMetricsUpdateJob } from '@cardano-sdk/projection-typeorm';
import { createLogger } from 'bunyan';
import { createStorePoolMetricsUpdateJob } from '@cardano-sdk/projection-typeorm';
import { getConnectionConfig, getOgmiosObservableCardanoNode } from '../services';

export const BLOCKS_BUFFER_LENGTH_DEFAULT = 10;
Expand Down Expand Up @@ -50,24 +49,17 @@ const createProjectionHttpService = async (options: ProjectionMapFactoryOptions)
ogmiosUrl: args.ogmiosUrl
});
const connectionConfig$ = getConnectionConfig(dnsResolver, 'projector', '', args);
const buffer = new TypeormStabilityWindowBuffer({ logger });
const { blocksBufferLength, dropSchema, dryRun, exitAtBlockNo, handlePolicyIds, projectionNames, synchronize } = args;
const projection$ = createTypeormProjection({
blocksBufferLength,
buffer,
cardanoNode,
connectionConfig$,
devOptions: { dropSchema, synchronize },
exitAtBlockNo,
logger,
projectionOptions: {
handlePolicyIds
},
projectionSource$: Bootstrap.fromCardanoNode({
blocksBufferLength,
buffer,
cardanoNode,
logger
}),
projections: projectionNames
});
return new ProjectionHttpService({ dryRun, projection$, projectionNames }, { logger });
Expand Down
34 changes: 25 additions & 9 deletions packages/cardano-services/src/Program/services/pgboss.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
PoolRegistrationEntity,
PoolRetirementEntity,
StakePoolEntity,
createDataSource,
createPgBoss,
isRecoverableTypeormError
} from '@cardano-sdk/projection-typeorm';
Expand All @@ -32,14 +33,13 @@ import { Pool } from 'pg';
import { Router } from 'express';
import { StakePoolMetadataProgramOptions } from '../options/stakePoolMetadata';
import { contextLogger } from '@cardano-sdk/util';
import { createObservableDataSource } from '../../Projection/createTypeormProjection';
import { retryBackoff } from 'backoff-rxjs';
import PgBoss from 'pg-boss';

/**
* The entities required by the job handlers
*/
export const pgBossEntities = [
export const pgBossEntities: Function[] = [
CurrentPoolMetricsEntity,
BlockEntity,
PoolMetadataEntity,
Expand All @@ -49,13 +49,28 @@ export const pgBossEntities = [
];

export const createPgBossDataSource = (connectionConfig$: Observable<PgConnectionConfig>, logger: Logger) =>
createObservableDataSource({
connectionConfig$,
entities: pgBossEntities,
extensions: {},
logger,
migrationsRun: false
});
// TODO: use createObservableDataSource from projection-typeorm package.
// A challenge in doing that is that we call subscriber.error on retryable errors in order to reconnect.
// Doing that with createObservableDataSource will 'destroy' the data source that's currently used,
// so pg-boss is then unable to update job status and it stays 'active', not available for the newly
// recreated worker to be picked up.
// TODO: this raises another question - what happens when database connection drops while working on a job?
// Will it stay 'active' forever, or will pg-boss eventually update it due to some sort of timeout?
connectionConfig$.pipe(
switchMap((connectionConfig) =>
from(
(async () => {
const dataSource = createDataSource({
connectionConfig,
entities: pgBossEntities,
logger
});
await dataSource.initialize();
return dataSource;
})()
)
)
);

export type PgBossWorkerArgs = CommonProgramOptions &
StakePoolMetadataProgramOptions &
Expand Down Expand Up @@ -140,6 +155,7 @@ export class PgBossHttpService extends HttpService {
// This ensures that if an error which can't be retried arrives here is handled as a FATAL error
shouldRetry: (error: unknown) => {
const retry = isRecoverableError(error);
this.logger.debug('work() shouldRetry', retry, error);

this.#health = {
ok: false,
Expand Down
137 changes: 66 additions & 71 deletions packages/cardano-services/src/Projection/createTypeormProjection.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
/* eslint-disable prefer-spread */
import { Cardano } from '@cardano-sdk/core';
import { Bootstrap, ProjectionEvent, logProjectionProgress, requestNext } from '@cardano-sdk/projection';
import { Cardano, ObservableCardanoNode } from '@cardano-sdk/core';
import { Logger } from 'ts-log';
import { Observable, from, switchMap, takeWhile } from 'rxjs';
import { Observable, concat, defer, take, takeWhile } from 'rxjs';
import {
PgConnectionConfig,
TypeormDevOptions,
TypeormOptions,
TypeormStabilityWindowBuffer,
WithTypeormContext,
createDataSource,
createObservableConnection,
createTypeormTipTracker,
isRecoverableTypeormError,
typeormTransactionCommit,
withTypeormTransaction
Expand All @@ -19,19 +22,22 @@ import {
ProjectionOptions,
prepareTypeormProjection
} from './prepareTypeormProjection';
import { ProjectionEvent, logProjectionProgress, requestNext } from '@cardano-sdk/projection';
import { ReconnectionConfig, passthrough, shareRetryBackoff, toEmpty } from '@cardano-sdk/util-rxjs';
import { migrations } from './migrations';
import { passthrough, shareRetryBackoff } from '@cardano-sdk/util-rxjs';

const reconnectionConfig: ReconnectionConfig = {
initialInterval: 50,
maxInterval: 5000
};

export interface CreateTypeormProjectionProps {
projections: ProjectionName[];
blocksBufferLength: number;
buffer?: TypeormStabilityWindowBuffer;
projectionSource$: Observable<ProjectionEvent>;
connectionConfig$: Observable<PgConnectionConfig>;
devOptions?: TypeormDevOptions;
exitAtBlockNo?: Cardano.BlockNo;
logger: Logger;
cardanoNode: ObservableCardanoNode;
projectionOptions?: ProjectionOptions;
}

Expand All @@ -44,47 +50,6 @@ const applyStores =
(evt$: Observable<T>) =>
evt$.pipe.apply(evt$, selectedStores as any) as Observable<T>;

export const createObservableDataSource = ({
connectionConfig$,
logger,
buffer,
devOptions,
entities,
extensions,
migrationsRun
}: Omit<
CreateTypeormProjectionProps,
'blocksBufferLength' | 'exitAtBlockNo' | 'projections' | 'projectionSource$' | 'projectionOptions'
> &
Pick<PreparedProjection, 'entities' | 'extensions'> & { migrationsRun: boolean }) =>
connectionConfig$.pipe(
switchMap((connectionConfig) =>
from(
(async () => {
const dataSource = createDataSource({
connectionConfig,
devOptions,
entities,
extensions,
logger,
options: {
installExtensions: true,
migrations: migrations.filter(({ entity }) => entities.includes(entity as any)),
migrationsRun: migrationsRun && !devOptions?.synchronize
}
});
await dataSource.initialize();
if (buffer) {
const queryRunner = dataSource.createQueryRunner('master');
await buffer.initialize(queryRunner);
await queryRunner.release();
}
return dataSource;
})()
)
)
);

/**
* Creates a projection observable that applies a sequence of operators
* required to project requested `projections` into a postgres database.
Expand All @@ -95,12 +60,11 @@ export const createObservableDataSource = ({
export const createTypeormProjection = ({
blocksBufferLength,
projections,
projectionSource$,
connectionConfig$,
logger,
devOptions,
devOptions: requestedDevOptions,
cardanoNode,
exitAtBlockNo,
buffer,
projectionOptions
}: CreateTypeormProjectionProps) => {
const { handlePolicyIds } = { handlePolicyIds: [], ...projectionOptions };
Expand All @@ -110,34 +74,65 @@ export const createTypeormProjection = ({

const { mappers, entities, stores, extensions } = prepareTypeormProjection(
{
buffer,
options: projectionOptions,
projections
},
{ logger }
);
const dataSource$ = createObservableDataSource({
const connect = (options?: TypeormOptions, devOptions?: TypeormDevOptions) =>
createObservableConnection({
connectionConfig$,
devOptions,
entities,
extensions,
logger,
options
});

const tipTracker = createTypeormTipTracker({
connection$: connect(),
reconnectionConfig
});
const buffer = new TypeormStabilityWindowBuffer({
connection$: connect(),
logger,
reconnectionConfig
});
const projectionSource$ = Bootstrap.fromCardanoNode({
blocksBufferLength,
buffer,
connectionConfig$,
devOptions,
entities,
extensions,
cardanoNode,
logger,
migrationsRun: true
projectedTip$: tipTracker.tip$
});
return projectionSource$.pipe(
applyMappers(mappers),
shareRetryBackoff(
(evt$) =>
evt$.pipe(
withTypeormTransaction({ dataSource$, logger }, extensions),
applyStores(stores),
typeormTransactionCommit()
return concat(
// initialize database before starting the projector
connect(
{
installExtensions: true,
migrations: migrations.filter(({ entity }) => entities.includes(entity as any)),
migrationsRun: !requestedDevOptions?.synchronize
},
requestedDevOptions
).pipe(take(1), toEmpty),
defer(() =>
projectionSource$.pipe(
applyMappers(mappers),
shareRetryBackoff(
(evt$) =>
evt$.pipe(
withTypeormTransaction({ connection$: connect() }),
applyStores(stores),
buffer.storeBlockData(),
typeormTransactionCommit()
),
{ shouldRetry: isRecoverableTypeormError }
),
{ shouldRetry: isRecoverableTypeormError }
),
requestNext(),
logProjectionProgress(logger),
exitAtBlockNo ? takeWhile((event) => event.block.header.blockNo < exitAtBlockNo) : passthrough()
tipTracker.trackProjectedTip(),
requestNext(),
logProjectionProgress(logger),
exitAtBlockNo ? takeWhile((event) => event.block.header.blockNo < exitAtBlockNo) : passthrough()
)
)
);
};
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import {
StakeKeyRegistrationEntity,
StakePoolEntity,
TokensEntity,
TypeormStabilityWindowBuffer,
createStorePoolMetricsUpdateJob,
storeAddresses,
storeAssets,
Expand Down Expand Up @@ -139,7 +138,7 @@ type Entity = Entities[EntityName];
const storeEntities: Partial<Record<StoreName, EntityName[]>> = {
storeAddresses: ['address'],
storeAssets: ['asset'],
storeBlock: ['block'],
storeBlock: ['block', 'blockData'],
storeHandleMetadata: ['handleMetadata', 'output'],
storeHandles: ['handle', 'asset', 'tokens', 'output'],
storeNftMetadata: ['asset'],
Expand Down Expand Up @@ -303,7 +302,6 @@ const keyOf = <T extends {}>(obj: T, value: unknown): keyof T | null => {

export interface PrepareTypeormProjectionProps {
projections: ProjectionName[];
buffer?: TypeormStabilityWindowBuffer;
options?: ProjectionOptions;
}

Expand All @@ -312,7 +310,7 @@ export interface PrepareTypeormProjectionProps {
* based on 'projections' and presence of 'buffer':
*/
export const prepareTypeormProjection = (
{ projections, buffer, options = {} }: PrepareTypeormProjectionProps,
{ projections, options = {} }: PrepareTypeormProjectionProps,
dependencies: WithLogger
) => {
const mapperSorter = new Sorter<MapperOperator>();
Expand All @@ -329,10 +327,6 @@ export const prepareTypeormProjection = (
const selectedEntities = entitySorter.nodes;
const selectedMappers = mapperSorter.nodes;
const selectedStores = storeSorter.nodes;
if (buffer) {
selectedEntities.push(BlockDataEntity);
selectedStores.push(buffer.storeBlockData());
}
const extensions = requiredExtensions(projections);
return {
__debug: {
Expand Down
Loading

0 comments on commit e8a2e87

Please sign in to comment.