From faec4e8e4f6559751f3e65d079954f040139fdab Mon Sep 17 00:00:00 2001 From: Alex Date: Mon, 5 Aug 2024 13:03:39 -0400 Subject: [PATCH] Implement pre-commit hooks (#2184) * Implement pre-commit hooks Signed-off-by: droctothorpe * Fix broken file links and update reqs Signed-off-by: droctothorpe * Add examples folder Signed-off-by: droctothorpe * Remove black instructions from developer guide Signed-off-by: droctothorpe --------- Signed-off-by: droctothorpe --- .github/workflows/pre-commit.yaml | 14 + .github/workflows/test-python.yaml | 9 - .pre-commit-config.yaml | 44 ++ docs/development/developer_guide.md | 49 +- docs/diagrams/tfjob_k8s_resources.svg | 1 - docs/release/changelog.py | 3 +- examples/paddlepaddle/simple-gpu.yaml | 1 - examples/pytorch/README.md | 8 +- examples/pytorch/elastic/echo/echo.yaml | 1 - examples/pytorch/elastic/etcd.yaml | 2 +- .../pytorch/elastic/imagenet/.dockerignore | 2 +- examples/pytorch/elastic/imagenet/imagenet.py | 14 +- .../Train-CNN-with-FashionMNIST.ipynb | 2 +- examples/pytorch/mnist/Makefile | 6 +- examples/pytorch/mnist/mnist.py | 5 +- .../mnist/v1/pytorch_job_mnist_gloo.yaml | 2 +- .../mnist/v1/pytorch_job_mnist_mpi.yaml | 8 +- .../mnist/v1/pytorch_job_mnist_nccl.yaml | 6 +- examples/pytorch/smoke-dist/README.md | 2 +- examples/pytorch/smoke-dist/dist_sendrecv.py | 22 +- examples/tensorflow/dist-mnist/README.md | 2 +- examples/tensorflow/dist-mnist/dist_mnist.py | 496 ++++++++++-------- .../estimator-API/keras_model_to_estimator.py | 92 ++-- .../distribution_strategy/keras-API/README.md | 10 +- .../multi_worker_strategy-with-keras.py | 249 ++++----- .../tensorflow/mnist_with_summaries/README.md | 2 +- .../mnist_with_summaries.py | 363 +++++++------ .../mnist_with_summaries/tf_job_mnist.yaml | 6 +- .../tfevent-volume/tfevent-pv.yaml | 2 +- .../tfevent-volume/tfevent-pvc.yaml | 2 +- examples/tensorflow/tf_sample/setup.py | 13 +- examples/tensorflow/tf_sample/tf_smoke.py | 240 ++++----- examples/xgboost/lightgbm-dist/README.md | 2 +- examples/xgboost/lightgbm-dist/main.py | 3 +- .../xgboostjob_v1_lightgbm_dist_training.yaml | 1 - examples/xgboost/smoke-dist/README.md | 3 - examples/xgboost/smoke-dist/tracker.py | 165 +++--- .../xgboost/smoke-dist/xgboost_smoke_test.py | 40 +- .../smoke-dist/xgboostjob_v1_rabit_test.yaml | 1 - .../xgboostjob_v1alpha1_rabit_test.yaml | 1 - examples/xgboost/xgboost-dist/README.md | 16 +- examples/xgboost/xgboost-dist/local_test.py | 21 +- examples/xgboost/xgboost-dist/main.py | 61 +-- examples/xgboost/xgboost-dist/predict.py | 6 +- examples/xgboost/xgboost-dist/tracker.py | 165 +++--- examples/xgboost/xgboost-dist/train.py | 32 +- examples/xgboost/xgboost-dist/utils.py | 78 +-- .../xgboostjob_v1_iris_predict.yaml | 2 - .../xgboostjob_v1_iris_train.yaml | 2 - hack/python-sdk/post_gen.py | 2 +- sdk/python/Dockerfile.conformance | 2 +- sdk/python/conformance/run.sh | 2 +- .../abstract_dataset_provider.py | 3 +- .../abstract_model_provider.py | 3 +- .../storage_initializer/hugging_face.py | 17 +- sdk/python/kubeflow/storage_initializer/s3.py | 4 +- .../kubeflow/storage_initializer/storage.py | 4 +- .../kubeflow/trainer/hf_llm_training.py | 25 +- .../kubeflow/training/api/training_client.py | 32 +- .../training/api/training_client_test.py | 18 +- .../kubeflow/training/constants/constants.py | 5 +- sdk/python/kubeflow/training/utils/utils.py | 13 +- test_job/README.md | 6 +- third_party/library/license.txt | 1 - 64 files changed, 1298 insertions(+), 1116 deletions(-) create mode 100644 .github/workflows/pre-commit.yaml create 
mode 100644 .pre-commit-config.yaml diff --git a/.github/workflows/pre-commit.yaml b/.github/workflows/pre-commit.yaml new file mode 100644 index 0000000000..2b11178bf9 --- /dev/null +++ b/.github/workflows/pre-commit.yaml @@ -0,0 +1,14 @@ +name: pre-commit + +on: + pull_request: + push: + branches: [main] + +jobs: + pre-commit: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - uses: actions/setup-python@v3 + - uses: pre-commit/action@v3.0.1 diff --git a/.github/workflows/test-python.yaml b/.github/workflows/test-python.yaml index 3d6faae951..9a706461b7 100644 --- a/.github/workflows/test-python.yaml +++ b/.github/workflows/test-python.yaml @@ -26,15 +26,6 @@ jobs: with: python-version: ${{ matrix.python-version }} - # TODO (andreyvelich): We need to replace this action with script to do - # linting and formatting for Training Operator SDK. - - name: Check Python code with Black - uses: psf/black@stable - with: - version: 24.2.0 - options: --check --exclude '/*kubeflow_org_v1*|__init__.py|api_client.py|configuration.py|exceptions.py|rest.py' - src: sdk/ - - name: Install dependencies run: | pip install pytest python-dateutil urllib3 kubernetes diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000000..bcbe75c067 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,44 @@ +repos: + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v2.3.0 + hooks: + - id: check-yaml + args: [--allow-multiple-documents] + - id: check-json + - id: end-of-file-fixer + - id: trailing-whitespace + - repo: https://github.com/pycqa/isort + rev: 5.11.5 + hooks: + - id: isort + name: isort + entry: isort --profile google + - repo: https://github.com/psf/black + rev: 24.2.0 + hooks: + - id: black + files: (sdk|examples)/.* + exclude: | + (?x)^( + /*kubeflow_org_v1*| + __init__.py| + api_client.py| + configuration.py| + exceptions.py| + rest.py + )$ +exclude: | + (?x)^( + pkg/apis/kubeflow.org/v1/openapi_generated.go| + pkg/apis/kubeflow.org/v1/zz_.*| + pkg/client/.*| + test_job/apis/test_job/v1/.*generated.*.go| + test_job/client/.*| + sdk/python/kubeflow/training/[^/]*.py| + sdk/python/kubeflow/training/models/.*| + sdk/python/test/.*| + docs/api/.*| + sdk/python/docs/.*| + sdk/python/.openapi-generator/VERSION| + sdk/python/kubeflow/__init__.py + )$ diff --git a/docs/development/developer_guide.md b/docs/development/developer_guide.md index 4b5808c6b7..03312643a9 100644 --- a/docs/development/developer_guide.md +++ b/docs/development/developer_guide.md @@ -5,7 +5,7 @@ Kubeflow Training Operator is currently at v1. ## Requirements - [Go](https://golang.org/) (1.22 or later) -- [Docker](https://docs.docker.com/) +- [Docker](https://docs.docker.com/) - [Docker](https://docs.docker.com/) (20.10 or later) - [Docker Buildx](https://docs.docker.com/build/buildx/) (0.8.0 or later) - [Python](https://www.python.org/) (3.11 or later) @@ -13,6 +13,7 @@ Kubeflow Training Operator is currently at v1. - [Kind](https://kind.sigs.k8s.io/) (0.22.0 or later) - [Lima](https://github.com/lima-vm/lima?tab=readme-ov-file#adopters) (an alternative to DockerDesktop) (0.21.0 or later) - [Colima](https://github.com/abiosoft/colima) (Lima specifically for MacOS) (0.6.8 or later) +- [pre-commit](https://pre-commit.com/) Note for Lima the link is to the Adopters, which supports several different container environments. @@ -49,12 +50,12 @@ First, you need to run a Kubernetes cluster locally. 
We recommend [Kind](https://kind.sigs.k8s.io/). You can create a `kind` cluster by running ```sh -kind create cluster +kind create cluster ``` -This will load your kubernetes config file with the new cluster. +This will load your kubernetes config file with the new cluster. -After creating the cluster, you can check the nodes with the code below which should show you the kind-control-plane. -```sh +After creating the cluster, you can check the nodes with the code below which should show you the kind-control-plane. +```sh kubectl get nodes ``` The output should look something like below: @@ -74,9 +75,9 @@ Then we can patch it with the latest operator image. ```sh kubectl patch -n kubeflow deployments training-operator --type json -p '[{"op": "replace", "path": "/spec/template/spec/containers/0/image", "value": "kubeflow/training-operator:latest"}]' ``` -Then we can run the job with the following command. +Then we can run the job with the following command. -```sh +```sh kubectl apply -f https://raw.githubusercontent.com/kubeflow/training-operator/master/examples/pytorch/simple.yaml ``` And we can see the output of the job from the logs, which may take some time to produce but should look something like below. @@ -116,10 +117,10 @@ make docker-build IMG=my-username/training-operator:my-pr-01 ``` You can swap `my-username/training-operator:my-pr-01` with whatever you would like. -## Load docker image +## Load docker image ```sh kind load docker-image my-username/training-operator:my-pr-01 -``` +``` ## Modify operator image with new one ```sh cd ./manifests/overlays/standalone kustomize edit set image my-username/training-operator=my-username/training-operator:my-pr-01 ``` Update the `newTag` key in `./manifests/overlays/standalone/kustomization.yaml` with the new image. -Deploy the operator with: -```sh +Deploy the operator with: +```sh kubectl apply -k ./manifests/overlays/standalone ``` And now we can submit jobs to the operator. @@ -140,7 +141,7 @@ kubectl apply -f https://raw.githubusercontent.com/kubeflow/training-operator/ma ``` You should be able to see a pod for your training operator running in your namespace using ``` -kubectl logs -n kubeflow -l training.kubeflow.org/job-name=pytorch-simple +kubectl logs -n kubeflow -l training.kubeflow.org/job-name=pytorch-simple ``` ## Go version @@ -172,18 +173,18 @@ sdk/python/kubeflow/training/api ## Code Style -### Python - -- Use [`black`](https://github.com/psf/black) to format Python code - -- Run the following to install `black`: +### pre-commit - ``` - pip install black==23.9.1 - ``` +Make sure to install [pre-commit](https://pre-commit.com/) (`pip install +pre-commit`) and run `pre-commit install` from the root of the repository at +least once before creating git commits. -- To check your code: +The pre-commit [hooks](../../.pre-commit-config.yaml) ensure code quality and +consistency. They are executed in CI. PRs that fail to comply with the hooks +will not be able to pass the corresponding CI gate. The hooks are only executed +against staged files unless you run `pre-commit run --all-files`, in which case, +they'll be executed against every file in the repository. - ```sh - black --check --exclude '/*kubeflow_org_v1*|__init__.py|api_client.py|configuration.py|exceptions.py|rest.py' sdk/ - ``` +Specific programmatically generated files listed in the `exclude` field in +[.pre-commit-config.yaml](../../.pre-commit-config.yaml) are deliberately +excluded from the hooks.
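For quick reference, the full local workflow with the new hooks is sketched below; these are stock `pre-commit` commands, with no repo-specific flags assumed:

```sh
pip install pre-commit        # one-time install
pre-commit install            # register the git hook once per clone
pre-commit run --all-files    # optionally lint every file, as the CI workflow does
```

After `pre-commit install`, the hooks run automatically against staged files on each `git commit`, matching what the pre-commit GitHub Actions workflow enforces on pull requests.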
diff --git a/docs/diagrams/tfjob_k8s_resources.svg b/docs/diagrams/tfjob_k8s_resources.svg index b0eda947bd..17dbe8a8cc 100644 --- a/docs/diagrams/tfjob_k8s_resources.svg +++ b/docs/diagrams/tfjob_k8s_resources.svg @@ -1,4 +1,3 @@ - diff --git a/docs/release/changelog.py b/docs/release/changelog.py index ac508d025f..b9eeeb7137 100644 --- a/docs/release/changelog.py +++ b/docs/release/changelog.py @@ -1,6 +1,7 @@ -from github import Github import argparse +from github import Github + REPO_NAME = "kubeflow/training-operator" CHANGELOG_FILE = "CHANGELOG.md" diff --git a/examples/paddlepaddle/simple-gpu.yaml b/examples/paddlepaddle/simple-gpu.yaml index a97191b05d..e726536bd7 100644 --- a/examples/paddlepaddle/simple-gpu.yaml +++ b/examples/paddlepaddle/simple-gpu.yaml @@ -33,4 +33,3 @@ spec: - name: dshm emptyDir: medium: Memory - diff --git a/examples/pytorch/README.md b/examples/pytorch/README.md index cd67e750f7..0db74af091 100644 --- a/examples/pytorch/README.md +++ b/examples/pytorch/README.md @@ -1,5 +1,5 @@ -## Installation & deployment tips -1. You need to configure your node to utilize GPU. This can be done the following way: +## Installation & deployment tips +1. You need to configure your node to utilize the GPU. This can be done in the following way: * Install [nvidia-docker2](https://github.com/NVIDIA/nvidia-docker) * Connect to your MasterNode and set nvidia as the default runtime in `/etc/docker/daemon.json`: ``` @@ -13,11 +13,11 @@ } } ``` - * After that deploy nvidia-daemon to kubernetes: + * After that, deploy the nvidia-daemon to Kubernetes: ```bash kubectl create -f https://raw.githubusercontent.com/NVIDIA/k8s-device-plugin/v1.11/nvidia-device-plugin.yml ``` - + 2. NVIDIA GPUs can now be consumed via container level resource requirements using the resource name nvidia.com/gpu: ``` resources: diff --git a/examples/pytorch/elastic/echo/echo.yaml b/examples/pytorch/elastic/echo/echo.yaml index 6df8fe415f..ee51ec8418 100644 --- a/examples/pytorch/elastic/echo/echo.yaml +++ b/examples/pytorch/elastic/echo/echo.yaml @@ -26,4 +26,3 @@ spec: - torch.distributed.run - --rdzv_backend=c10d - ./echo.py - diff --git a/examples/pytorch/elastic/etcd.yaml b/examples/pytorch/elastic/etcd.yaml index a158187901..edb3bb1e9d 100644 --- a/examples/pytorch/elastic/etcd.yaml +++ b/examples/pytorch/elastic/etcd.yaml @@ -71,4 +71,4 @@ spec: protocol: TCP targetPort: 2380 selector: - etcd_node: etcd-server \ No newline at end of file + etcd_node: etcd-server diff --git a/examples/pytorch/elastic/imagenet/.dockerignore b/examples/pytorch/elastic/imagenet/.dockerignore index 6320cd248d..1269488f7f 100644 --- a/examples/pytorch/elastic/imagenet/.dockerignore +++ b/examples/pytorch/elastic/imagenet/.dockerignore @@ -1 +1 @@ -data \ No newline at end of file +data diff --git a/examples/pytorch/elastic/imagenet/imagenet.py b/examples/pytorch/elastic/imagenet/imagenet.py index dcab82c8ce..00faa73678 100644 --- a/examples/pytorch/elastic/imagenet/imagenet.py +++ b/examples/pytorch/elastic/imagenet/imagenet.py @@ -44,30 +44,30 @@ """ import argparse +from contextlib import contextmanager +from datetime import timedelta import io import os import shutil import time -from contextlib import contextmanager -from datetime import timedelta from typing import List, Tuple import numpy import torch import torch.distributed as dist +from torch.distributed.elastic.multiprocessing.errors import record +from torch.distributed.elastic.utils.data import ElasticDistributedSampler import torch.nn as nn import torch.nn.parallel +from
torch.nn.parallel import DistributedDataParallel import torch.optim +from torch.optim import SGD import torch.utils.data +from torch.utils.data import DataLoader import torch.utils.data.distributed import torchvision.datasets as datasets import torchvision.models as models import torchvision.transforms as transforms -from torch.distributed.elastic.multiprocessing.errors import record -from torch.distributed.elastic.utils.data import ElasticDistributedSampler -from torch.nn.parallel import DistributedDataParallel -from torch.optim import SGD -from torch.utils.data import DataLoader model_names = sorted( name diff --git a/examples/pytorch/image-classification/Train-CNN-with-FashionMNIST.ipynb b/examples/pytorch/image-classification/Train-CNN-with-FashionMNIST.ipynb index 7c040e6a0d..7cb5e8fba9 100644 --- a/examples/pytorch/image-classification/Train-CNN-with-FashionMNIST.ipynb +++ b/examples/pytorch/image-classification/Train-CNN-with-FashionMNIST.ipynb @@ -696,4 +696,4 @@ }, "nbformat": 4, "nbformat_minor": 5 -} \ No newline at end of file +} diff --git a/examples/pytorch/mnist/Makefile b/examples/pytorch/mnist/Makefile index c724caf0fb..4d1ffe50f6 100644 --- a/examples/pytorch/mnist/Makefile +++ b/examples/pytorch/mnist/Makefile @@ -19,7 +19,7 @@ IMG = gcr.io/kubeflow-examples/pytorch-dist-mnist PUBLIC = gcr.io/kubeflow-examples/pytorch-dist-mnist DIR := ${CURDIR} -# List any changed files. +# List any changed files. CHANGED_FILES := $(shell git diff-files --relative=examples/dist-mnist) ifeq ($(strip $(CHANGED_FILES)),) @@ -43,9 +43,9 @@ build: # Build but don't attach the latest tag. This allows manual testing/inspection of the image # first. push: build - gcloud docker -- push $(IMG):$(TAG) + gcloud docker -- push $(IMG):$(TAG) @echo Pushed $(IMG) with :$(TAG) tags - + push-latest: push gcloud container images add-tag --quiet $(IMG):$(TAG) $(IMG):latest --verbosity=info echo created $(IMG):latest diff --git a/examples/pytorch/mnist/mnist.py b/examples/pytorch/mnist/mnist.py index 4ccd051999..3e8eed2ce2 100644 --- a/examples/pytorch/mnist/mnist.py +++ b/examples/pytorch/mnist/mnist.py @@ -3,14 +3,15 @@ import argparse import os +from tensorboardX import SummaryWriter import torch import torch.distributed as dist import torch.nn as nn import torch.nn.functional as F import torch.optim as optim -from tensorboardX import SummaryWriter from torch.utils.data import DistributedSampler -from torchvision import datasets, transforms +from torchvision import datasets +from torchvision import transforms class Net(nn.Module): diff --git a/examples/pytorch/mnist/v1/pytorch_job_mnist_gloo.yaml b/examples/pytorch/mnist/v1/pytorch_job_mnist_gloo.yaml index eddfa2d9cc..3e42a2685f 100644 --- a/examples/pytorch/mnist/v1/pytorch_job_mnist_gloo.yaml +++ b/examples/pytorch/mnist/v1/pytorch_job_mnist_gloo.yaml @@ -22,7 +22,7 @@ spec: restartPolicy: OnFailure template: spec: - containers: + containers: - name: pytorch image: kubeflow/pytorch-dist-mnist:latest args: ["--backend", "gloo"] diff --git a/examples/pytorch/mnist/v1/pytorch_job_mnist_mpi.yaml b/examples/pytorch/mnist/v1/pytorch_job_mnist_mpi.yaml index fdc090fc53..53b8da80ea 100644 --- a/examples/pytorch/mnist/v1/pytorch_job_mnist_mpi.yaml +++ b/examples/pytorch/mnist/v1/pytorch_job_mnist_mpi.yaml @@ -14,19 +14,19 @@ spec: image: kubeflow/pytorch-dist-mnist:latest args: ["--backend", "mpi"] # Comment out the below resources to use the CPU. 
- resources: + resources: limits: nvidia.com/gpu: 1 Worker: replicas: 1 - restartPolicy: OnFailure + restartPolicy: OnFailure template: spec: - containers: + containers: - name: pytorch image: kubeflow/pytorch-dist-mnist:latest args: ["--backend", "mpi"] # Comment out the below resources to use the CPU. - resources: + resources: limits: nvidia.com/gpu: 1 diff --git a/examples/pytorch/mnist/v1/pytorch_job_mnist_nccl.yaml b/examples/pytorch/mnist/v1/pytorch_job_mnist_nccl.yaml index e3b263902b..0807abe32f 100644 --- a/examples/pytorch/mnist/v1/pytorch_job_mnist_nccl.yaml +++ b/examples/pytorch/mnist/v1/pytorch_job_mnist_nccl.yaml @@ -13,7 +13,7 @@ spec: - name: pytorch image: kubeflow/pytorch-dist-mnist:latest args: ["--backend", "nccl"] - resources: + resources: limits: nvidia.com/gpu: 1 Worker: @@ -21,10 +21,10 @@ spec: restartPolicy: OnFailure template: spec: - containers: + containers: - name: pytorch image: kubeflow/pytorch-dist-mnist:latest args: ["--backend", "nccl"] - resources: + resources: limits: nvidia.com/gpu: 1 diff --git a/examples/pytorch/smoke-dist/README.md b/examples/pytorch/smoke-dist/README.md index 60d77ae4aa..ccfc1c928c 100644 --- a/examples/pytorch/smoke-dist/README.md +++ b/examples/pytorch/smoke-dist/README.md @@ -1,4 +1,4 @@ -### Distributed send/recv e2e test +### Distributed send/recv e2e test This folder contains a Dockerfile and a distributed send/recv test. diff --git a/examples/pytorch/smoke-dist/dist_sendrecv.py b/examples/pytorch/smoke-dist/dist_sendrecv.py index e4c7e35eae..287f38b284 100644 --- a/examples/pytorch/smoke-dist/dist_sendrecv.py +++ b/examples/pytorch/smoke-dist/dist_sendrecv.py @@ -6,11 +6,11 @@ def run(): - """ Simple Send/Recv for testing Master <--> Workers communication """ + """Simple Send/Recv for testing Master <--> Workers communication""" rank = dist.get_rank() size = dist.get_world_size() - inp = torch.randn(2,2) - result = torch.zeros(2,2) + inp = torch.randn(2, 2) + result = torch.zeros(2, 2) if rank == 0: # Send the input tensor to all workers for i in range(1, size): @@ -22,30 +22,32 @@ def run(): # Receive input tensor from master dist.recv(tensor=inp, src=0) # Elementwise tensor multiplication - result = torch.mul(inp,inp) + result = torch.mul(inp, inp) # Send the result tensor back to master dist.send(tensor=result, dst=0) -def init_processes(fn, backend='gloo'): - """ Initialize the distributed environment. """ + +def init_processes(fn, backend="gloo"): + """Initialize the distributed environment.""" dist.init_process_group(backend) fn() + def main(): logging.info("Torch version: %s", torch.__version__) - + port = os.environ.get("MASTER_PORT", "{}") logging.info("MASTER_PORT: %s", port) - + addr = os.environ.get("MASTER_ADDR", "{}") logging.info("MASTER_ADDR: %s", addr) world_size = os.environ.get("WORLD_SIZE", "{}") logging.info("WORLD_SIZE: %s", world_size) - + rank = os.environ.get("RANK", "{}") logging.info("RANK: %s", rank) - + init_processes(run) diff --git a/examples/tensorflow/dist-mnist/README.md b/examples/tensorflow/dist-mnist/README.md index 80e37de649..4d3f842850 100644 --- a/examples/tensorflow/dist-mnist/README.md +++ b/examples/tensorflow/dist-mnist/README.md @@ -20,4 +20,4 @@ docker build -f Dockerfile.ppc64le -t kubeflow123/tf-dist-mnist-test:1.0 ./ ``` kubectl create -f ./tf_job_mnist.yaml ``` - * If on ppc64le, please update tf_job_mnist.yaml to use the image of ppc64le firstly. \ No newline at end of file + * If on ppc64le, first update tf_job_mnist.yaml to use the ppc64le image.
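The send/recv pattern exercised by `dist_sendrecv.py` can be sanity-checked on a single machine before building the smoke-dist image. Below is a minimal sketch, assuming only `torch` is installed; the rendezvous address `127.0.0.1:29500` and the two-process world size are arbitrary local-test assumptions, not values taken from the example job:

```python
import os

import torch
import torch.distributed as dist
import torch.multiprocessing as mp


def worker(rank: int, world_size: int):
    # Stand in for the MASTER_ADDR/MASTER_PORT a PyTorchJob would inject
    # (values assumed for local testing).
    os.environ["MASTER_ADDR"] = "127.0.0.1"
    os.environ["MASTER_PORT"] = "29500"
    dist.init_process_group("gloo", rank=rank, world_size=world_size)
    inp = torch.randn(2, 2)
    result = torch.zeros(2, 2)
    if rank == 0:
        dist.send(tensor=inp, dst=1)  # master sends the input tensor
        dist.recv(tensor=result, src=1)  # and gets back the elementwise square
        print("rank 0 received:\n", result)
    else:
        dist.recv(tensor=inp, src=0)  # worker receives the input
        dist.send(tensor=torch.mul(inp, inp), dst=0)  # returns inp * inp
    dist.destroy_process_group()


if __name__ == "__main__":
    mp.spawn(worker, args=(2,), nprocs=2)
```

Running it prints the squared tensor from rank 0, confirming gloo send/recv works before the same logic runs across Master and Worker pods.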
diff --git a/examples/tensorflow/dist-mnist/dist_mnist.py b/examples/tensorflow/dist-mnist/dist_mnist.py index 95a8524230..3ff985b7d9 100755 --- a/examples/tensorflow/dist-mnist/dist_mnist.py +++ b/examples/tensorflow/dist-mnist/dist_mnist.py @@ -46,41 +46,64 @@ from tensorflow.examples.tutorials.mnist import input_data flags = tf.app.flags -flags.DEFINE_string("data_dir", "/tmp/mnist-data", - "Directory for storing mnist data") -flags.DEFINE_boolean("download_only", False, - "Only perform downloading of data; Do not proceed to " - "session preparation, model definition or training") -flags.DEFINE_integer("task_index", None, - "Worker task index, should be >= 0. task_index=0 is " - "the master worker task the performs the variable " - "initialization ") -flags.DEFINE_integer("num_gpus", 1, "Total number of gpus for each machine." - "If you don't use GPU, please set it to '0'") -flags.DEFINE_integer("replicas_to_aggregate", None, - "Number of replicas to aggregate before parameter update" - "is applied (For sync_replicas mode only; default: " - "num_workers)") -flags.DEFINE_integer("hidden_units", 100, - "Number of units in the hidden layer of the NN") -flags.DEFINE_integer("train_steps", 20000, - "Number of (global) training steps to perform") +flags.DEFINE_string("data_dir", "/tmp/mnist-data", "Directory for storing mnist data") +flags.DEFINE_boolean( + "download_only", + False, + "Only perform downloading of data; Do not proceed to " + "session preparation, model definition or training", +) +flags.DEFINE_integer( + "task_index", + None, + "Worker task index, should be >= 0. task_index=0 is " + "the master worker task that performs the variable " + "initialization ", +) +flags.DEFINE_integer( + "num_gpus", + 1, + "Total number of gpus for each machine. " + "If you don't use GPU, please set it to '0'", +) +flags.DEFINE_integer( + "replicas_to_aggregate", + None, + "Number of replicas to aggregate before parameter update " + "is applied (For sync_replicas mode only; default: " + "num_workers)", +) +flags.DEFINE_integer( + "hidden_units", 100, "Number of units in the hidden layer of the NN" +) +flags.DEFINE_integer( + "train_steps", 20000, "Number of (global) training steps to perform" +) flags.DEFINE_integer("batch_size", 100, "Training batch size") flags.DEFINE_float("learning_rate", 0.01, "Learning rate") flags.DEFINE_boolean( - "sync_replicas", False, + "sync_replicas", + False, "Use the sync_replicas (synchronized replicas) mode, " "wherein the parameter updates from workers are aggregated " - "before applied to avoid stale gradients") + "before being applied to avoid stale gradients", +) flags.DEFINE_boolean( - "existing_servers", False, "Whether servers already exists. If True, " + "existing_servers", + False, + "Whether servers already exist. If True, " "will use the worker hosts via their GRPC URLs (one client process " "per worker host).
Otherwise, will create an in-process TensorFlow " - "server.") -flags.DEFINE_string("ps_hosts", "localhost:2222", - "Comma-separated list of hostname:port pairs") -flags.DEFINE_string("worker_hosts", "localhost:2223,localhost:2224", - "Comma-separated list of hostname:port pairs") + "server.", +) +flags.DEFINE_string( + "ps_hosts", "localhost:2222", "Comma-separated list of hostname:port pairs" +) +flags.DEFINE_string( + "worker_hosts", + "localhost:2223,localhost:2224", + "Comma-separated list of hostname:port pairs", +) flags.DEFINE_string("job_name", None, "job name: worker or ps") FLAGS = flags.FLAGS @@ -94,210 +117,223 @@ # {'cluster': cluster, # 'task': {'type': 'worker', 'index': 1}}) + def main(unused_argv): - # Parse environment variable TF_CONFIG to get job_name and task_index - - # If not explicitly specified in the constructor and the TF_CONFIG - # environment variable is present, load cluster_spec from TF_CONFIG. - tf_config = json.loads(os.environ.get('TF_CONFIG') or '{}') - task_config = tf_config.get('task', {}) - task_type = task_config.get('type') - task_index = task_config.get('index') - - FLAGS.job_name = task_type - FLAGS.task_index = task_index - - mnist = input_data.read_data_sets(FLAGS.data_dir, one_hot=True) - if FLAGS.download_only: - sys.exit(0) - - if FLAGS.job_name is None or FLAGS.job_name == "": - raise ValueError("Must specify an explicit `job_name`") - if FLAGS.task_index is None or FLAGS.task_index == "": - raise ValueError("Must specify an explicit `task_index`") - - print("job name = %s" % FLAGS.job_name) - print("task index = %d" % FLAGS.task_index) - - cluster_config = tf_config.get('cluster', {}) - ps_hosts = cluster_config.get('ps') - worker_hosts = cluster_config.get('worker') - - ps_hosts_str = ','.join(ps_hosts) - worker_hosts_str = ','.join(worker_hosts) - - FLAGS.ps_hosts = ps_hosts_str - FLAGS.worker_hosts = worker_hosts_str - - # Construct the cluster and start the server - ps_spec = FLAGS.ps_hosts.split(",") - worker_spec = FLAGS.worker_hosts.split(",") - - # Get the number of workers. - num_workers = len(worker_spec) - - cluster = tf.train.ClusterSpec({"ps": ps_spec, "worker": worker_spec}) - - if not FLAGS.existing_servers: - # Not using existing servers. Create an in-process server. - server = tf.train.Server( - cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index) - if FLAGS.job_name == "ps": - server.join() - - is_chief = (FLAGS.task_index == 0) - if FLAGS.num_gpus > 0: - # Avoid gpu allocation conflict: now allocate task_num -> #gpu - # for each worker in the corresponding machine - gpu = (FLAGS.task_index % FLAGS.num_gpus) - worker_device = "/job:worker/task:%d/gpu:%d" % (FLAGS.task_index, gpu) - elif FLAGS.num_gpus == 0: - # Just allocate the CPU to worker server - cpu = 0 - worker_device = "/job:worker/task:%d/cpu:%d" % (FLAGS.task_index, cpu) - # The device setter will automatically place Variables ops on separate - # parameter servers (ps). The non-Variable ops will be placed on the workers. 
- # The ps use CPU and workers use corresponding GPU - with tf.device( - tf.train.replica_device_setter( - worker_device=worker_device, - ps_device="/job:ps/cpu:0", - cluster=cluster)): - global_step = tf.Variable(0, name="global_step", trainable=False) - - # Variables of the hidden layer - hid_w = tf.Variable( - tf.truncated_normal( - [IMAGE_PIXELS * IMAGE_PIXELS, FLAGS.hidden_units], - stddev=1.0 / IMAGE_PIXELS), - name="hid_w") - hid_b = tf.Variable(tf.zeros([FLAGS.hidden_units]), name="hid_b") - - # Variables of the softmax layer - sm_w = tf.Variable( - tf.truncated_normal( - [FLAGS.hidden_units, 10], - stddev=1.0 / math.sqrt(FLAGS.hidden_units)), - name="sm_w") - sm_b = tf.Variable(tf.zeros([10]), name="sm_b") - - # Ops: located on the worker specified with FLAGS.task_index - x = tf.placeholder(tf.float32, [None, IMAGE_PIXELS * IMAGE_PIXELS]) - y_ = tf.placeholder(tf.float32, [None, 10]) - - hid_lin = tf.nn.xw_plus_b(x, hid_w, hid_b) - hid = tf.nn.relu(hid_lin) - - y = tf.nn.softmax(tf.nn.xw_plus_b(hid, sm_w, sm_b)) - cross_entropy = -tf.reduce_sum(y_ * tf.log(tf.clip_by_value(y, 1e-10, 1.0))) - - opt = tf.train.AdamOptimizer(FLAGS.learning_rate) - - if FLAGS.sync_replicas: - if FLAGS.replicas_to_aggregate is None: - replicas_to_aggregate = num_workers - else: - replicas_to_aggregate = FLAGS.replicas_to_aggregate - - opt = tf.train.SyncReplicasOptimizer( - opt, - replicas_to_aggregate=replicas_to_aggregate, - total_num_replicas=num_workers, - name="mnist_sync_replicas") - - train_step = opt.minimize(cross_entropy, global_step=global_step) - - if FLAGS.sync_replicas: - local_init_op = opt.local_step_init_op - if is_chief: - local_init_op = opt.chief_init_op - - ready_for_local_init_op = opt.ready_for_local_init_op - - # Initial token and chief queue runners required by the sync_replicas mode - chief_queue_runner = opt.get_chief_queue_runner() - sync_init_op = opt.get_init_tokens_op() - - init_op = tf.global_variables_initializer() - train_dir = tempfile.mkdtemp() - - if FLAGS.sync_replicas: - sv = tf.train.Supervisor( - is_chief=is_chief, - logdir=train_dir, - init_op=init_op, - local_init_op=local_init_op, - ready_for_local_init_op=ready_for_local_init_op, - recovery_wait_secs=1, - global_step=global_step) - else: - sv = tf.train.Supervisor( - is_chief=is_chief, - logdir=train_dir, - init_op=init_op, - recovery_wait_secs=1, - global_step=global_step) - - sess_config = tf.ConfigProto( - allow_soft_placement=True, - log_device_placement=False, - device_filters=["/job:ps", - "/job:worker/task:%d" % FLAGS.task_index]) - - # The chief worker (task_index==0) session will prepare the session, - # while the remaining workers will wait for the preparation to complete. - if is_chief: - print("Worker %d: Initializing session..." % FLAGS.task_index) - else: - print("Worker %d: Waiting for session to be initialized..." % - FLAGS.task_index) - - if FLAGS.existing_servers: - server_grpc_url = "grpc://" + worker_spec[FLAGS.task_index] - print("Using existing server at: %s" % server_grpc_url) - - sess = sv.prepare_or_wait_for_session(server_grpc_url, config=sess_config) - else: - sess = sv.prepare_or_wait_for_session(server.target, config=sess_config) - - print("Worker %d: Session initialization complete." % FLAGS.task_index) - - if FLAGS.sync_replicas and is_chief: - # Chief worker will start the chief queue runner and call the init op. 
- sess.run(sync_init_op) - sv.start_queue_runners(sess, [chief_queue_runner]) - - # Perform training - time_begin = time.time() - print("Training begins @ %f" % time_begin) - - local_step = 0 - while True: - # Training feed - batch_xs, batch_ys = mnist.train.next_batch(FLAGS.batch_size) - train_feed = {x: batch_xs, y_: batch_ys} - - _, step = sess.run([train_step, global_step], feed_dict=train_feed) - local_step += 1 - - now = time.time() - print("%f: Worker %d: training step %d done (global step: %d)" % - (now, FLAGS.task_index, local_step, step)) - - if step >= FLAGS.train_steps: - break - - time_end = time.time() - print("Training ends @ %f" % time_end) - training_time = time_end - time_begin - print("Training elapsed time: %f s" % training_time) - - # Validation feed - val_feed = {x: mnist.validation.images, y_: mnist.validation.labels} - val_xent = sess.run(cross_entropy, feed_dict=val_feed) - print("After %d training step(s), validation cross entropy = %g" % - (FLAGS.train_steps, val_xent)) + # Parse environment variable TF_CONFIG to get job_name and task_index + + # If not explicitly specified in the constructor and the TF_CONFIG + # environment variable is present, load cluster_spec from TF_CONFIG. + tf_config = json.loads(os.environ.get("TF_CONFIG") or "{}") + task_config = tf_config.get("task", {}) + task_type = task_config.get("type") + task_index = task_config.get("index") + + FLAGS.job_name = task_type + FLAGS.task_index = task_index + + mnist = input_data.read_data_sets(FLAGS.data_dir, one_hot=True) + if FLAGS.download_only: + sys.exit(0) + + if FLAGS.job_name is None or FLAGS.job_name == "": + raise ValueError("Must specify an explicit `job_name`") + if FLAGS.task_index is None or FLAGS.task_index == "": + raise ValueError("Must specify an explicit `task_index`") + + print("job name = %s" % FLAGS.job_name) + print("task index = %d" % FLAGS.task_index) + + cluster_config = tf_config.get("cluster", {}) + ps_hosts = cluster_config.get("ps") + worker_hosts = cluster_config.get("worker") + + ps_hosts_str = ",".join(ps_hosts) + worker_hosts_str = ",".join(worker_hosts) + + FLAGS.ps_hosts = ps_hosts_str + FLAGS.worker_hosts = worker_hosts_str + + # Construct the cluster and start the server + ps_spec = FLAGS.ps_hosts.split(",") + worker_spec = FLAGS.worker_hosts.split(",") + + # Get the number of workers. + num_workers = len(worker_spec) + + cluster = tf.train.ClusterSpec({"ps": ps_spec, "worker": worker_spec}) + + if not FLAGS.existing_servers: + # Not using existing servers. Create an in-process server. + server = tf.train.Server( + cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index + ) + if FLAGS.job_name == "ps": + server.join() + + is_chief = FLAGS.task_index == 0 + if FLAGS.num_gpus > 0: + # Avoid gpu allocation conflict: now allocate task_num -> #gpu + # for each worker in the corresponding machine + gpu = FLAGS.task_index % FLAGS.num_gpus + worker_device = "/job:worker/task:%d/gpu:%d" % (FLAGS.task_index, gpu) + elif FLAGS.num_gpus == 0: + # Just allocate the CPU to worker server + cpu = 0 + worker_device = "/job:worker/task:%d/cpu:%d" % (FLAGS.task_index, cpu) + # The device setter will automatically place Variables ops on separate + # parameter servers (ps). The non-Variable ops will be placed on the workers. 
+ # The ps use CPU and workers use corresponding GPU + with tf.device( + tf.train.replica_device_setter( + worker_device=worker_device, ps_device="/job:ps/cpu:0", cluster=cluster + ) + ): + global_step = tf.Variable(0, name="global_step", trainable=False) + + # Variables of the hidden layer + hid_w = tf.Variable( + tf.truncated_normal( + [IMAGE_PIXELS * IMAGE_PIXELS, FLAGS.hidden_units], + stddev=1.0 / IMAGE_PIXELS, + ), + name="hid_w", + ) + hid_b = tf.Variable(tf.zeros([FLAGS.hidden_units]), name="hid_b") + + # Variables of the softmax layer + sm_w = tf.Variable( + tf.truncated_normal( + [FLAGS.hidden_units, 10], stddev=1.0 / math.sqrt(FLAGS.hidden_units) + ), + name="sm_w", + ) + sm_b = tf.Variable(tf.zeros([10]), name="sm_b") + + # Ops: located on the worker specified with FLAGS.task_index + x = tf.placeholder(tf.float32, [None, IMAGE_PIXELS * IMAGE_PIXELS]) + y_ = tf.placeholder(tf.float32, [None, 10]) + + hid_lin = tf.nn.xw_plus_b(x, hid_w, hid_b) + hid = tf.nn.relu(hid_lin) + + y = tf.nn.softmax(tf.nn.xw_plus_b(hid, sm_w, sm_b)) + cross_entropy = -tf.reduce_sum(y_ * tf.log(tf.clip_by_value(y, 1e-10, 1.0))) + + opt = tf.train.AdamOptimizer(FLAGS.learning_rate) + + if FLAGS.sync_replicas: + if FLAGS.replicas_to_aggregate is None: + replicas_to_aggregate = num_workers + else: + replicas_to_aggregate = FLAGS.replicas_to_aggregate + + opt = tf.train.SyncReplicasOptimizer( + opt, + replicas_to_aggregate=replicas_to_aggregate, + total_num_replicas=num_workers, + name="mnist_sync_replicas", + ) + + train_step = opt.minimize(cross_entropy, global_step=global_step) + + if FLAGS.sync_replicas: + local_init_op = opt.local_step_init_op + if is_chief: + local_init_op = opt.chief_init_op + + ready_for_local_init_op = opt.ready_for_local_init_op + + # Initial token and chief queue runners required by the sync_replicas mode + chief_queue_runner = opt.get_chief_queue_runner() + sync_init_op = opt.get_init_tokens_op() + + init_op = tf.global_variables_initializer() + train_dir = tempfile.mkdtemp() + + if FLAGS.sync_replicas: + sv = tf.train.Supervisor( + is_chief=is_chief, + logdir=train_dir, + init_op=init_op, + local_init_op=local_init_op, + ready_for_local_init_op=ready_for_local_init_op, + recovery_wait_secs=1, + global_step=global_step, + ) + else: + sv = tf.train.Supervisor( + is_chief=is_chief, + logdir=train_dir, + init_op=init_op, + recovery_wait_secs=1, + global_step=global_step, + ) + + sess_config = tf.ConfigProto( + allow_soft_placement=True, + log_device_placement=False, + device_filters=["/job:ps", "/job:worker/task:%d" % FLAGS.task_index], + ) + + # The chief worker (task_index==0) session will prepare the session, + # while the remaining workers will wait for the preparation to complete. + if is_chief: + print("Worker %d: Initializing session..." % FLAGS.task_index) + else: + print( + "Worker %d: Waiting for session to be initialized..." % FLAGS.task_index + ) + + if FLAGS.existing_servers: + server_grpc_url = "grpc://" + worker_spec[FLAGS.task_index] + print("Using existing server at: %s" % server_grpc_url) + + sess = sv.prepare_or_wait_for_session(server_grpc_url, config=sess_config) + else: + sess = sv.prepare_or_wait_for_session(server.target, config=sess_config) + + print("Worker %d: Session initialization complete." % FLAGS.task_index) + + if FLAGS.sync_replicas and is_chief: + # Chief worker will start the chief queue runner and call the init op. 
+ sess.run(sync_init_op) + sv.start_queue_runners(sess, [chief_queue_runner]) + + # Perform training + time_begin = time.time() + print("Training begins @ %f" % time_begin) + + local_step = 0 + while True: + # Training feed + batch_xs, batch_ys = mnist.train.next_batch(FLAGS.batch_size) + train_feed = {x: batch_xs, y_: batch_ys} + + _, step = sess.run([train_step, global_step], feed_dict=train_feed) + local_step += 1 + + now = time.time() + print( + "%f: Worker %d: training step %d done (global step: %d)" + % (now, FLAGS.task_index, local_step, step) + ) + + if step >= FLAGS.train_steps: + break + + time_end = time.time() + print("Training ends @ %f" % time_end) + training_time = time_end - time_begin + print("Training elapsed time: %f s" % training_time) + + # Validation feed + val_feed = {x: mnist.validation.images, y_: mnist.validation.labels} + val_xent = sess.run(cross_entropy, feed_dict=val_feed) + print( + "After %d training step(s), validation cross entropy = %g" + % (FLAGS.train_steps, val_xent) + ) if __name__ == "__main__": - tf.app.run() + tf.app.run() diff --git a/examples/tensorflow/distribution_strategy/estimator-API/keras_model_to_estimator.py b/examples/tensorflow/distribution_strategy/estimator-API/keras_model_to_estimator.py index 923fc99fb4..1f3deab561 100644 --- a/examples/tensorflow/distribution_strategy/estimator-API/keras_model_to_estimator.py +++ b/examples/tensorflow/distribution_strategy/estimator-API/keras_model_to_estimator.py @@ -24,55 +24,63 @@ def input_fn(): - x = np.random.random((1024, 10)) - y = np.random.randint(2, size=(1024, 1)) - x = tf.cast(x, tf.float32) - dataset = tf.data.Dataset.from_tensor_slices((x, y)) - dataset = dataset.repeat(100) - dataset = dataset.batch(32) - return dataset + x = np.random.random((1024, 10)) + y = np.random.randint(2, size=(1024, 1)) + x = tf.cast(x, tf.float32) + dataset = tf.data.Dataset.from_tensor_slices((x, y)) + dataset = dataset.repeat(100) + dataset = dataset.batch(32) + return dataset def main(args): - if len(args) < 2: - print('You must specify model_dir for checkpoints such as' - ' /tmp/tfkeras_example/.') - return + if len(args) < 2: + print( + "You must specify model_dir for checkpoints such as" + " /tmp/tfkeras_example/." + ) + return - model_dir = args[1] - print('Using %s to store checkpoints.' % model_dir) + model_dir = args[1] + print("Using %s to store checkpoints." % model_dir) - # Define a Keras Model. - model = tf.keras.Sequential() - model.add(tf.keras.layers.Dense(16, activation='relu', input_shape=(10,))) - model.add(tf.keras.layers.Dense(1, activation='sigmoid')) + # Define a Keras Model. + model = tf.keras.Sequential() + model.add(tf.keras.layers.Dense(16, activation="relu", input_shape=(10,))) + model.add(tf.keras.layers.Dense(1, activation="sigmoid")) - # Compile the model. - optimizer = tf.train.GradientDescentOptimizer(0.2) - model.compile(loss='binary_crossentropy', optimizer=optimizer) - model.summary() - tf.keras.backend.set_learning_phase(True) + # Compile the model. + optimizer = tf.train.GradientDescentOptimizer(0.2) + model.compile(loss="binary_crossentropy", optimizer=optimizer) + model.summary() + tf.keras.backend.set_learning_phase(True) - # Define DistributionStrategies and convert the Keras Model to an - # Estimator that utilizes these DistributionStrateges. - # Evaluator is a single worker, so using MirroredStrategy. 
- config = tf.estimator.RunConfig( - experimental_distribute=tf.contrib.distribute.DistributeConfig( - train_distribute=tf.contrib.distribute.CollectiveAllReduceStrategy( - num_gpus_per_worker=0), - eval_distribute=tf.contrib.distribute.MirroredStrategy( - num_gpus_per_worker=0))) - keras_estimator = tf.keras.estimator.model_to_estimator( - keras_model=model, config=config, model_dir=model_dir) + # Define DistributionStrategies and convert the Keras Model to an + # Estimator that utilizes these DistributionStrategies. + # Evaluator is a single worker, so using MirroredStrategy. + config = tf.estimator.RunConfig( + experimental_distribute=tf.contrib.distribute.DistributeConfig( + train_distribute=tf.contrib.distribute.CollectiveAllReduceStrategy( + num_gpus_per_worker=0 + ), + eval_distribute=tf.contrib.distribute.MirroredStrategy( + num_gpus_per_worker=0 + ), + ) + ) + keras_estimator = tf.keras.estimator.model_to_estimator( + keras_model=model, config=config, model_dir=model_dir + ) - # Train and evaluate the model. Evaluation will be skipped if there is not an - # "evaluator" job in the cluster. - tf.estimator.train_and_evaluate( - keras_estimator, - train_spec=tf.estimator.TrainSpec(input_fn=input_fn), - eval_spec=tf.estimator.EvalSpec(input_fn=input_fn)) + # Train and evaluate the model. Evaluation will be skipped if there is not an + # "evaluator" job in the cluster. + tf.estimator.train_and_evaluate( + keras_estimator, + train_spec=tf.estimator.TrainSpec(input_fn=input_fn), + eval_spec=tf.estimator.EvalSpec(input_fn=input_fn), + ) -if __name__ == '__main__': - tf.logging.set_verbosity(tf.logging.INFO) - tf.app.run(argv=sys.argv) +if __name__ == "__main__": + tf.logging.set_verbosity(tf.logging.INFO) + tf.app.run(argv=sys.argv) diff --git a/examples/tensorflow/distribution_strategy/keras-API/README.md b/examples/tensorflow/distribution_strategy/keras-API/README.md index 70b58421e0..25d1ddbf09 100644 --- a/examples/tensorflow/distribution_strategy/keras-API/README.md +++ b/examples/tensorflow/distribution_strategy/keras-API/README.md @@ -1,12 +1,12 @@ # Multi-worker training with Keras -This directory contains a example for running multi-worker distributed training -using Tensorflow 2.1 keras API on Kubeflow. For more information about the +This directory contains an example for running multi-worker distributed training +using the TensorFlow 2.1 Keras API on Kubeflow. For more information about the source code, please see TensorFlow tutorials [here](https://www.tensorflow.org/tutorials/distribute/keras) and [here](https://www.tensorflow.org/tutorials/distribute/multi_worker_with_keras) ## Prerequisite -Your cluster must be configured to use Multiple GPUs, +Your cluster must be configured to use multiple GPUs; please follow the [instructions](https://www.kubeflow.org/docs/components/training/tftraining/#using-gpus) ## Steps @@ -16,13 +16,13 @@ please follow the [instructions](https://www.kubeflow.org/docs/components/traini docker build -f Dockerfile -t kubeflow/multi_worker_strategy:v1.0 . ``` -2. Specify your storageClassName and create a persistent volume claim to save +2. Specify your storageClassName and create a persistent volume claim to save models and checkpoints ``` kubectl -n ${NAMESPACE} create -f pvc.yaml ``` -3. Create a TFJob, if you use some GPUs other than NVIDIA, please replace +3. Create a TFJob. If you use GPUs other than NVIDIA, please replace `nvidia.com/gpu` with your GPU vendor in the `limits` section.
``` kubectl -n ${NAMESPACE} create -f multi_worker_tfjob.yaml diff --git a/examples/tensorflow/distribution_strategy/keras-API/multi_worker_strategy-with-keras.py b/examples/tensorflow/distribution_strategy/keras-API/multi_worker_strategy-with-keras.py index 9a1b9a71cb..6a71fa828e 100644 --- a/examples/tensorflow/distribution_strategy/keras-API/multi_worker_strategy-with-keras.py +++ b/examples/tensorflow/distribution_strategy/keras-API/multi_worker_strategy-with-keras.py @@ -14,147 +14,158 @@ # ============================================================================== """An example of multi-worker training with Keras model using Strategy API.""" -from __future__ import absolute_import, division, print_function +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function import argparse import json import os -import tensorflow_datasets as tfds import tensorflow as tf -from tensorflow.keras import layers, models +from tensorflow.keras import layers +from tensorflow.keras import models +import tensorflow_datasets as tfds def make_datasets_unbatched(): - BUFFER_SIZE = 10000 + BUFFER_SIZE = 10000 - # Scaling MNIST data from (0, 255] to (0., 1.] - def scale(image, label): - image = tf.cast(image, tf.float32) - image /= 255 - return image, label + # Scaling MNIST data from (0, 255] to (0., 1.] + def scale(image, label): + image = tf.cast(image, tf.float32) + image /= 255 + return image, label - datasets, _ = tfds.load(name='mnist', with_info=True, as_supervised=True) + datasets, _ = tfds.load(name="mnist", with_info=True, as_supervised=True) - return datasets['train'].map(scale).cache().shuffle(BUFFER_SIZE) + return datasets["train"].map(scale).cache().shuffle(BUFFER_SIZE) def build_and_compile_cnn_model(): - model = models.Sequential() - model.add( - layers.Conv2D(32, (3, 3), activation='relu', input_shape=(28, 28, 1))) - model.add(layers.MaxPooling2D((2, 2))) - model.add(layers.Conv2D(64, (3, 3), activation='relu')) - model.add(layers.MaxPooling2D((2, 2))) - model.add(layers.Conv2D(64, (3, 3), activation='relu')) - model.add(layers.Flatten()) - model.add(layers.Dense(64, activation='relu')) - model.add(layers.Dense(10, activation='softmax')) + model = models.Sequential() + model.add(layers.Conv2D(32, (3, 3), activation="relu", input_shape=(28, 28, 1))) + model.add(layers.MaxPooling2D((2, 2))) + model.add(layers.Conv2D(64, (3, 3), activation="relu")) + model.add(layers.MaxPooling2D((2, 2))) + model.add(layers.Conv2D(64, (3, 3), activation="relu")) + model.add(layers.Flatten()) + model.add(layers.Dense(64, activation="relu")) + model.add(layers.Dense(10, activation="softmax")) - model.summary() + model.summary() - model.compile(optimizer='adam', - loss='sparse_categorical_crossentropy', - metrics=['accuracy']) + model.compile( + optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"] + ) - return model + return model def decay(epoch): - if epoch < 3: #pylint: disable=no-else-return - return 1e-3 - if 3 <= epoch < 7: - return 1e-4 - return 1e-5 + if epoch < 3: # pylint: disable=no-else-return + return 1e-3 + if 3 <= epoch < 7: + return 1e-4 + return 1e-5 def main(args): - # MultiWorkerMirroredStrategy creates copies of all variables in the model's - # layers on each device across all workers - # if your GPUs don't support NCCL, replace "communication" with another - strategy = tf.distribute.MultiWorkerMirroredStrategy( - 
communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CollectiveCommunication.AUTO)) - - BATCH_SIZE_PER_REPLICA = 64 - BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync - - with strategy.scope(): - ds_train = make_datasets_unbatched().batch(BATCH_SIZE).repeat() - options = tf.data.Options() - options.experimental_distribute.auto_shard_policy = \ - tf.data.experimental.AutoShardPolicy.DATA - ds_train = ds_train.with_options(options) - # Model building/compiling need to be within `strategy.scope()`. - multi_worker_model = build_and_compile_cnn_model() - - # Define the checkpoint directory to store the checkpoints - checkpoint_dir = args.checkpoint_dir - - # Name of the checkpoint files - checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}") - - # Function for decaying the learning rate. - # You can define any decay function you need. - # Callback for printing the LR at the end of each epoch. - class PrintLR(tf.keras.callbacks.Callback): - - def on_epoch_end(self, epoch, logs=None): #pylint: disable=no-self-use - print('\nLearning rate for epoch {} is {}'.format( - epoch + 1, multi_worker_model.optimizer.lr.numpy())) - - callbacks = [ - tf.keras.callbacks.TensorBoard(log_dir='./logs'), - tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_prefix, - save_weights_only=True), - tf.keras.callbacks.LearningRateScheduler(decay), - PrintLR() - ] - - # Keras' `model.fit()` trains the model with specified number of epochs and - # number of steps per epoch. Note that the numbers here are for demonstration - # purposes only and may not sufficiently produce a model with good quality. - multi_worker_model.fit(ds_train, - epochs=10, - steps_per_epoch=70, - callbacks=callbacks) - - # Saving a model - # Let `is_chief` be a utility function that inspects the cluster spec and - # current task type and returns True if the worker is the chief and False - # otherwise. - def is_chief(): - return TASK_INDEX == 0 - - if is_chief(): - model_path = args.saved_model_dir - - else: - # Save to a path that is unique across workers. 
- model_path = args.saved_model_dir + '/worker_tmp_' + str(TASK_INDEX) - - multi_worker_model.save(model_path) - - -if __name__ == '__main__': - os.environ['NCCL_DEBUG'] = 'INFO' - - tfds.disable_progress_bar() - - # to decide if a worker is chief, get TASK_INDEX in Cluster info - tf_config = json.loads(os.environ.get('TF_CONFIG') or '{}') - TASK_INDEX = tf_config['task']['index'] - - parser = argparse.ArgumentParser() - parser.add_argument('--saved_model_dir', - type=str, - required=True, - help='Tensorflow export directory.') - - parser.add_argument('--checkpoint_dir', - type=str, - required=True, - help='Tensorflow checkpoint directory.') - - parsed_args = parser.parse_args() - main(parsed_args) + # MultiWorkerMirroredStrategy creates copies of all variables in the model's + # layers on each device across all workers + # if your GPUs don't support NCCL, replace "communication" with another + strategy = tf.distribute.MultiWorkerMirroredStrategy( + communication_options=tf.distribute.experimental.CommunicationOptions( + implementation=tf.distribute.experimental.CollectiveCommunication.AUTO + ) + ) + + BATCH_SIZE_PER_REPLICA = 64 + BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync + + with strategy.scope(): + ds_train = make_datasets_unbatched().batch(BATCH_SIZE).repeat() + options = tf.data.Options() + options.experimental_distribute.auto_shard_policy = ( + tf.data.experimental.AutoShardPolicy.DATA + ) + ds_train = ds_train.with_options(options) + # Model building/compiling need to be within `strategy.scope()`. + multi_worker_model = build_and_compile_cnn_model() + + # Define the checkpoint directory to store the checkpoints + checkpoint_dir = args.checkpoint_dir + + # Name of the checkpoint files + checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}") + + # Function for decaying the learning rate. + # You can define any decay function you need. + # Callback for printing the LR at the end of each epoch. + class PrintLR(tf.keras.callbacks.Callback): + + def on_epoch_end(self, epoch, logs=None): # pylint: disable=no-self-use + print( + "\nLearning rate for epoch {} is {}".format( + epoch + 1, multi_worker_model.optimizer.lr.numpy() + ) + ) + + callbacks = [ + tf.keras.callbacks.TensorBoard(log_dir="./logs"), + tf.keras.callbacks.ModelCheckpoint( + filepath=checkpoint_prefix, save_weights_only=True + ), + tf.keras.callbacks.LearningRateScheduler(decay), + PrintLR(), + ] + + # Keras' `model.fit()` trains the model with specified number of epochs and + # number of steps per epoch. Note that the numbers here are for demonstration + # purposes only and may not sufficiently produce a model with good quality. + multi_worker_model.fit(ds_train, epochs=10, steps_per_epoch=70, callbacks=callbacks) + + # Saving a model + # Let `is_chief` be a utility function that inspects the cluster spec and + # current task type and returns True if the worker is the chief and False + # otherwise. + def is_chief(): + return TASK_INDEX == 0 + + if is_chief(): + model_path = args.saved_model_dir + + else: + # Save to a path that is unique across workers. 
+ model_path = args.saved_model_dir + "/worker_tmp_" + str(TASK_INDEX) + + multi_worker_model.save(model_path) + + +if __name__ == "__main__": + os.environ["NCCL_DEBUG"] = "INFO" + + tfds.disable_progress_bar() + + # to decide if a worker is chief, get TASK_INDEX in Cluster info + tf_config = json.loads(os.environ.get("TF_CONFIG") or "{}") + TASK_INDEX = tf_config["task"]["index"] + + parser = argparse.ArgumentParser() + parser.add_argument( + "--saved_model_dir", + type=str, + required=True, + help="Tensorflow export directory.", + ) + + parser.add_argument( + "--checkpoint_dir", + type=str, + required=True, + help="Tensorflow checkpoint directory.", + ) + + parsed_args = parser.parse_args() + main(parsed_args) diff --git a/examples/tensorflow/mnist_with_summaries/README.md b/examples/tensorflow/mnist_with_summaries/README.md index 075e87ce60..ddef953fc1 100644 --- a/examples/tensorflow/mnist_with_summaries/README.md +++ b/examples/tensorflow/mnist_with_summaries/README.md @@ -18,4 +18,4 @@ docker build -f Dockerfile.ppc64le -t kubeflow123/tf-mnist-with-summaries:1.0 ./ Usage: 1. Add the persistent volume and claim: `kubectl apply -f tfevent-volume/.` 1. Deploy the TFJob: `kubectl apply -f tf_job_mnist.yaml` - * If on ppc64le, please update tf_job_mnist.yaml to use the image of ppc64le firstly. \ No newline at end of file + * If on ppc64le, please update tf_job_mnist.yaml to use the image of ppc64le firstly. diff --git a/examples/tensorflow/mnist_with_summaries/mnist_with_summaries.py b/examples/tensorflow/mnist_with_summaries/mnist_with_summaries.py index 65d3b12233..57ee0fa200 100644 --- a/examples/tensorflow/mnist_with_summaries/mnist_with_summaries.py +++ b/examples/tensorflow/mnist_with_summaries/mnist_with_summaries.py @@ -27,186 +27,201 @@ import sys import tensorflow as tf - from tensorflow.examples.tutorials.mnist import input_data FLAGS = None def train(): - # Import data - mnist = input_data.read_data_sets(FLAGS.data_dir, - fake_data=FLAGS.fake_data) - - sess = tf.InteractiveSession() - # Create a multilayer model. - - # Input placeholders - with tf.name_scope('input'): - x = tf.placeholder(tf.float32, [None, 784], name='x-input') - y_ = tf.placeholder(tf.int64, [None], name='y-input') - - with tf.name_scope('input_reshape'): - image_shaped_input = tf.reshape(x, [-1, 28, 28, 1]) - tf.summary.image('input', image_shaped_input, 10) - - # We can't initialize these variables to 0 - the network will get stuck. - def weight_variable(shape): - """Create a weight variable with appropriate initialization.""" - initial = tf.truncated_normal(shape, stddev=0.1) - return tf.Variable(initial) - - def bias_variable(shape): - """Create a bias variable with appropriate initialization.""" - initial = tf.constant(0.1, shape=shape) - return tf.Variable(initial) - - def variable_summaries(var): - """Attach a lot of summaries to a Tensor (for TensorBoard visualization).""" - with tf.name_scope('summaries'): - mean = tf.reduce_mean(var) - tf.summary.scalar('mean', mean) - with tf.name_scope('stddev'): - stddev = tf.sqrt(tf.reduce_mean(tf.square(var - mean))) - tf.summary.scalar('stddev', stddev) - tf.summary.scalar('max', tf.reduce_max(var)) - tf.summary.scalar('min', tf.reduce_min(var)) - tf.summary.histogram('histogram', var) - - def nn_layer(input_tensor, input_dim, output_dim, layer_name, act=tf.nn.relu): - """Reusable code for making a simple neural net layer. - It does a matrix multiply, bias add, and then uses ReLU to nonlinearize. 
- It also sets up name scoping so that the resultant graph is easy to read, - and adds a number of summary ops. - """ - # Adding a name scope ensures logical grouping of the layers in the graph. - with tf.name_scope(layer_name): - # This Variable will hold the state of the weights for the layer - with tf.name_scope('weights'): - weights = weight_variable([input_dim, output_dim]) - variable_summaries(weights) - with tf.name_scope('biases'): - biases = bias_variable([output_dim]) - variable_summaries(biases) - with tf.name_scope('Wx_plus_b'): - preactivate = tf.matmul(input_tensor, weights) + biases - tf.summary.histogram('pre_activations', preactivate) - activations = act(preactivate, name='activation') - tf.summary.histogram('activations', activations) - return activations - - hidden1 = nn_layer(x, 784, 500, 'layer1') - - with tf.name_scope('dropout'): - keep_prob = tf.placeholder(tf.float32) - tf.summary.scalar('dropout_keep_probability', keep_prob) - dropped = tf.nn.dropout(hidden1, keep_prob) - - # Do not apply softmax activation yet, see below. - y = nn_layer(dropped, 500, 10, 'layer2', act=tf.identity) - - with tf.name_scope('cross_entropy'): - # The raw formulation of cross-entropy, - # - # tf.reduce_mean(-tf.reduce_sum(y_ * tf.log(tf.softmax(y)), - # reduction_indices=[1])) - # - # can be numerically unstable. - # - # So here we use tf.losses.sparse_softmax_cross_entropy on the - # raw logit outputs of the nn_layer above, and then average across - # the batch. - with tf.name_scope('total'): - cross_entropy = tf.losses.sparse_softmax_cross_entropy( - labels=y_, logits=y) - tf.summary.scalar('cross_entropy', cross_entropy) - - with tf.name_scope('train'): - train_step = tf.train.AdamOptimizer(FLAGS.learning_rate).minimize( - cross_entropy) - - with tf.name_scope('accuracy'): - with tf.name_scope('correct_prediction'): - correct_prediction = tf.equal(tf.argmax(y, 1), y_) - with tf.name_scope('accuracy'): - accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32)) - tf.summary.scalar('accuracy', accuracy) - - # Merge all the summaries and write them out to - # /tmp/tensorflow/mnist/logs/mnist_with_summaries (by default) - merged = tf.summary.merge_all() - train_writer = tf.summary.FileWriter(FLAGS.log_dir + '/train', sess.graph) - test_writer = tf.summary.FileWriter(FLAGS.log_dir + '/test') - tf.global_variables_initializer().run() - - # Train the model, and also write summaries. 
- # Every 10th step, measure test-set accuracy, and write test summaries - # All other steps, run train_step on training data, & add training summaries - - def feed_dict(train): # pylint: disable=redefined-outer-name - """Make a TensorFlow feed_dict: maps data onto Tensor placeholders.""" - if train or FLAGS.fake_data: - xs, ys = mnist.train.next_batch(FLAGS.batch_size, fake_data=FLAGS.fake_data) - k = FLAGS.dropout - else: - xs, ys = mnist.test.images, mnist.test.labels - k = 1.0 - return {x: xs, y_: ys, keep_prob: k} - - for i in range(FLAGS.max_steps): - if i % 10 == 0: # Record summaries and test-set accuracy - summary, acc = sess.run([merged, accuracy], feed_dict=feed_dict(False)) - test_writer.add_summary(summary, i) - print('Accuracy at step %s: %s' % (i, acc)) - else: # Record train set summaries, and train - if i % 100 == 99: # Record execution stats - run_options = tf.RunOptions(trace_level=tf.RunOptions.FULL_TRACE) - run_metadata = tf.RunMetadata() - summary, _ = sess.run([merged, train_step], - feed_dict=feed_dict(True), - options=run_options, - run_metadata=run_metadata) - train_writer.add_run_metadata(run_metadata, 'step%03d' % i) - train_writer.add_summary(summary, i) - print('Adding run metadata for', i) - else: # Record a summary - summary, _ = sess.run([merged, train_step], feed_dict=feed_dict(True)) - train_writer.add_summary(summary, i) - train_writer.close() - test_writer.close() + # Import data + mnist = input_data.read_data_sets(FLAGS.data_dir, fake_data=FLAGS.fake_data) + + sess = tf.InteractiveSession() + # Create a multilayer model. + + # Input placeholders + with tf.name_scope("input"): + x = tf.placeholder(tf.float32, [None, 784], name="x-input") + y_ = tf.placeholder(tf.int64, [None], name="y-input") + + with tf.name_scope("input_reshape"): + image_shaped_input = tf.reshape(x, [-1, 28, 28, 1]) + tf.summary.image("input", image_shaped_input, 10) + + # We can't initialize these variables to 0 - the network will get stuck. + def weight_variable(shape): + """Create a weight variable with appropriate initialization.""" + initial = tf.truncated_normal(shape, stddev=0.1) + return tf.Variable(initial) + + def bias_variable(shape): + """Create a bias variable with appropriate initialization.""" + initial = tf.constant(0.1, shape=shape) + return tf.Variable(initial) + + def variable_summaries(var): + """Attach a lot of summaries to a Tensor (for TensorBoard visualization).""" + with tf.name_scope("summaries"): + mean = tf.reduce_mean(var) + tf.summary.scalar("mean", mean) + with tf.name_scope("stddev"): + stddev = tf.sqrt(tf.reduce_mean(tf.square(var - mean))) + tf.summary.scalar("stddev", stddev) + tf.summary.scalar("max", tf.reduce_max(var)) + tf.summary.scalar("min", tf.reduce_min(var)) + tf.summary.histogram("histogram", var) + + def nn_layer(input_tensor, input_dim, output_dim, layer_name, act=tf.nn.relu): + """Reusable code for making a simple neural net layer. + It does a matrix multiply, bias add, and then uses ReLU to nonlinearize. + It also sets up name scoping so that the resultant graph is easy to read, + and adds a number of summary ops. + """ + # Adding a name scope ensures logical grouping of the layers in the graph. 
+ with tf.name_scope(layer_name): + # This Variable will hold the state of the weights for the layer + with tf.name_scope("weights"): + weights = weight_variable([input_dim, output_dim]) + variable_summaries(weights) + with tf.name_scope("biases"): + biases = bias_variable([output_dim]) + variable_summaries(biases) + with tf.name_scope("Wx_plus_b"): + preactivate = tf.matmul(input_tensor, weights) + biases + tf.summary.histogram("pre_activations", preactivate) + activations = act(preactivate, name="activation") + tf.summary.histogram("activations", activations) + return activations + + hidden1 = nn_layer(x, 784, 500, "layer1") + + with tf.name_scope("dropout"): + keep_prob = tf.placeholder(tf.float32) + tf.summary.scalar("dropout_keep_probability", keep_prob) + dropped = tf.nn.dropout(hidden1, keep_prob) + + # Do not apply softmax activation yet, see below. + y = nn_layer(dropped, 500, 10, "layer2", act=tf.identity) + + with tf.name_scope("cross_entropy"): + # The raw formulation of cross-entropy, + # + # tf.reduce_mean(-tf.reduce_sum(y_ * tf.log(tf.softmax(y)), + # reduction_indices=[1])) + # + # can be numerically unstable. + # + # So here we use tf.losses.sparse_softmax_cross_entropy on the + # raw logit outputs of the nn_layer above, and then average across + # the batch. + with tf.name_scope("total"): + cross_entropy = tf.losses.sparse_softmax_cross_entropy(labels=y_, logits=y) + tf.summary.scalar("cross_entropy", cross_entropy) + + with tf.name_scope("train"): + train_step = tf.train.AdamOptimizer(FLAGS.learning_rate).minimize(cross_entropy) + + with tf.name_scope("accuracy"): + with tf.name_scope("correct_prediction"): + correct_prediction = tf.equal(tf.argmax(y, 1), y_) + with tf.name_scope("accuracy"): + accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32)) + tf.summary.scalar("accuracy", accuracy) + + # Merge all the summaries and write them out to + # /tmp/tensorflow/mnist/logs/mnist_with_summaries (by default) + merged = tf.summary.merge_all() + train_writer = tf.summary.FileWriter(FLAGS.log_dir + "/train", sess.graph) + test_writer = tf.summary.FileWriter(FLAGS.log_dir + "/test") + tf.global_variables_initializer().run() + + # Train the model, and also write summaries. 
+ # Every 10th step, measure test-set accuracy, and write test summaries + # All other steps, run train_step on training data, & add training summaries + + def feed_dict(train): # pylint: disable=redefined-outer-name + """Make a TensorFlow feed_dict: maps data onto Tensor placeholders.""" + if train or FLAGS.fake_data: + xs, ys = mnist.train.next_batch(FLAGS.batch_size, fake_data=FLAGS.fake_data) + k = FLAGS.dropout + else: + xs, ys = mnist.test.images, mnist.test.labels + k = 1.0 + return {x: xs, y_: ys, keep_prob: k} + + for i in range(FLAGS.max_steps): + if i % 10 == 0: # Record summaries and test-set accuracy + summary, acc = sess.run([merged, accuracy], feed_dict=feed_dict(False)) + test_writer.add_summary(summary, i) + print("Accuracy at step %s: %s" % (i, acc)) + else: # Record train set summaries, and train + if i % 100 == 99: # Record execution stats + run_options = tf.RunOptions(trace_level=tf.RunOptions.FULL_TRACE) + run_metadata = tf.RunMetadata() + summary, _ = sess.run( + [merged, train_step], + feed_dict=feed_dict(True), + options=run_options, + run_metadata=run_metadata, + ) + train_writer.add_run_metadata(run_metadata, "step%03d" % i) + train_writer.add_summary(summary, i) + print("Adding run metadata for", i) + else: # Record a summary + summary, _ = sess.run([merged, train_step], feed_dict=feed_dict(True)) + train_writer.add_summary(summary, i) + train_writer.close() + test_writer.close() def main(_): - if tf.gfile.Exists(FLAGS.log_dir): - tf.gfile.DeleteRecursively(FLAGS.log_dir) - tf.gfile.MakeDirs(FLAGS.log_dir) - train() - - -if __name__ == '__main__': - parser = argparse.ArgumentParser() - parser.add_argument('--fake_data', nargs='?', const=True, type=bool, - default=False, - help='If true, uses fake data for unit testing.') - parser.add_argument('--max_steps', type=int, default=1000, - help='Number of steps to run trainer.') - parser.add_argument('--learning_rate', type=float, default=0.001, - help='Initial learning rate') - parser.add_argument('--batch_size', type=int, default=100, - help='Training batch size') - parser.add_argument('--dropout', type=float, default=0.9, - help='Keep probability for training dropout.') - parser.add_argument( - '--data_dir', - type=str, - default=os.path.join(os.getenv('TEST_TMPDIR', '/tmp'), - 'tensorflow/mnist/input_data'), - help='Directory for storing input data') - parser.add_argument( - '--log_dir', - type=str, - default=os.path.join(os.getenv('TEST_TMPDIR', '/tmp'), - 'tensorflow/mnist/logs/mnist_with_summaries'), - help='Summaries log directory') - FLAGS, unparsed = parser.parse_known_args() - tf.app.run(main=main, argv=[sys.argv[0]] + unparsed) + if tf.gfile.Exists(FLAGS.log_dir): + tf.gfile.DeleteRecursively(FLAGS.log_dir) + tf.gfile.MakeDirs(FLAGS.log_dir) + train() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument( + "--fake_data", + nargs="?", + const=True, + type=bool, + default=False, + help="If true, uses fake data for unit testing.", + ) + parser.add_argument( + "--max_steps", type=int, default=1000, help="Number of steps to run trainer." 
+ ) + parser.add_argument( + "--learning_rate", type=float, default=0.001, help="Initial learning rate" + ) + parser.add_argument( + "--batch_size", type=int, default=100, help="Training batch size" + ) + parser.add_argument( + "--dropout", + type=float, + default=0.9, + help="Keep probability for training dropout.", + ) + parser.add_argument( + "--data_dir", + type=str, + default=os.path.join( + os.getenv("TEST_TMPDIR", "/tmp"), "tensorflow/mnist/input_data" + ), + help="Directory for storing input data", + ) + parser.add_argument( + "--log_dir", + type=str, + default=os.path.join( + os.getenv("TEST_TMPDIR", "/tmp"), + "tensorflow/mnist/logs/mnist_with_summaries", + ), + help="Summaries log directory", + ) + FLAGS, unparsed = parser.parse_known_args() + tf.app.run(main=main, argv=[sys.argv[0]] + unparsed) diff --git a/examples/tensorflow/mnist_with_summaries/tf_job_mnist.yaml b/examples/tensorflow/mnist_with_summaries/tf_job_mnist.yaml index 88e0e94848..3c1884a05c 100644 --- a/examples/tensorflow/mnist_with_summaries/tf_job_mnist.yaml +++ b/examples/tensorflow/mnist_with_summaries/tf_job_mnist.yaml @@ -2,13 +2,13 @@ apiVersion: "kubeflow.org/v1" kind: "TFJob" metadata: name: "mnist" - namespace: kubeflow + namespace: kubeflow spec: runPolicy: cleanPodPolicy: None tfReplicaSpecs: Worker: - replicas: 1 + replicas: 1 restartPolicy: Never template: spec: @@ -27,4 +27,4 @@ spec: volumes: - name: "training" persistentVolumeClaim: - claimName: "tfevent-volume" + claimName: "tfevent-volume" diff --git a/examples/tensorflow/mnist_with_summaries/tfevent-volume/tfevent-pv.yaml b/examples/tensorflow/mnist_with_summaries/tfevent-volume/tfevent-pv.yaml index a450c6a492..cf41c6f982 100644 --- a/examples/tensorflow/mnist_with_summaries/tfevent-volume/tfevent-pv.yaml +++ b/examples/tensorflow/mnist_with_summaries/tfevent-volume/tfevent-pv.yaml @@ -8,7 +8,7 @@ metadata: spec: capacity: storage: 10Gi - storageClassName: standard + storageClassName: standard accessModes: - ReadWriteMany hostPath: diff --git a/examples/tensorflow/mnist_with_summaries/tfevent-volume/tfevent-pvc.yaml b/examples/tensorflow/mnist_with_summaries/tfevent-volume/tfevent-pvc.yaml index 7d7f8487a1..6bab17d803 100644 --- a/examples/tensorflow/mnist_with_summaries/tfevent-volume/tfevent-pvc.yaml +++ b/examples/tensorflow/mnist_with_summaries/tfevent-volume/tfevent-pvc.yaml @@ -2,7 +2,7 @@ apiVersion: v1 kind: PersistentVolumeClaim metadata: name: tfevent-volume - namespace: kubeflow + namespace: kubeflow labels: type: local app: tfjob diff --git a/examples/tensorflow/tf_sample/setup.py b/examples/tensorflow/tf_sample/setup.py index bde533c8bd..5bb0d4257a 100644 --- a/examples/tensorflow/tf_sample/setup.py +++ b/examples/tensorflow/tf_sample/setup.py @@ -10,17 +10,18 @@ # See the License for the specific language governing permissions and # limitations under the License. 
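The argparse defaults in the MNIST hunk above derive the data and log directories from TEST_TMPDIR with a /tmp fallback. A condensed sketch of that pattern; `default_dir` is an illustrative helper, not part of the patch:

```python
# Sketch of the default-directory pattern used by the MNIST flags above.
import os


def default_dir(suffix):
    # Fall back to /tmp unless a test sandbox sets TEST_TMPDIR.
    return os.path.join(os.getenv("TEST_TMPDIR", "/tmp"), suffix)


print(default_dir("tensorflow/mnist/input_data"))
print(default_dir("tensorflow/mnist/logs/mnist_with_summaries"))
```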
"""A setup.py file for the tf_sample package.""" -from setuptools import find_packages, setup +from setuptools import find_packages +from setuptools import setup REQUIRED_PACKAGES = [] setup( - name='tf_sample', - version='0.1.1', - author='Jeremy Lewi', - author_email='jlewi@google.com', + name="tf_sample", + version="0.1.1", + author="Jeremy Lewi", + author_email="jlewi@google.com", install_requires=REQUIRED_PACKAGES, packages=find_packages(), - description='Sample TF program', + description="Sample TF program", requires=[], ) diff --git a/examples/tensorflow/tf_sample/tf_smoke.py b/examples/tensorflow/tf_sample/tf_smoke.py index d0b5eeb60d..bbde61167c 100644 --- a/examples/tensorflow/tf_sample/tf_smoke.py +++ b/examples/tensorflow/tf_sample/tf_smoke.py @@ -7,141 +7,147 @@ The master will assign ops to every task in the cluster. This way we can verify that distributed training is working by executing ops on all devices. """ + import argparse import json import logging import os -import retrying +import retrying import tensorflow as tf def parse_args(): - """Parse the command line arguments.""" - parser = argparse.ArgumentParser() + """Parse the command line arguments.""" + parser = argparse.ArgumentParser() + + parser.add_argument( + "--sleep_secs", default=0, type=int, help=("Amount of time to sleep at the end") + ) - parser.add_argument( - "--sleep_secs", - default=0, - type=int, - help=("Amount of time to sleep at the end")) + # TODO(jlewi): We ignore unknown arguments because the backend is currently + # setting some flags to empty values like metadata path. + args, _ = parser.parse_known_args() + return args - # TODO(jlewi): We ignore unknown arguments because the backend is currently - # setting some flags to empty values like metadata path. - args, _ = parser.parse_known_args() - return args # Add retries to deal with things like gRPC errors that result in # UnavailableError. -@retrying.retry(wait_exponential_multiplier=1000, wait_exponential_max=10000, - stop_max_delay=60*3*1000) +@retrying.retry( + wait_exponential_multiplier=1000, + wait_exponential_max=10000, + stop_max_delay=60 * 3 * 1000, +) def run(server, cluster_spec): # pylint: disable=too-many-statements, too-many-locals - """Build the graph and run the example. - - Args: - server: The TensorFlow server to use. - - Raises: - RuntimeError: If the expected log entries aren't found. - """ - - # construct the graph and create a saver object - with tf.Graph().as_default(): # pylint: disable=not-context-manager - # The initial value should be such that type is correctly inferred as - # float. - width = 10 - height = 10 - results = [] - - # The master assigns ops to every TFProcess in the cluster. - for job_name in cluster_spec.keys(): - for i in range(len(cluster_spec[job_name])): - d = "/job:{0}/task:{1}".format(job_name, i) - with tf.device(d): - a = tf.constant(range(width * height), shape=[height, width]) - b = tf.constant(range(width * height), shape=[height, width]) - c = tf.multiply(a, b) - results.append(c) - - init_op = tf.global_variables_initializer() - - if server: - target = server.target - else: - # Create a direct session. - target = "" - - logging.info("Server target: %s", target) - with tf.Session( - target, config=tf.ConfigProto(log_device_placement=True)) as sess: - sess.run(init_op) - for r in results: - result = sess.run(r) - logging.info("Result: %s", result) + """Build the graph and run the example. + + Args: + server: The TensorFlow server to use. 
+ + Raises: + RuntimeError: If the expected log entries aren't found. + """ + + # construct the graph and create a saver object + with tf.Graph().as_default(): # pylint: disable=not-context-manager + # The initial value should be such that type is correctly inferred as + # float. + width = 10 + height = 10 + results = [] + + # The master assigns ops to every TFProcess in the cluster. + for job_name in cluster_spec.keys(): + for i in range(len(cluster_spec[job_name])): + d = "/job:{0}/task:{1}".format(job_name, i) + with tf.device(d): + a = tf.constant(range(width * height), shape=[height, width]) + b = tf.constant(range(width * height), shape=[height, width]) + c = tf.multiply(a, b) + results.append(c) + + init_op = tf.global_variables_initializer() + + if server: + target = server.target + else: + # Create a direct session. + target = "" + + logging.info("Server target: %s", target) + with tf.Session( + target, config=tf.ConfigProto(log_device_placement=True) + ) as sess: + sess.run(init_op) + for r in results: + result = sess.run(r) + logging.info("Result: %s", result) def main(): - """Run training. - - Raises: - ValueError: If the arguments are invalid. - """ - logging.info("Tensorflow version: %s", tf.__version__) - logging.info("Tensorflow git version: %s", tf.__git_version__) - - tf_config_json = os.environ.get("TF_CONFIG", "{}") - tf_config = json.loads(tf_config_json) - logging.info("tf_config: %s", tf_config) - - task = tf_config.get("task", {}) - logging.info("task: %s", task) - - cluster_spec = tf_config.get("cluster", {}) - logging.info("cluster_spec: %s", cluster_spec) - - server = None - device_func = None - if cluster_spec: - cluster_spec_object = tf.train.ClusterSpec(cluster_spec) - server_def = tf.train.ServerDef( - cluster=cluster_spec_object.as_cluster_def(), - protocol="grpc", - job_name=task["type"], - task_index=task["index"]) - - logging.info("server_def: %s", server_def) - - logging.info("Building server.") - # Create and start a server for the local task. - server = tf.train.Server(server_def) - logging.info("Finished building server.") - - # Assigns ops to the local worker by default. - device_func = tf.train.replica_device_setter( - worker_device="/job:worker/task:%d" % server_def.task_index, - cluster=server_def.cluster) - else: - # This should return a null op device setter since we are using - # all the defaults. - logging.error("Using default device function.") - device_func = tf.train.replica_device_setter() - - job_type = task.get("type", "").lower() - if job_type == "ps": - logging.info("Running PS code.") - server.join() - elif job_type == "worker": - logging.info("Running Worker code.") - # The worker just blocks because we let the master assign all ops. - server.join() - elif job_type in ["master", "chief"] or not job_type: - logging.info("Running master/chief.") - with tf.device(device_func): - run(server=server, cluster_spec=cluster_spec) - else: - raise ValueError("invalid job_type %s" % (job_type,)) + """Run training. + + Raises: + ValueError: If the arguments are invalid. 
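The device placement in run() follows directly from the TF_CONFIG cluster spec: one `/job:<name>/task:<index>` device per entry, which is how the smoke test verifies every member of the cluster executes an op. An illustration with a made-up two-worker, one-PS cluster (host names are examples only):

```python
# How a TF_CONFIG cluster spec maps to the device strings run() targets;
# the host names here are made up for illustration.
import json

tf_config = json.loads(
    '{"cluster": {"worker": ["w0:2222", "w1:2222"], "ps": ["ps0:2222"]},'
    ' "task": {"type": "worker", "index": 1}}'
)
for job_name, hosts in tf_config["cluster"].items():
    for i, host in enumerate(hosts):
        print("/job:{0}/task:{1} -> {2}".format(job_name, i, host))
```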
+ """ + logging.info("Tensorflow version: %s", tf.__version__) + logging.info("Tensorflow git version: %s", tf.__git_version__) + + tf_config_json = os.environ.get("TF_CONFIG", "{}") + tf_config = json.loads(tf_config_json) + logging.info("tf_config: %s", tf_config) + + task = tf_config.get("task", {}) + logging.info("task: %s", task) + + cluster_spec = tf_config.get("cluster", {}) + logging.info("cluster_spec: %s", cluster_spec) + + server = None + device_func = None + if cluster_spec: + cluster_spec_object = tf.train.ClusterSpec(cluster_spec) + server_def = tf.train.ServerDef( + cluster=cluster_spec_object.as_cluster_def(), + protocol="grpc", + job_name=task["type"], + task_index=task["index"], + ) + + logging.info("server_def: %s", server_def) + + logging.info("Building server.") + # Create and start a server for the local task. + server = tf.train.Server(server_def) + logging.info("Finished building server.") + + # Assigns ops to the local worker by default. + device_func = tf.train.replica_device_setter( + worker_device="/job:worker/task:%d" % server_def.task_index, + cluster=server_def.cluster, + ) + else: + # This should return a null op device setter since we are using + # all the defaults. + logging.error("Using default device function.") + device_func = tf.train.replica_device_setter() + + job_type = task.get("type", "").lower() + if job_type == "ps": + logging.info("Running PS code.") + server.join() + elif job_type == "worker": + logging.info("Running Worker code.") + # The worker just blocks because we let the master assign all ops. + server.join() + elif job_type in ["master", "chief"] or not job_type: + logging.info("Running master/chief.") + with tf.device(device_func): + run(server=server, cluster_spec=cluster_spec) + else: + raise ValueError("invalid job_type %s" % (job_type,)) if __name__ == "__main__": - logging.getLogger().setLevel(logging.INFO) - main() + logging.getLogger().setLevel(logging.INFO) + main() diff --git a/examples/xgboost/lightgbm-dist/README.md b/examples/xgboost/lightgbm-dist/README.md index 79d9334f16..616425f3cf 100644 --- a/examples/xgboost/lightgbm-dist/README.md +++ b/examples/xgboost/lightgbm-dist/README.md @@ -200,4 +200,4 @@ status: succeeded: 1 Worker: succeeded: 2 -``` \ No newline at end of file +``` diff --git a/examples/xgboost/lightgbm-dist/main.py b/examples/xgboost/lightgbm-dist/main.py index 25a96055f5..e9ba951957 100644 --- a/examples/xgboost/lightgbm-dist/main.py +++ b/examples/xgboost/lightgbm-dist/main.py @@ -15,7 +15,8 @@ import os from train import train -from utils import generate_machine_list_file, generate_train_conf_file +from utils import generate_machine_list_file +from utils import generate_train_conf_file logger = logging.getLogger(__name__) diff --git a/examples/xgboost/lightgbm-dist/xgboostjob_v1_lightgbm_dist_training.yaml b/examples/xgboost/lightgbm-dist/xgboostjob_v1_lightgbm_dist_training.yaml index c34a7cbf92..35487b1511 100644 --- a/examples/xgboost/lightgbm-dist/xgboostjob_v1_lightgbm_dist_training.yaml +++ b/examples/xgboost/lightgbm-dist/xgboostjob_v1_lightgbm_dist_training.yaml @@ -72,4 +72,3 @@ spec: - --is_enable_sparse=true - --use_two_round_loading=false - --is_save_binary_file=false - diff --git a/examples/xgboost/smoke-dist/README.md b/examples/xgboost/smoke-dist/README.md index e04afb5a5c..c0ce8e6cc2 100644 --- a/examples/xgboost/smoke-dist/README.md +++ b/examples/xgboost/smoke-dist/README.md @@ -85,6 +85,3 @@ status: Worker: succeeded: 2 ``` - - - diff --git a/examples/xgboost/smoke-dist/tracker.py 
b/examples/xgboost/smoke-dist/tracker.py index 1074f594ae..c2e17cb0f0 100644 --- a/examples/xgboost/smoke-dist/tracker.py +++ b/examples/xgboost/smoke-dist/tracker.py @@ -12,6 +12,7 @@ https://github.com/dmlc/dmlc-core/blob/master/LICENSE No code modified or added except for this explanatory comment. """ + # pylint: disable=invalid-name, missing-docstring, too-many-arguments # pylint: disable=too-many-locals # pylint: disable=too-many-branches, too-many-statements @@ -24,14 +25,15 @@ import struct import subprocess import sys -import time from threading import Thread +import time class ExSocket(object): """ Extension of socket to handle recv and send of special data """ + def __init__(self, sock): self.sock = sock @@ -42,13 +44,13 @@ def recvall(self, nbytes): chunk = self.sock.recv(min(nbytes - nread, 1024)) nread += len(chunk) res.append(chunk) - return b''.join(res) + return b"".join(res) def recvint(self): - return struct.unpack('@i', self.recvall(4))[0] + return struct.unpack("@i", self.recvall(4))[0] def sendint(self, n): - self.sock.sendall(struct.pack('@i', n)) + self.sock.sendall(struct.pack("@i", n)) def sendstr(self, s): self.sendint(len(s)) @@ -60,7 +62,7 @@ def recvstr(self): # magic number used to verify existence of data -kMagic = 0xff99 +kMagic = 0xFF99 def get_some_ip(host): @@ -77,8 +79,7 @@ def __init__(self, sock, s_addr): self.sock = slave self.host = get_some_ip(s_addr[0]) magic = slave.recvint() - assert magic == kMagic, 'invalid magic number=%d from %s' % ( - magic, self.host) + assert magic == kMagic, "invalid magic number=%d from %s" % (magic, self.host) slave.sendint(kMagic) self.rank = slave.recvint() self.world_size = slave.recvint() @@ -90,7 +91,7 @@ def __init__(self, sock, s_addr): def decide_rank(self, job_map): if self.rank >= 0: return self.rank - if self.jobid != 'NULL' and self.jobid in job_map: + if self.jobid != "NULL" and self.jobid in job_map: return job_map[self.jobid] return -1 @@ -156,6 +157,7 @@ class RabitTracker(object): """ tracker for rabit """ + def __init__(self, hostIP, nslave, port=9091, port_end=9999): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) for port in range(port, port_end): @@ -175,7 +177,7 @@ def __init__(self, hostIP, nslave, port=9091, port_end=9999): self.start_time = None self.end_time = None self.nslave = nslave - logging.info('start listen on %s:%d', hostIP, self.port) + logging.info("start listen on %s:%d", hostIP, self.port) def __del__(self): self.sock.close() @@ -197,8 +199,7 @@ def slave_envs(self): get enviroment variables for slaves can be passed in as args or envs """ - return {'DMLC_TRACKER_URI': self.hostIP, - 'DMLC_TRACKER_PORT': self.port} + return {"DMLC_TRACKER_URI": self.hostIP, "DMLC_TRACKER_PORT": self.port} def get_tree(self, nslave): tree_map = {} @@ -284,20 +285,20 @@ def accept_slaves(self, nslave): while len(shutdown) != nslave: fd, s_addr = self.sock.accept() s = SlaveEntry(fd, s_addr) - if s.cmd == 'print': + if s.cmd == "print": msg = s.sock.recvstr() logging.info(msg.strip()) continue - if s.cmd == 'shutdown': + if s.cmd == "shutdown": assert s.rank >= 0 and s.rank not in shutdown assert s.rank not in wait_conn shutdown[s.rank] = s - logging.debug('Recieve %s signal from %d', s.cmd, s.rank) + logging.debug("Recieve %s signal from %d", s.cmd, s.rank) continue - assert s.cmd == 'start' or s.cmd == 'recover' + assert s.cmd == "start" or s.cmd == "recover" # lazily initialize the slaves if tree_map is None: - assert s.cmd == 'start' + assert s.cmd == "start" if s.world_size > 0: nslave = 
s.world_size tree_map, parent_map, ring_map = self.get_link_map(nslave) @@ -305,7 +306,7 @@ def accept_slaves(self, nslave): todo_nodes = list(range(nslave)) else: assert s.world_size == -1 or s.world_size == nslave - if s.cmd == 'recover': + if s.cmd == "recover": assert s.rank >= 0 rank = s.decide_rank(job_map) @@ -317,34 +318,39 @@ def accept_slaves(self, nslave): pending.sort(key=lambda x: x.host) for s in pending: rank = todo_nodes.pop(0) - if s.jobid != 'NULL': + if s.jobid != "NULL": job_map[s.jobid] = rank - s.assign_rank(rank, wait_conn, tree_map, parent_map, - ring_map) + s.assign_rank(rank, wait_conn, tree_map, parent_map, ring_map) if s.wait_accept > 0: wait_conn[rank] = s - logging.debug('Recieve %s signal from %s; ' - 'assign rank %d', s.cmd, s.host, s.rank) + logging.debug( + "Recieve %s signal from %s; " "assign rank %d", + s.cmd, + s.host, + s.rank, + ) if len(todo_nodes) == 0: - logging.info('@tracker All of %d nodes getting started', - nslave) + logging.info("@tracker All of %d nodes getting started", nslave) self.start_time = time.time() else: s.assign_rank(rank, wait_conn, tree_map, parent_map, ring_map) - logging.debug('Recieve %s signal from %d', s.cmd, s.rank) + logging.debug("Recieve %s signal from %d", s.cmd, s.rank) if s.wait_accept > 0: wait_conn[rank] = s logging.info("worker(ip_address=%s) connected!" % get_some_ip(s_addr[0])) - logging.info('@tracker All nodes finishes job') + logging.info("@tracker All nodes finishes job") self.end_time = time.time() - logging.info('@tracker %s secs between node start and job finish', - str(self.end_time - self.start_time)) + logging.info( + "@tracker %s secs between node start and job finish", + str(self.end_time - self.start_time), + ) def start(self, nslave): def run(): self.accept_slaves(nslave) + self.thread = Thread(target=run, args=()) self.thread.setDaemon(True) self.thread.start() @@ -358,6 +364,7 @@ class PSTracker(object): """ Tracker module for PS """ + def __init__(self, hostIP, cmd, port=9091, port_end=9999, envs=None): """ Starts the PS scheduler @@ -370,7 +377,7 @@ def __init__(self, hostIP, cmd, port=9091, port_end=9999, envs=None): sock = socket.socket(get_family(hostIP), socket.SOCK_STREAM) for port in range(port, port_end): try: - sock.bind(('', port)) + sock.bind(("", port)) self.port = port sock.close() break @@ -378,14 +385,15 @@ def __init__(self, hostIP, cmd, port=9091, port_end=9999, envs=None): continue env = os.environ.copy() - env['DMLC_ROLE'] = 'scheduler' - env['DMLC_PS_ROOT_URI'] = str(self.hostIP) - env['DMLC_PS_ROOT_PORT'] = str(self.port) + env["DMLC_ROLE"] = "scheduler" + env["DMLC_PS_ROOT_URI"] = str(self.hostIP) + env["DMLC_PS_ROOT_PORT"] = str(self.port) for k, v in envs.items(): env[k] = str(v) self.thread = Thread( - target=(lambda: subprocess.check_call(self.cmd, env=env, - shell=True)), args=()) + target=(lambda: subprocess.check_call(self.cmd, env=env, shell=True)), + args=(), + ) self.thread.setDaemon(True) self.thread.start() @@ -398,38 +406,38 @@ def slave_envs(self): if self.cmd is None: return {} else: - return {'DMLC_PS_ROOT_URI': self.hostIP, - 'DMLC_PS_ROOT_PORT': self.port} + return {"DMLC_PS_ROOT_URI": self.hostIP, "DMLC_PS_ROOT_PORT": self.port} def get_host_ip(hostIP=None): - if hostIP is None or hostIP == 'auto': - hostIP = 'ip' + if hostIP is None or hostIP == "auto": + hostIP = "ip" - if hostIP == 'dns': + if hostIP == "dns": hostIP = socket.getfqdn() - elif hostIP == 'ip': + elif hostIP == "ip": from socket import gaierror + try: hostIP = 
socket.gethostbyname(socket.getfqdn()) except gaierror: - logging.warn('gethostbyname(socket.getfqdn()) failed... trying on ' - 'hostname()') + logging.warn( + "gethostbyname(socket.getfqdn()) failed... trying on " "hostname()" + ) hostIP = socket.gethostbyname(socket.gethostname()) if hostIP.startswith("127."): s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # doesn't have to be reachable - s.connect(('10.255.255.255', 1)) + s.connect(("10.255.255.255", 1)) hostIP = s.getsockname()[0] return hostIP -def submit(nworker, nserver, fun_submit, hostIP='auto', pscmd=None): +def submit(nworker, nserver, fun_submit, hostIP="auto", pscmd=None): if nserver == 0: pscmd = None - envs = {'DMLC_NUM_WORKER': nworker, - 'DMLC_NUM_SERVER': nserver} + envs = {"DMLC_NUM_WORKER": nworker, "DMLC_NUM_SERVER": nserver} hostIP = get_host_ip(hostIP) if nserver == 0: @@ -453,42 +461,56 @@ def start_rabit_tracker(args): ---------- args: arguments to start the rabit tracker. """ - envs = {'DMLC_NUM_WORKER': args.num_workers, - 'DMLC_NUM_SERVER': args.num_servers} - rabit = RabitTracker(hostIP=get_host_ip(args.host_ip), - nslave=args.num_workers) + envs = {"DMLC_NUM_WORKER": args.num_workers, "DMLC_NUM_SERVER": args.num_servers} + rabit = RabitTracker(hostIP=get_host_ip(args.host_ip), nslave=args.num_workers) envs.update(rabit.slave_envs()) rabit.start(args.num_workers) - sys.stdout.write('DMLC_TRACKER_ENV_START\n') + sys.stdout.write("DMLC_TRACKER_ENV_START\n") # simply write configuration to stdout for k, v in envs.items(): - sys.stdout.write('%s=%s\n' % (k, str(v))) - sys.stdout.write('DMLC_TRACKER_ENV_END\n') + sys.stdout.write("%s=%s\n" % (k, str(v))) + sys.stdout.write("DMLC_TRACKER_ENV_END\n") sys.stdout.flush() rabit.join() def main(): """Main function if tracker is executed in standalone mode.""" - parser = argparse.ArgumentParser(description='Rabit Tracker start.') - parser.add_argument('--num-workers', required=True, type=int, - help='Number of worker proccess to be launched.') - parser.add_argument('--num-servers', default=0, type=int, - help='Number of server process to be launched. Only ' - 'used in PS jobs.') - parser.add_argument('--host-ip', default=None, type=str, - help=('Host IP addressed, this is only needed ' + - 'if the host IP cannot be automatically guessed.' - )) - parser.add_argument('--log-level', default='INFO', type=str, - choices=['INFO', 'DEBUG'], - help='Logging level of the logger.') + parser = argparse.ArgumentParser(description="Rabit Tracker start.") + parser.add_argument( + "--num-workers", + required=True, + type=int, + help="Number of worker proccess to be launched.", + ) + parser.add_argument( + "--num-servers", + default=0, + type=int, + help="Number of server process to be launched. Only " "used in PS jobs.", + ) + parser.add_argument( + "--host-ip", + default=None, + type=str, + help=( + "Host IP addressed, this is only needed " + + "if the host IP cannot be automatically guessed." 
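The tracker's ExSocket helpers, earlier in this file, frame every integer as a native-endian int32 (`"@i"`) and every string as a length-prefixed payload, with the 0xFF99 magic value exchanged on connect. A socket-free sketch of that wire format; `pack_str` and `unpack_str` are illustrative helpers, not tracker.py functions:

```python
# Socket-free sketch of ExSocket's wire format: native-endian int32
# headers ("@i") and length-prefixed strings.
import struct

kMagic = 0xFF99  # handshake value verified on connect


def pack_str(s):
    data = s.encode()
    return struct.pack("@i", len(data)) + data


def unpack_str(buf):
    (n,) = struct.unpack("@i", buf[:4])
    return buf[4 : 4 + n].decode()


assert struct.unpack("@i", struct.pack("@i", kMagic))[0] == kMagic
assert unpack_str(pack_str("start")) == "start"
```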
+ ), + ) + parser.add_argument( + "--log-level", + default="INFO", + type=str, + choices=["INFO", "DEBUG"], + help="Logging level of the logger.", + ) args = parser.parse_args() - fmt = '%(asctime)s %(levelname)s %(message)s' - if args.log_level == 'INFO': + fmt = "%(asctime)s %(levelname)s %(message)s" + if args.log_level == "INFO": level = logging.INFO - elif args.log_level == 'DEBUG': + elif args.log_level == "DEBUG": level = logging.DEBUG else: raise RuntimeError("Unknown logging level %s" % args.log_level) @@ -498,9 +520,8 @@ def main(): if args.num_servers == 0: start_rabit_tracker(args) else: - raise RuntimeError("Do not yet support start ps tracker in standalone " - "mode.") + raise RuntimeError("Do not yet support start ps tracker in standalone " "mode.") if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/examples/xgboost/smoke-dist/xgboost_smoke_test.py b/examples/xgboost/smoke-dist/xgboost_smoke_test.py index 8ca4476b13..b56640f144 100644 --- a/examples/xgboost/smoke-dist/xgboost_smoke_test.py +++ b/examples/xgboost/smoke-dist/xgboost_smoke_test.py @@ -1,4 +1,3 @@ - # Copyright 2018 Google Inc. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -18,11 +17,11 @@ import traceback from tracker import RabitTracker - import xgboost as xgb logger = logging.getLogger(__name__) + def extract_xgbooost_cluster_env(): logger.info("start to extract system env") @@ -32,11 +31,17 @@ def extract_xgbooost_cluster_env(): rank = int(os.environ.get("RANK", "{}")) world_size = int(os.environ.get("WORLD_SIZE", "{}")) - logger.info("extract the rabit env from cluster : %s, port: %d, rank: %d, word_size: %d ", - master_addr, master_port, rank, world_size) + logger.info( + "extract the rabit env from cluster : %s, port: %d, rank: %d, word_size: %d ", + master_addr, + master_port, + rank, + world_size, + ) return master_addr, master_port, rank, world_size + def setup_rabit_cluster(): addr, port, rank, world_size = extract_xgbooost_cluster_env() @@ -47,35 +52,36 @@ def setup_rabit_cluster(): if rank == 0: logger.info("start the master node") - rabit = RabitTracker(hostIP="0.0.0.0", nslave=world_size, - port=port, port_end=port + 1) + rabit = RabitTracker( + hostIP="0.0.0.0", nslave=world_size, port=port, port_end=port + 1 + ) rabit.start(world_size) rabit_tracker = rabit - logger.info('########### RabitTracker Setup Finished #########') + logger.info("########### RabitTracker Setup Finished #########") envs = [ - 'DMLC_NUM_WORKER=%d' % world_size, - 'DMLC_TRACKER_URI=%s' % addr, - 'DMLC_TRACKER_PORT=%d' % port, - 'DMLC_TASK_ID=%d' % rank + "DMLC_NUM_WORKER=%d" % world_size, + "DMLC_TRACKER_URI=%s" % addr, + "DMLC_TRACKER_PORT=%d" % port, + "DMLC_TASK_ID=%d" % rank, ] - logger.info('##### Rabit rank setup with below envs #####') + logger.info("##### Rabit rank setup with below envs #####") for i, env in enumerate(envs): logger.info(env) envs[i] = str.encode(env) xgb.rabit.init(envs) - logger.info('##### Rabit rank = %d' % xgb.rabit.get_rank()) + logger.info("##### Rabit rank = %d" % xgb.rabit.get_rank()) rank = xgb.rabit.get_rank() s = None if rank == 0: - s = {'hello world': 100, 2: 3} + s = {"hello world": 100, 2: 3} - logger.info('@node[%d] before-broadcast: s=\"%s\"' % (rank, str(s))) + logger.info('@node[%d] before-broadcast: s="%s"' % (rank, str(s))) s = xgb.rabit.broadcast(s, 0) - logger.info('@node[%d] after-broadcast: s=\"%s\"' % (rank, str(s))) + logger.info('@node[%d] after-broadcast: s="%s"' % (rank, str(s))) except 
Exception as e: logger.error("something wrong happen: %s", traceback.format_exc()) @@ -88,6 +94,7 @@ def setup_rabit_cluster(): logger.info("the rabit network testing finished!") + def main(): port = os.environ.get("MASTER_PORT", "{}") @@ -104,6 +111,7 @@ def main(): setup_rabit_cluster() + if __name__ == "__main__": logging.getLogger().setLevel(logging.INFO) main() diff --git a/examples/xgboost/smoke-dist/xgboostjob_v1_rabit_test.yaml b/examples/xgboost/smoke-dist/xgboostjob_v1_rabit_test.yaml index 15f55a01b1..80ce3adce1 100644 --- a/examples/xgboost/smoke-dist/xgboostjob_v1_rabit_test.yaml +++ b/examples/xgboost/smoke-dist/xgboostjob_v1_rabit_test.yaml @@ -28,4 +28,3 @@ spec: - containerPort: 9991 name: xgboostjob-port imagePullPolicy: Always - diff --git a/examples/xgboost/smoke-dist/xgboostjob_v1alpha1_rabit_test.yaml b/examples/xgboost/smoke-dist/xgboostjob_v1alpha1_rabit_test.yaml index 24fe7526fc..5364eb07cd 100644 --- a/examples/xgboost/smoke-dist/xgboostjob_v1alpha1_rabit_test.yaml +++ b/examples/xgboost/smoke-dist/xgboostjob_v1alpha1_rabit_test.yaml @@ -32,4 +32,3 @@ spec: - containerPort: 9991 name: xgboostjob-port imagePullPolicy: Always - diff --git a/examples/xgboost/xgboost-dist/README.md b/examples/xgboost/xgboost-dist/README.md index 506f6230ec..4234e3ebbc 100644 --- a/examples/xgboost/xgboost-dist/README.md +++ b/examples/xgboost/xgboost-dist/README.md @@ -9,10 +9,10 @@ User can extend provided data reader to read data from distributed data storage **Configure the job runtime via Yaml file** The following files are available to setup distributed XGBoost computation runtime - + To store the model in OSS: -* xgboostjob_v1_iris_train.yaml +* xgboostjob_v1_iris_train.yaml * xgboostjob_v1_iris_predict.yaml To store the model in local path: @@ -160,7 +160,7 @@ Kind: XGBoostJob Metadata: Creation Timestamp: 2019-06-27T06:06:53Z Generation: 8 - Resource Version: 394523 + Resource Version: 394523 UID: c2a04cbc-98a1-11e9-bbab-080027dfbfe2 Spec: Run Policy: @@ -249,8 +249,8 @@ Events: **Start the distributed XGBoost train to store the model locally** Before proceeding with training we will create a PVC to store the model trained. -Creating pvc : -create a yaml file with the below content +Creating pvc : +create a yaml file with the below content pvc.yaml ``` apiVersion: v1 @@ -268,13 +268,13 @@ spec: ``` kubectl create -f pvc.yaml ``` -Note: +Note: * Please use the storage class which supports ReadWriteMany. The example yaml above uses glusterfs * Mention model_storage_type=local and model_path accordingly( In the example /tmp/xgboost_model/2 is used ) in xgboostjob_v1_iris_train_local.yaml and xgboostjob_v1_iris_predict_local.yaml" -Now start the distributed XGBoost train. +Now start the distributed XGBoost train. 
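Under the hood, each replica derives its Rabit role from environment variables the operator injects, as the smoke test's extract_xgbooost_cluster_env above shows. A condensed sketch of that extraction and of the DMLC pairs handed to xgb.rabit.init; the values set here are examples only:

```python
# Condensed sketch of extract_xgbooost_cluster_env above; the operator
# injects these variables into every replica. Example values only.
import os

os.environ.setdefault("MASTER_ADDR", "xgboost-master-0")
os.environ.setdefault("MASTER_PORT", "9991")
os.environ.setdefault("RANK", "0")
os.environ.setdefault("WORLD_SIZE", "2")

addr = os.environ["MASTER_ADDR"]
port = int(os.environ["MASTER_PORT"])
rank = int(os.environ["RANK"])
world_size = int(os.environ["WORLD_SIZE"])

# Workers hand these to xgb.rabit.init as byte-encoded KEY=VALUE pairs.
envs = [
    "DMLC_NUM_WORKER=%d" % world_size,
    "DMLC_TRACKER_URI=%s" % addr,
    "DMLC_TRACKER_PORT=%d" % port,
    "DMLC_TASK_ID=%d" % rank,
]
print([e.encode() for e in envs])
```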
``` kubectl create -f xgboostjob_v1_iris_train_local.yaml ``` @@ -382,7 +382,7 @@ status: Master: succeeded: 1 Worker: - succeeded: 2 + succeeded: 2 ``` **Start the distributed XGBoost job predict** ``` diff --git a/examples/xgboost/xgboost-dist/local_test.py b/examples/xgboost/xgboost-dist/local_test.py index 17a80f2c61..16483df181 100644 --- a/examples/xgboost/xgboost-dist/local_test.py +++ b/examples/xgboost/xgboost-dist/local_test.py @@ -18,8 +18,10 @@ import numpy as np from sklearn.metrics import precision_score -from utils import dump_model, read_model, read_predict_data, read_train_data - +from utils import dump_model +from utils import read_model +from utils import read_predict_data +from utils import read_train_data import xgboost as xgb logger = logging.getLogger(__name__) @@ -35,8 +37,13 @@ def test_train_model(): place = "/tmp/data" dmatrix = read_train_data(rank, world_size, place) - param_xgboost_default = {'max_depth': 2, 'eta': 1, 'silent': 1, - 'objective': 'multi:softprob', 'num_class': 3} + param_xgboost_default = { + "max_depth": 2, + "eta": 1, + "silent": 1, + "objective": "multi:softprob", + "num_class": 3, + } booster = xgb.train(param_xgboost_default, dtrain=dmatrix) @@ -57,7 +64,7 @@ def test_model_predict(booster): preds = booster.predict(dmatrix) best_preds = np.asarray([np.argmax(line) for line in preds]) - score = precision_score(y_test, best_preds, average='macro') + score = precision_score(y_test, best_preds, average="macro") assert score > 0.99 @@ -90,9 +97,9 @@ def run_test(): logging.info("Finish the local test") -if __name__ == '__main__': +if __name__ == "__main__": - logging.basicConfig(format='%(message)s') + logging.basicConfig(format="%(message)s") logging.getLogger().setLevel(logging.INFO) run_test() diff --git a/examples/xgboost/xgboost-dist/main.py b/examples/xgboost/xgboost-dist/main.py index fbdc012326..f11227e38d 100644 --- a/examples/xgboost/xgboost-dist/main.py +++ b/examples/xgboost/xgboost-dist/main.py @@ -21,10 +21,10 @@ def main(args): model_storage_type = args.model_storage_type - if (model_storage_type == "local" or model_storage_type == "oss"): - print ( "The storage type is " + model_storage_type) + if model_storage_type == "local" or model_storage_type == "oss": + print("The storage type is " + model_storage_type) else: - raise Exception("Only supports storage types like local and OSS") + raise Exception("Only supports storage types like local and OSS") if args.job_type == "Predict": logging.info("starting the predict job") @@ -45,50 +45,37 @@ def main(args): logging.info("Finish distributed XGBoost job") -if __name__ == '__main__': +if __name__ == "__main__": parser = argparse.ArgumentParser() + parser.add_argument("--job_type", help="Train, Predict, All", required=True) parser.add_argument( - '--job_type', - help="Train, Predict, All", - required=True - ) + "--xgboost_parameter", + help="XGBoost model parameter like: objective, number_class", + ) parser.add_argument( - '--xgboost_parameter', - help='XGBoost model parameter like: objective, number_class', - ) + "--n_estimators", help="Number of trees in the model", type=int, default=1000 + ) parser.add_argument( - '--n_estimators', - help='Number of trees in the model', - type=int, - default=1000 - ) + "--learning_rate", help="Learning rate for the model", default=0.1 + ) parser.add_argument( - '--learning_rate', - help='Learning rate for the model', - default=0.1 - ) + "--early_stopping_rounds", + help="XGBoost argument for stopping early", + default=50, + ) parser.add_argument( - 
'--early_stopping_rounds', - help='XGBoost argument for stopping early', - default=50 - ) + "--model_path", help="place to store model", default="/tmp/xgboost_model" + ) parser.add_argument( - '--model_path', - help='place to store model', - default="/tmp/xgboost_model" - ) + "--model_storage_type", help="place to store the model", default="oss" + ) parser.add_argument( - '--model_storage_type', - help='place to store the model', - default="oss" - ) - parser.add_argument( - '--oss_param', - help='oss parameter if you choose the model storage as OSS type', - ) + "--oss_param", + help="oss parameter if you choose the model storage as OSS type", + ) - logging.basicConfig(format='%(message)s') + logging.basicConfig(format="%(message)s") logging.getLogger().setLevel(logging.INFO) main_args = parser.parse_args() main(main_args) diff --git a/examples/xgboost/xgboost-dist/predict.py b/examples/xgboost/xgboost-dist/predict.py index 91dd29cc11..9de602315f 100644 --- a/examples/xgboost/xgboost-dist/predict.py +++ b/examples/xgboost/xgboost-dist/predict.py @@ -14,7 +14,9 @@ import numpy as np from sklearn.metrics import precision_score -from utils import extract_xgbooost_cluster_env, read_model, read_predict_data +from utils import extract_xgbooost_cluster_env +from utils import read_model +from utils import read_predict_data def predict(args): @@ -34,6 +36,6 @@ def predict(args): preds = booster.predict(dmatrix) best_preds = np.asarray([np.argmax(line) for line in preds]) - score = precision_score(y_test, best_preds, average='macro') + score = precision_score(y_test, best_preds, average="macro") logging.info("Predict accuracy: %f", score) diff --git a/examples/xgboost/xgboost-dist/tracker.py b/examples/xgboost/xgboost-dist/tracker.py index 1074f594ae..c2e17cb0f0 100644 --- a/examples/xgboost/xgboost-dist/tracker.py +++ b/examples/xgboost/xgboost-dist/tracker.py @@ -12,6 +12,7 @@ https://github.com/dmlc/dmlc-core/blob/master/LICENSE No code modified or added except for this explanatory comment. 
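A hedged sketch of the control flow in xgboost-dist/main.py above: validate the storage backend, then run the steps selected by --job_type. The hunk only shows the Predict branch explicitly, so the Train and All arms here are assumptions based on the flag's "Train, Predict, All" help text:

```python
# Hedged sketch of xgboost-dist/main.py's dispatch; Train/All arms are
# assumptions inferred from the --job_type help text, not shown in the hunk.
def plan(job_type, model_storage_type):
    if model_storage_type not in ("local", "oss"):
        raise Exception("Only supports storage types like local and OSS")
    steps = {"Train": ["train"], "Predict": ["predict"], "All": ["train", "predict"]}
    if job_type not in steps:
        raise ValueError("unknown job_type %s" % job_type)
    return steps[job_type]


print(plan("All", "local"))  # ['train', 'predict']
```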
""" + # pylint: disable=invalid-name, missing-docstring, too-many-arguments # pylint: disable=too-many-locals # pylint: disable=too-many-branches, too-many-statements @@ -24,14 +25,15 @@ import struct import subprocess import sys -import time from threading import Thread +import time class ExSocket(object): """ Extension of socket to handle recv and send of special data """ + def __init__(self, sock): self.sock = sock @@ -42,13 +44,13 @@ def recvall(self, nbytes): chunk = self.sock.recv(min(nbytes - nread, 1024)) nread += len(chunk) res.append(chunk) - return b''.join(res) + return b"".join(res) def recvint(self): - return struct.unpack('@i', self.recvall(4))[0] + return struct.unpack("@i", self.recvall(4))[0] def sendint(self, n): - self.sock.sendall(struct.pack('@i', n)) + self.sock.sendall(struct.pack("@i", n)) def sendstr(self, s): self.sendint(len(s)) @@ -60,7 +62,7 @@ def recvstr(self): # magic number used to verify existence of data -kMagic = 0xff99 +kMagic = 0xFF99 def get_some_ip(host): @@ -77,8 +79,7 @@ def __init__(self, sock, s_addr): self.sock = slave self.host = get_some_ip(s_addr[0]) magic = slave.recvint() - assert magic == kMagic, 'invalid magic number=%d from %s' % ( - magic, self.host) + assert magic == kMagic, "invalid magic number=%d from %s" % (magic, self.host) slave.sendint(kMagic) self.rank = slave.recvint() self.world_size = slave.recvint() @@ -90,7 +91,7 @@ def __init__(self, sock, s_addr): def decide_rank(self, job_map): if self.rank >= 0: return self.rank - if self.jobid != 'NULL' and self.jobid in job_map: + if self.jobid != "NULL" and self.jobid in job_map: return job_map[self.jobid] return -1 @@ -156,6 +157,7 @@ class RabitTracker(object): """ tracker for rabit """ + def __init__(self, hostIP, nslave, port=9091, port_end=9999): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) for port in range(port, port_end): @@ -175,7 +177,7 @@ def __init__(self, hostIP, nslave, port=9091, port_end=9999): self.start_time = None self.end_time = None self.nslave = nslave - logging.info('start listen on %s:%d', hostIP, self.port) + logging.info("start listen on %s:%d", hostIP, self.port) def __del__(self): self.sock.close() @@ -197,8 +199,7 @@ def slave_envs(self): get enviroment variables for slaves can be passed in as args or envs """ - return {'DMLC_TRACKER_URI': self.hostIP, - 'DMLC_TRACKER_PORT': self.port} + return {"DMLC_TRACKER_URI": self.hostIP, "DMLC_TRACKER_PORT": self.port} def get_tree(self, nslave): tree_map = {} @@ -284,20 +285,20 @@ def accept_slaves(self, nslave): while len(shutdown) != nslave: fd, s_addr = self.sock.accept() s = SlaveEntry(fd, s_addr) - if s.cmd == 'print': + if s.cmd == "print": msg = s.sock.recvstr() logging.info(msg.strip()) continue - if s.cmd == 'shutdown': + if s.cmd == "shutdown": assert s.rank >= 0 and s.rank not in shutdown assert s.rank not in wait_conn shutdown[s.rank] = s - logging.debug('Recieve %s signal from %d', s.cmd, s.rank) + logging.debug("Recieve %s signal from %d", s.cmd, s.rank) continue - assert s.cmd == 'start' or s.cmd == 'recover' + assert s.cmd == "start" or s.cmd == "recover" # lazily initialize the slaves if tree_map is None: - assert s.cmd == 'start' + assert s.cmd == "start" if s.world_size > 0: nslave = s.world_size tree_map, parent_map, ring_map = self.get_link_map(nslave) @@ -305,7 +306,7 @@ def accept_slaves(self, nslave): todo_nodes = list(range(nslave)) else: assert s.world_size == -1 or s.world_size == nslave - if s.cmd == 'recover': + if s.cmd == "recover": assert s.rank >= 0 rank = 
s.decide_rank(job_map) @@ -317,34 +318,39 @@ def accept_slaves(self, nslave): pending.sort(key=lambda x: x.host) for s in pending: rank = todo_nodes.pop(0) - if s.jobid != 'NULL': + if s.jobid != "NULL": job_map[s.jobid] = rank - s.assign_rank(rank, wait_conn, tree_map, parent_map, - ring_map) + s.assign_rank(rank, wait_conn, tree_map, parent_map, ring_map) if s.wait_accept > 0: wait_conn[rank] = s - logging.debug('Recieve %s signal from %s; ' - 'assign rank %d', s.cmd, s.host, s.rank) + logging.debug( + "Recieve %s signal from %s; " "assign rank %d", + s.cmd, + s.host, + s.rank, + ) if len(todo_nodes) == 0: - logging.info('@tracker All of %d nodes getting started', - nslave) + logging.info("@tracker All of %d nodes getting started", nslave) self.start_time = time.time() else: s.assign_rank(rank, wait_conn, tree_map, parent_map, ring_map) - logging.debug('Recieve %s signal from %d', s.cmd, s.rank) + logging.debug("Recieve %s signal from %d", s.cmd, s.rank) if s.wait_accept > 0: wait_conn[rank] = s logging.info("worker(ip_address=%s) connected!" % get_some_ip(s_addr[0])) - logging.info('@tracker All nodes finishes job') + logging.info("@tracker All nodes finishes job") self.end_time = time.time() - logging.info('@tracker %s secs between node start and job finish', - str(self.end_time - self.start_time)) + logging.info( + "@tracker %s secs between node start and job finish", + str(self.end_time - self.start_time), + ) def start(self, nslave): def run(): self.accept_slaves(nslave) + self.thread = Thread(target=run, args=()) self.thread.setDaemon(True) self.thread.start() @@ -358,6 +364,7 @@ class PSTracker(object): """ Tracker module for PS """ + def __init__(self, hostIP, cmd, port=9091, port_end=9999, envs=None): """ Starts the PS scheduler @@ -370,7 +377,7 @@ def __init__(self, hostIP, cmd, port=9091, port_end=9999, envs=None): sock = socket.socket(get_family(hostIP), socket.SOCK_STREAM) for port in range(port, port_end): try: - sock.bind(('', port)) + sock.bind(("", port)) self.port = port sock.close() break @@ -378,14 +385,15 @@ def __init__(self, hostIP, cmd, port=9091, port_end=9999, envs=None): continue env = os.environ.copy() - env['DMLC_ROLE'] = 'scheduler' - env['DMLC_PS_ROOT_URI'] = str(self.hostIP) - env['DMLC_PS_ROOT_PORT'] = str(self.port) + env["DMLC_ROLE"] = "scheduler" + env["DMLC_PS_ROOT_URI"] = str(self.hostIP) + env["DMLC_PS_ROOT_PORT"] = str(self.port) for k, v in envs.items(): env[k] = str(v) self.thread = Thread( - target=(lambda: subprocess.check_call(self.cmd, env=env, - shell=True)), args=()) + target=(lambda: subprocess.check_call(self.cmd, env=env, shell=True)), + args=(), + ) self.thread.setDaemon(True) self.thread.start() @@ -398,38 +406,38 @@ def slave_envs(self): if self.cmd is None: return {} else: - return {'DMLC_PS_ROOT_URI': self.hostIP, - 'DMLC_PS_ROOT_PORT': self.port} + return {"DMLC_PS_ROOT_URI": self.hostIP, "DMLC_PS_ROOT_PORT": self.port} def get_host_ip(hostIP=None): - if hostIP is None or hostIP == 'auto': - hostIP = 'ip' + if hostIP is None or hostIP == "auto": + hostIP = "ip" - if hostIP == 'dns': + if hostIP == "dns": hostIP = socket.getfqdn() - elif hostIP == 'ip': + elif hostIP == "ip": from socket import gaierror + try: hostIP = socket.gethostbyname(socket.getfqdn()) except gaierror: - logging.warn('gethostbyname(socket.getfqdn()) failed... trying on ' - 'hostname()') + logging.warn( + "gethostbyname(socket.getfqdn()) failed... 
trying on " "hostname()" + ) hostIP = socket.gethostbyname(socket.gethostname()) if hostIP.startswith("127."): s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # doesn't have to be reachable - s.connect(('10.255.255.255', 1)) + s.connect(("10.255.255.255", 1)) hostIP = s.getsockname()[0] return hostIP -def submit(nworker, nserver, fun_submit, hostIP='auto', pscmd=None): +def submit(nworker, nserver, fun_submit, hostIP="auto", pscmd=None): if nserver == 0: pscmd = None - envs = {'DMLC_NUM_WORKER': nworker, - 'DMLC_NUM_SERVER': nserver} + envs = {"DMLC_NUM_WORKER": nworker, "DMLC_NUM_SERVER": nserver} hostIP = get_host_ip(hostIP) if nserver == 0: @@ -453,42 +461,56 @@ def start_rabit_tracker(args): ---------- args: arguments to start the rabit tracker. """ - envs = {'DMLC_NUM_WORKER': args.num_workers, - 'DMLC_NUM_SERVER': args.num_servers} - rabit = RabitTracker(hostIP=get_host_ip(args.host_ip), - nslave=args.num_workers) + envs = {"DMLC_NUM_WORKER": args.num_workers, "DMLC_NUM_SERVER": args.num_servers} + rabit = RabitTracker(hostIP=get_host_ip(args.host_ip), nslave=args.num_workers) envs.update(rabit.slave_envs()) rabit.start(args.num_workers) - sys.stdout.write('DMLC_TRACKER_ENV_START\n') + sys.stdout.write("DMLC_TRACKER_ENV_START\n") # simply write configuration to stdout for k, v in envs.items(): - sys.stdout.write('%s=%s\n' % (k, str(v))) - sys.stdout.write('DMLC_TRACKER_ENV_END\n') + sys.stdout.write("%s=%s\n" % (k, str(v))) + sys.stdout.write("DMLC_TRACKER_ENV_END\n") sys.stdout.flush() rabit.join() def main(): """Main function if tracker is executed in standalone mode.""" - parser = argparse.ArgumentParser(description='Rabit Tracker start.') - parser.add_argument('--num-workers', required=True, type=int, - help='Number of worker proccess to be launched.') - parser.add_argument('--num-servers', default=0, type=int, - help='Number of server process to be launched. Only ' - 'used in PS jobs.') - parser.add_argument('--host-ip', default=None, type=str, - help=('Host IP addressed, this is only needed ' + - 'if the host IP cannot be automatically guessed.' - )) - parser.add_argument('--log-level', default='INFO', type=str, - choices=['INFO', 'DEBUG'], - help='Logging level of the logger.') + parser = argparse.ArgumentParser(description="Rabit Tracker start.") + parser.add_argument( + "--num-workers", + required=True, + type=int, + help="Number of worker proccess to be launched.", + ) + parser.add_argument( + "--num-servers", + default=0, + type=int, + help="Number of server process to be launched. Only " "used in PS jobs.", + ) + parser.add_argument( + "--host-ip", + default=None, + type=str, + help=( + "Host IP addressed, this is only needed " + + "if the host IP cannot be automatically guessed." 
+ ), + ) + parser.add_argument( + "--log-level", + default="INFO", + type=str, + choices=["INFO", "DEBUG"], + help="Logging level of the logger.", + ) args = parser.parse_args() - fmt = '%(asctime)s %(levelname)s %(message)s' - if args.log_level == 'INFO': + fmt = "%(asctime)s %(levelname)s %(message)s" + if args.log_level == "INFO": level = logging.INFO - elif args.log_level == 'DEBUG': + elif args.log_level == "DEBUG": level = logging.DEBUG else: raise RuntimeError("Unknown logging level %s" % args.log_level) @@ -498,9 +520,8 @@ def main(): if args.num_servers == 0: start_rabit_tracker(args) else: - raise RuntimeError("Do not yet support start ps tracker in standalone " - "mode.") + raise RuntimeError("Do not yet support start ps tracker in standalone " "mode.") if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/examples/xgboost/xgboost-dist/train.py b/examples/xgboost/xgboost-dist/train.py index f1da6a2cf5..45837ff7ed 100644 --- a/examples/xgboost/xgboost-dist/train.py +++ b/examples/xgboost/xgboost-dist/train.py @@ -15,8 +15,8 @@ import traceback from tracker import RabitTracker -from utils import extract_xgbooost_cluster_env, read_train_data - +from utils import extract_xgbooost_cluster_env +from utils import read_train_data import xgboost as xgb logger = logging.getLogger(__name__) @@ -36,25 +36,26 @@ def train(args): if rank == 0: logger.info("start the master node") - rabit = RabitTracker(hostIP="0.0.0.0", nslave=world_size, - port=port, port_end=port + 1) + rabit = RabitTracker( + hostIP="0.0.0.0", nslave=world_size, port=port, port_end=port + 1 + ) rabit.start(world_size) rabit_tracker = rabit - logger.info('###### RabitTracker Setup Finished ######') + logger.info("###### RabitTracker Setup Finished ######") envs = [ - 'DMLC_NUM_WORKER=%d' % world_size, - 'DMLC_TRACKER_URI=%s' % addr, - 'DMLC_TRACKER_PORT=%d' % port, - 'DMLC_TASK_ID=%d' % rank + "DMLC_NUM_WORKER=%d" % world_size, + "DMLC_TRACKER_URI=%s" % addr, + "DMLC_TRACKER_PORT=%d" % port, + "DMLC_TASK_ID=%d" % rank, ] - logger.info('##### Rabit rank setup with below envs #####') + logger.info("##### Rabit rank setup with below envs #####") for i, env in enumerate(envs): logger.info(env) envs[i] = str.encode(env) xgb.rabit.init(envs) - logger.info('##### Rabit rank = %d' % xgb.rabit.get_rank()) + logger.info("##### Rabit rank = %d" % xgb.rabit.get_rank()) rank = xgb.rabit.get_rank() else: @@ -65,8 +66,13 @@ def train(args): kwargs = {} kwargs["dtrain"] = df kwargs["num_boost_round"] = int(args.n_estimators) - param_xgboost_default = {'max_depth': 2, 'eta': 1, 'silent': 1, - 'objective': 'multi:softprob', 'num_class': 3} + param_xgboost_default = { + "max_depth": 2, + "eta": 1, + "silent": 1, + "objective": "multi:softprob", + "num_class": 3, + } kwargs["params"] = param_xgboost_default logging.info("starting to train xgboost at node with rank %d", rank) diff --git a/examples/xgboost/xgboost-dist/utils.py b/examples/xgboost/xgboost-dist/utils.py index c9ac7ddfa2..b396d04a7e 100644 --- a/examples/xgboost/xgboost-dist/utils.py +++ b/examples/xgboost/xgboost-dist/utils.py @@ -19,7 +19,6 @@ import oss2 import pandas as pd from sklearn import datasets - import xgboost as xgb logger = logging.getLogger(__name__) @@ -38,9 +37,14 @@ def extract_xgbooost_cluster_env(): rank = int(os.environ.get("RANK", "{}")) world_size = int(os.environ.get("WORLD_SIZE", "{}")) - logger.info("extract the Rabit env from cluster :" - " %s, port: %d, rank: %d, word_size: %d ", - master_addr, master_port, rank, 
world_size) + logger.info( + "extract the Rabit env from cluster :" + " %s, port: %d, rank: %d, word_size: %d ", + master_addr, + master_port, + rank, + world_size, + ) return master_addr, master_port, rank, world_size @@ -67,8 +71,7 @@ def read_train_data(rank, num_workers, path): y = pd.DataFrame(y) dtrain = xgb.DMatrix(data=x, label=y) - logging.info("Read data from IRIS data source with range from %d to %d", - start, end) + logging.info("Read data from IRIS data source with range from %d to %d", start, end) return dtrain @@ -93,8 +96,7 @@ def read_predict_data(rank, num_workers, path): x = pd.DataFrame(x) y = pd.DataFrame(y) - logging.info("Read data from IRIS datasource with range from %d to %d", - start, end) + logging.info("Read data from IRIS datasource with range from %d to %d", start, end) predict = xgb.DMatrix(x, label=y) @@ -109,7 +111,7 @@ def get_range_data(num_row, rank, num_workers): :param num_workers: total number of workers :return: begin and end range of input matrix """ - num_per_partition = int(num_row/num_workers) + num_per_partition = int(num_row / num_workers) x_start = rank * num_per_partition x_end = (rank + 1) * num_per_partition @@ -142,7 +144,7 @@ def dump_model(model, type, model_path, args): if oss_param is None: raise Exception("Please config oss parameter to store model") - oss_param['path'] = args.model_path + oss_param["path"] = args.model_path dump_model_to_oss(oss_param, model) logging.info("Dump model into oss place %s", args.model_path) @@ -168,7 +170,7 @@ def read_model(type, model_path, args): raise Exception("Please config oss to read model") return False - oss_param['path'] = args.model_path + oss_param["path"] = args.model_path model = read_model_from_oss(oss_param) logging.info("read model from oss place %s", model_path) @@ -184,28 +186,27 @@ def dump_model_to_oss(oss_parameters, booster): :return: True if stored procedure is success """ """export model into oss""" - model_fname = os.path.join(tempfile.mkdtemp(), 'model') - text_model_fname = os.path.join(tempfile.mkdtemp(), 'model.text') - feature_importance = os.path.join(tempfile.mkdtemp(), - 'feature_importance.json') + model_fname = os.path.join(tempfile.mkdtemp(), "model") + text_model_fname = os.path.join(tempfile.mkdtemp(), "model.text") + feature_importance = os.path.join(tempfile.mkdtemp(), "feature_importance.json") - oss_path = oss_parameters['path'] - logger.info('---- export model ----') + oss_path = oss_parameters["path"] + logger.info("---- export model ----") booster.save_model(model_fname) booster.dump_model(text_model_fname) # format output model fscore_dict = booster.get_fscore() - with open(feature_importance, 'w') as file: + with open(feature_importance, "w") as file: file.write(json.dumps(fscore_dict)) - logger.info('---- chief dump model successfully!') + logger.info("---- chief dump model successfully!") if os.path.exists(model_fname): - logger.info('---- Upload Model start...') + logger.info("---- Upload Model start...") - while oss_path[-1] == '/': + while oss_path[-1] == "/": oss_path = oss_path[:-1] upload_oss(oss_parameters, model_fname, oss_path) - aux_path = oss_path + '_dir/' + aux_path = oss_path + "_dir/" upload_oss(oss_parameters, model_fname, aux_path) upload_oss(oss_parameters, text_model_fname, aux_path) upload_oss(oss_parameters, feature_importance, aux_path) @@ -224,20 +225,22 @@ def upload_oss(kw, local_file, oss_path): :param oss_path: remote place of OSS :return: True if the procedure is success """ - if oss_path[-1] == '/': - oss_path = '%s%s' % 
+    if oss_path[-1] == "/":
+        oss_path = "%s%s" % (oss_path, os.path.basename(local_file))
 
-    auth = oss2.Auth(kw['access_id'], kw['access_key'])
-    bucket = kw['access_bucket']
-    bkt = oss2.Bucket(auth=auth, endpoint=kw['endpoint'], bucket_name=bucket)
+    auth = oss2.Auth(kw["access_id"], kw["access_key"])
+    bucket = kw["access_bucket"]
+    bkt = oss2.Bucket(auth=auth, endpoint=kw["endpoint"], bucket_name=bucket)
 
     try:
         bkt.put_object_from_file(key=oss_path, filename=local_file)
-        logger.info("upload %s to %s successfully!" %
-                    (os.path.abspath(local_file), oss_path))
+        logger.info(
+            "upload %s to %s successfully!" % (os.path.abspath(local_file), oss_path)
+        )
     except Exception:
-        raise ValueError('upload %s to %s failed' %
-                         (os.path.abspath(local_file), oss_path))
+        raise ValueError(
+            "upload %s to %s failed" % (os.path.abspath(local_file), oss_path)
+        )
@@ -246,12 +249,12 @@ def read_model_from_oss(kw):
     :param kw: OSS parameter
     :return: XGBoost booster model
     """
-    auth = oss2.Auth(kw['access_id'], kw['access_key'])
-    bucket = kw['access_bucket']
-    bkt = oss2.Bucket(auth=auth, endpoint=kw['endpoint'], bucket_name=bucket)
+    auth = oss2.Auth(kw["access_id"], kw["access_key"])
+    bucket = kw["access_bucket"]
+    bkt = oss2.Bucket(auth=auth, endpoint=kw["endpoint"], bucket_name=bucket)
     oss_path = kw["path"]
 
-    temp_model_fname = os.path.join(tempfile.mkdtemp(), 'local_model')
+    temp_model_fname = os.path.join(tempfile.mkdtemp(), "local_model")
     try:
         bkt.get_object_to_file(key=oss_path, filename=temp_model_fname)
         logger.info("success to load model from oss %s", oss_path)
@@ -259,7 +262,7 @@
         logging.error("fail to load model: %s", e)
         raise Exception("fail to load model from oss %s", oss_path)
 
-    bst = xgb.Booster({'nthread': 2})  # init model
+    bst = xgb.Booster({"nthread": 2})  # init model
 
     bst.load_model(temp_model_fname)
@@ -283,10 +286,9 @@ def parse_parameters(input, splitter_between, splitter_in):
         conf = kv.split(splitter_in)
         key = conf[0].strip(" ")
         if key == "objective" or key == "endpoint":
-            value = conf[1].strip("'") + ":" + conf[2].strip("'")
+            value = conf[1].strip("'") + ":" + conf[2].strip("'")
         else:
             value = conf[1]
         confs[key] = value
 
     return confs
-
diff --git a/examples/xgboost/xgboost-dist/xgboostjob_v1_iris_predict.yaml b/examples/xgboost/xgboost-dist/xgboostjob_v1_iris_predict.yaml
index 218219c818..9f0c773b1f 100644
--- a/examples/xgboost/xgboost-dist/xgboostjob_v1_iris_predict.yaml
+++ b/examples/xgboost/xgboost-dist/xgboostjob_v1_iris_predict.yaml
@@ -38,5 +38,3 @@ spec:
                 - --model_path=autoAI/xgb-opt/2
                 - --model_storage_type=oss
                 - --oss_param=unknown
-
-
diff --git a/examples/xgboost/xgboost-dist/xgboostjob_v1_iris_train.yaml b/examples/xgboost/xgboost-dist/xgboostjob_v1_iris_train.yaml
index 3600a0a7f2..5824ec056d 100644
--- a/examples/xgboost/xgboost-dist/xgboostjob_v1_iris_train.yaml
+++ b/examples/xgboost/xgboost-dist/xgboostjob_v1_iris_train.yaml
@@ -40,5 +40,3 @@ spec:
                 - --xgboost_parameter="objective:multi:softprob,num_class:3"
                 - --n_estimators=10
                 - --learning_rate=0.1
-
-
diff --git a/hack/python-sdk/post_gen.py b/hack/python-sdk/post_gen.py
index 07b15a4468..c54767ba79 100755
--- a/hack/python-sdk/post_gen.py
+++ b/hack/python-sdk/post_gen.py
@@ -18,8 +18,8 @@
 This script is used for updating generated SDK files.
""" -import os import fileinput +import os import re __replacements = [ diff --git a/sdk/python/Dockerfile.conformance b/sdk/python/Dockerfile.conformance index 4cdc6f3bd6..2af8a23fbe 100644 --- a/sdk/python/Dockerfile.conformance +++ b/sdk/python/Dockerfile.conformance @@ -27,4 +27,4 @@ RUN chmod +x run.sh RUN pip install pytest RUN python -m pip install -e . -ENTRYPOINT [ "./run.sh" ] \ No newline at end of file +ENTRYPOINT [ "./run.sh" ] diff --git a/sdk/python/conformance/run.sh b/sdk/python/conformance/run.sh index 815720a19f..a73d998a9b 100644 --- a/sdk/python/conformance/run.sh +++ b/sdk/python/conformance/run.sh @@ -8,4 +8,4 @@ touch /tmp/training-operator-conformance.done echo "Done..." # Keep the container running so the test logs can be downloaded. -while true; do sleep 10000; done \ No newline at end of file +while true; do sleep 10000; done diff --git a/sdk/python/kubeflow/storage_initializer/abstract_dataset_provider.py b/sdk/python/kubeflow/storage_initializer/abstract_dataset_provider.py index 3f75faf0a2..bdae751205 100644 --- a/sdk/python/kubeflow/storage_initializer/abstract_dataset_provider.py +++ b/sdk/python/kubeflow/storage_initializer/abstract_dataset_provider.py @@ -1,4 +1,5 @@ -from abc import ABC, abstractmethod +from abc import ABC +from abc import abstractmethod class datasetProvider(ABC): diff --git a/sdk/python/kubeflow/storage_initializer/abstract_model_provider.py b/sdk/python/kubeflow/storage_initializer/abstract_model_provider.py index 392478a346..88c9c2af4d 100644 --- a/sdk/python/kubeflow/storage_initializer/abstract_model_provider.py +++ b/sdk/python/kubeflow/storage_initializer/abstract_model_provider.py @@ -1,4 +1,5 @@ -from abc import ABC, abstractmethod +from abc import ABC +from abc import abstractmethod class modelProvider(ABC): diff --git a/sdk/python/kubeflow/storage_initializer/hugging_face.py b/sdk/python/kubeflow/storage_initializer/hugging_face.py index 4b5b0794a9..0fbf511350 100644 --- a/sdk/python/kubeflow/storage_initializer/hugging_face.py +++ b/sdk/python/kubeflow/storage_initializer/hugging_face.py @@ -1,16 +1,17 @@ -import logging +from dataclasses import dataclass +from dataclasses import field import json -from typing import Union, Optional -from dataclasses import dataclass, field +import logging +from typing import Optional, Union from urllib.parse import urlparse -import transformers from peft import LoraConfig +import transformers -from .constants import VOLUME_PATH_DATASET, VOLUME_PATH_MODEL -from .abstract_model_provider import modelProvider from .abstract_dataset_provider import datasetProvider - +from .abstract_model_provider import modelProvider +from .constants import VOLUME_PATH_DATASET +from .constants import VOLUME_PATH_MODEL TRANSFORMER_TYPES = Union[ transformers.AutoModelForSequenceClassification, @@ -96,8 +97,8 @@ def load_config(self, serialised_args): def download_dataset(self): logger.info("Downloading dataset") logger.info("-" * 40) - import huggingface_hub from datasets import load_dataset + import huggingface_hub if self.config.access_token: huggingface_hub.login(self.config.access_token) diff --git a/sdk/python/kubeflow/storage_initializer/s3.py b/sdk/python/kubeflow/storage_initializer/s3.py index 5f60bbc72d..8dbdc7ef4c 100644 --- a/sdk/python/kubeflow/storage_initializer/s3.py +++ b/sdk/python/kubeflow/storage_initializer/s3.py @@ -1,7 +1,9 @@ -from dataclasses import dataclass, field +from dataclasses import dataclass +from dataclasses import field import json import os from urllib.parse import urlparse + 
 from .abstract_dataset_provider import datasetProvider
 
 from .constants import VOLUME_PATH_DATASET
diff --git a/sdk/python/kubeflow/storage_initializer/storage.py b/sdk/python/kubeflow/storage_initializer/storage.py
index f65d9d324c..b1d59f662b 100644
--- a/sdk/python/kubeflow/storage_initializer/storage.py
+++ b/sdk/python/kubeflow/storage_initializer/storage.py
@@ -1,5 +1,7 @@
 import argparse
-from .hugging_face import HuggingFace, HuggingFaceDataset
+
+from .hugging_face import HuggingFace
+from .hugging_face import HuggingFaceDataset
 from .s3 import S3
diff --git a/sdk/python/kubeflow/trainer/hf_llm_training.py b/sdk/python/kubeflow/trainer/hf_llm_training.py
index 26dd4fbe0e..e7ad3a0c5f 100644
--- a/sdk/python/kubeflow/trainer/hf_llm_training.py
+++ b/sdk/python/kubeflow/trainer/hf_llm_training.py
@@ -1,22 +1,21 @@
 import argparse
-import logging
-from urllib.parse import urlparse
 import json
+import logging
 import os
+from urllib.parse import urlparse
 
-from datasets import load_from_disk, Dataset
+from datasets import Dataset
+from datasets import load_from_disk
 from datasets.distributed import split_dataset_by_node
-from peft import LoraConfig, get_peft_model
+from peft import get_peft_model
+from peft import LoraConfig
 import transformers
-from transformers import (
-    AutoModelForCausalLM,
-    AutoTokenizer,
-    AutoModelForImageClassification,
-    TrainingArguments,
-    DataCollatorForLanguageModeling,
-    Trainer,
-)
-
+from transformers import AutoModelForCausalLM
+from transformers import AutoModelForImageClassification
+from transformers import AutoTokenizer
+from transformers import DataCollatorForLanguageModeling
+from transformers import Trainer
+from transformers import TrainingArguments
 
 # Configure logger.
 log_formatter = logging.Formatter(
diff --git a/sdk/python/kubeflow/training/api/training_client.py b/sdk/python/kubeflow/training/api/training_client.py
index 972b26829e..dbf27c6f3e 100644
--- a/sdk/python/kubeflow/training/api/training_client.py
+++ b/sdk/python/kubeflow/training/api/training_client.py
@@ -12,23 +12,22 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import multiprocessing
-import logging
-import time
 import json
-from typing import Optional, Callable, Tuple, List, Dict, Any, Set, Union
+import logging
+import multiprocessing
 import queue
-from kubernetes import client, config, watch
+import time
+from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union
 
+from kubeflow.storage_initializer.constants import VOLUME_PATH_DATASET
+from kubeflow.storage_initializer.constants import VOLUME_PATH_MODEL
 from kubeflow.training import models
 from kubeflow.training.api_client import ApiClient
 from kubeflow.training.constants import constants
 from kubeflow.training.utils import utils
-from kubeflow.storage_initializer.constants import (
-    VOLUME_PATH_DATASET,
-    VOLUME_PATH_MODEL,
-)
-
+from kubernetes import client
+from kubernetes import config
+from kubernetes import watch
 
 logger = logging.getLogger(__name__)
 
@@ -176,11 +175,16 @@
                 "Train API dependencies not installed. "
" + "Run: pip install -U 'kubeflow-training[huggingface]' " ) + + # fmt: off + + from kubeflow.storage_initializer.hugging_face import \ + HuggingFaceDatasetParams + from kubeflow.storage_initializer.hugging_face import \ + HuggingFaceModelParams from kubeflow.storage_initializer.s3 import S3DatasetParams - from kubeflow.storage_initializer.hugging_face import ( - HuggingFaceModelParams, - HuggingFaceDatasetParams, - ) + + # fmt: on print( "Thank you for using `train` API for LLMs fine-tuning. This feature is in alpha stage " diff --git a/sdk/python/kubeflow/training/api/training_client_test.py b/sdk/python/kubeflow/training/api/training_client_test.py index 04187ac20c..90ae04637f 100644 --- a/sdk/python/kubeflow/training/api/training_client_test.py +++ b/sdk/python/kubeflow/training/api/training_client_test.py @@ -1,21 +1,21 @@ import multiprocessing -import pytest -from unittest.mock import patch, Mock - from typing import Optional -from kubeflow.training import TrainingClient -from kubeflow.training import KubeflowOrgV1ReplicaSpec +from unittest.mock import Mock +from unittest.mock import patch + +from kubeflow.training import constants from kubeflow.training import KubeflowOrgV1PyTorchJob from kubeflow.training import KubeflowOrgV1PyTorchJobSpec +from kubeflow.training import KubeflowOrgV1ReplicaSpec from kubeflow.training import KubeflowOrgV1RunPolicy from kubeflow.training import KubeflowOrgV1SchedulingPolicy -from kubeflow.training import constants - -from kubernetes.client import V1PodTemplateSpec +from kubeflow.training import TrainingClient +from kubernetes.client import V1Container from kubernetes.client import V1ObjectMeta from kubernetes.client import V1PodSpec -from kubernetes.client import V1Container +from kubernetes.client import V1PodTemplateSpec from kubernetes.client import V1ResourceRequirements +import pytest LIST_RESPONSE = [{"metadata": {"name": "Dummy V1PodList"}}] TEST_NAME = "test" diff --git a/sdk/python/kubeflow/training/constants/constants.py b/sdk/python/kubeflow/training/constants/constants.py index 506edd267a..e258105e0f 100644 --- a/sdk/python/kubeflow/training/constants/constants.py +++ b/sdk/python/kubeflow/training/constants/constants.py @@ -12,9 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -from kubeflow.training import models -from typing import Union, Dict +from typing import Dict, Union + from kubeflow.storage_initializer.constants import INIT_CONTAINER_MOUNT_PATH +from kubeflow.training import models # How long to wait in seconds for requests to the Kubernetes API Server. DEFAULT_TIMEOUT = 120 diff --git a/sdk/python/kubeflow/training/utils/utils.py b/sdk/python/kubeflow/training/utils/utils.py index 04665951de..0c112c9003 100644 --- a/sdk/python/kubeflow/training/utils/utils.py +++ b/sdk/python/kubeflow/training/utils/utils.py @@ -13,18 +13,17 @@ # limitations under the License. 
 
 from datetime import datetime
-import os
-import logging
-import textwrap
 import inspect
-from typing import Optional, Callable, List, Dict, Any, Tuple, Union
 import json
-import threading
+import logging
+import os
 import queue
+import textwrap
+import threading
+from typing import Any, Callable, Dict, List, Optional, Tuple, Union
 
-from kubeflow.training.constants import constants
 from kubeflow.training import models
-
+from kubeflow.training.constants import constants
 
 logger = logging.getLogger(__name__)
diff --git a/test_job/README.md b/test_job/README.md
index e1055a97b6..e92ab334d2 100644
--- a/test_job/README.md
+++ b/test_job/README.md
@@ -1,9 +1,9 @@
 ## Test Job Controller
 
-This is a Test Job Controller example. As you can see, we have job crd definition under `apis/test_job/v1`. 
+This is a Test Job Controller example. As you can see, the job CRD definition lives under `apis/test_job/v1`.
 [code-generator](https://github.com/kubernetes/code-generator) generates the deepcopy, clientset, and other libraries.
 
-`controler.v1/test_job/test_job_controller` defines a struct `TestJobController` which implements [commonv1.ControllerInterface](../pkg/apis/common/v1/interface.go) 
+`controler.v1/test_job/test_job_controller` defines a struct `TestJobController`, which implements [commonv1.ControllerInterface](../pkg/apis/common/v1/interface.go).
 
 ```yaml
 ├── README.md
@@ -26,4 +26,4 @@ This is a Test Job Controller example. As you can see, we have job crd definitio
 │   └── test_job
 │   └── test_job_controller.go
 └── test_util
-```
\ No newline at end of file
+```
diff --git a/third_party/library/license.txt b/third_party/library/license.txt
index 5623527697..865d93aab1 100644
--- a/third_party/library/license.txt
+++ b/third_party/library/license.txt
@@ -7037,4 +7037,3 @@ kubernetes/kubernetes Apache License 2.0 https://github.com/kubernetes/kuberne
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
-
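
A note on the `parse_parameters` helper reformatted in `examples/xgboost/xgboost-dist/utils.py` above: it flattens a `key:value,key:value` string into a dict, and special-cases the `objective` and `endpoint` keys because their values themselves contain a colon (e.g. `multi:softprob`). A minimal, self-contained sketch of that behavior, mirroring the example code in the hunk above (the defaulted signature here is an illustrative assumption, not the patch's API):

```python
def parse_parameters(raw, splitter_between=",", splitter_in=":"):
    """Parse "key:value" pairs separated by commas into a dict.

    Values for "objective" and "endpoint" may contain a colon of their
    own (e.g. "multi:softprob"), so those keys re-join the second split.
    """
    confs = {}
    for kv in raw.split(splitter_between):
        conf = kv.split(splitter_in)
        key = conf[0].strip(" ")
        if key in ("objective", "endpoint"):
            # Re-join the two pieces after the key: "multi" + ":" + "softprob".
            value = conf[1].strip("'") + ":" + conf[2].strip("'")
        else:
            value = conf[1]
        confs[key] = value
    return confs


# Matches the --xgboost_parameter value in xgboostjob_v1_iris_train.yaml above.
print(parse_parameters("objective:multi:softprob,num_class:3"))
# -> {'objective': 'multi:softprob', 'num_class': '3'}
```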