Skip to content

Commit

Permalink
Merge pull request #188 from analyst-collective/development
Browse files Browse the repository at this point in the history
Release 0.5.1
  • Loading branch information
drewbanin authored Oct 21, 2016
2 parents d50bb54 + 17e5539 commit 4739be2
Show file tree
Hide file tree
Showing 25 changed files with 959 additions and 139 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.5.0
current_version = 0.5.1
commit = True
tag = True

Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ dbt (data build tool) helps analysts write reliable, modular code using a workfl
- [What is dbt]?
- Read the [dbt viewpoint]
- [Installation]
- Join the [chat][gittr-url] on Gittr.
- Join the [chat][slack-url] on Slack for live questions and support.


## Code of Conduct
Expand All @@ -18,7 +18,7 @@ Everyone interacting in the dbt project's codebases, issue trackers, chat rooms,


[PyPA Code of Conduct]: https://www.pypa.io/en/latest/code-of-conduct/
[gittr-url]: https://gitter.im/analyst-collective/dbt
[slack-url]: http://ac-slackin.herokuapp.com/
[Installation]: http://dbt.readthedocs.io/en/master/guide/setup/
[What is dbt]: http://dbt.readthedocs.io/en/master/about/overview/
[dbt viewpoint]: http://dbt.readthedocs.io/en/master/about/viewpoint/
65 changes: 65 additions & 0 deletions dbt/archival.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@

from __future__ import print_function
import dbt.targets
import dbt.schema
import dbt.templates
import jinja2


class Archival(object):

def __init__(self, project, archive_model):
self.archive_model = archive_model
self.project = project

self.target = dbt.targets.get_target(self.project.run_environment())
self.schema = dbt.schema.Schema(self.project, self.target)

def compile(self):
source_schema = self.archive_model.source_schema
target_schema = self.archive_model.target_schema
source_table = self.archive_model.source_table
target_table = self.archive_model.target_table
unique_key = self.archive_model.unique_key
updated_at = self.archive_model.updated_at

self.schema.create_schema(target_schema)

source_columns = self.schema.get_columns_in_table(source_schema, source_table)

if len(source_columns) == 0:
raise RuntimeError('Source table "{}"."{}" does not exist'.format(source_schema, source_table))

extra_cols = [
dbt.schema.Column("valid_from", "timestamp", None),
dbt.schema.Column("valid_to", "timestamp", None),
dbt.schema.Column("scd_id","text", None),
dbt.schema.Column("dbt_updated_at","timestamp", None)
]

dest_columns = source_columns + extra_cols
self.schema.create_table(target_schema, target_table, dest_columns, sort=updated_at, dist=unique_key)

env = jinja2.Environment()

ctx = {
"columns" : source_columns,
"updated_at" : updated_at,
"unique_key" : unique_key,
"source_schema" : source_schema,
"source_table" : source_table,
"target_schema" : target_schema,
"target_table" : target_table
}

base_query = dbt.templates.SCDArchiveTemplate
template = env.from_string(base_query, globals=ctx)
rendered = template.render(ctx)

return rendered

def runtime_compile(self, compiled_model):
context = self.context.copy()
context.update(model.context())
model.compile(context)

82 changes: 74 additions & 8 deletions dbt/compilation.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,19 @@
from dbt.source import Source
from dbt.utils import find_model_by_fqn, find_model_by_name, dependency_projects, split_path, This, Var, compiler_error
from dbt.linker import Linker
import dbt.targets
import dbt.templates
import time
import sqlparse

CompilableEntities = ["models", "tests", "archives", "analyses"]

class Compiler(object):
def __init__(self, project, create_template_class):
self.project = project
self.create_template = create_template_class()
self.macro_generator = None
self.target = self.get_target()

def initialize(self):
if not os.path.exists(self.project['target-path']):
Expand All @@ -24,7 +30,7 @@ def initialize(self):

def get_target(self):
target_cfg = self.project.run_environment()
return RedshiftTarget(target_cfg)
return dbt.targets.get_target(target_cfg)

