Implements a first try at splitting xref into batches #3849
base: release/4.0.0
@@ -24,19 +24,19 @@
 from aleph.model import EntitySet
 from aleph.authz import Authz
 from aleph.logic import resolver
-from aleph.logic.collections import reindex_collection
+from aleph.logic.collections import reindex_collection, refresh_collection
 from aleph.logic.aggregator import get_aggregator
 from aleph.logic.matching import match_query
 from aleph.logic.util import entity_url
 from aleph.index.xref import index_matches, delete_xref, iter_matches
-from aleph.index.entities import iter_proxies, entities_by_ids
+from aleph.index.entities import iter_proxies, entities_by_ids, iter_entities
 from aleph.index.entities import ENTITY_SOURCE
 from aleph.index.indexes import entities_read_index
 from aleph.index.collections import delete_entities
 from aleph.index.util import unpack_result, none_query
 from aleph.index.util import BULK_PAGE
 from aleph.logic.export import complete_export
 from aleph.queues import queue_task

 log = logging.getLogger(__name__)
 ORIGIN = "xref"
@@ -272,12 +272,38 @@ def xref_collection(collection):
     )
     log.info(f"[{collection}] Clearing previous xref state....")
     delete_xref(collection, sync=True)
-    delete_entities(collection.id, origin=ORIGIN, sync=True)
+    delete_entities(collection.id, origin='xref', sync=True)
Review comment:
Probably just an oversight/unintended, but it would be good to keep the `ORIGIN` constant here.

Author reply:
Yes that is a good point, I will put that back :)
+    batch = []
+    batch_size = int(SETTINGS.XREF_SCROLL_SIZE)  # Ensure batch_size is an integer
+    for entity in iter_entities(collection_id=collection.id):
Review comment:
Just a suggestion: one way to clean this up could be to use itertools.batched. Along the lines of:

    for batch in itertools.batched(iter_entities(collection_id=collection.id), int(SETTINGS.XREF_SCROLL_SIZE)):
        enqueue_xref_batch(collection, batch)

Review comment:
Not super important and I’m not sure if this would have any measurable impact, but as you only need the ID and no other fields from ES, you could try using the
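A side note on the `itertools.batched` suggestion above: it only exists on Python 3.12+, so a deployment on an older interpreter would need a fallback. The sketch below is illustrative only; `batched_ids` is a made-up helper, and a generator of fake IDs stands in for `iter_entities`:

```python
import itertools
import sys

def batched_ids(iterable, size):
    """Yield lists of at most `size` items from any iterable."""
    if sys.version_info >= (3, 12):
        # itertools.batched yields tuples; normalize to lists.
        for chunk in itertools.batched(iterable, size):
            yield list(chunk)
    else:
        # Fallback for older interpreters using itertools.islice.
        it = iter(iterable)
        while chunk := list(itertools.islice(it, size)):
            yield chunk

# Stand-in for iter_entities(); real code would stream entity dicts from ES.
fake_ids = (f"id-{i}" for i in range(7))
print(list(batched_ids(fake_ids, 3)))
# → [['id-0', 'id-1', 'id-2'], ['id-3', 'id-4', 'id-5'], ['id-6']]
```

Either branch preserves the trailing partial batch, matching the `if batch:` flush in the PR's hand-rolled loop.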
+        batch.append(entity['id'])
+        if len(batch) >= batch_size:
+            enqueue_xref_batch(collection, batch)
+            batch = []
+
+    if batch:
+        enqueue_xref_batch(collection, batch)
+
+    log.info(f"[{collection}] Xref batches enqueued, processing will continue in the background.")
+    refresh_collection(collection.id)
Review comment:
This is probably a little tricky. Previously, this would have invalidated cached statistics etc. after the xref processing had completed. That was easy when xref was a single task, but I’m not sure there’s a simple/clean way to solve this with subtasks, as we don’t have a way to specify task dependencies (i.e. we cannot execute a specific task after certain other tasks have been executed). I need to have a closer look at what exactly is refreshed here to understand if not refreshing stats after xref might be acceptable. I guess the alternative would be to simply invalidate stats after every batch. If you’ve thought about this, too, and have ideas, let me know! :)

Author reply:
I'll have to check a couple of things as well before I can speak on that, so I will get back on this point, I think by tomorrow.

Review comment:
Had another look: The cached data is mostly statistics about the collection (for example, number of entities by schema, …). Computing these stats uses ES aggregation queries that can be quite expensive, but in this case they are always limited to a single collection, so they should usually be relatively quick to compute. Additionally, I think we should be fine simply calling
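The "invalidate stats after every batch" option discussed in this thread could look roughly like the sketch below. Everything here (the dict cache, `compute_stats`, the stubbed `refresh_collection`) is a stand-in for illustration, not Aleph's actual caching code:

```python
# Hypothetical stand-in for Aleph's cached collection statistics.
stats_cache = {}

def compute_stats(collection_id):
    # Stand-in for the ES aggregation queries mentioned in the review.
    return {"entities": 42}

def get_stats(collection_id):
    # Lazily compute and cache the statistics per collection.
    if collection_id not in stats_cache:
        stats_cache[collection_id] = compute_stats(collection_id)
    return stats_cache[collection_id]

def refresh_collection(collection_id):
    # Drop the cached stats so the next read recomputes them.
    stats_cache.pop(collection_id, None)

def process_xref_batch(collection_id, entity_ids):
    # ... run the actual xref matching for entity_ids here ...
    refresh_collection(collection_id)  # invalidate once per batch

get_stats(1)                     # populates the cache
process_xref_batch(1, ["a", "b"])
assert 1 not in stats_cache      # stats recomputed lazily on next access
```

The trade-off the reviewer describes: per-batch invalidation recomputes stats more often, but since each recomputation is scoped to one collection, the cost should stay modest.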
+
+
+def enqueue_xref_batch(collection, entity_ids):
+    """Enqueue a task to process a batch of entities for cross-referencing."""
+    queue_task(collection, SETTINGS.STAGE_XREF_BATCH, entity_ids=entity_ids)
+
+
+def process_xref_batch(collection, entity_ids):
+    """Process a batch of entities for cross-referencing."""
+    log.info(f"[{collection}] Processing xref batch with {len(entity_ids)} entities")
+    index_matches(collection, _query_entities(collection))
+    index_matches(collection, _query_mentions(collection))
Review comment (on lines 300 to 301):
Sorry, I didn’t notice this when I first looked at the PR: I think you’d also need to adjust this to work with the list of entity_ids. I’m still undecided what the best solution for this would be, but maybe something like this (pseudo code)?

    matches = []
    for data in entities_by_id(entity_ids):
        schema = model.get(data.get("schema"))
        if not schema:
            continue
        proxy = model.get_proxy(data)
        if schema.name == "Mention":
            matches.extend(_query_mention(proxy))
        else:
            matches.extend(_query_item(proxy))
    index_matches(matches)

Please feel free to reach out if you get stuck with this!
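To make the shape of that pseudo code concrete, here is a runnable sketch with Aleph's model and the `_query_mention`/`_query_item` helpers replaced by trivial stubs; only the per-batch dispatch on schema is the point, all names and data here are invented:

```python
# Fake entity store standing in for entities_by_id / ES.
FAKE_INDEX = {
    "e1": {"id": "e1", "schema": "Person"},
    "e2": {"id": "e2", "schema": "Mention"},
    "e3": {"id": "e3", "schema": "Unknown"},
}

KNOWN_SCHEMATA = {"Person", "Mention"}

def entities_by_id(entity_ids):
    return (FAKE_INDEX[i] for i in entity_ids if i in FAKE_INDEX)

def _query_mention(data):
    # Stub: the real helper would run an ES match query for the mention.
    return [("mention-match", data["id"])]

def _query_item(data):
    # Stub: the real helper would run an ES match query for the entity.
    return [("item-match", data["id"])]

def collect_matches(entity_ids):
    matches = []
    for data in entities_by_id(entity_ids):
        schema = data.get("schema")
        if schema not in KNOWN_SCHEMATA:
            continue  # skip entities whose schema the model can't resolve
        if schema == "Mention":
            matches.extend(_query_mention(data))
        else:
            matches.extend(_query_item(data))
    return matches

print(collect_matches(["e1", "e2", "e3"]))
# → [('item-match', 'e1'), ('mention-match', 'e2')]
```

The key change versus the PR as written: matching is driven by the batch's `entity_ids` rather than re-querying the whole collection in every batch task.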
-    log.info(f"[{collection}] Xref done, re-indexing to reify mentions...")
-    reindex_collection(collection, sync=False)
+    # TODO: I just realized I had accidentally removed these lines, which is probably why it was not reindexing/mapping.
+    # I have to test this first before I can confidently add these back. Currently, at least there is nothing breaking catastrophically.
+    # log.info(f"[{collection}] Xref done, re-indexing to reify mentions...")
+    # reindex_collection(collection, sync=False)
Review comment (on lines +303 to +306):
If I remember correctly, this is due to the way cross-referencing mentions currently works. Basically, every mention that has matches is converted to a proper entity, and these new entities need to be indexed. This isn’t great from a UX perspective, and users have been confused when new entities are created magically. But at the same time, being able to cross-reference mentions is an important feature for many Aleph users. Similarly to my comment above, reindexing needs to happen after all xref batches have been processed. I guess there are two options here:

Review comment:
Oh, and in case you’ve never had to cross-reference mentions from documents before, here’s how you can test it:

Author reply:
I would imagine that we need to reindex per batch now, otherwise the batches will become dependent on each other again for success/completion? I have to take another look at this as well.
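Since the queue has no task dependencies, one conceivable way to trigger the reindex only after the final batch is a shared countdown initialized to the number of batches. The sketch below uses an in-memory, lock-protected counter as a stand-in for something like an atomic Redis DECR; all names here are hypothetical, not Aleph API:

```python
import threading

class BatchCountdown:
    """Countdown shared by all batch tasks of one xref run."""

    def __init__(self, total_batches):
        self._remaining = total_batches
        self._lock = threading.Lock()

    def batch_done(self):
        """Return True only for the caller that completes the final batch."""
        with self._lock:
            self._remaining -= 1
            return self._remaining == 0

reindexed = []

def process_batch(countdown, collection_id, entity_ids):
    # ... xref the batch here ...
    if countdown.batch_done():
        reindexed.append(collection_id)  # stand-in for reindex_collection()

countdown = BatchCountdown(total_batches=3)
for batch in (["a"], ["b"], ["c"]):
    process_batch(countdown, 1, batch)
assert reindexed == [1]  # reindex fired exactly once, after the last batch
```

A wrinkle this sketch glosses over: if a batch task fails and is never retried, the countdown never reaches zero and the reindex never runs, so a real implementation would also need a timeout or failure path.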

 def _format_date(proxy):
     dates = proxy.get_type_values(registry.date)
Review comment:
Note to self: Might need to run format/lint checks manually as they are currently not executed for PRs from forks.