Skip to content

Commit

Permalink
Experimental Workflow: Fixed KeyError issue in Global_DP Implementati…
Browse files Browse the repository at this point in the history
…on_1 & Workflow_Interface_Vertical_FL (securefederatedai#991)

* updated participant names & added testcase to validate participant names

Signed-off-by: Ishant Thakare <[email protected]>

* Updated aggregator import

Signed-off-by: Ishant Thakare <[email protected]>

---------

Signed-off-by: Ishant Thakare <[email protected]>
  • Loading branch information
ishant162 authored Jun 21, 2024
1 parent 604b81c commit 41b175e
Show file tree
Hide file tree
Showing 13 changed files with 345 additions and 5 deletions.
5 changes: 2 additions & 3 deletions openfl/experimental/interface/cli/collaborator.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ def start_(plan, collaborator_name, secure, data_config="plan/data.yaml"):
import yaml
from yaml.loader import SafeLoader

collaborator_name = collaborator_name.lower()
with open(data_config, "r") as f:
data = yaml.load(f, Loader=SafeLoader)
if data.get(collaborator_name, None) is None:
Expand Down Expand Up @@ -118,7 +117,7 @@ def generate_cert_request(collaborator_name, silent, skip_package):
from openfl.cryptography.participant import generate_csr
from openfl.experimental.interface.cli.cli_helper import CERT_DIR

common_name = f"{collaborator_name}".lower()
common_name = f"{collaborator_name}"
subject_alternative_name = f"DNS:{common_name}"
file_name = f"col_{common_name}"

Expand Down Expand Up @@ -282,7 +281,7 @@ def certify(collaborator_name, silent, request_pkg=None, import_=False):
from openfl.experimental.interface.cli.cli_helper import CERT_DIR
from openfl.utilities.utils import rmtree

common_name = f"{collaborator_name}".lower()
common_name = f"{collaborator_name}"

if not import_:
if request_pkg:
Expand Down
4 changes: 2 additions & 2 deletions openfl/experimental/interface/participants.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ class Participant:

def __init__(self, name: str = ""):
self.private_attributes = {}
self._name = name.lower()
self._name = name

@property
def name(self):
return self._name

@name.setter
def name(self, name: str):
self._name = name.lower()
self._name = name

def private_attributes(self, attrs: Dict[str, Any]) -> None:
"""
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
current_plan_name: default

Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Copyright (C) 2020-2023 Intel Corporation
# Licensed subject to the terms of the separately executed evaluation license agreement between Intel Corporation and you.

collaborators:
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
## Copyright (C) 2020-2023 Intel Corporation
# Licensed subject to the terms of the separately executed evaluation license agreement between Intel Corporation and you.

# all keys under 'collaborators' corresponds to a specific colaborator name the corresponding dictionary has data_name, data_path pairs.
# Note that in the mnist case we do not store the data locally, and the data_path is used to pass an integer that helps the data object
# construct the shard of the mnist dataset to be use for this collaborator.

portland:
callable_func:
settings:
index: 1
template: src.collaborator_private_attrs.collaborator_private_attrs

NewYork:
callable_func:
settings:
index: 2
template: src.collaborator_private_attrs.collaborator_private_attrs

BANGALORE:
callable_func:
settings:
index: 3
template: src.collaborator_private_attrs.collaborator_private_attrs

collaborator_4:
callable_func:
settings:
index: 4
template: src.collaborator_private_attrs.collaborator_private_attrs

intel@lab_5:
callable_func:
settings:
index: 5
template: src.collaborator_private_attrs.collaborator_private_attrs

Tel-aviv#6:
callable_func:
settings:
index: 6
template: src.collaborator_private_attrs.collaborator_private_attrs

newDelhi$+7:
callable_func:
settings:
index: 7
template: src.collaborator_private_attrs.collaborator_private_attrs

port_land-@lab#8:
callable_func:
settings:
index: 8
template: src.collaborator_private_attrs.collaborator_private_attrs

San@francisco_&_9:
callable_func:
settings:
index: 9
template: src.collaborator_private_attrs.collaborator_private_attrs

los@angele$--@10:
callable_func:
settings:
index: 10
template: src.collaborator_private_attrs.collaborator_private_attrs

aggregator:
callable_func:
settings: {}
template: src.aggregator_private_attrs.aggregator_private_attrs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
../../workspace/plan/defaults

Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Copyright (C) 2020-2023 Intel Corporation
# Licensed subject to the terms of the separately executed evaluation license agreement between Intel Corporation and you.

aggregator :
defaults : plan/defaults/aggregator.yaml
template : openfl.experimental.component.aggregator.Aggregator
settings :
rounds_to_train : 10
log_metric_callback :
template : src.utils.write_metric


collaborator :
defaults : plan/defaults/collaborator.yaml
template : openfl.experimental.component.collaborator.Collaborator
settings : {}


federated_flow:
template: src.testflow_participant_name.TestFlowParticipantName
settings:
checkpoint: true


network :
defaults : plan/defaults/network.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
torch==1.13.1
torchvision==0.14.1
tensorboard
wheel>=0.38.0 # not directly required, pinned by Snyk to avoid a vulnerability
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Copyright (C) 2020-2021 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Copyright (C) 2020-2023 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
import numpy as np


def aggregator_private_attrs():
return {"test_loader": np.random.rand(10, 28, 28)} # Random data
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Copyright (C) 2020-2023 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
import numpy as np


def collaborator_private_attrs(index):
return {
"train_loader": np.random.rand(index * 50, 28, 28),
"test_loader": np.random.rand(index * 10, 28, 28),
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
# Copyright (C) 2020-2023 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

from openfl.experimental.component import Aggregator
from openfl.experimental.interface import FLSpec
from openfl.experimental.placement import aggregator, collaborator


class bcolors: # NOQA: N801
HEADER = "\033[95m"
OKBLUE = "\033[94m"
OKCYAN = "\033[96m"
OKGREEN = "\033[92m"
WARNING = "\033[93m"
FAIL = "\033[91m"
ENDC = "\033[0m"
BOLD = "\033[1m"
UNDERLINE = "\033[4m"


class TestFlowParticipantName(FLSpec):
"""
Testflow to validate Aggregator private attributes are not accessible to collaborators
and vice versa
"""

ERROR_LIST = []

@aggregator
def start(self):
"""
Flow start.
"""
print(
f"{bcolors.OKBLUE}Testing FederatedFlow - Starting Test for accessibility of private "
+ f"attributes {bcolors.ENDC}"
)
self.collaborators = self.runtime.collaborators

validate_collab_private_attr(self, "test_loader", "start")

self.exclude_agg_to_agg = 10
self.include_agg_to_agg = 100
self.next(self.aggregator_step, exclude=["exclude_agg_to_agg"])

@aggregator
def aggregator_step(self):
"""
Testing whether Agg private attributes are accessible in next agg step.
Collab private attributes should not be accessible here
"""
validate_collab_private_attr(self, "test_loader", "aggregator_step")

self.include_agg_to_collab = 42
self.exclude_agg_to_collab = 40
self.next(
self.collaborator_step_a,
foreach="collaborators",
exclude=["exclude_agg_to_collab"],
)

@collaborator
def collaborator_step_a(self):
"""
Testing whether Collab private attributes are accessible in collab step
Aggregator private attributes should not be accessible here
"""
validate_agg_private_attrs(
self, "train_loader", "test_loader", "collaborator_step_a"
)

self.exclude_collab_to_collab = 2
self.include_collab_to_collab = 22
self.next(self.collaborator_step_b, exclude=["exclude_collab_to_collab"])

@collaborator
def collaborator_step_b(self):
"""
Testing whether Collab private attributes are accessible in collab step
Aggregator private attributes should not be accessible here
"""

validate_agg_private_attrs(
self, "train_loader", "test_loader", "collaborator_step_b"
)
self.exclude_collab_to_agg = 10
self.include_collab_to_agg = 12
self.next(self.join, exclude=["exclude_collab_to_agg"])

@aggregator
def join(self, inputs):
"""
Testing whether attributes are excluded from collab to agg
"""
# Aggregator should only be able to access its own attributes
if hasattr(self, "test_loader") is False:
TestFlowParticipantName.ERROR_LIST.append(
"aggregator_join_aggregator_attributes_missing"
)
print(
f"{bcolors.FAIL} ... Attribute test failed in join - aggregator private attributes"
+ f" not accessible {bcolors.ENDC}"
)

for input in enumerate(inputs):
collab = input[1].input
if (
hasattr(input, "train_loader") is True
or hasattr(input, "test_loader") is True
):
# Error - we are able to access collaborator attributes
TestFlowParticipantName.ERROR_LIST.append(
"join_collaborator_attributes_found"
)
print(
f"{bcolors.FAIL} ... Attribute test failed in Join - Collaborator: {collab}"
+ f" private attributes accessible {bcolors.ENDC}"
)

self.next(self.end)

@aggregator
def end(self):
"""
This is the 'end' step. All flows must have an 'end' step, which is the
last step in the flow.
"""
print(
f"{bcolors.OKBLUE}Testing FederatedFlow - Ending Test for accessibility of private "
+ f"attributes {bcolors.ENDC}"
)

if TestFlowParticipantName.ERROR_LIST:
raise (
AssertionError(
f"{bcolors.FAIL}\n ...Test case failed ... {bcolors.ENDC}"
)
)
else:
print(f"{bcolors.OKGREEN}\n ...Test case passed ... {bcolors.ENDC}")

TestFlowParticipantName.ERROR_LIST = []


def validate_collab_private_attr(self, private_attr, step_name):
# Aggregator should only be able to access its own attributes
if hasattr(self, private_attr) is False:
TestFlowParticipantName.ERROR_LIST.append(
step_name + "_aggregator_attributes_missing"
)
print(
f"{bcolors.FAIL} ...Failed in {step_name} - aggregator private attributes not "
+ f"accessible {bcolors.ENDC}"
)

for idx, collab in enumerate(self.collaborators):
# Collaborator private attributes should not be accessible
if (
type(self.collaborators[idx]) is not str
or hasattr(self.runtime, "_collaborators") is True
or hasattr(self.runtime, "__collaborators") is True
):
# Error - we are able to access collaborator attributes
TestFlowParticipantName.ERROR_LIST.append(
step_name + "_collaborator_attributes_found"
)
print(
f"{bcolors.FAIL} ... Attribute test failed in {step_name} - collaborator {collab} "
+ f"private attributes accessible {bcolors.ENDC}"
)


def validate_agg_private_attrs(self, private_attr_1, private_attr_2, step_name):
# Collaborator should only be able to access its own attributes
if not hasattr(self, private_attr_1) or not hasattr(self, private_attr_2):
TestFlowParticipantName.ERROR_LIST.append(
step_name + "collab_attributes_not_found"
)
print(
f"{bcolors.FAIL} ... Attribute test failed in {step_name} - Collab "
+ f"private attributes not accessible {bcolors.ENDC}"
)

if hasattr(self.runtime, "_aggregator") and isinstance(self.runtime._aggregator, Aggregator):
# Error - we are able to access aggregator attributes
TestFlowParticipantName.ERROR_LIST.append(
step_name + "_aggregator_attributes_found"
)
print(
f"{bcolors.FAIL} ... Attribute test failed in {step_name} - Aggregator"
+ f" private attributes accessible {bcolors.ENDC}"
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Copyright (C) 2020-2021 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

from torch.utils.tensorboard import SummaryWriter


writer = None


def get_writer():
"""Create global writer object."""
global writer
if not writer:
writer = SummaryWriter('./logs/cnn_mnist', flush_secs=5)


def write_metric(node_name, task_name, metric_name, metric, round_number):
"""Write metric callback."""
get_writer()
writer.add_scalar(f'{node_name}/{task_name}/{metric_name}', metric, round_number)

0 comments on commit 41b175e

Please sign in to comment.