Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Transaction projection indexed by credentials #1430

Draft
wants to merge 28 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
4559b99
refactor(ogmios-block-type-mapping): add block type; transaction cbor
will-break-it Aug 21, 2024
32e5ffc
feat: add transaction projection indexed by credentials
will-break-it Aug 21, 2024
2d388fa
refactor: add transactions for projector
will-break-it Aug 21, 2024
0e9f51c
fix: rename projection
will-break-it Aug 22, 2024
e434b5a
fix: whitespace for projections
will-break-it Aug 22, 2024
e9eab1e
feat: add transaction projection indexed by credentials
will-break-it Aug 21, 2024
e244cf1
refactor: add transactions for projector
will-break-it Aug 21, 2024
67ef970
fix: rename projection
will-break-it Aug 22, 2024
df16cd2
refactor: allow reading invalid entropy protocol parameter update
will-break-it Aug 22, 2024
ae022be
fix: add mapper for tx cbor
will-break-it Aug 22, 2024
60ecfc3
fix: add tx mapper cbor
will-break-it Aug 22, 2024
f73fecb
fix(LW-11312): temporary fix to not validate too long shelley addresses
will-break-it Aug 27, 2024
a1b59c3
fix(LW-11296): temporary fix for protocol parameter entropy reset
will-break-it Aug 27, 2024
fc6c0f3
feat: add log duration operator
will-break-it Aug 29, 2024
60114c4
chore(core): remove unused import
mkazlauskas Aug 29, 2024
19dc05b
feat: implement operator duration measure
mkazlauskas Aug 29, 2024
62901b6
refactor(credential-projection): add inmemory cache for hydrating txIn
will-break-it Sep 2, 2024
1725716
refactor(credential-projection): remove consumed txIns from cache
will-break-it Sep 2, 2024
8c7437a
refactor(credential-projection-cache): made cache mutation private to…
will-break-it Sep 3, 2024
e97228d
refactor(credential-projection): introduced manager to simplify colle…
will-break-it Sep 4, 2024
fac2747
refactor: add back coin which is not nullable
will-break-it Sep 4, 2024
eef04b0
fix: add byron credential mapping
will-break-it Sep 18, 2024
7e27e0d
refactor: add back output entity index
will-break-it Sep 18, 2024
d28ff85
fix: add check if resolvable txins are not empty
will-break-it Sep 18, 2024
89331db
fix: add export of credential manager
will-break-it Sep 18, 2024
8347299
fix(cardano-services): remove storeAssets from transaction projection
mkazlauskas Sep 18, 2024
e96fb94
feat: set projection postgres tx isolation level to default
mkazlauskas Sep 18, 2024
81565c9
fix: add index for foreign relationship transaction to block id
will-break-it Sep 23, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion compose/common.yml
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ services:
- *projector-environment
- *sdk-environment
POSTGRES_DB_FILE: /run/secrets/postgres_db_wallet_api
PROJECTION_NAMES: protocol-parameters
PROJECTION_NAMES: protocol-parameters,transactions
ports:
- ${WALLET_API_PROJECTOR_PORT:-4005}:3000

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,7 @@ export const mapBlock = (
: Cardano.SlotLeader(blockModel.slot_leader_hash.toString('hex')),
totalOutput: BigInt(blockOutputModel?.output ?? 0),
txCount: Number(blockModel.tx_count),
type: blockModel.type,
vrf: blockModel.vrf as unknown as Cardano.VrfVkBech32
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export interface BlockModel {
slot_no: string;
time: string;
tx_count: string;
type: Cardano.BlockType;
vrf: string;
}

Expand Down
54 changes: 43 additions & 11 deletions packages/cardano-services/src/Projection/createTypeormProjection.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
/* eslint-disable prefer-spread */
import { Bootstrap, ProjectionEvent, logProjectionProgress, requestNext } from '@cardano-sdk/projection';
import {
Bootstrap,
ProjectionEvent,
logProjectionProgress,
requestNext,
withOperatorDuration
} from '@cardano-sdk/projection';
import { Cardano, ObservableCardanoNode } from '@cardano-sdk/core';
import { Logger } from 'ts-log';
import { Observable, concat, defer, groupBy, mergeMap, take, takeWhile } from 'rxjs';
Expand Down Expand Up @@ -41,14 +47,35 @@ export interface CreateTypeormProjectionProps {
projectionOptions?: ProjectionOptions;
}

type TrackDurationProps = {
operatorNames: Array<string | null>;
};

const applyMappers =
<T = {}>(selectedMappers: PreparedProjection['mappers']) =>
<T = {}>(selectedMappers: PreparedProjection['mappers'], trackDurationProps?: TrackDurationProps) =>
(evt$: Observable<ProjectionEvent>) =>
evt$.pipe.apply(evt$, selectedMappers as any) as Observable<ProjectionEvent<T>>;
evt$.pipe.apply(
evt$,
trackDurationProps
? selectedMappers.map((mapper, i) =>
withOperatorDuration(trackDurationProps.operatorNames[i] || '', mapper as any)
)
: (selectedMappers as any)
) as Observable<ProjectionEvent<T>>;
const applyStores =
<T extends WithTypeormContext>(selectedStores: PreparedProjection['stores']) =>
<T extends WithTypeormContext>(
selectedStores: PreparedProjection['stores'],
trackDurationProps?: TrackDurationProps
) =>
(evt$: Observable<T>) =>
evt$.pipe.apply(evt$, selectedStores as any) as Observable<T>;
evt$.pipe.apply(
evt$,
trackDurationProps
? selectedStores.map((mapper, i) =>
withOperatorDuration(trackDurationProps.operatorNames[i] || '', mapper as any)
)
: (selectedStores as any)
) as Observable<T>;

/**
* Creates a projection observable that applies a sequence of operators
Expand All @@ -72,7 +99,7 @@ export const createTypeormProjection = ({
logger.debug(`Creating projection with policyIds ${JSON.stringify(handlePolicyIds)}`);
logger.debug(`Using a ${blocksBufferLength} blocks buffer`);

const { mappers, entities, stores, extensions, willStore } = prepareTypeormProjection(
const { mappers, entities, stores, extensions, willStore, __debug } = prepareTypeormProjection(
{
options: projectionOptions,
projections
Expand Down Expand Up @@ -117,7 +144,9 @@ export const createTypeormProjection = ({
).pipe(take(1), toEmpty),
defer(() =>
projectionSource$.pipe(
applyMappers(mappers),
// TODO: only pass {operatorNames} if debugging;
// we should pass some cli argument here
applyMappers(mappers, { operatorNames: __debug.mappers }),
// if there are any relevant data to write into db
groupBy((evt) => willStore(evt)),
mergeMap((group$) =>
Expand All @@ -126,10 +155,13 @@ export const createTypeormProjection = ({
shareRetryBackoff(
(evt$) =>
evt$.pipe(
withTypeormTransaction({ connection$: connect() }),
applyStores(stores),
buffer.storeBlockData(),
typeormTransactionCommit()
withOperatorDuration(
'withTypeormTransaction',
withTypeormTransaction({ connection$: connect() })
),
applyStores(stores, { operatorNames: __debug.stores }),
withOperatorDuration('storeBlockData', buffer.storeBlockData()),
withOperatorDuration('typeormTransactionCommit', typeormTransactionCommit())
),
{ shouldRetry: isRecoverableTypeormError }
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
AssetEntity,
BlockDataEntity,
BlockEntity,
CredentialEntity,
CurrentPoolMetricsEntity,
DataSourceExtensions,
GovernanceActionEntity,
Expand All @@ -18,22 +19,26 @@ import {
StakeKeyRegistrationEntity,
StakePoolEntity,
TokensEntity,
TransactionEntity,
createStorePoolMetricsUpdateJob,
createStoreStakePoolMetadataJob,
storeAddresses,
storeAssets,
storeBlock,
storeCredentials,
storeGovernanceAction,
storeHandleMetadata,
storeHandles,
storeNftMetadata,
storeStakeKeyRegistrations,
storeStakePoolRewardsJob,
storeStakePools,
storeTransactions,
storeUtxo,
willStoreAddresses,
willStoreAssets,
willStoreBlockData,
willStoreCredentials,
willStoreGovernanceAction,
willStoreHandleMetadata,
willStoreHandles,
Expand All @@ -42,6 +47,7 @@ import {
willStoreStakePoolMetadataJob,
willStoreStakePoolRewardsJob,
willStoreStakePools,
willStoreTransactions,
willStoreUtxo
} from '@cardano-sdk/projection-typeorm';
import { Cardano, ChainSyncEventType } from '@cardano-sdk/core';
Expand All @@ -61,6 +67,7 @@ export enum ProjectionName {
StakePoolMetadataJob = 'stake-pool-metadata-job',
StakePoolMetricsJob = 'stake-pool-metrics-job',
StakePoolRewardsJob = 'stake-pool-rewards-job',
Transactions = 'transactions',
UTXO = 'utxo'
}

Expand Down Expand Up @@ -106,7 +113,8 @@ const createMapperOperators = (
withNftMetadata: Mapper.withNftMetadata({ logger }),
withStakeKeyRegistrations: Mapper.withStakeKeyRegistrations(),
withStakePools: Mapper.withStakePools(),
withUtxo: Mapper.withUtxo()
withUtxo: Mapper.withUtxo(),
withValidByronAddresses: Mapper.withValidByronAddresses()
};
};
type MapperOperators = ReturnType<typeof createMapperOperators>;
Expand All @@ -117,6 +125,7 @@ export const storeOperators = {
storeAddresses: storeAddresses(),
storeAssets: storeAssets(),
storeBlock: storeBlock(),
storeCredentials: storeCredentials(),
storeGovernanceAction: storeGovernanceAction(),
storeHandleMetadata: storeHandleMetadata(),
storeHandles: storeHandles(),
Expand All @@ -129,6 +138,7 @@ export const storeOperators = {
storeStakePoolMetadataJob: createStoreStakePoolMetadataJob()(),
storeStakePoolRewardsJob: storeStakePoolRewardsJob(),
storeStakePools: storeStakePools(),
storeTransactions: storeTransactions(),
storeUtxo: storeUtxo()
};
type StoreOperators = typeof storeOperators;
Expand All @@ -144,6 +154,7 @@ type WillStore = {
const willStore: Partial<WillStore> = {
storeAddresses: willStoreAddresses,
storeAssets: willStoreAssets,
storeCredentials: willStoreCredentials,
storeGovernanceAction: willStoreGovernanceAction,
storeHandleMetadata: willStoreHandleMetadata,
storeHandles: willStoreHandles,
Expand All @@ -152,6 +163,7 @@ const willStore: Partial<WillStore> = {
storeStakePoolMetadataJob: willStoreStakePoolMetadataJob,
storeStakePoolRewardsJob: willStoreStakePoolRewardsJob,
storeStakePools: willStoreStakePools,
storeTransactions: willStoreTransactions,
storeUtxo: willStoreUtxo
};

Expand All @@ -160,6 +172,7 @@ const entities = {
asset: AssetEntity,
block: BlockEntity,
blockData: BlockDataEntity,
credential: CredentialEntity,
currentPoolMetrics: CurrentPoolMetricsEntity,
governanceAction: GovernanceActionEntity,
handle: HandleEntity,
Expand All @@ -173,7 +186,8 @@ const entities = {
poolRewards: PoolRewardsEntity,
stakeKeyRegistration: StakeKeyRegistrationEntity,
stakePool: StakePoolEntity,
tokens: TokensEntity
tokens: TokensEntity,
transaction: TransactionEntity
};
export const allEntities = Object.values(entities);
type Entities = typeof entities;
Expand All @@ -184,6 +198,7 @@ const storeEntities: Partial<Record<StoreName, EntityName[]>> = {
storeAddresses: ['address'],
storeAssets: ['asset'],
storeBlock: ['block', 'blockData'],
storeCredentials: ['credential', 'transaction', 'output'],
storeGovernanceAction: ['governanceAction'],
storeHandleMetadata: ['handleMetadata', 'output'],
storeHandles: ['handle', 'asset', 'tokens', 'output'],
Expand All @@ -195,13 +210,15 @@ const storeEntities: Partial<Record<StoreName, EntityName[]>> = {
storeStakePoolMetadataJob: ['stakePool', 'currentPoolMetrics', 'poolMetadata'],
storeStakePoolRewardsJob: ['poolRewards', 'stakePool'],
storeStakePools: ['stakePool', 'currentPoolMetrics', 'poolMetadata', 'poolDelisted'],
storeTransactions: ['block', 'transaction'],
storeUtxo: ['tokens', 'output']
};

const entityInterDependencies: Partial<Record<EntityName, EntityName[]>> = {
address: ['stakeKeyRegistration'],
asset: ['block', 'nftMetadata'],
blockData: ['block'],
credential: [],
currentPoolMetrics: ['stakePool'],
governanceAction: ['block'],
handle: ['asset'],
Expand All @@ -213,7 +230,8 @@ const entityInterDependencies: Partial<Record<EntityName, EntityName[]>> = {
poolRetirement: ['block'],
stakeKeyRegistration: ['block'],
stakePool: ['block', 'poolRegistration', 'poolRetirement'],
tokens: ['asset']
tokens: ['asset'],
transaction: ['block', 'credential']
};

export const getEntities = (entityNames: EntityName[]): Entity[] => {
Expand Down Expand Up @@ -245,12 +263,14 @@ const mapperInterDependencies: Partial<Record<MapperName, MapperName[]>> = {
withHandles: ['withMint', 'filterMint', 'withUtxo', 'filterUtxo', 'withCIP67'],
withNftMetadata: ['withCIP67', 'withMint', 'filterMint'],
withStakeKeyRegistrations: ['withCertificates'],
withStakePools: ['withCertificates']
withStakePools: ['withCertificates'],
withValidByronAddresses: ['withUtxo']
};

const storeMapperDependencies: Partial<Record<StoreName, MapperName[]>> = {
storeAddresses: ['withAddresses'],
storeAssets: ['withMint'],
storeCredentials: ['withAddresses', 'withCertificates', 'withUtxo', 'withValidByronAddresses'],
storeGovernanceAction: ['withGovernanceActions'],
storeHandleMetadata: ['withHandleMetadata'],
storeHandles: ['withHandles'],
Expand All @@ -272,7 +292,8 @@ const storeInterDependencies: Partial<Record<StoreName, StoreName[]>> = {
storeStakePoolMetadataJob: ['storeBlock'],
storeStakePoolRewardsJob: ['storeBlock'],
storeStakePools: ['storeBlock'],
storeUtxo: ['storeBlock', 'storeAssets']
storeTransactions: ['storeCredentials', 'storeBlock', 'storeUtxo'],
storeUtxo: ['storeBlock']
};

const projectionStoreDependencies: Record<ProjectionName, StoreName[]> = {
Expand All @@ -286,7 +307,8 @@ const projectionStoreDependencies: Record<ProjectionName, StoreName[]> = {
'stake-pool-metadata-job': ['storeStakePoolMetadataJob'],
'stake-pool-metrics-job': ['storePoolMetricsUpdateJob'],
'stake-pool-rewards-job': ['storeStakePoolRewardsJob'],
utxo: ['storeUtxo']
transactions: ['storeCredentials', 'storeTransactions'],
utxo: ['storeUtxo', 'storeAssets']
};

const registerMapper = (
Expand Down
3 changes: 1 addition & 2 deletions packages/core/src/Cardano/Address/BaseAddress.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
/* eslint-disable no-bitwise */
import { Address, AddressProps, AddressType, Credential, CredentialType } from './Address';
import { Hash28ByteBase16 } from '@cardano-sdk/crypto';
import { InvalidArgumentError } from '@cardano-sdk/util';
import { NetworkId } from '../ChainId';

/**
Expand Down Expand Up @@ -121,7 +120,7 @@ export class BaseAddress {
* @param data The serialized address data.
*/
static unpackParts(type: number, data: Uint8Array): Address {
if (data.length !== 57) throw new InvalidArgumentError('data', 'Base address data length should be 57 bytes long.');
// if (data.length !== 57) throw new InvalidArgumentError('data', 'Base address data length should be 57 bytes long.');

const network = data[0] & 0b0000_1111;
const paymentCredential = Hash28ByteBase16(Buffer.from(data.slice(1, 29)).toString('hex'));
Expand Down
3 changes: 3 additions & 0 deletions packages/core/src/Cardano/types/Block.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ VrfVkBech32.fromHex = (value: string) => {
return VrfVkBech32(BaseEncoding.bech32.encode('vrf_vk', words, 1023));
};

export type BlockType = 'bft' | 'praos';

/** Minimal Block type meant as a base for the more complete version `Block` */
// TODO: optionals (except previousBlock) are there because they are not calculated for Byron yet.
// Remove them once calculation is done and remove the Required<BlockMinimal> from interface Block
Expand All @@ -89,6 +91,7 @@ export interface BlockInfo {
/** Byron blocks size not calculated yet */
size?: BlockSize;
previousBlock?: BlockId;
type: BlockType;
vrf?: VrfVkBech32;
/**
* This is the operational cold verification key of the stake pool
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/Cardano/types/Transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { PlutusData } from './PlutusData';
import { ProposalProcedure, VotingProcedures } from './Governance';
import { RewardAccount } from '../Address';
import { Script } from './Script';
import { Serialization } from '../..';

/** transaction hash as hex string */
export type TransactionId = OpaqueString<'TransactionId'>;
Expand Down Expand Up @@ -151,6 +152,7 @@ export interface OnChainTx<TBody extends TxBody = TxBody>
extends Omit<TxWithInputSource<TBody>, 'witness' | 'auxiliaryData'> {
witness: Omit<Witness, 'scripts'>;
auxiliaryData?: Omit<AuxiliaryData, 'scripts'>;
cbor?: Serialization.TxCBOR;
}

export interface HydratedTx extends TxWithInputSource<HydratedTxBody> {
Expand Down
14 changes: 10 additions & 4 deletions packages/core/src/Serialization/Update/ProtocolParamUpdate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -321,13 +321,19 @@ export class ProtocolParamUpdate {
case 12n:
params.#d = UnitInterval.fromCbor(HexBlob.fromBytes(reader.readEncodedValue()));
break;
case 13n:
case 13n: {
// entropy is encoded as an array of two elements, where the second elements is the entropy value
reader.readStartArray();
const size = reader.readStartArray();
reader.readEncodedValue();
params.#extraEntropy = HexBlob.fromBytes(reader.readByteString());
reader.readEndArray();

if (size === 1) {
reader.readEndArray();
} else {
params.#extraEntropy = HexBlob.fromBytes(reader.readByteString());
reader.readEndArray();
}
break;
}
case 14n:
params.#protocolVersion = ProtocolVersion.fromCbor(HexBlob.fromBytes(reader.readEncodedValue()));
break;
Expand Down
2 changes: 2 additions & 0 deletions packages/ogmios/src/ogmiosToCore/block.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ const mapByronBlock = (block: Schema.BlockBFT): Cardano.Block => ({
size: mapBlockSize(block),
totalOutput: mapTotalOutputs(block),
txCount: mapTxCount(block),
type: block.type,
vrf: undefined // no vrf key for byron. DbSync doesn't have one either
});

Expand All @@ -86,6 +87,7 @@ const mapCommonBlock = (block: CommonBlock): Cardano.Block => ({
size: mapBlockSize(block),
totalOutput: mapTotalOutputs(block),
txCount: mapTxCount(block),
type: block.type,
vrf: mapCommonVrf(block)
});

Expand Down
Loading
Loading