Skip to content

Commit

Permalink
Exposure dependency resolution by fully-qualified names
Browse files Browse the repository at this point in the history
  • Loading branch information
gouline committed Oct 16, 2024
1 parent 160e2fb commit 10bb875
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 7 deletions.
36 changes: 30 additions & 6 deletions dbtmetabase/_exposures.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

from .errors import ArgumentError
from .format import Filter, dump_yaml, safe_description, safe_name
from .manifest import Manifest
from .manifest import DEFAULT_SCHEMA, Manifest

_RESOURCE_VERSION = 2

Expand Down Expand Up @@ -79,8 +79,18 @@ def extract_exposures(
models = self.manifest.read_models()

ctx = self.__Context(
model_refs={m.alias.lower(): m.ref for m in models if m.ref},
table_names={t["id"]: t["name"] for t in self.metabase.get_tables()},
model_refs={m.alias_path.lower(): m.ref for m in models if m.ref},
database_names={d["id"]: d["name"] for d in self.metabase.get_databases()},
table_names={
t["id"]: ".".join(
[
t.get("db", {}).get("name", ""),
t.get("schema", DEFAULT_SCHEMA),
t["name"],
]
).lower()
for t in self.metabase.get_tables()
},
)

exposures = []
Expand Down Expand Up @@ -288,13 +298,26 @@ def __extract_card_exposures(

# Parse SQL for exposures through FROM or JOIN clauses
for sql_ref in re.findall(_EXPOSURE_PARSER, native_query):
# Grab just the table / model name
parsed_model = sql_ref.split(".")[-1].strip('"').lower()
# DATABASE.schema.table -> [database, schema, table]
parsed_model_path = [
s.strip('"').lower() for s in sql_ref.split(".")
]

# Scrub CTEs (qualified sql_refs cannot reference CTEs)
if parsed_model in ctes and "." not in sql_ref:
if parsed_model_path[-1] in ctes and "." not in sql_ref:
continue

# Missing schema -> use default schema
if len(parsed_model_path) < 2:
parsed_model_path.insert(0, DEFAULT_SCHEMA.lower())
# Missing database -> use query's database
if len(parsed_model_path) < 3:
database_name = ctx.database_names.get(query["database"], "")
parsed_model_path.insert(0, database_name.lower())

# Fully-qualified database.schema.table
parsed_model = ".".join(parsed_model_path)

# Verify this is one of our parsed ref-able models so exposures don't break the DAG
if not ctx.model_refs.get(parsed_model):
continue
Expand Down Expand Up @@ -429,4 +452,5 @@ def __write_exposures(
@dc.dataclass
class __Context:
# Lookup tables passed around during exposure extraction; the instance is
# constructed in extract_exposures via self.__Context(...).

# Lowercased model `alias_path` (database.schema.alias) -> model `ref`,
# restricted to models whose `ref` is truthy.
model_refs: Mapping[str, str]
# Metabase database `id` -> database `name`, from metabase.get_databases().
# NOTE(review): keys come straight from d["id"] and are looked up with
# query["database"]; confirm they really are `str` as annotated.
database_names: Mapping[str, str]
# Metabase table `id` -> lowercased "database.schema.table" string; the
# schema falls back to DEFAULT_SCHEMA and the database name to "" when the
# corresponding keys are absent from the table payload.
table_names: Mapping[str, str]
4 changes: 4 additions & 0 deletions dbtmetabase/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,10 @@ def ref(self) -> Optional[str]:
return f"source('{self.source}', '{self.name}')"
return None

@property
def alias_path(self) -> str:
"""Return the dot-joined ``database.schema.alias`` path of this model.

``DEFAULT_SCHEMA`` is substituted when ``schema`` is empty or ``None``.
Case is preserved here; callers that need a case-insensitive key must
lowercase the result themselves.
"""
# NOTE(review): str.join raises TypeError if database or alias is None;
# presumably both are always populated strings — confirm against the model.
return ".".join([self.database, self.schema or DEFAULT_SCHEMA, self.alias])

def format_description(
self,
append_tags: bool = False,
Expand Down
6 changes: 5 additions & 1 deletion dbtmetabase/metabase.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,13 @@ def _api(

return response_json

def get_databases(self) -> Sequence[Mapping]:
"""Retrieves all databases.

Issues a GET request to ``/api/database`` through ``self._api`` and
materializes the response into a list.
"""
return list(self._api("get", "/api/database"))

def find_database(self, name: str) -> Optional[Mapping]:
"""Finds database by name attribute or returns none.

The comparison is case-insensitive (both names are upper-cased) and the
first matching database is returned.
"""
# NOTE(review): the two `for` headers below look like the removed and added
# sides of a single diff line (inline API call vs. self.get_databases()) —
# confirm which one applies; they are not meant to be nested.
for api_database in list(self._api("get", "/api/database")):
for api_database in self.get_databases():
if api_database["name"].upper() == name.upper():
return api_database
return None
Expand Down

0 comments on commit 10bb875

Please sign in to comment.