Skip to content

Commit

Permalink
Merge pull request #10 from 0xPolygon/add-schema
Browse files Browse the repository at this point in the history
quick node block getter changes
  • Loading branch information
nitinmittal23 committed Sep 27, 2023
2 parents fc1dd20 + 6d32434 commit f67719f
Show file tree
Hide file tree
Showing 9 changed files with 88 additions and 29 deletions.
32 changes: 32 additions & 0 deletions .github/workflows/security-build.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
name: Security Build

# Runs a SonarQube scan and enforces the Quality Gate on every push to main
# (or manually via workflow_dispatch).
on:
  push:
    branches:
      - main # or the name of your main branch
  workflow_dispatch: {}

jobs:
  sonarqube:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
        with:
          # Disabling shallow clone is recommended for improving relevancy of reporting.
          fetch-depth: 0

      # Triggering SonarQube analysis as results of it are required by Quality Gate check.
      - name: SonarQube Scan
        uses: sonarsource/sonarqube-scan-action@master
        env:
          SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
          SONAR_HOST_URL: ${{ secrets.SONAR_HOST_URL }}

      # Check the Quality Gate status.
      - name: SonarQube Quality Gate check
        id: sonarqube-quality-gate-check
        uses: sonarsource/sonarqube-quality-gate-action@master
        # Force to fail step after specific time.
        timeout-minutes: 5
        env:
          SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
          SONAR_HOST_URL: ${{ secrets.SONAR_HOST_URL }}
37 changes: 15 additions & 22 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ Chain Indexer Framework block producers encompass three distinct types of produc
"bootstrap.servers": '<KAFKA_CONNECTION_URL>',
"security.protocol": "plaintext",
blockSubscriptionTimeout: 120000,
type: 'blocks:poller'.
type: 'blocks:poller',
{
error: (error: KafkaError | BlockProducerError) => {},
closed: () => {} // On broker connection closed
Expand Down Expand Up @@ -215,7 +215,7 @@ Chain Indexer Framework block producers encompass three distinct types of produc
maxRetries: '<MAX_RETRIES>',
mongoUrl: '<MONGO_DB_URL>',
"bootstrap.servers": '<KAFKA_CONNECTION_URL>',
"security.protocol": "plaintext",
"security.protocol": "plaintext"
})
producer.on("blockProducer.fatalError", (error) => {
Expand Down Expand Up @@ -275,7 +275,7 @@ const producer = new SynchronousProducer(
coder: {
fileName: "matic_transfer",
packageName: "matictransferpackage",
messageType: "MaticTransferBlock",
messageType: "MaticTransferBlock"
}
}
);
Expand Down Expand Up @@ -306,11 +306,11 @@ const producer = produce<SynchronousProducer>(
packageName: "matictransferpackage",
messageType: "MaticTransferBlock",
},
type: "synchronous" // use 'synchronous'. if synchronous producer is needed,
type: "synchronous", // use 'synchronous' if synchronous producer is needed
{
emitter: () => {
this.produceEvent("<key: string>", "<message: object>");
}
},
error: (error: KafkaError | BlockProducerError) => {},
closed: () => {} // On broker connection closed
}
Expand All @@ -336,14 +336,7 @@ const producer = new AsynchronousProducer(
coder: {
fileName: "matic_transfer",
packageName: "matictransferpackage",
messageType: "MaticTransferBlock",
},
{
emitter: () => {
this.produceEvent("<key: string>", "<message: object>");
}
error: (error: KafkaError | BlockProducerError) => {},
closed: () => {} // On broker connection closed
messageType: "MaticTransferBlock"
}
}
);
Expand Down Expand Up @@ -372,13 +365,13 @@ const producer = produce<AsynchronousProducer>(
coder: {
fileName: "matic_transfer",
packageName: "matictransferpackage",
messageType: "MaticTransferBlock",
messageType: "MaticTransferBlock"
},
type: "asynchronous"
type: "asynchronous",
{
emitter: () => {
this.produceEvent("<key: string>", "<message: object>");
}
},
error: (error: KafkaError | BlockProducerError) => {},
closed: () => {} // On broker connection closed
}
Expand Down Expand Up @@ -484,7 +477,7 @@ const consumer = new SynchronousConsumer(
coders: {
fileName: "block",
packageName: "blockpackage",
messageType: "Block",
messageType: "Block"
}
}
);
Expand Down Expand Up @@ -512,7 +505,7 @@ consume(
"coderConfig": {
fileName: "block",
packageName: "blockpackage",
messageType: "Block",
messageType: "Block"
},
type: 'synchronous'
},
Expand Down Expand Up @@ -545,7 +538,7 @@ const consumer = new AsynchronousConsumer(
coders: {
fileName: "block",
packageName: "blockpackage",
messageType: "Block",
messageType: "Block"
}
}
);
Expand Down Expand Up @@ -651,7 +644,7 @@ const consumerConfig = {
packageName: "blockpackage",
messageType: "Block",
},
type: "synchronous",
type: "synchronous"
};

