Commit

[CHIA-1427]: Limit full file creation when processing subscription generations (#18612)

* Some logging and some code to limit full file generation based on the max number of full files allowed

* Don't write out full files that aren't needed

* Black fixes

* Adjust the full file during error conditions

* No need for try
emlowe authored Sep 30, 2024
1 parent cc554ea commit 7759619
Showing 3 changed files with 63 additions and 17 deletions.
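In short, insert_from_delta_file now receives the subscription's target_generation and a maximum_full_file_count, and only writes a full tree file when the generation just inserted is within maximum_full_file_count of that target; earlier generations get delta files only. A minimal sketch of that condition, using the parameter names from the diff below (this helper is illustrative, not a copy of the repository code):

    # Sketch of the gating rule added to insert_from_delta_file
    # (see the download_data.py diff below).
    def should_write_full_file(
        existing_generation: int,      # generation just inserted from a delta file
        target_generation: int,        # generation the subscription is catching up to
        maximum_full_file_count: int,  # new parameter, defaults to 1
    ) -> bool:
        # Only the last `maximum_full_file_count` generations before the target
        # get a full tree file written alongside their delta file.
        return target_generation - existing_generation <= maximum_full_file_count - 1

For example, with target_generation=6 and maximum_full_file_count=2, full tree files are written only for generations 5 and 6.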
14 changes: 12 additions & 2 deletions chia/_tests/core/data_layer/test_data_store.py
@@ -1369,6 +1369,7 @@ async def mock_http_download(
data_store=data_store,
store_id=store_id,
existing_generation=3,
target_generation=4,
root_hashes=[bytes32.random(seeded_random)],
server_info=sinfo,
client_foldername=tmp_path,
@@ -1392,6 +1393,7 @@ async def mock_http_download(
data_store=data_store,
store_id=store_id,
existing_generation=3,
target_generation=4,
root_hashes=[bytes32.random(seeded_random)],
server_info=sinfo,
client_foldername=tmp_path,
@@ -1830,13 +1832,15 @@ async def test_delete_store_data_protects_pending_roots(raw_data_store: DataStore

@pytest.mark.anyio
@boolean_datacases(name="group_files_by_store", true="group by singleton", false="don't group by singleton")
@pytest.mark.parametrize("max_full_files", [1, 2, 5])
async def test_insert_from_delta_file(
data_store: DataStore,
store_id: bytes32,
monkeypatch: Any,
tmp_path: Path,
seeded_random: random.Random,
group_files_by_store: bool,
max_full_files: int,
) -> None:
await data_store.create_tree(store_id=store_id, status=Status.COMMITTED)
num_files = 5
@@ -1908,6 +1912,7 @@ async def mock_http_download_2(
data_store=data_store,
store_id=store_id,
existing_generation=0,
target_generation=num_files + 1,
root_hashes=root_hashes,
server_info=sinfo,
client_foldername=tmp_path_1,
@@ -1916,6 +1921,7 @@ async def mock_http_download_2(
proxy_url="",
downloader=None,
group_files_by_store=group_files_by_store,
maximum_full_file_count=max_full_files,
)
assert not success

@@ -1929,6 +1935,7 @@ async def mock_http_download_2(
data_store=data_store,
store_id=store_id,
existing_generation=0,
target_generation=num_files + 1,
root_hashes=root_hashes,
server_info=sinfo,
client_foldername=tmp_path_1,
@@ -1937,14 +1944,15 @@ async def mock_http_download_2(
proxy_url="",
downloader=None,
group_files_by_store=group_files_by_store,
maximum_full_file_count=max_full_files,
)
assert success

root = await data_store.get_tree_root(store_id=store_id)
assert root.generation == num_files + 1
with os.scandir(store_path) as entries:
filenames = {entry.name for entry in entries}
assert len(filenames) == 2 * (num_files + 1)
assert len(filenames) == num_files + 1 + max_full_files # 6 deltas and max_full_files full files
kv = await data_store.get_keys_values(store_id=store_id)
assert kv == kv_before

@@ -2032,6 +2040,7 @@ async def test_insert_from_delta_file_correct_file_exists(
data_store=data_store,
store_id=store_id,
existing_generation=0,
target_generation=num_files + 1,
root_hashes=root_hashes,
server_info=sinfo,
client_foldername=tmp_path,
@@ -2047,7 +2056,7 @@ async def test_insert_from_delta_file_correct_file_exists(
assert root.generation == num_files + 1
with os.scandir(store_path) as entries:
filenames = {entry.name for entry in entries}
assert len(filenames) == 2 * (num_files + 1)
assert len(filenames) == num_files + 2 # 1 full and 6 deltas
kv = await data_store.get_keys_values(store_id=store_id)
assert kv == kv_before

@@ -2094,6 +2103,7 @@ async def test_insert_from_delta_file_incorrect_file_exists(
data_store=data_store,
store_id=store_id,
existing_generation=1,
target_generation=6,
root_hashes=[incorrect_root_hash],
server_info=sinfo,
client_foldername=tmp_path,
43 changes: 35 additions & 8 deletions chia/data_layer/data_layer.py
@@ -568,6 +568,7 @@ async def fetch_and_validate(self, store_id: bytes32) -> None:
servers_info = await self.data_store.get_available_servers_for_store(store_id, timestamp)
# TODO: maybe append a random object to the whole DataLayer class?
random.shuffle(servers_info)
success = False
for server_info in servers_info:
url = server_info.url

@@ -600,14 +601,16 @@ async def fetch_and_validate(self, store_id: bytes32) -> None:
self.data_store,
store_id,
root.generation,
[record.root for record in reversed(to_download)],
server_info,
self.server_files_location,
self.client_timeout,
self.log,
proxy_url,
await self.get_downloader(store_id, url),
self.group_files_by_store,
target_generation=singleton_record.generation,
root_hashes=[record.root for record in reversed(to_download)],
server_info=server_info,
client_foldername=self.server_files_location,
timeout=self.client_timeout,
log=self.log,
proxy_url=proxy_url,
downloader=await self.get_downloader(store_id, url),
group_files_by_store=self.group_files_by_store,
maximum_full_file_count=self.maximum_full_file_count,
)
if success:
self.log.info(
@@ -621,6 +624,30 @@ async def fetch_and_validate(self, store_id: bytes32) -> None:
except Exception as e:
self.log.warning(f"Exception while downloading files for {store_id}: {e} {traceback.format_exc()}.")

# if there aren't any servers then don't try to write the full tree
if not success and len(servers_info) > 0:
root = await self.data_store.get_tree_root(store_id=store_id)
if root.node_hash is None:
return
filename_full_tree = get_full_tree_filename_path(
foldername=self.server_files_location,
store_id=store_id,
node_hash=root.node_hash,
generation=root.generation,
group_by_store=self.group_files_by_store,
)
# Had trouble with this generation, so generate full file for the generation we currently have
if not os.path.exists(filename_full_tree):
with open(filename_full_tree, "wb") as writer:
await self.data_store.write_tree_to_file(
root=root,
node_hash=root.node_hash,
store_id=store_id,
deltas_only=False,
writer=writer,
)
self.log.info(f"Successfully written full tree filename {filename_full_tree}.")

async def get_downloader(self, store_id: bytes32, url: str) -> Optional[PluginRemote]:
request_json = {"store_id": store_id.hex(), "url": url}
for d in self.downloaders:
23 changes: 16 additions & 7 deletions chia/data_layer/download_data.py
@@ -92,7 +92,8 @@ async def insert_into_data_store_from_file(
store_id: bytes32,
root_hash: Optional[bytes32],
filename: Path,
) -> None:
) -> int:
num_inserted = 0
with open(filename, "rb") as reader:
while True:
chunk = b""
@@ -119,8 +120,10 @@

node_type = NodeType.TERMINAL if serialized_node.is_terminal else NodeType.INTERNAL
await data_store.insert_node(node_type, serialized_node.value1, serialized_node.value2)
num_inserted += 1

await data_store.insert_root_with_ancestor_table(store_id=store_id, node_hash=root_hash, status=Status.COMMITTED)
return num_inserted


@dataclass
@@ -233,6 +236,7 @@ async def insert_from_delta_file(
data_store: DataStore,
store_id: bytes32,
existing_generation: int,
target_generation: int,
root_hashes: List[bytes32],
server_info: ServerInfo,
client_foldername: Path,
@@ -241,6 +245,7 @@
proxy_url: str,
downloader: Optional[PluginRemote],
group_files_by_store: bool = False,
maximum_full_file_count: int = 1,
) -> bool:
if group_files_by_store:
client_foldername.joinpath(f"{store_id}").mkdir(parents=True, exist_ok=True)
@@ -283,21 +288,25 @@ async def insert_from_delta_file(
existing_generation,
group_files_by_store,
)
await insert_into_data_store_from_file(
num_inserted = await insert_into_data_store_from_file(
data_store,
store_id,
None if root_hash == bytes32([0] * 32) else root_hash,
target_filename_path,
)
log.info(
f"Successfully inserted hash {root_hash} from delta file. "
f"Generation: {existing_generation}. Store id: {store_id}."
f"Generation: {existing_generation}. Store id: {store_id}. Nodes inserted: {num_inserted}."
)

root = await data_store.get_tree_root(store_id=store_id)
with open(filename_full_tree, "wb") as writer:
await data_store.write_tree_to_file(root, root_hash, store_id, False, writer)
log.info(f"Successfully written full tree filename {filename_full_tree}.")
if target_generation - existing_generation <= maximum_full_file_count - 1:
root = await data_store.get_tree_root(store_id=store_id)
with open(filename_full_tree, "wb") as writer:
await data_store.write_tree_to_file(root, root_hash, store_id, False, writer)
log.info(f"Successfully written full tree filename {filename_full_tree}.")
else:
log.info(f"Skipping full file generation for {existing_generation}")

await data_store.received_correct_file(store_id, server_info)
except Exception:
try:
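The updated assertions in test_insert_from_delta_file follow directly from this gating rule; a small, illustrative check of the arithmetic (only num_files, target_generation, and max_full_files come from the test, the other names here are mine):

    # Illustrative check of the file counts asserted in test_insert_from_delta_file:
    # 6 generations each produce a delta file, and only the last `max_full_files`
    # generations also produce a full tree file.
    num_files = 5
    target_generation = num_files + 1  # the test inserts generations 1 through 6

    for max_full_files in (1, 2, 5):   # values used by the new parametrize decorator
        delta_files = target_generation
        full_files = min(max_full_files, target_generation)
        assert delta_files + full_files == num_files + 1 + max_full_files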
