From 152b23180405c1bb8f906cbc852a740b2d1d2ddb Mon Sep 17 00:00:00 2001 From: Janez Troha Date: Fri, 11 Aug 2023 14:28:43 +0200 Subject: [PATCH] Add support for copy command --- README.md | 10 +++--- subsetter/db_test.go | 18 ++++++++-- subsetter/info.go | 17 ++++++++++ subsetter/info_test.go | 31 +++++++++++++++++ subsetter/query.go | 19 +++++++++++ subsetter/query_test.go | 74 +++++++++++++++++++++++++++++++++++++++++ subsetter/relations.go | 13 ++++++-- 7 files changed, 172 insertions(+), 10 deletions(-) create mode 100644 subsetter/info.go create mode 100644 subsetter/info_test.go diff --git a/README.md b/README.md index 3bd6326..5f17498 100644 --- a/README.md +++ b/README.md @@ -1,17 +1,17 @@ # pg-subsetter -`pg-subsetter` is a powerful and efficient tool designed to synchronize a fraction of a PostgreSQL database to another PostgreSQL database on the fly. Written in Go, this utility is tailored for the modern, scalable architecture that demands performance and robustness. +`pg-subsetter` is a powerful and efficient tool designed to synchronize a fraction of a PostgreSQL database to another PostgreSQL database on the fly. It does not copy the schema, which means that your target database has to have its schema populated in some other way. -### Database Fraction Synchronization: +### Database Fraction Synchronization `pg-subsetter` allows you to select and sync a specific subset of your database. Whether it's a fraction of a table or a particular dataset, you can have it replicated in another database without synchronizing the entire DB. -### Integrity Preservation with Foreign Keys: +### Integrity Preservation with Foreign Keys Foreign keys play a vital role in maintaining the relationships between tables. `pg-subsetter` ensures that all foreign keys are handled correctly during the synchronization process, maintaining the integrity and relationships of the data. 
-### Efficient COPY Method: +### Efficient COPY Method Utilizing the native PostgreSQL COPY command, pg-subsetter performs data transfer with high efficiency. This method significantly speeds up the synchronization process, minimizing downtime and resource consumption. -### Stateless Operation: +### Stateless Operation `pg-subsetter` is built to be stateless, meaning it does not maintain any internal state between runs. This ensures that each synchronization process is independent, enhancing reliability and making it easier to manage and scale. diff --git a/subsetter/db_test.go b/subsetter/db_test.go index 61cd043..543599a 100644 --- a/subsetter/db_test.go +++ b/subsetter/db_test.go @@ -2,6 +2,7 @@ package subsetter import ( "context" + "fmt" "os" "github.com/jackc/pgx/v5" @@ -23,12 +24,12 @@ func getTestConnection() *pgx.Conn { func populateTests(conn *pgx.Conn) { conn.Exec(context.Background(), ` CREATE TABLE simple ( - id UUID PRIMARY KEY, + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), text TEXT ); CREATE TABLE relation ( - id UUID PRIMARY KEY, + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), simple_id UUID ); @@ -36,6 +37,19 @@ func populateTests(conn *pgx.Conn) { `) } +// populateTestsWithData inserts size rows into table and, for each of them, one relation row referencing it; it panics on any database error so that a broken fixture fails loudly instead of silently skewing later assertions. +func populateTestsWithData(conn *pgx.Conn, table string, size int) { + for i := 0; i < size; i++ { + var id string + if err := conn.QueryRow(context.Background(), fmt.Sprintf("INSERT INTO %s (text) VALUES ('test%d') RETURNING id", table, i)).Scan(&id); err != nil { + panic(err) + } + if _, err := conn.Exec(context.Background(), fmt.Sprintf("INSERT INTO relation (simple_id) VALUES ('%v')", id)); err != nil { + panic(err) + } + } +} + func clearPopulateTests(conn *pgx.Conn) { conn.Exec(context.Background(), ` ALTER TABLE relation DROP CONSTRAINT relation_simple_fk; diff --git a/subsetter/info.go b/subsetter/info.go new file mode 100644 index 0000000..2025c99 --- /dev/null +++ b/subsetter/info.go @@ -0,0 +1,17 @@ +package subsetter + +import "math" + +// GetTargetSet returns a subset of tables with the number of 
rows raised to the power of fraction (rows^fraction); tables with a non-positive row count get 0. +func GetTargetSet(fraction float64, tables []Table) []Table { + var subset []Table + for _, table := range tables { + rows := 0 + if table.Rows > 0 { + // Compute rows^fraction directly: the former Pow(10, Log10(rows)*fraction) round trip could come out just below the exact value and truncate to one row too few, and it yielded NaN for negative counts. + rows = int(math.Pow(float64(table.Rows), fraction)) + } + subset = append(subset, Table{Name: table.Name, Rows: rows}) + } + return subset +} diff --git a/subsetter/info_test.go b/subsetter/info_test.go new file mode 100644 index 0000000..58d45b1 --- /dev/null +++ b/subsetter/info_test.go @@ -0,0 +1,31 @@ +package subsetter + +import ( + "context" + "reflect" + "testing" +) + +func TestGetTargetSet(t *testing.T) { + conn := getTestConnection() + populateTests(conn) + defer conn.Close(context.Background()) + defer clearPopulateTests(conn) + + tests := []struct { + name string + fraction float64 + tables []Table + want []Table + }{ + {"simple", 0.5, []Table{{"simple", 1000}}, []Table{{"simple", 31}}}, + {"simple", 0.5, []Table{{"simple", 10}}, []Table{{"simple", 3}}}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := GetTargetSet(tt.fraction, tt.tables); !reflect.DeepEqual(got, tt.want) { + t.Errorf("GetTargetSet() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/subsetter/query.go b/subsetter/query.go index 0ffcda3..325cecc 100644 --- a/subsetter/query.go +++ b/subsetter/query.go @@ -1,7 +1,9 @@ package subsetter import ( + "bytes" "context" + "fmt" "github.com/jackc/pgx/v5" ) @@ -40,3 +42,20 @@ func GetTablesWithRows(conn *pgx.Conn) (tables []Table, err error) { return } + +// CopyTableToString returns up to limit randomly chosen rows of table as COPY ... TO STDOUT output; table is interpolated into the SQL verbatim, so it must be a trusted identifier. +func CopyTableToString(table string, limit int, conn *pgx.Conn) (result string, err error) { + q := fmt.Sprintf(`copy (SELECT * FROM %s order by random() limit %d) to stdout`, table, limit) + var buff bytes.Buffer + if _, err = conn.PgConn().CopyTo(context.Background(), &buff, q); err != nil { + return "", err + } + return buff.String(), nil +} + +// CopyStringToTable feeds data to COPY table FROM STDOUT and returns the error of that command, if any; table is interpolated into the SQL verbatim, so it must be a trusted identifier. +func CopyStringToTable(table string, data string, conn *pgx.Conn) (err error) { + q := fmt.Sprintf(`copy %s from stdout`, table) + 
_, err = conn.PgConn().CopyFrom(context.Background(), bytes.NewBufferString(data), q) + return err +} diff --git a/subsetter/query_test.go b/subsetter/query_test.go index 14ff944..dfad7c6 100644 --- a/subsetter/query_test.go +++ b/subsetter/query_test.go @@ -3,6 +3,7 @@ package subsetter import ( "context" "reflect" + "strings" "testing" "github.com/jackc/pgx/v5" @@ -55,3 +56,76 @@ func TestGetTablesWithRows(t *testing.T) { }) } } + +func TestCopyRowToString(t *testing.T) { + conn := getTestConnection() + populateTests(conn) + defer conn.Close(context.Background()) + defer clearPopulateTests(conn) + populateTestsWithData(conn, "simple", 10) + + tests := []struct { + name string + table string + conn *pgx.Conn + wantResult bool + wantErr bool + }{ + {"With tables", "simple", conn, true, false}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotResult, err := CopyTableToString(tt.table, 10, tt.conn) + if (err != nil) != tt.wantErr { + t.Errorf("CopyRowToString() error = %v, wantErr %v", err, tt.wantErr) + return + } + if strings.Contains(gotResult, "test") != tt.wantResult { + t.Errorf("CopyRowToString() = %v, want %v", gotResult, tt.wantResult) + } + }) + } +} + +func TestCopyStringToTable(t *testing.T) { + conn := getTestConnection() + populateTests(conn) + defer conn.Close(context.Background()) + defer clearPopulateTests(conn) + populateTestsWithData(conn, "simple", 10) + + tests := []struct { + name string + table string + data string + conn *pgx.Conn + wantResult int + wantErr bool + }{ + {"With tables", "simple", "cccc5f58-44d3-4d7a-bf37-a97d4f081a63 test\n", conn, 1, false}, + {"With more tables", "simple", "edcd63fe-303e-4d51-83ea-3fd00740ba2c test4\na170b0f5-3aec-469c-9589-cf25888a72e2 test7", conn, 2, false}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + before := insertedRows(tt.table, tt.conn) // rows already in the table before this copy; wantResult is the number added by it + err := CopyStringToTable(tt.table, tt.data, tt.conn) + if (err != nil) != tt.wantErr { + t.Errorf("CopyStringToTable() error = %v, wantErr %v", err, tt.wantErr) + return + } + if 
got := insertedRows(tt.table, tt.conn) - before; got != tt.wantResult { + t.Errorf("CopyStringToTable() = %v, want %v", got, tt.wantResult) + } + }) + } +} + +// insertedRows returns the exact number of rows currently stored in table s, or 0 when the count query fails. +func insertedRows(s string, conn *pgx.Conn) int { + // Count directly instead of going through GetTablesWithRows so the result does not depend on how that function derives its numbers. + var count int + if err := conn.QueryRow(context.Background(), "SELECT count(*) FROM "+s).Scan(&count); err != nil { + return 0 + } + return count +} diff --git a/subsetter/relations.go b/subsetter/relations.go index f756d33..f46de46 100644 --- a/subsetter/relations.go +++ b/subsetter/relations.go @@ -9,15 +9,22 @@ import ( // GetRelations returns a list of tables that have a foreign key for particular table. func GetRelations(table string, conn *pgx.Conn) (relations []string, err error) { - q := `SELECT ccu.table_name::string AS foreign_table_name + q := `SELECT tc.table_name AS foreign_table_name FROM information_schema.table_constraints AS tc JOIN information_schema.key_column_usage AS kcu ON tc.constraint_name = kcu.constraint_name JOIN information_schema.constraint_column_usage AS ccu ON ccu.constraint_name = tc.constraint_name - WHERE tc.constraint_type = 'FOREIGN KEY' AND tc.table_name = $1;` + WHERE tc.constraint_type = 'FOREIGN KEY' AND ccu.table_name = $1;` - err = conn.QueryRow(context.Background(), q, table).Scan(&relations) + rows, err := conn.Query(context.Background(), q, table) + if err != nil { + return nil, err + } + // CollectRows scans every row, reports scan and iteration errors, and always closes rows. + if relations, err = pgx.CollectRows(rows, pgx.RowTo[string]); len(relations) == 0 { + relations = nil // keep returning a nil slice when no table references the given one + } return }