Skip to content

Commit

Permalink
DCV-2572 dbt-coves blue-green (#482)
Browse files Browse the repository at this point in the history
* DCV-2572 dbt-coves blue-green

* Review changes

* Add tests

* Fix dbt_selector behavior difference between flag and config

* Leftover breakpoint

* Changes to testing and flags/config

* Limit tests to Blue Green for testing

* Limit tests to Blue Green for testing

* Limit tests to Blue Green for testing

* Remove blue-green test single-running

* Final changes to blue-green

* Rename production-database for service-connection-name

* Make service_connection_name uppercase

* Change env on the fly + run dbt compile

* Remove dbt compile usage, extend deferral to MANIFEST_FOUND

* Final changes
  • Loading branch information
BAntonellini authored Jul 17, 2024
1 parent e189873 commit 99697b3
Show file tree
Hide file tree
Showing 17 changed files with 826 additions and 4 deletions.
8 changes: 8 additions & 0 deletions .github/workflows/main_ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,14 @@ jobs:
SERVICE_ACCOUNT_GCP: op://Engineering - dbt-coves/dbt-coves-tests/SERVICE_ACCOUNT_GCP
PROJECT_BIGQUERY: op://Engineering - dbt-coves/dbt-coves-tests/PROJECT_BIGQUERY
DATASET_BIGQUERY: op://Engineering - dbt-coves/dbt-coves-tests/DATASET_BIGQUERY
# Blue green
DATACOVES__DBT_COVES_TEST__ACCOUNT: op://Engineering - dbt-coves/dbt-coves-tests/ACCOUNT_SNOWFLAKE
DATACOVES__DBT_COVES_TEST__USER: op://Engineering - dbt-coves/dbt-coves-tests/USER_SNOWFLAKE
DATACOVES__DBT_COVES_TEST__PASSWORD: op://Engineering - dbt-coves/dbt-coves-tests/PASSWORD_SNOWFLAKE
DATACOVES__DBT_COVES_TEST__WAREHOUSE: op://Engineering - dbt-coves/dbt-coves-tests/WAREHOUSE_SNOWFLAKE
DATACOVES__DBT_COVES_TEST__DATABASE: op://Engineering - dbt-coves/dbt-coves-tests/DATABASE_SNOWFLAKE
DATACOVES__DBT_COVES_TEST__SCHEMA: op://Engineering - dbt-coves/dbt-coves-tests/SCHEMA_SNOWFLAKE
DATACOVES__DBT_COVES_TEST__ROLE: op://Engineering - dbt-coves/dbt-coves-tests/ROLE_SNOWFLAKE

- name: Create profiles
run: |
Expand Down
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -151,4 +151,3 @@ cython_debug/
**.vim**
.gitsecret/keys/random_seed
!*.secret
profiles.yml
22 changes: 22 additions & 0 deletions dbt_coves/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,18 @@ class DataSyncModel(BaseModel):
snowflake: Optional[SnowflakeDataSyncModel] = SnowflakeDataSyncModel()


class BlueGreenModel(BaseModel):
    """Pydantic schema of the ``blue_green`` section of the dbt-coves config file.

    All fields default to empty/False so the section is entirely optional;
    flag values can override these (see the blue_green.* entries registered
    in DbtCovesConfig).
    """

    # Name of the service connection holding production credentials —
    # presumably upper-cased env-var prefix; confirm against the task code.
    service_connection_name: Optional[str] = ""
    # Explicit name for the staging (green) database; empty means derive it.
    staging_database: Optional[str] = ""
    # Suffix used to build the staging database name — TODO confirm exact rule.
    staging_suffix: Optional[str] = ""
    # Lifecycle toggles for the staging database around the deployment run.
    drop_staging_db_at_start: Optional[bool] = False
    keep_staging_db_on_success: Optional[bool] = False
    drop_staging_db_on_failure: Optional[bool] = False
    # dbt node selector passed through to the dbt invocation.
    dbt_selector: Optional[str] = ""
    # Pass-throughs for dbt's --defer and --full-refresh behavior.
    defer: Optional[bool] = False
    full_refresh: Optional[bool] = False


class ConfigModel(BaseModel):
generate: Optional[GenerateModel] = GenerateModel()
extract: Optional[ExtractModel] = ExtractModel()
Expand All @@ -162,6 +174,7 @@ class ConfigModel(BaseModel):
dbt: Optional[RunDbtModel] = RunDbtModel()
data_sync: Optional[DataSyncModel] = DataSyncModel()
disable_tracking: Optional[bool] = False
blue_green: Optional[BlueGreenModel] = BlueGreenModel()


class DbtCovesConfig:
Expand Down Expand Up @@ -249,6 +262,15 @@ class DbtCovesConfig:
"load.fivetran.secrets_key",
"data_sync.redshift.tables",
"data_sync.snowflake.tables",
"blue_green.service_connection_name",
"blue_green.staging_database",
"blue_green.staging_suffix",
"blue_green.drop_staging_db_at_start",
"blue_green.drop_staging_db_on_failure",
"blue_green.dbt_selector",
"blue_green.defer",
"blue_green.full_refresh",
"blue_green.keep_staging_db_on_success",
]

