Merge branch '6/edge' into shard-removal-test
MiaAltieri committed Nov 1, 2023
2 parents 53fa13b + 0b904b6 commit 8ae00ab
Showing 4 changed files with 148 additions and 95 deletions.
15 changes: 10 additions & 5 deletions lib/charms/mongodb/v1/mongos.py
@@ -163,11 +163,6 @@ def add_shard(self, shard_name, shard_hosts, shard_port=Config.MONGODB_PORT):
         shard_hosts = [f"{host}:{shard_port}" for host in shard_hosts]
         shard_hosts = ",".join(shard_hosts)
         shard_url = f"{shard_name}/{shard_hosts}"
-        # TODO Future PR raise error when number of shards currently adding are higher than the
-        # number of secondaries on the primary shard. This will be challenging, as there is no
-        # MongoDB command to retrieve the primary shard. Will likely need to be done via
-        # mongosh
-
         if shard_name in self.get_shard_members():
             logger.info("Skipping adding shard %s, shard is already in cluster", shard_name)
             return
@@ -454,3 +449,13 @@ def get_shard_with_most_available_space(self, shard_to_ignore) -> Tuple[str, int
                 candidate_free_space = current_free_space

         return (candidate_shard, candidate_free_space)
+
+    def get_draining_shards(self) -> List[str]:
+        """Returns a list of the shards currently draining."""
+        sc_status = self.client.admin.command("listShards")
+        draining_shards = []
+        for shard in sc_status["shards"]:
+            if shard.get("draining", False):
+                draining_shards.append(shard["_id"])
+
+        return draining_shards
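
For context, a hedged sketch of the listShards reply that get_draining_shards walks — the shard names and hosts below are invented and exact fields vary by MongoDB version, but a shard carries "draining": true while a removeShard operation is in flight:

    # hedged sketch: approximate listShards output while "shard-two" is being removed
    sc_status = {
        "shards": [
            {"_id": "shard-one", "host": "shard-one/10.0.0.4:27017", "state": 1},
            {"_id": "shard-two", "host": "shard-two/10.0.0.7:27017", "state": 1, "draining": True},
        ],
        "ok": 1,
    }
    # same filtering as the new method above
    draining = [s["_id"] for s in sc_status["shards"] if s.get("draining", False)]
    assert draining == ["shard-two"]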
86 changes: 76 additions & 10 deletions lib/charms/mongodb/v1/shards_interface.py
@@ -32,6 +32,7 @@
 from ops.model import (
     ActiveStatus,
     BlockedStatus,
+    ErrorStatus,
     MaintenanceStatus,
     StatusBase,
     WaitingStatus,
@@ -217,9 +218,6 @@ def add_shards(self, departed_shard_id):
         with MongosConnection(self.charm.mongos_config) as mongo:
             cluster_shards = mongo.get_shard_members()
             relation_shards = self._get_shards_from_relations(departed_shard_id)
-
-            # TODO Future PR, limit number of shards add at a time, based on the number of
-            # replicas in the primary shard
             for shard in relation_shards - cluster_shards:
                 try:
                     shard_hosts = self._get_shard_hosts(shard)
@@ -280,9 +278,39 @@ def update_mongos_hosts(self):
         for relation in self.charm.model.relations[self.relation_name]:
             self._update_relation_data(relation.id, {HOSTS_KEY: json.dumps(self.charm._unit_ips)})

-    def get_config_server_status(self):
-        """TODO: Implement this function in a separate PR."""
-        return None
+    def get_config_server_status(self) -> Optional[StatusBase]:
+        """Returns the current status of the config-server."""
+        if not self.charm.is_role(Config.Role.CONFIG_SERVER):
+            logger.info("skipping status check, charm is not running as a config-server")
+            return None
+
+        if not self.charm.db_initialised:
+            logger.info("No status for config-server to report, waiting for db to be initialised.")
+            return None
+
+        if self.model.relations[LEGACY_REL_NAME]:
+            return BlockedStatus(f"relation {LEGACY_REL_NAME} to shard not supported.")
+
+        if self.model.relations[REL_NAME]:
+            return BlockedStatus(f"relation {REL_NAME} to shard not supported.")
+
+        if not self.is_mongos_running():
+            return ErrorStatus("Internal mongos is not running.")
+
+        shard_draining = self.get_draining_shards()
+        if shard_draining:
+            shard_draining = ",".join(shard_draining)
+            return MaintenanceStatus(f"Draining shard {shard_draining}")
+
+        if not self.model.relations[self.relation_name]:
+            return BlockedStatus("missing relation to shard(s)")
+
+        unreachable_shards = self.get_unreachable_shards()
+        if unreachable_shards:
+            unreachable_shards = ", ".join(unreachable_shards)
+            return ErrorStatus(f"Shards {unreachable_shards} are unreachable.")
+
+        return ActiveStatus()

     def _update_relation_data(self, relation_id: int, data: dict) -> None:
         """Updates a set of key-value pairs in the relation.
@@ -334,6 +362,46 @@ def get_related_shards(self) -> List[str]:
"""Returns a list of related shards."""
return [rel.app.name for rel in self.charm.model.relations[self.relation_name]]

def get_unreachable_shards(self) -> List[str]:
"""Returns a list of unreable shard hosts."""
unreachable_hosts = []
if not self.model.relations[self.relation_name]:
logger.info("shards are not reachable, none related to config-sever")
return unreachable_hosts

for shard_name in self.get_related_shards():
shard_hosts = self._get_shard_hosts(shard_name)
if not shard_hosts:
return unreachable_hosts

# use a URI that is not dependent on the operator password, as we are not guaranteed
# that the shard has received the password yet.
uri = f"mongodb://{','.join(shard_hosts)}"
with MongoDBConnection(None, uri) as mongo:
if not mongo.is_ready:
unreachable_hosts.append(shard_name)

return unreachable_hosts

def is_mongos_running(self) -> bool:
"""Returns true if mongos service is running."""
mongos_hosts = ",".join(self.charm._unit_ips)
uri = f"mongodb://{mongos_hosts}"
with MongosConnection(None, uri) as mongo:
return mongo.is_ready

def get_draining_shards(self) -> List[str]:
"""Returns the shard that is currently draining."""
with MongosConnection(self.charm.mongos_config) as mongo:
draining_shards = mongo.get_draining_shards()

# in theory, this should always be a list of one. But if something has gone wrong we
# should take note and log it
if len(draining_shards) > 1:
logger.error("Multiple shards draining at the same time.")

return draining_shards


class ConfigServerRequirer(Object):
"""Manage relations between the config server and the shard, on the shard's side."""
@@ -518,16 +586,15 @@ def get_shard_status(self) -> Optional[StatusBase]:
return ActiveStatus("Shard drained from cluster, ready for removal")

if not self._is_mongos_reachable():
return BlockedStatus("Config server unreachable")
return ErrorStatus("Config server unreachable")

if not self._is_added_to_cluster():
return MaintenanceStatus("Adding shard to config-server")

if not self._is_shard_aware():
return BlockedStatus("Shard is not yet shard aware")

config_server_name = self.get_related_config_server()
return ActiveStatus(f"Shard connected to config-server: {config_server_name}")
return ActiveStatus()

def drained(self, mongos_hosts: Set[str], shard_name: str) -> bool:
"""Returns whether a shard has been drained from the cluster.
@@ -644,7 +711,6 @@ def _is_mongos_reachable(self) -> bool:
         if not mongos_hosts:
             return False

-        self.charm.remote_mongos_config(set(mongos_hosts))
         config = self.charm.remote_mongos_config(set(mongos_hosts))

         # use a URI that is not dependent on the operator password, as we are not guaranteed that
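
Both new helpers lean on is_ready over a credential-free URI. As a standalone approximation — assuming is_ready boils down to a ping, which the charm libraries may implement differently — the reachability check looks roughly like this in plain pymongo:

    from pymongo import MongoClient
    from pymongo.errors import PyMongoError

    def hosts_reachable(hosts: list, timeout_ms: int = 1000) -> bool:
        # credential-free URI, mirroring get_unreachable_shards() above
        client = MongoClient(f"mongodb://{','.join(hosts)}", serverSelectionTimeoutMS=timeout_ms)
        try:
            client.admin.command("ping")  # cheap liveness check, no auth required
            return True
        except PyMongoError:
            return False
        finally:
            client.close()

    # e.g. hosts_reachable(["10.0.0.4:27017", "10.0.0.5:27017"])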
8 changes: 1 addition & 7 deletions src/charm.py
@@ -1380,13 +1380,7 @@ def get_status(self) -> StatusBase:
         if pbm_status and not isinstance(pbm_status, ActiveStatus):
             return pbm_status

-        # if all statuses are active report sharding statuses over mongodb status
-        if isinstance(shard_status, ActiveStatus):
-            return shard_status
-
-        if isinstance(config_server_status, ActiveStatus):
-            return config_server_status
-
+        # if all statuses are active report mongodb status over sharding status
         return mongodb_status

     def is_relation_feasible(self, rel_interface) -> bool:
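
The net effect of this hunk, sketched under the assumption that the earlier, off-screen checks in get_status() still return any non-active shard or config-server status first: sharding problems keep surfacing, but once everything is healthy the plain MongoDB status wins.

    from ops.model import ActiveStatus

    # hedged sketch of the resulting precedence; names mirror the hunk above
    def pick_status(shard_status, config_server_status, pbm_status, mongodb_status):
        for status in (shard_status, config_server_status, pbm_status):
            if status and not isinstance(status, ActiveStatus):
                return status  # first non-active problem wins
        # if all statuses are active report mongodb status over sharding status
        return mongodb_status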
134 changes: 61 additions & 73 deletions tests/integration/sharding_tests/test_sharding.py
@@ -48,20 +48,19 @@ async def test_build_and_deploy(ops_test: OpsTest) -> None:
         my_charm, num_units=2, config={"role": "shard"}, application_name=SHARD_THREE_APP_NAME
     )

-    async with ops_test.fast_forward():
-        await ops_test.model.wait_for_idle(
-            apps=[CONFIG_SERVER_APP_NAME, SHARD_ONE_APP_NAME, SHARD_THREE_APP_NAME],
-            idle_period=20,
-            raise_on_blocked=False,
-            timeout=TIMEOUT,
-            raise_on_error=False,
-        )
+    await ops_test.model.wait_for_idle(
+        apps=[CONFIG_SERVER_APP_NAME, SHARD_ONE_APP_NAME, SHARD_THREE_APP_NAME],
+        idle_period=20,
+        raise_on_blocked=False,
+        timeout=TIMEOUT,
+        raise_on_error=False,
+    )

     # verify that Charmed MongoDB is blocked while its required relations are missing
     await asyncio.gather(
         ops_test.model.wait_for_idle(
             apps=[CONFIG_SERVER_APP_NAME],
-            status="active",
+            status="blocked",
             idle_period=20,
             timeout=TIMEOUT,
         ),
@@ -73,7 +72,9 @@ async def test_build_and_deploy(ops_test: OpsTest) -> None:
         ),
     )

-    # TODO Future PR: assert statuses for config-server
+    config_server_unit = ops_test.model.applications[CONFIG_SERVER_APP_NAME].units[0]
+    assert config_server_unit.workload_status_message == "missing relation to shard(s)"
+
     for shard_app_name in SHARD_APPS:
         shard_unit = ops_test.model.applications[shard_app_name].units[0]
         assert shard_unit.workload_status_message == "missing relation to config server"
@@ -95,19 +96,18 @@ async def test_cluster_active(ops_test: OpsTest) -> None:
f"{CONFIG_SERVER_APP_NAME}:{CONFIG_SERVER_REL_NAME}",
)

async with ops_test.fast_forward():
await ops_test.model.wait_for_idle(
apps=[
CONFIG_SERVER_APP_NAME,
SHARD_ONE_APP_NAME,
SHARD_TWO_APP_NAME,
SHARD_THREE_APP_NAME,
],
idle_period=20,
status="active",
timeout=TIMEOUT,
raise_on_error=False,
)
await ops_test.model.wait_for_idle(
apps=[
CONFIG_SERVER_APP_NAME,
SHARD_ONE_APP_NAME,
SHARD_TWO_APP_NAME,
SHARD_THREE_APP_NAME,
],
idle_period=20,
status="active",
timeout=TIMEOUT,
raise_on_error=False,
)

mongos_client = await generate_mongodb_client(
ops_test, app_name=CONFIG_SERVER_APP_NAME, mongos=True
@@ -119,14 +119,6 @@ async def test_cluster_active(ops_test: OpsTest) -> None:
         expected_shards=[SHARD_ONE_APP_NAME, SHARD_TWO_APP_NAME, SHARD_THREE_APP_NAME],
     ), "Config server did not process config properly"

-    # TODO Future PR: assert statuses for config-server
-    for shard_app_name in SHARD_APPS:
-        shard_unit = ops_test.model.applications[shard_app_name].units[0]
-        assert (
-            shard_unit.workload_status_message
-            == f"Shard connected to config-server: {CONFIG_SERVER_APP_NAME}"
-        )
-

 @pytest.mark.abort_on_fail
 async def test_sharding(ops_test: OpsTest) -> None:
@@ -207,19 +199,18 @@ async def test_shard_removal(ops_test: OpsTest) -> None:
f"{CONFIG_SERVER_APP_NAME}:{CONFIG_SERVER_REL_NAME}",
)

async with ops_test.fast_forward():
await ops_test.model.wait_for_idle(
apps=[
CONFIG_SERVER_APP_NAME,
SHARD_ONE_APP_NAME,
SHARD_TWO_APP_NAME,
SHARD_THREE_APP_NAME,
],
idle_period=20,
status="active",
timeout=TIMEOUT,
raise_on_error=False,
)
await ops_test.model.wait_for_idle(
apps=[
CONFIG_SERVER_APP_NAME,
SHARD_ONE_APP_NAME,
SHARD_TWO_APP_NAME,
SHARD_THREE_APP_NAME,
],
idle_period=20,
status="active",
timeout=TIMEOUT,
raise_on_error=False,
)

# TODO future PR: assert statuses are correct

@@ -248,19 +239,18 @@ async def test_removal_of_non_primary_shard(ops_test: OpsTest):
f"{CONFIG_SERVER_APP_NAME}:{CONFIG_SERVER_REL_NAME}",
)

async with ops_test.fast_forward():
await ops_test.model.wait_for_idle(
apps=[
CONFIG_SERVER_APP_NAME,
SHARD_ONE_APP_NAME,
SHARD_TWO_APP_NAME,
SHARD_THREE_APP_NAME,
],
idle_period=20,
status="active",
timeout=TIMEOUT,
raise_on_error=False,
)
await ops_test.model.wait_for_idle(
apps=[
CONFIG_SERVER_APP_NAME,
SHARD_ONE_APP_NAME,
SHARD_TWO_APP_NAME,
SHARD_THREE_APP_NAME,
],
idle_period=20,
status="active",
timeout=TIMEOUT,
raise_on_error=False,
)

await ops_test.model.applications[CONFIG_SERVER_APP_NAME].remove_relation(
f"{SHARD_TWO_APP_NAME}:{SHARD_REL_NAME}",
@@ -305,25 +295,23 @@ async def test_unconventual_shard_removal(ops_test: OpsTest):
     )

     await ops_test.model.applications[SHARD_TWO_APP_NAME].destroy_units(f"{SHARD_TWO_APP_NAME}/0")
-    async with ops_test.fast_forward():
-        await ops_test.model.wait_for_idle(
-            apps=[SHARD_TWO_APP_NAME],
-            idle_period=20,
-            status="active",
-            timeout=TIMEOUT,
-            raise_on_error=False,
-        )
+    await ops_test.model.wait_for_idle(
+        apps=[SHARD_TWO_APP_NAME],
+        idle_period=20,
+        status="active",
+        timeout=TIMEOUT,
+        raise_on_error=False,
+    )

     await ops_test.model.remove_application(SHARD_TWO_APP_NAME, block_until_done=True)

-    async with ops_test.fast_forward():
-        await ops_test.model.wait_for_idle(
-            apps=[CONFIG_SERVER_APP_NAME, SHARD_ONE_APP_NAME],
-            idle_period=20,
-            status="active",
-            timeout=TIMEOUT,
-            raise_on_error=False,
-        )
+    await ops_test.model.wait_for_idle(
+        apps=[CONFIG_SERVER_APP_NAME, SHARD_ONE_APP_NAME],
+        idle_period=20,
+        status="active",
+        timeout=TIMEOUT,
+        raise_on_error=False,
+    )

     mongos_client = await generate_mongodb_client(
         ops_test, app_name=CONFIG_SERVER_APP_NAME, mongos=True
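
These tests rely on the has_correct_shards helper from the suite's utilities; a plausible reconstruction (the repo's actual implementation may differ) compares listShards membership against the expected set:

    def has_correct_shards(mongos_client, expected_shards) -> bool:
        # hedged sketch: collect shard ids from the cluster and compare as sets
        shards = mongos_client.admin.command("listShards")["shards"]
        return {s["_id"] for s in shards} == set(expected_shards)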
