Merge branch 'master' into master
padioca authored Jul 27, 2023
2 parents 1a3c8ac + 9f6963c commit d6f1585
Showing 30 changed files with 252 additions and 286 deletions.
@@ -17,10 +17,16 @@
     metadata,
 )
 
-from orchestrator.jobs.registry import generate_registry_reports, generate_oss_registry, generate_cloud_registry, generate_registry_entry
+from orchestrator.jobs.registry import (
+    generate_registry_reports,
+    generate_oss_registry,
+    generate_cloud_registry,
+    generate_registry_entry,
+    add_new_metadata_partitions,
+)
 from orchestrator.jobs.connector_test_report import generate_nightly_reports, generate_connector_test_summary_reports
 from orchestrator.sensors.registry import registry_updated_sensor
-from orchestrator.sensors.gcs import new_gcs_blobs_sensor, new_gcs_blobs_partition_sensor
+from orchestrator.sensors.gcs import new_gcs_blobs_sensor
 
 from orchestrator.config import (
     REPORT_FOLDER,
@@ -148,25 +154,21 @@
         gcs_blobs_resource_key="latest_nightly_complete_file_blobs",
         interval=(1 * 60 * 60),
     ),
-    new_gcs_blobs_partition_sensor(
-        job=generate_registry_entry,
-        resources_def=METADATA_RESOURCE_TREE,
-        partitions_def=registry_entry.metadata_partitions_def,
-        gcs_blobs_resource_key="all_metadata_file_blobs",
-        interval=(10 * 60),
-    ),
-    new_gcs_blobs_partition_sensor(
-        job=generate_registry_entry,
-        resources_def=METADATA_RESOURCE_TREE,
-        partitions_def=registry_entry.metadata_partitions_def,
-        gcs_blobs_resource_key="latest_metadata_file_blobs",
-        interval=60,
-    ),
 ]
 
-SCHEDULES = [ScheduleDefinition(job=generate_connector_test_summary_reports, cron_schedule="@hourly")]
+SCHEDULES = [
+    ScheduleDefinition(job=add_new_metadata_partitions, cron_schedule="* * * * *"),
+    ScheduleDefinition(job=generate_connector_test_summary_reports, cron_schedule="@hourly"),
+]
 
-JOBS = [generate_registry_reports, generate_oss_registry, generate_cloud_registry, generate_registry_entry, generate_nightly_reports]
+JOBS = [
+    generate_registry_reports,
+    generate_oss_registry,
+    generate_cloud_registry,
+    generate_registry_entry,
+    generate_nightly_reports,
+    add_new_metadata_partitions,
+]
 
 """
 START HERE
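The net effect of this change: the two new_gcs_blobs_partition_sensor definitions that previously polled GCS (every ten minutes and every minute, respectively) are replaced by a "* * * * *" cron schedule running the new add_new_metadata_partitions job once a minute. A minimal, self-contained sketch of that schedule-plus-dynamic-partitions pattern follows; list_blob_etags, poll_blobs, register_new_partitions, and the etag literals are hypothetical stand-ins, not names from this commit:

from typing import List

from dagster import DynamicPartitionsDefinition, ScheduleDefinition, job, op

blob_partitions_def = DynamicPartitionsDefinition(name="my_blobs")


def list_blob_etags() -> List[str]:
    # Hypothetical placeholder; the real op reads blobs from the
    # "all_metadata_file_blobs" GCS resource instead.
    return ["etag-1", "etag-2"]


@op
def poll_blobs(context):
    # Register every etag the Dagster instance has not yet seen as a partition.
    new_keys = [
        etag
        for etag in list_blob_etags()
        if not context.instance.has_dynamic_partition(blob_partitions_def.name, etag)
    ]
    if new_keys:
        context.instance.add_dynamic_partitions(blob_partitions_def.name, new_keys)


@job
def register_new_partitions():
    poll_blobs()


# A one-minute cron takes over the role of the removed per-resource GCS sensors.
partition_poll_schedule = ScheduleDefinition(job=register_new_partitions, cron_schedule="* * * * *")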
@@ -7,7 +7,7 @@
 from pydantic import ValidationError
 from google.cloud import storage
 from dagster_gcp.gcs.file_manager import GCSFileManager, GCSFileHandle
-from dagster import DynamicPartitionsDefinition, asset, OpExecutionContext, Output, MetadataValue
+from dagster import DynamicPartitionsDefinition, asset, OpExecutionContext, Output, MetadataValue, AutoMaterializePolicy
 from pydash.objects import get
 
 from metadata_service.spec_cache import get_cached_spec
@@ -18,7 +18,7 @@
 from orchestrator.utils.object_helpers import deep_copy_params
 from orchestrator.utils.dagster_helpers import OutputDataFrame
 from orchestrator.models.metadata import MetadataDefinition, LatestMetadataEntry
-from orchestrator.config import get_public_url_for_gcs_file, VALID_REGISTRIES
+from orchestrator.config import get_public_url_for_gcs_file, VALID_REGISTRIES, MAX_METADATA_PARTITION_RUN_REQUEST
 
 from typing import List, Optional, Tuple, Union
 
@@ -304,57 +304,76 @@ def safe_parse_metadata_definition(metadata_blob: storage.Blob) -> Optional[MetadataDefinition]:
 
 
 @asset(
-    required_resource_keys={"all_metadata_file_blobs"}, group_name=GROUP_NAME, partitions_def=metadata_partitions_def, output_required=False
+    required_resource_keys={"all_metadata_file_blobs"},
+    group_name=GROUP_NAME,
+    partitions_def=metadata_partitions_def,
+    output_required=False,
+    auto_materialize_policy=AutoMaterializePolicy.eager(max_materializations_per_minute=MAX_METADATA_PARTITION_RUN_REQUEST),
 )
-def metadata_entry(context: OpExecutionContext) -> Output[LatestMetadataEntry]:
+def metadata_entry(context: OpExecutionContext) -> Output[Optional[LatestMetadataEntry]]:
     """Parse and compute the LatestMetadataEntry for the given metadata file."""
     etag = context.partition_key
     context.log.info(f"Processing metadata file with etag {etag}")
     all_metadata_file_blobs = context.resources.all_metadata_file_blobs
 
     # find the blob with the matching etag
     matching_blob = next((blob for blob in all_metadata_file_blobs if blob.etag == etag), None)
-    metadata_file_path = matching_blob.name
 
     if not matching_blob:
         raise Exception(f"Could not find blob with etag {etag}")
 
+    metadata_file_path = matching_blob.name
     context.log.info(f"Found metadata file with path {metadata_file_path} for etag {etag}")
 
+    # read the matching_blob into a metadata definition
+    metadata_def = safe_parse_metadata_definition(matching_blob)
+
     dagster_metadata = {
         "bucket_name": matching_blob.bucket.name,
         "file_path": metadata_file_path,
         "partition_key": etag,
+        "invalid_metadata": metadata_def is None,
     }
 
-    # read the matching_blob into a metadata definition
-    metadata_def = safe_parse_metadata_definition(matching_blob)
-
     # return only if the metadata definition is valid
     if not metadata_def:
         context.log.warn(f"Could not parse metadata definition for {metadata_file_path}")
-    else:
-        icon_file_path = metadata_file_path.replace(METADATA_FILE_NAME, ICON_FILE_NAME)
-        icon_blob = matching_blob.bucket.blob(icon_file_path)
+        return Output(value=None, metadata=dagster_metadata)
 
-        icon_url = (
-            get_public_url_for_gcs_file(icon_blob.bucket.name, icon_blob.name, os.getenv("METADATA_CDN_BASE_URL"))
-            if icon_blob.exists()
-            else None
-        )
+    icon_file_path = metadata_file_path.replace(METADATA_FILE_NAME, ICON_FILE_NAME)
+    icon_blob = matching_blob.bucket.blob(icon_file_path)
+
+    icon_url = (
+        get_public_url_for_gcs_file(icon_blob.bucket.name, icon_blob.name, os.getenv("METADATA_CDN_BASE_URL"))
+        if icon_blob.exists()
+        else None
+    )
 
-        metadata_entry = LatestMetadataEntry(
-            metadata_definition=metadata_def,
-            icon_url=icon_url,
-            bucket_name=matching_blob.bucket.name,
-            file_path=metadata_file_path,
-        )
+    metadata_entry = LatestMetadataEntry(
+        metadata_definition=metadata_def,
+        icon_url=icon_url,
+        bucket_name=matching_blob.bucket.name,
+        file_path=metadata_file_path,
+    )
 
-        yield Output(value=metadata_entry, metadata=dagster_metadata)
+    return Output(value=metadata_entry, metadata=dagster_metadata)
 
 
-@asset(required_resource_keys={"root_metadata_directory_manager"}, group_name=GROUP_NAME, partitions_def=metadata_partitions_def)
-def registry_entry(context: OpExecutionContext, metadata_entry: LatestMetadataEntry, cached_specs: pd.DataFrame) -> Output[dict]:
+@asset(
+    required_resource_keys={"root_metadata_directory_manager"},
+    group_name=GROUP_NAME,
+    partitions_def=metadata_partitions_def,
+    auto_materialize_policy=AutoMaterializePolicy.eager(max_materializations_per_minute=MAX_METADATA_PARTITION_RUN_REQUEST),
+)
+def registry_entry(
+    context: OpExecutionContext, metadata_entry: Optional[LatestMetadataEntry], cached_specs: pd.DataFrame
+) -> Output[Optional[dict]]:
     """
     Generate the registry entry files from the given metadata file, and persist it to GCS.
     """
+    if not metadata_entry:
+        # if the metadata entry is invalid, return an empty dict
+        return Output(metadata={"empty_metadata": True}, value=None)
+
     root_metadata_directory_manager = context.resources.root_metadata_directory_manager
     enabled_registries, disabled_registries = get_registry_status_lists(metadata_entry)
 
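The pattern introduced here is worth noting: instead of failing a partition whose metadata file cannot be parsed, metadata_entry now emits Output(value=None) tagged with invalid_metadata, and the downstream registry_entry short-circuits on the None and records empty_metadata. A distilled, runnable sketch of this sentinel pattern, with hypothetical asset names:

from typing import Optional

from dagster import OpExecutionContext, Output, asset


@asset
def parsed_record() -> Output[Optional[dict]]:
    # Stand-in for safe_parse_metadata_definition: None signals invalid input.
    record = None
    return Output(value=record, metadata={"invalid_metadata": record is None})


@asset
def derived_record(context: OpExecutionContext, parsed_record: Optional[dict]) -> Output[Optional[dict]]:
    if not parsed_record:
        # Short-circuit with a marker instead of failing the whole partition.
        return Output(value=None, metadata={"empty_metadata": True})
    return Output(value={"id": parsed_record["id"]})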
@@ -1,4 +1,4 @@
-from dagster import asset
+from dagster import asset, AutoMaterializePolicy, FreshnessPolicy
 import pandas as pd
 from metadata_service.spec_cache import list_cached_specs
 from orchestrator.utils.dagster_helpers import OutputDataFrame, output_dataframe
@@ -7,6 +7,10 @@
 GROUP_NAME = "spec_cache"
 
 
-@asset(group_name=GROUP_NAME)
+@asset(
+    group_name=GROUP_NAME,
+    auto_materialize_policy=AutoMaterializePolicy.eager(max_materializations_per_minute=30),
+    freshness_policy=FreshnessPolicy(maximum_lag_minutes=1),
+)
 def cached_specs() -> OutputDataFrame:
     return output_dataframe(pd.DataFrame(list_cached_specs()))
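On the policy pairing above: cached_specs has no upstream assets, so AutoMaterializePolicy.eager alone would never re-trigger it; the one-minute FreshnessPolicy is what prompts the auto-materialize daemon to refresh it, while max_materializations_per_minute=30 caps the run rate. A minimal sketch of the same pairing on a generic asset; the asset name and body are hypothetical, and this reading of the policy interplay is an assumption based on the Dagster releases current in mid-2023:

from dagster import AutoMaterializePolicy, FreshnessPolicy, asset


@asset(
    auto_materialize_policy=AutoMaterializePolicy.eager(max_materializations_per_minute=30),
    freshness_policy=FreshnessPolicy(maximum_lag_minutes=1),
)
def polled_snapshot() -> dict:
    # Re-materialized by the daemon when more than a minute stale,
    # at most 30 times per minute.
    return {"polled_at": "now"}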
@@ -16,6 +16,8 @@
 CONNECTORS_PATH = "airbyte-integrations/connectors"
 CONNECTOR_TEST_SUMMARY_FOLDER = "test_summary"
 
+MAX_METADATA_PARTITION_RUN_REQUEST = 50
+
 
 def get_public_url_for_gcs_file(bucket_name: str, file_path: str, cdn_url: Optional[str] = None) -> str:
     """Get the public URL to a file in the GCS bucket.
@@ -1,5 +1,6 @@
-from dagster import define_asset_job, AssetSelection
+from dagster import define_asset_job, AssetSelection, job, SkipReason, op
 from orchestrator.assets import registry_entry
+from orchestrator.config import MAX_METADATA_PARTITION_RUN_REQUEST
 
 oss_registry_inclusive = AssetSelection.keys("persisted_oss_registry", "specs_secrets_mask_yaml").upstream()
 generate_oss_registry = define_asset_job(name="generate_oss_registry", selection=oss_registry_inclusive)
@@ -16,3 +17,36 @@
     selection=registry_entry_inclusive,
     partitions_def=registry_entry.metadata_partitions_def,
 )
+
+
+@op(required_resource_keys={"all_metadata_file_blobs"})
+def add_new_metadata_partitions_op(context):
+    """
+    This op polls for new metadata files and adds their etags to the dynamic partition.
+    """
+    all_metadata_file_blobs = context.resources.all_metadata_file_blobs
+    partition_name = registry_entry.metadata_partitions_def.name
+
+    new_etags_found = [
+        blob.etag for blob in all_metadata_file_blobs if not context.instance.has_dynamic_partition(partition_name, blob.etag)
+    ]
+
+    context.log.info(f"New etags found: {new_etags_found}")
+
+    if not new_etags_found:
+        return SkipReason("No new metadata files to process in GCS bucket")
+
+    # if more etags than MAX_METADATA_PARTITION_RUN_REQUEST are found, only take the
+    # first batch; the rest are picked up by later scheduled runs
+    if len(new_etags_found) > MAX_METADATA_PARTITION_RUN_REQUEST:
+        new_etags_found = new_etags_found[:MAX_METADATA_PARTITION_RUN_REQUEST]
+        context.log.info(f"Only processing first {MAX_METADATA_PARTITION_RUN_REQUEST} new blobs: {new_etags_found}")
+
+    context.instance.add_dynamic_partitions(partition_name, new_etags_found)
+
+
+@job
+def add_new_metadata_partitions():
+    """
+    This job polls for new metadata files and adds their etags to the dynamic partition.
+    """
+    add_new_metadata_partitions_op()
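Because add_new_metadata_partitions is an ordinary job, it can be smoke-tested in-process with a stubbed resource. A hedged usage sketch; FakeBlob and the etag literals are invented for illustration, and it relies on Dagster wrapping raw values passed via resources into resource definitions:

class FakeBlob:
    """Minimal stand-in for a google.cloud.storage.Blob exposing only an etag."""

    def __init__(self, etag: str):
        self.etag = etag


result = add_new_metadata_partitions.execute_in_process(
    resources={"all_metadata_file_blobs": [FakeBlob("etag-a"), FakeBlob("etag-b")]}
)
assert result.success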