def __init__(self, flags: DbtCovesFlags) -> None:
Expand Down
11 changes: 10 additions & 1 deletion dbt_coves/core/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from dbt_coves.config.config import DbtCovesConfig
from dbt_coves.core.exceptions import MissingCommand, MissingDbtProject
from dbt_coves.tasks.base import BaseTask
from dbt_coves.tasks.blue_green.main import BlueGreenTask
from dbt_coves.tasks.data_sync.main import DataSyncTask
from dbt_coves.tasks.dbt.main import RunDbtTask
from dbt_coves.tasks.extract.main import ExtractTask
Expand Down Expand Up @@ -218,7 +219,15 @@
# Register subcommands
[
task.register_parser(sub_parsers, base_subparser)
for task in [GenerateTask, SetupTask, ExtractTask, LoadTask, RunDbtTask, DataSyncTask]
for task in [
GenerateTask,
SetupTask,
ExtractTask,
LoadTask,
RunDbtTask,
DataSyncTask,
BlueGreenTask,
]
]


Expand Down
Empty file.
214 changes: 214 additions & 0 deletions dbt_coves/tasks/blue_green/clone_db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
import threading
import time

from rich.console import Console
from snowflake.connector import DictCursor

# Shared Rich console for user-facing progress output across this module.
console = Console()


class CloneDB:
    """
    Clone a Snowflake database from one to another. Intended for blue/green
    deployments: clones the schemas and their grants from the blue (production)
    database to the green (staging) database.
    """

    def __init__(
        self, blue_database: str, green_database: str, snowflake_conn, thread_count: int = 20
    ):
        """
        Blue/Green deployment for Snowflake databases.
        Args:
            blue_database: The current production database.
            green_database: The temporary database where the build will occur.
            snowflake_conn: An open Snowflake connection used for all statements.
            thread_count: Number of threads used to fan out grant/clone SQL.
        """
        self.start_time = time.time()
        # Rolling timestamp used to report per-phase durations.
        self.time_check = self.start_time
        # System/utility schemas that must never be cloned into the green database.
        self._list_of_schemas_to_exclude = [
            "INFORMATION_SCHEMA",
            "ACCOUNT_USAGE",
            "SECURITY",
            "SNOWFLAKE",
            "UTILS",
            "PUBLIC",
        ]
        self.blue_database = blue_database
        self.green_database = green_database
        self.con = snowflake_conn
        self._thread_count = thread_count

    def drop_database(self):
        """
        Drop the green (staging) database if it exists.
        Returns:
            None
        """
        console.print(f"Dropping database [green]{self.green_database}[/green]")
        self.con.cursor().execute(f"drop database if exists {self.green_database};")

    def create_database(self, database: str):
        """
        Creates the specified database.
        """
        console.print(f"Creating database [green]{self.green_database}[/green]")
        self.con.cursor().execute(f"create database {database};")

    def clone_database_grants(self, blue_database: str, green_database: str):
        """
        Clones the database-level grants from the blue database to the green database.
        Args:
            blue_database: The name of the blue database (prod)
            green_database: The name of the green database (staging).
        Returns:
            None
        """
        console.print(
            f"Cloning grants from [blue]{self.blue_database}[/blue] to green [green]{self.green_database}[/green]"
        )
        dict_cursor = self.con.cursor(DictCursor)
        dict_cursor.execute(f"show grants on database {blue_database}")
        grants = dict_cursor.fetchall()
        threaded_run_commands = ThreadedRunCommands(self.con, self._thread_count)
        for grant in grants:
            # granted_on is the object type (e.g. DATABASE); re-grant the same
            # privilege on the green database to the same role.
            grant_sql = (
                f"GRANT {grant['privilege']} ON {grant['granted_on']} {green_database} "
                f"TO ROLE {grant['grantee_name']};"
            )

            threaded_run_commands.register_command(grant_sql)
        threaded_run_commands.run()

    def clone_database_schemas(self, blue_database: str, green_database: str):
        """
        Clones the schemas from the blue database to the green database and clones the existing blue database schema
        grants.
        Args:
            green_database: The name of the green database (staging).
            blue_database: The name of the blue database (prod)
        Returns:
            None
        """
        console.print(
            f"Cloning [u]schemas[/u] from [blue]{self.blue_database}[/blue] to [green]{self.green_database}[/green]"
        )
        dict_cursor = self.con.cursor(DictCursor)
        dict_cursor.execute(f"show schemas in database {blue_database};")
        schemas = dict_cursor.fetchall()
        threaded_run_commands = ThreadedRunCommands(self.con, self._thread_count)

        # Clone schemas (zero-copy clone; excludes system/utility schemas).
        for schema in schemas:
            if schema["name"] not in self._list_of_schemas_to_exclude:
                sql = f"create schema {green_database}.{schema['name']} clone {blue_database}.{schema['name']};"
                threaded_run_commands.register_command(sql)
        threaded_run_commands.run()
        console.print(f"Cloned schemas in {time.time() - self.time_check} seconds.")
        self.time_check = time.time()
        # Copy grants from Blue DB schemas
        console.print(
            f"Cloning [u]schema grants[/u] from [blue]{self.blue_database}[/blue] to "
            f"[green]{self.green_database}[/green]"
        )
        for schema in schemas:
            if schema["name"] not in self._list_of_schemas_to_exclude:
                dict_cursor.execute(f"""show grants on schema {blue_database}.{schema['name']}""")
                grants = dict_cursor.fetchall()
                for grant in grants:
                    # BUGFIX: the two f-string segments previously joined with no
                    # separator, producing "...<schema>TO ROLE ..." (invalid SQL).
                    sql = (
                        f"GRANT {grant['privilege']} ON {grant['granted_on']} {green_database}.{schema['name']} "
                        f"TO ROLE {grant['grantee_name']};"
                    )
                    # Load SQL into the threaded commands to run.
                    threaded_run_commands.register_command(sql)
        threaded_run_commands.run()
        # Use the shared Rich console (was a bare print) for consistent output.
        console.print(f"Cloned grants to schemas in {time.time() - self.time_check} seconds.")
        self.time_check = time.time()


