Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implements a first try at splitting xref into batches #3849

Open
wants to merge 1 commit into
base: release/4.0.0
Choose a base branch
from

Conversation

TheApeMachine
Copy link

  • Changes the type of the XREF_SCROLL_SIZE setting to integer.
  • Adds new setting for STAGE_XREF_BATCH
  • Adds above setting to Worker
  • Modifies xref_collection method to enqueue in batches determined by scroll size setting

Note: I think I have made a mistake in accidentally removing a line, I added a TODO in the code. I have to quickly test things when I put it back, as I cannot guarantee things will remain in a working state if I put it back blindly now. However, I would love some feedback to understand if I am on the right track with all this :)

Copy link
Contributor

@tillprochaska tillprochaska left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven’t actually tested this myself, but had a look at the code. In general, this looks good to me, thanks a lot for working on this! 🎉

Note: I think I have made a mistake in accidentally removing a line, I added a TODO in the code

I think your instinct was right here, I‘ve added a comment with more info below.

delete_entities(collection.id, origin=ORIGIN, sync=True)
delete_entities(collection.id, origin='xref', sync=True)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably just an oversight/unintended, but it would be good to keep the ORIGIN constant as it is used elsewhere, too.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that is a good point, I will put that back :)

enqueue_xref_batch(collection, batch)

log.info(f"[{collection}] Xref batches enqueued, processing will continue in the background.")
refresh_collection(collection.id)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is probably a little tricky. Previously, this would have invalidated cached statistics etc. after the xref processing has completed. That was easy when xref was a single task, but I’m not sure there’s a simple/clean way to solve this with subtasks, as we don’t have a way to specify task dependencies (i.e. we cannot execute a specific task after certain other tasks have been executed).

I need to have a closer look at what exactly is refreshed here to understand if not refreshing stats after xref might be acceptable. I guess the alternative would be to simply invalidate stats after every batch. If you’ve thought about this, too, and have ideas, let me know! :)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll have to check a couple of things as well before I could speak on that, so I will get back on this point, I think by tomorrow.

Copy link
Contributor

@tillprochaska tillprochaska Aug 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had another look: The cached data is mostly statistics about the collection (for example, number of entities by schema, …).

Computing these stats uses ES aggregation queries that can be quite expensive, but in this case they are always limited to a single collection, so they should usually be relatively quick to compute. Additionally, refresh_collection doesn’t actually recompute the stats, it invalidates them so that they are recomputed the next time they are accessed.

I think we should be fine simply calling refresh_collection after each batch of entities.

Comment on lines +303 to +306
# TODO: I just realized I had accidentally removed these lines, which is probably why it was not reindexing/mapping.
# I have to test this first before I can confidently add these back. Currently, at least there is nothing breaking catastrophically.
# log.info(f"[{collection}] Xref done, re-indexing to reify mentions...")
# reindex_collection(collection, sync=False)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I remember correctly, this is due to the way cross-referencing mentions currently works. Basically, every mention that has matches is converted to a proper entity, and these new entities need to be indexed. This isn’t great from a UX perspective, and users have been confused when there are new entities created magically. But at the same time, being able to cross-reference mentions is an important feature for many Aleph users.

Similarly to my comment above, reindexing needs to happen after all xref batches have been processed. I guess there are two options here:

  1. Fix the underlying issue (BUG: Entities automatically generated from mentions in an investigation during cross reference #2994) by not auto-creating entities from mentions during cross-referencing. This likely includes UI changes, too, so probably a little more work and best done as a separate PR.
  2. Enqueue index tasks for the specific entities that are created from mentions during cross-referencing. I think this might be a good idea in general. Right now, we re-index everything, even all the "normal" entities that do not need to be re-indexed. This seems quite straight forward to me, maybe a little too easy, so now I’m wondering if there is a non-obvious downside to this approach.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, and in case you’ve never had to cross-reference mentions from documents before, here’s how you can test it:

  • Create Investigation A.
  • Create a few person entities with common names in Investigation A.
  • Create Investigation B.
  • Create a PDF or plaintext document that contains the names of the entities in Collection A. Upload the document to Investigation B and make sure that Aleph recognized the names in the document.
  • Run cross-referencing in Investigation B. Once it has completed, you should see matches for the names in the document.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would imagine that we need to reindex per batch now, otherwise the batches will become dependent on each other again for success/completion? I have to take another look at this as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to self: Might need to run format/lint checks manually as they are currently not executed for PRs from forks.


batch = []
batch_size = int(SETTINGS.XREF_SCROLL_SIZE) # Ensure batch_size is an integer
for entity in iter_entities(collection_id=collection.id):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a suggestion to clean this up could be to use itertools.batched. Along the lines of:

for batch in itertools.batched(iter_entities(collection_id=collection.id), int(SETTINGS.XREF_SCROLL_SIZE)):
  enqueue_xref_batch(collection, batch)

Comment on lines 300 to 301
index_matches(collection, _query_entities(collection))
index_matches(collection, _query_mentions(collection))
Copy link
Contributor

@tillprochaska tillprochaska Aug 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I didn’t notice this when I first looked at the PR: I think you’d also need to adjust this to work with the list of entity_ids. If I’m not misunderstanding something, right now, these methods iterate over all entities/mentions in the collection. So effectively, you’d xref all entities and mentions for every batch (rather than just the entities/mentions in that batch).

I’m still undecided what the best solution for this would be, but maybe something like this (pseudo code)?

matches = []

for data in entities_by_id(entity_ids):
  schema = model.get(data.get("schema"))

  if not schema:
    continue

  proxy = model.get_proxy(data)

  if schema.name == "Mention":
    matches.extend(_query_mention(proxy))
  else:
    matches.extend(_query_item(proxy))

  index_matches(matches)

Please feel free to reach out if you get stuck with this!


batch = []
batch_size = int(SETTINGS.XREF_SCROLL_SIZE) # Ensure batch_size is an integer
for entity in iter_entities(collection_id=collection.id):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not super important and I’m not sure if this would have any measurable impact, but as you only need the ID and no other fields from ES, you could try using the includes argument with a value of [] (or maybe if that doesn’t work something like ["id"] or ["_id"]):

Suggested change
for entity in iter_entities(collection_id=collection.id):
for entity in iter_entities(collection_id=collection.id, includes=[]):

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants