From 1488e6f113a4df9f15efa9d2ed780d8586971117 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Wed, 28 Sep 2016 12:40:58 -0400 Subject: [PATCH 01/33] wip --- dbt/compilation.py | 13 ++++++++++++- dbt/model.py | 12 ++++++++++++ dbt/project.py | 1 + dbt/source.py | 8 +++++++- 4 files changed, 32 insertions(+), 2 deletions(-) diff --git a/dbt/compilation.py b/dbt/compilation.py index 52157fa97a1..1eeb8b75b9c 100644 --- a/dbt/compilation.py +++ b/dbt/compilation.py @@ -38,6 +38,12 @@ def model_sources(self, this_project, own_project=None): else: raise RuntimeError("unexpected create template type: '{}'".format(self.create_template.label)) + def get_macros(self, this_project, own_project=None): + if own_project is None: + own_project = this_project + paths = own_project.get('macro-paths', []) + return Source(this_project, own_project=own_project).get_macros(paths) + def project_schemas(self): source_paths = self.project.get('source-paths', []) return Source(self.project).get_schemas(source_paths) @@ -156,7 +162,8 @@ def get_context(self, linker, model, models): def compile_model(self, linker, model, models): try: - jinja = jinja2.Environment(loader=jinja2.FileSystemLoader(searchpath=model.root_dir)) + fs_loader = jinja2.FileSystemLoader(searchpath=model.root_dir) + jinja = jinja2.Environment(loader=fs_loader) # this is a dumb jinja2 bug -- on windows, forward slashes are EXPECTED posix_filepath = '/'.join(split_path(model.rel_filepath)) @@ -301,9 +308,13 @@ def compile(self, dry=False): linker = Linker() all_models = self.model_sources(this_project=self.project) + all_macros = self.get_macros(this_project=self.project) for project in dependency_projects(self.project): all_models.extend(self.model_sources(this_project=self.project, own_project=project)) + all_macros.extend(self.get_macros(this_project=self.project, own_project=project)) + + macro_dirs = list(set([m.root_dir for m in all_macros])) enabled_models = [model for model in all_models if model.is_enabled] diff --git a/dbt/model.py b/dbt/model.py index fa8885ca774..ed8b024b6ce 100644 --- a/dbt/model.py +++ b/dbt/model.py @@ -549,3 +549,15 @@ def __init__(self, project, target_dir, rel_filepath, own_project): def __repr__(self): return "".format(self.project['name'], self.model_name, self.filepath) + +class Macro(DBTSource): + def __init__(self, project, target_dir, rel_filepath, own_project): + super(Macro, self).__init__(project, target_dir, rel_filepath, own_project) + + def inject_contained_macros(self): + pass + + def __repr__(self): + return "".format(self.project['name'], self.name, self.filepath) + + diff --git a/dbt/project.py b/dbt/project.py index efe0306d663..17e6f80244e 100644 --- a/dbt/project.py +++ b/dbt/project.py @@ -7,6 +7,7 @@ default_project_cfg = { 'source-paths': ['models'], + 'macro-paths': ['macros'], 'data-paths': ['data'], 'test-paths': ['test'], 'target-path': 'target', diff --git a/dbt/source.py b/dbt/source.py index c51a049e00b..da794beb797 100644 --- a/dbt/source.py +++ b/dbt/source.py @@ -1,7 +1,7 @@ import os.path import fnmatch -from dbt.model import Model, Analysis, TestModel, SchemaFile, Csv +from dbt.model import Model, Analysis, TestModel, SchemaFile, Csv, Macro class Source(object): def __init__(self, project, own_project=None): @@ -54,3 +54,9 @@ def get_csvs(self, csv_dirs): csvs = [Csv(*csv) for csv in self.find(csv_dirs, pattern)] return csvs + def get_macros(self, macro_dirs): + "Get CSV files" + pattern = "[!.#~]*.sql" + macros = [Macro(*macro) for macro in self.find(macro_dirs, pattern)] + 
return macros + From d67e69fe706f47403cb54a8b86c0a81647ca95b2 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Wed, 28 Sep 2016 13:09:39 -0400 Subject: [PATCH 02/33] clean up runner output previously, dbt output made it look like > `threads` threads were running at any given time. This wasn't true, but it was misleading. Now, no greater than `threads` models will show as RUNNING at any given time --- dbt/runner.py | 75 ++++++++++++++++++++++++++++++--------------------- 1 file changed, 44 insertions(+), 31 deletions(-) diff --git a/dbt/runner.py b/dbt/runner.py index 3bbc92ad816..6c74ea68787 100644 --- a/dbt/runner.py +++ b/dbt/runner.py @@ -322,37 +322,50 @@ def get_idx(model): models_to_execute = [model for model in model_list if not model.should_skip()] - for i, model in enumerate(models_to_execute): - msg = runner.pre_run_msg(model) - self.print_fancy_output_line(msg, 'RUN', get_idx(model), num_models) - - wrapped_models_to_execute = [{"runner": runner, "model": model} for model in models_to_execute] - run_model_results = pool.map(self.safe_execute_model, wrapped_models_to_execute) - - for i, run_model_result in enumerate(run_model_results): - model_results.append(run_model_result) - - msg = runner.post_run_msg(run_model_result) - status = runner.status(run_model_result) - index = get_idx(run_model_result.model) - self.print_fancy_output_line(msg, status, index, num_models, run_model_result.execution_time) - - dbt.tracking.track_model_run({ - "invocation_id": dbt.tracking.invocation_id, - "index": index, - "total": num_models, - "execution_time": run_model_result.execution_time, - "run_status": run_model_result.status, - "run_skipped": run_model_result.skip, - "run_error": run_model_result.error, - "model_materialization": run_model_result.model['materialized'], - "model_id": run_model_result.model.hashed_name(), - "hashed_contents": run_model_result.model.hashed_contents(), - }) - - if run_model_result.errored: - on_failure(run_model_result.model) - print(run_model_result.error) + threads = self.target.threads + num_models_this_batch = len(models_to_execute) + model_index = 0 + + def on_complete(run_model_results): + for run_model_result in run_model_results: + model_results.append(run_model_result) + + msg = runner.post_run_msg(run_model_result) + status = runner.status(run_model_result) + index = get_idx(run_model_result.model) + self.print_fancy_output_line(msg, status, index, num_models, run_model_result.execution_time) + + dbt.tracking.track_model_run({ + "invocation_id": dbt.tracking.invocation_id, + "index": index, + "total": num_models, + "execution_time": run_model_result.execution_time, + "run_status": run_model_result.status, + "run_skipped": run_model_result.skip, + "run_error": run_model_result.error, + "model_materialization": run_model_result.model['materialized'], + "model_id": run_model_result.model.hashed_name(), + "hashed_contents": run_model_result.model.hashed_contents(), + }) + + if run_model_result.errored: + on_failure(run_model_result.model) + print(run_model_result.error) + + while model_index < num_models_this_batch: + local_models = [] + for i in range(model_index, min(model_index + threads, num_models_this_batch)): + model = models_to_execute[i] + local_models.append(model) + msg = runner.pre_run_msg(model) + self.print_fancy_output_line(msg, 'RUN', get_idx(model), num_models) + + wrapped_models_to_execute = [{"runner": runner, "model": model} for model in local_models] + map_result = pool.map_async(self.safe_execute_model, wrapped_models_to_execute, 
callback=on_complete) + map_result.wait() + run_model_results = map_result.get() + + model_index += threads pool.close() pool.join() From 2a006cb91d88bde04d5b603aa2ed90ad98d049a0 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Wed, 28 Sep 2016 13:23:34 -0400 Subject: [PATCH 03/33] --threads command line argument --- dbt/main.py | 2 ++ dbt/runner.py | 4 ++-- dbt/targets.py | 12 ++++++++---- dbt/task/run.py | 2 +- dbt/task/test.py | 2 +- 5 files changed, 14 insertions(+), 8 deletions(-) diff --git a/dbt/main.py b/dbt/main.py index c374a29e6d5..6ec67be03da 100644 --- a/dbt/main.py +++ b/dbt/main.py @@ -74,6 +74,7 @@ def handle(args): sub = subs.add_parser('run', parents=[base_subparser]) sub.add_argument('--dry', action='store_true', help="'dry run' models") sub.add_argument('--models', required=False, nargs='+', help="Specify the models to run. All models depending on these models will also be run") + sub.add_argument('--threads', type=int, required=False, help="Specify number of threads to use while executing models. Overrides settings in profiles.yml") sub.set_defaults(cls=run_task.RunTask, which='run') sub = subs.add_parser('seed', parents=[base_subparser]) @@ -83,6 +84,7 @@ def handle(args): sub = subs.add_parser('test', parents=[base_subparser]) sub.add_argument('--skip-test-creates', action='store_true', help="Don't create temporary views to validate model SQL") sub.add_argument('--validate', action='store_true', help='Run constraint validations from schema.yml files') + sub.add_argument('--threads', type=int, required=False, help="Specify number of threads to use while executing tests. Overrides settings in profiles.yml") sub.set_defaults(cls=test_task.TestTask, which='test') if len(args) == 0: return p.print_help() diff --git a/dbt/runner.py b/dbt/runner.py index 6c74ea68787..cdf31f4c63e 100644 --- a/dbt/runner.py +++ b/dbt/runner.py @@ -198,13 +198,13 @@ def execute(self, schema, target, model): return row[0] class RunManager(object): - def __init__(self, project, target_path, graph_type): + def __init__(self, project, target_path, graph_type, threads): self.logger = logging.getLogger(__name__) self.project = project self.target_path = target_path self.graph_type = graph_type - self.target = RedshiftTarget(self.project.run_environment()) + self.target = RedshiftTarget(self.project.run_environment(), threads) if self.target.should_open_tunnel(): print("Opening ssh tunnel to host {}... 
".format(self.target.ssh_host), end="") diff --git a/dbt/targets.py b/dbt/targets.py index e53ee6b3441..b22c4eed388 100644 --- a/dbt/targets.py +++ b/dbt/targets.py @@ -14,7 +14,7 @@ Value given was {supplied} but it should be an int between {min_val} and {max_val}""" class RedshiftTarget: - def __init__(self, cfg): + def __init__(self, cfg, threads=None): assert cfg['type'] == 'redshift' self.host = cfg['host'] self.user = cfg['user'] @@ -22,7 +22,8 @@ def __init__(self, cfg): self.port = cfg['port'] self.dbname = cfg['dbname'] self.schema = cfg['schema'] - self.threads = self.__get_threads(cfg) + + self.threads = self.__get_threads(cfg, threads) #self.ssh_host = cfg.get('ssh-host', None) self.ssh_host = None @@ -73,8 +74,11 @@ def cleanup(self): # self.ssh_tunnel.stop() pass - def __get_threads(self, cfg): - supplied = cfg.get('threads', 1) + def __get_threads(self, cfg, cli_threads=None): + if cli_threads is None: + supplied = cfg.get('threads', 1) + else: + supplied = cli_threads bad_threads_error = RuntimeError(BAD_THREADS_ERROR.format(supplied=supplied, min_val=THREAD_MIN, max_val=THREAD_MAX)) diff --git a/dbt/task/run.py b/dbt/task/run.py index 8007a5dcc36..64497fad8e7 100644 --- a/dbt/task/run.py +++ b/dbt/task/run.py @@ -25,7 +25,7 @@ def compile(self): def run(self): graph_type = self.compile() - runner = RunManager(self.project, self.project['target-path'], graph_type) + runner = RunManager(self.project, self.project['target-path'], graph_type, self.args.threads) if self.args.dry: results = runner.dry_run(self.args.models) diff --git a/dbt/task/test.py b/dbt/task/test.py index c52250720b4..7ba9b9a9728 100644 --- a/dbt/task/test.py +++ b/dbt/task/test.py @@ -33,7 +33,7 @@ def compile(self): def run(self): self.compile() - runner = RunManager(self.project, self.project['target-path'], 'build') + runner = RunManager(self.project, self.project['target-path'], 'build', self.args.threads) runner.run_tests() print("Done!") From ef50280992e37a6ebe9d2ceb5543a7980a131890 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Wed, 28 Sep 2016 14:01:05 -0400 Subject: [PATCH 04/33] quick recompile of compiled models for runtime interpolation biscotti: the food so nice they baked it twice! 
--- dbt/compilation.py | 2 ++ dbt/compiled_model.py | 12 ++++++++++++ dbt/runner.py | 13 ++++++++++++- dbt/tracking.py | 3 --- 4 files changed, 26 insertions(+), 4 deletions(-) diff --git a/dbt/compilation.py b/dbt/compilation.py index 52157fa97a1..eb2cda4f6db 100644 --- a/dbt/compilation.py +++ b/dbt/compilation.py @@ -151,6 +151,8 @@ def get_context(self, linker, model, models): context['config'] = self.__model_config(model, linker) context['this'] = This(context['env']['schema'], model.immediate_name, model.name) context['compiled_at'] = time.strftime('%Y-%m-%d %H:%M:%S') + context['run_started_at'] = '{{ run_started_at }}' # jinjaception + context['invocation_id'] = '{{ invocation_id }}' context['var'] = Var(model, context=context) return context diff --git a/dbt/compiled_model.py b/dbt/compiled_model.py index 6fff486d9ac..b824db4a30c 100644 --- a/dbt/compiled_model.py +++ b/dbt/compiled_model.py @@ -1,4 +1,6 @@ import hashlib +import jinja2 +from dbt.utils import compiler_error class CompiledModel(object): def __init__(self, fqn, data): @@ -12,6 +14,7 @@ def __init__(self, fqn, data): self.skip = False self._contents = None + self.compiled_contents = None def __getitem__(self, key): return self.data[key] @@ -39,6 +42,15 @@ def contents(self): self._contents = fh.read() return self._contents + def compile(self, context): + contents = self.contents + try: + env = jinja2.Environment() + self.compiled_contents = env.from_string(contents).render(context) + return self.compiled_contents + except jinja2.exceptions.TemplateSyntaxError as e: + compiler_error(self, str(e)) + @property def materialization(self): return self.data['materialized'] diff --git a/dbt/runner.py b/dbt/runner.py index 3bbc92ad816..e4f34551493 100644 --- a/dbt/runner.py +++ b/dbt/runner.py @@ -6,6 +6,7 @@ import logging import time import itertools +from datetime import datetime from dbt.compilation import Compiler from dbt.linker import Linker @@ -100,7 +101,7 @@ def execute(self, schema, target, model): if model.tmp_drop_type is not None: schema.drop(target.schema, model.tmp_drop_type, model.tmp_name) - status = schema.execute_and_handle_permissions(model.contents, model.name) + status = schema.execute_and_handle_permissions(model.compiled_contents, model.name) if model.final_drop_type is not None: schema.drop(target.schema, model.final_drop_type, model.name) @@ -215,6 +216,12 @@ def __init__(self, project, target_path, graph_type): self.schema = dbt.schema.Schema(self.project, self.target) + self.context = { + "run_started_at": datetime.now(), + "invocation_id": dbt.tracking.invocation_id, + } + + def deserialize_graph(self): linker = Linker() base_target_path = self.project['target-path'] @@ -369,6 +376,10 @@ def run_from_graph(self, runner, limit_to): compiled_models = [make_compiled_model(fqn, linker.get_node(fqn)) for fqn in linker.nodes()] relevant_compiled_models = [m for m in compiled_models if m.is_type(runner.run_type)] + for m in relevant_compiled_models: + if m.should_execute(): + m.compile(self.context) + schema_name = self.target.schema diff --git a/dbt/tracking.py b/dbt/tracking.py index 1a20fdf8284..0689cf4c8a8 100644 --- a/dbt/tracking.py +++ b/dbt/tracking.py @@ -55,9 +55,6 @@ def get_user(): return user -def get_invocation_id(): - pass - def get_options(args): exclude = ['cls', 'target', 'profile'] options = {k:v for (k, v) in args.__dict__.items() if k not in exclude} From ff6ae009d78edb3d7fdaffca31a98d9dee58898e Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Wed, 28 Sep 2016 15:35:50 -0400 Subject: 
[PATCH 05/33] get macros working (within dependencies) --- dbt/compilation.py | 16 +++++++++++++++- dbt/model.py | 16 ++++++++++++---- dbt/source.py | 2 +- 3 files changed, 28 insertions(+), 6 deletions(-) diff --git a/dbt/compilation.py b/dbt/compilation.py index 1eeb8b75b9c..14426f3b514 100644 --- a/dbt/compilation.py +++ b/dbt/compilation.py @@ -14,6 +14,7 @@ class Compiler(object): def __init__(self, project, create_template_class): self.project = project self.create_template = create_template_class() + self.macro_generator = None def initialize(self): if not os.path.exists(self.project['target-path']): @@ -158,6 +159,10 @@ def get_context(self, linker, model, models): context['this'] = This(context['env']['schema'], model.immediate_name, model.name) context['compiled_at'] = time.strftime('%Y-%m-%d %H:%M:%S') context['var'] = Var(model, context=context) + + for macro_name, macro in self.macro_generator(context): + context[macro_name] = macro + return context def compile_model(self, linker, model, models): @@ -304,6 +309,15 @@ def compile_schema_tests(self, linker): return written_tests + def generate_macros(self, all_macros): + def do_gen(ctx): + macros = [] + for macro in all_macros: + new_macros = macro.get_macros(ctx) + macros.extend(new_macros) + return macros + return do_gen + def compile(self, dry=False): linker = Linker() @@ -314,7 +328,7 @@ def compile(self, dry=False): all_models.extend(self.model_sources(this_project=self.project, own_project=project)) all_macros.extend(self.get_macros(this_project=self.project, own_project=project)) - macro_dirs = list(set([m.root_dir for m in all_macros])) + self.macro_generator = self.generate_macros(all_macros) enabled_models = [model for model in all_models if model.is_enabled] diff --git a/dbt/model.py b/dbt/model.py index ed8b024b6ce..0b48dbbcdc3 100644 --- a/dbt/model.py +++ b/dbt/model.py @@ -288,8 +288,10 @@ def build_path(self): def compile_string(self, ctx, string): try: - env = jinja2.Environment() - return env.from_string(string).render(ctx) + fs_loader = jinja2.FileSystemLoader(searchpath=self.project['macro-paths']) + env = jinja2.Environment(loader=fs_loader) + template = env.from_string(string, globals=ctx) + return template.render(ctx) except jinja2.exceptions.TemplateSyntaxError as e: compiler_error(self, str(e)) @@ -553,9 +555,15 @@ def __repr__(self): class Macro(DBTSource): def __init__(self, project, target_dir, rel_filepath, own_project): super(Macro, self).__init__(project, target_dir, rel_filepath, own_project) + self.filepath = os.path.join(self.root_dir, self.rel_filepath) - def inject_contained_macros(self): - pass + def get_macros(self, ctx): + env = jinja2.Environment() + template = env.from_string(self.contents, globals=ctx) + + for key, item in template.module.__dict__.items(): + if type(item) == jinja2.runtime.Macro: + yield key, item def __repr__(self): return "".format(self.project['name'], self.name, self.filepath) diff --git a/dbt/source.py b/dbt/source.py index da794beb797..f6d6c365abd 100644 --- a/dbt/source.py +++ b/dbt/source.py @@ -55,7 +55,7 @@ def get_csvs(self, csv_dirs): return csvs def get_macros(self, macro_dirs): - "Get CSV files" + "Get Macro files" pattern = "[!.#~]*.sql" macros = [Macro(*macro) for macro in self.find(macro_dirs, pattern)] return macros From 456b7a02d7e30ebcf7dc7825ac4d9979f2b198ed Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Wed, 28 Sep 2016 16:44:00 -0400 Subject: [PATCH 06/33] use configs defined at the root model level --- dbt/model.py | 38 
+++++++++++++++++++++++--------------- 1 file changed, 23 insertions(+), 15 deletions(-) diff --git a/dbt/model.py b/dbt/model.py index fa8885ca774..85f4dd9aed8 100644 --- a/dbt/model.py +++ b/dbt/model.py @@ -13,6 +13,9 @@ class SourceConfig(object): Materializations = ['view', 'table', 'incremental', 'ephemeral'] ConfigKeys = DBTConfigKeys + AppendListFields = ['pre-hook', 'post-hook'] + ExtendDictFields = ['vars'] + def __init__(self, active_project, own_project, fqn): self.active_project = active_project self.own_project = own_project @@ -80,15 +83,24 @@ def __get_hooks(self, relevant_configs, key): hooks.append(hook) return hooks + def smart_update(self, mutable_config, new_configs): + relevant_configs = {key: new_configs[key] for key in new_configs if key in self.ConfigKeys} + for key in SourceConfig.AppendListFields: + new_hooks = self.__get_hooks(relevant_configs, key) + mutable_config[key].extend([h for h in new_hooks if h not in mutable_config[key]]) + + for key in SourceConfig.ExtendDictFields: + dict_val = relevant_configs.get(key, {}) + mutable_config[key].update(dict_val) + + return relevant_configs + def get_project_config(self, project): # most configs are overwritten by a more specific config, but pre/post hooks are appended! - append_list_fields = ['pre-hook', 'post-hook'] - extend_dict_fields = ['vars'] - config = {} - for k in append_list_fields: + for k in SourceConfig.AppendListFields: config[k] = [] - for k in extend_dict_fields: + for k in SourceConfig.ExtendDictFields: config[k] = {} model_configs = project['models'] @@ -96,23 +108,19 @@ def get_project_config(self, project): if model_configs is None: return config + # mutates config + self.smart_update(config, model_configs) + fqn = self.fqn[:] for level in fqn: level_config = model_configs.get(level, None) if level_config is None: break - relevant_configs = {key: level_config[key] for key in level_config if key in self.ConfigKeys} - - for key in append_list_fields: - new_hooks = self.__get_hooks(relevant_configs, key) - config[key].extend([h for h in new_hooks if h not in config[key]]) - - for key in extend_dict_fields: - dict_val = relevant_configs.get(key, {}) - config[key].update(dict_val) + # mutates config + relevant_configs = self.smart_update(config, level_config) - clobber_configs = {k:v for (k,v) in relevant_configs.items() if k not in append_list_fields and k not in extend_dict_fields} + clobber_configs = {k:v for (k,v) in relevant_configs.items() if k not in SourceConfig.AppendListFields and k not in SourceConfig.ExtendDictFields} config.update(clobber_configs) model_configs = model_configs[level] From 4fabb93d1f95429bb6732cbc514cb2ae18b334bb Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Sat, 1 Oct 2016 15:23:11 -0400 Subject: [PATCH 07/33] catch error if no internet only a big deal if you're trying to develop dbt against local postgres on a train :) --- dbt/version.py | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/dbt/version.py b/dbt/version.py index acdc34cd171..e6e4cca12c0 100644 --- a/dbt/version.py +++ b/dbt/version.py @@ -14,25 +14,20 @@ def __parse_version(contents): matches = re.search(r"current_version = ([\.0-9]+)", contents) if matches is None or len(matches.groups()) != 1: - return "???" 
+ return "unknown" else: version = matches.groups()[0] return version def get_version(): return __version__ - #dbt_dir = os.path.dirname(os.path.dirname(__file__)) - #version_cfg = os.path.join(dbt_dir, ".bumpversion.cfg") - #if not os.path.exists(version_cfg): - # return "???" - #else: - # with open(version_cfg) as fh: - # contents = fh.read() - # return __parse_version(contents) def get_latest_version(): - f = urlopen(REMOTE_VERISON_FILE) - contents = f.read() + try: + f = urlopen(REMOTE_VERISON_FILE) + contents = f.read() + except: + contents = '' if hasattr(contents, 'decode'): contents = contents.decode('utf-8') return __parse_version(contents) From d032d300fe121cc127d543eb70236966705435cf Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Sat, 1 Oct 2016 15:59:23 -0400 Subject: [PATCH 08/33] don't put valid jinja into prefix --- dbt/compilation.py | 6 ++++-- dbt/model.py | 3 ++- dbt/runner.py | 2 +- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/dbt/compilation.py b/dbt/compilation.py index e490e2092ce..000225b8ea0 100644 --- a/dbt/compilation.py +++ b/dbt/compilation.py @@ -157,9 +157,11 @@ def get_context(self, linker, model, models): context['ref'] = self.__ref(linker, context, model, models) context['config'] = self.__model_config(model, linker) context['this'] = This(context['env']['schema'], model.immediate_name, model.name) - context['compiled_at'] = time.strftime('%Y-%m-%d %H:%M:%S') - context['run_started_at'] = '{{ run_started_at }}' # jinjaception + + # these get re-interpolated at runtime! + context['run_started_at'] = '{{ run_started_at }}' context['invocation_id'] = '{{ invocation_id }}' + context['var'] = Var(model, context=context) for macro_name, macro in self.macro_generator(context): diff --git a/dbt/model.py b/dbt/model.py index 0b48dbbcdc3..756e241d25b 100644 --- a/dbt/model.py +++ b/dbt/model.py @@ -233,7 +233,8 @@ def __init__(self, project, model_dir, rel_filepath, own_project, create_templat super(Model, self).__init__(project, model_dir, rel_filepath, own_project) def add_to_prologue(self, s): - self.prologue.append(s) + safe_string = s.replace('{{', 'DBT_EXPR(').replace('}}', ')') + self.prologue.append(safe_string) def get_prologue_string(self): blob = "\n".join("-- {}".format(s) for s in self.prologue) diff --git a/dbt/runner.py b/dbt/runner.py index e4f34551493..d6b4c913582 100644 --- a/dbt/runner.py +++ b/dbt/runner.py @@ -188,7 +188,7 @@ def status(self, result): return info def execute(self, schema, target, model): - rows = schema.execute_and_fetch(model.contents) + rows = schema.execute_and_fetch(model.compiled_contents) if len(rows) > 1: raise RuntimeError("Bad test {name}: Returned {num_rows} rows instead of 1".format(name=model.name, num_rows=len(rows))) From 851dcfde341b75621b69ced85580fbee138b7953 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Sat, 1 Oct 2016 16:20:16 -0400 Subject: [PATCH 09/33] make new PostgresTarget, new target type --- dbt/compilation.py | 11 +++++++++-- dbt/runner.py | 4 ++-- dbt/schema_tester.py | 4 ++-- dbt/seeder.py | 4 ++-- dbt/targets.py | 44 +++++++++++++++++++++++++++++++++++++++++--- 5 files changed, 56 insertions(+), 11 deletions(-) diff --git a/dbt/compilation.py b/dbt/compilation.py index 000225b8ea0..01bd0dd3c6e 100644 --- a/dbt/compilation.py +++ b/dbt/compilation.py @@ -7,6 +7,7 @@ from dbt.source import Source from dbt.utils import find_model_by_fqn, find_model_by_name, dependency_projects, split_path, This, Var, compiler_error from dbt.linker import Linker +import dbt.targets import time import 
sqlparse @@ -15,6 +16,7 @@ def __init__(self, project, create_template_class): self.project = project self.create_template = create_template_class() self.macro_generator = None + self.target = self.get_target() def initialize(self): if not os.path.exists(self.project['target-path']): @@ -25,7 +27,7 @@ def initialize(self): def get_target(self): target_cfg = self.project.run_environment() - return RedshiftTarget(target_cfg) + return dbt.targets.get_target(target_cfg) def model_sources(self, this_project, own_project=None): if own_project is None: @@ -154,16 +156,21 @@ def wrapped_do_ref(*args): def get_context(self, linker, model, models): context = self.project.context() + + # built-ins context['ref'] = self.__ref(linker, context, model, models) context['config'] = self.__model_config(model, linker) context['this'] = This(context['env']['schema'], model.immediate_name, model.name) + context['var'] = Var(model, context=context) # these get re-interpolated at runtime! context['run_started_at'] = '{{ run_started_at }}' context['invocation_id'] = '{{ invocation_id }}' - context['var'] = Var(model, context=context) + # add in context from run target + context.update(self.target.context) + # add in macros (can we cache these somehow?) for macro_name, macro in self.macro_generator(context): context[macro_name] = macro diff --git a/dbt/runner.py b/dbt/runner.py index d6b4c913582..b644061d1dc 100644 --- a/dbt/runner.py +++ b/dbt/runner.py @@ -11,7 +11,7 @@ from dbt.compilation import Compiler from dbt.linker import Linker from dbt.templates import BaseCreateTemplate -from dbt.targets import RedshiftTarget +import dbt.targets from dbt.source import Source from dbt.utils import find_model_by_fqn, find_model_by_name, dependency_projects from dbt.compiled_model import make_compiled_model @@ -205,7 +205,7 @@ def __init__(self, project, target_path, graph_type): self.target_path = target_path self.graph_type = graph_type - self.target = RedshiftTarget(self.project.run_environment()) + self.target = dbt.targets.get_target(self.project.run_environment()) if self.target.should_open_tunnel(): print("Opening ssh tunnel to host {}... ".format(self.target.ssh_host), end="") diff --git a/dbt/schema_tester.py b/dbt/schema_tester.py index 7ef2edd3cb6..3b066121818 100644 --- a/dbt/schema_tester.py +++ b/dbt/schema_tester.py @@ -1,6 +1,6 @@ import os -from dbt.targets import RedshiftTarget +import dbt.targets import psycopg2 import logging @@ -58,7 +58,7 @@ def __init__(self, project): def get_target(self): target_cfg = self.project.run_environment() - return RedshiftTarget(target_cfg) + return dbt.targets.get_target(target_cfg) def execute_query(self, model, sql): target = self.get_target() diff --git a/dbt/seeder.py b/dbt/seeder.py index b07dca4b8e3..3ceb92b364f 100644 --- a/dbt/seeder.py +++ b/dbt/seeder.py @@ -6,13 +6,13 @@ import psycopg2 from dbt.source import Source -from dbt.targets import RedshiftTarget +import dbt.targets class Seeder: def __init__(self, project): self.project = project run_environment = self.project.run_environment() - self.target = RedshiftTarget(run_environment) + self.target = dbt.targets.get_target(run_environment) def find_csvs(self): return Source(self.project).get_csvs(self.project['data-paths']) diff --git a/dbt/targets.py b/dbt/targets.py index e53ee6b3441..f2522dc8c14 100644 --- a/dbt/targets.py +++ b/dbt/targets.py @@ -13,9 +13,9 @@ BAD_THREADS_ERROR = """Invalid value given for "threads" in active run-target. 
Value given was {supplied} but it should be an int between {min_val} and {max_val}""" -class RedshiftTarget: +class BaseSQLTarget: def __init__(self, cfg): - assert cfg['type'] == 'redshift' + self.target_type = cfg['type'] self.host = cfg['host'] self.user = cfg['user'] self.password = cfg['pass'] @@ -63,7 +63,7 @@ def should_open_tunnel(self): return False # make the user explicitly call this function to enable the ssh tunnel - # we don't want it to be automatically opened any time someone makes a RedshiftTarget() + # we don't want it to be automatically opened any time someone makes a new target def open_tunnel_if_needed(self): #self.ssh_tunnel = self.__open_tunnel() pass @@ -105,3 +105,41 @@ def get_handle(self): def rollback(self): if self.handle is not None: self.handle.rollback() + + @property + def type(self): + return self.target_type + +class RedshiftTarget(BaseSQLTarget): + def __init__(self, cfg): + super(RedshiftTarget, self).__init__(cfg) + + @property + def context(self): + return { + "sql_now": "getdate()" + } + +class PostgresTarget(BaseSQLTarget): + def __init__(self, cfg): + super(PostgresTarget, self).__init__(cfg) + + @property + def context(self): + return { + "sql_now": "clock_timestamp()" + } + +target_map = { + 'postgres': PostgresTarget, + 'redshift': RedshiftTarget +} + +def get_target(cfg): + target_type = cfg['type'] + if target_type in target_map: + klass = target_map[target_type] + return klass(cfg) + else: + valid_csv = ", ".join(["'{}'".format(t) for t in target_map]) + raise RuntimeError("Invalid target type provided: '{}'. Must be one of {}".format(target_type, valid_csv)) From 4fb0f91bf7365bfc7ba2c9741cd7992503cfd953 Mon Sep 17 00:00:00 2001 From: Tristan Handy Date: Mon, 3 Oct 2016 18:17:12 -0400 Subject: [PATCH 10/33] update readme for chat locations --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 7ecb2daac9f..7af1f0dd354 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ dbt (data build tool) helps analysts write reliable, modular code using a workfl - [What is dbt]? - Read the [dbt viewpoint] - [Installation] -- Join the [chat][gittr-url] on Gittr. +- Join the [chat][slack-url] on Slack for live questions and support. ## Code of Conduct @@ -18,7 +18,7 @@ Everyone interacting in the dbt project's codebases, issue trackers, chat rooms, [PyPA Code of Conduct]: https://www.pypa.io/en/latest/code-of-conduct/ -[gittr-url]: https://gitter.im/analyst-collective/dbt +[slack-url]: http://ac-slackin.herokuapp.com/ [Installation]: http://dbt.readthedocs.io/en/master/guide/setup/ [What is dbt]: http://dbt.readthedocs.io/en/master/about/overview/ [dbt viewpoint]: http://dbt.readthedocs.io/en/master/about/viewpoint/ From 16eaf084eef5ac885464d0e33019d726b7a9558e Mon Sep 17 00:00:00 2001 From: Tristan Handy Date: Thu, 6 Oct 2016 15:57:59 -0400 Subject: [PATCH 11/33] update about section of docs --- docs/about/contributing.md | 36 +++++++++++++++++++++++++ docs/about/overview.md | 54 ++++++++++++++++++++------------------ docs/about/viewpoint.md | 7 +---- 3 files changed, 65 insertions(+), 32 deletions(-) diff --git a/docs/about/contributing.md b/docs/about/contributing.md index b8b23e045a9..00bbb2715d3 100644 --- a/docs/about/contributing.md +++ b/docs/about/contributing.md @@ -12,3 +12,39 @@ We welcome PRs! We recommend that you log any feature requests as issues and dis ## Docs We welcome PRs with updated documentation! 
All documentation for dbt is written in markdown using [mkdocs](http://www.mkdocs.org/). Please follow installation instructions there to set up mkdocs on your local environment. + + +## Design Principles + +When enhancing dbt, it's critical to keep the core project goal in mind: + +**dbt (data build tool) is a productivity tool that helps analysts get more done and produce higher quality results.** + +This goal has been carefully selected, and flows directly from the [viewpoint](about/viewpoint/). + +### Why do we call dbt a “productivity tool”? Doesn't this minimize its impact? + +This is a deliberate choice of words that forces us to remember what dbt actually is and what its goals are: *dbt is a user experience wrapper around an analytic database.* All design decisions should be made with the goal of creating a better workflow / user experience for analysts. + +### Why are we focused on speed and quality as opposed to capability? + +Most analytics tools that exist today were designed to maximize user capability. If an analyst wanted to build a line chart, the tool needed to make sure he/she could build that line chart. Exactly how that line chart was produced was less important. + +This perspective made sense in the past. It used to be hard to make a line chart. Today it is easy: using matplotlib, ggplot2, Tableau, or the countless other charting tools creates functionally the same result. Today, the hard part is not making the line chart, but making the line chart fast, with accurate data, in a collaborative environment. While analysts today can create stunning visualizations, they struggle to produce accurate and timely data in a collaborative environment. + +Analysts don’t need more new capabilities, they need a workflow that allows them to use the ones they have faster, with higher quality, and in teams. + +### Why are we focused on analysts instead of data engineers? + +Two reasons: + +1. Analysts are closer to the business and the business users, and therefore have the information they need to actually build data models. +1. There are far more analysts than data engineers in the world. To truly solve this problem, we need to make the solution accessible for analysts. + +### Why is dbt such a technical tool if its target users are analysts? + +Most analysts today don't spend their time in text files and on the command line, but dbt forces the user to do both. This choice was made intentionally, and on three beliefs: + +1. Analysts are already becoming more technical, and this trend will accelerate in coming years. +1. Working in this way allows dbt to hook into a much larger ecosystem of developer productivity tools like git, vim/emacs, etc. This ecosystem has a large part to play in the overall productivity gains to be had from dbt. +1. Core analytics workflows should not be locked away into a particular UI. diff --git a/docs/about/overview.md b/docs/about/overview.md index 2eb9be5dcb2..a5dac047c6b 100644 --- a/docs/about/overview.md +++ b/docs/about/overview.md @@ -1,48 +1,50 @@ # Overview # ## What is dbt? -dbt [data build tool] is a tool for creating analytical data models. dbt facilitates an analytical workflow that closely mirrors software development, including source control, testing, and deployment. dbt makes it possible to produce reliable, modular analytic code as an individual or in teams. 
-For more information on the thinking that led to dbt, see [this article]( https://medium.com/analyst-collective/building-a-mature-analytics-workflow-the-analyst-collective-viewpoint-7653473ef05b). +dbt (data build tool) is a productivity tool that helps analysts get more done and produce higher quality results. -## Who should use dbt? -dbt is built for data consumers who want to model data in SQL to support production analytics use cases. Familiarity with tools like text editors, git, and the command line is helpful—while you do not need to be an expert with any of these tools, some basic familiarity is important. +Analysts commonly spend 50-80% of their time modeling raw data—cleaning, reshaping, and applying fundamental business logic to it. dbt empowers analysts to do this work better and faster. -## Why do I need to model my data? -With the advent of MPP analytic databases like Amazon Redshift and Google BigQuery, it is now common for companies to load and analyze large amounts of raw data in SQL-based environments. Raw data is often not suited for direct analysis and needs to be restructured first. Some common use cases include: +dbt's primary interface is its CLI. Using dbt is a combination of editing code in a text editor and running that code from the command line using `dbt [command] [options]`. -- sessionizing raw web clickstream data -- amortizing multi-month financial transactions +## How does dbt work? -Modeling data transforms raw data into data that can be more easily consumed by business users and BI platforms. It also encodes business rules that can then be relied on by all subsequent analysis, establishing a "single source of truth". +dbt has two core workflows: building data models and testing data models. (We call any transformed view of raw data a data model.) -## What exactly is a "data model" in this context? -A dbt data model is a SQL `SELECT` statement with templating and dbt-specific extensions. +To create a data model, an analyst simply writes a SQL `SELECT` statement. dbt then takes that statement and builds it in the database, materializing it as either a view or a table. This model can then be queried by other models or by other analytics tools. ## How does dbt work? +To test a data model, an analyst asserts something to be true about the underlying data. For example, an analyst can assert that a certain field should never be null, should always hold unique values, or should always map to a field in another table. Analysts can also write assertions that express much more customized logic, such as “debits and credits should always be equal within a given journal entry”. dbt then tests all assertions against the database and returns success or failure responses. + +## Does dbt really help me get more done? -dbt has a small number of core functions. It: +One dbt user has this to say: *“At this point when I have a new question, I can answer it 10-100x faster than I could before.”* Here’s how: -- takes a set of data models and compiles them into raw SQL, -- materializes them into your database as views and tables, and -- runs automated tests on top of them to ensure their integrity. +- dbt allows analysts to avoid writing boilerplate DML and DDL: managing transactions, dropping tables, and managing schema changes. All business logic is expressed in SQL `SELECT` statements, and dbt takes care of materialization. +- dbt creates leverage. 
Instead of starting at the raw data with every analysis, analysts instead build up reusable data models that can be referenced in subsequent work. +- dbt includes optimizations for data model materialization, allowing analysts to dramatically reduce the time their queries take to run. -Once your data models have been materialized into your database, you can write analytic queries on top of them in any SQL-enabled tool. +There are many other optimizations in dbt to help you work quickly: macros, hooks, and package management are all accelerators. -Conceptually, this is very simple. Practically, dbt solves some big headaches in exactly *how* it accomplishes these tasks: +## Does dbt really help me produce more reliable analysis? -- dbt interpolates schema and table names in your data models. This allows you to do things like deploy models to test and production environments seamlessly. -- dbt automatically infers a directed acyclic graph of the dependencies between your data models and uses this graph to manage the deployment to your schema. This graph is powerful, and allows for features like partial deployment and safe multi-threading. -- dbt's opinionated design lets you focus on writing your business logic instead of writing configuration and boilerplate code. +It does. Here’s how: -## Why model data in SQL? +- Writing SQL frequently involves a lot of copy-paste, which leads to errors when logic changes. With dbt, analysts don’t need to copy-paste. Instead, they build reusable data models that then get pulled into subsequent models and analysis. Change a model once and everything that’s built on it reflects that change. +- dbt allows subject matter experts to publish the canonical version of a particular data model, encapsulating all complex business logic. All analysis on top of this model will incorporate the same business logic without needing to understand it. +- dbt plays nicely with source control. Using dbt, analysts can use mature source control processes like branching, pull requests, and code reviews. +- dbt makes it easy and fast to write functional tests on the underlying data. Many analytic errors are caused by edge cases in the data: testing helps analysts find and handle those edge cases. -Historically, most analytical data modeling has been done prior to loading data into a SQL-based analytic database. Today, however, it's often preferable to model data within an analytic database using SQL. There are two primary reasons for this: +## Why SQL? -1. SQL is a very widely-known language for working with data. Providing SQL-based modeling tools gives the largest-possible group of users access. -1. Modern analytic databases are extremely performant and have sophisticated optimizers. Writing data transformations in SQL allows users to describe transformations on their data but leave the execution plan to the underlying technology. In practice, this provides excellent results with far less work on the part of the author. +While there are a large number of great languages for manipulating data, we’ve chosen SQL as the primary data transformation language at the heart of dbt. There are two reasons for this: -Of course, SQL will inevitably not be suitable for 100% of potential use cases. dbt may be extended in the future to take advantage of support for non-SQL languages in platforms like Redshift and BigQuery. We have found, though, that modern SQL has a higher degree of coverage than we had originally expected. 
To users of languages like Python, solving a challenging problem in SQL often requires a different type of thinking, but the advantages of staying "in-database" and allowing the optimizer to work for you are very significant. +1. SQL is a very widely-known language for working with data. Using SQL gives the largest-possible group of users access. +1. Modern analytic databases are extremely performant and have sophisticated optimizers. Writing data transformations in SQL allows users to describe transformations on their data but leave the execution plan to the underlying database technology. In practice, this provides excellent results with far less work on the part of the author. ## What databases does dbt currently support? Currently, dbt supports PostgreSQL and Amazon Redshift. We anticipate building support for additional databases in the future. + +## How do I get started? + +dbt is open source and completely free to download and use. See our [setup instructions](guide/setup/) for more. diff --git a/docs/about/viewpoint.md b/docs/about/viewpoint.md index 7e891b70b90..d1aa8ef01df 100644 --- a/docs/about/viewpoint.md +++ b/docs/about/viewpoint.md @@ -23,8 +23,6 @@ Analytics doesn’t have to be this way. In fact, the playbook for solving these The same techniques that software engineering teams use to collaborate on the rapid creation of quality applications can apply to analytics. We believe it’s time to build an open set of tools and processes to make that happen. ## Analytics is collaborative -Most of the problems with the current analytics workflow aren’t so bad if you’re working alone. You know about all of the data available to you, you know what it means, and you know how it was created. But you don’t scale. As soon as your analytics needs grow beyond a single analyst, these problems begin to manifest. - We believe a mature analytics team’s techniques and workflow should have the following collaboration features: ### Version Control @@ -35,15 +33,12 @@ Bad data can lead to bad analyses, and bad analyses can lead to bad decisions. A ### Documentation Your analysis is a software application, and, like every other software application, people are going to have questions about how to use it. Even though it might seem simple, in reality the “Revenue” line you’re showing could mean dozens of things. Your code should come packaged with a basic description of how it should be interpreted, and your team should be able to add to that documentation as additional questions arise. -Further, your analysis may need to be extended or modified by another member of your team, so document any portions of the code that may benefit from clarification. ### Modularity If you build a series of analyses about your company’s revenue, and your colleague does as well, you should use the same input data. Copy-paste is not a good approach here — if the definition of the underlying set changes, it will need to be updated everywhere it was used. Instead, think of the schema of a data set as its public interface. Create tables, views, or other data sets that expose a consistent schema and can be modified if business logic changes. ## Analytic code is an asset -Data collection, processing, and analysis have all grown exponentially in capability in the past decades. As a result, analytics as a practice provides more value than ever to organizations. Today, the success of an organization can be directly linked to its ability to make effective, fast decisions based on data. 
- -If analytics is core to the success of an organization, the code, processes, and tooling required to produce that analysis are core organizational investments. We believe a mature analytics organization’s workflow should have the following characteristics so as to protect and grow that investment: +The code, processes, and tooling required to produce that analysis are core organizational investments. We believe a mature analytics organization’s workflow should have the following characteristics so as to protect and grow that investment: ### Environments Analytics requires multiple environments. Analysts need the freedom to work without impacting users, while users need service level guarantees so that they can trust the data they rely on to do their jobs. From b99f037ae351d5886e71b7712325df018e140d55 Mon Sep 17 00:00:00 2001 From: Tristan Handy Date: Sat, 8 Oct 2016 13:09:22 -0400 Subject: [PATCH 12/33] design constraints --- docs/about/contributing.md | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/docs/about/contributing.md b/docs/about/contributing.md index 00bbb2715d3..83d4ed257b0 100644 --- a/docs/about/contributing.md +++ b/docs/about/contributing.md @@ -13,10 +13,16 @@ We welcome PRs! We recommend that you log any feature requests as issues and dis We welcome PRs with updated documentation! All documentation for dbt is written in markdown using [mkdocs](http://www.mkdocs.org/). Please follow installation instructions there to set up mkdocs on your local environment. +## Design Constraints + +All contributions to dbt must adhere to the following design constraints: + +- All data models are a single `SELECT` statement. All decisions about how the results of that statement are materialized in the database must be user-controlled via configuration. +- The target schema must always be able to be regenerated from scratch—i.e. if a user performs a `DROP SCHEMA [target] CASCADE` and then runs `dbt run --target [target]`, all data will be re-built exactly as before. ## Design Principles -When enhancing dbt, it's critical to keep the core project goal in mind: +When contributing to dbt, please keep the core project goal in mind: **dbt (data build tool) is a productivity tool that helps analysts get more done and produce higher quality results.** From 3061d597ab95f97ec4c503d16034a24f3e92bd90 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Mon, 10 Oct 2016 20:53:57 -0400 Subject: [PATCH 13/33] wip --- dbt/main.py | 4 +++ dbt/schema.py | 13 +++++++++ dbt/targets.py | 15 ++++++++++ dbt/task/archive.py | 71 +++++++++++++++++++++++++++++++++++++++++++++ dbt/templates.py | 68 +++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 171 insertions(+) create mode 100644 dbt/task/archive.py diff --git a/dbt/main.py b/dbt/main.py index c374a29e6d5..877b4b6c184 100644 --- a/dbt/main.py +++ b/dbt/main.py @@ -16,6 +16,7 @@ import dbt.task.init as init_task import dbt.task.seed as seed_task import dbt.task.test as test_task +import dbt.task.archive as archive_task import dbt.tracking @@ -71,6 +72,9 @@ def handle(args): sub = subs.add_parser('deps', parents=[base_subparser]) sub.set_defaults(cls=deps_task.DepsTask, which='deps') + sub = subs.add_parser('archive', parents=[base_subparser]) + sub.set_defaults(cls=archive_task.ArchiveTask, which='archive') + sub = subs.add_parser('run', parents=[base_subparser]) sub.add_argument('--dry', action='store_true', help="'dry run' models") sub.add_argument('--models', required=False, nargs='+', help="Specify the models to run. 
All models depending on these models will also be run") diff --git a/dbt/schema.py b/dbt/schema.py index ba18e30bcae..53086d8dd60 100644 --- a/dbt/schema.py +++ b/dbt/schema.py @@ -104,6 +104,13 @@ def drop(self, schema, relation_type, relation): self.execute_and_handle_permissions(sql, relation) self.logger.info("dropped %s %s.%s", relation_type, schema, relation) + def get_columns_in_table(self, schema_name, table_name): + sql = self.target.sql_columns_in_table(schema_name, table_name) + self.logger.debug("getting columns in table %s.%s", schema_name, table_name) + results = self.execute_and_fetch(sql) + columns = {column: data_type for (column, data_type) in results} + self.logger.debug("Found columns: %s", columns) + return columns def rename(self, schema, from_name, to_name): rename_query = 'alter table "{schema}"."{from_name}" rename to "{to_name}"'.format(schema=schema, from_name=from_name, to_name=to_name) @@ -111,6 +118,12 @@ def rename(self, schema, from_name, to_name): self.execute_and_handle_permissions(rename_query, from_name) self.logger.info("renamed model %s.%s --> %s.%s", schema, from_name, schema, to_name) + def create_table(self, schema, table, columns_dict, sort, dist): + fields = ['"{field}" {data_type}'.format(field=field, data_type=data_type) for (field, data_type) in columns_dict.items()] + fields_csv = ",\n ".join(fields) + sql = 'create table if not exists "{schema}"."{table}" (\n {fields}\n);'.format(schema=schema, table=table, fields=fields_csv) + self.logger.info('creating table "%s"."%s"'.format(schema, table)) + self.execute_and_handle_permissions(sql, table) def create_schema_if_not_exists(self, schema_name): schemas = self.get_schemas() diff --git a/dbt/targets.py b/dbt/targets.py index f2522dc8c14..7703ec13bc3 100644 --- a/dbt/targets.py +++ b/dbt/targets.py @@ -114,6 +114,14 @@ class RedshiftTarget(BaseSQLTarget): def __init__(self, cfg): super(RedshiftTarget, self).__init__(cfg) + + def sql_columns_in_table(self, schema_name, table_name): + return """ + select "column" as column_name, "type" as "data_type" + from pg_table_def + where schemaname = '{schema_name}' and tablename = '{table_name}' + """.format(schema_name=schema_name, table_name=table_name).strip() + @property def context(self): return { @@ -124,6 +132,13 @@ class PostgresTarget(BaseSQLTarget): def __init__(self, cfg): super(PostgresTarget, self).__init__(cfg) + def sql_columns_in_table(self, schema_name, table_name): + return """ + select column_name, data_type + from information_schema.columns + where table_schema = '{schema_name}' and table_name = '{table_name}' + """.format(schema_name=schema_name, table_name=table_name).strip() + @property def context(self): return { diff --git a/dbt/task/archive.py b/dbt/task/archive.py new file mode 100644 index 00000000000..26b69ee8442 --- /dev/null +++ b/dbt/task/archive.py @@ -0,0 +1,71 @@ + + +from __future__ import print_function +import dbt.targets +import dbt.schema +import dbt.templates +import jinja2 + +class ArchivableTable(object): + def __init__(self, source_table, dest_table, unique_key, updated_at): + self.source_table = source_table + self.dest_table = dest_table + self.unique_key = unique_key + self.updated_at = updated_at + + def __repr__(self): + return " {} unique:{} updated_at:{}>".format(self.source_table, self.dest_table, self.unique_key, self.updated_at) + +class SourceSchema(object): + def __init__(self, source_schema, target_schema, tables): + self.source_schema = source_schema + self.target_schema = target_schema + self.tables 
= [self.parse_table(t) for t in tables] + + def parse_table(self, table_definition): + return ArchivableTable(**table_definition) + +class ArchiveTask: + def __init__(self, args, project): + self.args = args + self.project = project + + self.target = dbt.targets.get_target(self.project.run_environment()) + self.schema = dbt.schema.Schema(self.project, self.target) + + def run(self): + if 'archive' not in self.project: + raise RuntimeError("dbt_project.yml file is missing an 'archive' config!") + + # TODO : obviously handle input / validate better here + raw_source_schemas = self.project['archive'] + source_schemas = [SourceSchema(**item) for item in raw_source_schemas] + + for source_schema in source_schemas: + + # create archive schema if not exists! + self.schema.create_schema(source_schema.target_schema) + + for table in source_schema.tables: + columns = self.schema.get_columns_in_table(source_schema.source_schema, table.source_table) + + if len(columns) == 0: + raise RuntimeError('Source table "{}"."{}" does not exist'.format(source_schema.source_schema, table.source_table)) + + # create archive table if not exists! TODO: Sort & Dist keys! Hmmmm + self.schema.create_table(source_schema.target_schema, table.dest_table, columns, sort=table.updated_at, dist=table.unique_key) + + env = jinja2.Environment() + + ctx = { + "columns": columns, + "table" : table, + "archive": source_schema + } + + base_query = dbt.templates.SCDArchiveTemplate + template = env.from_string(base_query, globals=ctx) + rendered = template.render(ctx) + + status = self.schema.execute(rendered) + print("STATUS: ", status) diff --git a/dbt/templates.py b/dbt/templates.py index b43251406e5..d487171a95c 100644 --- a/dbt/templates.py +++ b/dbt/templates.py @@ -165,3 +165,71 @@ def wrap(self, opts): raise RuntimeError("Invalid materialization parameter ({})".format(opts['materialization'])) return "{}\n\n{}".format(opts['prologue'], sql) + + + +SCDArchiveTemplate = """ + with current_data as ( + + select *, + {{ table.updated_at }} as dbt_updated_at, + {{ table.unique_key }} as dbt_pk + from "{{ archive.source_schema }}"."{{ table.source_table }}" + + ), + + archived_data as ( + + select *, + {{ table.updated_at }} as dbt_updated_at, + {{ table.unique_key }} as dbt_pk + from "{{ archive.target_schema }}"."{{ table.dest_table }}" + + ), + + combined as ( + + select + {% for (col, type) in columns.items() %} + "{{ col }}", + {% endfor %} + dbt_updated_at, + dbt_pk + from current_data + + union all + + select + {% for (col, type) in columns.items() %} + "{{ col }}", + {% endfor %} + dbt_updated_at, + dbt_pk + from archived_data + + ), + + merged as ( + + select + distinct + combined.*, + least(combined.dbt_updated_at, current_data.dbt_updated_at) as valid_from, + case when combined.dbt_updated_at = current_data.dbt_updated_at then null + else current_data.dbt_updated_at + end as valid_to + from current_data + left outer join combined + on combined.dbt_pk = current_data.dbt_pk + and current_data.dbt_updated_at >= combined.dbt_updated_at + + ), + with_id as ( + select *, + row_number() over (partition by dbt_pk order by dbt_updated_at asc) as dbt_archive_id + from merged + ) + + select md5(dbt_pk || '|' || dbt_archive_id) as scd_id, * + from with_id +""" From 00fa9f0cd8717b801e9789838ac8efdd4ed1656d Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Mon, 10 Oct 2016 23:43:28 -0400 Subject: [PATCH 14/33] working archival --- dbt/schema.py | 7 ++++--- dbt/task/archive.py | 18 +++++++++++++--- dbt/templates.py | 50 
++++++++++++++++++++++++++++++++++++++++----- 3 files changed, 64 insertions(+), 11 deletions(-) diff --git a/dbt/schema.py b/dbt/schema.py index 53086d8dd60..93ff42890ca 100644 --- a/dbt/schema.py +++ b/dbt/schema.py @@ -108,7 +108,7 @@ def get_columns_in_table(self, schema_name, table_name): sql = self.target.sql_columns_in_table(schema_name, table_name) self.logger.debug("getting columns in table %s.%s", schema_name, table_name) results = self.execute_and_fetch(sql) - columns = {column: data_type for (column, data_type) in results} + columns = [(column, data_type) for (column, data_type) in results] self.logger.debug("Found columns: %s", columns) return columns @@ -118,9 +118,10 @@ def rename(self, schema, from_name, to_name): self.execute_and_handle_permissions(rename_query, from_name) self.logger.info("renamed model %s.%s --> %s.%s", schema, from_name, schema, to_name) - def create_table(self, schema, table, columns_dict, sort, dist): - fields = ['"{field}" {data_type}'.format(field=field, data_type=data_type) for (field, data_type) in columns_dict.items()] + def create_table(self, schema, table, columns, sort, dist): + fields = ['"{field}" {data_type}'.format(field=field, data_type=data_type) for (field, data_type) in columns] fields_csv = ",\n ".join(fields) + # TODO : Sort and Dist keys?? sql = 'create table if not exists "{schema}"."{table}" (\n {fields}\n);'.format(schema=schema, table=table, fields=fields_csv) self.logger.info('creating table "%s"."%s"'.format(schema, table)) self.execute_and_handle_permissions(sql, table) diff --git a/dbt/task/archive.py b/dbt/task/archive.py index 26b69ee8442..0bf84041088 100644 --- a/dbt/task/archive.py +++ b/dbt/task/archive.py @@ -53,7 +53,16 @@ def run(self): raise RuntimeError('Source table "{}"."{}" does not exist'.format(source_schema.source_schema, table.source_table)) # create archive table if not exists! TODO: Sort & Dist keys! 
Hmmmm - self.schema.create_table(source_schema.target_schema, table.dest_table, columns, sort=table.updated_at, dist=table.unique_key) + + extra_cols = [ + ("valid_from", "timestamp"), + ("valid_to", "timestamp"), + ("scd_id","text"), + ("dbt_updated_at","timestamp") + ] + + dest_columns = columns + extra_cols + self.schema.create_table(source_schema.target_schema, table.dest_table, dest_columns, sort=table.updated_at, dist=table.unique_key) env = jinja2.Environment() @@ -67,5 +76,8 @@ def run(self): template = env.from_string(base_query, globals=ctx) rendered = template.render(ctx) - status = self.schema.execute(rendered) - print("STATUS: ", status) + template = dbt.templates.ArchiveInsertTemplate() + transaction = template.wrap(source_schema.target_schema, table.dest_table, rendered, table.unique_key) + + print(transaction) + #self.schema.execute_and_handle_permissions(transaction, table.dest_table) diff --git a/dbt/templates.py b/dbt/templates.py index d487171a95c..8d8798b374b 100644 --- a/dbt/templates.py +++ b/dbt/templates.py @@ -180,7 +180,10 @@ def wrap(self, opts): archived_data as ( - select *, + select + {% for (col, type) in columns %} + "{{ col }}", + {% endfor %} {{ table.updated_at }} as dbt_updated_at, {{ table.unique_key }} as dbt_pk from "{{ archive.target_schema }}"."{{ table.dest_table }}" @@ -190,7 +193,7 @@ def wrap(self, opts): combined as ( select - {% for (col, type) in columns.items() %} + {% for (col, type) in columns %} "{{ col }}", {% endfor %} dbt_updated_at, @@ -200,7 +203,7 @@ def wrap(self, opts): union all select - {% for (col, type) in columns.items() %} + {% for (col, type) in columns %} "{{ col }}", {% endfor %} dbt_updated_at, @@ -226,10 +229,47 @@ def wrap(self, opts): ), with_id as ( select *, - row_number() over (partition by dbt_pk order by dbt_updated_at asc) as dbt_archive_id + row_number() over (partition by dbt_pk order by dbt_updated_at asc) as dbt_archive_id, + count(*) over (partition by dbt_pk) as num_changes from merged ) - select md5(dbt_pk || '|' || dbt_archive_id) as scd_id, * + -- + -- TODO : The order of scd_id and dbt_updated_at depends + -- on the order of col injections in archive.py + select + {% for (col, type) in columns %} + "{{ col }}", + {% endfor %} + valid_from, + valid_to, + md5(dbt_pk || '|' || dbt_archive_id) as scd_id, + dbt_updated_at + from with_id + where num_changes > 1 """ + + +class ArchiveInsertTemplate(object): + archival_template = """ +create temporary table "{identifier}__dbt_archival_tmp" as ( + with dbt_archive_sbq as ( + {query} + ) + select * from dbt_archive_sbq +); + +delete from "{schema}"."{identifier}" where ({unique_key}) in ( + select ({unique_key}) from "{identifier}__dbt_archival_tmp" +); + +insert into "{schema}"."{identifier}" ( + select * from "{identifier}__dbt_archival_tmp" +); +""" + + def wrap(self, schema, table, query, unique_key): + sql = self.archival_template.format(schema=schema, identifier=table, query=query, unique_key=unique_key) + return sql + From 7f0855101e305aacc79d12093aebdeda57d636d2 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Tue, 11 Oct 2016 12:10:18 -0400 Subject: [PATCH 15/33] call sql exec func --- dbt/task/archive.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dbt/task/archive.py b/dbt/task/archive.py index 0bf84041088..dc505668beb 100644 --- a/dbt/task/archive.py +++ b/dbt/task/archive.py @@ -79,5 +79,4 @@ def run(self): template = dbt.templates.ArchiveInsertTemplate() transaction = template.wrap(source_schema.target_schema, 
table.dest_table, rendered, table.unique_key) - print(transaction) - #self.schema.execute_and_handle_permissions(transaction, table.dest_table) + self.schema.execute_and_handle_permissions(transaction, table.dest_table) From 7bff5c6e3e2045fdba9f6d5e234669b06e971c7d Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Tue, 11 Oct 2016 14:50:38 -0400 Subject: [PATCH 16/33] compile archive sql --- dbt/archival.py | 62 ++++++++++++++++++++++++++++++++++++ dbt/compilation.py | 27 +++++++++++++--- dbt/compiled_model.py | 18 +++++++++++ dbt/model.py | 33 +++++++++++++++++++ dbt/source.py | 23 +++++++++++++- dbt/task/archive.py | 74 +------------------------------------------ dbt/task/compile.py | 4 +-- dbt/templates.py | 15 +++++---- 8 files changed, 170 insertions(+), 86 deletions(-) create mode 100644 dbt/archival.py diff --git a/dbt/archival.py b/dbt/archival.py new file mode 100644 index 00000000000..7a4e31db8e0 --- /dev/null +++ b/dbt/archival.py @@ -0,0 +1,62 @@ + +from __future__ import print_function +import dbt.targets +import dbt.schema +import dbt.templates +import jinja2 + +class Archival(object): + + def __init__(self, project, archive_model): + self.archive_model = archive_model + self.project = project + + self.target = dbt.targets.get_target(self.project.run_environment()) + self.schema = dbt.schema.Schema(self.project, self.target) + + def compile(self): + source_schema = self.archive_model.source_schema + target_schema = self.archive_model.target_schema + source_table = self.archive_model.source_table + dest_table = self.archive_model.dest_table + unique_key = self.archive_model.unique_key + updated_at = self.archive_model.updated_at + + self.schema.create_schema(target_schema) + + source_columns = self.schema.get_columns_in_table(source_schema, source_table) + + if len(source_columns) == 0: + raise RuntimeError('Source table "{}"."{}" does not exist'.format(source_schema, source_table)) + + # create archive table if not exists! TODO: Sort & Dist keys! 
Hmmmm + + extra_cols = [ + ("valid_from", "timestamp"), + ("valid_to", "timestamp"), + ("scd_id","text"), + ("dbt_updated_at","timestamp") + ] + + dest_columns = source_columns + extra_cols + self.schema.create_table(target_schema, dest_table, dest_columns, sort=updated_at, dist=unique_key) + + env = jinja2.Environment() + + ctx = { + "columns" : source_columns, + "updated_at" : updated_at, + "unique_key" : unique_key, + "source_schema" : source_schema, + "source_table" : source_table, + "target_schema" : target_schema, + "dest_table" : dest_table, + } + + base_query = dbt.templates.SCDArchiveTemplate + template = env.from_string(base_query, globals=ctx) + rendered = template.render(ctx) + + return rendered + + diff --git a/dbt/compilation.py b/dbt/compilation.py index 01bd0dd3c6e..91502710208 100644 --- a/dbt/compilation.py +++ b/dbt/compilation.py @@ -8,6 +8,7 @@ from dbt.utils import find_model_by_fqn, find_model_by_name, dependency_projects, split_path, This, Var, compiler_error from dbt.linker import Linker import dbt.targets +import dbt.templates import time import sqlparse @@ -47,6 +48,10 @@ def get_macros(self, this_project, own_project=None): paths = own_project.get('macro-paths', []) return Source(this_project, own_project=own_project).get_macros(paths) + def get_archives(self, project): + archive_template = dbt.templates.ArchiveInsertTemplate() + return Source(project, own_project=project).get_archives(archive_template) + def project_schemas(self): source_paths = self.project.get('source-paths', []) return Source(self.project).get_schemas(source_paths) @@ -192,8 +197,8 @@ def compile_model(self, linker, model, models): return rendered - def write_graph_file(self, linker): - filename = 'graph-{}.yml'.format(self.create_template.label) + def write_graph_file(self, linker, label): + filename = 'graph-{}.yml'.format(label) graph_path = os.path.join(self.project['target-path'], filename) linker.write_graph(graph_path) @@ -329,7 +334,20 @@ def do_gen(ctx): return macros return do_gen + def compile_archives(self): + linker = Linker() + all_archives = self.get_archives(self.project) + + for archive in all_archives: + sql = archive.compile() + self.__write(archive.build_path(), sql) + + self.write_graph_file(linker, 'archive') + return all_archives + def compile(self, dry=False): + compiled_archives = [] if dry else self.compile_archives() + linker = Linker() all_models = self.model_sources(this_project=self.project) @@ -345,16 +363,17 @@ def compile(self, dry=False): compiled_models, written_models = self.compile_models(linker, enabled_models) + # TODO : only compile schema tests for enabled models written_schema_tests = self.compile_schema_tests(linker) self.validate_models_unique(compiled_models) self.validate_models_unique(written_schema_tests) - self.write_graph_file(linker) + self.write_graph_file(linker, self.create_template.label) if self.create_template.label != 'test': written_analyses = self.compile_analyses(linker, compiled_models) else: written_analyses = [] - return len(written_models), len(written_schema_tests), len(written_analyses) + return len(written_models), len(written_schema_tests), len(compiled_archives), len(written_analyses) diff --git a/dbt/compiled_model.py b/dbt/compiled_model.py index b824db4a30c..bacd39f2bd6 100644 --- a/dbt/compiled_model.py +++ b/dbt/compiled_model.py @@ -110,6 +110,22 @@ def prepare(self, existing, target): def __repr__(self): return "".format(self.data['project_name'], self.name, self.data['build_path']) +class 
CompiledArchive(CompiledModel): + def __init__(self, fqn, data): + super(CompiledArchive, self).__init__(fqn, data) + + def should_rename(self): + return False + + def should_execute(self): + return True + + def prepare(self, existing, target): + self.target = target + + def __repr__(self): + return "<CompiledArchive {}.{}: {}>".format(self.data['project_name'], self.name, self.data['build_path']) + def make_compiled_model(fqn, data): run_type = data['dbt_run_type'] @@ -117,6 +133,8 @@ def make_compiled_model(fqn, data): return CompiledModel(fqn, data) elif run_type == 'test': return CompiledTest(fqn, data) + elif run_type == 'archive': + return CompiledArchive(fqn, data) else: raise RuntimeError("invalid run_type given: {}".format(run_type)) diff --git a/dbt/model.py b/dbt/model.py index 756e241d25b..06f18b14d94 100644 --- a/dbt/model.py +++ b/dbt/model.py @@ -7,6 +7,7 @@ from dbt.utils import split_path import dbt.schema_tester import dbt.project +import dbt.archival from dbt.utils import This, deep_merge, DBTConfigKeys, compiler_error class SourceConfig(object): @@ -570,3 +571,35 @@ def __repr__(self): return "<Macro {}.{}: {}>".format(self.project['name'], self.name, self.filepath) +class ArchiveModel(DBTSource): + def __init__(self, project, create_template, source_schema, target_schema, source_table, dest_table, unique_key, updated_at): + + self.create_template = create_template + + self.source_schema = source_schema + self.target_schema = target_schema + self.source_table = source_table + self.dest_table = dest_table + self.unique_key = unique_key + self.updated_at = updated_at + + target_dir = self.create_template.label + rel_filepath = os.path.join(self.target_schema, self.dest_table) + + super(ArchiveModel, self).__init__(project, target_dir, rel_filepath, project) + + def compile(self): + archival = dbt.archival.Archival(self.project, self) + query = archival.compile() + + sql = self.create_template.wrap(self.target_schema, self.dest_table, query, self.unique_key) + return sql + + def build_path(self): + build_dir = self.create_template.label + filename = "{}.sql".format(self.name) + path_parts = [build_dir] + self.fqn[:-1] + [filename] + return os.path.join(*path_parts) + + def __repr__(self): + return "<ArchiveModel {} --> {} unique:{} updated_at:{}>".format(self.source_table, self.dest_table, self.unique_key, self.updated_at) diff --git a/dbt/source.py b/dbt/source.py index f6d6c365abd..1e1021909fa 100644 --- a/dbt/source.py +++ b/dbt/source.py @@ -1,7 +1,7 @@ import os.path import fnmatch -from dbt.model import Model, Analysis, TestModel, SchemaFile, Csv, Macro +from dbt.model import Model, Analysis, TestModel, SchemaFile, Csv, Macro, ArchiveModel class Source(object): def __init__(self, project, own_project=None): @@ -60,3 +60,24 @@ def get_macros(self, macro_dirs): macros = [Macro(*macro) for macro in self.find(macro_dirs, pattern)] return macros + def get_archives(self, create_template): + "Get Archive models defined in project config" + + if 'archive' not in self.project: + return [] + + raw_source_schemas = self.project['archive'].copy() + + archives = [] + for schema in raw_source_schemas: + if 'tables' not in schema: + continue + + tables = schema.pop('tables') + for table in tables: + fields = table.copy() + fields.update(schema) + archives.append(ArchiveModel(self.project, create_template, **fields)) + return archives + + diff --git a/dbt/task/archive.py b/dbt/task/archive.py index dc505668beb..0fa52782aed 100644 --- a/dbt/task/archive.py +++ b/dbt/task/archive.py @@ -1,82 +1,10 @@ -from __future__ import print_function -import 
dbt.targets -import dbt.schema -import dbt.templates -import jinja2 - -class ArchivableTable(object): - def __init__(self, source_table, dest_table, unique_key, updated_at): - self.source_table = source_table - self.dest_table = dest_table - self.unique_key = unique_key - self.updated_at = updated_at - - def __repr__(self): - return " {} unique:{} updated_at:{}>".format(self.source_table, self.dest_table, self.unique_key, self.updated_at) - -class SourceSchema(object): - def __init__(self, source_schema, target_schema, tables): - self.source_schema = source_schema - self.target_schema = target_schema - self.tables = [self.parse_table(t) for t in tables] - - def parse_table(self, table_definition): - return ArchivableTable(**table_definition) class ArchiveTask: def __init__(self, args, project): self.args = args self.project = project - self.target = dbt.targets.get_target(self.project.run_environment()) - self.schema = dbt.schema.Schema(self.project, self.target) - def run(self): - if 'archive' not in self.project: - raise RuntimeError("dbt_project.yml file is missing an 'archive' config!") - - # TODO : obviously handle input / validate better here - raw_source_schemas = self.project['archive'] - source_schemas = [SourceSchema(**item) for item in raw_source_schemas] - - for source_schema in source_schemas: - - # create archive schema if not exists! - self.schema.create_schema(source_schema.target_schema) - - for table in source_schema.tables: - columns = self.schema.get_columns_in_table(source_schema.source_schema, table.source_table) - - if len(columns) == 0: - raise RuntimeError('Source table "{}"."{}" does not exist'.format(source_schema.source_schema, table.source_table)) - - # create archive table if not exists! TODO: Sort & Dist keys! Hmmmm - - extra_cols = [ - ("valid_from", "timestamp"), - ("valid_to", "timestamp"), - ("scd_id","text"), - ("dbt_updated_at","timestamp") - ] - - dest_columns = columns + extra_cols - self.schema.create_table(source_schema.target_schema, table.dest_table, dest_columns, sort=table.updated_at, dist=table.unique_key) - - env = jinja2.Environment() - - ctx = { - "columns": columns, - "table" : table, - "archive": source_schema - } - - base_query = dbt.templates.SCDArchiveTemplate - template = env.from_string(base_query, globals=ctx) - rendered = template.render(ctx) - - template = dbt.templates.ArchiveInsertTemplate() - transaction = template.wrap(source_schema.target_schema, table.dest_table, rendered, table.unique_key) - - self.schema.execute_and_handle_permissions(transaction, table.dest_table) + pass diff --git a/dbt/task/compile.py b/dbt/task/compile.py index a5dd07a37c6..c39a13e82b3 100644 --- a/dbt/task/compile.py +++ b/dbt/task/compile.py @@ -16,6 +16,6 @@ def run(self): compiler = Compiler(self.project, create_template) compiler.initialize() - created_models, created_tests, created_analyses = compiler.compile(dry=self.args.dry) + created_models, created_tests, created_archives, created_analyses = compiler.compile(dry=self.args.dry) - print("Compiled {} models, {} tests and {} analyses".format(created_models, created_tests, created_analyses)) + print("Compiled {} models, {} tests, {} archives and {} analyses".format(created_models, created_tests, created_archives, created_analyses)) diff --git a/dbt/templates.py b/dbt/templates.py index 8d8798b374b..aa04223a62c 100644 --- a/dbt/templates.py +++ b/dbt/templates.py @@ -172,9 +172,9 @@ def wrap(self, opts): with current_data as ( select *, - {{ table.updated_at }} as dbt_updated_at, - {{ 
table.unique_key }} as dbt_pk - from "{{ archive.source_schema }}"."{{ table.source_table }}" + {{ updated_at }} as dbt_updated_at, + {{ unique_key }} as dbt_pk + from "{{ source_schema }}"."{{ source_table }}" ), @@ -184,9 +184,9 @@ def wrap(self, opts): {% for (col, type) in columns %} "{{ col }}", {% endfor %} - {{ table.updated_at }} as dbt_updated_at, - {{ table.unique_key }} as dbt_pk - from "{{ archive.target_schema }}"."{{ table.dest_table }}" + {{ updated_at }} as dbt_updated_at, + {{ unique_key }} as dbt_pk + from "{{ target_schema }}"."{{ dest_table }}" ), @@ -252,6 +252,9 @@ def wrap(self, opts): class ArchiveInsertTemplate(object): + + label = "archive" + archival_template = """ create temporary table "{identifier}__dbt_archival_tmp" as ( with dbt_archive_sbq as ( From 8fb4b680a264197af0627161f13ed20b93d9b161 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Tue, 11 Oct 2016 21:03:12 -0400 Subject: [PATCH 17/33] raiders of the lost archive --- dbt/archival.py | 3 +- dbt/compilation.py | 12 ++++--- dbt/compiled_model.py | 4 +++ dbt/model.py | 17 ++++++++++ dbt/runner.py | 49 ++++++++++++++++++++++++++-- dbt/schema.py | 9 ++++++ dbt/task/archive.py | 16 ++++++++-- dbt/templates.py | 74 ++++++++++++++++++++++++++----------------- 8 files changed, 146 insertions(+), 38 deletions(-) diff --git a/dbt/archival.py b/dbt/archival.py index 7a4e31db8e0..7c2dff8fe17 100644 --- a/dbt/archival.py +++ b/dbt/archival.py @@ -5,6 +5,7 @@ import dbt.templates import jinja2 + class Archival(object): def __init__(self, project, archive_model): @@ -50,7 +51,7 @@ def compile(self): "source_schema" : source_schema, "source_table" : source_table, "target_schema" : target_schema, - "dest_table" : dest_table, + "dest_table" : dest_table } base_query = dbt.templates.SCDArchiveTemplate diff --git a/dbt/compilation.py b/dbt/compilation.py index 91502710208..116bb3dde78 100644 --- a/dbt/compilation.py +++ b/dbt/compilation.py @@ -39,6 +39,8 @@ def model_sources(self, this_project, own_project=None): return Source(this_project, own_project=own_project).get_models(paths, self.create_template) elif self.create_template.label == 'test': return Source(this_project, own_project=own_project).get_test_models(paths, self.create_template) + elif self.create_template.label == 'archive': + return [] else: raise RuntimeError("unexpected create template type: '{}'".format(self.create_template.label)) @@ -340,14 +342,14 @@ def compile_archives(self): for archive in all_archives: sql = archive.compile() + fqn = tuple(archive.fqn) + linker.update_node_data(fqn, archive.serialize()) self.__write(archive.build_path(), sql) self.write_graph_file(linker, 'archive') return all_archives def compile(self, dry=False): - compiled_archives = [] if dry else self.compile_archives() - linker = Linker() all_models = self.model_sources(this_project=self.project) @@ -363,7 +365,6 @@ def compile(self, dry=False): compiled_models, written_models = self.compile_models(linker, enabled_models) - # TODO : only compile schema tests for enabled models written_schema_tests = self.compile_schema_tests(linker) @@ -371,9 +372,12 @@ def compile(self, dry=False): self.validate_models_unique(written_schema_tests) self.write_graph_file(linker, self.create_template.label) - if self.create_template.label != 'test': + if self.create_template.label not in ['test', 'archive']: written_analyses = self.compile_analyses(linker, compiled_models) else: written_analyses = [] + + compiled_archives = self.compile_archives() + return len(written_models), 
len(written_schema_tests), len(compiled_archives), len(written_analyses) diff --git a/dbt/compiled_model.py b/dbt/compiled_model.py index bacd39f2bd6..f9bfe90e06d 100644 --- a/dbt/compiled_model.py +++ b/dbt/compiled_model.py @@ -6,6 +6,7 @@ class CompiledModel(object): def __init__(self, fqn, data): self.fqn = fqn self.data = data + self.nice_name = ".".join(fqn) # these are set just before the models are executed self.tmp_drop_type = None @@ -23,6 +24,9 @@ def hashed_name(self): fqn_string = ".".join(self.fqn) return hashlib.md5(fqn_string.encode('utf-8')).hexdigest() + def context(self): + return self.data + def hashed_contents(self): return hashlib.md5(self.contents.encode('utf-8')).hexdigest() diff --git a/dbt/model.py b/dbt/model.py index 06f18b14d94..74849250352 100644 --- a/dbt/model.py +++ b/dbt/model.py @@ -572,6 +572,8 @@ def __repr__(self): class ArchiveModel(DBTSource): + dbt_run_type = 'archive' + def __init__(self, project, create_template, source_schema, target_schema, source_table, dest_table, unique_key, updated_at): self.create_template = create_template @@ -588,6 +590,21 @@ def __init__(self, project, create_template, source_schema, target_schema, sourc super(ArchiveModel, self).__init__(project, target_dir, rel_filepath, project) + def serialize(self): + data = DBTSource.serialize(self).copy() + + serialized = { + "source_schema" : self.source_schema, + "target_schema" : self.target_schema, + "source_table" : self.source_table, + "dest_table" : self.dest_table, + "unique_key" : self.unique_key, + "updated_at" : self.updated_at + } + + data.update(serialized) + return data + def compile(self): archival = dbt.archival.Archival(self.project, self) query = archival.compile() diff --git a/dbt/runner.py b/dbt/runner.py index b644061d1dc..8fdd35a6608 100644 --- a/dbt/runner.py +++ b/dbt/runner.py @@ -198,6 +198,43 @@ def execute(self, schema, target, model): return row[0] +class ArchiveRunner(BaseRunner): + run_type = 'archive' + + def pre_run_msg(self, model): + print_vars = { + "schema": model.target.schema, + "model_name": model.name, + } + + output = "START archive table {schema}.{model_name} ".format(**print_vars) + return output + + def post_run_msg(self, result): + model = result.model + print_vars = { + "schema": model.target.schema, + "model_name": model.name, + "info": "ERROR archiving" if result.errored else "OK created" + } + + output = "{info} table {schema}.{model_name} ".format(**print_vars) + return output + + def pre_run_all_msg(self, models): + return "Archiving {} tables".format(len(models)) + + def post_run_all_msg(self, results): + return "Finished archiving {} tables".format(len(results)) + + def status(self, result): + return result.status + + def execute(self, schema, target, model): + print(model.compiled_contents) + status = schema.execute_and_handle_permissions(model.compiled_contents, model.name) + return status + class RunManager(object): def __init__(self, project, target_path, graph_type): self.logger = logging.getLogger(__name__) @@ -218,7 +255,9 @@ def __init__(self, project, target_path, graph_type): self.context = { "run_started_at": datetime.now(), - "invocation_id": dbt.tracking.invocation_id, + "invocation_id" : dbt.tracking.invocation_id, + "get_columns_in_table" : self.schema.get_columns_in_table, + "get_missing_columns" : self.schema.get_missing_columns, } @@ -378,7 +417,9 @@ def run_from_graph(self, runner, limit_to): for m in relevant_compiled_models: if m.should_execute(): - m.compile(self.context) + context = self.context.copy() + 
context.update(m.context()) + m.compile(context) schema_name = self.target.schema @@ -430,3 +471,7 @@ def dry_run(self, limit_to=None): runner = DryRunner() return self.safe_run_from_graph(runner, limit_to) + def run_archive(self): + runner = ArchiveRunner() + return self.safe_run_from_graph(runner, None) + diff --git a/dbt/schema.py b/dbt/schema.py index 93ff42890ca..c76f243ee5a 100644 --- a/dbt/schema.py +++ b/dbt/schema.py @@ -118,6 +118,15 @@ def rename(self, schema, from_name, to_name): self.execute_and_handle_permissions(rename_query, from_name) self.logger.info("renamed model %s.%s --> %s.%s", schema, from_name, schema, to_name) + def get_missing_columns(self, from_schema, from_table, to_schema, to_table): + "Returns dict of {column:type} for columns in from_table that are missing from to_table" + from_columns = {col:dtype for (col,dtype) in self.get_columns_in_table(from_schema, from_table)} + to_columns = {col:dtype for (col,dtype) in self.get_columns_in_table(to_schema, to_table)} + + missing_columns = set(from_columns.keys()) - set(to_columns.keys()) + + return {col:dtype for (col, dtype) in from_columns.items() if col in missing_columns} + def create_table(self, schema, table, columns, sort, dist): fields = ['"{field}" {data_type}'.format(field=field, data_type=data_type) for (field, data_type) in columns] fields_csv = ",\n ".join(fields) diff --git a/dbt/task/archive.py b/dbt/task/archive.py index 0fa52782aed..40467b42286 100644 --- a/dbt/task/archive.py +++ b/dbt/task/archive.py @@ -1,10 +1,22 @@ - +from dbt.runner import RunManager +from dbt.templates import ArchiveInsertTemplate +from dbt.compilation import Compiler class ArchiveTask: def __init__(self, args, project): self.args = args self.project = project + self.create_template = ArchiveInsertTemplate + + def compile(self): + compiler = Compiler(self.project, self.create_template) + compiler.initialize() + compiler.compile_archives() def run(self): - pass + self.compile() + runner = RunManager(self.project, self.project['target-path'], self.create_template.label) + + results = runner.run_archive() + diff --git a/dbt/templates.py b/dbt/templates.py index aa04223a62c..3527f242098 100644 --- a/dbt/templates.py +++ b/dbt/templates.py @@ -167,6 +167,9 @@ def wrap(self, opts): return "{}\n\n{}".format(opts['prologue'], sql) +SCDGetColumnsInTable = """ +""" + SCDArchiveTemplate = """ with current_data as ( @@ -181,9 +184,11 @@ def wrap(self, opts): archived_data as ( select - {% for (col, type) in columns %} - "{{ col }}", - {% endfor %} + {% raw %} + {% for (col, type) in get_columns_in_table(source_schema, source_table) %} + "{{ col }}" {% if not loop.last %},{% endif %} + {% endfor %}, + {% endraw %} {{ updated_at }} as dbt_updated_at, {{ unique_key }} as dbt_pk from "{{ target_schema }}"."{{ dest_table }}" @@ -193,22 +198,26 @@ def wrap(self, opts): combined as ( select - {% for (col, type) in columns %} - "{{ col }}", - {% endfor %} - dbt_updated_at, - dbt_pk - from current_data + {% raw %} + {% for (col, type) in get_columns_in_table(source_schema, source_table) %} + "{{ col }}" {% if not loop.last %},{% endif %} + {% endfor %}, + {% endraw %} + dbt_updated_at, + dbt_pk + from current_data union all select - {% for (col, type) in columns %} - "{{ col }}", - {% endfor %} - dbt_updated_at, - dbt_pk - from archived_data + {% raw %} + {% for (col, type) in get_columns_in_table(source_schema, source_table) %} + "{{ col }}" {% if not loop.last %},{% endif %} + {% endfor %}, + {% endraw %} + dbt_updated_at, + dbt_pk + from 
archived_data ), @@ -234,17 +243,8 @@ def wrap(self, opts): from merged ) - -- - -- TODO : The order of scd_id and dbt_updated_at depends - -- on the order of col injections in archive.py - select - {% for (col, type) in columns %} - "{{ col }}", - {% endfor %} - valid_from, - valid_to, - md5(dbt_pk || '|' || dbt_archive_id) as scd_id, - dbt_updated_at + select *, + md5(dbt_pk || '|' || dbt_archive_id) as scd_id from with_id where num_changes > 1 @@ -255,7 +255,22 @@ class ArchiveInsertTemplate(object): label = "archive" + alter_template = """ +{% for (col, dtype) in get_missing_columns(source_schema, source_table, target_schema, dest_table).items() %} + alter table "{{ target_schema }}"."{{ dest_table }}" add column "{{ col }}" {{ dtype }}; +{% endfor %} +""" + + dest_cols = """ +{% for (col, type) in get_columns_in_table(target_schema, dest_table) %} + "{{ col }}" {% if not loop.last %},{% endif %} +{% endfor %} +""" + archival_template = """ + +{alter_template} + create temporary table "{identifier}__dbt_archival_tmp" as ( with dbt_archive_sbq as ( {query} @@ -268,11 +283,12 @@ class ArchiveInsertTemplate(object): ); insert into "{schema}"."{identifier}" ( - select * from "{identifier}__dbt_archival_tmp" -); + {dest_cols} +) +select {dest_cols} from "{identifier}__dbt_archival_tmp"; """ def wrap(self, schema, table, query, unique_key): - sql = self.archival_template.format(schema=schema, identifier=table, query=query, unique_key=unique_key) + sql = self.archival_template.format(schema=schema, identifier=table, query=query, unique_key=unique_key, alter_template=self.alter_template, dest_cols=self.dest_cols) return sql From 13546726b87ee374decad25410dbb7058bdd05a1 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Wed, 12 Oct 2016 17:09:15 -0400 Subject: [PATCH 18/33] fix archival query, code cleanup --- dbt/archival.py | 4 ++ dbt/compilation.py | 9 ++++- dbt/runner.py | 1 - dbt/schema.py | 27 ++++++++++++- dbt/task/archive.py | 3 +- dbt/task/compile.py | 7 ++-- dbt/task/run.py | 8 ++-- dbt/task/test.py | 7 ++-- dbt/templates.py | 99 +++++++++++++++++++++++---------------------- 9 files changed, 102 insertions(+), 63 deletions(-) diff --git a/dbt/archival.py b/dbt/archival.py index 7c2dff8fe17..40a38a51d70 100644 --- a/dbt/archival.py +++ b/dbt/archival.py @@ -60,4 +60,8 @@ def compile(self): return rendered + def runtime_compile(self, compiled_model): + context = self.context.copy() + context.update(model.context()) + model.compile(context) diff --git a/dbt/compilation.py b/dbt/compilation.py index 116bb3dde78..336cbf9dc39 100644 --- a/dbt/compilation.py +++ b/dbt/compilation.py @@ -12,6 +12,8 @@ import time import sqlparse +CompilableEntities = ["models", "tests", "archives", "analyses"] + class Compiler(object): def __init__(self, project, create_template_class): self.project = project @@ -380,4 +382,9 @@ def compile(self, dry=False): compiled_archives = self.compile_archives() - return len(written_models), len(written_schema_tests), len(compiled_archives), len(written_analyses) + return { + "models": len(written_models), + "tests" : len(written_schema_tests), + "archives": len(compiled_archives), + "analyses" : len(written_analyses) + } diff --git a/dbt/runner.py b/dbt/runner.py index 8fdd35a6608..f44e22e4bb8 100644 --- a/dbt/runner.py +++ b/dbt/runner.py @@ -231,7 +231,6 @@ def status(self, result): return result.status def execute(self, schema, target, model): - print(model.compiled_contents) status = schema.execute_and_handle_permissions(model.compiled_contents, model.name) return 
status diff --git a/dbt/schema.py b/dbt/schema.py index c76f243ee5a..1b962de8a01 100644 --- a/dbt/schema.py +++ b/dbt/schema.py @@ -23,6 +23,20 @@ def __init__(self, project, target): self.target = target self.logger = logging.getLogger(__name__) + self.schema_cache = {} + + def cache_table_columns(self, schema, table, columns): + tid = (schema, table) + + if tid not in self.schema_cache: + self.schema_cache[tid] = columns + + return tid + + def get_table_columns_if_cached(self, schema, table): + tid = (schema, table) + return self.schema_cache.get(tid, None) + def get_schemas(self): existing = [] results = self.execute_and_fetch('select nspname from pg_catalog.pg_namespace') @@ -105,10 +119,19 @@ def drop(self, schema, relation_type, relation): self.logger.info("dropped %s %s.%s", relation_type, schema, relation) def get_columns_in_table(self, schema_name, table_name): - sql = self.target.sql_columns_in_table(schema_name, table_name) self.logger.debug("getting columns in table %s.%s", schema_name, table_name) + + columns = self.get_table_columns_if_cached(schema_name, table_name) + if columns is not None: + self.logger.debug("Found columns (in cache): %s", columns) + return columns + + sql = self.target.sql_columns_in_table(schema_name, table_name) results = self.execute_and_fetch(sql) columns = [(column, data_type) for (column, data_type) in results] + + self.cache_table_columns(schema_name, table_name, columns) + self.logger.debug("Found columns: %s", columns) return columns @@ -125,7 +148,7 @@ def get_missing_columns(self, from_schema, from_table, to_schema, to_table): missing_columns = set(from_columns.keys()) - set(to_columns.keys()) - return {col:dtype for (col, dtype) in from_columns.items() if col in missing_columns} + return [(col, dtype) for (col, dtype) in from_columns.items() if col in missing_columns] def create_table(self, schema, table, columns, sort, dist): fields = ['"{field}" {data_type}'.format(field=field, data_type=data_type) for (field, data_type) in columns] diff --git a/dbt/task/archive.py b/dbt/task/archive.py index 40467b42286..067b8a091dd 100644 --- a/dbt/task/archive.py +++ b/dbt/task/archive.py @@ -12,7 +12,8 @@ def __init__(self, args, project): def compile(self): compiler = Compiler(self.project, self.create_template) compiler.initialize() - compiler.compile_archives() + compiled = compiler.compile_archives() + print("Compiled {} archives".format(len(compiled))) def run(self): self.compile() diff --git a/dbt/task/compile.py b/dbt/task/compile.py index c39a13e82b3..f06941c027d 100644 --- a/dbt/task/compile.py +++ b/dbt/task/compile.py @@ -1,5 +1,5 @@ -from dbt.compilation import Compiler +from dbt.compilation import Compiler, CompilableEntities from dbt.templates import BaseCreateTemplate, DryCreateTemplate @@ -16,6 +16,7 @@ def run(self): compiler = Compiler(self.project, create_template) compiler.initialize() - created_models, created_tests, created_archives, created_analyses = compiler.compile(dry=self.args.dry) + results = compiler.compile(dry=self.args.dry) - print("Compiled {} models, {} tests, {} archives and {} analyses".format(created_models, created_tests, created_archives, created_analyses)) + stat_line = ", ".join(["{} {}".format(results[k], k) for k in CompilableEntities]) + print("Compiled {}".format(stat_line)) diff --git a/dbt/task/run.py b/dbt/task/run.py index 8007a5dcc36..ef61d6cf9f5 100644 --- a/dbt/task/run.py +++ b/dbt/task/run.py @@ -4,7 +4,7 @@ import os from dbt.templates import DryCreateTemplate, BaseCreateTemplate from dbt.runner 
import RunManager -from dbt.compilation import Compiler +from dbt.compilation import Compiler, CompilableEntities THREAD_LIMIT = 9 @@ -17,8 +17,10 @@ def compile(self): create_template = DryCreateTemplate if self.args.dry else BaseCreateTemplate compiler = Compiler(self.project, create_template) compiler.initialize() - created_models, created_tests, created_analyses = compiler.compile(self.args.dry) - print("Compiled {} models, {} tests, and {} analyses".format(created_models, created_tests, created_analyses)) + results = compiler.compile(self.args.dry) + + stat_line = ", ".join(["{} {}".format(results[k], k) for k in CompilableEntities]) + print("Compiled {}".format(stat_line)) return create_template.label diff --git a/dbt/task/test.py b/dbt/task/test.py index c52250720b4..75204ded627 100644 --- a/dbt/task/test.py +++ b/dbt/task/test.py @@ -3,7 +3,7 @@ import psycopg2 import yaml -from dbt.compilation import Compiler +from dbt.compilation import Compiler, CompilableEntities from dbt.templates import DryCreateTemplate, BaseCreateTemplate from dbt.runner import RunManager from dbt.schema_tester import SchemaTester @@ -25,9 +25,10 @@ def __init__(self, args, project): def compile(self): compiler = Compiler(self.project, BaseCreateTemplate) compiler.initialize() + results = compiler.compile() - created_models, created_tests, created_analyses = compiler.compile() - print("Compiled {} models, {} tests, and {} analyses".format(created_models, created_tests, created_analyses)) + stat_line = ", ".join(["{} {}".format(results[k], k) for k in CompilableEntities]) + print("Compiled {}".format(stat_line)) return compiler diff --git a/dbt/templates.py b/dbt/templates.py index 3527f242098..75919384f8a 100644 --- a/dbt/templates.py +++ b/dbt/templates.py @@ -172,11 +172,14 @@ def wrap(self, opts): SCDArchiveTemplate = """ + with current_data as ( select *, {{ updated_at }} as dbt_updated_at, - {{ unique_key }} as dbt_pk + {{ unique_key }} as dbt_pk, + {{ updated_at }} as valid_from, + null::timestamp as tmp_valid_to from "{{ source_schema }}"."{{ source_table }}" ), @@ -190,64 +193,50 @@ def wrap(self, opts): {% endfor %}, {% endraw %} {{ updated_at }} as dbt_updated_at, - {{ unique_key }} as dbt_pk + {{ unique_key }} as dbt_pk, + valid_from, + valid_to as tmp_valid_to from "{{ target_schema }}"."{{ dest_table }}" ), - combined as ( + insertions as ( select - {% raw %} - {% for (col, type) in get_columns_in_table(source_schema, source_table) %} - "{{ col }}" {% if not loop.last %},{% endif %} - {% endfor %}, - {% endraw %} - dbt_updated_at, - dbt_pk - from current_data + current_data.*, + null::timestamp as valid_to + from current_data + left outer join archived_data on archived_data.dbt_pk = current_data.dbt_pk + where archived_data.dbt_pk is null or ( + archived_data.dbt_pk is not null and + current_data.dbt_updated_at > archived_data.dbt_updated_at and + archived_data.tmp_valid_to is null + ) + ), - union all + updates as ( select - {% raw %} - {% for (col, type) in get_columns_in_table(source_schema, source_table) %} - "{{ col }}" {% if not loop.last %},{% endif %} - {% endfor %}, - {% endraw %} - dbt_updated_at, - dbt_pk - from archived_data - + archived_data.*, + current_data.dbt_updated_at as valid_to + from current_data + left outer join archived_data on archived_data.dbt_pk = current_data.dbt_pk + where archived_data.dbt_pk is not null + and archived_data.dbt_updated_at < current_data.dbt_updated_at + and archived_data.tmp_valid_to is null ), merged as ( - select - distinct - combined.*, - 
least(combined.dbt_updated_at, current_data.dbt_updated_at) as valid_from, - case when combined.dbt_updated_at = current_data.dbt_updated_at then null - else current_data.dbt_updated_at - end as valid_to - from current_data - left outer join combined - on combined.dbt_pk = current_data.dbt_pk - and current_data.dbt_updated_at >= combined.dbt_updated_at + select *, 'update' as change_type from updates + union all + select *, 'insert' as change_type from insertions - ), - with_id as ( - select *, - row_number() over (partition by dbt_pk order by dbt_updated_at asc) as dbt_archive_id, - count(*) over (partition by dbt_pk) as num_changes - from merged ) select *, - md5(dbt_pk || '|' || dbt_archive_id) as scd_id - - from with_id - where num_changes > 1 + md5(dbt_pk || '|' || dbt_updated_at) as scd_id + from merged """ @@ -255,20 +244,30 @@ class ArchiveInsertTemplate(object): label = "archive" + + # missing_columns : columns in source_table that are missing from dest_table (used for the ALTER) + # dest_columns : columns in the dest table (post-alter!) + definitions = """ +{% set missing_columns = get_missing_columns(source_schema, source_table, target_schema, dest_table) %} +{% set dest_columns = get_columns_in_table(target_schema, dest_table) + missing_columns %} +""" + alter_template = """ -{% for (col, dtype) in get_missing_columns(source_schema, source_table, target_schema, dest_table).items() %} +{% for (col, dtype) in missing_columns %} alter table "{{ target_schema }}"."{{ dest_table }}" add column "{{ col }}" {{ dtype }}; {% endfor %} """ dest_cols = """ -{% for (col, type) in get_columns_in_table(target_schema, dest_table) %} +{% for (col, type) in dest_columns %} "{{ col }}" {% if not loop.last %},{% endif %} {% endfor %} """ archival_template = """ +{definitions} + {alter_template} create temporary table "{identifier}__dbt_archival_tmp" as ( @@ -278,17 +277,19 @@ class ArchiveInsertTemplate(object): select * from dbt_archive_sbq ); -delete from "{schema}"."{identifier}" where ({unique_key}) in ( - select ({unique_key}) from "{identifier}__dbt_archival_tmp" -); +update "{schema}"."{identifier}" as archive set valid_to = tmp.valid_to +from "{identifier}__dbt_archival_tmp" as tmp +where tmp.scd_id = archive.scd_id + and change_type = 'update'; insert into "{schema}"."{identifier}" ( {dest_cols} ) -select {dest_cols} from "{identifier}__dbt_archival_tmp"; +select {dest_cols} from "{identifier}__dbt_archival_tmp" +where change_type = 'insert'; """ def wrap(self, schema, table, query, unique_key): - sql = self.archival_template.format(schema=schema, identifier=table, query=query, unique_key=unique_key, alter_template=self.alter_template, dest_cols=self.dest_cols) + sql = self.archival_template.format(schema=schema, identifier=table, query=query, unique_key=unique_key, alter_template=self.alter_template, dest_cols=self.dest_cols, definitions=self.definitions) return sql From fbf7facc5eeccbabfa1698b70cec8d33e071b224 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Wed, 12 Oct 2016 17:14:59 -0400 Subject: [PATCH 19/33] archive arg consistency --- dbt/archival.py | 6 +++--- dbt/model.py | 12 ++++++------ dbt/templates.py | 10 +++++----- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/dbt/archival.py b/dbt/archival.py index 40a38a51d70..50638843b23 100644 --- a/dbt/archival.py +++ b/dbt/archival.py @@ -19,7 +19,7 @@ def compile(self): source_schema = self.archive_model.source_schema target_schema = self.archive_model.target_schema source_table = self.archive_model.source_table - 
dest_table = self.archive_model.dest_table + target_table = self.archive_model.target_table unique_key = self.archive_model.unique_key updated_at = self.archive_model.updated_at @@ -40,7 +40,7 @@ def compile(self): ] dest_columns = source_columns + extra_cols - self.schema.create_table(target_schema, dest_table, dest_columns, sort=updated_at, dist=unique_key) + self.schema.create_table(target_schema, target_table, dest_columns, sort=updated_at, dist=unique_key) env = jinja2.Environment() @@ -51,7 +51,7 @@ def compile(self): "source_schema" : source_schema, "source_table" : source_table, "target_schema" : target_schema, - "dest_table" : dest_table + "target_table" : target_table } base_query = dbt.templates.SCDArchiveTemplate diff --git a/dbt/model.py b/dbt/model.py index 74849250352..6cf972e3353 100644 --- a/dbt/model.py +++ b/dbt/model.py @@ -574,19 +574,19 @@ def __repr__(self): class ArchiveModel(DBTSource): dbt_run_type = 'archive' - def __init__(self, project, create_template, source_schema, target_schema, source_table, dest_table, unique_key, updated_at): + def __init__(self, project, create_template, source_schema, target_schema, source_table, target_table, unique_key, updated_at): self.create_template = create_template self.source_schema = source_schema self.target_schema = target_schema self.source_table = source_table - self.dest_table = dest_table + self.target_table = target_table self.unique_key = unique_key self.updated_at = updated_at target_dir = self.create_template.label - rel_filepath = os.path.join(self.target_schema, self.dest_table) + rel_filepath = os.path.join(self.target_schema, self.target_table) super(ArchiveModel, self).__init__(project, target_dir, rel_filepath, project) @@ -597,7 +597,7 @@ def serialize(self): "source_schema" : self.source_schema, "target_schema" : self.target_schema, "source_table" : self.source_table, - "dest_table" : self.dest_table, + "target_table" : self.target_table, "unique_key" : self.unique_key, "updated_at" : self.updated_at } @@ -609,7 +609,7 @@ def compile(self): archival = dbt.archival.Archival(self.project, self) query = archival.compile() - sql = self.create_template.wrap(self.target_schema, self.dest_table, query, self.unique_key) + sql = self.create_template.wrap(self.target_schema, self.target_table, query, self.unique_key) return sql def build_path(self): @@ -619,4 +619,4 @@ def build_path(self): return os.path.join(*path_parts) def __repr__(self): - return "<ArchiveModel {} --> {} unique:{} updated_at:{}>".format(self.source_table, self.dest_table, self.unique_key, self.updated_at) + return "<ArchiveModel {} --> {} unique:{} updated_at:{}>".format(self.source_table, self.target_table, self.unique_key, self.updated_at) diff --git a/dbt/templates.py b/dbt/templates.py index 75919384f8a..51bdfef5900 100644 --- a/dbt/templates.py +++ b/dbt/templates.py @@ -196,7 +196,7 @@ def wrap(self, opts): {{ unique_key }} as dbt_pk, valid_from, valid_to as tmp_valid_to - from "{{ target_schema }}"."{{ dest_table }}" + from "{{ target_schema }}"."{{ target_table }}" ), @@ -245,16 +245,16 @@ class ArchiveInsertTemplate(object): label = "archive" - # missing_columns : columns in source_table that are missing from dest_table (used for the ALTER) + # missing_columns : columns in source_table that are missing from target_table (used for the ALTER) # dest_columns : columns in the dest table (post-alter!)
definitions = """ -{% set missing_columns = get_missing_columns(source_schema, source_table, target_schema, dest_table) %} -{% set dest_columns = get_columns_in_table(target_schema, dest_table) + missing_columns %} +{% set missing_columns = get_missing_columns(source_schema, source_table, target_schema, target_table) %} +{% set dest_columns = get_columns_in_table(target_schema, target_table) + missing_columns %} """ alter_template = """ {% for (col, dtype) in missing_columns %} - alter table "{{ target_schema }}"."{{ dest_table }}" add column "{{ col }}" {{ dtype }}; + alter table "{{ target_schema }}"."{{ target_table }}" add column "{{ col }}" {{ dtype }}; {% endfor %} """ From 35ff58490699ef4ccfb1c8d98644b83d9f05960c Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Wed, 12 Oct 2016 17:22:51 -0400 Subject: [PATCH 20/33] harden input validation for archival --- dbt/model.py | 30 +++++++++++++++++++++++------- dbt/runner.py | 2 +- dbt/source.py | 2 +- 3 files changed, 25 insertions(+), 9 deletions(-) diff --git a/dbt/model.py b/dbt/model.py index 6cf972e3353..6ae011f2dcb 100644 --- a/dbt/model.py +++ b/dbt/model.py @@ -574,22 +574,38 @@ def __repr__(self): class ArchiveModel(DBTSource): dbt_run_type = 'archive' - def __init__(self, project, create_template, source_schema, target_schema, source_table, target_table, unique_key, updated_at): + def __init__(self, project, create_template, archive_data): self.create_template = create_template - self.source_schema = source_schema - self.target_schema = target_schema - self.source_table = source_table - self.target_table = target_table - self.unique_key = unique_key - self.updated_at = updated_at + self.validate(archive_data) + + self.source_schema = archive_data['source_schema'] + self.target_schema = archive_data['target_schema'] + self.source_table = archive_data['source_table'] + self.target_table = archive_data['target_table'] + self.unique_key = archive_data['unique_key'] + self.updated_at = archive_data['updated_at'] target_dir = self.create_template.label rel_filepath = os.path.join(self.target_schema, self.target_table) super(ArchiveModel, self).__init__(project, target_dir, rel_filepath, project) + def validate(self, data): + required = [ + 'source_schema', + 'target_schema', + 'source_table', + 'target_table', + 'unique_key', + 'updated_at', + ] + + for key in required: + if data.get(key, None) is None: + raise RuntimeError("Invalid archive config: missing required field '{}'".format(key)) + def serialize(self): data = DBTSource.serialize(self).copy() diff --git a/dbt/runner.py b/dbt/runner.py index f44e22e4bb8..ebbcd018f86 100644 --- a/dbt/runner.py +++ b/dbt/runner.py @@ -339,7 +339,7 @@ def execute_models(self, runner, model_dependency_list, on_failure): num_models = len(flat_models) if num_models == 0: - print("WARNING: No models to run in '{}'. Try checking your model configs and running `dbt compile`".format(self.target_path)) + print("WARNING: Nothing to do. 
Try checking your model configs and running `dbt compile`".format(self.target_path)) return [] num_threads = self.target.threads diff --git a/dbt/source.py b/dbt/source.py index 1e1021909fa..1d98d9239c9 100644 --- a/dbt/source.py +++ b/dbt/source.py @@ -77,7 +77,7 @@ def get_archives(self, create_template): for table in tables: fields = table.copy() fields.update(schema) - archives.append(ArchiveModel(self.project, create_template, **fields)) + archives.append(ArchiveModel(self.project, create_template, fields)) return archives From 6728a4ba1483a537eeeef6570711f22221a9bf82 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Sat, 15 Oct 2016 03:46:17 -0400 Subject: [PATCH 21/33] split up transaction --- dbt/runner.py | 21 ++++++++++++++++++++- dbt/schema.py | 25 +++++++++++++++++++++++++ dbt/templates.py | 2 ++ 3 files changed, 47 insertions(+), 1 deletion(-) diff --git a/dbt/runner.py b/dbt/runner.py index ebbcd018f86..e712b7aef27 100644 --- a/dbt/runner.py +++ b/dbt/runner.py @@ -6,6 +6,8 @@ import logging import time import itertools +import re +import yaml from datetime import datetime from dbt.compilation import Compiler @@ -101,7 +103,23 @@ def execute(self, schema, target, model): if model.tmp_drop_type is not None: schema.drop(target.schema, model.tmp_drop_type, model.tmp_name) - status = schema.execute_and_handle_permissions(model.compiled_contents, model.name) + parts = re.split(r'-- DBT_OPERATION (.*)', model.compiled_contents) + handle = None + for i, part in enumerate(parts): + if re.match(r'^{.*}$', part) is not None: + instruction = yaml.safe_load(part) + function = instruction['function'] + kwargs = instruction['args'] + + func_map = { + 'expand_column_types_if_needed': lambda kwargs: schema.expand_column_types_if_needed(**kwargs), + } + + func_map[function](kwargs) + else: + handle, status = schema.execute_without_auto_commit(part, handle) + + handle.commit() if model.final_drop_type is not None: schema.drop(target.schema, model.final_drop_type, model.name) @@ -474,3 +492,4 @@ def run_archive(self): runner = ArchiveRunner() return self.safe_run_from_graph(runner, None) + diff --git a/dbt/schema.py b/dbt/schema.py index 1b962de8a01..f34fd12e213 100644 --- a/dbt/schema.py +++ b/dbt/schema.py @@ -112,6 +112,27 @@ def execute_and_handle_permissions(self, query, model_name): else: raise e + def execute_without_auto_commit(self, sql, handle=None): + if handle is None: + handle = self.target.get_handle() + + cursor = handle.cursor() + + try: + self.logger.debug("SQL: %s", sql) + pre = time.time() + cursor.execute(sql) + post = time.time() + self.logger.debug("SQL status: %s in %0.2f seconds", cursor.statusmessage, post-pre) + return handle, cursor.statusmessage + except Exception as e: + self.target.rollback() + self.logger.exception("Error running SQL: %s", sql) + self.logger.debug("rolling back connection") + raise e + finally: + cursor.close() + def drop(self, schema, relation_type, relation): sql = 'drop {relation_type} if exists "{schema}"."{relation}" cascade'.format(schema=schema, relation_type=relation_type, relation=relation) self.logger.info("dropping %s %s.%s", relation_type, schema, relation) @@ -164,3 +185,7 @@ def create_schema_if_not_exists(self, schema_name): if schema_name not in schemas: self.create_schema(schema_name) + def expand_column_types_if_needed(self, from_table, to_schema, to_table): + "The hard part!" 
+ pass + diff --git a/dbt/templates.py b/dbt/templates.py index 51bdfef5900..100653e6573 100644 --- a/dbt/templates.py +++ b/dbt/templates.py @@ -25,6 +25,8 @@ class BaseCreateTemplate(object): where ({sql_where}) or ({sql_where}) is null ); +-- DBT_OPERATION {{ function: expand_column_types_if_needed, args: {{ from_table: "{identifier}__dbt_incremental_tmp", to_schema: "{schema}", to_table: "{identifier}"}} }} + {incremental_delete_statement} insert into "{schema}"."{identifier}" ( From 007d7c6aead70f1014ad11ef0ee73c5d7c76c0b4 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Sat, 15 Oct 2016 13:59:56 -0400 Subject: [PATCH 22/33] expand column types. Only varchar for now --- dbt/schema.py | 73 ++++++++++++++++++++++++++++++++++++++++++++---- dbt/targets.py | 26 ++++++++++++----- dbt/templates.py | 2 +- 3 files changed, 88 insertions(+), 13 deletions(-) diff --git a/dbt/schema.py b/dbt/schema.py index f34fd12e213..1c4f67252a2 100644 --- a/dbt/schema.py +++ b/dbt/schema.py @@ -2,6 +2,7 @@ import psycopg2 import logging import time +import re SCHEMA_PERMISSION_DENIED_MESSAGE = """The user '{user}' does not have sufficient permissions to create the schema '{schema}'. Either create the schema manually, or adjust the permissions of the '{user}' user.""" @@ -139,11 +140,11 @@ def drop(self, schema, relation_type, relation): self.execute_and_handle_permissions(sql, relation) self.logger.info("dropped %s %s.%s", relation_type, schema, relation) - def get_columns_in_table(self, schema_name, table_name): + def get_columns_in_table(self, schema_name, table_name, use_cached=True): self.logger.debug("getting columns in table %s.%s", schema_name, table_name) columns = self.get_table_columns_if_cached(schema_name, table_name) - if columns is not None: + if columns is not None and use_cached: self.logger.debug("Found columns (in cache): %s", columns) return columns @@ -185,7 +186,69 @@ def create_schema_if_not_exists(self, schema_name): if schema_name not in schemas: self.create_schema(schema_name) - def expand_column_types_if_needed(self, from_table, to_schema, to_table): - "The hard part!" - pass + def get_varchar_size(self, column_type): + if column_type == 'text': + return 255 + matches = re.match(r'character varying\((\d+)\)', column_type) + if matches is None: + return None + else: + return int(matches.groups()[0]) + + def is_varchar_field(self, dtype): + return dtype.startswith('character') or dtype == 'text' + + def expand_column_to_type(self, source_type, dest_type): + if not self.is_varchar_field(source_type) or not self.is_varchar_field(dest_type): + return None + + source_size = self.get_varchar_size(source_type) + dest_size = self.get_varchar_size(dest_type) + + if source_size is not None and dest_size is not None and source_size > dest_size: + return 'character varying({})'.format(source_size) + else: + return None + + def alter_column_type(self, schema, table, column_name, new_column_type): + """ + 1. Create a new column (w/ temp name and correct type) + 2. Copy data over to it + 3. Drop the existing column + 4. 
Rename the new column to existing column + """ + + opts = { + "schema": schema, + "table": table, + "old_column": column_name, + "tmp_column": "{}__dbt_alter".format(column_name), + "dtype": new_column_type + } + + sql = """ + alter table "{schema}"."{table}" add column "{tmp_column}" {dtype}; + update "{schema}"."{table}" set "{tmp_column}" = "{old_column}"; + alter table "{schema}"."{table}" drop column "{old_column}"; + alter table "{schema}"."{table}" rename column "{tmp_column}" to "{old_column}"; + """.format(**opts) + + status = self.execute(sql) + return status + + def expand_column_types_if_needed(self, temp_table, to_schema, to_table): + source_columns = self.get_columns_in_table(None, temp_table) + dest_columns = self.get_columns_in_table(to_schema, to_table) + + if len(source_columns) != len(dest_columns): + raise RuntimeError("Staging table and model table have different numbers of columns. staging={}, dest={}".format(temp_table, to_table)) + + for (source_col, dest_col) in zip(source_columns, dest_columns): + source_name, source_type = source_col + dest_name, dest_type = dest_col + + if source_type != dest_type: + new_type = self.expand_column_to_type(source_type, dest_type) + if new_type is not None: + self.alter_column_type(to_schema, to_table, dest_name, new_type) diff --git a/dbt/targets.py b/dbt/targets.py index 7703ec13bc3..8fd47983ea0 100644 --- a/dbt/targets.py +++ b/dbt/targets.py @@ -116,11 +116,13 @@ def __init__(self, cfg): def sql_columns_in_table(self, schema_name, table_name): - return """ + sql = """ select "column" as column_name, "type" as "data_type" from pg_table_def - where schemaname = '{schema_name}' and tablename = '{table_name}' - """.format(schema_name=schema_name, table_name=table_name).strip() + where tablename = '{table_name}'""".format(table_name=table_name).strip() + if schema_name is not None: + sql += " AND schemaname = '{schema_name}'".format(schema_name) + return sql @property def context(self): @@ -133,11 +135,21 @@ def __init__(self, cfg): super(PostgresTarget, self).__init__(cfg) def sql_columns_in_table(self, schema_name, table_name): - return """ - select column_name, data_type + sql = """ + select column_name, + -- conform to redshift pg_table_def output + case when data_type = 'character varying' then + data_type || '(' || character_maximum_length || ')' + else + data_type + end as data_type from information_schema.columns - where table_schema = '{schema_name}' and table_name = '{table_name}' - """.format(schema_name=schema_name, table_name=table_name).strip() + where table_name = '{table_name}'""".format(table_name=table_name).strip() + + if schema_name is not None: + sql += " AND table_schema = '{schema_name}'".format(schema_name=schema_name) + + return sql @property def context(self): diff --git a/dbt/templates.py b/dbt/templates.py index 100653e6573..a79a159d063 100644 --- a/dbt/templates.py +++ b/dbt/templates.py @@ -25,7 +25,7 @@ class BaseCreateTemplate(object): where ({sql_where}) or ({sql_where}) is null ); --- DBT_OPERATION {{ function: expand_column_types_if_needed, args: {{ from_table: "{identifier}__dbt_incremental_tmp", to_schema: "{schema}", to_table: "{identifier}"}} }} +-- DBT_OPERATION {{ function: expand_column_types_if_needed, args: {{ temp_table: "{identifier}__dbt_incremental_tmp", to_schema: "{schema}", to_table: "{identifier}"}} }} {incremental_delete_statement} From 0ae01dbfea77b281c175c2e504fc51c37714212d Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Sat, 15 Oct 2016 14:33:49 -0400 Subject: [PATCH 23/33] 
clean up dbt operation parsing --- dbt/runner.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/dbt/runner.py b/dbt/runner.py index e712b7aef27..c3adc8cd561 100644 --- a/dbt/runner.py +++ b/dbt/runner.py @@ -103,11 +103,13 @@ def execute(self, schema, target, model): if model.tmp_drop_type is not None: schema.drop(target.schema, model.tmp_drop_type, model.tmp_name) - parts = re.split(r'-- DBT_OPERATION (.*)', model.compiled_contents) + parts = re.split(r'-- (DBT_OPERATION .*)', model.compiled_contents) handle = None for i, part in enumerate(parts): - if re.match(r'^{.*}$', part) is not None: - instruction = yaml.safe_load(part) + matches = re.match(r'^DBT_OPERATION ({.*})$', part) + if matches is not None: + instruction_string = matches.groups()[0] + instruction = yaml.safe_load(instruction_string) function = instruction['function'] kwargs = instruction['args'] From 27b0ba59fe2212d26fefe78b3c5e92afdf32454a Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Sat, 15 Oct 2016 15:26:46 -0400 Subject: [PATCH 24/33] make it work for archives --- dbt/runner.py | 46 ++++++++++++++++++++++++++-------------------- dbt/schema.py | 15 +++++++-------- dbt/targets.py | 2 +- dbt/templates.py | 7 +++---- 4 files changed, 37 insertions(+), 33 deletions(-) diff --git a/dbt/runner.py b/dbt/runner.py index c3adc8cd561..a5441f83ec1 100644 --- a/dbt/runner.py +++ b/dbt/runner.py @@ -65,6 +65,30 @@ def pre_run_all(self, models): def status(self, result): raise NotImplementedError("not implemented") + def execute_contents(self, schema, target, model): + parts = re.split(r'-- (DBT_OPERATION .*)', model.compiled_contents) + handle = None + + status = 'None' + for i, part in enumerate(parts): + matches = re.match(r'^DBT_OPERATION ({.*})$', part) + if matches is not None: + instruction_string = matches.groups()[0] + instruction = yaml.safe_load(instruction_string) + function = instruction['function'] + kwargs = instruction['args'] + + func_map = { + 'expand_column_types_if_needed': lambda kwargs: schema.expand_column_types_if_needed(**kwargs), + } + + func_map[function](kwargs) + else: + handle, status = schema.execute_without_auto_commit(part, handle) + + handle.commit() + return status + class ModelRunner(BaseRunner): run_type = 'run' def pre_run_msg(self, model): @@ -103,25 +127,7 @@ def execute(self, schema, target, model): if model.tmp_drop_type is not None: schema.drop(target.schema, model.tmp_drop_type, model.tmp_name) - parts = re.split(r'-- (DBT_OPERATION .*)', model.compiled_contents) - handle = None - for i, part in enumerate(parts): - matches = re.match(r'^DBT_OPERATION ({.*})$', part) - if matches is not None: - instruction_string = matches.groups()[0] - instruction = yaml.safe_load(instruction_string) - function = instruction['function'] - kwargs = instruction['args'] - - func_map = { - 'expand_column_types_if_needed': lambda kwargs: schema.expand_column_types_if_needed(**kwargs), - } - - func_map[function](kwargs) - else: - handle, status = schema.execute_without_auto_commit(part, handle) - - handle.commit() + status = self.execute_contents(schema, target, model) if model.final_drop_type is not None: schema.drop(target.schema, model.final_drop_type, model.name) @@ -251,7 +257,7 @@ def status(self, result): return result.status def execute(self, schema, target, model): - status = schema.execute_and_handle_permissions(model.compiled_contents, model.name) + status = self.execute_contents(schema, target, model) return status class RunManager(object): diff --git a/dbt/schema.py 
b/dbt/schema.py index 1c4f67252a2..371c2ce2d4a 100644 --- a/dbt/schema.py +++ b/dbt/schema.py @@ -237,18 +237,17 @@ def alter_column_type(self, schema, table, column_name, new_column_type): return status def expand_column_types_if_needed(self, temp_table, to_schema, to_table): - source_columns = self.get_columns_in_table(None, temp_table) - dest_columns = self.get_columns_in_table(to_schema, to_table) + source_columns = {k:v for (k,v) in self.get_columns_in_table(None, temp_table)} + dest_columns = {k:v for (k,v) in self.get_columns_in_table(to_schema, to_table)} - if len(source_columns) != len(dest_columns): - raise RuntimeError("Staging table and model table have different numbers of columns. staging={}, dest={}".format(temp_table, to_table)) + for column_name, source_type in source_columns.items(): + dest_type = dest_columns.get(column_name) - for (source_col, dest_col) in zip(source_columns, dest_columns): - source_name, source_type = source_col - dest_name, dest_type = dest_col + if dest_type is None: + continue if source_type != dest_type: new_type = self.expand_column_to_type(source_type, dest_type) if new_type is not None: - self.alter_column_type(to_schema, to_table, dest_name, new_type) + self.alter_column_type(to_schema, to_table, column_name, new_type) diff --git a/dbt/targets.py b/dbt/targets.py index 8fd47983ea0..b2aa4381f2b 100644 --- a/dbt/targets.py +++ b/dbt/targets.py @@ -139,7 +139,7 @@ def sql_columns_in_table(self, schema_name, table_name): select column_name, -- conform to redshift pg_table_def output case when data_type = 'character varying' then - data_type || '(' || character_maximum_length || ')' + data_type || '(' || coalesce(character_maximum_length, 255) || ')' else data_type end as data_type diff --git a/dbt/templates.py b/dbt/templates.py index a79a159d063..aad640acb6a 100644 --- a/dbt/templates.py +++ b/dbt/templates.py @@ -128,6 +128,7 @@ class DryCreateTemplate(object): limit 0 ); +-- DBT_OPERATION {{ function: expand_column_types_if_needed, args: {{ temp_table: "{identifier}__dbt_incremental_tmp", to_schema: "{schema}", to_table: "{identifier}"}} }} {incremental_delete_statement} @@ -169,10 +170,6 @@ def wrap(self, opts): return "{}\n\n{}".format(opts['prologue'], sql) -SCDGetColumnsInTable = """ -""" - - SCDArchiveTemplate = """ with current_data as ( @@ -279,6 +276,8 @@ class ArchiveInsertTemplate(object): select * from dbt_archive_sbq ); +-- DBT_OPERATION {{ function: expand_column_types_if_needed, args: {{ temp_table: "{identifier}__dbt_archival_tmp", to_schema: "{schema}", to_table: "{identifier}"}} }} + update "{schema}"."{identifier}" as archive set valid_to = tmp.valid_to from "{identifier}__dbt_archival_tmp" as tmp where tmp.scd_id = archive.scd_id From 2ef1886a6def085cb5ea17e4f4cbe086e4a4e0cb Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Thu, 20 Oct 2016 13:57:47 -0400 Subject: [PATCH 25/33] Use column object --- dbt/archival.py | 8 +-- dbt/main.py | 1 + dbt/schema.py | 115 ++++++++++++++++++++++++++++---------------- dbt/targets.py | 26 ---------- dbt/task/archive.py | 2 +- dbt/templates.py | 10 ++-- 6 files changed, 85 insertions(+), 77 deletions(-) diff --git a/dbt/archival.py b/dbt/archival.py index 50638843b23..6ee97cf5634 100644 --- a/dbt/archival.py +++ b/dbt/archival.py @@ -33,10 +33,10 @@ def compile(self): # create archive table if not exists! TODO: Sort & Dist keys! 
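Note: the "-- DBT_OPERATION { ... }" comments wired up in the patches above are what let compiled SQL hand work back to Python (for example expand_column_types_if_needed). Below is a minimal, standalone sketch of that split-and-dispatch step; only the two regexes mirror the patch, while the sample SQL, the dispatch harness, and the table names are invented for illustration, and it assumes PyYAML is installed.

import re
import yaml

def split_operations(compiled_sql):
    """Yield ('sql', text) or ('operation', dict) chunks in document order."""
    parts = re.split(r'-- (DBT_OPERATION .*)', compiled_sql)
    for part in parts:
        match = re.match(r'^DBT_OPERATION ({.*})$', part)
        if match is not None:
            # the payload is a small YAML flow mapping: {function: ..., args: {...}}
            yield ('operation', yaml.safe_load(match.groups()[0]))
        elif part.strip():
            yield ('sql', part)

if __name__ == '__main__':
    sample = (
        'create table "analytics"."users__dbt_incremental_tmp" as (select 1);\n'
        '-- DBT_OPERATION { function: expand_column_types_if_needed, '
        'args: { temp_table: "users__dbt_incremental_tmp", '
        'to_schema: "analytics", to_table: "users"} }\n'
        'insert into "analytics"."users" (select 1);\n'
    )
    for kind, payload in split_operations(sample):
        print(kind, payload)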
Hmmmm extra_cols = [ - ("valid_from", "timestamp"), - ("valid_to", "timestamp"), - ("scd_id","text"), - ("dbt_updated_at","timestamp") + dbt.schema.Column("valid_from", "timestamp", None), + dbt.schema.Column("valid_to", "timestamp", None), + dbt.schema.Column("scd_id","text", None), + dbt.schema.Column("dbt_updated_at","timestamp", None) ] dest_columns = source_columns + extra_cols diff --git a/dbt/main.py b/dbt/main.py index a6adc74f504..5c646afba70 100644 --- a/dbt/main.py +++ b/dbt/main.py @@ -73,6 +73,7 @@ def handle(args): sub.set_defaults(cls=deps_task.DepsTask, which='deps') sub = subs.add_parser('archive', parents=[base_subparser]) + sub.add_argument('--threads', type=int, required=False, help="Specify number of threads to use while archiving tables. Overrides settings in profiles.yml") sub.set_defaults(cls=archive_task.ArchiveTask, which='archive') sub = subs.add_parser('run', parents=[base_subparser]) diff --git a/dbt/schema.py b/dbt/schema.py index 371c2ce2d4a..30d2e3c0a31 100644 --- a/dbt/schema.py +++ b/dbt/schema.py @@ -18,6 +18,51 @@ This is likely because the relation was created by a different user. Either delete the model "{schema}"."{model}" manually, or adjust the permissions of the '{user}' user in the '{schema}' schema.""" +class Column(object): + def __init__(self, column, dtype, char_size): + self.column = column + self.dtype = dtype + self.char_size = char_size + + @property + def name(self): + return self.column + + @property + def data_type(self): + if self.is_string(): + return Column.string_type(self.string_size()) + else: + return self.dtype + + def is_string(self): + return self.dtype in ['text', 'character varying'] + + def string_size(self): + if not self.is_string(): + raise RuntimeError("Called string_size() on non-string field!") + + if self.dtype == 'text' or self.char_size is None: + # char_size should never be None. 
Handle it reasonably just in case
+            return 255
+        else:
+            return int(self.char_size)
+
+    def can_expand_to(self, other_column):
+        "returns True if this column can be expanded to the size of the other column"
+        if not self.is_string() or not other_column.is_string():
+            return False
+
+        if other_column.string_size() > self.string_size():
+            return True
+
+    @classmethod
+    def string_type(cls, size):
+        return "character varying({})".format(size)
+
+    def __repr__(self):
+        return "<Column {} ({})>".format(self.name, self.data_type)
+
 class Schema(object):
     def __init__(self, project, target):
         self.project = project
@@ -140,6 +185,17 @@ def drop(self, schema, relation_type, relation):
         self.execute_and_handle_permissions(sql, relation)
         self.logger.info("dropped %s %s.%s", relation_type, schema, relation)
 
+    def sql_columns_in_table(self, schema_name, table_name):
+        sql = """
+            select column_name, data_type, character_maximum_length
+            from information_schema.columns
+            where table_name = '{table_name}'""".format(table_name=table_name).strip()
+
+        if schema_name is not None:
+            sql += " AND table_schema = '{schema_name}'".format(schema_name=schema_name)
+
+        return sql
+
     def get_columns_in_table(self, schema_name, table_name, use_cached=True):
         self.logger.debug("getting columns in table %s.%s", schema_name, table_name)
 
@@ -148,9 +204,14 @@ def get_columns_in_table(self, schema_name, table_name, use_cached=True):
             self.logger.debug("Found columns (in cache): %s", columns)
             return columns
 
-        sql = self.target.sql_columns_in_table(schema_name, table_name)
+        sql = self.sql_columns_in_table(schema_name, table_name)
         results = self.execute_and_fetch(sql)
-        columns = [(column, data_type) for (column, data_type) in results]
+
+        columns = []
+        for result in results:
+            column, data_type, char_size = result
+            col = Column(column, data_type, char_size)
+            columns.append(col)
 
         self.cache_table_columns(schema_name, table_name, columns)
 
@@ -165,15 +226,15 @@ def rename(self, schema, from_name, to_name):
 
     def get_missing_columns(self, from_schema, from_table, to_schema, to_table):
         "Returns dict of {column:type} for columns in from_table that are missing from to_table"
-        from_columns = {col:dtype for (col,dtype) in self.get_columns_in_table(from_schema, from_table)}
-        to_columns = {col:dtype for (col,dtype) in self.get_columns_in_table(to_schema, to_table)}
+        from_columns = {col.name:col for col in self.get_columns_in_table(from_schema, from_table)}
+        to_columns = {col.name:col for col in self.get_columns_in_table(to_schema, to_table)}
 
         missing_columns = set(from_columns.keys()) - set(to_columns.keys())
 
-        return [(col, dtype) for (col, dtype) in from_columns.items() if col in missing_columns]
+        return [col for (col_name, col) in from_columns.items() if col_name in missing_columns]
 
     def create_table(self, schema, table, columns, sort, dist):
-        fields = ['"{field}" {data_type}'.format(field=field, data_type=data_type) for (field, data_type) in columns]
+        fields = ['"{field}" {data_type}'.format(field=column.name, data_type=column.data_type) for column in columns]
         fields_csv = ",\n ".join(fields)
         # TODO : Sort and Dist keys??
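The Column object added above is what makes these width comparisons dependable: text is treated as varchar(255), a missing character_maximum_length falls back to 255, and a string column is only ever widened. A small self-contained sketch of just those rules (the helper names and the sample type tuples are invented; it restates the comparison rather than importing dbt):

def string_size(dtype, char_size):
    # mirror the fallback above: treat text / unknown length as varchar(255)
    if dtype == 'text' or char_size is None:
        return 255
    return int(char_size)

def needs_expansion(dest, source):
    """dest and source are (data_type, char_size) tuples; True if dest must be widened."""
    stringy = ('text', 'character varying')
    if dest[0] not in stringy or source[0] not in stringy:
        return False
    return string_size(*source) > string_size(*dest)

if __name__ == '__main__':
    assert needs_expansion(('character varying', 12), ('character varying', 64))
    assert not needs_expansion(('character varying', 64), ('character varying', 12))
    assert needs_expansion(('character varying', 100), ('text', None))
    assert not needs_expansion(('integer', None), ('character varying', 64))
    print('width rules behave as expected')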
sql = 'create table if not exists "{schema}"."{table}" (\n {fields}\n);'.format(schema=schema, table=table, fields=fields_csv) @@ -186,30 +247,6 @@ def create_schema_if_not_exists(self, schema_name): if schema_name not in schemas: self.create_schema(schema_name) - def get_varchar_size(self, column_type): - if column_type == 'text': - return 255 - matches = re.match(r'character varying\((\d+)\)', column_type) - if matches is None: - return None - else: - return int(matches.groups()[0]) - - def is_varchar_field(self, dtype): - return dtype.startswith('character') or dtype == 'text' - - def expand_column_to_type(self, source_type, dest_type): - if not self.is_varchar_field(source_type) or not self.is_varchar_field(dest_type): - return None - - source_size = self.get_varchar_size(source_type) - dest_size = self.get_varchar_size(dest_type) - - if source_size is not None and dest_size is not None and source_size > dest_size: - return 'character varying({})'.format(source_size) - else: - return None - def alter_column_type(self, schema, table, column_name, new_column_type): """ 1. Create a new column (w/ temp name and correct type) @@ -237,17 +274,13 @@ def alter_column_type(self, schema, table, column_name, new_column_type): return status def expand_column_types_if_needed(self, temp_table, to_schema, to_table): - source_columns = {k:v for (k,v) in self.get_columns_in_table(None, temp_table)} - dest_columns = {k:v for (k,v) in self.get_columns_in_table(to_schema, to_table)} - - for column_name, source_type in source_columns.items(): - dest_type = dest_columns.get(column_name) + source_columns = {col.name: col for col in self.get_columns_in_table(None, temp_table)} + dest_columns = {col.name: col for col in self.get_columns_in_table(to_schema, to_table)} - if dest_type is None: - continue + for column_name, source_column in source_columns.items(): + dest_column = dest_columns.get(column_name) - if source_type != dest_type: - new_type = self.expand_column_to_type(source_type, dest_type) - if new_type is not None: - self.alter_column_type(to_schema, to_table, column_name, new_type) + if source_columns.can_expand_to(dest_column): + new_type = Column.string_type(dest_column.string_size()) + self.alter_column_type(to_schema, to_table, column_name, new_type) diff --git a/dbt/targets.py b/dbt/targets.py index e5680351676..db1ff509dc1 100644 --- a/dbt/targets.py +++ b/dbt/targets.py @@ -118,16 +118,6 @@ class RedshiftTarget(BaseSQLTarget): def __init__(self, cfg, threads): super(RedshiftTarget, self).__init__(cfg, threads) - - def sql_columns_in_table(self, schema_name, table_name): - sql = """ - select "column" as column_name, "type" as "data_type" - from pg_table_def - where tablename = '{table_name}'""".format(table_name=table_name).strip() - if schema_name is not None: - sql += " AND schemaname = '{schema_name}'".format(schema_name) - return sql - @property def context(self): return { @@ -138,22 +128,6 @@ class PostgresTarget(BaseSQLTarget): def __init__(self, cfg, threads): super(PostgresTarget, self).__init__(cfg, threads) - def sql_columns_in_table(self, schema_name, table_name): - sql = """ - select column_name, - -- conform to redshift pg_table_def output - case when data_type = 'character varying' then - data_type || '(' || coalesce(character_maximum_length, 255) || ')' - else - data_type - end as data_type - from information_schema.columns - where table_name = '{table_name}'""".format(table_name=table_name).strip() - - if schema_name is not None: - sql += " AND table_schema = 
'{schema_name}'".format(schema_name=schema_name) - - return sql @property def context(self): diff --git a/dbt/task/archive.py b/dbt/task/archive.py index 067b8a091dd..c074b78a114 100644 --- a/dbt/task/archive.py +++ b/dbt/task/archive.py @@ -17,7 +17,7 @@ def compile(self): def run(self): self.compile() - runner = RunManager(self.project, self.project['target-path'], self.create_template.label) + runner = RunManager(self.project, self.project['target-path'], self.create_template.label, self.args.threads) results = runner.run_archive() diff --git a/dbt/templates.py b/dbt/templates.py index aad640acb6a..8322b369d5f 100644 --- a/dbt/templates.py +++ b/dbt/templates.py @@ -187,7 +187,7 @@ def wrap(self, opts): select {% raw %} - {% for (col, type) in get_columns_in_table(source_schema, source_table) %} + {% for col in get_columns_in_table(source_schema, source_table) %} "{{ col }}" {% if not loop.last %},{% endif %} {% endfor %}, {% endraw %} @@ -252,14 +252,14 @@ class ArchiveInsertTemplate(object): """ alter_template = """ -{% for (col, dtype) in missing_columns %} - alter table "{{ target_schema }}"."{{ target_table }}" add column "{{ col }}" {{ dtype }}; +{% for col in missing_columns %} + alter table "{{ target_schema }}"."{{ target_table }}" add column "{{ col.name }}" {{ col.data_type }}; {% endfor %} """ dest_cols = """ -{% for (col, type) in dest_columns %} - "{{ col }}" {% if not loop.last %},{% endif %} +{% for col in dest_columns %} + "{{ col.name }}" {% if not loop.last %},{% endif %} {% endfor %} """ From 40995cf5879c537af4dec5a7731e0d26972bfd6a Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Thu, 20 Oct 2016 14:56:38 -0400 Subject: [PATCH 26/33] fix archival sql templates --- dbt/schema.py | 2 +- dbt/templates.py | 13 +++++++++---- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/dbt/schema.py b/dbt/schema.py index 30d2e3c0a31..cf62bdfd6d8 100644 --- a/dbt/schema.py +++ b/dbt/schema.py @@ -280,7 +280,7 @@ def expand_column_types_if_needed(self, temp_table, to_schema, to_table): for column_name, source_column in source_columns.items(): dest_column = dest_columns.get(column_name) - if source_columns.can_expand_to(dest_column): + if dest_column is not None and source_column.can_expand_to(dest_column): new_type = Column.string_type(dest_column.string_size()) self.alter_column_type(to_schema, to_table, column_name, new_type) diff --git a/dbt/templates.py b/dbt/templates.py index 8322b369d5f..14214c377b9 100644 --- a/dbt/templates.py +++ b/dbt/templates.py @@ -174,7 +174,12 @@ def wrap(self, opts): with current_data as ( - select *, + select + {% raw %} + {% for col in get_columns_in_table(source_schema, source_table) %} + "{{ col.name }}" {% if not loop.last %},{% endif %} + {% endfor %}, + {% endraw %} {{ updated_at }} as dbt_updated_at, {{ unique_key }} as dbt_pk, {{ updated_at }} as valid_from, @@ -188,7 +193,7 @@ def wrap(self, opts): select {% raw %} {% for col in get_columns_in_table(source_schema, source_table) %} - "{{ col }}" {% if not loop.last %},{% endif %} + "{{ col.name }}" {% if not loop.last %},{% endif %} {% endfor %}, {% endraw %} {{ updated_at }} as dbt_updated_at, @@ -278,9 +283,9 @@ class ArchiveInsertTemplate(object): -- DBT_OPERATION {{ function: expand_column_types_if_needed, args: {{ temp_table: "{identifier}__dbt_archival_tmp", to_schema: "{schema}", to_table: "{identifier}"}} }} -update "{schema}"."{identifier}" as archive set valid_to = tmp.valid_to +update "{schema}"."{identifier}" set valid_to = tmp.valid_to from 
"{identifier}__dbt_archival_tmp" as tmp -where tmp.scd_id = archive.scd_id +where tmp.scd_id = "{schema}"."{identifier}".scd_id and change_type = 'update'; insert into "{schema}"."{identifier}" ( From c6e62cccf115d178e652762749016933a8a98fe1 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Thu, 20 Oct 2016 15:25:51 -0400 Subject: [PATCH 27/33] fix column expansion logic --- dbt/schema.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/dbt/schema.py b/dbt/schema.py index cf62bdfd6d8..eaabf2f47c4 100644 --- a/dbt/schema.py +++ b/dbt/schema.py @@ -53,8 +53,7 @@ def can_expand_to(self, other_column): if not self.is_string() or not other_column.is_string(): return False - if other_column.string_size() > self.string_size(): - return True + return other_column.string_size() > self.string_size() @classmethod def string_type(cls, size): @@ -280,7 +279,7 @@ def expand_column_types_if_needed(self, temp_table, to_schema, to_table): for column_name, source_column in source_columns.items(): dest_column = dest_columns.get(column_name) - if dest_column is not None and source_column.can_expand_to(dest_column): - new_type = Column.string_type(dest_column.string_size()) + if dest_column is not None and dest_column.can_expand_to(source_column): + new_type = Column.string_type(source_column.string_size()) self.alter_column_type(to_schema, to_table, column_name, new_type) From f06ab7aa11affedcc42542c496c569078c7ba5d9 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Thu, 20 Oct 2016 16:39:48 -0400 Subject: [PATCH 28/33] add log line for column expansion --- dbt/schema.py | 1 + 1 file changed, 1 insertion(+) diff --git a/dbt/schema.py b/dbt/schema.py index eaabf2f47c4..b91a024d4dd 100644 --- a/dbt/schema.py +++ b/dbt/schema.py @@ -281,5 +281,6 @@ def expand_column_types_if_needed(self, temp_table, to_schema, to_table): if dest_column is not None and dest_column.can_expand_to(source_column): new_type = Column.string_type(source_column.string_size()) + self.logger.debug("Changing col type from %s to %s in table %s.%s", dest_column.data_type, new_type, to_schema, to_table) self.alter_column_type(to_schema, to_table, column_name, new_type) From 96b012be13321dc11f719d955c7edbbd6a16a9da Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Thu, 20 Oct 2016 17:02:13 -0400 Subject: [PATCH 29/33] sort and dist keys (redshift + pg nop) --- dbt/archival.py | 2 -- dbt/schema.py | 5 +++-- dbt/targets.py | 27 +++++++++++++++++++++++++++ 3 files changed, 30 insertions(+), 4 deletions(-) diff --git a/dbt/archival.py b/dbt/archival.py index 6ee97cf5634..23852e56acc 100644 --- a/dbt/archival.py +++ b/dbt/archival.py @@ -30,8 +30,6 @@ def compile(self): if len(source_columns) == 0: raise RuntimeError('Source table "{}"."{}" does not exist'.format(source_schema, source_table)) - # create archive table if not exists! TODO: Sort & Dist keys! Hmmmm - extra_cols = [ dbt.schema.Column("valid_from", "timestamp", None), dbt.schema.Column("valid_to", "timestamp", None), diff --git a/dbt/schema.py b/dbt/schema.py index b91a024d4dd..bf3eec16b56 100644 --- a/dbt/schema.py +++ b/dbt/schema.py @@ -235,8 +235,9 @@ def get_missing_columns(self, from_schema, from_table, to_schema, to_table): def create_table(self, schema, table, columns, sort, dist): fields = ['"{field}" {data_type}'.format(field=column.name, data_type=column.data_type) for column in columns] fields_csv = ",\n ".join(fields) - # TODO : Sort and Dist keys?? 
- sql = 'create table if not exists "{schema}"."{table}" (\n {fields}\n);'.format(schema=schema, table=table, fields=fields_csv) + dist = self.target.dist_qualifier(dist) + sort = self.target.sort_qualifier('compound', sort) + sql = 'create table if not exists "{schema}"."{table}" (\n {fields}\n) {dist} {sort};'.format(schema=schema, table=table, fields=fields_csv, sort=sort, dist=dist) self.logger.info('creating table "%s"."%s"'.format(schema, table)) self.execute_and_handle_permissions(sql, table) diff --git a/dbt/targets.py b/dbt/targets.py index db1ff509dc1..a5cc0fa768d 100644 --- a/dbt/targets.py +++ b/dbt/targets.py @@ -124,10 +124,37 @@ def context(self): "sql_now": "getdate()" } + def sort_qualifier(self, sort_type, sort_keys): + + valid_sort_types = ['compound', 'interleaved'] + if sort_type not in valid_sort_types: + raise RuntimeError("Invalid sort_type given: {} -- must be one of {}".format(sort_type, valid_sort_types)) + + if type(sort_keys) == str: + sort_keys = [sort_keys] + + formatted_sort_keys = ['"{}"'.format(sort_key) for sort_key in sort_keys] + keys_csv = ', '.join(formatted_sort_keys) + + return "{sort_type} sortkey({keys_csv})".format(sort_type=sort_type, keys_csv=keys_csv) + + def dist_qualifier(self, dist_key): + dist_key = dist_key.strip().lower() + + if dist_key in ['all', 'even']: + return 'diststyle({})'.format(dist_key) + else: + return 'diststyle key distkey("{}")'.format(dist_key) + class PostgresTarget(BaseSQLTarget): def __init__(self, cfg, threads): super(PostgresTarget, self).__init__(cfg, threads) + def dist_qualifier(self, dist_key): + return '' + + def sort_qualifier(self, sort_type, sort_keys): + return '' @property def context(self): From 657461c3bbdd0a4035adf2f2119e3fbd8ddfdf5a Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Fri, 21 Oct 2016 10:31:19 -0400 Subject: [PATCH 30/33] don't use async emitter --- dbt/tracking.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dbt/tracking.py b/dbt/tracking.py index 0689cf4c8a8..515e2bcbc20 100644 --- a/dbt/tracking.py +++ b/dbt/tracking.py @@ -1,6 +1,6 @@ from dbt import version as dbt_version -from snowplow_tracker import Subject, Tracker, AsyncEmitter, logger as sp_logger +from snowplow_tracker import Subject, Tracker, Emitter, logger as sp_logger from snowplow_tracker import SelfDescribingJson, disable_contracts disable_contracts() @@ -24,7 +24,7 @@ PLATFORM_SPEC = "https://raw.githubusercontent.com/analyst-collective/dbt/master/events/schemas/com.fishtownanalytics/platform_context.json" RUN_MODEL_SPEC = "https://raw.githubusercontent.com/analyst-collective/dbt/master/events/schemas/com.fishtownanalytics/run_model_context.json" -emitter = AsyncEmitter(COLLECTOR_URL, protocol=COLLECTOR_PROTOCOL, buffer_size=1) +emitter = Emitter(COLLECTOR_URL, protocol=COLLECTOR_PROTOCOL, buffer_size=1) tracker = Tracker(emitter, namespace="cf", app_id="dbt") def __write_user(): From 5c32c4f84048a4b3350cf7e25cebfc5d634aec46 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Fri, 21 Oct 2016 10:36:46 -0400 Subject: [PATCH 31/33] =?UTF-8?q?Bump=20version:=200.5.0=20=E2=86=92=200.5?= =?UTF-8?q?.1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .bumpversion.cfg | 2 +- dbt/version.py | 2 +- setup.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.bumpversion.cfg b/.bumpversion.cfg index 44bdf8b2851..ea8bb7d2aac 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 0.5.0 +current_version = 
0.5.1 commit = True tag = True diff --git a/dbt/version.py b/dbt/version.py index e6e4cca12c0..4c3ade3fc57 100644 --- a/dbt/version.py +++ b/dbt/version.py @@ -53,7 +53,7 @@ def get_version_information(): def is_latest(): return installed == latest -__version__ = '0.5.0' +__version__ = '0.5.1' installed = get_version() latest = get_latest_version() diff --git a/setup.py b/setup.py index 757b6f4b3fc..ca0bbbf6dcb 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ import os.path package_name = "dbt" -package_version = "0.5.0" +package_version = "0.5.1" setup( name=package_name, From 31b650e8cf45185c41fc240aae69b82de8b11f5d Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Fri, 21 Oct 2016 10:39:29 -0400 Subject: [PATCH 32/33] make BaseSQLTarget a new-style class --- dbt/targets.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/targets.py b/dbt/targets.py index a5cc0fa768d..3fd5dc4618f 100644 --- a/dbt/targets.py +++ b/dbt/targets.py @@ -13,7 +13,7 @@ BAD_THREADS_ERROR = """Invalid value given for "threads" in active run-target. Value given was {supplied} but it should be an int between {min_val} and {max_val}""" -class BaseSQLTarget: +class BaseSQLTarget(object): def __init__(self, cfg, threads): self.target_type = cfg['type'] self.host = cfg['host'] From 1b97b70cca4221fe48595cdd39e47d0b096d86c3 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Fri, 21 Oct 2016 10:52:52 -0400 Subject: [PATCH 33/33] python27 compatibility --- dbt/source.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dbt/source.py b/dbt/source.py index 1d98d9239c9..e23384212a9 100644 --- a/dbt/source.py +++ b/dbt/source.py @@ -66,10 +66,11 @@ def get_archives(self, create_template): if 'archive' not in self.project: return [] - raw_source_schemas = self.project['archive'].copy() + raw_source_schemas = self.project['archive'] archives = [] for schema in raw_source_schemas: + schema = schema.copy() if 'tables' not in schema: continue
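One more note on the compatibility fix above: list.copy() does not exist on Python 2.7, which is why the copy moved off the archive list, and copying each schema dict inside the loop keeps any per-schema defaulting local to the run. A tiny illustration with an invented archive config (the target_schema default is made up for the example):

archive_config = [
    {"source_schema": "public",
     "tables": [{"source_table": "users", "unique_key": "id"}]},
]

prepared = []
for schema in archive_config:
    schema = schema.copy()                         # copy per item, as in the patch
    if 'tables' not in schema:
        continue
    schema.setdefault('target_schema', 'archive')  # invented default, for illustration only
    prepared.append(schema)

# the per-item copy keeps the defaulting local: the original config entry is unchanged
assert 'target_schema' in prepared[0]
assert 'target_schema' not in archive_config[0]
print(prepared)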