Skip to content

Commit

Permalink
Deduplication Job
Browse files Browse the repository at this point in the history
Deduplication job will run as a Spring application.
It'll be triggered every 5 min (configurable) and will do the following:
- Get state of last run
- Check if there is new data to be deduplicated
  - Check if there are duplicates in new data
    - Deduplicate
  - Save new state

Changes:
- Adds metrics to track deduplication job
- Adds new column 'dedupe' to transactions table
- Adds integration test
- Adds create-tables.sh script to create all tables needed by hedera-etl

Signed-off-by: Apekshit Sharma <[email protected]>
  • Loading branch information
apeksharma committed May 12, 2020
1 parent 0aba949 commit beb1a90
Show file tree
Hide file tree
Showing 20 changed files with 1,075 additions and 37 deletions.
159 changes: 159 additions & 0 deletions hedera-dedupe-bigquery/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
<?xml version="1.0" encoding="UTF-8" ?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<artifactId>hedera-bigquery-deduplication</artifactId>
<description>Deduplicates rows in BigQuery table</description>
<modelVersion>4.0.0</modelVersion>
<name>Hedera BigQuery Deduplication</name>
<packaging>jar</packaging>

<parent>
<groupId>com.hedera</groupId>
<artifactId>hedera-etl</artifactId>
<version>0.0.1</version>
</parent>

<properties>
<lombok.version>1.18.12</lombok.version>
<micrometer.version>1.5.0</micrometer.version>
</properties>

<!-- Runtime and test dependencies of the deduplication module. -->
<!-- NOTE(review): entries without an explicit version presumably get it from the parent POM or the imported BOM; confirm. -->
<dependencies>
<!-- Micrometer Stackdriver registry; version pinned through the micrometer.version property. -->
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-stackdriver</artifactId>
<version>${micrometer.version}</version>
</dependency>
<dependency>
<groupId>org.hibernate.validator</groupId>
<artifactId>hibernate-validator</artifactId>
</dependency>
<dependency>
<groupId>org.hibernate.validator</groupId>
<artifactId>hibernate-validator-annotation-processor</artifactId>
</dependency>
<!-- Lombok; version pinned through the lombok.version property, declared with compile scope. -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<scope>compile</scope>
</dependency>
<!-- spring-boot-starter-logging is excluded here and on the actuator starter below; -->
<!-- spring-boot-starter-log4j2 is declared further down instead. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<!-- Test scope only. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-gcp-starter-bigquery</artifactId>
</dependency>
</dependencies>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-gcp-dependencies</artifactId>
<version>1.2.2.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<!-- Build profiles selecting which test groups the surefire default-test execution runs. -->
<profiles>
<!-- dev: active by default; excludes tests in the gcpBigquery group. -->
<profile>
<id>dev</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<executions>
<execution>
<id>default-test</id>
<configuration>
<excludedGroups>gcpBigquery</excludedGroups>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
<!-- gcpBigquery: must be selected explicitly; restricts the run to tests in the gcpBigquery group. -->
<!-- NOTE(review): these tests presumably need access to a real BigQuery project; confirm before enabling in CI. -->
<profile>
<id>gcpBigquery</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<executions>
<execution>
<id>default-test</id>
<configuration>
<groups>gcpBigquery</groups>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>

<!-- Packaging: Spring Boot plugin with two executions. -->
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<!-- Runs the build-info goal. -->
<execution>
<goals>
<goal>build-info</goal>
</goals>
</execution>
<!-- Runs the repackage goal with the "exec" classifier. -->
<execution>
<id>repackage</id>
<goals>
<goal>repackage</goal>
</goals>
<configuration>
<classifier>exec</classifier>
</configuration>
</execution>
</executions>
</plugin>
</plugins>

</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package com.hedera.dedupe;

/*-
* ‌
* Hedera ETL
* ​
* Copyright (C) 2020 Hedera Hashgraph, LLC
* ​
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ‍
*/

import com.google.cloud.bigquery.*;
import java.time.Instant;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.stereotype.Component;

/**
 * Small helper around the {@link BigQuery} client used by the deduplication job.
 *
 * <p>It runs queries under an explicitly named job id, hands the finished job's statistics to
 * {@link DedupeMetrics}, and offers a check that a table exists.
 */
@Log4j2
@Component
@RequiredArgsConstructor
public class BigQueryHelper {
    @Getter
    private final BigQuery bigQuery;
    private final DedupeProperties properties;
    private final DedupeMetrics dedupeMetrics;