def model_sources(self, this_project, own_project=None):
if own_project is None:
Expand All @@ -35,9 +41,21 @@ def model_sources(self, this_project, own_project=None):
return Source(this_project, own_project=own_project).get_models(paths, self.create_template)
elif self.create_template.label == 'test':
return Source(this_project, own_project=own_project).get_test_models(paths, self.create_template)
elif self.create_template.label == 'archive':
return []
else:
raise RuntimeError("unexpected create template type: '{}'".format(self.create_template.label))

def get_macros(self, this_project, own_project=None):
if own_project is None:
own_project = this_project
paths = own_project.get('macro-paths', [])
return Source(this_project, own_project=own_project).get_macros(paths)

def get_archives(self, project):
archive_template = dbt.templates.ArchiveInsertTemplate()
return Source(project, own_project=project).get_archives(archive_template)

def project_schemas(self):
source_paths = self.project.get('source-paths', [])
return Source(self.project).get_schemas(source_paths)
Expand Down Expand Up @@ -147,16 +165,30 @@ def wrapped_do_ref(*args):

def get_context(self, linker, model, models):
context = self.project.context()

# built-ins
context['ref'] = self.__ref(linker, context, model, models)
context['config'] = self.__model_config(model, linker)
context['this'] = This(context['env']['schema'], model.immediate_name, model.name)
context['compiled_at'] = time.strftime('%Y-%m-%d %H:%M:%S')
context['var'] = Var(model, context=context)

# these get re-interpolated at runtime!
context['run_started_at'] = '{{ run_started_at }}'
context['invocation_id'] = '{{ invocation_id }}'

# add in context from run target
context.update(self.target.context)

# add in macros (can we cache these somehow?)
for macro_name, macro in self.macro_generator(context):
context[macro_name] = macro

return context

def compile_model(self, linker, model, models):
try:
jinja = jinja2.Environment(loader=jinja2.FileSystemLoader(searchpath=model.root_dir))
fs_loader = jinja2.FileSystemLoader(searchpath=model.root_dir)
jinja = jinja2.Environment(loader=fs_loader)

# this is a dumb jinja2 bug -- on windows, forward slashes are EXPECTED
posix_filepath = '/'.join(split_path(model.rel_filepath))
Expand All @@ -169,8 +201,8 @@ def compile_model(self, linker, model, models):

return rendered

def write_graph_file(self, linker):
filename = 'graph-{}.yml'.format(self.create_template.label)
def write_graph_file(self, linker, label):
filename = 'graph-{}.yml'.format(label)
graph_path = os.path.join(self.project['target-path'], filename)
linker.write_graph(graph_path)

Expand Down Expand Up @@ -297,13 +329,39 @@ def compile_schema_tests(self, linker):

return written_tests

def generate_macros(self, all_macros):
def do_gen(ctx):
macros = []
for macro in all_macros:
new_macros = macro.get_macros(ctx)
macros.extend(new_macros)
return macros
return do_gen

def compile_archives(self):
linker = Linker()
all_archives = self.get_archives(self.project)

for archive in all_archives:
sql = archive.compile()
fqn = tuple(archive.fqn)
linker.update_node_data(fqn, archive.serialize())
self.__write(archive.build_path(), sql)

self.write_graph_file(linker, 'archive')
return all_archives

def compile(self, dry=False):
linker = Linker()

all_models = self.model_sources(this_project=self.project)
all_macros = self.get_macros(this_project=self.project)

for project in dependency_projects(self.project):
all_models.extend(self.model_sources(this_project=self.project, own_project=project))
all_macros.extend(self.get_macros(this_project=self.project, own_project=project))

self.macro_generator = self.generate_macros(all_macros)

enabled_models = [model for model in all_models if model.is_enabled]

Expand All @@ -314,11 +372,19 @@ def compile(self, dry=False):

self.validate_models_unique(compiled_models)
self.validate_models_unique(written_schema_tests)
self.write_graph_file(linker)
self.write_graph_file(linker, self.create_template.label)

if self.create_template.label != 'test':
if self.create_template.label not in ['test', 'archive']:
written_analyses = self.compile_analyses(linker, compiled_models)
else:
written_analyses = []

return len(written_models), len(written_schema_tests), len(written_analyses)

compiled_archives = self.compile_archives()

return {
"models": len(written_models),
"tests" : len(written_schema_tests),
"archives": len(compiled_archives),
"analyses" : len(written_analyses)
}
34 changes: 34 additions & 0 deletions dbt/compiled_model.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import hashlib
import jinja2
from dbt.utils import compiler_error

class CompiledModel(object):
def __init__(self, fqn, data):
self.fqn = fqn
self.data = data
self.nice_name = ".".join(fqn)

# these are set just before the models are executed
self.tmp_drop_type = None
Expand All @@ -12,6 +15,7 @@ def __init__(self, fqn, data):

self.skip = False
self._contents = None
self.compiled_contents = None

def __getitem__(self, key):
return self.data[key]
Expand All @@ -20,6 +24,9 @@ def hashed_name(self):
fqn_string = ".".join(self.fqn)
return hashlib.md5(fqn_string.encode('utf-8')).hexdigest()

def context(self):
return self.data

def hashed_contents(self):
return hashlib.md5(self.contents.encode('utf-8')).hexdigest()

Expand All @@ -39,6 +46,15 @@ def contents(self):
self._contents = fh.read()
return self._contents

def compile(self, context):
contents = self.contents
try:
env = jinja2.Environment()
self.compiled_contents = env.from_string(contents).render(context)
return self.compiled_contents
except jinja2.exceptions.TemplateSyntaxError as e:
compiler_error(self, str(e))

@property
def materialization(self):
return self.data['materialized']
Expand Down Expand Up @@ -98,13 +114,31 @@ def prepare(self, existing, target):
def __repr__(self):
return "<CompiledModel {}.{}: {}>".format(self.data['project_name'], self.name, self.data['build_path'])

class CompiledArchive(CompiledModel):
def __init__(self, fqn, data):
super(CompiledArchive, self).__init__(fqn, data)

def should_rename(self):
return False

def should_execute(self):
return True

def prepare(self, existing, target):
self.target = target

def __repr__(self):
return "<CompiledArchive {}.{}: {}>".format(self.data['project_name'], self.name, self.data['build_path'])

def make_compiled_model(fqn, data):
run_type = data['dbt_run_type']

if run_type in ['run', 'dry-run']:
return CompiledModel(fqn, data)
elif run_type == 'test':
return CompiledTest(fqn, data)
elif run_type == 'archive':
return CompiledArchive(fqn, data)
else:
raise RuntimeError("invalid run_type given: {}".format(run_type))

Expand Down
7 changes: 7 additions & 0 deletions dbt/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import dbt.task.init as init_task
import dbt.task.seed as seed_task
import dbt.task.test as test_task
import dbt.task.archive as archive_task
import dbt.tracking


Expand Down Expand Up @@ -71,9 +72,14 @@ def handle(args):
sub = subs.add_parser('deps', parents=[base_subparser])
sub.set_defaults(cls=deps_task.DepsTask, which='deps')

sub = subs.add_parser('archive', parents=[base_subparser])
sub.add_argument('--threads', type=int, required=False, help="Specify number of threads to use while archiving tables. Overrides settings in profiles.yml")
sub.set_defaults(cls=archive_task.ArchiveTask, which='archive')

sub = subs.add_parser('run', parents=[base_subparser])
sub.add_argument('--dry', action='store_true', help="'dry run' models")
sub.add_argument('--models', required=False, nargs='+', help="Specify the models to run. All models depending on these models will also be run")
sub.add_argument('--threads', type=int, required=False, help="Specify number of threads to use while executing models. Overrides settings in profiles.yml")
sub.set_defaults(cls=run_task.RunTask, which='run')

sub = subs.add_parser('seed', parents=[base_subparser])
Expand All @@ -83,6 +89,7 @@ def handle(args):
sub = subs.add_parser('test', parents=[base_subparser])
sub.add_argument('--skip-test-creates', action='store_true', help="Don't create temporary views to validate model SQL")
sub.add_argument('--validate', action='store_true', help='Run constraint validations from schema.yml files')
sub.add_argument('--threads', type=int, required=False, help="Specify number of threads to use while executing tests. Overrides settings in profiles.yml")
sub.set_defaults(cls=test_task.TestTask, which='test')

if len(args) == 0: return p.print_help()
Expand Down
Loading

0 comments on commit 4739be2

Please sign in to comment.