Skip to content

Commit

Permalink
Add info level logging for bootstrapping and rollback
Browse files Browse the repository at this point in the history
  • Loading branch information
papa99do committed Aug 12, 2024
1 parent 5e5ade2 commit b3bed9b
Showing 1 changed file with 31 additions and 2 deletions.
33 changes: 31 additions & 2 deletions src/marqo/core/index_management/vespa_application_package.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,13 +261,18 @@ def __init__(self, backup_zip_file_content: Optional[bytes] = None) -> None:
self._dir = tempfile.mkdtemp()
self._removal_mark_file = os.path.join(self._dir, self._REMOVE_FILE_LIST)
self._files_to_remove = []
self._files_to_rollback = []

if backup_zip_file_content is not None:
self._extract_gzip_from_bytes(backup_zip_file_content)
if os.path.isfile(self._removal_mark_file):
self._files_to_remove = json.load(open(self._removal_mark_file))
os.remove(self._removal_mark_file)

def __repr__(self):
return (f'<VespaAppBackup files_to_rollback={[os.path.join(paths) for paths in self._files_to_rollback]} '
f'files_to_remove={[os.path.join(paths) for paths in self._files_to_remove]}>')

def read_text_file(self, *paths: str) -> Optional[str]:
path = os.path.join(self._dir, *paths)
if not os.path.isfile(path):
Expand Down Expand Up @@ -297,6 +302,8 @@ def backup_file(self, content: Union[str, bytes], *paths: str) -> None:
with open(path, mode) as f:
f.write(content)

self._files_to_rollback.append(paths)

def mark_for_removal(self, *paths: str) -> None:
self._files_to_remove.append(paths)

Expand Down Expand Up @@ -485,23 +492,31 @@ def need_bootstrapping(self, marqo_version: str, marqo_config_doc: Optional[Marq
- from the marqo_config_doc passed in which is marqo__config doc saved in marqo__settings schema (Post v2.1.0)
- if neither is available, return 2.0.0 as default (Pre v2.1.0 or not bootstrapped yet)
"""
logger.info(f'Marqo version is {marqo_version}')

if self.is_configured and self.get_marqo_config() is not None:
app_version = self.get_marqo_config().version
logger.info(f'Vespa app version is detected in marqo_config.json: {app_version}')
elif marqo_config_doc:
app_version = marqo_config_doc.version
logger.info(f'Vespa app version is detected in marqo__config doc: {app_version}')
else:
app_version = '2.0.0'
logger.info(f'Vespa app version is not detected, set to default: {app_version}')

marqo_sem_version = semver.VersionInfo.parse(marqo_version, optional_minor_and_patch=True)
app_sem_version = semver.VersionInfo.parse(app_version, optional_minor_and_patch=True)

return app_sem_version < marqo_sem_version

def bootstrap(self, marqo_version: str, existing_index_settings: List[MarqoIndex] = ()) -> None:
logger.info(f'Bootstrapping Vespa app to {marqo_version}')
# Migrate existing index settings from previous versions of Marqo
if not self.is_configured:
for index in existing_index_settings:
self._index_setting_store.save_index_setting(index)
if existing_index_settings is not None and len(existing_index_settings) > 0:
logger.info(f'Importing existing index settings {[index.json for index in existing_index_settings]}')
for index in existing_index_settings:
self._index_setting_store.save_index_setting(index)
self._persist_index_settings()

backup = VespaAppBackup()
Expand All @@ -510,11 +525,17 @@ def bootstrap(self, marqo_version: str, existing_index_settings: List[MarqoIndex
self._service_xml.config_components()
self._marqo_config_store.update_version(marqo_version)

logger.info(f'Persisting services.xml file: {self._service_xml}')
self._store.save_file(self._service_xml.to_xml(), self._SERVICES_XML_FILE, backup=backup)

logger.info(f'Persisting marqo config: {self._marqo_config_store.get().json()}')
self._store.save_file(self._marqo_config_store.get().json(), self._MARQO_CONFIG_FILE, backup=backup)

logger.info(f'Generating backup file {self._BACKUP_FILE} from {backup}')
self._store.save_file(backup.to_zip_stream().read(), self._BACKUP_FILE)

def rollback(self, marqo_version: str) -> None:
logger.info(f'Rolling Vespa app back to {marqo_version}')
marqo_sem_version = semver.VersionInfo.parse(marqo_version, optional_minor_and_patch=True)
app_sem_version = semver.VersionInfo.parse(self.get_marqo_config().version, optional_minor_and_patch=True)
if marqo_sem_version >= app_sem_version:
Expand All @@ -525,6 +546,7 @@ def rollback(self, marqo_version: str) -> None:
raise ApplicationRollbackError(f"{self._BACKUP_FILE} does not exist in current session, failed to rollback")

old_backup = VespaAppBackup(self._store.read_binary_file(self._BACKUP_FILE))
logger.info(f'Old backup {old_backup}')

marqo_config = MarqoConfigStore(old_backup.read_text_file(self._MARQO_CONFIG_FILE)).get()
if marqo_config is not None and marqo_config.version != marqo_version:
Expand All @@ -535,13 +557,16 @@ def rollback(self, marqo_version: str) -> None:

new_backup = VespaAppBackup()
for paths in old_backup.files_to_remove():
logger.info(f'Removing file {paths}')
self._store.remove_file(*paths, backup=new_backup)

for (path, file_content) in old_backup.files_to_rollback():
logger.info(f'Rolling back file {path}')
if path == self._SERVICES_XML_FILE:
self._validate_services_xml_for_rollback(file_content.decode('utf-8'))
self._store.save_file(file_content, path, backup=new_backup)

logger.info(f'Generating backup file {self._BACKUP_FILE} from {new_backup}')
self._store.save_file(new_backup.to_zip_stream().read(), self._BACKUP_FILE)

def batch_add_index_setting_and_schema(self, indexes: List[Tuple[str, MarqoIndex]]) -> None:
Expand Down Expand Up @@ -589,11 +614,13 @@ def _config_query_profiles(self, backup: VespaAppBackup) -> None:
</query-profile>
'''
)
logger.info(f'Configuring query profiles {content}')
self._store.save_file(content, 'search', 'query-profiles', 'default.xml', backup=backup)

def _copy_components_jar(self) -> None:
components_jar_file = 'marqo-custom-searchers-deploy.jar'

logger.info(f'Copying components jar file {components_jar_file}')
with open(self._COMPONENTS_JAR_FOLDER/components_jar_file, 'rb') as f:
self._store.save_file(f.read(), 'components', components_jar_file)

Expand All @@ -610,6 +637,8 @@ def _add_schema_removal_override(self) -> None:
def _validate_services_xml_for_rollback(self, services_xml_backup: str) -> None:
services_xml_old = ServiceXml(services_xml_backup)
if not self._service_xml.compare_element(services_xml_old, 'content/documents'):
# TODO should we capture diff in the log?
raise ApplicationRollbackError('Indexes have been added or removed since last backup. Aborting rollback.')
if not self._service_xml.compare_element(services_xml_old, '*/nodes'):
# TODO should we capture diff in the log?
raise ApplicationRollbackError('Nodes have been added or removed since last backup. Aborting rollback.')

0 comments on commit b3bed9b

Please sign in to comment.