Skip to content

Commit

Permalink
[DPE-2776] Status reporting config server side (#290)
Browse files Browse the repository at this point in the history
## Issue
No statuses reported for config server

## Solution
Report statuses on config sever side
  • Loading branch information
MiaAltieri authored Nov 1, 2023
1 parent fc8789c commit 0b904b6
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 34 deletions.
17 changes: 11 additions & 6 deletions lib/charms/mongodb/v1/mongos.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 2
LIBPATCH = 3

# path to store mongodb ketFile
logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -163,11 +163,6 @@ def add_shard(self, shard_name, shard_hosts, shard_port=Config.MONGODB_PORT):
shard_hosts = [f"{host}:{shard_port}" for host in shard_hosts]
shard_hosts = ",".join(shard_hosts)
shard_url = f"{shard_name}/{shard_hosts}"
# TODO Future PR raise error when number of shards currently adding are higher than the
# number of secondaries on the primary shard. This will be challenging, as there is no
# MongoDB command to retrieve the primary shard. Will likely need to be done via
# mongosh

if shard_name in self.get_shard_members():
logger.info("Skipping adding shard %s, shard is already in cluster", shard_name)
return
Expand Down Expand Up @@ -444,3 +439,13 @@ def get_shard_with_most_available_space(self, shard_to_ignore) -> Tuple[str, int
candidate_free_space = current_free_space

return (candidate_shard, candidate_free_space)

def get_draining_shards(self) -> List[str]:
"""Returns a list of the shards currently draining."""
sc_status = self.client.admin.command("listShards")
draining_shards = []
for shard in sc_status["shards"]:
if shard.get("draining", False):
draining_shards.append(shard["_id"])

return draining_shards
91 changes: 80 additions & 11 deletions lib/charms/mongodb/v1/shards_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from ops.model import (
ActiveStatus,
BlockedStatus,
ErrorStatus,
MaintenanceStatus,
StatusBase,
WaitingStatus,
Expand Down Expand Up @@ -217,9 +218,6 @@ def add_shards(self, departed_shard_id):
with MongosConnection(self.charm.mongos_config) as mongo:
cluster_shards = mongo.get_shard_members()
relation_shards = self._get_shards_from_relations(departed_shard_id)

# TODO Future PR, limit number of shards add at a time, based on the number of
# replicas in the primary shard
for shard in relation_shards - cluster_shards:
try:
shard_hosts = self._get_shard_hosts(shard)
Expand Down Expand Up @@ -271,9 +269,39 @@ def update_mongos_hosts(self):
for relation in self.charm.model.relations[self.relation_name]:
self._update_relation_data(relation.id, {HOSTS_KEY: json.dumps(self.charm._unit_ips)})

def get_config_server_status(self):
"""TODO: Implement this function in a separate PR."""
return None
def get_config_server_status(self) -> Optional[StatusBase]:
"""Returns the current status of the config-server."""
if not self.charm.is_role(Config.Role.CONFIG_SERVER):
logger.info("skipping status check, charm is not running as a shard")
return None

if not self.charm.db_initialised:
logger.info("No status for shard to report, waiting for db to be initialised.")
return None

if self.model.relations[LEGACY_REL_NAME]:
return BlockedStatus(f"relation {LEGACY_REL_NAME} to shard not supported.")

if self.model.relations[REL_NAME]:
return BlockedStatus(f"relation {REL_NAME} to shard not supported.")

if not self.is_mongos_running():
return ErrorStatus("Internal mongos is not running.")

shard_draining = self.get_draining_shards()
if shard_draining:
shard_draining = ",".join(shard_draining)
return MaintenanceStatus(f"Draining shard {shard_draining}")

if not self.model.relations[self.relation_name]:
return BlockedStatus("missing relation to shard(s)")

unreachable_shards = self.get_unreachable_shards()
if unreachable_shards:
unreachable_shards = ", ".join(unreachable_shards)
return ErrorStatus(f"Shards {unreachable_shards} are unreachable.")

return ActiveStatus()

def _update_relation_data(self, relation_id: int, data: dict) -> None:
"""Updates a set of key-value pairs in the relation.
Expand Down Expand Up @@ -325,6 +353,46 @@ def get_related_shards(self) -> List[str]:
"""Returns a list of related shards."""
return [rel.app.name for rel in self.charm.model.relations[self.relation_name]]

def get_unreachable_shards(self) -> List[str]:
"""Returns a list of unreable shard hosts."""
unreachable_hosts = []
if not self.model.relations[self.relation_name]:
logger.info("shards are not reachable, none related to config-sever")
return unreachable_hosts

for shard_name in self.get_related_shards():
shard_hosts = self._get_shard_hosts(shard_name)
if not shard_hosts:
return unreachable_hosts

# use a URI that is not dependent on the operator password, as we are not guaranteed
# that the shard has received the password yet.
uri = f"mongodb://{','.join(shard_hosts)}"
with MongoDBConnection(None, uri) as mongo:
if not mongo.is_ready:
unreachable_hosts.append(shard_name)

return unreachable_hosts

def is_mongos_running(self) -> bool:
"""Returns true if mongos service is running."""
mongos_hosts = ",".join(self.charm._unit_ips)
uri = f"mongodb://{mongos_hosts}"
with MongosConnection(None, uri) as mongo:
return mongo.is_ready

def get_draining_shards(self) -> List[str]:
"""Returns the shard that is currently draining."""
with MongosConnection(self.charm.mongos_config) as mongo:
draining_shards = mongo.get_draining_shards()

# in theory, this should always be a list of one. But if something has gone wrong we
# should take note and log it
if len(draining_shards) > 1:
logger.error("Multiple shards draining at the same time.")

return draining_shards


class ConfigServerRequirer(Object):
"""Manage relations between the config server and the shard, on the shard's side."""
Expand Down Expand Up @@ -499,16 +567,15 @@ def get_shard_status(self) -> Optional[StatusBase]:
return ActiveStatus("Shard drained from cluster, ready for removal")

if not self._is_mongos_reachable():
return BlockedStatus("Config server unreachable")
return ErrorStatus("Config server unreachable")

if not self._is_added_to_cluster():
return MaintenanceStatus("Adding shard to config-server")

if not self._is_shard_aware():
return BlockedStatus("Shard is not yet shard aware")

config_server_name = self.get_related_config_server()
return ActiveStatus(f"Shard connected to config-server: {config_server_name}")
return ActiveStatus()

def drained(self, mongos_hosts: Set[str], shard_name: str) -> bool:
"""Returns whether a shard has been drained from the cluster.
Expand Down Expand Up @@ -625,7 +692,6 @@ def _is_mongos_reachable(self) -> bool:
if not mongos_hosts:
return False

self.charm.remote_mongos_config(set(mongos_hosts))
config = self.charm.remote_mongos_config(set(mongos_hosts))

# use a URI that is not dependent on the operator password, as we are not guaranteed that
Expand All @@ -636,7 +702,10 @@ def _is_mongos_reachable(self) -> bool:

def _is_added_to_cluster(self) -> bool:
"""Returns True if the shard has been added to the cluster."""
return json.loads(self.charm.app_peer_data.get("added_to_cluster", "False"))
if "added_to_cluster" not in self.charm.app_peer_data:
return False

return json.loads(self.charm.app_peer_data.get("added_to_cluster"))

def _is_shard_aware(self) -> bool:
"""Returns True if shard is in cluster and shard aware."""
Expand Down
8 changes: 1 addition & 7 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -1380,13 +1380,7 @@ def get_status(self) -> StatusBase:
if pbm_status and not isinstance(pbm_status, ActiveStatus):
return pbm_status

# if all statuses are active report sharding statuses over mongodb status
if isinstance(shard_status, ActiveStatus):
return shard_status

if isinstance(config_server_status, ActiveStatus):
return config_server_status

# if all statuses are active report mongodb status over sharding status
return mongodb_status

def is_relation_feasible(self, rel_interface) -> bool:
Expand Down
14 changes: 4 additions & 10 deletions tests/integration/sharding_tests/test_sharding.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ async def test_build_and_deploy(ops_test: OpsTest) -> None:
await asyncio.gather(
ops_test.model.wait_for_idle(
apps=[CONFIG_SERVER_APP_NAME],
status="active",
status="blocked",
idle_period=20,
timeout=TIMEOUT,
),
Expand All @@ -62,7 +62,9 @@ async def test_build_and_deploy(ops_test: OpsTest) -> None:
),
)

# TODO Future PR: assert statuses for config-server
config_server_unit = ops_test.model.applications[CONFIG_SERVER_APP_NAME].units[0]
assert config_server_unit.workload_status_message == "missing relation to shard(s)"

for shard_app_name in SHARD_APPS:
shard_unit = ops_test.model.applications[shard_app_name].units[0]
assert shard_unit.workload_status_message == "missing relation to config server"
Expand All @@ -88,14 +90,6 @@ async def test_cluster_active(ops_test: OpsTest) -> None:
timeout=TIMEOUT,
)

# TODO Future PR: assert statuses for config-server
for shard_app_name in SHARD_APPS:
shard_unit = ops_test.model.applications[shard_app_name].units[0]
assert (
shard_unit.workload_status_message
== f"Shard connected to config-server: {CONFIG_SERVER_APP_NAME}"
)


async def test_sharding(ops_test: OpsTest) -> None:
"""Tests writing data to mongos gets propagated to shards."""
Expand Down

0 comments on commit 0b904b6

Please sign in to comment.