class ThreadedRunCommands:
    """Fan a set of SQL commands out over a fixed number of worker threads.

    Commands are assigned to per-thread work lists round-robin via
    :meth:`register_command`; :meth:`run` then executes every list on its
    own thread and blocks until all threads have finished submitting.
    """

    def __init__(self, con, threads):
        # Number of worker threads / work lists.
        self.threads = threads
        # Index of the work list that receives the next registered command.
        self.register_command_thread = 0
        # One command list per thread.
        self.thread_commands = [[] for _ in range(self.threads)]
        self.con = con

    def register_command(self, command: str):
        """
        Register a sql command to be run in a thread.
        Args:
            command: A SQL string to be run.
        Returns:
            None
        """
        self.thread_commands[self.register_command_thread].append(command)
        # Advance round-robin, wrapping back to the first work list.
        self.register_command_thread = (self.register_command_thread + 1) % self.threads

    def run_commands(self, commands):
        """
        Submit each SQL string in *commands* asynchronously on this connection.
        Args:
            commands: A list of SQL commands to be run async.
        Returns:
            None
        """
        for statement in commands:
            self.con.cursor().execute_async(statement)

    def run(self):
        """
        Run the registered commands across the worker threads.
        Returns:
            None
        """
        workers = [
            threading.Thread(target=self.run_commands, args=(batch,))
            for batch in self.thread_commands
        ]
        for worker in workers:
            worker.start()
        # Wait for every worker to finish submitting its commands.
        for worker in workers:
            worker.join()


# if __name__ == "__main__":
# '''
# This section is really only designed for testing purposes. When used in production, it is intended that you will
# call the clone_blue_db_to_green method from an external script or directly from the DAG as needed.
# '''
# parser = argparse.ArgumentParser(
# description="Script to run a blue/green swap")

# # Add the arguments
# parser.add_argument('--blue-db', type=str, default=os.environ.get('DATACOVES__MAIN__DATABASE'),
# help='The source database.')
# parser.add_argument('--green-db', type=str, help='The name of the green (temporary build) database.')

# # Parse the arguments
# args = parser.parse_args()

# # Handle the case when --green-db is not provided
# if args.green_db is None:
# args.green_db = f'{args.blue_db}_STAGING'

# blue_db = args.blue_db
# green_db = args.green_db

# c = CloneDB(blue_db, green_db)
# c.clone_blue_db_to_green()
Loading

0 comments on commit 99697b3

Please sign in to comment.