
Refactor selected tests to use fixtures
We have the fixtures; let's use them.  Take a more global look at some of the
tests; move a helpful fixture (`ez_pack_task`) to the top-level `conftest.py`,
then reference it where appropriate.

The dedup cuts the affected lines by over half.
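
For illustration, a test written against the promoted fixtures ends up roughly
like the sketch below; the test name is hypothetical, but `engine_runner`,
`double`, and the fixture names all appear in this commit's diffs.

```python
from globus_compute_common import messagepack
from globus_compute_endpoint.engines import GlobusComputeEngine
from tests.utils import double


def test_submit_roundtrip(engine_runner, task_uuid, serde, ez_pack_task):
    engine = engine_runner(GlobusComputeEngine)

    # The fixture packs the callable and its arguments into ready-to-submit Task bytes.
    task_bytes = ez_pack_task(double, 21)

    future = engine.submit(task_uuid, task_bytes, resource_specification={})
    result = messagepack.unpack(future.result())

    assert result.task_id == task_uuid
    assert serde.deserialize(result.data) == 42  # double(21)
```
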
khk-globus authored and LeiGlobus committed Sep 6, 2024
1 parent e8f51c6 commit 7b0e426
Showing 12 changed files with 231 additions and 332 deletions.
60 changes: 59 additions & 1 deletion RELEASING.md
@@ -112,4 +112,62 @@ git push
are properly added to the `main` branch.

6. Create a GitHub release from the tag. See [GitHub documentation](https://docs.github.com/en/repositories/releasing-projects-on-github/managing-releases-in-a-repository#creating-a-release)
for instructions.
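
One possible alternative to the web UI is the GitHub CLI; a release can be
created from the tag along these lines (tag and notes below are placeholders,
not values from this repository):

```sh
gh release create v2.23.0 --title "v2.23.0" --notes "See the changelog for details."
```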

### DEB/RPM Packaging Workflow

#### Prerequisites

Before building the packages, ensure that the release itself, whether the alpha
or the production version, is already published on PyPI.
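
As a quick sanity check (a convenience suggestion, not part of the official
workflow), a recent pip can list the versions that PyPI currently serves:

```sh
# Lists the versions of the endpoint package visible on PyPI;
# add --pre to include alpha/pre-release versions.
pip index versions globus-compute-endpoint
```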

Additionally, the build page is only accessible while connected to the VPN.

#### Build Process

In the future, building the DEB/RPM packages will be a single click of the
green **Build** button on the Globus Compute Agent
[build page](https://builds.globus.org/jenkins/job/BuildGlobusComputeAgentPackages/build?delay=0sec).

As a temporary workaround, we need to add a few lines that manually set some
environment variables in our [JenkinsFile](https://github.com/globus/globus-compute/blob/743fa1e398fd40a00efb5880c55e3fa6e47392fc/compute_endpoint/packaging/JenkinsFile#L24) before triggering the build, as detailed below.

1. Check out both the current release branch that was recently pushed to
PyPI, e.g. ``v2.23.0`` or ``v2.25.0a0``, and the ``build_for_stable`` branch
(a shell sketch of steps 1-4 follows this list)
2. Rebase ``build_for_stable`` onto the release branch, which should result in
adding the following ~6 lines:

...
env.BRANCH_NAME = scmVars.GIT_BRANCH.replaceFirst(/^.*origin\//, "")
+ env.TAG_NAME = sh(returnStdout: true, script: "git tag --contains | head -1").trim()
env.SOURCE_STASH_NAME = "${UUID.randomUUID()}"
echo "env.BRANCH_NAME = ${env.BRANCH_NAME}"
sh "git clean -fdx"

+ // temporary hack to build for stable
+ sh "git checkout build_for_stable"
+ env.TAG_NAME = "v2.23.0"
+ env.DEFAULT_BRANCH = "build_for_stable"
+
dir("compute_endpoint/packaging/") {
...

3. Change the ``env.TAG_NAME`` above to the current production release version
* Note that ``env.TAG_NAME`` determines whether the build is sent to
the ``unstable`` repo or also to the ``testing`` and ``stable`` ones.
* Example of unstable repo:
* https://downloads.globus.org/globus-connect-server/unstable/rpm/el/9/x86_64/
* Example of stable repo:
* https://downloads.globus.org/globus-connect-server/stable/rpm/el/9/x86_64/
  * Whether a release is treated as stable is determined by whether the
    package version of Globus Compute Endpoint set in ``version.py`` or
    ``setup.py`` matches ``env.TAG_NAME`` above. If they are unequal, then the
    comparison at [publishResults.groovy line 85](https://github.com/globusonline/gcs-build-scripts/blob/168617a0ccbb0aee7b3bee04ee67940bbe2a80f6/vars/publishResults.groovy#L85)
    will be, for example, (``tag`` : v2.23.0) != (``stable_tag`` : v2.23.0a0), where
    ``stable_tag`` is constructed from the package version of an alpha release.
4. Commit and push your ``build_for_stable`` branch
5. (Requires VPN access) Click the [build button here](https://builds.globus.org/jenkins/job/BuildGlobusComputeAgentPackages/build?delay=0sec)
6. Wait 20-30 minutes and confirm that the [build is green](https://builds.globus.org/jenkins/job/BuildGlobusComputeAgentPackages/)
7. For a production release, we will typically finish our build before the GCS
   team finishes theirs; notify them that our build is complete, and they will
   publish all packages, including ours, when their builds finish.
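
A rough shell sketch of steps 1-4 above (the tag is a placeholder for the
actual release; the force-push is assumed because the rebase rewrites the
``build_for_stable`` history):

```sh
git fetch origin
git checkout v2.23.0                 # the release branch/tag already published to PyPI
git checkout build_for_stable
git rebase v2.23.0                   # carries the temporary Jenkins lines on top of the release
# Edit compute_endpoint/packaging/JenkinsFile so env.TAG_NAME matches the release,
# e.g.  env.TAG_NAME = "v2.23.0"
git commit -am "Build for stable v2.23.0"
git push --force-with-lease origin build_for_stable
```
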
2 changes: 2 additions & 0 deletions compute_endpoint/packaging/JenkinsFile
@@ -22,8 +22,10 @@ pipeline {
script {
def scmVars = checkout scm
env.BRANCH_NAME = scmVars.GIT_BRANCH.replaceFirst(/^.*origin\//, "")
env.TAG_NAME = sh(returnStdout: true, script: "git tag --contains | head -1").trim()
env.SOURCE_STASH_NAME = "${UUID.randomUUID()}"
echo "env.BRANCH_NAME = ${env.BRANCH_NAME}"
echo "env.DEFAULT_BRANCH = ${env.DEFAULT_BRANCH}"
sh "git clean -fdx"

dir("compute_endpoint/packaging/") {
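
For reference, the new `TAG_NAME` line simply shells out to git; run on its
own it looks like the snippet below, printing the first tag (in git's default
refname ordering) whose history contains the checked-out commit, or nothing at
all when the commit is not under any tag:

```sh
git tag --contains HEAD | head -1   # HEAD is the default when omitted, as in the JenkinsFile
```
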
31 changes: 31 additions & 0 deletions compute_endpoint/tests/conftest.py
@@ -12,11 +12,14 @@
import globus_sdk
import pytest
import responses
from globus_compute_common import messagepack
from globus_compute_endpoint import engines
from globus_compute_endpoint.engines.base import GlobusComputeEngineBase
from globus_compute_sdk.sdk.web_client import WebClient
from globus_compute_sdk.serialize import ComputeSerializer
from parsl.launchers import SimpleLauncher
from parsl.providers import LocalProvider
from tests.utils import ez_pack_function


@pytest.fixture(autouse=True)
@@ -192,3 +195,31 @@ def _wrapped(*args, **kwargs):
pass

return _wrapped


@pytest.fixture
def task_uuid() -> uuid.UUID:
return uuid.uuid4()


@pytest.fixture
def container_uuid() -> uuid.UUID:
return uuid.uuid4()


@pytest.fixture(scope="module")
def serde():
return ComputeSerializer()


@pytest.fixture
def ez_pack_task(serde, task_uuid, container_uuid):
def _pack_it(fn, *a, **k) -> bytes:
task_body = ez_pack_function(serde, fn, a, k)
return messagepack.pack(
messagepack.message_types.Task(
task_id=task_uuid, container_id=container_uuid, task_buffer=task_body
)
)

return _pack_it
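
For context, `ez_pack_function` lives in `tests.utils` and is not touched by
this commit; a sketch of the general shape such a helper has (an illustrative
assumption, not the repository's actual implementation) is:

```python
from globus_compute_sdk.serialize import ComputeSerializer


def ez_pack_function(serializer: ComputeSerializer, fn, args, kwargs):
    # Serialize the callable and its arguments separately, then pack the three
    # buffers into the single payload that Task.task_buffer expects.
    serialized_fn = serializer.serialize(fn)
    serialized_args = serializer.serialize(args)
    serialized_kwargs = serializer.serialize(kwargs)
    return serializer.pack_buffers([serialized_fn, serialized_args, serialized_kwargs])
```
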
@@ -1,31 +1,22 @@
import uuid

import pytest
from globus_compute_common import messagepack
from globus_compute_endpoint.engines import GlobusComputeEngine
from globus_compute_sdk.sdk.shell_function import ShellFunction
from globus_compute_sdk.serialize import ComputeSerializer
from tests.utils import ez_pack_function


def test_shell_function(engine_runner, tmp_path):
def test_shell_function(engine_runner, tmp_path, task_uuid, serde, ez_pack_task):
"""Test running ShellFunction with GCE: Happy path"""
engine = engine_runner(GlobusComputeEngine)
task_id = uuid.uuid1()
serializer = ComputeSerializer()
shell_func = ShellFunction("pwd")
task_body = ez_pack_function(serializer, shell_func, (), {})
task_message = messagepack.pack(
messagepack.message_types.Task(task_id=task_id, task_buffer=task_body)
)
future = engine.submit(task_id, task_message, resource_specification={})
task_bytes = ez_pack_task(shell_func)
future = engine.submit(task_uuid, task_bytes, resource_specification={})

packed_result = future.result()
result = messagepack.unpack(packed_result)

assert result.task_id == task_id
assert result.task_id == task_uuid
assert result.error_details is None
result_obj = serializer.deserialize(result.data)
result_obj = serde.deserialize(result.data)

assert "pwd" == result_obj.cmd
assert result_obj.returncode == 0
@@ -42,46 +33,38 @@ def test_shell_function(engine_runner, tmp_path):
("touch foo; ./foo", "Permission denied", 126),
],
)
def test_fail_shell_function(engine_runner, tmp_path, cmd, error_str, returncode):
def test_fail_shell_function(
engine_runner, cmd, error_str, returncode, serde, task_uuid, ez_pack_task
):
"""Test running ShellFunction with GCE: Failure path"""
engine = engine_runner(GlobusComputeEngine, run_in_sandbox=True)
task_id = uuid.uuid1()
serializer = ComputeSerializer()
shell_func = ShellFunction(cmd, walltime=0.1)
task_body = ez_pack_function(serializer, shell_func, (), {})
task_message = messagepack.pack(
messagepack.message_types.Task(task_id=task_id, task_buffer=task_body)
)
future = engine.submit(task_id, task_message, resource_specification={})
task_bytes = ez_pack_task(shell_func)
future = engine.submit(task_uuid, task_bytes, resource_specification={})

packed_result = future.result()
result = messagepack.unpack(packed_result)
assert result.task_id == task_id
assert result.task_id == task_uuid
assert not result.error_details

result_obj = serializer.deserialize(result.data)
result_obj = serde.deserialize(result.data)

assert error_str in result_obj.stderr
assert result_obj.returncode == returncode


def test_no_sandbox(engine_runner, tmp_path):
def test_no_sandbox(engine_runner, task_uuid, serde, ez_pack_task):
"""Test running ShellFunction without sandbox"""
engine = engine_runner(GlobusComputeEngine, run_in_sandbox=False)
task_id = uuid.uuid1()
serializer = ComputeSerializer()
shell_func = ShellFunction("pwd")
task_body = ez_pack_function(serializer, shell_func, (), {})
task_message = messagepack.pack(
messagepack.message_types.Task(task_id=task_id, task_buffer=task_body)
)
future = engine.submit(task_id, task_message, resource_specification={})
task_bytes = ez_pack_task(shell_func)
future = engine.submit(task_uuid, task_bytes, resource_specification={})

packed_result = future.result()
result = messagepack.unpack(packed_result)
assert result.task_id == task_id
assert result.task_id == task_uuid
assert result.error_details is None
result_obj = serializer.deserialize(result.data)
result_obj = serde.deserialize(result.data)

assert "pwd" == result_obj.cmd
assert result_obj.returncode == 0
@@ -5,8 +5,7 @@
import pytest
from globus_compute_common import messagepack
from globus_compute_endpoint.engines import HighThroughputEngine
from globus_compute_sdk.serialize import ComputeSerializer
from tests.utils import double, ez_pack_function
from tests.utils import double


@pytest.fixture
@@ -24,28 +23,21 @@ def engine(tmp_path):
engine.shutdown()


def test_engine_submit(engine):
def test_engine_submit(engine, serde, task_uuid, ez_pack_task):
q = engine.results_passthrough
task_id = uuid.uuid1()
serializer = ComputeSerializer()
task_arg = random.randint(1, 1000)
task_body = ez_pack_function(serializer, double, (task_arg,), {})
task_message = messagepack.pack(
messagepack.message_types.Task(
task_id=task_id, container_id=uuid.uuid1(), task_buffer=task_body
)
)
task_bytes = ez_pack_task(double, task_arg)
resource_spec = {}
future = engine.submit(
str(task_id), task_message, resource_specification=resource_spec
str(task_uuid), task_bytes, resource_specification=resource_spec
)
packed_result = future.result()

# Confirm that the future got the right answer
assert isinstance(packed_result, bytes)
result = messagepack.unpack(packed_result)
assert isinstance(result, messagepack.message_types.Result)
assert result.task_id == task_id
assert result.task_id == task_uuid

# Confirm that the same result got back though the queue
for _i in range(10):
@@ -63,8 +55,8 @@ def test_engine_submit(engine):
packed_result == packed_result_q
), "Result from passthrough_q and future should match"

assert result.task_id == task_id
final_result = serializer.deserialize(result.data)
assert result.task_id == task_uuid
final_result = serde.deserialize(result.data)
expected = task_arg * 2
assert final_result == expected, f"Expected {expected}, but got: {final_result}"
break
@@ -8,10 +8,9 @@
import pytest
from globus_compute_common import messagepack
from globus_compute_endpoint.engines import GlobusComputeEngine
from globus_compute_sdk.serialize import ComputeSerializer
from parsl.executors.high_throughput.interchange import ManagerLost
from parsl.providers import LocalProvider
from tests.utils import ez_pack_function, slow_double
from tests.utils import slow_double


class MockHTEX:
@@ -54,59 +53,41 @@ def mock_gce(tmp_path):
yield engine


def test_success_after_1_fail(mock_gce, tmp_path):
def test_success_after_1_fail(mock_gce, serde, ez_pack_task):
engine = mock_gce
engine.max_retries_on_system_failure = 2
queue = engine.results_passthrough
q = engine.results_passthrough
task_id = uuid.uuid1()
serializer = ComputeSerializer()
num = random.randint(1, 10000)
task_bytes = ez_pack_task(slow_double, num, 0.2)

# Set the failure count on the mock executor to force failure
engine.executor.fail_count = 1
engine.submit(task_id, task_bytes, resource_specification={})

task_body = ez_pack_function(
serializer,
slow_double,
(
num,
0.2,
),
{},
)
task_message = messagepack.pack(
messagepack.message_types.Task(task_id=task_id, task_buffer=task_body)
)

engine.submit(task_id, task_message, resource_specification={})

packed_result = queue.get()
packed_result = q.get()
assert isinstance(packed_result, dict)
result = messagepack.unpack(packed_result["message"])

assert result.task_id == task_id
assert serializer.deserialize(result.data) == 2 * num
assert serde.deserialize(result.data) == 2 * num


def test_repeated_fail(mock_gce, tmp_path):
def test_repeated_fail(mock_gce, ez_pack_task):
fail_count = 2
engine = mock_gce
engine.max_retries_on_system_failure = fail_count
queue = engine.results_passthrough
q = engine.results_passthrough
task_id = uuid.uuid1()
serializer = ComputeSerializer()

# Set executor to continue failures beyond retry limit
engine.executor.fail_count = fail_count + 1

task_body = ez_pack_function(serializer, slow_double, (5,), {})
task_message = messagepack.pack(
messagepack.message_types.Task(task_id=task_id, task_buffer=task_body)
)
task_bytes = ez_pack_task(slow_double, 5)

engine.submit(task_id, task_message, resource_specification={})
engine.submit(task_id, task_bytes, resource_specification={})

packed_result_q = queue.get(10)
packed_result_q = q.get()
result = messagepack.unpack(packed_result_q["message"])
assert isinstance(result, messagepack.message_types.Result)
assert result.task_id == task_id