Use pytest_subtests for grouped tests #2644

Merged 3 commits on Sep 25, 2024
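
For context, here is a minimal sketch of the pytest-subtests pattern this PR adopts (illustrative only; the test name and assertion below are hypothetical and not taken from this PR). It requires the pytest-subtests plugin, which provides the `subtests` fixture used in the diff:

    import pytest_subtests


    def test_pool_costs_grouped(subtests: pytest_subtests.SubTests) -> None:
        """Run several pool-cost scenarios as subtests of a single test item."""

        def _subtest(pool_cost: int) -> None:
            # Each scenario does its own setup and assertions.
            assert pool_cost >= 500

        # Unlike @pytest.mark.parametrize, a failing value does not abort the loop:
        # every subtest runs and each failure is reported individually.
        for pool_cost in (500, 9_999_999):
            with subtests.test(pool_cost=pool_cost):
                _subtest(pool_cost)
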
104 changes: 53 additions & 51 deletions cardano_node_tests/tests/test_pools.py
@@ -18,6 +18,7 @@
import hypothesis
import hypothesis.strategies as st
import pytest
import pytest_subtests
from _pytest.fixtures import FixtureRequest
from cardano_clusterlib import clusterlib

@@ -1772,9 +1773,11 @@ def test_pool_registration_deregistration(
dbsync_utils.check_tx(cluster_obj=cluster, tx_raw_output=tx_raw_output)


# It takes a long time to set up the cluster instance (when starting from Byron).
# We mark the tests as "long" and set the highest priority, so the setup is done at the
# beginning of the testrun, instead of needing to respin a cluster that is already running.
@pytest.mark.order(5)
@pytest.mark.long
@pytest.mark.xdist_group(name="minPoolCost")
class TestPoolCost:
"""Tests for stake pool cost."""

@@ -1791,28 +1794,22 @@ def cluster_mincost(
)

@pytest.fixture
def pool_owners(
def pool_owners_pbt(
self,
cluster_manager: cluster_management.ClusterManager,
cluster_mincost: clusterlib.ClusterLib,
):
"""Create class scoped pool owners."""
"""Create pool owners for property-based test."""
cluster = cluster_mincost
temp_template = common.get_test_id(cluster)

with cluster_manager.cache_fixture() as fixture_cache:
if fixture_cache.value:
return fixture_cache.value # type: ignore

temp_template = common.get_test_id(cluster)

pool_owners = clusterlib_utils.create_pool_users(
cluster_obj=cluster,
name_template=temp_template,
no_of_addr=1,
)
fixture_cache.value = pool_owners
pool_owners = clusterlib_utils.create_pool_users(
cluster_obj=cluster,
name_template=temp_template,
no_of_addr=1,
)

# fund source address
# Fund source address
clusterlib_utils.fund_from_faucet(
pool_owners[0].payment,
cluster_obj=cluster,
@@ -1828,7 +1825,7 @@ def pool_owners(
def test_stake_pool_low_cost(
self,
cluster_mincost: clusterlib.ClusterLib,
pool_owners: tp.List[clusterlib.PoolUser],
pool_owners_pbt: tp.List[clusterlib.PoolUser],
pool_cost: int,
):
"""Try to create and register a stake pool with pool cost lower than *minPoolCost*.
@@ -1853,7 +1850,7 @@ def test_stake_pool_low_cost(
cluster_obj=cluster,
temp_template=temp_template,
temp_dir=pl.Path(),
pool_owners=pool_owners,
pool_owners=pool_owners_pbt,
pool_data=pool_data,
)

@@ -1865,52 +1862,57 @@ def test_stake_pool_low_cost(
assert "StakePoolCostTooLowPOOL" in err

@allure.link(helpers.get_vcs_link())
@pytest.mark.parametrize("pool_cost", [500, 9_999_999])
@pytest.mark.dbsync
def test_stake_pool_cost(
self,
cluster_manager: cluster_management.ClusterManager,
cluster_mincost: clusterlib.ClusterLib,
pool_owners: tp.List[clusterlib.PoolUser],
pool_cost: int,
subtests: pytest_subtests.SubTests,
request: FixtureRequest,
):
"""Create and register a stake pool with *pool cost* >= *minPoolCost*."""
cluster = cluster_mincost
rand_str = clusterlib.get_rand_str(4)
temp_template = f"{common.get_test_id(cluster)}_{rand_str}"
temp_template = common.get_test_id(cluster)

pool_data = clusterlib.PoolData(
pool_name=f"pool_{rand_str}",
pool_pledge=12_345,
pool_cost=pool_cost,
pool_margin=0.123,
)
def _subtest(pool_cost: int) -> None:
rand_str = clusterlib.get_rand_str(4)
name_template = f"{temp_template}_{rand_str}"

# create pool owners
pool_owners = clusterlib_utils.create_pool_users(
cluster_obj=cluster,
name_template=temp_template,
no_of_addr=1,
)
pool_data = clusterlib.PoolData(
pool_name=f"pool_{rand_str}",
pool_pledge=12_345,
pool_cost=pool_cost,
pool_margin=0.123,
)

# fund source address
clusterlib_utils.fund_from_faucet(
pool_owners[0].payment,
cluster_obj=cluster,
faucet_data=cluster_manager.cache.addrs_data["user1"],
amount=900_000_000,
)
# Create pool owners
pool_owners = clusterlib_utils.create_pool_users(
cluster_obj=cluster,
name_template=name_template,
no_of_addr=1,
)

# register pool
_create_register_pool(
cluster_obj=cluster,
temp_template=temp_template,
temp_dir=pl.Path(),
pool_owners=pool_owners,
pool_data=pool_data,
request=request,
)
# Fund source address
clusterlib_utils.fund_from_faucet(
pool_owners[0].payment,
cluster_obj=cluster,
faucet_data=cluster_manager.cache.addrs_data["user1"],
amount=900_000_000,
)

# Register pool
_create_register_pool(
cluster_obj=cluster,
temp_template=name_template,
temp_dir=pl.Path(),
pool_owners=pool_owners,
pool_data=pool_data,
request=request,
)

for pc in (500, 9_999_999):
with subtests.test(pool_cost=pc):
_subtest(pc)


@pytest.mark.smoke
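A note on the design change visible above (a reading of the diff, not a statement of the authors' intent): the old `@pytest.mark.parametrize("pool_cost", [500, 9_999_999])` produced two separate test items that shared a cached, class-scoped `pool_owners` fixture, whereas the new version runs both pool costs as subtests of a single test item, with each subtest creating its own pool owners under a randomized name template. With pytest-subtests, a failure for one pool cost is reported individually and does not prevent the other value from running.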