Commit

Merge pull request #351 from FederatedAI/develop-1.9.1
Develop 1.9.1
zhihuiwan authored Nov 23, 2022
2 parents 3afbc3e + 81d975f commit 6feffaf
Showing 19 changed files with 247 additions and 187 deletions.
7 changes: 7 additions & 0 deletions RELEASE.md
@@ -1,3 +1,10 @@
# Release 1.9.1
## Bug Fixes
* Fix parameter inheritance when loading non-model modules from ModelLoader
* Fix job inheritance after adding or removing roles from training configuration
* Fix delimiter error in uploaded/downloaded data
* Fix anonymous feature name renewal

# Release 1.9.0
## Major Features and Improvements
* Support high availability and load balancing to improve system availability and stability
2 changes: 1 addition & 1 deletion python/fate_flow/apps/data_access_app.py
@@ -169,7 +169,7 @@ def gen_data_access_job_config(config_data, access_module):
"file",
"namespace",
"name",
"delimiter",
"id_delimiter",
"storage_engine",
"storage_address",
"destroy",
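Note for integrators: the whitelist above now accepts id_delimiter rather than delimiter. A minimal sketch of a data-access config as a Python dict (the values are illustrative assumptions; only the key names come from the whitelist above):

# Hypothetical config; key names mirror the whitelist above, values are made up.
config_data = {
    "file": "/data/projects/fate/examples/data/breast_hetero_guest.csv",
    "namespace": "experiment",
    "name": "breast_hetero_guest",
    "id_delimiter": ",",  # formerly "delimiter"; the old key is no longer accepted
    "storage_engine": "EGGROLL",
}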
4 changes: 3 additions & 1 deletion python/fate_flow/apps/table_app.py
@@ -77,6 +77,7 @@ def table_bind():
if request_data.get("extend_sid", False):
meta.with_match_id = True
schema.update({"meta": meta.to_dict()})
extra_schema["meta"] = meta.to_dict()
sess = Session()
storage_session = sess.storage(storage_engine=engine, options=request_data.get("options"))
table = storage_session.create_table(address=address, name=name, namespace=namespace,
@@ -112,9 +113,10 @@ def schema_update():
schema = data_table_meta.get_schema()
if request_data.get("schema", {}).get("meta"):
if schema.get("meta"):
schema = AnonymousGenerator.recover_schema(schema)
schema["meta"].update(request_data.get("schema").get("meta"))
else:
schema["meta"] = request_data.get("schema").get("meta")
return get_json_result(retcode=101, retmsg="meta not found")
request_data["schema"].pop("meta", {})
schema.update(request_data.get("schema", {}))
data_table_meta.update_metas(schema=schema)
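With this change, schema_update merges an incoming meta block into the recovered schema meta, and returns retcode 101 ("meta not found") when the stored schema carries no meta at all. A hypothetical request body for the success path (the top-level keys other than schema are assumptions for illustration):

# Hypothetical request body; only the nested "schema"/"meta" shape is
# taken from the code above.
request_data = {
    "name": "breast_hetero_guest",
    "namespace": "experiment",
    "schema": {
        "meta": {"delimiter": ","},  # merged into the recovered schema's meta
    },
}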
19 changes: 12 additions & 7 deletions python/fate_flow/components/api_reader.py
@@ -44,14 +44,14 @@ def __init__(
id_delimiter=",",
head=True,
extend_sid=False,
timeout=60 * 60 * 8
timeout=60 * 12
):
self.server_name = server_name
self.parameters = parameters
self.id_delimiter = id_delimiter
self.head = head
self.extend_sid = extend_sid
self.time_out = timeout
self.timeout = timeout

def check(self):
return True
@@ -144,17 +144,22 @@ def output_feature_table(self):

def check_status(self, job_id):
query_registry_info = self.service_info.get("query")
for i in range(0, self.parameters.get("timeout", 60 * 5)):
logger.info(f"parameters timeout: {self.parameters.get('timeout', 60 * 12)} min")
for i in range(0, self.parameters.get("timeout", 60 * 12)):
status_response = getattr(requests, query_registry_info.f_method.lower(), None)(
url=query_registry_info.f_url,
json={"jobId": job_id}
)
logger.info(f"status: {status_response.text}")
if status_response.status_code == 200 and status_response.json().get("data").get("status") == "success":
logger.info(f"job id {job_id} status success, start download")
return True
if status_response.status_code == 200:
if status_response.json().get("data").get("status").lower() == "success":
logger.info(f"job id {job_id} status success, start download")
return True
if status_response.json().get("data").get("status").lower() != "running":
logger.error(f"job id {job_id} status: {status_response.json().get('data').get('status')}")
raise Exception(status_response.json().get("data"))
logger.info(f"job id {job_id} status: {status_response.json().get('data').get('status')}")
time.sleep(30)
time.sleep(60)
raise TimeoutError("check status timeout")

def download_data(self, job_id):
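For orientation, the new retry budget works out as follows (a back-of-envelope sketch, not code from the commit):

polls = 60 * 12  # default number of polling attempts (720)
sleep_s = 60     # seconds slept between attempts
print(polls * sleep_s / 3600)  # 12.0 hours in total, which is why the log
                               # above reports the timeout value in minutes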
8 changes: 4 additions & 4 deletions python/fate_flow/components/model_operation.py
@@ -65,8 +65,8 @@ def check(self):

@model_store_cpn_meta.bind_runner.on_local
class ModelStore(ComponentBase):
def _run(self, input_cpn: ComponentInputProtocol):
parameters = input_cpn.parameters
def _run(self, cpn_input: ComponentInputProtocol):
parameters = cpn_input.parameters
model_storage = get_model_storage(parameters)
model_storage.store(
parameters["model_id"], parameters["model_version"],
@@ -99,8 +99,8 @@ def check(self):

@model_restore_cpn_meta.bind_runner.on_local
class ModelRestore(ComponentBase):
def _run(self, input_cpn: ComponentInputProtocol):
parameters = input_cpn.parameters
def _run(self, cpn_input: ComponentInputProtocol):
parameters = cpn_input.parameters
model_storage = get_model_storage(parameters)
model_storage.restore(
parameters["model_id"], parameters["model_version"],
6 changes: 4 additions & 2 deletions python/fate_flow/components/reader.py
@@ -235,7 +235,7 @@ def to_save(self, src_table, dest_table):
schema=schema,
need_read=False
)
schema = self.update_anonymous(computing_table=src_computing_table,schema=schema)
schema = self.update_anonymous(computing_table=src_computing_table, schema=schema, src_table_meta=src_table_meta)
LOGGER.info(f"dest schema: {schema}")
dest_table.meta.update_metas(
schema=schema,
@@ -246,12 +246,14 @@ def to_save(self, src_table, dest_table):
LOGGER.info(
f"save {dest_table.namespace} {dest_table.name} success"
)
return src_computing_table

def update_anonymous(self, computing_table, schema):
def update_anonymous(self, computing_table, schema, src_table_meta):
if schema.get("meta"):
if "anonymous_header" not in schema:
schema.update(AnonymousGenerator.generate_header(computing_table, schema))
schema = AnonymousGenerator.generate_anonymous_header(schema=schema)
src_table_meta.update_metas(schema=schema)
schema = AnonymousGenerator.update_anonymous_header_with_role(schema, self.tracker.role, self.tracker.party_id)
return schema

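The new src_table_meta argument exists so the regenerated anonymous header is persisted on the source table before the role-specific rename is applied for the destination. A comment sketch of the resulting order of operations (a paraphrase of the code above, not new behavior):

# update_anonymous after this change, step by step:
# 1. if the schema has meta but no anonymous_header, generate both headers;
# 2. persist the regenerated schema back onto the source table meta (new step);
# 3. rewrite the anonymous header with the local role/party_id and return the
#    schema for the destination table.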
2 changes: 2 additions & 0 deletions python/fate_flow/components/upload.py
@@ -155,6 +155,8 @@ def _run(self, cpn_input: ComponentInputProtocol):
namespace = _namespace
if name is None:
name = _table_name
if self.parameters.get("with_meta"):
self.parameters["id_delimiter"] = self.parameters.get("meta").get("delimiter")
read_head = self.parameters["head"]
if read_head == 0:
head = False
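With the lines above, an upload whose parameters include with_meta now derives its id delimiter from the meta block. A hypothetical parameters dict exercising that path (the values are illustrative):

# Hypothetical upload parameters; the with_meta/meta/delimiter shape is
# taken from the code above.
parameters = {
    "head": 1,
    "with_meta": True,
    "meta": {"delimiter": ","},  # copied into parameters["id_delimiter"] before reading
}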
72 changes: 33 additions & 39 deletions python/fate_flow/controller/job_controller.py
@@ -548,25 +548,27 @@ def job_reload(cls, job):
source_inheritance_tasks, target_inheritance_tasks = cls.load_source_target_tasks(job)
schedule_logger(job.f_job_id).info(f"source_inheritance_tasks:{source_inheritance_tasks}, target_inheritance_tasks:{target_inheritance_tasks}")
cls.output_reload(job, source_inheritance_tasks, target_inheritance_tasks)
if job.f_is_initiator:
source_inheritance_tasks, target_inheritance_tasks = cls.load_source_target_tasks(job, update_status=True)
cls.status_reload(job, source_inheritance_tasks, target_inheritance_tasks)

@classmethod
def load_source_target_tasks(cls, job):
source_inheritance_tasks = cls.load_tasks(job_id=job.f_inheritance_info.get("job_id"), role=job.f_role,
party_id=job.f_party_id,
component_list=job.f_inheritance_info.get("component_list", []))
target_inheritance_tasks = cls.load_tasks(job_id=job.f_job_id, role=job.f_role, party_id=job.f_party_id,
component_list=job.f_inheritance_info.get("component_list", []))
def load_source_target_tasks(cls, job, update_status=False):
filters = {"component_list": job.f_inheritance_info.get("component_list", [])}
if not update_status:
filters.update({"role": job.f_role, "party_id": job.f_party_id})
source_inheritance_tasks = cls.load_tasks(job_id=job.f_inheritance_info.get("job_id"), **filters)
target_inheritance_tasks = cls.load_tasks(job_id=job.f_job_id, **filters)
return source_inheritance_tasks, target_inheritance_tasks

@classmethod
def load_tasks(cls, component_list, job_id, role, party_id):
tasks = JobSaver.query_task(job_id=job_id, role=role, party_id=party_id, only_latest=True)
def load_tasks(cls, component_list, job_id, **kwargs):
tasks = JobSaver.query_task(job_id=job_id, only_latest=True, **kwargs)
task_dict = {}
for cpn in component_list:
for task in tasks:
if cpn == task.f_component_name:
task_dict[cpn] = task
task_dict[f"{cpn}_{task.f_role}_{task.f_task_version}"] = task
return task_dict

@classmethod
@@ -599,9 +601,9 @@ def log_reload(cls, job):
def output_reload(cls, job, source_tasks: dict, target_tasks: dict):
# model reload
schedule_logger(job.f_job_id).info("start reload model")
source_job = JobSaver.query_job(job_id=job.f_inheritance_info.get("job_id"))[0]
cls.output_model_reload(job, source_job)
cls.checkpoint_reload(job, source_job)
source_jobs = JobSaver.query_job(job_id=job.f_inheritance_info["job_id"], role=job.f_role, party_id=job.f_party_id)
if source_jobs:
cls.output_model_reload(job, source_jobs[0])
schedule_logger(job.f_job_id).info("start reload data")
source_tracker_dict = cls.load_task_tracker(source_tasks)
target_tracker_dict = cls.load_task_tracker(target_tasks)
@@ -637,7 +639,10 @@ def status_reload(cls, job, source_tasks, target_tasks):
schedule_logger(job.f_job_id).info("start reload status")
# update task status
for key, source_task in source_tasks.items():
JobSaver.reload_task(source_task, target_tasks[key])
try:
JobSaver.reload_task(source_task, target_tasks[key])
except Exception as e:
schedule_logger(job.f_job_id).warning(f"reload failed: {e}")

# update job status
JobSaver.update_job(job_info={
@@ -662,36 +667,25 @@ def output_model_reload(cls, job, source_job):
)

query_args = (
PipelineComponentMeta.f_component_name.in_(
job.f_inheritance_info['component_list'],
),
PipelineComponentMeta.f_component_name.in_(job.f_inheritance_info['component_list']),
)

query = source_pipelined_component.get_define_meta_from_db(*query_args)

for row in query:
shutil.copytree(
source_pipelined_component.variables_data_path / row.f_component_name,
target_pipelined_component.variables_data_path / row.f_component_name,
)
for i in ('variables_data_path', 'run_parameters_path', 'checkpoint_path'):
source_dir = getattr(source_pipelined_component, i) / row.f_component_name
target_dir = getattr(target_pipelined_component, i) / row.f_component_name

if not source_dir.is_dir():
continue
if target_dir.is_dir():
shutil.rmtree(target_dir)

shutil.copytree(source_dir, target_dir)

source_pipelined_component.replicate_define_meta({
'f_role': target_pipelined_component.role,
'f_party_id': target_pipelined_component.party_id,
'f_model_id': target_pipelined_component.model_id,
'f_model_version': target_pipelined_component.model_version,
}, query_args)

@classmethod
def checkpoint_reload(cls, job, source_job):
for component_name in job.f_inheritance_info['component_list']:
source_path = CheckpointManager(
role=source_job.f_role, party_id=source_job.f_party_id,
model_id=source_job.f_runtime_conf['job_parameters']['common']['model_id'],
model_version=source_job.f_job_id, component_name=component_name,
).directory
target_path = CheckpointManager(
role=job.f_role, party_id=job.f_party_id,
model_id=job.f_runtime_conf['job_parameters']['common']['model_id'],
model_version=job.f_job_id, component_name=component_name,
).directory

if os.path.exists(source_path):
shutil.copytree(source_path, target_path)
}, query_args, True)
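One consequence of the new load_tasks keying deserves a worked example: tasks for the same component but different roles or task versions no longer collide in task_dict. A toy sketch (component names, roles, and versions are invented):

# Two inherited tasks for the same component, run by different roles.
inherited = [("reader_0", "guest", 0), ("reader_0", "host", 0)]

task_dict = {}
for cpn, role, version in inherited:
    # The old keying used task_dict[cpn], so the second entry overwrote the first.
    task_dict[f"{cpn}_{role}_{version}"] = object()  # stand-in for a Task row

assert set(task_dict) == {"reader_0_guest_0", "reader_0_host_0"}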
7 changes: 5 additions & 2 deletions python/fate_flow/db/db_models.py
@@ -311,8 +311,10 @@ def model(cls, table_index=None, date=None):
if ModelClass is None:
class Meta:
db_table = '%s_%s' % ('t_tracking_output_data_info', table_index)
primary_key = CompositeKey('f_job_id', 'f_task_id', 'f_task_version', 'f_data_name', 'f_role',
'f_party_id')
primary_key = CompositeKey(
'f_job_id', 'f_task_id', 'f_task_version',
'f_data_name', 'f_role', 'f_party_id',
)

attrs = {'__module__': cls.__module__, 'Meta': Meta}
ModelClass = type("%s_%s" % (cls.__name__, table_index), (cls,),
@@ -573,6 +575,7 @@ class Meta:
db_table = "t_site_key_info"
primary_key = CompositeKey('f_party_id', 'f_key_name')


class PipelineComponentMeta(DataBaseModel):
f_model_id = CharField(max_length=100, index=True)
f_model_version = CharField(max_length=100, index=True)
4 changes: 2 additions & 2 deletions python/fate_flow/db/db_utils.py
@@ -28,7 +28,7 @@


@DB.connection_context()
def bulk_insert_into_db(model, data_source, replace=False):
def bulk_insert_into_db(model, data_source, replace_on_conflict=False):
DB.create_tables([model])

current_time = current_timestamp()
@@ -48,7 +48,7 @@ def bulk_insert_into_db(model, data_source, replace=False):
for i in range(0, len(data_source), batch_size):
with DB.atomic():
query = model.insert_many(data_source[i:i + batch_size])
if replace:
if replace_on_conflict:
query = query.on_conflict(preserve=preserve)
query.execute()

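Call sites now opt into upsert behavior by name. A hedged usage sketch (MyModel and rows are hypothetical placeholders for a peewee model and its row dicts):

# Hypothetical call site: on key conflict, preserve the non-key fields of the
# incoming rows instead of raising an IntegrityError.
bulk_insert_into_db(MyModel, rows, replace_on_conflict=True)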
9 changes: 0 additions & 9 deletions python/fate_flow/db/key_manager.py
@@ -57,15 +57,6 @@ def get_key(cls, party_id, key_name=SiteKeyName.PUBLIC.value):
else:
return None

@classmethod
@DB.connection_context()
def get_key(cls, party_id, key_name=SiteKeyName.PUBLIC.value):
site_info = SiteKeyInfo.query(party_id=party_id, key_name=key_name)
if site_info:
return site_info[0].f_key
else:
return None

@classmethod
@DB.connection_context()
def delete(cls, party_id, key_name=SiteKeyName.PUBLIC.value):
4 changes: 2 additions & 2 deletions python/fate_flow/entity/run_status.py
@@ -63,7 +63,7 @@ class JobStatus(BaseStatus):
class StateTransitionRule(BaseStateTransitionRule):
RULES = {
StatusSet.READY: [StatusSet.WAITING, StatusSet.CANCELED, StatusSet.TIMEOUT, StatusSet.FAILED],
StatusSet.WAITING: [StatusSet.RUNNING, StatusSet.CANCELED, StatusSet.TIMEOUT, StatusSet.FAILED, StatusSet.SUCCESS],
StatusSet.WAITING: [StatusSet.RUNNING, StatusSet.CANCELED, StatusSet.TIMEOUT, StatusSet.FAILED, StatusSet.SUCCESS, StatusSet.PASS],
StatusSet.RUNNING: [StatusSet.CANCELED, StatusSet.TIMEOUT, StatusSet.FAILED, StatusSet.SUCCESS],
StatusSet.CANCELED: [StatusSet.WAITING],
StatusSet.TIMEOUT: [StatusSet.FAILED, StatusSet.SUCCESS, StatusSet.WAITING],
@@ -83,7 +83,7 @@ class TaskStatus(BaseStatus):

class StateTransitionRule(BaseStateTransitionRule):
RULES = {
StatusSet.WAITING: [StatusSet.RUNNING, StatusSet.SUCCESS],
StatusSet.WAITING: [StatusSet.RUNNING, StatusSet.SUCCESS, StatusSet.PASS],
StatusSet.RUNNING: [StatusSet.CANCELED, StatusSet.TIMEOUT, StatusSet.FAILED, StatusSet.PASS, StatusSet.SUCCESS],
StatusSet.CANCELED: [StatusSet.WAITING],
StatusSet.TIMEOUT: [StatusSet.FAILED, StatusSet.SUCCESS],
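Assuming BaseStateTransitionRule treats RULES as a map from a source status to its allowed destinations (an assumption; that class is not shown in this diff), the new PASS entries can be checked with a sketch like:

# Assumed semantics of the RULES map: a transition is legal iff the destination
# appears in the source status's allow-list.
def transition_allowed(rules: dict, src, dst) -> bool:
    return dst in rules.get(src, [])

# With this change, WAITING -> PASS becomes legal for both jobs and tasks.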
8 changes: 7 additions & 1 deletion python/fate_flow/manager/data_manager.py
@@ -102,6 +102,11 @@ def reconstruct_header(schema):
obj = env_utils.get_class_object("data_format")
return obj.reconstruct_header(schema)

@staticmethod
def recover_schema(schema):
obj = env_utils.get_class_object("data_format")
return obj.recover_schema(schema)


class DataTableTracker(object):
@classmethod
@@ -317,7 +322,8 @@ def send_table(output_tables_meta, tar_file_name="", limit=-1, need_head=True, l
if need_head and header and output_table_meta.get_have_head() and \
output_table_meta.get_schema().get("is_display", True):
fw.write('{}\n'.format(','.join(header)))
fw.write('{}\n'.format(','.join(map(lambda x: str(x), data_line))))
delimiter = output_table_meta.get_id_delimiter() if output_table_meta.get_id_delimiter() else ","
fw.write('{}\n'.format(delimiter.join(map(lambda x: str(x), data_line))))
output_data_count += 1
if output_data_count == limit:
break