Skip to content

Commit

Permalink
Merge pull request #193 from FederatedAI/develop-1.7.2
Browse files Browse the repository at this point in the history
Develop 1.7.2
  • Loading branch information
zhihuiwan authored Feb 25, 2022
2 parents cc95e3b + a20bcc0 commit c51a051
Show file tree
Hide file tree
Showing 13 changed files with 192 additions and 113 deletions.
5 changes: 5 additions & 0 deletions RELEASE.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
# Release 1.7.2
## Major Features and Improvements
* Separate the base connection address of the data storage table from the data table information, while remaining compatible with historical versions;
* Optimize the component output data download interface.

# Release 1.7.1
## Major Features and Improvements
* Added the writer component, which supports exporting data to mysql and saving data as a new table;
Expand Down
7 changes: 7 additions & 0 deletions doc/configuration_instruction.zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,14 @@
use_registry: false
# 是否启用更高安全级别的序列化模式
use_deserialize_safe_module: false
# fate on spark模式下是否启动依赖分发
dependent_distribution: false
# 是否启动密码加密(数据库密码),开启后配置encrypt_module和private_key才生效
encrypt_password: false
# 加密包及加密函数(“#”号拼接)
encrypt_module: fate_arch.common.encrypt_utils#pwdecrypt
# 加密私钥
private_key:
fateflow:
# 必须使用真实绑定的ip地址,避免因为多网卡/多IP引发的额外问题
# you must set real ip address, 127.0.0.1 and 0.0.0.0 is not supported
Expand Down
10 changes: 10 additions & 0 deletions examples/connector/create_or_update.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"connector_name": "test",
"engine": "MYSQL",
"connector_info": {
"user": "fate",
"passwd": "fate",
"host": "127.0.0.1",
"port": 3306
}
}
15 changes: 15 additions & 0 deletions examples/table_bind/bind_mysql_table_by_connector_name.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"engine": "MYSQL",
"address": {
"connector_name": "test",
"db": "experiment",
"name": "breast_hetero_guest"
},
"namespace": "experiment",
"name": "breast_hetero_guest",
"head": 1,
"id_delimiter": ",",
"partitions": 10,
"id_column": "id",
"feature_column": "y,x0,x1,x2,x3,x4,x5,x6,x7,x8,x9"
}
10 changes: 0 additions & 10 deletions fateflow.env

This file was deleted.

24 changes: 22 additions & 2 deletions python/fate_flow/apps/table_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,37 @@
# limitations under the License.
#
from fate_arch import storage
from fate_arch.metastore.db_utils import StorageConnector
from fate_arch.session import Session
from fate_arch.storage import StorageTableMeta
from fate_flow.entity import RunParameters
from fate_flow.manager.data_manager import DataTableTracker, TableStorage
from fate_flow.operation.job_saver import JobSaver
from fate_flow.operation.job_tracker import Tracker
from fate_flow.worker.task_executor import TaskExecutor
from fate_flow.utils.api_utils import get_json_result, error_response
from fate_flow.utils import detect_utils, job_utils, schedule_utils
from fate_flow.utils.detect_utils import validate_request
from fate_flow.utils import job_utils, schedule_utils
from flask import request
from fate_flow.utils.detect_utils import validate_request


@manager.route('/connector/create', methods=['POST'])
def create_storage_connector():
    """Create or update a named storage connector from the posted JSON body.

    Expects keys: ``connector_name``, ``engine``, ``connector_info``.
    Returns a standard success envelope.
    """
    body = request.json
    engine = body.get("engine")
    # Normalize the raw connector_info through the engine-specific address
    # type before persisting, so only validated fields are stored.
    addr = StorageTableMeta.create_address(engine, body.get("connector_info"))
    conn = StorageConnector(
        connector_name=body.get("connector_name"),
        engine=engine,
        connector_info=addr.connector,
    )
    conn.create_or_update()
    return get_json_result(retcode=0, retmsg='success')


@manager.route('/connector/query', methods=['POST'])
def query_storage_connector():
    """Look up a storage connector by ``connector_name`` and return its info."""
    name = request.json.get("connector_name")
    connector = StorageConnector(connector_name=name)
    return get_json_result(retcode=0, retmsg='success', data=connector.get_info())