// Start consuming messages from the Kafka topic.
Expand All @@ -670,7 +663,7 @@ consume(consumerConfig, {
closed: () => {
Logger.info(`Subscription is ended.`);
throw new Error("Consumer stopped.");
},
}
});


Expand Down Expand Up @@ -740,7 +733,7 @@ const loggerConfig = {
},
winston: {
// Any additional Winston configuration options can be provided here
},
}
};

// Create the singleton logger instance with the specified configuration
Expand Down
11 changes: 9 additions & 2 deletions internal/kafka/producer/synchronous_producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ export class SynchronousProducer extends AbstractProducer {
timestamp?: number
): Promise<DeliveryReport | KafkaError> {
return new Promise(async (resolve, reject) => {
let deliveryListener: (error: LibrdKafkaError, report: DeliveryReport) => void = () => { };
// @ts-ignore
let deliveryListener: (error: LibrdKafkaError, report: DeliveryReport) => void;

try {
const identifier = { time: Date.now() };
const timer = setTimeout(() => {
Expand Down Expand Up @@ -88,7 +90,9 @@ export class SynchronousProducer extends AbstractProducer {
return;
}
};

this.on("delivery-report", deliveryListener);

await this.sendToInternalProducer(
key,
message,
Expand All @@ -102,7 +106,10 @@ export class SynchronousProducer extends AbstractProducer {
this.poll();
}, 100);
} catch (error) {
this.removeListener("delivery-report", deliveryListener);
// @ts-ignore
if (deliveryListener) {
this.removeListener("delivery-report", deliveryListener);
}

throw KafkaError.createUnknown(error);
}
Expand Down
5 changes: 3 additions & 2 deletions public/block_producers/quicknode_block_producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { IProducedBlock, ProducedBlocksModel, IProducedBlocksModel } from "@inte
import { BlockSubscription } from "@internal/block_subscription/block_subscription.js";
import { IBlockProducerConfig } from "@internal/interfaces/block_producer_config.js";
import { IProducerConfig } from "@internal/interfaces/producer_config.js";
import { BlockGetter } from "@internal/block_getters/block_getter.js";
import { QuickNodeBlockGetter } from "@internal/block_getters/quicknode_block_getter.js";
import { Coder } from "@internal/coder/protobuf_coder.js";
import { Database } from "@internal/mongo/database.js";
import Eth from "web3-eth";
Expand Down Expand Up @@ -40,6 +40,7 @@ export class QuickNodeBlockProducer extends BlockProducer {
delete config.mongoUrl;
delete config.maxReOrgDepth;
delete config.maxRetries;
delete config.blockDelay;
delete config.blockSubscriptionTimeout;
delete config.blockDelay;
delete config.alternateEndpoint;
Expand Down Expand Up @@ -79,7 +80,7 @@ export class QuickNodeBlockProducer extends BlockProducer {
alternateEndpoint,
rpcTimeout
),
new BlockGetter(eth, maxRetries),
new QuickNodeBlockGetter(eth, maxRetries),
database,
database.model<IProducedBlock, IProducedBlocksModel<IProducedBlock>>(
"ProducedBlocks",
Expand Down
1 change: 0 additions & 1 deletion public/kafka/consumer/asynchronous_consumer.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { AsynchronousConsumer as InternalAsynchronousConsumer } from "@internal/kafka/consumer/asynchronous_consumer.js";
import { IKafkaCoderConfig } from "@internal/interfaces/kafka_coder_config.js";
import { IConsumerConfig } from "@internal/interfaces/consumer_config.js";
import { Coder } from "@internal/coder/protobuf_coder.js";
import { ICoderConfig } from "@internal/interfaces/coder_config.js";
Expand Down
1 change: 0 additions & 1 deletion public/kafka/consumer/synchronous_consumer.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { SynchronousConsumer as InternalSynchronousConsumer } from "@internal/kafka/consumer/synchronous_consumer.js";
import { IKafkaCoderConfig } from "@internal/interfaces/kafka_coder_config.js";
import { IConsumerConfig } from "@internal/interfaces/consumer_config.js";
import { Coder } from "@internal/coder/protobuf_coder.js";
import { ICoderConfig } from "@internal/interfaces/coder_config.js";
Expand Down
13 changes: 13 additions & 0 deletions schemas/global_exit_root.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Schema for global exit root messages produced by the indexer.
// NOTE: in proto3 the `syntax` statement must be the first non-comment
// line of the file; the original placed `package` first, which protoc
// rejects (or silently treats as proto2).
syntax = "proto3";

package globalexitrootpackage;

// A pair of exit roots: one for mainnet, one for the rollup.
message ExitRoots {
  string mainnet = 1;
  string rollUp = 2;
}

// Exit-root data recorded at a specific block.
message GlobalExitRoot {
  uint64 blockNumber = 1;
  uint64 timestamp = 2;
  repeated ExitRoots data = 3;
}
1 change: 1 addition & 0 deletions sonar-project.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
sonar.projectKey=maticnetwork_chainflow_AYn0jzKkuJjRFCFKZwPS
16 changes: 15 additions & 1 deletion tests/block_subscription/abstract_block_subscription.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ describe("Abstract Block Subscription", () => {
subscriber.unsubscribe();
});

test("Must call get block to determine if backfilling is required", async () => {
test("Must call get block(finalized) to determine if backfilling is required", async () => {
mockedEthObject.getBlock.mockResolvedValueOnce({ number: 0 } as BlockTransactionObject);
expect(
await subscriber.subscribe(observer, 0)
Expand All @@ -71,6 +71,20 @@ describe("Abstract Block Subscription", () => {
expect(mockedEthObject.getBlock).toBeCalledWith("finalized");
});

// Verifies that when a block delay is configured (third ctor argument, 256 here),
// the subscription determines backfilling using getBlock("latest") rather than
// getBlock("finalized") — presumably because the delay itself provides the
// reorg safety margin; confirm against BlockSubscription's constructor.
test("Must call get block(latest) to determine if backfilling is required and if block delay is greater than 0", async () => {
subscriber = new BlockSubscription(
mockedEthObject,
60000,
256
);
// Resolve the head block at 0 so subscribe() returns without backfilling work.
mockedEthObject.getBlock.mockResolvedValueOnce({ number: 0 } as BlockTransactionObject);
expect(
await subscriber.subscribe(observer, 0)
).toEqual(undefined);

// Key assertion: "latest" (not "finalized") is queried when blockDelay > 0.
expect(mockedEthObject.getBlock).toBeCalledWith("latest");
});

test("If the difference between last block and finalized block is more than 50, log subscription must not be called but backfill", async () => {
mockedEthObject.getBlock.mockResolvedValueOnce({ number: 51 } as BlockTransactionObject);
expect(
Expand Down

0 comments on commit f67719f

Please sign in to comment.