From 7759619b585d41b227a515696beb9775c4e733a4 Mon Sep 17 00:00:00 2001
From: Earle Lowe <30607889+emlowe@users.noreply.github.com>
Date: Mon, 30 Sep 2024 09:33:08 -0700
Subject: [PATCH] [CHIA-1427]: Limit full file creation when processing
 subscription generations (#18612)

* Some logging and some code to limit full file generation based on the max number of full files allowed

* Don't write out full files that aren't needed

* Black fixes

* Adjust the full file during error conditions

* No need for try
---
 .../_tests/core/data_layer/test_data_store.py | 14 +++++-
 chia/data_layer/data_layer.py                 | 43 +++++++++++++++----
 chia/data_layer/download_data.py              | 23 +++++++---
 3 files changed, 63 insertions(+), 17 deletions(-)

diff --git a/chia/_tests/core/data_layer/test_data_store.py b/chia/_tests/core/data_layer/test_data_store.py
index 5bdd76487503..1c21a6ed9e1d 100644
--- a/chia/_tests/core/data_layer/test_data_store.py
+++ b/chia/_tests/core/data_layer/test_data_store.py
@@ -1369,6 +1369,7 @@ async def mock_http_download(
             data_store=data_store,
             store_id=store_id,
             existing_generation=3,
+            target_generation=4,
             root_hashes=[bytes32.random(seeded_random)],
             server_info=sinfo,
             client_foldername=tmp_path,
@@ -1392,6 +1393,7 @@ async def mock_http_download(
             data_store=data_store,
             store_id=store_id,
             existing_generation=3,
+            target_generation=4,
             root_hashes=[bytes32.random(seeded_random)],
             server_info=sinfo,
             client_foldername=tmp_path,
@@ -1830,6 +1832,7 @@ async def test_delete_store_data_protects_pending_roots(raw_data_store: DataStor
 
 @pytest.mark.anyio
 @boolean_datacases(name="group_files_by_store", true="group by singleton", false="don't group by singleton")
+@pytest.mark.parametrize("max_full_files", [1, 2, 5])
 async def test_insert_from_delta_file(
     data_store: DataStore,
     store_id: bytes32,
@@ -1837,6 +1840,7 @@ async def test_insert_from_delta_file(
     tmp_path: Path,
     seeded_random: random.Random,
     group_files_by_store: bool,
+    max_full_files: int,
 ) -> None:
     await data_store.create_tree(store_id=store_id, status=Status.COMMITTED)
     num_files = 5
@@ -1908,6 +1912,7 @@ async def mock_http_download_2(
             data_store=data_store,
             store_id=store_id,
             existing_generation=0,
+            target_generation=num_files + 1,
             root_hashes=root_hashes,
             server_info=sinfo,
             client_foldername=tmp_path_1,
@@ -1916,6 +1921,7 @@ async def mock_http_download_2(
             proxy_url="",
             downloader=None,
             group_files_by_store=group_files_by_store,
+            maximum_full_file_count=max_full_files,
         )
         assert not success
 
@@ -1929,6 +1935,7 @@ async def mock_http_download_2(
             data_store=data_store,
             store_id=store_id,
             existing_generation=0,
+            target_generation=num_files + 1,
             root_hashes=root_hashes,
             server_info=sinfo,
             client_foldername=tmp_path_1,
@@ -1937,6 +1944,7 @@ async def mock_http_download_2(
             proxy_url="",
             downloader=None,
             group_files_by_store=group_files_by_store,
+            maximum_full_file_count=max_full_files,
         )
         assert success
 
@@ -1944,7 +1952,7 @@ async def mock_http_download_2(
     assert root.generation == num_files + 1
     with os.scandir(store_path) as entries:
         filenames = {entry.name for entry in entries}
-        assert len(filenames) == 2 * (num_files + 1)
+        assert len(filenames) == num_files + 1 + max_full_files  # 6 deltas and max_full_files full files
     kv = await data_store.get_keys_values(store_id=store_id)
     assert kv == kv_before
 
@@ -2032,6 +2040,7 @@ async def test_insert_from_delta_file_correct_file_exists(
         data_store=data_store,
         store_id=store_id,
         existing_generation=0,
+        target_generation=num_files + 1,
         root_hashes=root_hashes,
         server_info=sinfo,
         client_foldername=tmp_path,
@@ -2047,7 +2056,7 @@ async def test_insert_from_delta_file_correct_file_exists(
     assert root.generation == num_files + 1
     with os.scandir(store_path) as entries:
         filenames = {entry.name for entry in entries}
-        assert len(filenames) == 2 * (num_files + 1)
+        assert len(filenames) == num_files + 2  # 1 full and 6 deltas
     kv = await data_store.get_keys_values(store_id=store_id)
     assert kv == kv_before
 
@@ -2094,6 +2103,7 @@ async def test_insert_from_delta_file_incorrect_file_exists(
         data_store=data_store,
         store_id=store_id,
         existing_generation=1,
+        target_generation=6,
         root_hashes=[incorrect_root_hash],
         server_info=sinfo,
         client_foldername=tmp_path,
diff --git a/chia/data_layer/data_layer.py b/chia/data_layer/data_layer.py
index bcbf7b1871c5..0c0360a0c5ea 100644
--- a/chia/data_layer/data_layer.py
+++ b/chia/data_layer/data_layer.py
@@ -568,6 +568,7 @@ async def fetch_and_validate(self, store_id: bytes32) -> None:
         servers_info = await self.data_store.get_available_servers_for_store(store_id, timestamp)
         # TODO: maybe append a random object to the whole DataLayer class?
         random.shuffle(servers_info)
+        success = False
 
         for server_info in servers_info:
             url = server_info.url
@@ -600,14 +601,16 @@ async def fetch_and_validate(self, store_id: bytes32) -> None:
                     self.data_store,
                     store_id,
                     root.generation,
-                    [record.root for record in reversed(to_download)],
-                    server_info,
-                    self.server_files_location,
-                    self.client_timeout,
-                    self.log,
-                    proxy_url,
-                    await self.get_downloader(store_id, url),
-                    self.group_files_by_store,
+                    target_generation=singleton_record.generation,
+                    root_hashes=[record.root for record in reversed(to_download)],
+                    server_info=server_info,
+                    client_foldername=self.server_files_location,
+                    timeout=self.client_timeout,
+                    log=self.log,
+                    proxy_url=proxy_url,
+                    downloader=await self.get_downloader(store_id, url),
+                    group_files_by_store=self.group_files_by_store,
+                    maximum_full_file_count=self.maximum_full_file_count,
                 )
                 if success:
                     self.log.info(
@@ -621,6 +624,30 @@ async def fetch_and_validate(self, store_id: bytes32) -> None:
             except Exception as e:
                 self.log.warning(f"Exception while downloading files for {store_id}: {e} {traceback.format_exc()}.")
 
+        # if there aren't any servers then don't try to write the full tree
+        if not success and len(servers_info) > 0:
+            root = await self.data_store.get_tree_root(store_id=store_id)
+            if root.node_hash is None:
+                return
+            filename_full_tree = get_full_tree_filename_path(
+                foldername=self.server_files_location,
+                store_id=store_id,
+                node_hash=root.node_hash,
+                generation=root.generation,
+                group_by_store=self.group_files_by_store,
+            )
+            # Had trouble with this generation, so generate full file for the generation we currently have
+            if not os.path.exists(filename_full_tree):
+                with open(filename_full_tree, "wb") as writer:
+                    await self.data_store.write_tree_to_file(
+                        root=root,
+                        node_hash=root.node_hash,
+                        store_id=store_id,
+                        deltas_only=False,
+                        writer=writer,
+                    )
+                self.log.info(f"Successfully written full tree filename {filename_full_tree}.")
+
     async def get_downloader(self, store_id: bytes32, url: str) -> Optional[PluginRemote]:
         request_json = {"store_id": store_id.hex(), "url": url}
         for d in self.downloaders:
diff --git a/chia/data_layer/download_data.py b/chia/data_layer/download_data.py
index 1e0e4dae5a1e..331e2cb50b1f 100644
--- a/chia/data_layer/download_data.py
+++ b/chia/data_layer/download_data.py
@@ -92,7 +92,8 @@ async def insert_into_data_store_from_file(
     store_id: bytes32,
     root_hash: Optional[bytes32],
     filename: Path,
-) -> None:
+) -> int:
+    num_inserted = 0
     with open(filename, "rb") as reader:
         while True:
             chunk = b""
@@ -119,8 +120,10 @@ async def insert_into_data_store_from_file(
 
             node_type = NodeType.TERMINAL if serialized_node.is_terminal else NodeType.INTERNAL
             await data_store.insert_node(node_type, serialized_node.value1, serialized_node.value2)
+            num_inserted += 1
 
     await data_store.insert_root_with_ancestor_table(store_id=store_id, node_hash=root_hash, status=Status.COMMITTED)
+    return num_inserted
 
 
 @dataclass
@@ -233,6 +236,7 @@ async def insert_from_delta_file(
     data_store: DataStore,
     store_id: bytes32,
     existing_generation: int,
+    target_generation: int,
     root_hashes: List[bytes32],
     server_info: ServerInfo,
     client_foldername: Path,
@@ -241,6 +245,7 @@ async def insert_from_delta_file(
     proxy_url: str,
     downloader: Optional[PluginRemote],
     group_files_by_store: bool = False,
+    maximum_full_file_count: int = 1,
 ) -> bool:
     if group_files_by_store:
         client_foldername.joinpath(f"{store_id}").mkdir(parents=True, exist_ok=True)
@@ -283,7 +288,7 @@ async def insert_from_delta_file(
                 existing_generation,
                 group_files_by_store,
             )
-            await insert_into_data_store_from_file(
+            num_inserted = await insert_into_data_store_from_file(
                 data_store,
                 store_id,
                 None if root_hash == bytes32([0] * 32) else root_hash,
@@ -291,13 +296,17 @@ async def insert_from_delta_file(
             )
             log.info(
                 f"Successfully inserted hash {root_hash} from delta file. "
-                f"Generation: {existing_generation}. Store id: {store_id}."
+                f"Generation: {existing_generation}. Store id: {store_id}. Nodes inserted: {num_inserted}."
             )
 
-            root = await data_store.get_tree_root(store_id=store_id)
-            with open(filename_full_tree, "wb") as writer:
-                await data_store.write_tree_to_file(root, root_hash, store_id, False, writer)
-            log.info(f"Successfully written full tree filename {filename_full_tree}.")
+            if target_generation - existing_generation <= maximum_full_file_count - 1:
+                root = await data_store.get_tree_root(store_id=store_id)
+                with open(filename_full_tree, "wb") as writer:
+                    await data_store.write_tree_to_file(root, root_hash, store_id, False, writer)
+                log.info(f"Successfully written full tree filename {filename_full_tree}.")
+            else:
+                log.info(f"Skipping full file generation for {existing_generation}")
+
             await data_store.received_correct_file(store_id, server_info)
         except Exception:
             try:
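
A note on the gating added in insert_from_delta_file: a full tree file is now written only when target_generation - existing_generation <= maximum_full_file_count - 1, i.e. for the last maximum_full_file_count generations up to and including the target; earlier generations produce delta files only. Below is a minimal sketch of that predicate; the standalone helper should_write_full_file is illustrative only (the patch writes the comparison inline):

    # Illustrative sketch; restates the inlined check from insert_from_delta_file.
    # The helper itself does not exist in the patch.
    def should_write_full_file(
        existing_generation: int,
        target_generation: int,
        maximum_full_file_count: int = 1,
    ) -> bool:
        # Keep full files only for the last `maximum_full_file_count` generations
        # up to and including the target; older generations keep deltas only.
        return target_generation - existing_generation <= maximum_full_file_count - 1

    # With a target generation of 6 and maximum_full_file_count=2, only
    # generations 5 and 6 get full tree files:
    assert [g for g in range(1, 7) if should_write_full_file(g, 6, 2)] == [5, 6]

This is consistent with the updated test expectation of num_files + 1 + max_full_files entries on disk: num_files + 1 delta files plus max_full_files full tree files.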