    /**
     * Runs {@code query} as a BigQuery job and records the job's statistics as metrics.
     *
     * <p>The job id has the form {@code dedupe_<jobName>_<epochSeconds>} in the configured project.
     * NOTE(review): the suffix has one-second resolution, so two calls with the same {@code jobName}
     * within the same second would produce the same job id.
     *
     * @param query   SQL text to execute
     * @param jobName short name used in the job id and passed to the metrics recorder
     * @return the result returned by {@link BigQuery#query}
     * @throws InterruptedException if the thread is interrupted while waiting for the query
     */
    public TableResult runQuery(String query, String jobName) throws InterruptedException {
        // timestamp is just for uniqueness
        JobId jobId = JobId.of(properties.getProjectId(), "dedupe_" + jobName + "_" + Instant.now().getEpochSecond());
        log.info("### Starting job {}", jobId.getJob());
        log.info("Query: {}", query);
        TableResult tableResult = bigQuery.query(QueryJobConfiguration.newBuilder(query).build(), jobId);

        // getJob returns null when the job cannot be found. A missing job must not turn an already
        // successful query into a NullPointerException, so metrics are skipped in that case.
        Job job = bigQuery.getJob(jobId);
        if (job == null) {
            log.warn("Job {} not found after query completed, skipping metrics", jobId.getJob());
        } else {
            dedupeMetrics.recordMetrics(jobName, job.getStatistics());
        }
        return tableResult;
    }

    /**
     * Verifies that the given table can be looked up through the BigQuery client.
     *
     * @param dataset   dataset that should contain the table
     * @param tableName name of the table
     * @throws IllegalArgumentException if the lookup returns no table
     */
    public void ensureTableExists(String dataset, String tableName) {
        Table table = bigQuery.getTable(dataset, tableName);
        if (table == null) {
            throw new IllegalArgumentException("Table does not exist : " + dataset + "." + tableName);
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.hedera.dedupe;

/*-
* ‌
* Hedera ETL
* ​
* Copyright (C) 2020 Hedera Hashgraph, LLC
* ​
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ‍
*/

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.ConfigurationPropertiesScan;

/**
 * Entry point of the deduplication Spring Boot application.
 */
@ConfigurationPropertiesScan
@SpringBootApplication
public class DedupeApplication {
    /**
     * Starts the Spring application with this class as its primary source.
     *
     * @param args command line arguments, passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(DedupeApplication.class);
        application.run(args);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package com.hedera.dedupe;

/*-
* ‌
* Hedera ETL
* ​
* Copyright (C) 2020 Hedera Hashgraph, LLC
* ​
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ‍
*/

import com.google.api.gax.core.CredentialsProvider;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.stackdriver.StackdriverConfig;
import io.micrometer.stackdriver.StackdriverMeterRegistry;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cloud.gcp.core.GcpProjectIdProvider;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;

/**
 * Spring configuration of the deduplication job: enables async support, wires the Stackdriver
 * meter registry, and conditionally turns on scheduling.
 */
@EnableAsync
@Configuration
@RequiredArgsConstructor
public class DedupeConfiguration {

    private final GcpProjectIdProvider projectIdProvider;
    private final CredentialsProvider credentialsProvider;
    private final DedupeProperties properties;

    /**
     * Provides the Stackdriver settings, backed by the GCP project id provider, the credentials
     * provider and the dedupe properties.
     */
    @Bean
    StackdriverConfig stackdriverConfig() {
        return new GcpStackdriverConfig();
    }

    /**
     * Builds the meter registry from the given Stackdriver settings.
     */
    @Bean
    MeterRegistry stackdriverMeterRegistry(StackdriverConfig stackdriverConfig) {
        StackdriverMeterRegistry registry = StackdriverMeterRegistry.builder(stackdriverConfig).build();
        return registry;
    }

    /**
     * {@link StackdriverConfig} that reads every value from the enclosing configuration's collaborators
     * at call time.
     */
    private final class GcpStackdriverConfig implements StackdriverConfig {

        // No key based lookup is supported: every key resolves to null.
        @Override
        public String get(String key) {
            return null;
        }

        @Override
        public String projectId() {
            return projectIdProvider.getProjectId();
        }

        @Override
        public CredentialsProvider credentials() {
            return credentialsProvider;
        }

        // Setting "management.metrics.export.stackdriver: false" is not working,
        // so the switch is taken from the dedupe properties instead.
        @Override
        public boolean enabled() {
            return properties.isMetricsEnabled();
        }
    }

    /**
     * Turns on scheduling unless {@code hedera.dedupe.scheduling.enabled} says otherwise; a missing
     * property counts as enabled. Scheduling is disabled for testing.
     */
    @Configuration
    @EnableScheduling
    @ConditionalOnProperty(prefix = "hedera.dedupe.scheduling", name = "enabled", matchIfMissing = true)
    protected static class SchedulingConfiguration {
    }
}
Loading

0 comments on commit beb1a90

Please sign in to comment.