@manager.route('/add', methods=['post'])
@manager.route('/bind', methods=['post'])
@validate_request("engine", "address", "namespace", "name")
Expand All @@ -53,6 +71,8 @@ def table_bind():
schema = None
if id_column and feature_column:
schema = {'header': feature_column, 'sid': id_column}
elif id_column:
schema = {'sid': id_column, 'header': ''}
sess = Session()
storage_session = sess.storage(storage_engine=engine, options=request_data.get("options"))
table = storage_session.create_table(address=address, name=name, namespace=namespace,
Expand Down
5 changes: 3 additions & 2 deletions python/fate_flow/apps/tracking_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,13 +219,14 @@ def component_output_data():
totals.append(total)
if output_data:
header = get_component_output_data_schema(output_table_meta=output_table_meta, is_str=is_str,
extend_header=extend_header)
extend_header=extend_header)
headers.append(header)
else:
headers.append(None)
if len(output_data_list) == 1 and not output_data_list[0]:
return get_json_result(retcode=0, retmsg='no data', data=[])
return get_json_result(retcode=0, retmsg='success', data=output_data_list, meta={'header': headers, 'total': totals, 'names':data_names})
return get_json_result(retcode=0, retmsg='success', data=output_data_list,
meta={'header': headers, 'total': totals, 'names': data_names})


@manager.route('/component/output/data/download', methods=['get'])
Expand Down
56 changes: 39 additions & 17 deletions python/fate_flow/controller/job_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -590,8 +590,9 @@ def log_reload(cls, job):
def output_reload(cls, job, source_tasks: dict, target_tasks: dict):
# model reload
schedule_logger(job.f_job_id).info("start reload model")
cls.output_model_reload(job)
cls.checkpoint_reload(job)
source_job = JobSaver.query_job(job_id=job.f_inheritance_info.get("job_id"))[0]
cls.output_model_reload(job, source_job)
cls.checkpoint_reload(job, source_job)
schedule_logger(job.f_job_id).info("start reload data")
source_tracker_dict = cls.load_task_tracker(source_tasks)
target_tracker_dict = cls.load_task_tracker(target_tasks)
Expand Down Expand Up @@ -634,22 +635,43 @@ def status_reload(cls, job, source_tasks, target_tasks):
schedule_logger(job.f_job_id).info("reload status success")

@classmethod
def output_model_reload(cls, job):

model_id = model_utils.gen_party_model_id(job.f_runtime_conf.get("job_parameters").get("common").get("model_id"),
job.f_role, job.f_party_id)
PipelinedModel(model_id=model_id, model_version=job.f_job_id).reload_component_model(model_id=model_id, model_version=job.f_inheritance_info.get("job_id"),
component_list=job.f_inheritance_info.get("component_list"))
@classmethod
def checkpoint_reload(cls, job):
def output_model_reload(cls, job, source_job):
    """Reload the inherited components' models from *source_job*'s model
    store into *job*'s model store.

    Both party model ids are scoped to this job's role/party; only the
    common ``model_id`` differs between source and target.
    """
    def _party_model_id(j):
        # Party-scoped model id built from the given job's common model_id.
        common = j.f_runtime_conf.get("job_parameters").get("common")
        return model_utils.gen_party_model_id(common.get("model_id"), job.f_role, job.f_party_id)

    inheritance = job.f_inheritance_info
    target_model = PipelinedModel(model_id=_party_model_id(job), model_version=job.f_job_id)
    target_model.reload_component_model(
        model_id=_party_model_id(source_job),
        model_version=inheritance.get("job_id"),
        component_list=inheritance.get("component_list"),
    )

@classmethod
def checkpoint_reload(cls, job, source_job):
for component_name in job.f_inheritance_info.get("component_list"):
path = CheckpointManager(role=job.f_role, party_id=job.f_party_id,
component_name=component_name, model_version=job.f_inheritance_info.get("job_id"),
model_id=job.f_runtime_conf.get("job_parameters").get("common").get("model_id")).directory
target_path = CheckpointManager(role=job.f_role, party_id=job.f_party_id,
component_name=component_name, model_version=job.f_job_id,
model_id=job.f_runtime_conf.get("job_parameters").get("common").get(
"model_id")).directory
path = CheckpointManager(
role=job.f_role,
party_id=job.f_party_id,
component_name=component_name,
model_version=job.f_inheritance_info.get("job_id"),
model_id=source_job.f_runtime_conf.get("job_parameters").get("common").get("model_id")
).directory
target_path = CheckpointManager(
role=job.f_role,
party_id=job.f_party_id,
component_name=component_name,
model_version=job.f_job_id,
model_id=job.f_runtime_conf.get("job_parameters").get("common").get("model_id")
).directory
if os.path.exists(path):
if os.path.exists(target_path):
shutil.rmtree(target_path)
Expand Down
Loading

0 comments on commit c51a051

Please sign in to comment.