diff --git a/cloudsql-postgresql-plugin/docs/CloudSQLPostgreSQL-action.md b/cloudsql-postgresql-plugin/docs/CloudSQLPostgreSQL-action.md new file mode 100644 index 000000000..5efe512f2 --- /dev/null +++ b/cloudsql-postgresql-plugin/docs/CloudSQLPostgreSQL-action.md @@ -0,0 +1,108 @@ +# PostgreSQL Action + + +Description +----------- +Action that runs a PostgreSQL command on a CloudSQL PostgreSQL instance. + + +Use Case +-------- +The action can be used whenever you want to run a PostgreSQL command before or after a data pipeline. +For example, you may want to run a SQL update command on a database before the pipeline source pulls data from tables. + + +Properties +---------- +**Driver Name:** Name of the JDBC driver to use. + +**Database Command:** Database command to execute. + +**Database:** PostgreSQL database name. + +**Connection Name:** The CloudSQL instance to connect to in the format :\:. +Can be found in the instance overview page. + +**CloudSQL Instance Type:** Whether the CloudSQL instance to connect to is private or public. Defaults to 'Public'. + +**Username:** User identity for connecting to the specified database. + +**Password:** Password to use to connect to the specified database. + +**Connection Arguments:** A list of arbitrary string key/value pairs as connection arguments. These arguments +will be passed to the JDBC driver as connection arguments for JDBC drivers that may need additional configurations. + +**Connection Timeout** The timeout value used for socket connect operations. If connecting to the server takes longer +than this value, the connection is broken.The timeout is specified in seconds and a value of zero means that it is +disabled. + + +Examples +-------- +**Connecting to a public CloudSQL PostgreSQL instance** + +Suppose you want to execute a query against a CloudSQL PostgreSQL database named "prod", as "postgres" user with "postgres" +password (Get the latest version of the CloudSQL socket factory jar with driver and dependencies +[here](https://github.com/GoogleCloudPlatform/cloud-sql-jdbc-socket-factory/releases)), then configure plugin with: + + +``` +Driver Name: "cloudsql-postgresql" +Database Command: "UPDATE table_name SET price = 20 WHERE ID = 6" +Instance Name: [PROJECT_ID]:[REGION]:[INSTANCE_NAME] +CloudSQL Instance Type: "Public" +Database: "prod" +Username: "postgres" +Password: "postgres" +``` + + +**Connecting to a private CloudSQL PostgreSQL instance** + +If you want to connect to a private CloudSQL PostgreSQL instance, create a Compute Engine VM that runs the CloudSQL Proxy +docker image using the following command + +``` +# Set the environment variables +export PROJECT=[project_id] +export REGION=[vm-region] +export ZONE=`gcloud compute zones list --filter="name=${REGION}" --limit +1 --uri --project=${PROJECT}| sed 's/.*\///'` +export SUBNET=[vpc-subnet-name] +export NAME=[gce-vm-name] +export POSTGRESQL_CONN=[postgresql-instance-connection-name] + +# Create a Compute Engine VM +gcloud beta compute --project=${PROJECT_ID} instances create ${INSTANCE_NAME} +--zone=${ZONE} --machine-type=g1-small --subnet=${SUBNE} --no-address +--metadata=startup-script="docker run -d -p 0.0.0.0:3306:3306 +gcr.io/cloudsql-docker/gce-proxy:1.16 /cloud_sql_proxy +-instances=${POSTGRESQL_CONNECTION_NAME}=tcp:0.0.0.0:3306" --maintenance-policy=MIGRATE +--scopes=https://www.googleapis.com/auth/cloud-platform +--image=cos-69-10895-385-0 --image-project=cos-cloud +``` + +Optionally, you can promote the internal IP address of the VM running the Proxy image to a static IP using + +``` +# Get the VM internal IP +export IP=`gcloud compute instances describe ${NAME} --zone ${ZONE} | +grep "networkIP" | awk '{print $2}'` + +# Promote the VM internal IP to static IP +gcloud compute addresses create postgresql-proxy --addresses ${IP} --region +${REGION} --subnet ${SUBNET} +``` + +Get the latest version of the CloudSQL socket factory jar with driver and dependencies from +[here](https://github.com/GoogleCloudPlatform/cloud-sql-jdbc-socket-factory/releases), then configure plugin with: + +``` +Driver Name: "cloudsql-postgresql" +Database Command: "UPDATE table_name SET price = 20 WHERE ID = 6" +Instance Name: [PROJECT_ID]:[REGION]:[INSTANCE_NAME] +CloudSQL Instance Type: "Private" +Database: "prod" +Username: "postgres" +Password: "postgres" +``` diff --git a/cloudsql-postgresql-plugin/docs/CloudSQLPostgreSQL-batchsink.md b/cloudsql-postgresql-plugin/docs/CloudSQLPostgreSQL-batchsink.md new file mode 100644 index 000000000..3c530cf38 --- /dev/null +++ b/cloudsql-postgresql-plugin/docs/CloudSQLPostgreSQL-batchsink.md @@ -0,0 +1,163 @@ +# CloudSQL PostgreSQL Batch Sink + + +Description +----------- +Writes records to a CloudSQL PostgreSQL table. Each record will be written to a row in the table. + + +Use Case +-------- +This sink is used whenever you need to write to a CloudSQL PostgreSQL table. +Suppose you periodically build a recommendation model for products on your online store. +The model is stored in a GCS bucket and you want to export the contents +of the bucket to a CloudSQL PostgreSQL table where it can be served to your users. + +Column names would be auto detected from input schema. + +Properties +---------- +**Reference Name:** Name used to uniquely identify this sink for lineage, annotating metadata, etc. + +**Driver Name:** Name of the JDBC driver to use. + +**Database:** CloudSQL PostgreSQL database name. + +**Connection Name:** The CloudSQL instance to connect to in the format :\:. +Can be found in the instance overview page. + +**CloudSQL Instance Type:** Whether the CloudSQL instance to connect to is private or public. Defaults to 'Public'. + +**Table Name:** Name of the table to export to. + +**Username:** User identity for connecting to the specified database. + +**Password:** Password to use to connect to the specified database. + +**Transaction Isolation Level:** Transaction isolation level for queries run by this sink. + +**Connection Arguments:** A list of arbitrary string key/value pairs as connection arguments. These arguments +will be passed to the JDBC driver as connection arguments for JDBC drivers that may need additional configurations. + +**Connection Timeout** The timeout value used for socket connect operations. If connecting to the server takes longer +than this value, the connection is broken.The timeout is specified in seconds and a value of zero means that it is +disabled. + + +Examples +-------- +**Connecting to a public CloudSQL PostgreSQL instance** + +Suppose you want to write output records to "users" table of CloudSQL PostgreSQL database named "prod", as "postgres" +user with "postgres" password (Get the latest version of the CloudSQL socket factory jar with driver and dependencies +[here](https://github.com/GoogleCloudPlatform/cloud-sql-jdbc-socket-factory/releases)), then configure plugin with: + + +``` +Reference Name: "sink1" +Driver Name: "cloudsql-postgresql" +Database: "prod" +Instance Name: [PROJECT_ID]:[REGION]:[INSTANCE_NAME] +CloudSQL Instance Type: "Public" +Table Name: "users" +Username: "postgres" +Password: "postgres" +``` + + +**Connecting to a private CloudSQL PostgreSQL instance** + +If you want to connect to a private CloudSQL PostgreSQL instance, create a Compute Engine VM that runs the CloudSQL Proxy +docker image using the following command + +``` +# Set the environment variables +export PROJECT=[project_id] +export REGION=[vm-region] +export ZONE=`gcloud compute zones list --filter="name=${REGION}" --limit +1 --uri --project=${PROJECT}| sed 's/.*\///'` +export SUBNET=[vpc-subnet-name] +export NAME=[gce-vm-name] +export POSTGRESQL_CONN=[postgresql-instance-connection-name] + +# Create a Compute Engine VM +gcloud beta compute --project=${PROJECT_ID} instances create ${INSTANCE_NAME} +--zone=${ZONE} --machine-type=g1-small --subnet=${SUBNE} --no-address +--metadata=startup-script="docker run -d -p 0.0.0.0:3306:3306 +gcr.io/cloudsql-docker/gce-proxy:1.16 /cloud_sql_proxy +-instances=${POSTGRESQL_CONNECTION_NAME}=tcp:0.0.0.0:3306" --maintenance-policy=MIGRATE +--scopes=https://www.googleapis.com/auth/cloud-platform +--image=cos-69-10895-385-0 --image-project=cos-cloud +``` + +Optionally, you can promote the internal IP address of the VM running the Proxy image to a static IP using + +``` +# Get the VM internal IP +export IP=`gcloud compute instances describe ${NAME} --zone ${ZONE} | +grep "networkIP" | awk '{print $2}'` + +# Promote the VM internal IP to static IP +gcloud compute addresses create postgresql-proxy --addresses ${IP} --region +${REGION} --subnet ${SUBNET} +``` + +Get the latest version of the CloudSQL socket factory jar with driver and dependencies from +[here](https://github.com/GoogleCloudPlatform/cloud-sql-jdbc-socket-factory/releases), then configure plugin with: + +``` +Reference Name: "sink1" +Driver Name: "cloudsql-postgresql" +Database: "prod" +Instance Name: [PROJECT_ID]:[REGION]:[INSTANCE_NAME] +CloudSQL Instance Type: "Private" +Table Name: "users" +Username: "postgres" +Password: "postgres" +``` + + +Data Types Mapping +------------------ +All PostgreSQL specific data types mapped to string and can have multiple input formats and one 'canonical' output form. +Please, refer to PostgreSQL data types documentation to figure out proper formats. + +| PostgreSQL Data Type | CDAP Schema Data Type | Comment | +|-----------------------------------------------------|-----------------------|----------------------------------------------| +| bigint | long | | +| bit(n) | string | string with '0' and '1' chars exact n length | +| bit varying(n) | string | string with '0' and '1' chars max n length | +| boolean | boolean | | +| bytea | bytes | | +| character | string | | +| character varying | string | | +| double precision | double | | +| integer | int | | +| numeric(precision, scale)/decimal(precision, scale) | decimal | | +| real | float | | +| smallint | int | | +| text | string | | +| date | date | | +| time [ (p) ] [ without time zone ] | time | | +| time [ (p) ] with time zone | string | | +| timestamp [ (p) ] [ without time zone ] | timestamp | | +| timestamp [ (p) ] with time zone | timestamp | stored in UTC format in database | +| xml | string | | +| tsquery | string | | +| tsvector | string | | +| uuid | string | | +| box | string | | +| cidr | string | | +| circle | string | | +| inet | string | | +| interval | string | | +| json | string | | +| jsonb | string | | +| line | string | | +| lseg | string | | +| macaddr | string | | +| macaddr8 | string | | +| money | string | | +| path | string | | +| point | string | | +| polygon | string | | diff --git a/cloudsql-postgresql-plugin/docs/CloudSQLPostgreSQL-batchsource.md b/cloudsql-postgresql-plugin/docs/CloudSQLPostgreSQL-batchsource.md new file mode 100644 index 000000000..69cddb22c --- /dev/null +++ b/cloudsql-postgresql-plugin/docs/CloudSQLPostgreSQL-batchsource.md @@ -0,0 +1,187 @@ +# CloudSQL PostgreSQL Batch Source + + +Description +----------- +Reads from a CloudSQL PostgreSQL database table(s) using a configurable SQL query. +Outputs one record for each row returned by the query. + + +Use Case +-------- +The source is used whenever you need to read from a CloudSQL PostgreSQL instance database. For example, you may want +to create daily snapshots of a database table by using this source and writing to +a TimePartitionedFileSet. + + +Properties +---------- +**Reference Name:** Name used to uniquely identify this source for lineage, annotating metadata, etc. + +**Driver Name:** Name of the JDBC driver to use. + +**Database:** CloudSQL PostgreSQL database name. + +**Connection Name:** The CloudSQL instance to connect to in the format :\:. +Can be found in the instance overview page. + +**CloudSQL Instance Type:** Whether the CloudSQL instance to connect to is private or public. Defaults to 'Public'. + +**Import Query:** The SELECT query to use to import data from the specified table. +You can specify an arbitrary number of columns to import, or import all columns using \*. The Query should +contain the '$CONDITIONS' string. For example, 'SELECT * FROM table WHERE $CONDITIONS'. +The '$CONDITIONS' string will be replaced by 'splitBy' field limits specified by the bounding query. +The '$CONDITIONS' string is not required if numSplits is set to one. + +**Bounding Query:** Bounding Query should return the min and max of the values of the 'splitBy' field. +For example, 'SELECT MIN(id),MAX(id) FROM table'. Not required if numSplits is set to one. + +**Split-By Field Name:** Field Name which will be used to generate splits. Not required if numSplits is set to one. + +**Number of Splits to Generate:** Number of splits to generate. + +**Username:** User identity for connecting to the specified database. + +**Password:** Password to use to connect to the specified database. + +**Connection Arguments:** A list of arbitrary string key/value pairs as connection arguments. These arguments +will be passed to the JDBC driver as connection arguments for JDBC drivers that may need additional configurations. + +**Schema:** The schema of records output by the source. This will be used in place of whatever schema comes +back from the query. However, it must match the schema that comes back from the query, +except it can mark fields as nullable and can contain a subset of the fields. + + +Examples +-------- +**Connecting to a public CloudSQL PostgreSQL instance** + +Suppose you want to read data from CloudSQL PostgreSQL database named "prod", as "postgres" user with "postgres" +password (Get the latest version of the CloudSQL socket factory jar with driver and dependencies +[here](https://github.com/GoogleCloudPlatform/cloud-sql-jdbc-socket-factory/releases)), then configure plugin with: + + +``` +Reference Name: "src1" +Driver Name: "cloudsql-postgresql" +Database: "prod" +Instance Name: [PROJECT_ID]:[REGION]:[INSTANCE_NAME] +CloudSQL Instance Type: "Public" +Import Query: "select id, name, email, phone from users;" +Number of Splits to Generate: 1 +Username: "postgres" +Password: "postgres" +``` + +For example, if the 'id' column is a primary key of type int and the other columns are +non-nullable varchars, output records will have this schema: + + | field name | type | + | -------------- | ------------------- | + | id | int | + | name | string | + | email | string | + | phone | string | + + + +**Connecting to a private CloudSQL PostgreSQL instance** + +If you want to connect to a private CloudSQL PostgreSQL instance, create a Compute Engine VM that runs the CloudSQL Proxy +docker image using the following command + +``` +# Set the environment variables +export PROJECT=[project_id] +export REGION=[vm-region] +export ZONE=`gcloud compute zones list --filter="name=${REGION}" --limit +1 --uri --project=${PROJECT}| sed 's/.*\///'` +export SUBNET=[vpc-subnet-name] +export NAME=[gce-vm-name] +export POSTGRESQL_CONN=[postgresql-instance-connection-name] + +# Create a Compute Engine VM +gcloud beta compute --project=${PROJECT_ID} instances create ${INSTANCE_NAME} +--zone=${ZONE} --machine-type=g1-small --subnet=${SUBNE} --no-address +--metadata=startup-script="docker run -d -p 0.0.0.0:3306:3306 +gcr.io/cloudsql-docker/gce-proxy:1.16 /cloud_sql_proxy +-instances=${POSTGRESQL_CONNECTION_NAME}=tcp:0.0.0.0:3306" --maintenance-policy=MIGRATE +--scopes=https://www.googleapis.com/auth/cloud-platform +--image=cos-69-10895-385-0 --image-project=cos-cloud +``` + +Optionally, you can promote the internal IP address of the VM running the Proxy image to a static IP using + +``` +# Get the VM internal IP +export IP=`gcloud compute instances describe ${NAME} --zone ${ZONE} | +grep "networkIP" | awk '{print $2}'` + +# Promote the VM internal IP to static IP +gcloud compute addresses create postgresql-proxy --addresses ${IP} --region +${REGION} --subnet ${SUBNET} +``` + +Get the latest version of the CloudSQL socket factory jar with driver and dependencies from +[here](https://github.com/GoogleCloudPlatform/cloud-sql-jdbc-socket-factory/releases), then configure plugin with: + +``` +Reference Name: "src1" +Driver Name: "cloudsql-postgresql" +Database: "prod" +Instance Name: [PROJECT_ID]:[REGION]:[INSTANCE_NAME] +CloudSQL Instance Type: "Private" +Import Query: "select id, name, email, phone from users;" +Number of Splits to Generate: 1 +Username: "postgres" +Password: "postgres" +``` + + +Data Types Mapping +------------------ +All PostgreSQL specific data types mapped to string and can have multiple input formats and one 'canonical' output form. +Please, refer to PostgreSQL data types documentation to figure out proper formats. + +| PostgreSQL Data Type | CDAP Schema Data Type | Comment | +|-----------------------------------------------------|-----------------------|----------------------------------------------| +| bigint | long | | +| bigserial | long | | +| bit(n) | string | string with '0' and '1' chars exact n length | +| bit varying(n) | string | string with '0' and '1' chars max n length | +| boolean | boolean | | +| bytea | bytes | | +| character | string | | +| character varying | string | | +| double precision | double | | +| integer | int | | +| numeric(precision, scale)/decimal(precision, scale) | decimal | | +| real | float | | +| smallint | int | | +| smallserial | int | | +| serial | int | | +| text | string | | +| date | date | | +| time [ (p) ] [ without time zone ] | time | | +| time [ (p) ] with time zone | string | | +| timestamp [ (p) ] [ without time zone ] | timestamp | | +| timestamp [ (p) ] with time zone | timestamp | stored in UTC format in database | +| xml | string | | +| tsquery | string | | +| tsvector | string | | +| uuid | string | | +| box | string | | +| cidr | string | | +| circle | string | | +| inet | string | | +| interval | string | | +| json | string | | +| jsonb | string | | +| line | string | | +| lseg | string | | +| macaddr | string | | +| macaddr8 | string | | +| money | string | | +| path | string | | +| point | string | | +| polygon | string | | diff --git a/cloudsql-postgresql-plugin/icons/CloudSQLPostgreSQL-action.png b/cloudsql-postgresql-plugin/icons/CloudSQLPostgreSQL-action.png new file mode 100644 index 000000000..5ce72783a Binary files /dev/null and b/cloudsql-postgresql-plugin/icons/CloudSQLPostgreSQL-action.png differ diff --git a/cloudsql-postgresql-plugin/icons/CloudSQLPostgreSQL-batchsink.png b/cloudsql-postgresql-plugin/icons/CloudSQLPostgreSQL-batchsink.png new file mode 100644 index 000000000..5ce72783a Binary files /dev/null and b/cloudsql-postgresql-plugin/icons/CloudSQLPostgreSQL-batchsink.png differ diff --git a/cloudsql-postgresql-plugin/icons/CloudSQLPostgreSQL-batchsource.png b/cloudsql-postgresql-plugin/icons/CloudSQLPostgreSQL-batchsource.png new file mode 100644 index 000000000..5ce72783a Binary files /dev/null and b/cloudsql-postgresql-plugin/icons/CloudSQLPostgreSQL-batchsource.png differ diff --git a/cloudsql-postgresql-plugin/pom.xml b/cloudsql-postgresql-plugin/pom.xml new file mode 100644 index 000000000..fd0b6eb81 --- /dev/null +++ b/cloudsql-postgresql-plugin/pom.xml @@ -0,0 +1,90 @@ + + + + + database-plugins + io.cdap.plugin + 1.4.0-SNAPSHOT + + + CloudSQL PostgreSQL plugin + cloudsql-postgresql-plugin + 1.4.0-SNAPSHOT + 4.0.0 + + + + io.cdap.cdap + cdap-etl-api + provided + + + io.cdap.plugin + database-commons + ${project.version} + + + io.cdap.plugin + hydrator-common + + + io.cdap.plugin + postgresql-plugin + 1.4.0-SNAPSHOT + + + io.cdap.cdap + hydrator-test + provided + + + + + + + org.apache.felix + maven-bundle-plugin + 3.5.0 + true + + + <_exportcontents> + io.cdap.plugin.* + + *;inline=false;scope=compile + true + lib + + + + + package + + bundle + + + + + + io.cdap + cdap-maven-plugin + + + + diff --git a/cloudsql-postgresql-plugin/samples/BigQuery-to-CloudSQLPostgreSQL.json b/cloudsql-postgresql-plugin/samples/BigQuery-to-CloudSQLPostgreSQL.json new file mode 100644 index 000000000..aaeebca9d --- /dev/null +++ b/cloudsql-postgresql-plugin/samples/BigQuery-to-CloudSQLPostgreSQL.json @@ -0,0 +1,129 @@ +{ + "artifact": { + "name": "cdap-data-pipeline", + "version": "6.1.2", + "scope": "SYSTEM", + "label": "Data Pipeline - Batch" + }, + "description": "", + "name": "bigquery-cloudsqlpostgresql", + "config": { + "resources": { + "memoryMB": 1024, + "virtualCores": 1 + }, + "driverResources": { + "memoryMB": 1024, + "virtualCores": 1 + }, + "connections": [ + { + "from": "BigQuery", + "to": "CloudSQL PostgreSQL" + }, + { + "from": "CloudSQL PostgreSQL", + "to": "CloudSQL PostgreSQL Execute" + } + ], + "comments": [], + "postActions": [], + "properties": {}, + "processTimingEnabled": true, + "stageLoggingEnabled": false, + "stages": [ + { + "name": "BigQuery", + "plugin": { + "name": "BigQueryTable", + "type": "batchsource", + "label": "BigQuery", + "artifact": { + "name": "google-cloud", + "version": "0.14.2", + "scope": "SYSTEM" + }, + "properties": { + "project": "auto-detect", + "serviceFilePath": "auto-detect", + "referenceName": "source", + "dataset": "postgresOrders", + "table": "orders", + "schema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"ORDERKEY\",\"type\":[\"int\",\"null\"]},{\"name\":\"CUSTKEY\",\"type\":[\"int\",\"null\"]},{\"name\":\"ORDERSTATUS\",\"type\":[\"string\",\"null\"]},{\"name\":\"ORDERDATE\",\"type\":[{\"type\":\"int\",\"logicalType\":\"date\"},\"null\"]},{\"name\":\"ORDERPRIORITY\",\"type\":[\"string\",\"null\"]},{\"name\":\"CLERK\",\"type\":[\"string\",\"null\"]},{\"name\":\"SHIPPRIORITY\",\"type\":[\"int\",\"null\"]},{\"name\":\"O_COMMENT\",\"type\":[\"string\",\"null\"]}]}" + } + }, + "outputSchema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"ORDERKEY\",\"type\":[\"int\",\"null\"]},{\"name\":\"CUSTKEY\",\"type\":[\"int\",\"null\"]},{\"name\":\"ORDERSTATUS\",\"type\":[\"string\",\"null\"]},{\"name\":\"ORDERDATE\",\"type\":[{\"type\":\"int\",\"logicalType\":\"date\"},\"null\"]},{\"name\":\"ORDERPRIORITY\",\"type\":[\"string\",\"null\"]},{\"name\":\"CLERK\",\"type\":[\"string\",\"null\"]},{\"name\":\"SHIPPRIORITY\",\"type\":[\"int\",\"null\"]},{\"name\":\"O_COMMENT\",\"type\":[\"string\",\"null\"]}]}" + }, + { + "name": "CloudSQL PostgreSQL", + "plugin": { + "name": "CloudSQLPostgreSQL", + "type": "batchsink", + "label": "CloudSQL PostgreSQL", + "artifact": { + "name": "cloudsql-postgresql-plugin", + "version": "1.4.0-SNAPSHOT", + "scope": "USER" + }, + "properties": { + "jdbcPluginName": "cloudsql-postgresql", + "instanceType": "public", + "transactionIsolationLevel": "TRANSACTION_READ_COMMITTED", + "connectionTimeout": "10", + "referenceName": "sink", + "database": "postgres", + "user": "postgres", + "password": "", + "connectionName": "", + "tableName": "orders" + } + }, + "outputSchema": [ + { + "name": "etlSchemaBody", + "schema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"ORDERKEY\",\"type\":\"int\"},{\"name\":\"CUSTKEY\",\"type\":\"int\"},{\"name\":\"ORDERSTATUS\",\"type\":\"string\"},{\"name\":\"TOTALPRICE\",\"type\":{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":38,\"scale\":9}},{\"name\":\"ORDERDATE\",\"type\":{\"type\":\"int\",\"logicalType\":\"date\"}},{\"name\":\"ORDERPRIORITY\",\"type\":\"string\"},{\"name\":\"CLERK\",\"type\":\"string\"},{\"name\":\"SHIPPRIORITY\",\"type\":\"int\"},{\"name\":\"O_COMMENT\",\"type\":\"string\"}]}" + } + ], + "inputSchema": [ + { + "name": "BigQuery", + "schema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"ORDERKEY\",\"type\":[\"int\",\"null\"]},{\"name\":\"CUSTKEY\",\"type\":[\"int\",\"null\"]},{\"name\":\"ORDERSTATUS\",\"type\":[\"string\",\"null\"]},{\"name\":\"ORDERDATE\",\"type\":[{\"type\":\"int\",\"logicalType\":\"date\"},\"null\"]},{\"name\":\"ORDERPRIORITY\",\"type\":[\"string\",\"null\"]},{\"name\":\"CLERK\",\"type\":[\"string\",\"null\"]},{\"name\":\"SHIPPRIORITY\",\"type\":[\"int\",\"null\"]},{\"name\":\"O_COMMENT\",\"type\":[\"string\",\"null\"]}]}" + } + ] + }, + { + "name": "CloudSQL PostgreSQL Execute", + "plugin": { + "name": "CloudSQLPostgreSQL", + "type": "action", + "label": "CloudSQL PostgreSQL Execute", + "artifact": { + "name": "cloudsql-postgresql-plugin", + "version": "1.4.0-SNAPSHOT", + "scope": "USER" + }, + "properties": { + "jdbcPluginName": "cloudsql-postgresql", + "instanceType": "public", + "connectionTimeout": "10", + "database": "postgres", + "user": "postgres", + "password": "", + "connectionName": "", + "query": "UPDATE orders SET \"CLERK\"='Clerk#000049252' WHERE \"ORDERKEY\"=182250663;" + } + }, + "outputSchema": [ + { + "name": "etlSchemaBody", + "schema": "" + } + ] + } + ], + "schedule": "0 * * * *", + "engine": "spark", + "numOfRecordsPreview": 100, + "maxConcurrentRuns": 1 + } +} diff --git a/cloudsql-postgresql-plugin/samples/CloudSQLPostgreSQL-to-BigQuery.json b/cloudsql-postgresql-plugin/samples/CloudSQLPostgreSQL-to-BigQuery.json new file mode 100644 index 000000000..70289cf8f --- /dev/null +++ b/cloudsql-postgresql-plugin/samples/CloudSQLPostgreSQL-to-BigQuery.json @@ -0,0 +1,113 @@ +{ + "name": "cloudsqlpostgresql-bigquery", + "description": "Data Pipeline Application", + "artifact": { + "name": "cdap-data-pipeline", + "version": "6.1.2", + "scope": "SYSTEM" + }, + "config": { + "stages": [ + { + "name": "CloudSQL PostgreSQL", + "plugin": { + "name": "CloudSQLPostgreSQL", + "type": "batchsource", + "label": "CloudSQL PostgreSQL", + "artifact": { + "name": "cloudsql-postgresql-plugin", + "version": "1.4.0-SNAPSHOT", + "scope": "USER" + }, + "properties": { + "jdbcPluginName": "cloudsql-postgresql", + "instanceType": "public", + "numSplits": "500", + "referenceName": "source", + "database": "postgres", + "user": "postgres", + "password": "", + "connectionName": "", + "importQuery": "select * from orders where $CONDITIONS;", + "boundingQuery": "select min(\"ORDERKEY\"), max(\"ORDERKEY\") from qin1tb_orders;", + "splitBy": "\"ORDERKEY\"", + "schema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"ORDERKEY\",\"type\":[\"int\",\"null\"]},{\"name\":\"CUSTKEY\",\"type\":[\"int\",\"null\"]},{\"name\":\"ORDERSTATUS\",\"type\":[\"string\",\"null\"]},{\"name\":\"ORDERDATE\",\"type\":[{\"type\":\"int\",\"logicalType\":\"date\"},\"null\"]},{\"name\":\"ORDERPRIORITY\",\"type\":[\"string\",\"null\"]},{\"name\":\"CLERK\",\"type\":[\"string\",\"null\"]},{\"name\":\"SHIPPRIORITY\",\"type\":[\"int\",\"null\"]},{\"name\":\"O_COMMENT\",\"type\":[\"string\",\"null\"]}]}" + } + }, + "outputSchema": [ + { + "name": "etlSchemaBody", + "schema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"ORDERKEY\",\"type\":[\"int\",\"null\"]},{\"name\":\"CUSTKEY\",\"type\":[\"int\",\"null\"]},{\"name\":\"ORDERSTATUS\",\"type\":[\"string\",\"null\"]},{\"name\":\"ORDERDATE\",\"type\":[{\"type\":\"int\",\"logicalType\":\"date\"},\"null\"]},{\"name\":\"ORDERPRIORITY\",\"type\":[\"string\",\"null\"]},{\"name\":\"CLERK\",\"type\":[\"string\",\"null\"]},{\"name\":\"SHIPPRIORITY\",\"type\":[\"int\",\"null\"]},{\"name\":\"O_COMMENT\",\"type\":[\"string\",\"null\"]}]}" + } + ], + "type": "batchsource", + "label": "CloudSQL PostgreSQL", + "icon": "fa-plug" + }, + { + "name": "BigQuery", + "plugin": { + "name": "BigQueryTable", + "type": "batchsink", + "label": "BigQuery", + "artifact": { + "name": "google-cloud", + "version": "0.14.2", + "scope": "SYSTEM" + }, + "properties": { + "project": "auto-detect", + "serviceFilePath": "auto-detect", + "operation": "insert", + "truncateTable": "false", + "allowSchemaRelaxation": "false", + "location": "US", + "createPartitionedTable": "false", + "partitionFilterRequired": "false", + "schema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"ORDERKEY\",\"type\":[\"int\",\"null\"]},{\"name\":\"CUSTKEY\",\"type\":[\"int\",\"null\"]},{\"name\":\"ORDERSTATUS\",\"type\":[\"string\",\"null\"]},{\"name\":\"ORDERDATE\",\"type\":[{\"type\":\"int\",\"logicalType\":\"date\"},\"null\"]},{\"name\":\"ORDERPRIORITY\",\"type\":[\"string\",\"null\"]},{\"name\":\"CLERK\",\"type\":[\"string\",\"null\"]},{\"name\":\"SHIPPRIORITY\",\"type\":[\"int\",\"null\"]},{\"name\":\"O_COMMENT\",\"type\":[\"string\",\"null\"]}]}", + "referenceName": "sink", + "dataset": "postgresOrders", + "table": "orders" + } + }, + "outputSchema": [ + { + "name": "etlSchemaBody", + "schema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"ORDERKEY\",\"type\":[\"int\",\"null\"]},{\"name\":\"CUSTKEY\",\"type\":[\"int\",\"null\"]},{\"name\":\"ORDERSTATUS\",\"type\":[\"string\",\"null\"]},{\"name\":\"ORDERDATE\",\"type\":[{\"type\":\"int\",\"logicalType\":\"date\"},\"null\"]},{\"name\":\"ORDERPRIORITY\",\"type\":[\"string\",\"null\"]},{\"name\":\"CLERK\",\"type\":[\"string\",\"null\"]},{\"name\":\"SHIPPRIORITY\",\"type\":[\"int\",\"null\"]},{\"name\":\"O_COMMENT\",\"type\":[\"string\",\"null\"]}]}" + } + ], + "inputSchema": [ + { + "name": "CloudSQL PostgreSQL", + "schema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"ORDERKEY\",\"type\":[\"int\",\"null\"]},{\"name\":\"CUSTKEY\",\"type\":[\"int\",\"null\"]},{\"name\":\"ORDERSTATUS\",\"type\":[\"string\",\"null\"]},{\"name\":\"ORDERDATE\",\"type\":[{\"type\":\"int\",\"logicalType\":\"date\"},\"null\"]},{\"name\":\"ORDERPRIORITY\",\"type\":[\"string\",\"null\"]},{\"name\":\"CLERK\",\"type\":[\"string\",\"null\"]},{\"name\":\"SHIPPRIORITY\",\"type\":[\"int\",\"null\"]},{\"name\":\"O_COMMENT\",\"type\":[\"string\",\"null\"]}]}" + } + ], + "type": "batchsink", + "label": "BigQuery", + "icon": "fa-plug" + } + ], + "connections": [ + { + "from": "CloudSQL PostgreSQL", + "to": "BigQuery" + } + ], + "comments": [], + "resources": { + "memoryMB": 1024, + "virtualCores": 1 + }, + "driverResources": { + "memoryMB": 1024, + "virtualCores": 1 + }, + "postActions": [], + "properties": {}, + "processTimingEnabled": true, + "stageLoggingEnabled": false, + "engine": "spark", + "schedule": "0 * * * *", + "maxConcurrentRuns": 1 + } +} diff --git a/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLAction.java b/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLAction.java new file mode 100644 index 000000000..6c1c75aa6 --- /dev/null +++ b/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLAction.java @@ -0,0 +1,111 @@ +/* + * Copyright © 2020 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.cloudsql.postgres; + +import com.google.common.collect.ImmutableMap; +import io.cdap.cdap.api.annotation.Description; +import io.cdap.cdap.api.annotation.Name; +import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.FailureCollector; +import io.cdap.cdap.etl.api.PipelineConfigurer; +import io.cdap.cdap.etl.api.action.Action; +import io.cdap.plugin.db.batch.action.AbstractDBAction; +import io.cdap.plugin.db.batch.action.QueryConfig; + +import java.util.Map; +import javax.annotation.Nullable; + +/** + * Action that runs a PostgreSQL command on a CloudSQL PostgreSQL instance. + */ +@Plugin(type = Action.PLUGIN_TYPE) +@Name(CloudSQLPostgreSQLConstants.PLUGIN_NAME) +@Description("Action that runs a PostgreSQL command on a CloudSQL PostgreSQL instance") +public class CloudSQLPostgreSQLAction extends AbstractDBAction { + + private final CloudSQLPostgreSQLActionConfig cloudsqlPostgresqlActionConfig; + + public CloudSQLPostgreSQLAction(CloudSQLPostgreSQLActionConfig cloudsqlPostgresqlActionConfig) { + super(cloudsqlPostgresqlActionConfig, false); + this.cloudsqlPostgresqlActionConfig = cloudsqlPostgresqlActionConfig; + } + + @Override + public void configurePipeline(PipelineConfigurer pipelineConfigurer) { + FailureCollector failureCollector = pipelineConfigurer.getStageConfigurer().getFailureCollector(); + + CloudSQLPostgreSQLUtil.checkConnectionName( + failureCollector, + cloudsqlPostgresqlActionConfig.instanceType, + cloudsqlPostgresqlActionConfig.connectionName); + + super.configurePipeline(pipelineConfigurer); + } + + /** CloudSQL PostgreSQL action config. */ + public static class CloudSQLPostgreSQLActionConfig extends QueryConfig { + + public CloudSQLPostgreSQLActionConfig() { + this.instanceType = CloudSQLPostgreSQLConstants.PUBLIC_INSTANCE; + } + + @Name(CloudSQLPostgreSQLConstants.CONNECTION_NAME) + @Description( + "The CloudSQL instance to connect to. For a public instance, the connection string should be in the format " + + ":: which can be found in the instance overview page. For a private " + + "instance, enter the internal IP address of the Compute Engine VM cloudsql proxy is running on.") + public String connectionName; + + @Name(DATABASE) + @Description("Database name to connect to") + public String database; + + @Name(CloudSQLPostgreSQLConstants.CONNECTION_TIMEOUT) + @Description( + "The timeout value used for socket connect operations. If connecting to the server takes longer " + + "than this value, the connection is broken. The timeout is specified in seconds and a value " + + "of zero means that it is disabled") + @Nullable + public Integer connectionTimeout; + + @Name(CloudSQLPostgreSQLConstants.INSTANCE_TYPE) + @Description("Whether the CloudSQL instance to connect to is private or public.") + @Nullable + public String instanceType; + + @Override + public String getConnectionString() { + if (CloudSQLPostgreSQLConstants.PRIVATE_INSTANCE.equalsIgnoreCase(instanceType)) { + return String.format( + CloudSQLPostgreSQLConstants.PRIVATE_CLOUDSQL_POSTGRES_CONNECTION_STRING_FORMAT, + connectionName, + database); + } + + return String.format( + CloudSQLPostgreSQLConstants.PUBLIC_CLOUDSQL_POSTGRES_CONNECTION_STRING_FORMAT, + database, + connectionName); + } + + @Override + public Map getDBSpecificArguments() { + return ImmutableMap.of( + CloudSQLPostgreSQLConstants.CONNECTION_TIMEOUT, String.valueOf(connectionTimeout)); + } + } +} diff --git a/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLConstants.java b/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLConstants.java new file mode 100644 index 000000000..79f17a018 --- /dev/null +++ b/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLConstants.java @@ -0,0 +1,35 @@ +/* + * Copyright © 2020 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.cloudsql.postgres; + +/** CloudSQL PostgreSQL constants. */ +public final class CloudSQLPostgreSQLConstants { + + private CloudSQLPostgreSQLConstants() { + } + + public static final String PLUGIN_NAME = "CloudSQLPostgreSQL"; + public static final String CONNECTION_NAME = "connectionName"; + public static final String CONNECTION_NAME_PATTERN = "[a-z0-9-]+:[a-z0-9-]+:[a-z0-9-]+"; + public static final String CONNECTION_TIMEOUT = "connectionTimeout"; + public static final String PUBLIC_INSTANCE = "public"; + public static final String PUBLIC_CLOUDSQL_POSTGRES_CONNECTION_STRING_FORMAT = + "jdbc:postgresql:///%s?cloudSqlInstance=%s&socketFactory=com.google.cloud.sql.postgres.SocketFactory"; + public static final String INSTANCE_TYPE = "instanceType"; + public static final String PRIVATE_INSTANCE = "private"; + public static final String PRIVATE_CLOUDSQL_POSTGRES_CONNECTION_STRING_FORMAT = "jdbc:postgresql://%s/%s"; +} diff --git a/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLSink.java b/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLSink.java new file mode 100644 index 000000000..05a4ba62c --- /dev/null +++ b/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLSink.java @@ -0,0 +1,166 @@ +/* + * Copyright © 2020 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.cloudsql.postgres; + +import com.google.common.collect.ImmutableMap; +import io.cdap.cdap.api.annotation.Description; +import io.cdap.cdap.api.annotation.Name; +import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.etl.api.FailureCollector; +import io.cdap.cdap.etl.api.PipelineConfigurer; +import io.cdap.cdap.etl.api.batch.BatchSink; +import io.cdap.plugin.db.DBRecord; +import io.cdap.plugin.db.SchemaReader; +import io.cdap.plugin.db.batch.sink.AbstractDBSink; +import io.cdap.plugin.db.batch.sink.FieldsValidator; +import io.cdap.plugin.postgres.PostgresDBRecord; +import io.cdap.plugin.postgres.PostgresFieldsValidator; +import io.cdap.plugin.postgres.PostgresSchemaReader; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.StringJoiner; +import javax.annotation.Nullable; + +/** Sink support for a CloudSQL PostgreSQL database. */ +@Plugin(type = BatchSink.PLUGIN_TYPE) +@Name(CloudSQLPostgreSQLConstants.PLUGIN_NAME) +@Description( + "Writes records to a CloudSQL PostgreSQL table. Each record will be written in a row in the table") +public class CloudSQLPostgreSQLSink extends AbstractDBSink { + + private static final Character ESCAPE_CHAR = '"'; + + private final CloudSQLPostgreSQLSinkConfig cloudsqlPostgresqlSinkConfig; + + public CloudSQLPostgreSQLSink(CloudSQLPostgreSQLSinkConfig cloudsqlPostgresqlSinkConfig) { + super(cloudsqlPostgresqlSinkConfig); + this.cloudsqlPostgresqlSinkConfig = cloudsqlPostgresqlSinkConfig; + } + + @Override + public void configurePipeline(PipelineConfigurer pipelineConfigurer) { + FailureCollector failureCollector = pipelineConfigurer.getStageConfigurer().getFailureCollector(); + + CloudSQLPostgreSQLUtil.checkConnectionName( + failureCollector, + cloudsqlPostgresqlSinkConfig.instanceType, + cloudsqlPostgresqlSinkConfig.connectionName); + + super.configurePipeline(pipelineConfigurer); + } + + @Override + protected SchemaReader getSchemaReader() { + return new PostgresSchemaReader(); + } + + @Override + protected DBRecord getDBRecord(StructuredRecord output) { + return new PostgresDBRecord(output, columnTypes); + } + + @Override + protected void setColumnsInfo(List fields) { + List columnsList = new ArrayList<>(); + StringJoiner columnsJoiner = new StringJoiner(","); + for (Schema.Field field : fields) { + columnsList.add(field.getName()); + columnsJoiner.add(ESCAPE_CHAR + field.getName() + ESCAPE_CHAR); + } + + super.columns = Collections.unmodifiableList(columnsList); + super.dbColumns = columnsJoiner.toString(); + } + + @Override + protected FieldsValidator getFieldsValidator() { + return new PostgresFieldsValidator(); + } + + /** CloudSQL PostgreSQL sink config. */ + public static class CloudSQLPostgreSQLSinkConfig extends AbstractDBSink.DBSinkConfig { + + public CloudSQLPostgreSQLSinkConfig() { + this.instanceType = CloudSQLPostgreSQLConstants.PUBLIC_INSTANCE; + } + + @Name(CloudSQLPostgreSQLConstants.CONNECTION_NAME) + @Description( + "The CloudSQL instance to connect to. For a public instance, the connection string should be in the format " + + ":: which can be found in the instance overview page. For a private " + + "instance, enter the internal IP address of the Compute Engine VM cloudsql proxy is running on.") + public String connectionName; + + @Name(CloudSQLPostgreSQLConstants.CONNECTION_TIMEOUT) + @Description( + "The timeout value used for socket connect operations. If connecting to the server takes longer " + + "than this value, the connection is broken. The timeout is specified in seconds and a value " + + "of zero means that it is disabled") + @Nullable + public Integer connectionTimeout; + + @Name(DATABASE) + @Description("Database name to connect to") + public String database; + + @Name(TRANSACTION_ISOLATION_LEVEL) + @Description("Transaction isolation level for queries run by this sink.") + @Nullable + public String transactionIsolationLevel; + + @Name(CloudSQLPostgreSQLConstants.INSTANCE_TYPE) + @Description("Whether the CloudSQL instance to connect to is private or public.") + @Nullable + public String instanceType; + + @Override + public String getTransactionIsolationLevel() { + return transactionIsolationLevel; + } + + @Override + protected String getEscapedTableName() { + return ESCAPE_CHAR + tableName + ESCAPE_CHAR; + } + + @Override + public Map getDBSpecificArguments() { + return ImmutableMap.of( + CloudSQLPostgreSQLConstants.CONNECTION_TIMEOUT, String.valueOf(connectionTimeout)); + } + + @Override + public String getConnectionString() { + if (CloudSQLPostgreSQLConstants.PRIVATE_INSTANCE.equalsIgnoreCase(instanceType)) { + return String.format( + CloudSQLPostgreSQLConstants.PRIVATE_CLOUDSQL_POSTGRES_CONNECTION_STRING_FORMAT, + connectionName, + database); + } + + return String.format( + CloudSQLPostgreSQLConstants.PUBLIC_CLOUDSQL_POSTGRES_CONNECTION_STRING_FORMAT, + database, + connectionName); + } + } +} diff --git a/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLSource.java b/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLSource.java new file mode 100644 index 000000000..b3f451fad --- /dev/null +++ b/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLSource.java @@ -0,0 +1,124 @@ +/* + * Copyright © 2020 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.cloudsql.postgres; + +import io.cdap.cdap.api.annotation.Description; +import io.cdap.cdap.api.annotation.Name; +import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.FailureCollector; +import io.cdap.cdap.etl.api.PipelineConfigurer; +import io.cdap.cdap.etl.api.batch.BatchSource; +import io.cdap.plugin.db.SchemaReader; +import io.cdap.plugin.db.batch.source.AbstractDBSource; +import io.cdap.plugin.postgres.PostgresDBRecord; +import io.cdap.plugin.postgres.PostgresSchemaReader; +import org.apache.hadoop.mapreduce.lib.db.DBWritable; + +import javax.annotation.Nullable; + +/** Batch source to read from a CloudSQL PostgreSQL instance database. */ +@Plugin(type = BatchSource.PLUGIN_TYPE) +@Name(CloudSQLPostgreSQLConstants.PLUGIN_NAME) +@Description( + "Reads from a CloudSQL PostgreSQL database table(s) using a configurable SQL query." + + " Outputs one record for each row returned by the query.") +public class CloudSQLPostgreSQLSource extends AbstractDBSource { + + private final CloudSQLPostgreSQLSourceConfig cloudsqlPostgresqlSourceConfig; + + public CloudSQLPostgreSQLSource(CloudSQLPostgreSQLSourceConfig cloudsqlPostgresqlSourceConfig) { + super(cloudsqlPostgresqlSourceConfig); + this.cloudsqlPostgresqlSourceConfig = cloudsqlPostgresqlSourceConfig; + } + + @Override + public void configurePipeline(PipelineConfigurer pipelineConfigurer) { + FailureCollector failureCollector = pipelineConfigurer.getStageConfigurer().getFailureCollector(); + + CloudSQLPostgreSQLUtil.checkConnectionName( + failureCollector, + cloudsqlPostgresqlSourceConfig.instanceType, + cloudsqlPostgresqlSourceConfig.connectionName); + + super.configurePipeline(pipelineConfigurer); + } + + @Override + protected SchemaReader getSchemaReader() { + return new PostgresSchemaReader(); + } + + @Override + protected Class getDBRecordType() { + return PostgresDBRecord.class; + } + + @Override + protected String createConnectionString() { + if (CloudSQLPostgreSQLConstants.PRIVATE_INSTANCE.equalsIgnoreCase( + cloudsqlPostgresqlSourceConfig.instanceType)) { + return String.format( + CloudSQLPostgreSQLConstants.PRIVATE_CLOUDSQL_POSTGRES_CONNECTION_STRING_FORMAT, + cloudsqlPostgresqlSourceConfig.connectionName, + cloudsqlPostgresqlSourceConfig.database); + } + + return String.format( + CloudSQLPostgreSQLConstants.PUBLIC_CLOUDSQL_POSTGRES_CONNECTION_STRING_FORMAT, + cloudsqlPostgresqlSourceConfig.database, + cloudsqlPostgresqlSourceConfig.connectionName); + } + + /** CloudSQL PostgreSQL source config. */ + public static class CloudSQLPostgreSQLSourceConfig extends AbstractDBSource.DBSourceConfig { + + public CloudSQLPostgreSQLSourceConfig() { + this.instanceType = CloudSQLPostgreSQLConstants.PUBLIC_INSTANCE; + } + + @Name(CloudSQLPostgreSQLConstants.CONNECTION_NAME) + @Description( + "The CloudSQL instance to connect to. For a public instance, the connection string should be in the format " + + ":: which can be found in the instance overview page. For a private " + + "instance, enter the internal IP address of the Compute Engine VM cloudsql proxy is running on.") + public String connectionName; + + @Name(DATABASE) + @Description("Database name to connect to") + public String database; + + @Name(CloudSQLPostgreSQLConstants.INSTANCE_TYPE) + @Description("Whether the CloudSQL instance to connect to is private or public.") + @Nullable + public String instanceType; + + @Override + public String getConnectionString() { + if (CloudSQLPostgreSQLConstants.PRIVATE_INSTANCE.equalsIgnoreCase(instanceType)) { + return String.format( + CloudSQLPostgreSQLConstants.PRIVATE_CLOUDSQL_POSTGRES_CONNECTION_STRING_FORMAT, + connectionName, + database); + } + + return String.format( + CloudSQLPostgreSQLConstants.PUBLIC_CLOUDSQL_POSTGRES_CONNECTION_STRING_FORMAT, + database, + connectionName); + } + } +} diff --git a/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLUtil.java b/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLUtil.java new file mode 100644 index 000000000..f6720d3eb --- /dev/null +++ b/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLUtil.java @@ -0,0 +1,62 @@ +/* + * Copyright © 2020 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.cloudsql.postgres; + +import com.google.common.net.InetAddresses; +import io.cdap.cdap.etl.api.FailureCollector; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Utility class for CloudSQL PostgreSQL. + */ +public class CloudSQLPostgreSQLUtil { + + /** + * Utility method to check the Connection Name format of a CloudSQL PostgreSQL instance. + * + * @param failureCollector {@link FailureCollector} for the pipeline + * @param instanceType CloudSQL PostgreSQL instance type + * @param connectionName Connection Name for the CloudSQL PostgreSQL instance + */ + public static void checkConnectionName( + FailureCollector failureCollector, String instanceType, String connectionName) { + + if (CloudSQLPostgreSQLConstants.PUBLIC_INSTANCE.equalsIgnoreCase(instanceType)) { + Pattern connectionNamePattern = + Pattern.compile(CloudSQLPostgreSQLConstants.CONNECTION_NAME_PATTERN); + Matcher matcher = connectionNamePattern.matcher(connectionName); + + if (!matcher.matches()) { + failureCollector + .addFailure( + "Connection Name must be in the format :: to connect to " + + "a public CloudSQL PostgreSQL instance.", null) + .withConfigProperty(CloudSQLPostgreSQLConstants.CONNECTION_NAME); + } + } else { + if (!InetAddresses.isInetAddress(connectionName)) { + failureCollector + .addFailure( + "Enter the internal IP address of the Compute Engine VM cloudsql proxy " + + "is running on, to connect to a private CloudSQL PostgreSQL instance.", null) + .withConfigProperty(CloudSQLPostgreSQLConstants.CONNECTION_NAME); + } + } + } +} diff --git a/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-action.json b/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-action.json new file mode 100644 index 000000000..9e9f124b9 --- /dev/null +++ b/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-action.json @@ -0,0 +1,109 @@ +{ + "metadata": { + "spec-version": "1.5" + }, + "display-name": "CloudSQL PostgreSQL Execute", + "configuration-groups": [ + { + "label": "Basic", + "properties": [ + { + "widget-type": "textbox", + "label": "Driver Name", + "name": "jdbcPluginName", + "widget-attributes": { + "default": "cloudsql-postgresql" + } + }, + { + "widget-type": "textbox", + "label": "Database", + "name": "database" + } + ] + }, + { + "label": "Credentials", + "properties": [ + { + "widget-type": "textbox", + "label": "Username", + "name": "user", + "widget-attributes": { + "placeholder": "The username to use to connect to the CloudSQL database" + } + }, + { + "widget-type": "password", + "label": "Password", + "name": "password", + "widget-attributes": { + "placeholder": "The password to use to connect to the CloudSQL database" + } + } + ] + }, + { + "label": "CloudSQL Properties", + "properties": [ + { + "widget-type": "radio-group", + "label": "CloudSQL Instance Type", + "name": "instanceType", + "widget-attributes": { + "layout": "inline", + "default": "public", + "options": [ + { + "id": "public", + "label": "Public" + }, + { + "id": "private", + "label": "Private" + } + ] + } + }, + { + "widget-type": "textbox", + "label": "Connection Name", + "name": "connectionName", + "widget-attributes": { + "placeholder": "CloudSQL instance connection name" + } + }, + { + "widget-type": "textarea", + "label": "Database Command", + "name": "query" + } + ] + }, + { + "label": "Advanced", + "properties":[ + { + "widget-type": "number", + "label": "Connection Timeout", + "name":"connectionTimeout", + "widget-attributes": { + "default": "10" + } + }, + { + "widget-type": "keyvalue", + "label": "Connection Arguments", + "name": "connectionArguments", + "widget-attributes": { + "showDelimiter": "false", + "key-placeholder": "Key", + "value-placeholder": "Value", + "kv-delimiter": "=", + "delimiter": ";" + } + } + ] + } + ] +} diff --git a/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-batchsink.json b/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-batchsink.json new file mode 100644 index 000000000..666ca499e --- /dev/null +++ b/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-batchsink.json @@ -0,0 +1,143 @@ +{ + "metadata": { + "spec-version": "1.5" + }, + "display-name": "CloudSQL PostgreSQL", + "configuration-groups": [ + { + "label": "Basic", + "properties": [ + { + "widget-type": "textbox", + "label": "Reference Name", + "name": "referenceName", + "widget-attributes": { + "placeholder": "Name used to identify this sink for lineage" + } + }, + { + "widget-type": "textbox", + "label": "Driver Name", + "name": "jdbcPluginName", + "widget-attributes": { + "default": "cloudsql-postgresql" + } + }, + { + "widget-type": "textbox", + "label": "Database", + "name": "database" + } + ] + }, + { + "label": "Credentials", + "properties": [ + { + "widget-type": "textbox", + "label": "Username", + "name": "user", + "widget-attributes": { + "placeholder": "The username to use to connect to the CloudSQL database" + } + }, + { + "widget-type": "password", + "label": "Password", + "name": "password", + "widget-attributes": { + "placeholder": "The password to use to connect to the CloudSQL database" + } + } + ] + }, + { + "label": "CloudSQL Properties", + "properties": [ + { + "widget-type": "radio-group", + "label": "CloudSQL Instance Type", + "name": "instanceType", + "widget-attributes": { + "layout": "inline", + "default": "public", + "options": [ + { + "id": "public", + "label": "Public" + }, + { + "id": "private", + "label": "Private" + } + ] + } + }, + { + "widget-type": "textbox", + "label": "Connection Name", + "name": "connectionName", + "widget-attributes": { + "placeholder": "CloudSQL instance connection name" + } + }, + { + "widget-type": "textbox", + "label": "Table Name", + "name": "tableName", + "widget-attributes": { + "placeholder": "The table to write to" + } + } + ] + }, + { + "label": "Advanced", + "properties":[ + { + "widget-type": "select", + "label": "Transaction Isolation Level", + "name": "transactionIsolationLevel", + "widget-attributes": { + "default": "TRANSACTION_READ_COMMITTED", + "values": [ + "TRANSACTION_READ_COMMITTED", + "TRANSACTION_READ_UNCOMMITTED", + "TRANSACTION_REPEATABLE_READ", + "TRANSACTION_SERIALIZABLE", + "TRANSACTION_NONE" + ] + } + }, + { + "widget-type": "number", + "label": "Connection Timeout", + "name":"connectionTimeout", + "widget-attributes": { + "default": "10" + } + }, + { + "widget-type": "keyvalue", + "label": "Connection Arguments", + "name": "connectionArguments", + "widget-attributes": { + "showDelimiter": "false", + "key-placeholder": "Key", + "value-placeholder": "Value", + "kv-delimiter": "=", + "delimiter": ";" + } + } + ] + } + ], + "outputs": [], + "jump-config": { + "datasets": [ + { + "ref-property-name": "referenceName" + } + ] + } +} diff --git a/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-batchsource.json b/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-batchsource.json new file mode 100644 index 000000000..32ad7ae5b --- /dev/null +++ b/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-batchsource.json @@ -0,0 +1,162 @@ +{ + "metadata": { + "spec-version": "1.5" + }, + "display-name": "CloudSQL PostgreSQL", + "configuration-groups": [ + { + "label": "Basic", + "properties": [ + { + "widget-type": "textbox", + "label": "Reference Name", + "name": "referenceName", + "widget-attributes": { + "placeholder": "Name used to identify this source for lineage" + } + }, + { + "widget-type": "textbox", + "label": "Driver Name", + "name": "jdbcPluginName", + "widget-attributes": { + "default": "cloudsql-postgresql" + } + }, + { + "widget-type": "textbox", + "label": "Database", + "name": "database" + } + ] + }, + { + "label": "Credentials", + "properties": [ + { + "widget-type": "textbox", + "label": "Username", + "name": "user", + "widget-attributes": { + "placeholder": "The username to use to connect to the CloudSQL database" + } + }, + { + "widget-type": "password", + "label": "Password", + "name": "password", + "widget-attributes": { + "placeholder": "The password to use to connect to the CloudSQL database" + } + } + ] + }, + { + "label": "CloudSQL Properties", + "properties": [ + { + "widget-type": "radio-group", + "label": "CloudSQL Instance Type", + "name": "instanceType", + "widget-attributes": { + "layout": "inline", + "default": "public", + "options": [ + { + "id": "public", + "label": "Public" + }, + { + "id": "private", + "label": "Private" + } + ] + } + }, + { + "widget-type": "textbox", + "label": "Connection Name", + "name": "connectionName", + "widget-attributes": { + "placeholder": "CloudSQL instance connection name" + } + }, + { + "widget-type": "textarea", + "label": "Import Query", + "name": "importQuery", + "widget-attributes": { + "rows": "4" + } + }, + { + "widget-type": "get-schema", + "widget-category": "plugin" + } + ] + }, + { + "label": "Advanced", + "properties": [ + { + "widget-type": "textarea", + "label": "Bounding Query", + "name": "boundingQuery", + "widget-attributes": { + "rows": "4" + } + }, + { + "widget-type": "textbox", + "label": "Split Column", + "name": "splitBy" + }, + { + "widget-type": "textbox", + "label": "Number of Splits", + "name": "numSplits", + "widget-attributes": { + "default": "1" + } + }, + { + "widget-type": "keyvalue", + "label": "Connection Arguments", + "name": "connectionArguments", + "widget-attributes": { + "showDelimiter": "false", + "key-placeholder": "Key", + "value-placeholder": "Value", + "kv-delimiter": "=", + "delimiter": ";" + } + } + ] + } + ], + "outputs": [ + { + "name": "schema", + "widget-type": "schema", + "widget-attributes": { + "schema-types": [ + "boolean", + "int", + "long", + "float", + "double", + "bytes", + "string" + ], + "schema-default-type": "string" + } + } + ], + "jump-config": { + "datasets": [ + { + "ref-property-name": "referenceName" + } + ] + } +} diff --git a/pom.xml b/pom.xml index 5c1061ef7..0033f5e2b 100644 --- a/pom.xml +++ b/pom.xml @@ -40,6 +40,7 @@ memsql-plugin saphana-plugin cloudsql-mysql-plugin + cloudsql-postgresql-plugin