-
Notifications
You must be signed in to change notification settings - Fork 297
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BUG] Convert protobuf to literal as remote exec #2925
base: master
Are you sure you want to change the base?
[BUG] Convert protobuf to literal as remote exec #2925
Conversation
Signed-off-by: JiaWei Jiang <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My ideal test is like this.
Note: change the flytefile and flytedir path to local.
you can refer these 2 URLs.
flytekit/tests/flytekit/unit/core/test_flyte_file.py
Lines 31 to 39 in ff2d0da
@pytest.fixture | |
def local_dummy_file(): | |
fd, path = tempfile.mkstemp() | |
try: | |
with os.fdopen(fd, "w") as tmp: | |
tmp.write("Hello world") | |
yield path | |
finally: | |
os.remove(path) |
flytekit/tests/flytekit/unit/core/test_flyte_directory.py
Lines 33 to 41 in ff2d0da
@pytest.fixture | |
def local_dummy_directory(): | |
temp_dir = tempfile.TemporaryDirectory() | |
try: | |
with open(os.path.join(temp_dir.name, "file"), "w") as tmp: | |
tmp.write("Hello world") | |
yield temp_dir.name | |
finally: | |
temp_dir.cleanup() |
for structured dataset and flyteschema, you can add parquet file from here.
https://github.com/flyteorg/flytekit/tree/master/tests/flytekit/integration/remote/workflows/basic/data/df.parquet
import typing
import os
from dataclasses import dataclass, field
from typing import Dict, List
from flytekit.types.file import FlyteFile
from flytekit.types.directory import FlyteDirectory
from flytekit.types.structured import StructuredDataset
from flytekit.types.schema import FlyteSchema
from flytekit import task, workflow, ImageSpec
from enum import Enum
import pandas as pd
class Status(Enum):
PENDING = "pending"
APPROVED = "approved"
REJECTED = "rejected"
@dataclass
class InnerDC:
a: int = -1
b: float = 2.1
c: str = "Hello, Flyte"
d: bool = False
e: List[int] = field(default_factory=lambda: [0, 1, 2, -1, -2])
f: List[FlyteFile] = field(default_factory=lambda: [FlyteFile("s3://my-s3-bucket/s3_flyte_dir/example.txt")])
g: List[List[int]] = field(default_factory=lambda: [[0], [1], [-1]])
h: List[Dict[int, bool]] = field(default_factory=lambda: [{0: False}, {1: True}, {-1: True}])
i: Dict[int, bool] = field(default_factory=lambda: {0: False, 1: True, -1: False})
j: Dict[int, FlyteFile] = field(default_factory=lambda: {
0: FlyteFile("s3://my-s3-bucket/s3_flyte_dir/example.txt"),
1: FlyteFile("s3://my-s3-bucket/s3_flyte_dir/example.txt"),
-1: FlyteFile("s3://my-s3-bucket/s3_flyte_dir/example.txt")
})
k: Dict[int, List[int]] = field(default_factory=lambda: {0: [0, 1, -1]})
l: Dict[int, Dict[int, int]] = field(default_factory=lambda: {1: {-1: 0}})
m: dict = field(default_factory=lambda: {"key": "value"})
n: FlyteFile = field(default_factory=lambda: FlyteFile("s3://my-s3-bucket/s3_flyte_dir/example.txt"))
o: FlyteDirectory = field(default_factory=lambda: FlyteDirectory("s3://my-s3-bucket/s3_flyte_dir"))
enum_status: Status = field(default=Status.PENDING)
sd: StructuredDataset = field(default_factory=lambda: StructuredDataset(uri="s3://my-s3-bucket/s3_flyte_dir/df.parquet", file_format="parquet"))
fsc: FlyteSchema = field(default_factory=lambda: FlyteSchema(remote_path="s3://my-s3-bucket/s3_flyte_dir/df.parquet"))
@dataclass
class DC:
a: int = -1
b: float = 2.1
c: str = "Hello, Flyte"
d: bool = False
e: List[int] = field(default_factory=lambda: [0, 1, 2, -1, -2])
f: List[FlyteFile] = field(default_factory=lambda: [FlyteFile("s3://my-s3-bucket/s3_flyte_dir/example.txt")])
g: List[List[int]] = field(default_factory=lambda: [[0], [1], [-1]])
h: List[Dict[int, bool]] = field(default_factory=lambda: [{0: False}, {1: True}, {-1: True}])
i: Dict[int, bool] = field(default_factory=lambda: {0: False, 1: True, -1: False})
j: Dict[int, FlyteFile] = field(default_factory=lambda: {
0: FlyteFile("s3://my-s3-bucket/s3_flyte_dir/example.txt"),
1: FlyteFile("s3://my-s3-bucket/s3_flyte_dir/example.txt"),
-1: FlyteFile("s3://my-s3-bucket/s3_flyte_dir/example.txt")
})
k: Dict[int, List[int]] = field(default_factory=lambda: {0: [0, 1, -1]})
l: Dict[int, Dict[int, int]] = field(default_factory=lambda: {1: {-1: 0}})
m: dict = field(default_factory=lambda: {"key": "value"})
n: FlyteFile = field(default_factory=lambda: FlyteFile("s3://my-s3-bucket/s3_flyte_dir/example.txt"))
o: FlyteDirectory = field(default_factory=lambda: FlyteDirectory("s3://my-s3-bucket/s3_flyte_dir"))
inner_dc: InnerDC = field(default_factory=lambda: InnerDC())
enum_status: Status = field(default=Status.PENDING)
sd: StructuredDataset = field(default_factory=lambda: StructuredDataset(uri="s3://my-s3-bucket/s3_flyte_dir/df.parquet", file_format="parquet"))
fsc: FlyteSchema = field(default_factory=lambda: FlyteSchema(remote_path="s3://my-s3-bucket/s3_flyte_dir/df.parquet"))
@task(container_image=image)
def t_dc(dc: DC) -> DC:
return dc
@task(container_image=image)
def t_inner(inner_dc: InnerDC) -> InnerDC:
assert isinstance(inner_dc, InnerDC), "inner_dc is not of type InnerDC"
expected_file_content = "Default content"
# f: List[FlyteFile]
for ff in inner_dc.f:
assert isinstance(ff, FlyteFile), "Expected FlyteFile"
with open(ff, "r") as f:
assert f.read() == expected_file_content, "File content mismatch in f"
# j: Dict[int, FlyteFile]
for _, ff in inner_dc.j.items():
assert isinstance(ff, FlyteFile), "Expected FlyteFile in j"
with open(ff, "r") as f:
assert f.read() == expected_file_content, "File content mismatch in j"
# n: FlyteFile
assert isinstance(inner_dc.n, FlyteFile), "n is not FlyteFile"
with open(inner_dc.n, "r") as f:
assert f.read() == expected_file_content, "File content mismatch in n"
# o: FlyteDirectory
assert isinstance(inner_dc.o, FlyteDirectory), "o is not FlyteDirectory"
assert not inner_dc.o.downloaded, "o should not be downloaded initially"
with open(os.path.join(inner_dc.o, "example.txt"), "r") as fh:
assert fh.read() == expected_file_content, "File content mismatch in o"
assert inner_dc.o.downloaded, "o should be marked as downloaded after access"
assert inner_dc.enum_status == Status.PENDING, "enum_status does not match"
assert isinstance(inner_dc.sd, StructuredDataset), "sd is not StructuredDataset"
assert isinstance(inner_dc.fsc, FlyteSchema), "fsc is not FlyteSchema"
print("All checks in InnerDC passed")
return inner_dc
@task(container_image=image)
def t_test_all_attributes(a: int, b: float, c: str, d: bool, e: List[int], f: List[FlyteFile], g: List[List[int]],
h: List[Dict[int, bool]], i: Dict[int, bool], j: Dict[int, FlyteFile],
k: Dict[int, List[int]], l: Dict[int, Dict[int, int]], m: dict,
n: FlyteFile, o: FlyteDirectory, enum_status: Status, sd: StructuredDataset,
fsc: FlyteSchema
):
# Strict type checks for simple types
assert isinstance(a, int), f"a is not int, it's {type(a)}"
assert a == -1
assert isinstance(b, float), f"b is not float, it's {type(b)}"
assert isinstance(c, str), f"c is not str, it's {type(c)}"
assert isinstance(d, bool), f"d is not bool, it's {type(d)}"
# Strict type checks for List[int]
assert isinstance(e, list) and all(isinstance(i, int) for i in e), "e is not List[int]"
# Strict type checks for List[FlyteFile]
assert isinstance(f, list) and all(isinstance(i, FlyteFile) for i in f), "f is not List[FlyteFile]"
# Strict type checks for List[List[int]]
assert isinstance(g, list) and all(
isinstance(i, list) and all(isinstance(j, int) for j in i) for i in g), "g is not List[List[int]]"
# Strict type checks for List[Dict[int, bool]]
assert isinstance(h, list) and all(
isinstance(i, dict) and all(isinstance(k, int) and isinstance(v, bool) for k, v in i.items()) for i in h
), "h is not List[Dict[int, bool]]"
# Strict type checks for Dict[int, bool]
assert isinstance(i, dict) and all(
isinstance(k, int) and isinstance(v, bool) for k, v in i.items()), "i is not Dict[int, bool]"
# Strict type checks for Dict[int, FlyteFile]
assert isinstance(j, dict) and all(
isinstance(k, int) and isinstance(v, FlyteFile) for k, v in j.items()), "j is not Dict[int, FlyteFile]"
# Strict type checks for Dict[int, List[int]]
assert isinstance(k, dict) and all(
isinstance(k, int) and isinstance(v, list) and all(isinstance(i, int) for i in v) for k, v in
k.items()), "k is not Dict[int, List[int]]"
# Strict type checks for Dict[int, Dict[int, int]]
assert isinstance(l, dict) and all(
isinstance(k, int) and isinstance(v, dict) and all(
isinstance(sub_k, int) and isinstance(sub_v, int) for sub_k, sub_v in v.items())
for k, v in l.items()), "l is not Dict[int, Dict[int, int]]"
# Strict type check for a generic dict
assert isinstance(m, dict), "m is not dict"
# Strict type check for FlyteFile
assert isinstance(n, FlyteFile), "n is not FlyteFile"
# Strict type check for FlyteDirectory
assert isinstance(o, FlyteDirectory), "o is not FlyteDirectory"
# # Strict type check for Enum
assert isinstance(enum_status, Status), "enum_status is not Status"
assert isinstance(sd, StructuredDataset), "sd is not StructuredDataset"
print("sd:", sd.open(pd.DataFrame).all())
assert isinstance(fsc, FlyteSchema), "fsc is not FlyteSchema"
print("fsc: ", fsc.open().all())
print("All attributes passed strict type checks.")
@workflow
def wf(dc: DC):
new_dc = t_dc(dc=dc)
t_inner(new_dc.inner_dc)
t_test_all_attributes(a=new_dc.a, b=new_dc.b, c=new_dc.c,
d=new_dc.d, e=new_dc.e, f=new_dc.f,
g=new_dc.g, h=new_dc.h, i=new_dc.i,
j=new_dc.j, k=new_dc.k, l=new_dc.l,
m=new_dc.m, n=new_dc.n, o=new_dc.o,
enum_status=new_dc.enum_status,
sd=new_dc.sd,
fsc=new_dc.fsc
)
t_test_all_attributes(a=new_dc.inner_dc.a, b=new_dc.inner_dc.b, c=new_dc.inner_dc.c,
d=new_dc.inner_dc.d, e=new_dc.inner_dc.e, f=new_dc.inner_dc.f,
g=new_dc.inner_dc.g, h=new_dc.inner_dc.h, i=new_dc.inner_dc.i,
j=new_dc.inner_dc.j, k=new_dc.inner_dc.k, l=new_dc.inner_dc.l,
m=new_dc.inner_dc.m, n=new_dc.inner_dc.n, o=new_dc.inner_dc.o,
enum_status=new_dc.inner_dc.enum_status,
sd=new_dc.inner_dc.sd, fsc=new_dc.inner_dc.fsc)
if __name__ == "__main__":
from flytekit.clis.sdk_in_container import pyflyte
from click.testing import CliRunner
import os
# FLYTE_USE_OLD_DC_FORMAT": "true"
# os.environ["FLYTE_USE_OLD_DC_FORMAT"] = "true"
runner = CliRunner()
path = os.path.realpath(__file__)
# os.environ["FLYTE_USE_OLD_DC_FORMAT"] = "True"
input_val = '{"a": -1, "b": 3.14}'
# result = runner.invoke(pyflyte.main, ["run", path, "wf", "--dc", input_val])
# print("Local Execution: ", result.output)
result = runner.invoke(pyflyte.main, ["run", "--remote", path, "wf", "--dc", input_val])
print("Remote Execution: ", result.output)
Tracking issue
Closes flyteorg/flyte#5959.
Why are the changes needed?
When resolving the protobuf struct with the attribute path, the return values can be of type
google.protobuf.struct_pb2.Struct
orgoogle.protobuf.struct_pb2.ListValue
. However,ProtobufTransformer
doesn't support convertingListValue
to flyteLiteral
, as can be seen here.What changes were proposed in this pull request?
Support
ListValue
conversion from a protobuf to a flyteLiteral
following the logic of remote execution.How was this patch tested?
Local test in the PR, flyteorg/flytekit#2894:
Will add unit test soon.
Setup process
Check all the applicable boxes
Related PRs
flyteorg/flytekit#2894
Docs link