-
Hey @abkfenris - thanks for this really thoughtful writeup. Just to clarify and make sure I'm parsing it correctly, when you say "you can access metadata yielded from within an op in an IOManager", should "can" be "cannot"?
-
@abkfenris all the issues you're bringing up here make a lot of sense. The most "controversial" question here, in my mind, is whether runtime metadata yielded for an output should be available to the downstream op.

Commenting on your proposals:
This depends somewhat on the answer to the above question, but, at the very least, I agree that downstream ops should have access to the metadata on the upstream output definition. E.g. with something like your `context.upstream_metadata` suggestion.
I think there's value in being able to distinguish between "definition-level" metadata and "runtime" metadata, especially in debugging. My hunch is that we should keep these separate in our internal data model, though we could provide a utility that returns a merged version.
Setting an asset key dynamically on an output makes sense to me. I would lean towards
Do you have examples in mind of metadata that would be io manager-specific vs. not io manager-specific?
-
Great writeup! In general, I agree with all the points brought up by @abkfenris - and totally feel the same pain in my use cases with regards to metadata.
-
When creating a class like

I observe this failure:

when executing:

in an IO manager to check the output type (and handle it dynamically). Is there a way to fix it?
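The code blocks referenced above didn't survive extraction, but a minimal sketch of the kind of dynamic type handling being described might look like this (the class name and the branches are hypothetical stand-ins):

```python
import pandas as pd
from dagster import InputContext, IOManager, OutputContext

class DynamicTypeIOManager(IOManager):
    def handle_output(self, context: OutputContext, obj) -> None:
        # Branch on the runtime type of the object rather than the declared type.
        if isinstance(obj, pd.DataFrame):
            ...  # e.g. write to parquet
        elif isinstance(obj, dict):
            ...  # e.g. write to json
        else:
            raise TypeError(f"Unsupported output type: {type(obj)}")

    def load_input(self, context: InputContext):
        ...
```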
-
For a sensor, dagster has the notion of state, and, e.g., in the case of a backfill, a specific run can easily be started from dagit.
-
I have two assets
I am using the
this fails with the following exception:
When instead trying to manually supply the type information:
the error message switches to:
The type validation works for a non-generic type like:
How could dagster potentially support generic types?
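The code blocks in this comment were also lost, but one way to work around Dagster's validation of generic annotations is to wrap the generic in a custom `DagsterType` whose check function inspects the runtime value. A minimal sketch, assuming the generic was something like `list[pd.DataFrame]` (not necessarily the poster's actual type):

```python
import pandas as pd
from dagster import DagsterType, Out, TypeCheck, op

def _is_df_list(_context, value) -> TypeCheck:
    # Validate "list of DataFrames" at runtime, since a generic annotation
    # like list[pd.DataFrame] cannot be checked directly by Dagster.
    ok = isinstance(value, list) and all(isinstance(v, pd.DataFrame) for v in value)
    return TypeCheck(success=ok, description="Expected a list of pandas DataFrames.")

DataFrameList = DagsterType(name="DataFrameList", type_check_fn=_is_df_list)

@op(out=Out(dagster_type=DataFrameList))
def split_frames():
    df = pd.DataFrame({"a": [1, 2]})
    return [df[df["a"] == 1], df[df["a"] == 2]]
```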
-
It will also be important to enable overwriting the "latest update" state field - perhaps from some configuration value.
-
Has there been any progress on this with https://docs.dagster.io/concepts/ops-jobs-graphs/dynamic-graphs#returning-dynamic-outputs from dagster 0.15.0, @sryza?
-
+1 and would love to access metadata from the yielded Output. My use case (today) is the following: I have to build a pipeline that does a query in a database, parses the data, and saves it to a Google Sheet (for our business users). The catch is that I have to save the data as multiple sheets, each one with a partition of the data itself. Borrowing from the OP, my use case would be something like this:

```python
import pandas as pd
from dagster import DynamicOut, DynamicOutput, IOManager, io_manager, job, op

class PandastoSheetsIOManager(IOManager):
    def handle_output(self, context, obj: pd.DataFrame):
        # logic to save the data using gspread
        sheets = ...
        sheets.save(file_name=...)

    def load_input(self, context):
        pass

@io_manager
def pandas_sheets_io_manager(init_context):
    return PandastoSheetsIOManager()

@op(out=DynamicOut(io_manager_key="sheets_io"))
def sheet_op(context):
    # do a query in the database and return it as a Pandas df
    df = context.resources.query(...)
    # transform the data as needed
    df = ...
    # select the unique users that we are going to separate into different sheets
    users = df["Users"].unique()
    # output the pieces of each dataframe
    for user in users:
        yield DynamicOutput(
            df[df["Users"] == user],
            mapping_key=user,
            metadata={"file_name": user},
        )

@job(resource_defs={"sheets_io": pandas_sheets_io_manager})
def sheets_job():
    sheet_op()
```

In this case, technically I can obtain the information in the IOManager about the user, since each yielded piece of the dataframe contains a single user by construction:

```python
class PandastoSheetsIOManager(IOManager):
    def handle_output(self, context, obj: pd.DataFrame):
        # since the obj DataFrame has a single user in it per construction, we can obtain it
        user = obj["Users"].unique()[0]
        # logic to save the data using gspread
        sheet = ...
        sheet.save(file_name=user)
```

However, I see the solution above as an anti-pattern. The IOManager should handle the logic between dagster and the storage solution without coupling any op logic into it. In the above solution I am essentially forbidding any reuse of that IOManager (what if I had another job to export Sheets of another entity?).

The ideal would be for the Output metadata to be available to the IOManager. In that scenario, we could leverage the metadata to make informed choices about where to store the data:

```python
class PandastoSheetsIOManager(IOManager):
    def handle_output(self, context, obj: pd.DataFrame):
        # imagine that the output metadata is available as the output_metadata property
        file_name = context.output_metadata.get("file_name", "sheet")  # type: ignore
        # now we can use the metadata to save the sheet; if it was not provided, we fall back to a default
        sheet = ...
        sheet.save(file_name=file_name)
```

This feature (accessing the Output metadata) was so "obvious" to me that I was really surprised when I found out that dagster wasn't doing that already (since the framework gives so much power and flexibility with its contexts and data). Really hope that this feature gets implemented in the near future. Meanwhile, I will just do the same thing as what I wrote in the second-to-last code block and hope nobody asks me to do more Sheets reports :)
-
Someone had to do it.

```python
# license: WTFPL
from dagster import (
    DagsterEventType,
    EventRecordsFilter,
    InitResourceContext,
    InputContext,
    IOManager,
    OutputContext,
    RawMetadataValue,
    io_manager,
)

class MetadataIOManager(IOManager):
    def load_input(self, context: InputContext) -> RawMetadataValue:
        # Look up the latest materialization event for this asset partition.
        e = context.instance.event_log_storage.get_event_records(
            EventRecordsFilter(
                event_type=DagsterEventType.ASSET_MATERIALIZATION,
                asset_key=context.asset_key,
                asset_partitions=[context.partition_key],
            ),
            limit=1,
        )
        if len(e) == 0:
            raise Exception("Asset materialization event not found.")
        context.log.info(
            "Using materialization event from run %s", e[0].event_log_entry.run_id
        )
        d = e[0].event_log_entry.dagster_event.event_specific_data
        return d.materialization.metadata["value"].value

    def handle_output(self, context: OutputContext, obj: RawMetadataValue) -> None:
        # Persist the value itself as output metadata instead of writing to storage.
        context.add_output_metadata({"value": obj})

@io_manager(
    config_schema={},
    description="IO manager that stores and retrieves values from asset metadata.",
)
def metadata_io_manager(init_context: InitResourceContext):
    return MetadataIOManager()
```
-
Hi there, wondering if anyone is still considering this? It seems very strange that I can't override my IO manager path, for example, across a partitioned asset. There are various use cases where this is a key requirement. Thanks,
-
Right now there are two different sources of metadata for outputs: in the output definition, and what is yielded from within the body of an op or asset.

Currently you cannot access either set of metadata from a downstream op, and you cannot access metadata yielded from within an op in an IOManager. The second is specifically making it hard to use IOManagers for my use case (and others, from what I've seen in the Slack, issues, and discussions). With metadata otherwise being a first-class citizen in Dagster, this seems like an important bit to standardize.
## Current state of output metadata in 0.14.1
A test of how things work in 0.14.1: I created an op that has both output definition metadata and logs metadata, and an IOManager that logs the metadata it receives. From within the definition, the metadata values are `A`, and those emitted within the op contain `B`.

Currently the IOManager has no way of seeing what metadata about an output is emitted from within an op, while the step output only shows the metadata that is emitted from within the op. Downstream ops don't get access to either set of metadata. Additionally, the two sources of metadata can lead to confusion.
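The test code itself didn't survive here, but a minimal sketch of the setup described, with assumed names (`logging_io_manager`, `metadata_test_job`), could look like:

```python
from dagster import IOManager, Out, Output, io_manager, job, op

class LoggingIOManager(IOManager):
    def handle_output(self, context, obj):
        # In 0.14.1 this sees only the definition-level metadata ("A"),
        # not the metadata yielded inside the op ("B").
        context.log.info(f"IOManager sees metadata: {context.metadata}")

    def load_input(self, context):
        return None

@io_manager
def logging_io_manager(init_context):
    return LoggingIOManager()

@op(out=Out(metadata={"source": "A"}, io_manager_key="logging_io"))
def my_op():
    # The STEP_OUTPUT event shows only this yielded metadata ("B").
    yield Output(1, metadata={"source": "B"})

@job(resource_defs={"logging_io": logging_io_manager})
def metadata_test_job():
    my_op()
```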
## Some impacts of the current state of output metadata
Right now if you want to pass a specific path into an IOManager, you can pass a fixed one in via the output definition metadata.
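As a minimal sketch of that fixed-path pattern (the `"path"` key is just a convention here, not a standardized one):

```python
from dagster import IOManager, Out, op

class PathIOManager(IOManager):
    def handle_output(self, context, obj):
        # The fixed path comes from the output definition metadata.
        path = context.metadata["path"]
        ...  # write obj to path

    def load_input(self, context):
        ...  # read the value back from the same path

@op(out=Out(metadata={"path": "data/archive/latest.parquet"}))
def archive_op():
    return ...
```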
However if you have a dynamically generated path (say you are archiving data), you either need to create an IOManager that can figure out a path based on a partition, or you need to pass custom classes around containing a path.
This pushes everyone to come up with their own solution to the problem, which makes it harder to share and reuse IOManagers. If there were a way to pass a path generated by an op to the IOManager, it would help solve these issues.
Personally, for my usage, much of my data is being archived to be served by external services. Either I can subclass IOManager for each output to generate the correct path and formatting, or I can use a few more general-purpose IOManagers and pass paths in.
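A sketch of the custom-class workaround mentioned above — wrapping the value and its destination path together so the IOManager can unwrap them (`PathedValue` is a made-up name for illustration):

```python
from dataclasses import dataclass
from typing import Any

from dagster import IOManager, op

@dataclass
class PathedValue:
    # Workaround: smuggle the dynamically generated path alongside the data.
    path: str
    value: Any

class PathedIOManager(IOManager):
    def handle_output(self, context, obj: PathedValue):
        ...  # write obj.value to obj.path

    def load_input(self, context):
        ...

@op
def archive_op(context) -> PathedValue:
    data = ...
    # e.g. a run-specific archive path computed at runtime
    return PathedValue(path=f"archive/{context.run_id}.parquet", value=data)
```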
## Some ideas
I've got a few ideas on how to manage these issues, but these definitely aren't exhaustive.
### Merging output metadata
If metadata is both defined on an output definition and logged from within an op, the metadata should be merged. I propose that the logged metadata take precedence, as the op has more information about the data and its context when it yields the metadata than the definition does.
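The proposed semantics are just a dict merge where the runtime side wins; assuming both sources are plain dicts (this helper is hypothetical, not an existing Dagster API):

```python
# Logged (runtime) metadata wins over definition-level metadata on key collisions.
def merge_output_metadata(definition_metadata: dict, logged_metadata: dict) -> dict:
    return {**definition_metadata, **logged_metadata}

assert merge_output_metadata(
    {"path": "definition", "owner": "me"}, {"path": "runtime"}
) == {"path": "runtime", "owner": "me"}
```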
### IOManager-specific metadata/configuration
Part of the issue seems to be that there are not only two sources of output metadata, but also two different types of output metadata: metadata that describes the output itself, and metadata that is used by an IOManager.
I propose that there be separate IOManager metadata, or configuration, that can both be defined in the output definition and yielded from an op. This IO metadata wouldn't be part of the 'public' metadata records that are shown as part of the STEP_OUTPUT events or asset records.
IOManagers would be able to get access to public metadata via `context.metadata`, and to the IOManager-specific metadata via `context.io_metadata` or similar, in both the input and output contexts.

For instance, when swapping from local storage to cloud storage, you probably don't want to include a specific path in the op-generated metadata, but you may want to give the IOManager / filesystem a hint as to where the file should go, and then get the absolute path and/or URL back as part of the regular metadata. Standardizing on a name for the key would help here; see #6763.
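A sketch of what that split could look like from inside an IOManager; `context.io_metadata` is the proposed API and does not exist today:

```python
from dagster import IOManager

class HintedPathIOManager(IOManager):
    def handle_output(self, context, obj):
        # Hypothetical: IOManager-specific hints would live in io_metadata...
        hint = context.io_metadata.get("path_hint", context.name)
        url = self._write(obj, hint)
        # ...while the resulting location is published as regular, public metadata.
        context.add_output_metadata({"url": url})

    def _write(self, obj, hint: str) -> str:
        ...  # persist obj under the hinted location and return its URL

    def load_input(self, context):
        ...
```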
### `context.upstream_metadata` or `context.upstream_metadata_for_input()`
Within an op, there are times when you need access to metadata from an upstream output. Previously the question was 'which metadata?', but if the metadata gets merged, this becomes an easier question to answer.

If it's cheap enough to provide all the metadata before running an op, add a property `context.upstream_metadata` holding a dict of input names to dicts of metadata; otherwise, add a method that returns the metadata for a given input name. Similar properties or methods could give ops access to their own input and output metadata.
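A sketch of how the proposed property might read in an op body (again, `context.upstream_metadata` does not exist today; the `"path"` key is an assumed example):

```python
from dagster import op

@op
def consume(context, df):
    # Hypothetical: look up the merged metadata attached to the upstream
    # output feeding the "df" input, e.g. a path recorded by the producer.
    upstream_path = context.upstream_metadata["df"]["path"]
    context.log.info(f"Upstream wrote to {upstream_path}")
    return df
```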
### `context.set_output_asset_key()`
AssetKeys are also limited by this separation between definition and logged metadata. Similar to how there are dynamic paths that some data should be persisted to, some AssetKeys may be dynamic too. Currently you can connect an asset key to an output only if it is in the output definition, or if the IOManager sets it, but not from within an op.
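A sketch of the proposed call from inside an op (hypothetical API):

```python
from dagster import AssetKey, op

@op
def archive(context):
    date_str = "2022-02-22"  # e.g. computed at runtime
    # Hypothetical: attach a dynamically built asset key to this op's output.
    context.set_output_asset_key(AssetKey(["archive", date_str]))
    return ...
```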
## Some related conversations/issues

- Add `file_key` to op output containing path for IO manager #6763