From 9c7884204993c573d9c610704128382e1438453a Mon Sep 17 00:00:00 2001
From: Apoorva M K
Date: Tue, 27 Sep 2022 18:08:19 +0000
Subject: [PATCH] Added connection string parser, FQN generation and Lineage
 Recording for Generic DB plugin

---
 .../cdap/plugin/db/batch/source/DBSource.java | 12 +++
 .../io/cdap/plugin/db/common/DBURLParser.java | 77 +++++++++++++++++++
 2 files changed, 89 insertions(+)
 create mode 100644 database-plugins/src/main/java/io/cdap/plugin/db/common/DBURLParser.java

diff --git a/database-plugins/src/main/java/io/cdap/plugin/db/batch/source/DBSource.java b/database-plugins/src/main/java/io/cdap/plugin/db/batch/source/DBSource.java
index 3f487e92d..787a65527 100644
--- a/database-plugins/src/main/java/io/cdap/plugin/db/batch/source/DBSource.java
+++ b/database-plugins/src/main/java/io/cdap/plugin/db/batch/source/DBSource.java
@@ -41,6 +41,7 @@
 import io.cdap.plugin.DBRecord;
 import io.cdap.plugin.FieldCase;
 import io.cdap.plugin.StructuredRecordUtils;
+import io.cdap.plugin.common.Asset;
 import io.cdap.plugin.common.LineageRecorder;
 import io.cdap.plugin.common.ReferenceBatchSource;
 import io.cdap.plugin.common.ReferencePluginConfig;
@@ -49,6 +50,7 @@
 import io.cdap.plugin.common.db.DriverCleanup;
 import io.cdap.plugin.db.batch.TransactionIsolationLevel;
 import io.cdap.plugin.db.common.DBBaseConfig;
+import io.cdap.plugin.db.common.DBURLParser;
 import io.cdap.plugin.db.connector.DBConnector;
 import io.cdap.plugin.db.connector.DBConnectorConfig;
 import org.apache.hadoop.conf.Configuration;
@@ -59,6 +61,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.net.URI;
 import java.sql.Connection;
 import java.sql.Driver;
 import java.sql.DriverManager;
@@ -270,6 +273,15 @@ private Connection getConnection() throws SQLException {
     return DriverManager.getConnection(sourceConfig.getConnectionString(), properties);
   }
 
+  protected LineageRecorder getLineageRecorder(BatchSourceContext context) {
+    // dbtype, host, port, db from the connection string
+    // table is the reference name
+    URI uri = DBURLParser.parseURL(sourceConfig.getConnectionString());
+    String fqn = DBURLParser.constructFQN(uri, sourceConfig.getReferenceName());
+    Asset asset = Asset.builder(sourceConfig.getReferenceName()).setFqn(fqn).build();
+    return new LineageRecorder(context, asset);
+  }
+
   /**
    * {@link PluginConfig} for {@link DBSource}
    */
diff --git a/database-plugins/src/main/java/io/cdap/plugin/db/common/DBURLParser.java b/database-plugins/src/main/java/io/cdap/plugin/db/common/DBURLParser.java
new file mode 100644
index 000000000..6853d04f7
--- /dev/null
+++ b/database-plugins/src/main/java/io/cdap/plugin/db/common/DBURLParser.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright © 2022 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.db.common;
+
+import java.net.URI;
+import java.util.Objects;
+
+/**
+ * Parses JDBC connection strings and builds the fully-qualified names (FQNs) recorded for lineage.
+ */
+public class DBURLParser {
+  private static final String JDBC_PREFIX = "jdbc:";
+
+  /**
+   * Parses a JDBC connection string into a {@link URI}.
+   *
+   * <p>A leading {@code jdbc:} prefix (matched case-insensitively) is removed first, so
+   * {@code jdbc:mysql://host:3306/db} yields a URI whose scheme is {@code mysql}. A string
+   * without that prefix is parsed unchanged instead of losing its first five characters.
+   *
+   * @param connectionString the JDBC connection string; must not be null
+   * @return the URI described by the connection string once the prefix is removed
+   * @throws NullPointerException if {@code connectionString} is null
+   * @throws IllegalArgumentException if the remaining text is not a valid URI
+   */
+  public static URI parseURL(String connectionString) {
+    Objects.requireNonNull(connectionString, "connectionString");
+    boolean hasJdbcPrefix = connectionString.regionMatches(true, 0, JDBC_PREFIX, 0, JDBC_PREFIX.length());
+    String cleanURI = hasJdbcPrefix ? connectionString.substring(JDBC_PREFIX.length()) : connectionString;
+    return URI.create(cleanURI);
+  }
+
+  /**
+   * Builds the FQN of a table as {@code scheme://host[:port]/path/table}.
+   *
+   * <p>The port is omitted when the URI does not define one, and slashes surrounding the URI
+   * path are dropped so that segments are separated by exactly one slash.
+   *
+   * @param uri the URI produced by {@link #parseURL(String)}
+   * @param tableName the table name, appended as the last segment
+   * @return the FQN, or an empty string when the URI scheme is {@code postgres}
+   */
+  public static String constructFQN(URI uri, String tableName) {
+    // Compare by value: '==' on strings tests reference identity, not contents.
+    // NOTE(review): PostgreSQL JDBC URLs use the scheme "postgresql" - confirm which one is intended.
+    if ("postgres".equals(uri.getScheme())) {
+      return "";
+    }
+
+    StringBuilder fqn = new StringBuilder();
+    fqn.append(uri.getScheme()).append("://").append(uri.getHost());
+    // URI#getPort returns -1 when the connection string does not specify a port.
+    if (uri.getPort() != -1) {
+      fqn.append(':').append(uri.getPort());
+    }
+    // URI#getPath keeps the leading '/', which used to produce "//" in the FQN.
+    String path = uri.getPath() == null ? "" : uri.getPath().replaceAll("^/+|/+$", "");
+    if (!path.isEmpty()) {
+      fqn.append('/').append(path);
+    }
+    return fqn.append('/').append(tableName).toString();
+  }
+}