diff --git a/RELEASE.md b/RELEASE.md
index a8cd85e41..ff3f7c591 100644
--- a/RELEASE.md
+++ b/RELEASE.md
@@ -1,3 +1,8 @@
+# Release 1.7.2
+## Major Features and Improvements
+* Separated the base connection address of data storage tables from the data table information, while remaining compatible with historical versions;
+* Optimized the component output data download interface.
+
 # Release 1.7.1
 ## Major Features and Improvements
 * Added the writer component, which supports exporting data to mysql and saving data as a new table;
diff --git a/doc/configuration_instruction.zh.md b/doc/configuration_instruction.zh.md
index 68e586e5d..07e73fbe1 100644
--- a/doc/configuration_instruction.zh.md
+++ b/doc/configuration_instruction.zh.md
@@ -15,7 +15,14 @@ use_registry: false
 # whether to enable the higher-security serialization mode
 use_deserialize_safe_module: false
+# whether to enable dependency distribution in FATE on Spark mode
 dependent_distribution: false
+# whether to enable password encryption (for the database password); encrypt_module and private_key take effect only when this is enabled
+encrypt_password: false
+# encryption module and function (joined with "#")
+encrypt_module: fate_arch.common.encrypt_utils#pwdecrypt
+# encryption private key
+private_key:
 
 fateflow:
   # the real bound ip address must be used, to avoid extra problems caused by multiple NICs / multiple IPs
   # you must set real ip address, 127.0.0.1 and 0.0.0.0 is not supported
diff --git a/examples/connector/create_or_update.json b/examples/connector/create_or_update.json
new file mode 100644
index 000000000..473b68bae
--- /dev/null
+++ b/examples/connector/create_or_update.json
@@ -0,0 +1,10 @@
+{
+  "connector_name": "test",
+  "engine": "MYSQL",
+  "connector_info": {
+    "user": "fate",
+    "passwd": "fate",
+    "host": "127.0.0.1",
+    "port": 3306
+  }
+}
\ No newline at end of file
diff --git a/examples/table_bind/bind_mysql_table_by_connector_name.json b/examples/table_bind/bind_mysql_table_by_connector_name.json
new file mode 100644
index 000000000..f04c5a291
--- /dev/null
+++ b/examples/table_bind/bind_mysql_table_by_connector_name.json
@@ -0,0 +1,15 @@
+{
+  "engine": "MYSQL",
+  "address": {
+    "connector_name": "test",
+    "db": "experiment",
+    "name": "breast_hetero_guest"
+  },
+  "namespace": "experiment",
+  "name": "breast_hetero_guest",
+  "head": 1,
+  "id_delimiter": ",",
+  "partitions": 10,
+  "id_column": "id",
+  "feature_column": "y,x0,x1,x2,x3,x4,x5,x6,x7,x8,x9"
+}
\ No newline at end of file
diff --git a/fateflow.env b/fateflow.env
deleted file mode 100755
index 8cfda3c41..000000000
--- a/fateflow.env
+++ /dev/null
@@ -1,10 +0,0 @@
-FATE=1.7.1
-FATEFlow=1.7.1
-FATEBoard=1.7.2
-EGGROLL=2.4.2
-CENTOS=7.2
-UBUNTU=16.04
-PYTHON=3.6.5
-MAVEN=3.6.3
-JDK=8
-SPARK=2.4.1
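Reviewer note: the two example payloads above feed the new connector workflow (register a reusable MySQL connection once, then bind tables to it by connector_name). A minimal sketch of exercising that flow over HTTP, assuming a local FATE Flow server at 127.0.0.1:9380 and the conventional /v1/table prefix for table_app routes; both are deployment-dependent assumptions, not part of this diff:

    import requests

    BASE = "http://127.0.0.1:9380/v1/table"  # assumed host/port and route prefix

    # register (or update) a MySQL connector; mirrors examples/connector/create_or_update.json
    connector = {
        "connector_name": "test",
        "engine": "MYSQL",
        "connector_info": {"user": "fate", "passwd": "fate", "host": "127.0.0.1", "port": 3306},
    }
    print(requests.post(f"{BASE}/connector/create", json=connector).json())

    # bind a table through the connector by name; mirrors
    # examples/table_bind/bind_mysql_table_by_connector_name.json
    bind = {
        "engine": "MYSQL",
        "address": {"connector_name": "test", "db": "experiment", "name": "breast_hetero_guest"},
        "namespace": "experiment",
        "name": "breast_hetero_guest",
        "head": 1,
        "id_delimiter": ",",
        "partitions": 10,
        "id_column": "id",
        "feature_column": "y,x0,x1,x2,x3,x4,x5,x6,x7,x8,x9",
    }
    print(requests.post(f"{BASE}/bind", json=bind).json())

    # read the stored connector info back
    print(requests.post(f"{BASE}/connector/query", json={"connector_name": "test"}).json())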
diff --git a/python/fate_flow/apps/table_app.py b/python/fate_flow/apps/table_app.py
index 7d8089881..f36d2f4c5 100644
--- a/python/fate_flow/apps/table_app.py
+++ b/python/fate_flow/apps/table_app.py
@@ -14,19 +14,37 @@
 # limitations under the License.
 #
 # from fate_arch import storage
+from fate_arch.metastore.db_utils import StorageConnector
 from fate_arch.session import Session
+from fate_arch.storage import StorageTableMeta
 from fate_flow.entity import RunParameters
 from fate_flow.manager.data_manager import DataTableTracker, TableStorage
 from fate_flow.operation.job_saver import JobSaver
 from fate_flow.operation.job_tracker import Tracker
 from fate_flow.worker.task_executor import TaskExecutor
 from fate_flow.utils.api_utils import get_json_result, error_response
-from fate_flow.utils import detect_utils, job_utils, schedule_utils
-from fate_flow.utils.detect_utils import validate_request
+from fate_flow.utils import job_utils, schedule_utils
 from flask import request
 
 from fate_flow.utils.detect_utils import validate_request
 
 
+@manager.route('/connector/create', methods=['POST'])
+def create_storage_connector():
+    request_data = request.json
+    address = StorageTableMeta.create_address(request_data.get("engine"), request_data.get("connector_info"))
+    connector = StorageConnector(connector_name=request_data.get("connector_name"), engine=request_data.get("engine"),
+                                 connector_info=address.connector)
+    connector.create_or_update()
+    return get_json_result(retcode=0, retmsg='success')
+
+
+@manager.route('/connector/query', methods=['POST'])
+def query_storage_connector():
+    request_data = request.json
+    connector = StorageConnector(connector_name=request_data.get("connector_name"))
+    return get_json_result(retcode=0, retmsg='success', data=connector.get_info())
+
+
 @manager.route('/add', methods=['post'])
 @manager.route('/bind', methods=['post'])
 @validate_request("engine", "address", "namespace", "name")
@@ -53,6 +71,8 @@ def table_bind():
     schema = None
     if id_column and feature_column:
         schema = {'header': feature_column, 'sid': id_column}
+    elif id_column:
+        schema = {'sid': id_column, 'header': ''}
     sess = Session()
     storage_session = sess.storage(storage_engine=engine, options=request_data.get("options"))
     table = storage_session.create_table(address=address, name=name, namespace=namespace,
diff --git a/python/fate_flow/apps/tracking_app.py b/python/fate_flow/apps/tracking_app.py
index 4fd64f5ae..9576779d7 100644
--- a/python/fate_flow/apps/tracking_app.py
+++ b/python/fate_flow/apps/tracking_app.py
@@ -219,13 +219,14 @@ def component_output_data():
         totals.append(total)
         if output_data:
             header = get_component_output_data_schema(output_table_meta=output_table_meta, is_str=is_str,
-                                                          extend_header=extend_header)
+                                                      extend_header=extend_header)
             headers.append(header)
         else:
             headers.append(None)
     if len(output_data_list) == 1 and not output_data_list[0]:
         return get_json_result(retcode=0, retmsg='no data', data=[])
-    return get_json_result(retcode=0, retmsg='success', data=output_data_list, meta={'header': headers, 'total': totals, 'names':data_names})
+    return get_json_result(retcode=0, retmsg='success', data=output_data_list,
+                           meta={'header': headers, 'total': totals, 'names': data_names})
 
 
 @manager.route('/component/output/data/download', methods=['get'])
diff --git a/python/fate_flow/controller/job_controller.py b/python/fate_flow/controller/job_controller.py
index 5137d12b0..c2169bcb6 100644
--- a/python/fate_flow/controller/job_controller.py
+++ b/python/fate_flow/controller/job_controller.py
@@ -590,8 +590,9 @@ def log_reload(cls, job):
     def output_reload(cls, job, source_tasks: dict, target_tasks: dict):
         # model reload
         schedule_logger(job.f_job_id).info("start reload model")
-        cls.output_model_reload(job)
-        cls.checkpoint_reload(job)
+        source_job = JobSaver.query_job(job_id=job.f_inheritance_info.get("job_id"))[0]
+        cls.output_model_reload(job, source_job)
+        cls.checkpoint_reload(job, source_job)
         schedule_logger(job.f_job_id).info("start reload data")
         source_tracker_dict = cls.load_task_tracker(source_tasks)
         target_tracker_dict = cls.load_task_tracker(target_tasks)
@@ -634,22 +635,43 @@ def status_reload(cls, job, source_tasks, target_tasks):
         schedule_logger(job.f_job_id).info("reload status success")
 
     @classmethod
-    def output_model_reload(cls, job):
-
-        model_id = model_utils.gen_party_model_id(job.f_runtime_conf.get("job_parameters").get("common").get("model_id"),
-                                                  job.f_role, job.f_party_id)
-        PipelinedModel(model_id=model_id, model_version=job.f_job_id).reload_component_model(model_id=model_id, model_version=job.f_inheritance_info.get("job_id"),
-                                                                                             component_list=job.f_inheritance_info.get("component_list"))
-    @classmethod
-    def checkpoint_reload(cls, job):
+    def output_model_reload(cls, job, source_job):
+        source_model_id = model_utils.gen_party_model_id(
+            source_job.f_runtime_conf.get("job_parameters").get("common").get("model_id"),
+            job.f_role,
+            job.f_party_id
+        )
+        model_id = model_utils.gen_party_model_id(
+            job.f_runtime_conf.get("job_parameters").get("common").get("model_id"),
+            job.f_role,
+            job.f_party_id
+        )
+        PipelinedModel(
+            model_id=model_id,
+            model_version=job.f_job_id
+        ).reload_component_model(
+            model_id=source_model_id,
+            model_version=job.f_inheritance_info.get("job_id"),
+            component_list=job.f_inheritance_info.get("component_list")
+        )
+
+    @classmethod
+    def checkpoint_reload(cls, job, source_job):
         for component_name in job.f_inheritance_info.get("component_list"):
-            path = CheckpointManager(role=job.f_role, party_id=job.f_party_id,
-                                     component_name=component_name, model_version=job.f_inheritance_info.get("job_id"),
-                                     model_id=job.f_runtime_conf.get("job_parameters").get("common").get("model_id")).directory
-            target_path = CheckpointManager(role=job.f_role, party_id=job.f_party_id,
-                                            component_name=component_name, model_version=job.f_job_id,
-                                            model_id=job.f_runtime_conf.get("job_parameters").get("common").get(
-                                                "model_id")).directory
+            path = CheckpointManager(
+                role=job.f_role,
+                party_id=job.f_party_id,
+                component_name=component_name,
+                model_version=job.f_inheritance_info.get("job_id"),
+                model_id=source_job.f_runtime_conf.get("job_parameters").get("common").get("model_id")
+            ).directory
+            target_path = CheckpointManager(
+                role=job.f_role,
+                party_id=job.f_party_id,
+                component_name=component_name,
+                model_version=job.f_job_id,
+                model_id=job.f_runtime_conf.get("job_parameters").get("common").get("model_id")
+            ).directory
             if os.path.exists(path):
                 if os.path.exists(target_path):
                     shutil.rmtree(target_path)
diff --git a/python/fate_flow/db/db_models.py b/python/fate_flow/db/db_models.py
index 460c85003..f8bc28b12 100644
--- a/python/fate_flow/db/db_models.py
+++ b/python/fate_flow/db/db_models.py
@@ -125,10 +125,20 @@ class Meta:
 def init_database_tables():
     members = inspect.getmembers(sys.modules[__name__], inspect.isclass)
     table_objs = []
+    create_failed_list = []
     for name, obj in members:
         if obj != DataBaseModel and issubclass(obj, DataBaseModel):
             table_objs.append(obj)
-    DB.create_tables(table_objs)
+            LOGGER.info(f"start create table {obj.__name__}")
+            try:
+                obj.create_table()
+                LOGGER.info(f"create table success: {obj.__name__}")
+            except Exception as e:
+                LOGGER.exception(e)
+                create_failed_list.append(obj.__name__)
+    if create_failed_list:
+        LOGGER.info(f"create tables failed: {create_failed_list}")
+        raise Exception(f"create tables failed: {create_failed_list}")
 
 
 def fill_db_model_object(model_object, human_model_dict):
@@ -141,40 +151,40 @@ def fill_db_model_object(model_object, human_model_dict):
 
 class Job(DataBaseModel):
     # multi-party common configuration
-    f_user_id = CharField(max_length=25, index=True, null=True)
+    f_user_id = CharField(max_length=25, null=True)
     f_job_id = CharField(max_length=25, index=True)
     f_name = CharField(max_length=500, null=True, default='')
     f_description = TextField(null=True, default='')
-    f_tag = CharField(max_length=50, null=True, index=True, default='')
+    f_tag = CharField(max_length=50, null=True, default='')
     f_dsl = JSONField()
     f_runtime_conf = JSONField()
     f_runtime_conf_on_party = JSONField()
     f_train_runtime_conf = JSONField(null=True)
     f_roles = JSONField()
-    f_initiator_role = CharField(max_length=50, index=True)
-    f_initiator_party_id = CharField(max_length=50, index=True)
-    f_status = CharField(max_length=50, index=True)
-    f_status_code = IntegerField(null=True, index=True)
+    f_initiator_role = CharField(max_length=50)
+    f_initiator_party_id = CharField(max_length=50)
+    f_status = CharField(max_length=50)
+    f_status_code = IntegerField(null=True)
     f_user = JSONField()
     # this party configuration
     f_role = CharField(max_length=50, index=True)
     f_party_id = CharField(max_length=10, index=True)
-    f_is_initiator = BooleanField(null=True, index=True, default=False)
+    f_is_initiator = BooleanField(null=True, default=False)
     f_progress = IntegerField(null=True, default=0)
-    f_ready_signal = BooleanField(index=True, default=False)
+    f_ready_signal = BooleanField(default=False)
     f_ready_time = BigIntegerField(null=True)
-    f_cancel_signal = BooleanField(index=True, default=False)
+    f_cancel_signal = BooleanField(default=False)
     f_cancel_time = BigIntegerField(null=True)
-    f_rerun_signal = BooleanField(index=True, default=False)
+    f_rerun_signal = BooleanField(default=False)
     f_end_scheduling_updates = IntegerField(null=True, default=0)
-    f_engine_name = CharField(max_length=50, null=True, index=True)
-    f_engine_type = CharField(max_length=10, null=True, index=True)
-    f_cores = IntegerField(index=True, default=0)
-    f_memory = IntegerField(index=True, default=0)  # MB
-    f_remaining_cores = IntegerField(index=True, default=0)
-    f_remaining_memory = IntegerField(index=True, default=0)  # MB
-    f_resource_in_use = BooleanField(index=True, default=False)
+    f_engine_name = CharField(max_length=50, null=True)
+    f_engine_type = CharField(max_length=10, null=True)
+    f_cores = IntegerField(default=0)
+    f_memory = IntegerField(default=0)  # MB
+    f_remaining_cores = IntegerField(default=0)
+    f_remaining_memory = IntegerField(default=0)  # MB
+    f_resource_in_use = BooleanField(default=False)
     f_apply_resource_time = BigIntegerField(null=True)
     f_return_resource_time = BigIntegerField(null=True)
@@ -196,26 +206,26 @@ class Task(DataBaseModel):
     # multi-party common configuration
     f_job_id = CharField(max_length=25, index=True)
     f_component_name = TextField()
-    f_component_module = CharField(max_length=200, index=True)
-    f_task_id = CharField(max_length=100, index=True)
-    f_task_version = BigIntegerField(index=True)
-    f_initiator_role = CharField(max_length=50, index=True)
-    f_initiator_party_id = CharField(max_length=50, index=True, default=-1)
-    f_federated_mode = CharField(max_length=10, index=True)
-    f_federated_status_collect_type = CharField(max_length=10, index=True)
+    f_component_module = CharField(max_length=200)
+    f_task_id = CharField(max_length=100)
+    f_task_version = BigIntegerField()
+    f_initiator_role = CharField(max_length=50)
+    f_initiator_party_id = CharField(max_length=50, default=-1)
+    f_federated_mode = CharField(max_length=10)
+    f_federated_status_collect_type = CharField(max_length=10)
     f_status = CharField(max_length=50, index=True)
-    f_status_code = IntegerField(null=True, index=True)
-    f_auto_retries = IntegerField(default=0, index=True)
+    f_status_code = IntegerField(null=True)
+    f_auto_retries = IntegerField(default=0)
     f_auto_retry_delay = IntegerField(default=0)
     # this party configuration
     f_role = CharField(max_length=50, index=True)
     f_party_id = CharField(max_length=10, index=True)
     f_run_on_this_party = BooleanField(null=True, index=True, default=False)
-    f_worker_id = CharField(null=True, max_length=100, index=True)
+    f_worker_id = CharField(null=True, max_length=100)
     f_cmd = JSONField(null=True)
     f_run_ip = CharField(max_length=100, null=True)
     f_run_pid = IntegerField(null=True)
-    f_party_status = CharField(max_length=50, index=True)
+    f_party_status = CharField(max_length=50)
     f_provider_info = JSONField()
     f_component_parameters = JSONField()
     f_engine_conf = JSONField(null=True)
@@ -260,11 +270,11 @@ class Meta:
     f_task_version = BigIntegerField(null=True, index=True)
     f_role = CharField(max_length=50, index=True)
     f_party_id = CharField(max_length=10, index=True)
-    f_metric_namespace = CharField(max_length=180, index=True)
-    f_metric_name = CharField(max_length=180, index=True)
+    f_metric_namespace = CharField(max_length=180)
+    f_metric_name = CharField(max_length=180)
     f_key = CharField(max_length=200)
     f_value = LongTextField()
-    f_type = IntegerField(index=True)  # 0 is data, 1 is meta
+    f_type = IntegerField()  # 0 is data, 1 is meta
 
 
 class TrackingOutputDataInfo(DataBaseModel):
@@ -294,7 +304,7 @@ class Meta:
     f_job_id = CharField(max_length=25, index=True)
     f_component_name = TextField()
     f_task_id = CharField(max_length=100, null=True, index=True)
-    f_task_version = BigIntegerField(null=True, index=True)
+    f_task_version = BigIntegerField(null=True)
     f_data_name = CharField(max_length=30)
     # this party configuration
     f_role = CharField(max_length=50, index=True)
@@ -305,8 +315,8 @@ class Meta:
 
 
 class MachineLearningModelInfo(DataBaseModel):
-    f_role = CharField(max_length=50, index=True)
-    f_party_id = CharField(max_length=10, index=True)
+    f_role = CharField(max_length=50)
+    f_party_id = CharField(max_length=10)
     f_roles = JSONField(default={})
     f_job_id = CharField(max_length=25, index=True)
     f_model_id = CharField(max_length=100, index=True)
@@ -314,8 +324,8 @@ class MachineLearningModelInfo(DataBaseModel):
     f_loaded_times = IntegerField(default=0)
     f_size = BigIntegerField(default=0)
     f_description = TextField(null=True, default='')
-    f_initiator_role = CharField(max_length=50, index=True)
-    f_initiator_party_id = CharField(max_length=50, index=True, default=-1)
+    f_initiator_role = CharField(max_length=50)
+    f_initiator_party_id = CharField(max_length=50, default=-1)
     f_runtime_conf = JSONField(default={})
     f_train_dsl = JSONField(default={})
     f_train_runtime_conf = JSONField(default={})
@@ -334,8 +344,8 @@ class Meta:
 
 class DataTableTracking(DataBaseModel):
     f_table_id = BigAutoField(primary_key=True)
-    f_table_name = CharField(max_length=300, index=True, null=True)
-    f_table_namespace = CharField(max_length=300, index=True, null=True)
+    f_table_name = CharField(max_length=300, null=True)
+    f_table_namespace = CharField(max_length=300, null=True)
     f_job_id = CharField(max_length=25, index=True, null=True)
     f_have_parent = BooleanField(default=False)
     f_parent_number = IntegerField(default=0)
@@ -350,13 +360,13 @@ class Meta:
 
 
 class CacheRecord(DataBaseModel):
-    f_cache_key = CharField(max_length=500, primary_key=True)
+    f_cache_key = CharField(max_length=500)
     f_cache = JsonSerializedField()
     f_job_id = CharField(max_length=25, index=True, null=True)
     f_role = CharField(max_length=50, index=True, null=True)
     f_party_id = CharField(max_length=10, index=True, null=True)
     f_component_name = TextField(null=True)
-    f_task_id = CharField(max_length=100, null=True, index=True)
+    f_task_id = CharField(max_length=100, null=True)
     f_task_version = BigIntegerField(null=True, index=True)
     f_cache_name = CharField(max_length=50, null=True)
     t_ttl = BigIntegerField(default=0)
@@ -376,7 +386,7 @@ class Meta:
 
 class Tag(DataBaseModel):
     f_id = BigAutoField(primary_key=True)
-    f_name = CharField(max_length=100, index=True, unique=True)
+    f_name = CharField(max_length=100, unique=True)
     f_desc = TextField(null=True)
 
     class Meta:
@@ -410,7 +420,7 @@ class Meta:
     f_party_id = CharField(max_length=10, index=True)
     f_component_name = TextField()
     f_task_id = CharField(max_length=50, null=True, index=True)
-    f_task_version = CharField(max_length=50, null=True, index=True)
+    f_task_version = CharField(max_length=50, null=True)
     f_summary = LongTextField()
 
 
@@ -420,8 +430,8 @@ class ModelOperationLog(DataBaseModel):
     f_initiator_role = CharField(max_length=50, index=True, null=True)
     f_initiator_party_id = CharField(max_length=10, index=True, null=True)
     f_request_ip = CharField(max_length=20, null=True)
-    f_model_id = CharField(max_length=100, index=True)
-    f_model_version = CharField(max_length=100, index=True)
+    f_model_id = CharField(max_length=100)
+    f_model_version = CharField(max_length=100)
 
     class Meta:
         db_table = "t_model_operation_log"
@@ -432,11 +442,11 @@ class EngineRegistry(DataBaseModel):
     f_engine_name = CharField(max_length=50, index=True)
     f_engine_entrance = CharField(max_length=50, index=True)
     f_engine_config = JSONField()
-    f_cores = IntegerField(index=True)
-    f_memory = IntegerField(index=True)  # MB
-    f_remaining_cores = IntegerField(index=True)
-    f_remaining_memory = IntegerField(index=True)  # MB
-    f_nodes = IntegerField(index=True)
+    f_cores = IntegerField()
+    f_memory = IntegerField()  # MB
+    f_remaining_cores = IntegerField()
+    f_remaining_memory = IntegerField()  # MB
+    f_nodes = IntegerField()
 
     class Meta:
         db_table = "t_engine_registry"
@@ -481,9 +491,9 @@ class WorkerInfo(DataBaseModel):
     f_worker_id = CharField(max_length=100, primary_key=True)
     f_worker_name = CharField(max_length=50, index=True)
     f_job_id = CharField(max_length=25, index=True)
-    f_task_id = CharField(max_length=100, index=True)
+    f_task_id = CharField(max_length=100)
     f_task_version = BigIntegerField(index=True)
-    f_role = CharField(max_length=50, index=True)
+    f_role = CharField(max_length=50)
     f_party_id = CharField(max_length=10, index=True)
     f_run_ip = CharField(max_length=100, null=True)
     f_run_pid = IntegerField(null=True)
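Reviewer note: init_database_tables above now issues one create_table() per model and collects failures, instead of a single bulk DB.create_tables() call, so one broken table definition no longer aborts creation of the rest. A self-contained sketch of the same pattern against an in-memory SQLite database (the models here are hypothetical stand-ins, not FATE Flow's):

    import peewee

    db = peewee.SqliteDatabase(":memory:")

    class BaseModel(peewee.Model):
        class Meta:
            database = db

    class JobDemo(BaseModel):
        f_job_id = peewee.CharField(max_length=25, index=True)

    class TaskDemo(BaseModel):
        f_task_id = peewee.CharField(max_length=100)

    create_failed_list = []
    for model in BaseModel.__subclasses__():
        try:
            model.create_table()  # one isolated CREATE TABLE per model
        except Exception as e:
            print(f"create table failed: {model.__name__}: {e}")
            create_failed_list.append(model.__name__)
    if create_failed_list:
        raise Exception(f"create tables failed: {create_failed_list}")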
diff --git a/python/fate_flow/manager/data_manager.py b/python/fate_flow/manager/data_manager.py
index b396e181e..62a693474 100644
--- a/python/fate_flow/manager/data_manager.py
+++ b/python/fate_flow/manager/data_manager.py
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+import datetime
 import io
 import json
 import operator
@@ -30,6 +31,7 @@
 from fate_flow.settings import stat_logger
 from fate_flow.db.db_models import DB, TrackingMetric, DataTableTracking
 from fate_flow.utils import data_utils
+from fate_flow.utils.base_utils import get_fate_flow_directory
 from fate_flow.utils.data_utils import get_header_schema
 
 
@@ -195,11 +197,11 @@ def put_in_table(table: StorageTableABC, k, v, temp, count, part_of_data, max_nu
 def send_table(output_tables_meta, tar_file_name, limit=-1, need_head=True):
     output_data_file_list = []
     output_data_meta_file_list = []
-    output_tmp_dir = os.path.join(os.getcwd(), 'tmp/{}'.format(fate_uuid()))
+    output_tmp_dir = os.path.join(get_fate_flow_directory(), 'tmp/{}/{}'.format(datetime.datetime.now().strftime("%Y%m%d"), fate_uuid()))
    for output_name, output_table_meta in output_tables_meta.items():
         output_data_count = 0
-        is_str = False
         output_data_file_path = "{}/{}.csv".format(output_tmp_dir, output_name)
+        output_data_meta_file_path = "{}/{}.meta".format(output_tmp_dir, output_name)
         os.makedirs(os.path.dirname(output_data_file_path), exist_ok=True)
         with open(output_data_file_path, 'w') as fw:
             with Session() as sess:
@@ -209,43 +211,37 @@ def send_table(output_tables_meta, tar_file_name, limit=-1, need_head=True):
                 for k, v in output_table.collect():
                     data_line, is_str, extend_header = feature_utils.get_component_output_data_line(src_key=k, src_value=v)
+                    # save meta
+                    if output_data_count == 0:
+                        output_data_file_list.append(output_data_file_path)
+                        header = get_component_output_data_schema(output_table_meta=output_table_meta,
+                                                                  is_str=is_str,
+                                                                  extend_header=extend_header)
+                        output_data_meta_file_list.append(output_data_meta_file_path)
+                        with open(output_data_meta_file_path, 'w') as f:
+                            json.dump({'header': header}, f, indent=4)
+                        if need_head and header and output_table_meta.get_have_head():
+                            fw.write('{}\n'.format(','.join(header)))
                     fw.write('{}\n'.format(','.join(map(lambda x: str(x), data_line))))
                     output_data_count += 1
                     if output_data_count == limit:
                         break
-
-        if output_data_count:
-            # get meta
-            output_data_file_list.append(output_data_file_path)
-            header = get_component_output_data_schema(output_table_meta=output_table_meta,
-                                                      is_str=is_str,
-                                                      extend_header=extend_header)
-            output_data_meta_file_path = "{}/{}.meta".format(output_tmp_dir, output_name)
-            output_data_meta_file_list.append(output_data_meta_file_path)
-            with open(output_data_meta_file_path, 'w') as fw:
-                json.dump({'header': header}, fw, indent=4)
-            if need_head and header:
-                with open(output_data_file_path, 'r+') as f:
-                    content = f.read()
-                    f.seek(0, 0)
-                    f.write('{}\n'.format(','.join(header)) + content)
     # tar
-    memory_file = io.BytesIO()
-    tar = tarfile.open(fileobj=memory_file, mode='w:gz')
+    output_data_tarfile = "{}/{}".format(output_tmp_dir, tar_file_name)
+    tar = tarfile.open(output_data_tarfile, mode='w:gz')
     for index in range(0, len(output_data_file_list)):
         tar.add(output_data_file_list[index], os.path.relpath(output_data_file_list[index], output_tmp_dir))
         tar.add(output_data_meta_file_list[index], os.path.relpath(output_data_meta_file_list[index], output_tmp_dir))
     tar.close()
-    memory_file.seek(0)
-    output_data_file_list.extend(output_data_meta_file_list)
-    for path in output_data_file_list:
+    for key, path in enumerate(output_data_file_list):
         try:
-            shutil.rmtree(os.path.dirname(path))
+            os.remove(path)
+            os.remove(output_data_meta_file_list[key])
         except Exception as e:
             # warning
             stat_logger.warning(e)
-    return send_file(memory_file, attachment_filename=tar_file_name, as_attachment=True)
+    return send_file(output_data_tarfile, attachment_filename=tar_file_name)
 
 
 def delete_tables_by_table_infos(output_data_table_infos):
@@ -293,7 +289,7 @@ def get_component_output_data_schema(output_table_meta, extend_header, is_str=Fa
     # get schema
     schema = output_table_meta.get_schema()
     if not schema:
-        return ['sid']
+        return []
     header = [schema.get('sid_name', 'sid')]
     if "label" in extend_header and schema.get("label_name"):
         extend_header[extend_header.index("label")] = schema.get("label_name")
diff --git a/python/fate_flow/pipelined_model/mysql_model_storage.py b/python/fate_flow/pipelined_model/mysql_model_storage.py
index 41b5646c6..b0d497967 100644
--- a/python/fate_flow/pipelined_model/mysql_model_storage.py
+++ b/python/fate_flow/pipelined_model/mysql_model_storage.py
@@ -20,6 +20,7 @@
 from peewee import Model, CharField, BigIntegerField, TextField, CompositeKey, IntegerField, PeeweeException
 from playhouse.pool import PooledMySQLDatabase
 
+from fate_arch.common.conf_utils import decrypt_database_config
 from fate_flow.pipelined_model.pipelined_model import PipelinedModel
 from fate_flow.pipelined_model.model_storage_base import ModelStorageBase
 from fate_flow.utils.log_utils import getLogger
@@ -149,6 +150,7 @@ def restore(self, model_id: str, model_version: str, store_address: dict):
     def get_connection(store_address: dict):
         store_address = deepcopy(store_address)
         db_name = store_address.pop('database')
+        store_address = decrypt_database_config(store_address, passwd_key="password")
         del store_address['storage']
 
         DB.init(db_name, **store_address)
diff --git a/python/fate_flow/scheduler/federated_scheduler.py b/python/fate_flow/scheduler/federated_scheduler.py
index 2599bff10..364166e5e 100644
--- a/python/fate_flow/scheduler/federated_scheduler.py
+++ b/python/fate_flow/scheduler/federated_scheduler.py
@@ -283,7 +283,7 @@ def federated_command(cls, job_id, src_role, src_party_id, dest_role, dest_party
                 "retmsg": "Federated schedule error, {}".format(e)
             }
         if response["retcode"] != RetCode.SUCCESS:
-            if response["retcode"] == RetCode.NOT_EFFECTIVE:
+            if response["retcode"] in [RetCode.NOT_EFFECTIVE, RetCode.RUNNING]:
                 schedule_logger(job_id).warning(warning_log(msg=log_msg, role=dest_role, party_id=dest_party_id))
             else:
                 schedule_logger(job_id).error(failed_log(msg=log_msg, role=dest_role, party_id=dest_party_id, detail=response["retmsg"]))
@@ -354,7 +354,7 @@ def return_federated_response(cls, federated_response):
         federated_scheduling_status_code = FederatedSchedulingStatusCode.SUCCESS
     elif RetCode.EXCEPTION_ERROR in retcode_set:
         federated_scheduling_status_code = FederatedSchedulingStatusCode.ERROR
-    elif retcode_set == {RetCode.SUCCESS, RetCode.NOT_EFFECTIVE}:
+    elif RetCode.NOT_EFFECTIVE in retcode_set:
         federated_scheduling_status_code = FederatedSchedulingStatusCode.NOT_EFFECTIVE
     elif RetCode.SUCCESS in retcode_set:
         federated_scheduling_status_code = FederatedSchedulingStatusCode.PARTIAL
diff --git a/python/fate_flow/settings.py b/python/fate_flow/settings.py
index ed72c45c2..6ed2f904f 100644
--- a/python/fate_flow/settings.py
+++ b/python/fate_flow/settings.py
@@ -17,7 +17,7 @@
 from fate_arch.computing import ComputingEngine
 from fate_arch.common import engine_utils
-from fate_arch.common.conf_utils import get_base_config
+from fate_arch.common.conf_utils import get_base_config, decrypt_database_config
 from fate_flow.utils.base_utils import get_fate_flow_directory
 from fate_flow.utils.log_utils import LoggerFactory, getLogger
@@ -61,7 +61,7 @@ ENGINES = engine_utils.get_engines()
 IS_STANDALONE = engine_utils.is_standalone()
 
-DATABASE = get_base_config("database", {})
+DATABASE = decrypt_database_config()
 ZOOKEEPER = get_base_config("zookeeper", {})
 
 FATE_FLOW_SERVER_START_CONFIG_ITEMS = {
     "use_registry",
@@ -71,6 +71,7 @@
     "database",
     "zookeeper",
     "enable_model_store",
+    "private_key", "encrypt_password", "encrypt_module"
 }
 # Registry
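Reviewer note: with encrypt_password enabled, settings.py now reads the database section through decrypt_database_config() rather than the raw get_base_config("database", {}), and the MySQL model storage applies the same decryption to its store address (passwd_key="password"). The encrypt_module setting follows a "package.module#function" convention; a rough sketch of resolving such a value, where the call signature shown for the configured pwdecrypt is an assumption of this note, not something this diff defines:

    import importlib

    def load_encrypt_func(encrypt_module: str):
        # resolve a "package.module#function" string from service_conf.yaml
        module_path, func_name = encrypt_module.split("#", 1)
        return getattr(importlib.import_module(module_path), func_name)

    pwdecrypt = load_encrypt_func("fate_arch.common.encrypt_utils#pwdecrypt")
    # hypothetical usage; the real signature lives in fate_arch.common.encrypt_utils:
    # plain_passwd = pwdecrypt(private_key, encrypted_passwd)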