Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support PostgreSQL Databases #437

Merged
merged 32 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
9e3d274
add postgres driver support
carsonab Sep 24, 2024
5538cba
add postgres migrations
carsonab Sep 26, 2024
e310469
add postgres tls support
carsonab Sep 26, 2024
f3734f0
add script to generate tls certs
carsonab Sep 26, 2024
58f1a6d
gitleaks ignore postgres test certs
carsonab Sep 26, 2024
cf3dba8
add postgres connection check
carsonab Sep 26, 2024
94ad19b
wrapping new database errors
carsonab Sep 26, 2024
aa2b10a
generate test certs on setup
carsonab Sep 26, 2024
b9037a3
add postgres test for non tls
carsonab Sep 26, 2024
2d691e0
add postgres uniqueviolation
carsonab Sep 26, 2024
527bc58
use alloydbomni docker image instead of postgres
carsonab Sep 26, 2024
dc7edc4
add alloydb connector support
carsonab Sep 26, 2024
cc8581a
cleanup postgres configs
carsonab Sep 26, 2024
cdf8853
update gencerts.sh
carsonab Sep 27, 2024
de0dc47
add private service connect options
carsonab Sep 27, 2024
e6ba4d7
alloydb-go-connector tests
carsonab Sep 27, 2024
33fbfac
test alloydb migrations
carsonab Sep 28, 2024
b21fc44
Added in setting the connections for postgres connections
InfernoJJ Sep 30, 2024
11d6cdf
Adding in the logging for errors
InfernoJJ Sep 30, 2024
e76d2ae
test with postgres image
carsonab Sep 30, 2024
59fbc9d
build: add health check to postgres container
adamdecaf Sep 30, 2024
2563704
test with different host port
carsonab Sep 30, 2024
a4e8a48
Revert "test with different host port"
adamdecaf Sep 30, 2024
b91310f
database: use 127.0.0.1 instead of localhost
adamdecaf Sep 30, 2024
112ae62
add ip to cert SANs
carsonab Sep 30, 2024
97ae1c2
build: show docker-compose logs on failure
adamdecaf Sep 30, 2024
e0b1055
build: forgot github removed docker-compose
adamdecaf Sep 30, 2024
1798985
chore: chmod postgres key/certs for Github actions
adamdecaf Sep 30, 2024
ae459f4
build: capture certs on startup
adamdecaf Sep 30, 2024
bcc444c
build: try owning as root
adamdecaf Sep 30, 2024
c544239
test: use directory outside of /var/lib/postgresql
adamdecaf Sep 30, 2024
c21a935
build: skip TLS postgres setup
adamdecaf Sep 30, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,6 @@ coverage.txt

*.pyc

