Skip to content

Commit

Permalink
Add Ray Cluster Upgrade test for Ray Job Long Running scenarios (#614)
Browse files Browse the repository at this point in the history
  • Loading branch information
Srihari1192 authored Aug 9, 2024
1 parent b026fae commit 4017d8b
Show file tree
Hide file tree
Showing 2 changed files with 247 additions and 0 deletions.
72 changes: 72 additions & 0 deletions tests/e2e/mnist_sleep.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# Copyright 2022 IBM, Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import time
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torchvision import datasets, transforms


# Define a simple neural network
class NeuralNetwork(nn.Module):
def __init__(self):
super(NeuralNetwork, self).__init__()
self.flatten = nn.Flatten()
self.linear_relu_stack = nn.Sequential(
nn.Linear(28 * 28, 512),
nn.ReLU(),
nn.Linear(512, 512),
nn.ReLU(),
nn.Linear(512, 10),
)

def forward(self, x):
x = self.flatten(x)
logits = self.linear_relu_stack(x)
return logits


# Define the training function
def train():
# Sleeping for 24 hours for upgrade test scenario
print("Sleeping for 24 hours before starting the training for upgrade testing...")
time.sleep(24 * 60 * 60)

# Load dataset
transform = transforms.Compose([transforms.ToTensor()])
train_dataset = datasets.FashionMNIST(
root="./data", train=True, download=True, transform=transform
)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)

# Initialize the neural network, loss function, and optimizer
model = NeuralNetwork()
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

# Train the model
num_epochs = 3
for epoch in range(num_epochs):
for inputs, labels in train_loader:
optimizer.zero_grad()
outputs = model(inputs)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}")


if __name__ == "__main__":
train()
175 changes: 175 additions & 0 deletions tests/upgrade/raycluster_sdk_upgrade_sleep_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
import requests
from time import sleep

from codeflare_sdk import (
Cluster,
ClusterConfiguration,
TokenAuthentication,
get_cluster,
)
from codeflare_sdk.job import RayJobClient

from tests.e2e.support import *


from codeflare_sdk.utils.kube_api_helpers import _kube_api_error_handling

namespace = "test-ns-rayupgrade-sleep"
# Global variables for kueue resources
cluster_queue = "cluster-queue-mnist"
flavor = "default-flavor-mnist"
local_queue = "local-queue-mnist"


# Creates a Ray cluster , submit RayJob mnist script long running
class TestSetupSleepRayJob:
def setup_method(self):
initialize_kubernetes_client(self)
create_namespace_with_name(self, namespace)
try:
create_cluster_queue(self, cluster_queue, flavor)
create_resource_flavor(self, flavor)
create_local_queue(self, cluster_queue, local_queue)
except Exception as e:
delete_namespace(self)
delete_kueue_resources(self)
return _kube_api_error_handling(e)

def test_mnist_ray_cluster_sdk_auth(self):
self.run_mnist_raycluster_sdk_oauth()

def run_mnist_raycluster_sdk_oauth(self):
ray_image = get_ray_image()

auth = TokenAuthentication(
token=run_oc_command(["whoami", "--show-token=true"]),
server=run_oc_command(["whoami", "--show-server=true"]),
skip_tls=True,
)
auth.login()

cluster = Cluster(
ClusterConfiguration(
name="mnist",
namespace=self.namespace,
num_workers=1,
head_cpus=1,
head_memory=4,
worker_cpu_requests=1,
worker_cpu_limits=1,
worker_memory_requests=4,
worker_memory_limits=4,
image=ray_image,
write_to_file=True,
verify_tls=False,
)
)

try:
cluster.up()
cluster.status()
# wait for raycluster to be Ready
cluster.wait_ready()
cluster.status()
# Check cluster details
cluster.details()
# Assert the cluster status is READY
_, ready = cluster.status()
assert ready
submission_id = self.assert_jobsubmit()
print(f"Job submitted successfully, job submission id: ", submission_id)

except Exception as e:
print(f"An unexpected error occurred. Error: ", e)
delete_namespace(self)
delete_kueue_resources(self)
assert False, "Cluster is not ready!"

def assert_jobsubmit(self):
auth_token = run_oc_command(["whoami", "--show-token=true"])
cluster = get_cluster("mnist", namespace)
cluster.details()
ray_dashboard = cluster.cluster_dashboard_uri()
header = {"Authorization": f"Bearer {auth_token}"}
client = RayJobClient(address=ray_dashboard, headers=header, verify=False)

# Submit the job
submission_id = client.submit_job(
entrypoint="python mnist_sleep.py",
runtime_env={
"working_dir": "./tests/e2e/",
"pip": {"packages": ["torchvision==0.12.0"], "pip_check": False},
"env_vars": get_setup_env_variables(),
},
)
print(f"Submitted job with ID: {submission_id}")
done = False
time = 0
timeout = 180
while not done:
status = client.get_job_status(submission_id)
if status.is_terminal():
break
if status == "RUNNING":
print(f"Job is Running: '{status}'")
assert True
break
if not done:
print(status)
if timeout and time >= timeout:
raise TimeoutError(f"job has timed out after waiting {timeout}s")
sleep(5)
time += 5

logs = client.get_job_logs(submission_id)
print(logs)
return submission_id


class TestVerifySleepRayJobRunning:
def setup_method(self):
initialize_kubernetes_client(self)
auth = TokenAuthentication(
token=run_oc_command(["whoami", "--show-token=true"]),
server=run_oc_command(["whoami", "--show-server=true"]),
skip_tls=True,
)
auth.login()
self.namespace = namespace
self.cluster = get_cluster("mnist", self.namespace)
self.cluster_queue = cluster_queue
self.resource_flavor = flavor
if not self.cluster:
raise RuntimeError("TestSetupSleepRayJob needs to be run before this test")

def teardown_method(self):
delete_namespace(self)
delete_kueue_resources(self)

def test_mnist_job_running(self):
client = self.get_ray_job_client(self.cluster)
self.assertJobExists(client, 1)
self.assertJobRunning(client)
self.cluster.down()

def get_ray_job_client(self, cluster):
auth_token = run_oc_command(["whoami", "--show-token=true"])
ray_dashboard = cluster.cluster_dashboard_uri()
header = {"Authorization": f"Bearer {auth_token}"}
return RayJobClient(address=ray_dashboard, headers=header, verify=False)

# Assertions
def assertJobExists(self, client, expectedJobsSize):
job_list = client.list_jobs()
assert len(job_list) == expectedJobsSize

def assertJobRunning(self, client):
job_list = client.list_jobs()
submission_id = job_list[0].submission_id
status = client.get_job_status(submission_id)
if status == "RUNNING":
print(f"Job is Running: '{status}'")
assert True
else:
print(f"Job is not in Running state: '{status}'")
assert False

0 comments on commit 4017d8b

Please sign in to comment.