From beb1a9041f301e6ac1280a7355b0820fdc227f7d Mon Sep 17 00:00:00 2001
From: Apekshit Sharma
Date: Thu, 7 May 2020 01:07:39 -0700
Subject: [PATCH] Deduplication Job

Deduplication job will run as a Spring application. It'll be triggered every
5 min (configurable) and will do the following:
- Get state of last run
- Check if there is new data to be deduplicated
- Check if there are duplicates in new data
- Deduplicate
- Save new state

Changes:
- Adds metrics to track deduplication job
- Adds new column 'dedupe' to transactions table
- Adds integration test
- Adds create-tables.sh script to create all tables needed by hedera-etl

Signed-off-by: Apekshit Sharma
---
 hedera-dedupe-bigquery/pom.xml                | 159 ++++++++++++++
 .../com/hedera/dedupe/BigQueryHelper.java     |  56 +++++
 .../com/hedera/dedupe/DedupeApplication.java  |  33 +++
 .../hedera/dedupe/DedupeConfiguration.java    |  81 +++++++
 .../java/com/hedera/dedupe/DedupeMetrics.java |  96 +++++++++
 .../com/hedera/dedupe/DedupeProperties.java   |  46 ++++
 .../java/com/hedera/dedupe/DedupeRunner.java  | 200 ++++++++++++++++++
 .../java/com/hedera/dedupe/DedupeState.java   |  82 +++++++
 .../main/java/com/hedera/dedupe/Utility.java  |  32 +++
 .../src/main/resources/application.yml        |   8 +
 .../src/main/resources/banner.txt             |  12 ++
 .../src/main/resources/log4j2.xml             |  13 ++
 .../hedera/dedupe/DedupeIntegrationTest.java  | 131 ++++++++++++
 .../test/resources/application-default.yml    |  15 ++
 hedera-dedupe-bigquery/state-schema.json      |  13 ++
 hedera-etl-dataflow/pom.xml                   |   3 +-
 .../src/main/resources/schema.json            |   5 +
 .../etl/TransactionJsonToTableRowTest.java    |   6 +-
 pom.xml                                       |  73 ++++---
 scripts/create-tables.sh                      |  48 +++++
 20 files changed, 1075 insertions(+), 37 deletions(-)
 create mode 100644 hedera-dedupe-bigquery/pom.xml
 create mode 100644 hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/BigQueryHelper.java
 create mode 100644 hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/DedupeApplication.java
 create mode 100644 hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/DedupeConfiguration.java
 create mode 100644 hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/DedupeMetrics.java
 create mode 100644 hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/DedupeProperties.java
 create mode 100644 hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/DedupeRunner.java
 create mode 100644 hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/DedupeState.java
 create mode 100644 hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/Utility.java
 create mode 100644 hedera-dedupe-bigquery/src/main/resources/application.yml
 create mode 100644 hedera-dedupe-bigquery/src/main/resources/banner.txt
 create mode 100644 hedera-dedupe-bigquery/src/main/resources/log4j2.xml
 create mode 100644 hedera-dedupe-bigquery/src/test/java/com/hedera/dedupe/DedupeIntegrationTest.java
 create mode 100644 hedera-dedupe-bigquery/src/test/resources/application-default.yml
 create mode 100644 hedera-dedupe-bigquery/state-schema.json
 create mode 100755 scripts/create-tables.sh

diff --git a/hedera-dedupe-bigquery/pom.xml b/hedera-dedupe-bigquery/pom.xml
new file mode 100644
index 0000000..5cd0956
--- /dev/null
+++ b/hedera-dedupe-bigquery/pom.xml
@@ -0,0 +1,159 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <artifactId>hedera-bigquery-deduplication</artifactId>
+  <description>Deduplicates rows in BigQuery table</description>
+  <modelVersion>4.0.0</modelVersion>
+  <name>Hedera BigQuery Deduplication</name>
+  <packaging>jar</packaging>
+
+  <parent>
+    <groupId>com.hedera</groupId>
+    <artifactId>hedera-etl</artifactId>
+    <version>0.0.1</version>
+  </parent>
+
+  <properties>
+    <lombok.version>1.18.12</lombok.version>
+    <micrometer.version>1.5.0</micrometer.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>io.micrometer</groupId>
+      <artifactId>micrometer-registry-stackdriver</artifactId>
+      <version>${micrometer.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.hibernate.validator</groupId>
+          <artifactId>hibernate-validator</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.hibernate.validator</groupId>
+          <artifactId>hibernate-validator-annotation-processor</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.projectlombok</groupId>
+      <artifactId>lombok</artifactId>
+      <version>${lombok.version}</version>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-configuration-processor</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>org.springframework.boot</groupId>
+          <artifactId>spring-boot-starter-logging</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-actuator</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>org.springframework.boot</groupId>
+          <artifactId>spring-boot-starter-logging</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-log4j2</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-test</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.cloud</groupId>
+      <artifactId>spring-cloud-gcp-starter-bigquery</artifactId>
+    </dependency>
+  </dependencies>
+
+  <dependencyManagement>
+    <dependencies>
+      <dependency>
+        <groupId>org.springframework.cloud</groupId>
+        <artifactId>spring-cloud-gcp-dependencies</artifactId>
+        <version>1.2.2.RELEASE</version>
+        <type>pom</type>
+        <scope>import</scope>
+      </dependency>
+    </dependencies>
+  </dependencyManagement>
+
+  <profiles>
+    <profile>
+      <id>dev</id>
+      <activation>
+        <activeByDefault>true</activeByDefault>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>default-test</id>
+                <configuration>
+                  <excludedGroups>gcpBigquery</excludedGroups>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+    <profile>
+      <id>gcpBigquery</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>default-test</id>
+                <configuration>
+                  <groups>gcpBigquery</groups>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.springframework.boot</groupId>
+        <artifactId>spring-boot-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>build-info</goal>
+            </goals>
+          </execution>
+          <execution>
+            <id>repackage</id>
+            <goals>
+              <goal>repackage</goal>
+            </goals>
+            <configuration>
+              <classifier>exec</classifier>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/BigQueryHelper.java b/hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/BigQueryHelper.java
new file mode 100644
index 0000000..3413f16
--- /dev/null
+++ b/hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/BigQueryHelper.java
@@ -0,0 +1,56 @@
+package com.hedera.dedupe;
+
+/*-
+ * ‌
+ * Hedera ETL
+ * ​
+ * Copyright (C) 2020 Hedera Hashgraph, LLC
+ * ​
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ‍
+ */
+
+import com.google.cloud.bigquery.*;
+import java.time.Instant;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+import org.springframework.stereotype.Component;
+
+@Log4j2
+@Component
+@RequiredArgsConstructor
+public class BigQueryHelper {
+    @Getter
+    private final BigQuery bigQuery;
+    private final DedupeProperties properties;
+    private final DedupeMetrics dedupeMetrics;
+
+    public TableResult runQuery(String query, String jobName) throws InterruptedException {
+        // timestamp is just for uniqueness
+        JobId jobId = JobId.of(properties.getProjectId(), "dedupe_" + jobName + "_" + Instant.now().getEpochSecond());
+        log.info("### Starting job {}", jobId.getJob());
+        log.info("Query: {}", query);
+        TableResult tableResult = bigQuery.query(QueryJobConfiguration.newBuilder(query).build(), jobId);
+        Job job = bigQuery.getJob(jobId);
+        dedupeMetrics.recordMetrics(jobName, job.getStatistics());
+        return tableResult;
+    }
+
+    public void ensureTableExists(String dataset, String tableName) {
+        Table table = bigQuery.getTable(dataset, tableName);
+        if (table == null) {
+            throw new IllegalArgumentException("Table does not exist : " + dataset + "."
+                    + tableName);
+        }
+    }
+}
diff --git a/hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/DedupeApplication.java b/hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/DedupeApplication.java
new file mode 100644
index 0000000..5ac0648
--- /dev/null
+++ b/hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/DedupeApplication.java
@@ -0,0 +1,33 @@
+package com.hedera.dedupe;
+
+/*-
+ * ‌
+ * Hedera ETL
+ * ​
+ * Copyright (C) 2020 Hedera Hashgraph, LLC
+ * ​
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ‍
+ */
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.properties.ConfigurationPropertiesScan;
+
+@ConfigurationPropertiesScan
+@SpringBootApplication
+public class DedupeApplication {
+    public static void main(String[] args) {
+        SpringApplication.run(DedupeApplication.class, args);
+    }
+}
diff --git a/hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/DedupeConfiguration.java b/hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/DedupeConfiguration.java
new file mode 100644
index 0000000..5e851a4
--- /dev/null
+++ b/hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/DedupeConfiguration.java
@@ -0,0 +1,81 @@
+package com.hedera.dedupe;
+
+/*-
+ * ‌
+ * Hedera ETL
+ * ​
+ * Copyright (C) 2020 Hedera Hashgraph, LLC
+ * ​
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ‍
+ */
+
+import com.google.api.gax.core.CredentialsProvider;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.stackdriver.StackdriverConfig;
+import io.micrometer.stackdriver.StackdriverMeterRegistry;
+import lombok.RequiredArgsConstructor;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.cloud.gcp.core.GcpProjectIdProvider;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.annotation.EnableAsync;
+import org.springframework.scheduling.annotation.EnableScheduling;
+
+@EnableAsync
+@Configuration
+@RequiredArgsConstructor
+public class DedupeConfiguration {
+
+    private final GcpProjectIdProvider projectIdProvider;
+    private final CredentialsProvider credentialsProvider;
+    private final DedupeProperties properties;
+
+    @Bean
+    StackdriverConfig stackdriverConfig() {
+        return new StackdriverConfig() {
+            @Override
+            public String projectId() {
+                return projectIdProvider.getProjectId();
+            }
+
+            // Setting "management.metrics.export.stackdriver: false" is not working
+            @Override
+            public boolean enabled() {
+                return properties.isMetricsEnabled();
+            }
+
+            @Override
+            public String get(String key) {
+                return null;
+            }
+
+            @Override
+            public CredentialsProvider credentials() {
+                return credentialsProvider;
+            }
+        };
+    }
+
+    @Bean
+    MeterRegistry stackdriverMeterRegistry(StackdriverConfig stackdriverConfig) {
+        return StackdriverMeterRegistry.builder(stackdriverConfig).build();
+    }
+
+    // Scheduling is disabled for testing
+    @Configuration
+    @EnableScheduling
+    @ConditionalOnProperty(prefix = "hedera.dedupe.scheduling", name = "enabled", matchIfMissing = true)
+    protected static class SchedulingConfiguration {
+    }
+}
diff --git a/hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/DedupeMetrics.java b/hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/DedupeMetrics.java
new file mode 100644
index 0000000..e425a6a
--- /dev/null
+++ b/hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/DedupeMetrics.java
@@ -0,0 +1,96 @@
+package com.hedera.dedupe;
+
+import com.google.cloud.bigquery.JobStatistics;
+import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.Gauge;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.Tag;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.Getter;
+import lombok.Value;
+import lombok.extern.log4j.Log4j2;
+import org.springframework.stereotype.Component;
+
+@Log4j2
+@Getter
+@Component
+class DedupeMetrics {
+    private final MeterRegistry meterRegistry;
+    private final Map<String, JobMetrics> jobMetrics;
+    private final AtomicLong startTimestampGauge;
+    private final AtomicLong endTimestampGauge;
+    private final AtomicLong delayGauge;
+    private final AtomicLong runtimeGauge;
+    private final Counter duplicatesCounter;
+    private final Counter invocationsCounter;
+    private final Counter failuresCounter;
+
+    public DedupeMetrics(MeterRegistry meterRegistry) {
+        this.meterRegistry = meterRegistry;
+        this.jobMetrics = new HashMap<>();
+        this.startTimestampGauge = new AtomicLong(0L);
+        Gauge.builder("dedupe.start.timestamp", startTimestampGauge, AtomicLong::get)
+                .description("Start of dedupe window. Last endTimestamp + 1")
+                .baseUnit("ns")
+                .register(meterRegistry);
+        this.endTimestampGauge = new AtomicLong(0L);
+        Gauge.builder("dedupe.end.timestamp", endTimestampGauge, AtomicLong::get)
+                .description("consensusTimestamp of last row in dedupe window.")
+                .baseUnit("ns")
+                .register(meterRegistry);
+        this.runtimeGauge = new AtomicLong(0L);
+        Gauge.builder("dedupe.runtime", runtimeGauge, AtomicLong::get)
+                .description("Total time taken by single dedupe run")
+                .baseUnit("sec")
+                .register(meterRegistry);
+        this.delayGauge = new AtomicLong(0L);
+        Gauge.builder("dedupe.delay", delayGauge, AtomicLong::get)
+                .description("Delay in deduplication (now - startTimestamp)")
+                .baseUnit("sec")
+                .register(meterRegistry);
+        this.duplicatesCounter = Counter.builder("dedupe.duplicates.count")
+                .description("Count of duplicates found")
+                .register(meterRegistry);
+        this.invocationsCounter = Counter.builder("dedupe.invocations").register(meterRegistry);
+        this.failuresCounter = Counter.builder("dedupe.failures").register(meterRegistry);
+    }
+
+    public void recordMetrics(String jobName, JobStatistics.QueryStatistics queryStatistics) {
+        long runTime = queryStatistics.getEndTime() - queryStatistics.getStartTime();
+        Long affectedRows = queryStatistics.getNumDmlAffectedRows();
+        log.info("Job stats: runtime = {}ms, affected rows = {}", runTime, affectedRows);
+        jobMetrics.computeIfAbsent(jobName, k -> new JobMetrics(meterRegistry, k));
+        var metrics = jobMetrics.get(jobName);
+        metrics.getRuntimeGauge().set(runTime);
+        if (affectedRows != null) {
+            metrics.getAffectedRowsGauge().set(affectedRows);
+        }
+    }
+
+    // Individual jobs (queries) may not run, so reset their metrics to 0.
+    // All other metrics are always set on each invocation, so no need to reset them.
+    public void resetJobMetrics() {
+        jobMetrics.forEach((key, value) -> value.reset());
+    }
+
+    @Value
+    static class JobMetrics {
+        private final AtomicLong runtimeGauge;
+        private final AtomicLong affectedRowsGauge;
+
+        JobMetrics(MeterRegistry meterRegistry, String jobName) {
+            Collection<Tag> tags = List.of(Tag.of("name", jobName));
+            runtimeGauge = meterRegistry.gauge("dedupe.job.runtime", tags, new AtomicLong(0L));
+            affectedRowsGauge = meterRegistry.gauge("dedupe.job.rows", tags, new AtomicLong(0L));
+        }
+
+        void reset() {
+            runtimeGauge.set(0L);
+            affectedRowsGauge.set(0L);
+        }
+    }
+}
diff --git a/hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/DedupeProperties.java b/hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/DedupeProperties.java
new file mode 100644
index 0000000..5c4977f
--- /dev/null
+++ b/hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/DedupeProperties.java
@@ -0,0 +1,46 @@
+package com.hedera.dedupe;
+
+/*-
+ * ‌
+ * Hedera ETL
+ * ​
+ * Copyright (C) 2020 Hedera Hashgraph, LLC
+ * ​
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ‍
+ */
+
+import javax.validation.constraints.NotBlank;
+import lombok.Data;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.validation.annotation.Validated;
+
+@Data
+@Validated
+@ConfigurationProperties("hedera.dedupe")
+public class DedupeProperties {
+
+    @NotBlank
+    private String projectId;
+
+    @NotBlank
+    private String datasetName;
+
+    // Can be blank if initStateTable is true
+    private String tableName;
+
+    @NotBlank
+    private String stateTableName;
+
+    private boolean metricsEnabled = false;
+}
diff --git a/hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/DedupeRunner.java b/hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/DedupeRunner.java
new file mode 100644
index 0000000..4e6c272
--- /dev/null
+++ b/hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/DedupeRunner.java
@@ -0,0 +1,200 @@
+package com.hedera.dedupe;
+
+/*-
+ * ‌
+ * Hedera ETL
+ * ​
+ * Copyright (C) 2020 Hedera Hashgraph, LLC
+ * ​
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ‍
+ */
+
+import static com.hedera.dedupe.DedupeState.STATE_NAME_LAST_VALID_TIMESTAMP;
+import static com.hedera.dedupe.Utility.toBigQueryTimestamp;
+
+import java.time.Duration;
+import java.time.Instant;
+import lombok.extern.log4j.Log4j2;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+/**
+ * Dedupe runner periodically executes BigQuery jobs to deduplicate rows.
+ * State across runs is maintained using a separate BigQuery table.
+ * If a run exceeds the configured 'fixedRate', the next run is queued and started only after the previous one
+ * finishes.
+ *
+ * Deduplication algorithm:
+ * (consensusTimestamp is unique for each transaction)
+ * 1. Get state: startTimestamp = lastValidTimestamp + 1 (from stateTable). 0 if not set.
+ * 2. Flag rows not in the streaming buffer: UPDATE table SET dedupe = 1 WHERE ...;
+ * 3. Get endTimestamp: SELECT MAX(consensusTimestamp) FROM table WHERE dedupe IS NOT NULL ....
+ * 4. Check for duplicates in [startTimestamp, endTimestamp] window
+ * 5. Remove duplicates (if present)
+ * 6. Save state: Set lastValidTimestamp in state table to endTimestamp
+ */
+@Log4j2
+@Component
+public class DedupeRunner {
+    private static final String JOB_NAME_FLAG_ROW = "flag_dml_accessible_rows";
+    private static final String JOB_NAME_GET_END_TIMESTAMP = "get_end_timestamp";
+    private static final String JOB_NAME_GET_DUPLICATES = "get_duplicates";
+    private static final String JOB_NAME_REMOVE_DUPLICATES = "remove_duplicates";
+
+    private final BigQueryHelper bigQueryHelper;
+    private final String transactionsTable;
+    private final DedupeState dedupeState;
+    private final DedupeMetrics metrics;
+
+    public DedupeRunner(DedupeProperties properties, BigQueryHelper bigQueryHelper, DedupeState dedupeState,
+                        DedupeMetrics dedupeMetrics) {
+        this.bigQueryHelper = bigQueryHelper;
+        this.dedupeState = dedupeState;
+        this.metrics = dedupeMetrics;
+        String datasetName = properties.getDatasetName();
+        bigQueryHelper.ensureTableExists(datasetName, properties.getTableName());
+        transactionsTable = properties.getProjectId() + "." + datasetName + "." + properties.getTableName();
+    }
+
+    @Scheduled(fixedRateString = "${hedera.dedupe.fixedRate:300000}") // default: 5 min
+    public void run() {
+        try {
+            Instant dedupeStart = Instant.now();
+            metrics.getInvocationsCounter().increment();
+            metrics.resetJobMetrics();
+            runDedupe();
+            metrics.getRuntimeGauge().set(Duration.between(dedupeStart, Instant.now()).getSeconds());
+        } catch (Exception e) {
+            log.error("Failed deduplication", e);
+            metrics.getFailuresCounter().increment();
+        }
+    }
+
+    private void runDedupe() throws Exception {
+        var state = dedupeState.getState();
+
+        // 1. Get state: startTimestamp = lastValidTimestamp + 1 (from stateTable). 0 if not set.
+        long startTimestamp = 0; // in nanos
+        if (state.containsKey(STATE_NAME_LAST_VALID_TIMESTAMP)) {
+            startTimestamp = state.get(STATE_NAME_LAST_VALID_TIMESTAMP).getLongValue() + 1;
+        }
+        metrics.getStartTimestampGauge().set(startTimestamp);
+
+        // 2. Flag rows not in the streaming buffer: UPDATE table SET dedupe = 1 WHERE ...;
+        setDedupeState(startTimestamp);
+
+        // 3. Get endTimestamp: SELECT MAX(consensusTimestamp) FROM table WHERE dedupe IS NOT NULL ....
+        long endTimestamp = getEndTimestamp(startTimestamp);
+        if (endTimestamp == startTimestamp) {
+            return;
+        }
+
+        // 4. Check for duplicates in [startTimestamp, endTimestamp] window
+        boolean hasDuplicates = hasDuplicates(startTimestamp, endTimestamp);
+
+        // 5. Remove duplicates (if present)
+        if (hasDuplicates) {
+            removeDuplicates(startTimestamp, endTimestamp);
+        }
+
+        // 6. Save state: Set lastValidTimestamp in state table to endTimestamp
+        dedupeState.setState(endTimestamp);
+
+        metrics.getDelayGauge().set(
+                Duration.between(Instant.ofEpochSecond(0L, startTimestamp), Instant.now()).getSeconds());
+    }
+
+    private void setDedupeState(long startTimestamp) throws Exception {
+        String query = String.format("UPDATE %s SET dedupe = 1 WHERE %s",
+                transactionsTable, consensusTimestampGte(startTimestamp));
+        bigQueryHelper.runQuery(query, JOB_NAME_FLAG_ROW);
+    }
+
+    private long getEndTimestamp(long startTimestamp) throws Exception {
+        String query = String.format("SELECT MAX(consensusTimestamp) AS ts \n" +
+                "FROM %s \n" +
+                "WHERE %s AND dedupe IS NOT NULL",
+                transactionsTable, consensusTimestampGte(startTimestamp));
+        var tableResult = bigQueryHelper.runQuery(query, JOB_NAME_GET_END_TIMESTAMP);
+        long endTimestamp = 0L;
+        for (var fvl : tableResult.iterateAll()) {
+            if (fvl.get("ts").getValue() == null) {
+                log.info("No new rows");
+                return startTimestamp;
+            }
+            endTimestamp = fvl.get("ts").getLongValue();
+        }
+        metrics.getEndTimestampGauge().set(endTimestamp);
+        log.info("endTimestamp = {}", endTimestamp);
+        return endTimestamp;
+    }
+
+    // SELECT count(*) AS num, consensusTimestamp FROM table
+    // WHERE consensusTimestamp BETWEEN startTimestamp AND endTimestamp
+    // GROUP BY consensusTimestamp HAVING num > 1;
+    private boolean hasDuplicates(long startTimestamp, long endTimestamp) throws Exception {
+        String query = String.format("SELECT count(*) AS num, consensusTimestamp \n" +
+                "  FROM %s \n" +
+                "  WHERE %s \n" +
+                "  GROUP BY consensusTimestamp HAVING num > 1",
+                transactionsTable, consensusTimestampBetween(startTimestamp, endTimestamp, ""));
+        var tableResult = bigQueryHelper.runQuery(query, JOB_NAME_GET_DUPLICATES);
+        if (tableResult.getTotalRows() == 0) {
+            log.info("No duplicates found");
+            return false;
+        }
+        log.info("Duplicates found");
+        log.info("consensusTimestamp, count");
+        long numDuplicates = 0;
+        for (var fvl : tableResult.iterateAll()) {
+            long num = fvl.get("num").getLongValue();
+            log.info("{}, {}", fvl.get("consensusTimestamp").getLongValue(), num);
+            numDuplicates += num - 1; // -1 for original
+        }
+        metrics.getDuplicatesCounter().increment(numDuplicates);
+        return true;
+    }
+
+    private void removeDuplicates(long startTimestamp, long endTimestamp) throws Exception {
+        String query = String.format("MERGE INTO %s AS dest \n" +
+                "USING ( \n" +
+                "  SELECT k.* \n" +
+                "  FROM ( \n" +
+                "    SELECT ARRAY_AGG(original_data LIMIT 1)[OFFSET(0)] k \n" +
+                "    FROM %s AS original_data \n" +
+                "    WHERE %s \n" +
+                "    GROUP BY consensusTimestamp \n" +
+                "  ) \n" +
+                ") AS src \n" +
+                "ON FALSE \n" +
+                "WHEN NOT MATCHED BY SOURCE AND %s -- remove all data in partition range \n" +
+                "  THEN DELETE \n" +
+                "WHEN NOT MATCHED BY TARGET THEN INSERT ROW",
+                transactionsTable, transactionsTable, consensusTimestampBetween(startTimestamp, endTimestamp, ""),
+                consensusTimestampBetween(startTimestamp, endTimestamp, "dest"));
+        bigQueryHelper.runQuery(query, JOB_NAME_REMOVE_DUPLICATES);
+    }
+
+    private String consensusTimestampGte(long timestamp) {
+        return String.format("(consensusTimestamp >= %d AND consensusTimestampTruncated >= '%s')", timestamp,
+                toBigQueryTimestamp(timestamp));
+    }
+
+    private String consensusTimestampBetween(long startTimestamp, long endTimestamp, String alias) {
+        alias = alias.isEmpty() ? alias : alias + ".";
+        return String.format(
+                "(%sconsensusTimestamp BETWEEN %d AND %d) AND (%sconsensusTimestampTruncated BETWEEN '%s' AND '%s')",
+                alias, startTimestamp, endTimestamp,
+                alias, toBigQueryTimestamp(startTimestamp), toBigQueryTimestamp(endTimestamp));
+    }
+}
diff --git a/hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/DedupeState.java b/hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/DedupeState.java
new file mode 100644
index 0000000..316e4e6
--- /dev/null
+++ b/hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/DedupeState.java
@@ -0,0 +1,82 @@
+package com.hedera.dedupe;
+
+/*-
+ * ‌
+ * Hedera ETL
+ * ​
+ * Copyright (C) 2020 Hedera Hashgraph, LLC
+ * ​
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ‍
+ */
+
+import com.google.cloud.bigquery.*;
+import java.util.HashMap;
+import java.util.Map;
+import lombok.extern.log4j.Log4j2;
+import org.springframework.stereotype.Component;
+
+/**
+ * Reads/writes deduplication job's state from/to BigQuery table.
+ */
+@Log4j2
+@Component
+public class DedupeState {
+    public static final String STATE_NAME_LAST_VALID_TIMESTAMP = "lastValidTimestamp";
+    public static final String JOB_NAME_SAVE_STATE = "save_state";
+
+    private final BigQueryHelper bigQueryHelper;
+    private final TableId stateTableId;
+    private final String stateTable;
+
+    public DedupeState(DedupeProperties properties, BigQueryHelper bigQueryHelper) {
+        this.bigQueryHelper = bigQueryHelper;
+        String datasetName = properties.getDatasetName();
+        String tableName = properties.getStateTableName();
+        stateTableId = TableId.of(datasetName, tableName);
+        stateTable = properties.getProjectId() + "." + datasetName + "." + tableName;
+        bigQueryHelper.ensureTableExists(datasetName, tableName);
+    }
+
+    /**
+     * @return state read from the given table. Map's key is state field's name and map's value is state field's
+     * value.
+     */
+    public Map<String, FieldValue> getState() {
+        TableResult tableResult = bigQueryHelper.getBigQuery().listTableData(stateTableId,
+                Schema.of(Field.of("name", LegacySQLTypeName.STRING), Field.of("value", StandardSQLTypeName.STRING)));
+        if (tableResult.getTotalRows() == 0) { // state not initialized
+            return Map.of();
+        }
+        Map<String, FieldValue> state = new HashMap<>();
+        // okay to iterateAll since state will contain a couple of rows at most.
+        tableResult.iterateAll().forEach(fvl -> {
+            state.put(fvl.get("name").getStringValue(), fvl.get("value"));
+        });
+        return state;
+    }
+
+    public void setState(long endTimestamp) throws Exception {
+        // State variable may not be present in stateTable already (for UPDATE). For example, adding new state
+        // variables. Also, deprecating a state variable would require deleting it.
+        // Although the query *looks* complicated, it just atomically resets the state table to the given values.
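+        // For example, with endTimestamp = 1588888888000000000 and a state table named
+        // project.dataset.dedupe_state (illustrative names), the generated query would be:
+        //   MERGE INTO project.dataset.dedupe_state
+        //   USING (
+        //     SELECT 'lastValidTimestamp' AS name, '1588888888000000000' AS value
+        //   )
+        //   ON FALSE
+        //   WHEN NOT MATCHED BY SOURCE THEN DELETE
+        //   WHEN NOT MATCHED BY TARGET THEN INSERT ROW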
+        String query = String.format("MERGE INTO %s \n" +
+                "USING ( \n" +
+                "  SELECT '%s' AS name, '%d' AS value \n" +
+                ") \n" +
+                "ON FALSE \n" +
+                "WHEN NOT MATCHED BY SOURCE THEN DELETE \n" +
+                "WHEN NOT MATCHED BY TARGET THEN INSERT ROW \n",
+                stateTable, STATE_NAME_LAST_VALID_TIMESTAMP, endTimestamp);
+        bigQueryHelper.runQuery(query, JOB_NAME_SAVE_STATE);
+    }
+}
diff --git a/hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/Utility.java b/hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/Utility.java
new file mode 100644
index 0000000..ed429c3
--- /dev/null
+++ b/hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/Utility.java
@@ -0,0 +1,32 @@
+package com.hedera.dedupe;
+
+/*-
+ * ‌
+ * Hedera ETL
+ * ​
+ * Copyright (C) 2020 Hedera Hashgraph, LLC
+ * ​
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ‍
+ */
+
+import java.time.Instant;
+
+public class Utility {
+
+    // BigQuery TIMESTAMP has microsecond precision, so truncate the nanosecond part before converting.
+    static String toBigQueryTimestamp(long timestamp) {
+        return Instant.ofEpochSecond(0L, (timestamp / 1000) * 1000).toString();
+    }
+
+}
diff --git a/hedera-dedupe-bigquery/src/main/resources/application.yml b/hedera-dedupe-bigquery/src/main/resources/application.yml
new file mode 100644
index 0000000..0b6848d
--- /dev/null
+++ b/hedera-dedupe-bigquery/src/main/resources/application.yml
@@ -0,0 +1,8 @@
+spring:
+  cloud:
+    gcp:
+      credentials:
+        location: file:/Users/appy/git/hedera-mirror-node/gcp_keys/appy-dev-272020-dedupe.json
+      bigquery:
+        projectId: ${hedera.dedupe.projectId}
+        datasetName: ${hedera.dedupe.datasetName}
\ No newline at end of file
diff --git a/hedera-dedupe-bigquery/src/main/resources/banner.txt b/hedera-dedupe-bigquery/src/main/resources/banner.txt
new file mode 100644
index 0000000..5d779ab
--- /dev/null
+++ b/hedera-dedupe-bigquery/src/main/resources/banner.txt
@@ -0,0 +1,12 @@
+
+          @@@@@@@@@@@@@
+       @@@@@@@@@@@@@@@@@@@
+     @@@@@  0@@@@@@@@  @@@@@    @@@   @@@             @@@
+    @@@@@@@  @@@@@@@@@  @@@@@@@ @@@   @@@             @@@
+   @@@@@@@@  @@@@@@@@@  @@@@@@@@ @@@   @@@  @@@@@@  @@@@@@@@@  @@@@@@  @@@@@@@@  @@@@@@
+   @@@@@@@@  @@@@@@@@  @@@@@@@@@@@@@@ @@@ @@@   @@@ @@@   @@@ @@@ @@@@@     @@@
+   @@@@@@@@  @@@@@@@@@  @@@@@@@@ @@@   @@@ @@@@@@@@@ @@@   @@@ 8@@@@@@@@ @@@  @@@@@@@@
+    @@@@@@@  @@@@@@@@@  @@@@@@@ @@@   @@@ @@@       @@@   @@@ @@@ @@@   @@@  @@
+     @@@@@  @@@@@@@@@  @@@@@   @@@   @@@  @@@@@@@  @@@@@@@@  @@@@@@@ @@@   @@@@@@@@
+       @@@@@@@@@@@@@@@@@@@
+          @@@@@@@@@@@@@     BigQuery Deduplication    ${application.formatted-version}
diff --git a/hedera-dedupe-bigquery/src/main/resources/log4j2.xml b/hedera-dedupe-bigquery/src/main/resources/log4j2.xml
new file mode 100644
index 0000000..f4f8765
--- /dev/null
+++ b/hedera-dedupe-bigquery/src/main/resources/log4j2.xml
@@ -0,0 +1,13 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<Configuration status="warn">
+  <Appenders>
+    <Console name="Console" target="SYSTEM_OUT">
+      <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level %logger{36} - %msg%n"/>
+    </Console>
+  </Appenders>
+  <Loggers>
+    <Root level="info">
+      <AppenderRef ref="Console"/>
+    </Root>
+  </Loggers>
+</Configuration>
diff --git a/hedera-dedupe-bigquery/src/test/java/com/hedera/dedupe/DedupeIntegrationTest.java b/hedera-dedupe-bigquery/src/test/java/com/hedera/dedupe/DedupeIntegrationTest.java
new file mode 100644
index 0000000..1b25533
--- /dev/null
+++ b/hedera-dedupe-bigquery/src/test/java/com/hedera/dedupe/DedupeIntegrationTest.java
@@ -0,0 +1,131 @@
+package com.hedera.dedupe;
+
+/*-
+ * ‌
+ * Hedera ETL
+ * ​
+ * Copyright (C) 2020 Hedera Hashgraph, LLC
+ * ​
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ‍
+ */
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import com.google.cloud.bigquery.QueryJobConfiguration;
+import java.util.Random;
+import javax.annotation.Resource;
+import lombok.extern.log4j.Log4j2;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+
+/**
+ * Due to the lack of a fake/mock/emulator for BigQuery, this test requires GCP BigQuery.
+ * Setup:
+ * - Create transactions and state table. See documentation for more details.
+ * - Fill the properties in resources/application-default.yml
+ *
+ * This test is not run as part of 'mvn test'. To run it, use the following command:
+ * - mvn test -PgcpBigquery
+ */
+@Log4j2
+@SpringBootTest
+@Tag("gcpBigquery")
+public class DedupeIntegrationTest {
+    static final long NUM_ROWS = 100;
+
+    @Resource
+    protected DedupeRunner dedupeRunner;
+    @Resource
+    protected BigQueryHelper bigQueryHelper;
+    @Resource
+    protected DedupeProperties properties;
+    @Resource
+    private DedupeState dedupeState;
+
+    private String transactionsTable;
+
+    @BeforeEach
+    void beforeEach() throws Exception {
+        transactionsTable = properties.getProjectId() + "." + properties.getDatasetName()
+                + "." + properties.getTableName();
+        bigQueryHelper.runQuery("DELETE FROM " + transactionsTable + " WHERE 1 = 1", "reset_table");
+        dedupeState.setState(0);
+    }
+
+    @Test
+    void testDeduplication() throws Exception {
+        // add data
+        long expectedEndTimestamp = generateDuplicatedData(1, NUM_ROWS);
+
+        // run dedupe, check num rows and state.
+        dedupeRunner.run();
+        long actualNumRows = getNumRows(transactionsTable);
+        var state = dedupeState.getState();
+        assertEquals(NUM_ROWS, actualNumRows);
+        assertEquals(expectedEndTimestamp, state.get(DedupeState.STATE_NAME_LAST_VALID_TIMESTAMP).getLongValue());
+
+        // add more data
+        expectedEndTimestamp = generateDuplicatedData(expectedEndTimestamp + 1, NUM_ROWS);
+
+        // run dedupe, check num rows and state.
+        dedupeRunner.run();
+        actualNumRows = getNumRows(transactionsTable);
+        state = dedupeState.getState();
+        assertEquals(2 * NUM_ROWS, actualNumRows);
+        assertEquals(expectedEndTimestamp, state.get(DedupeState.STATE_NAME_LAST_VALID_TIMESTAMP).getLongValue());
+
+        // No new data.
+        // Run dedupe, check num rows and state.
+        dedupeRunner.run();
+        actualNumRows = getNumRows(transactionsTable);
+        state = dedupeState.getState();
+        assertEquals(2 * NUM_ROWS, actualNumRows);
+        assertEquals(expectedEndTimestamp, state.get(DedupeState.STATE_NAME_LAST_VALID_TIMESTAMP).getLongValue());
+    }
+
+    // Generates fake transaction rows starting with startTimestamp (in nanos). Every 5th row is duplicated.
+    // Returns timestamp of last row.
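+    // For example (illustrative table name and timestamps), a generated statement looks like:
+    //   INSERT INTO project.dataset.transactions (consensusTimestampTruncated, consensusTimestamp) VALUES
+    //   ( '1970-01-01T00:00:00Z', 1 ), ( '1970-01-01T00:00:00.734816Z', 734816057 ),
+    //   ( '1970-01-01T00:00:00.734816Z', 734816057 ), ...
+    // where the repeated row is one of the injected duplicates.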
+    private long generateDuplicatedData(long startTimestamp, long numRows) throws Exception {
+        String query = "INSERT INTO " + transactionsTable
+                + " (consensusTimestampTruncated, consensusTimestamp) VALUES " + makeRow(startTimestamp);
+        long timestamp = startTimestamp;
+        Random rand = new Random();
+        for (int i = 1; i < numRows; i++) {
+            timestamp += rand.nextInt(1_000_000_000); // add nanos
+            String row = makeRow(timestamp);
+            query += ", " + row;
+            if (i % 5 == 0) { // duplicate every 5th row
+                query += ", " + row;
+            }
+        }
+        log.info("Inserting duplicated data: {}", query);
+        bigQueryHelper.runQuery(query, "insert_data");
+        return timestamp;
+    }
+
+    private String makeRow(long timestamp) {
+        return String.format("( '%s', %d )", Utility.toBigQueryTimestamp(timestamp), timestamp);
+    }
+
+    private long getNumRows(String tableName) throws Exception {
+        String query = "SELECT count(*) AS count FROM " + tableName;
+        var tableResult = bigQueryHelper.getBigQuery().query(QueryJobConfiguration.newBuilder(query).build());
+        for (var row : tableResult.iterateAll()) {
+            return row.get("count").getLongValue();
+        }
+        return 0;
+    }
+}
diff --git a/hedera-dedupe-bigquery/src/test/resources/application-default.yml b/hedera-dedupe-bigquery/src/test/resources/application-default.yml
new file mode 100644
index 0000000..1abe9dd
--- /dev/null
+++ b/hedera-dedupe-bigquery/src/test/resources/application-default.yml
@@ -0,0 +1,15 @@
+hedera:
+  dedupe:
+    projectId:
+    datasetName:
+    tableName:
+    stateTableName:
+    metricsEnabled: false
+    scheduling:
+      enabled: false # Dedupe runs are manually invoked in tests
+
+spring:
+  cloud:
+    gcp:
+      credentials:
+        location:
diff --git a/hedera-dedupe-bigquery/state-schema.json b/hedera-dedupe-bigquery/state-schema.json
new file mode 100644
index 0000000..8965ab7
--- /dev/null
+++ b/hedera-dedupe-bigquery/state-schema.json
@@ -0,0 +1,13 @@
+[
+  {
+    "name": "name",
+    "type": "STRING",
+    "mode": "REQUIRED",
+    "description": "Name of the state variable"
+  },
+  {
+    "name": "value",
+    "type": "STRING",
+    "description": "Value of the state variable"
+  }
+]
\ No newline at end of file
diff --git a/hedera-etl-dataflow/pom.xml b/hedera-etl-dataflow/pom.xml
index 02760ad..fbd86e2 100644
--- a/hedera-etl-dataflow/pom.xml
+++ b/hedera-etl-dataflow/pom.xml
@@ -20,7 +20,6 @@
     2.20.0
     v2-rev20181104-1.27.0
     1.6.0
-    RELEASE
     2.13.1
     1.18.12
     v1-rev20181105-1.27.0
@@ -87,7 +86,7 @@
     <dependency>
       <groupId>org.junit.jupiter</groupId>
      <artifactId>junit-jupiter</artifactId>
-      <version>${junit.version}</version>
+      <version>${junit-jupiter.version}</version>
       <scope>test</scope>
     </dependency>
diff --git a/hedera-etl-dataflow/src/main/resources/schema.json b/hedera-etl-dataflow/src/main/resources/schema.json
index a7ac09b..1ef0a11 100644
--- a/hedera-etl-dataflow/src/main/resources/schema.json
+++ b/hedera-etl-dataflow/src/main/resources/schema.json
@@ -393,5 +393,10 @@
         "type": "INTEGER"
       }
     ]
+  },
+  {
+    "name": "dedupe",
+    "type": "INTEGER",
+    "description": "Used internally for deduplication of rows."
   }
 ]
diff --git a/hedera-etl-dataflow/src/test/java/com/hedera/etl/TransactionJsonToTableRowTest.java b/hedera-etl-dataflow/src/test/java/com/hedera/etl/TransactionJsonToTableRowTest.java
index 7291e8f..f3f7704 100644
--- a/hedera-etl-dataflow/src/test/java/com/hedera/etl/TransactionJsonToTableRowTest.java
+++ b/hedera-etl-dataflow/src/test/java/com/hedera/etl/TransactionJsonToTableRowTest.java
@@ -28,13 +28,13 @@ import java.nio.file.Path;
 import java.util.List;
 import java.util.stream.Collectors;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 public class TransactionJsonToTableRowTest {
 
     private final TransactionJsonToTableRow converter = new TransactionJsonToTableRow();
 
     @Test
-    public void testConversion() throws Exception {
+    void testConversion() throws Exception {
         // Given
         List<String> jsonTransactions = readFileLines("data/TransactionJsonToTableRowTest/transactions.txt");
         List<String> expected = readFileLines("data/TransactionJsonToTableRowTest/expectedTableRows.txt");
@@ -51,7 +51,7 @@ public void testConversion() throws Exception {
 
     @Test
-    public void testThrowsExceptionForBadJson() throws Exception {
+    void testThrowsExceptionForBadJson() throws Exception {
         // given
         String badJson = "{\"consensusTimestamp\":1570802944412586000,\"entity\":{\"shardNum\":0,";
diff --git a/pom.xml b/pom.xml
index 1a7be37..6f887b9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -32,6 +32,7 @@
   <modules>
     <module>hedera-etl-dataflow</module>
+    <module>hedera-dedupe-bigquery</module>
   </modules>
@@ -39,25 +40,54 @@
   <url>https://hedera.com</url>
 
+  <parent>
+    <groupId>org.springframework.boot</groupId>
+    <artifactId>spring-boot-starter-parent</artifactId>
+    <version>2.2.7.RELEASE</version>
+  </parent>
+
   <properties>
     <jacoco.version>0.8.5</jacoco.version>
     <java.version>11</java.version>
-    <maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
+    <maven-compiler-plugin.version>3.8.1</maven-compiler-plugin.version>
   </properties>
 
   <build>
     <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>${maven-compiler-plugin.version}</version>
+        <configuration>
+          <source>${java.version}</source>
+          <target>${java.version}</target>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.jacoco</groupId>
+        <artifactId>jacoco-maven-plugin</artifactId>
+        <version>${jacoco.version}</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>prepare-agent</goal>
+            </goals>
+          </execution>
+          <execution>
+            <id>jacoco-report</id>
+            <phase>test</phase>
+            <goals>
+              <goal>report</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-compiler-plugin</artifactId>
-        <version>${maven-compiler-plugin.version}</version>
-        <configuration>
-          <source>${java.version}</source>
-          <target>${java.version}</target>
-        </configuration>
-      </plugin>
       <plugin>
         <groupId>org.codehaus.mojo</groupId>
@@ -79,27 +109,6 @@
       </plugin>
-      <plugin>
-        <groupId>org.jacoco</groupId>
-        <artifactId>jacoco-maven-plugin</artifactId>
-        <version>${jacoco.version}</version>
-        <executions>
-          <execution>
-            <goals>
-              <goal>prepare-agent</goal>
-            </goals>
-          </execution>
-          <execution>
-            <id>jacoco-report</id>
-            <phase>test</phase>
-            <goals>
-              <goal>report</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
     </plugins>
   </build>
diff --git a/scripts/create-tables.sh b/scripts/create-tables.sh
new file mode 100755
index 0000000..3305bba
--- /dev/null
+++ b/scripts/create-tables.sh
@@ -0,0 +1,48 @@
+#!/usr/bin/env bash
+
+# Usage: PROJECT_ID=.. DATASET_NAME=... ./create-tables.sh
+# Optionally, table names can be set using TRANSACTIONS_TABLE_NAME, ERRORS_TABLE_NAME, DEDUPE_STATE_TABLE_NAME
+
+TRANSACTIONS_TABLE_NAME=${TRANSACTIONS_TABLE_NAME:-transactions}
+ERRORS_TABLE_NAME=${ERRORS_TABLE_NAME:-errors}
+DEDUPE_STATE_TABLE_NAME=${DEDUPE_STATE_TABLE_NAME:-dedupe_state}
+
+# Ensure `bq` cli exists
+if [[ `which bq` == "" ]]; then
+  echo "Couldn't find 'bq' cli. Make sure Cloud SDK is installed. https://cloud.google.com/sdk/docs#install_the_latest_cloud_sdk_version"
+  exit 1
+fi
+
+if [[ "${PROJECT_ID}" == "" ]]; then
+  echo "PROJECT_ID is not set"
+  exit 1
+fi
+
+if [[ "${DATASET_NAME}" == "" ]]; then
+  echo "DATASET_NAME is not set"
+  exit 1
+fi
+
+BASE_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
+
+bq mk \
+  --table \
+  --description "Hedera network transactions" \
+  --time_partitioning_field consensusTimestampTruncated \
+  --time_partitioning_type DAY \
+  --clustering_fields transactionType \
+  ${PROJECT_ID}:${DATASET_NAME}.${TRANSACTIONS_TABLE_NAME} \
+  ${BASE_DIR}/../hedera-etl-dataflow/src/main/resources/schema.json
+
+bq mk \
+  --table \
+  --description "Hedera ETL Errors" \
+  ${PROJECT_ID}:${DATASET_NAME}.${ERRORS_TABLE_NAME} \
+  ${BASE_DIR}/../hedera-etl-dataflow/src/main/resources/errors_schema.json
+
+bq mk \
+  --table \
+  --description "BigQuery deduplication job state" \
+  ${PROJECT_ID}:${DATASET_NAME}.${DEDUPE_STATE_TABLE_NAME} \
+  ${BASE_DIR}/../hedera-dedupe-bigquery/state-schema.json
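+
+# Example invocation (hypothetical project and dataset names), per the Usage comment above:
+#   PROJECT_ID=my-gcp-project DATASET_NAME=hedera_etl ./scripts/create-tables.sh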