.idea/*
.idea/*

testcerts/*
3 changes: 3 additions & 0 deletions .gitleaksignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
testcerts/client.key:private-key:1
testcerts/root.key:private-key:1
testcerts/server.key:private-key:1
18 changes: 14 additions & 4 deletions database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,28 @@ func New(ctx context.Context, logger log.Logger, config DatabaseConfig) (*sql.DB
if config.MySQL != nil {
preppedDb, err := mysqlConnection(logger, config.MySQL, config.DatabaseName)
if err != nil {
return nil, err
return nil, fmt.Errorf("configuring mysql connection: %v", err)
}

db, err := preppedDb.Connect(ctx)
if err != nil {
return nil, err
return nil, fmt.Errorf("connecting to mysql: %w", err)
}

return ApplyConnectionsConfig(db, &config.MySQL.Connections, logger), nil

} else if config.Spanner != nil {
return spannerConnection(logger, *config.Spanner, config.DatabaseName)
db, err := spannerConnection(logger, *config.Spanner, config.DatabaseName)
if err != nil {
return nil, fmt.Errorf("connecting to spanner: %w", err)
}
return db, nil
} else if config.Postgres != nil {
db, err := postgresConnection(ctx, logger, *config.Postgres, config.DatabaseName)
if err != nil {
return nil, fmt.Errorf("connecting to postgres: %w", err)
}
return ApplyConnectionsConfig(db, &config.Postgres.Connections, logger), nil
}

return nil, fmt.Errorf("database config not defined")
Expand Down Expand Up @@ -61,7 +71,7 @@ func NewAndMigrate(ctx context.Context, logger log.Logger, config DatabaseConfig
// UniqueViolation returns true when the provided error matches a database error
// for duplicate entries (violating a unique table constraint).
func UniqueViolation(err error) bool {
return MySQLUniqueViolation(err) || SpannerUniqueViolation(err)
return MySQLUniqueViolation(err) || SpannerUniqueViolation(err) || PostgresUniqueViolation(err)
}

func DataTooLong(err error) bool {
Expand Down
28 changes: 28 additions & 0 deletions database/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/golang-migrate/migrate/v4"
"github.com/golang-migrate/migrate/v4/database"
migmysql "github.com/golang-migrate/migrate/v4/database/mysql"
migpostgres "github.com/golang-migrate/migrate/v4/database/postgres"
"github.com/golang-migrate/migrate/v4/source"
"github.com/golang-migrate/migrate/v4/source/iofs"

Expand Down Expand Up @@ -155,6 +156,29 @@ func getDriver(logger log.Logger, config DatabaseConfig, opts *migrateOptions) (
return nil, nil, err
}
}
} else if config.Postgres != nil {
if opts.source == nil {
src, err := NewPkgerSource("postgres", false)
if err != nil {
return nil, nil, err
}
opts.source = &SourceDriver{
name: "pkger-postgres",
Driver: src,
}
}

if opts.driver == nil {
db, err := New(context.Background(), logger, config)
if err != nil {
return nil, nil, err
}

opts.driver, err = PostgresDriver(db)
if err != nil {
return nil, nil, err
}
}
}

if opts.source == nil || opts.driver == nil {
Expand All @@ -172,6 +196,10 @@ func SpannerDriver(config DatabaseConfig) (database.Driver, error) {
return SpannerMigrationDriver(*config.Spanner, config.DatabaseName)
}

func PostgresDriver(db *sql.DB) (database.Driver, error) {
return migpostgres.WithInstance(db, &migpostgres.Config{})
}

type MigrateOption func(o *migrateOptions) error

type SourceDriver struct {
Expand Down
22 changes: 22 additions & 0 deletions database/model_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
type DatabaseConfig struct {
MySQL *MySQLConfig
Spanner *SpannerConfig
Postgres *PostgresConfig
DatabaseName string
}

Expand All @@ -23,6 +24,27 @@ type SpannerConfig struct {
DisableCleanStatements bool
}

type PostgresConfig struct {
Address string
User string
Password string
Connections ConnectionsConfig
TLS *PostgresTLSConfig
Alloy *PostgresAlloyConfig
}

type PostgresTLSConfig struct {
CACertFile string
ClientKeyFile string
ClientCertFile string
}

type PostgresAlloyConfig struct {
InstanceURI string
UseIAM bool
UsePSC bool
}

type MySQLConfig struct {
Address string
User string
Expand Down
145 changes: 145 additions & 0 deletions database/postgres.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package database

import (
"context"
"database/sql"
"errors"
"fmt"
"net"

"cloud.google.com/go/alloydbconn"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/stdlib"
_ "github.com/jackc/pgx/v5/stdlib"
"github.com/moov-io/base/log"
)

const (
// PostgreSQL Error Codes
// https://www.postgresql.org/docs/current/errcodes-appendix.html
postgresErrUniqueViolation = "23505"
)

func postgresConnection(ctx context.Context, logger log.Logger, config PostgresConfig, databaseName string) (*sql.DB, error) {
var connStr string
if config.Alloy != nil {
c, err := getAlloyDBConnectorConnStr(ctx, config, databaseName)
if err != nil {
return nil, logger.LogErrorf("creating alloydb connection: %w", err).Err()
}
connStr = c
} else {
c, err := getPostgresConnStr(config, databaseName)
if err != nil {
return nil, logger.LogErrorf("creating postgres connection: %w", err).Err()
}
connStr = c
}

db, err := sql.Open("pgx", connStr)
if err != nil {
return nil, logger.LogErrorf("opening database: %w", err).Err()
}

err = db.Ping()
if err != nil {
_ = db.Close()
return nil, logger.LogErrorf("connecting to database: %w", err).Err()
}

return db, nil
}

func getPostgresConnStr(config PostgresConfig, databaseName string) (string, error) {
url := fmt.Sprintf("postgres://%s:%s@%s/%s", config.User, config.Password, config.Address, databaseName)

params := ""

if config.TLS != nil {
params += "sslmode=verify-full"

if config.TLS.CACertFile == "" {
return "", fmt.Errorf("missing TLS CA file")
}
params += "&sslrootcert=" + config.TLS.CACertFile

if config.TLS.ClientCertFile != "" {
params += "&sslcert=" + config.TLS.ClientCertFile
}

if config.TLS.ClientKeyFile != "" {
params += "&sslkey=" + config.TLS.ClientKeyFile
}
}

connStr := fmt.Sprintf("%s?%s", url, params)
return connStr, nil
}

func getAlloyDBConnectorConnStr(ctx context.Context, config PostgresConfig, databaseName string) (string, error) {
if config.Alloy == nil {
return "", fmt.Errorf("missing alloy config")
}

var dialer *alloydbconn.Dialer
var dsn string

if config.Alloy.UseIAM {
d, err := alloydbconn.NewDialer(ctx, alloydbconn.WithIAMAuthN())
if err != nil {
return "", fmt.Errorf("creating alloydb dialer: %v", err)
}
dialer = d
dsn = fmt.Sprintf(
// sslmode is disabled because the alloy db connection dialer will handle it
// no password is used with IAM
"user=%s dbname=%s sslmode=disable",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason we format a DSN here but for vanilla postgres use their URI syntax?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either should work. I just preferred the key=value syntax when not giving a server (since thats handled by the alloydbconn library when connecting to a hosted AlloyDB instance)

config.User, databaseName,
)
} else {
d, err := alloydbconn.NewDialer(ctx)
if err != nil {
return "", fmt.Errorf("creating alloydb dialer: %v", err)
}
dialer = d
dsn = fmt.Sprintf(
// sslmode is disabled because the alloy db connection dialer will handle it
"user=%s password=%s dbname=%s sslmode=disable",
config.User, config.Password, databaseName,
)
}

// TODO
//cleanup := func() error { return d.Close() }

connConfig, err := pgx.ParseConfig(dsn)
if err != nil {
return "", fmt.Errorf("failed to parse pgx config: %v", err)
}

var connOptions []alloydbconn.DialOption
if config.Alloy.UsePSC {
connOptions = append(connOptions, alloydbconn.WithPSC())
}

connConfig.DialFunc = func(ctx context.Context, _ string, _ string) (net.Conn, error) {
return dialer.Dial(ctx, config.Alloy.InstanceURI, connOptions...)
}

connStr := stdlib.RegisterConnConfig(connConfig)
return connStr, nil
}

func PostgresUniqueViolation(err error) bool {
if err == nil {
return false
}
var pgError *pgconn.PgError
if errors.As(err, &pgError) {
if pgError.Code == postgresErrUniqueViolation {
return true
}
}
return false
}
Loading
Loading