Skip to content

Commit

Permalink
implement the backup and rollback
Browse files Browse the repository at this point in the history
  • Loading branch information
papa99do committed Aug 4, 2024
1 parent 497bcdd commit 1246404
Show file tree
Hide file tree
Showing 3 changed files with 187 additions and 181 deletions.
58 changes: 28 additions & 30 deletions src/marqo/core/index_management/index_management.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
from contextlib import contextmanager
from typing import List
from typing import List, Tuple
from typing import Optional

import marqo.logging
import marqo.vespa.vespa_client
from marqo import version
from marqo.core import constants
from marqo.core.distributed_lock.zookeeper_distributed_lock import get_deployment_lock
from marqo.core.exceptions import IndexExistsError, IndexNotFoundError, ApplicationNotInitializedError
from marqo.core.exceptions import IndexNotFoundError, ApplicationNotInitializedError
from marqo.core.exceptions import OperationConflictError
from marqo.core.exceptions import ZookeeperLockNotAcquiredError, InternalError
from marqo.core.index_management.vespa_application_package import VespaApplicationPackage, MarqoConfig, \
Expand Down Expand Up @@ -82,10 +82,18 @@ def bootstrap_vespa(self) -> bool:
application.gen.send(False) # tell context manager to skip deployment
return False

existing_indexes = self._get_existing_indexes() if has_marqo_settings_schema else None
existing_indexes = self._get_existing_indexes() if has_marqo_settings_schema else ()
app.bootstrap(marqo_version, existing_indexes)
return True

def rollback_vespa(self) -> bool:
with self._vespa_deployment_lock():
application = self._vespa_application()
with application as app:
app_changed = app.rollback(version.get_version())
application.gen.send(app_changed)
return app_changed

def create_index(self, marqo_index_request: MarqoIndexRequest) -> MarqoIndex:
"""
Create a Marqo index in a thread-safe manner.
Expand All @@ -103,24 +111,7 @@ def create_index(self, marqo_index_request: MarqoIndexRequest) -> MarqoIndex:
OperationConflictError: If another index creation/deletion operation is
in progress and the lock cannot be acquired
"""
with self._vespa_deployment_lock():
with self._vespa_application_with_deployment_session() as app:
return self._create_one_index(app, marqo_index_request)

@staticmethod
def _create_one_index(app, marqo_index_request):
if app.has_index(marqo_index_request.name):
raise IndexExistsError(f"Index {marqo_index_request.name} already exists")

# FIXME Ideally, this should be populated when used in inference
if marqo_index_request.model.text_query_prefix is None:
marqo_index_request.model.text_query_prefix = marqo_index_request.model.get_default_text_query_prefix()
if marqo_index_request.model.text_chunk_prefix is None:
marqo_index_request.model.text_chunk_prefix = marqo_index_request.model.get_default_text_chunk_prefix()

schema, marqo_index = vespa_schema_factory(marqo_index_request).generate_schema()
app.add_index_setting_and_schema(marqo_index, schema)
return marqo_index
return self.batch_create_indexes([marqo_index_request])[0]

def batch_create_indexes(self, marqo_index_requests: List[MarqoIndexRequest]) -> List[MarqoIndex]:
"""
Expand All @@ -141,9 +132,21 @@ def batch_create_indexes(self, marqo_index_requests: List[MarqoIndexRequest]) ->
OperationConflictError: If another index creation/deletion operation is
in progress and the lock cannot be acquired
"""
index_to_create: List[Tuple[str, MarqoIndex]] = []
for request in marqo_index_requests:
# set the default prefixes if not provided
if request.model.text_query_prefix is None:
request.model.text_query_prefix = request.model.get_default_text_query_prefix()
if request.model.text_chunk_prefix is None:
request.model.text_chunk_prefix = request.model.get_default_text_chunk_prefix()

index_to_create.append(vespa_schema_factory(request).generate_schema())

with self._vespa_deployment_lock():
with self._vespa_application_with_deployment_session() as app:
return [self._create_one_index(app, request) for request in marqo_index_requests]
app.batch_add_index_setting_and_schema(index_to_create)

return [index for _, index in index_to_create]

def delete_index_by_name(self, index_name: str) -> None:
"""
Expand All @@ -157,9 +160,7 @@ def delete_index_by_name(self, index_name: str) -> None:
OperationConflictError: If another index creation/deletion operation is
in progress and the lock cannot be acquired
"""
with self._vespa_deployment_lock():
with self._vespa_application_with_deployment_session() as app:
app.delete_index_setting_and_schema(index_name)
self.batch_delete_indexes_by_name([index_name])

def batch_delete_indexes_by_name(self, index_names: List[str]) -> None:
"""
Expand All @@ -174,8 +175,7 @@ def batch_delete_indexes_by_name(self, index_names: List[str]) -> None:
"""
with self._vespa_deployment_lock():
with self._vespa_application_with_deployment_session() as app:
for index_name in index_names:
app.delete_index_setting_and_schema(index_name)
app.batch_delete_index_setting_and_schema(index_names)

def _get_existing_indexes(self) -> List[MarqoIndex]:
"""
Expand Down Expand Up @@ -237,7 +237,7 @@ def get_index(self, index_name) -> MarqoIndex:

def get_marqo_version(self) -> str:
"""
This method is only used during upgrade and rollback, it might not be needed anymore
This method is only used during upgrade and rollback
TODO check if this is still needed
"""
application = self._vespa_application_with_deployment_session()
Expand Down Expand Up @@ -266,7 +266,6 @@ def _vespa_application(self, check_configured: bool = True):
should_deploy = yield app

if should_deploy is None or should_deploy is True:
app.save_to_store()
self.vespa_client.deploy_application(app_root_path)
self.vespa_client.wait_for_application_convergence()

Expand All @@ -291,7 +290,6 @@ def _vespa_application_with_deployment_session(self, check_configured: bool = Tr
should_deploy = yield app

if should_deploy is None or should_deploy is True:
app.save_to_store()
self.vespa_client.prepare(session_id, httpx_client)
# TODO handle prepare configChangeActions
# https://docs.vespa.ai/en/reference/deploy-rest-api-v2.html#prepare-session
Expand Down
Loading

0 comments on commit 1246404

Please sign in to comment.