
Purge + Restore user timeseries data with long-term storage #952

Open · wants to merge 51 commits into base: master

Commits on Jan 31, 2023

  1. feat: Add timeseries purge script

    When operating a server, the `Stage_timeseries` database can become
    quite large.
    When only `Stage_analysis_timeseries` is actually useful after
    pipeline execution, the user's raw timeseries can be deleted to speed
    up the pipeline and reclaim disk space.
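
    A minimal sketch of such a purge, assuming the `edb.get_timeseries_db()` accessor from e-mission-server; this is illustrative, not the PR's actual script:

    ```python
    # Sketch only: count, log, then delete a user's raw timeseries entries
    import logging
    import emission.core.get_database as edb

    def purge_user_timeseries(user_id):
        ts_db = edb.get_timeseries_db()
        count = ts_db.count_documents({"user_id": user_id})
        logging.debug("Deleting %d timeseries entries for user %s", count, user_id)
        return ts_db.delete_many({"user_id": user_id}).deleted_count
    ```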
    paultranvan committed Jan 31, 2023 · 8a71ce5

Commits on Mar 28, 2023

  1. Added csv export feature to bin/purge_user_timeseries

    Also added associated unit tests
    TTalex committed Mar 28, 2023 · 6ea8ac5

Commits on Jan 8, 2024

  1. Replaced print() with logging.debug()

    print() statements weren't being logged in AWS CloudWatch logs.
    
    logging.debug() statements are meant for this purpose.
    
    These statements may or may not show up in normal execution output, depending on the configured logger level.
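
    For example (illustrative; a real deployment would configure the logger centrally rather than with basicConfig):

    ```python
    import logging

    # Debug statements surface only when the effective level is DEBUG or lower
    logging.basicConfig(level=logging.DEBUG)
    logging.debug("About to purge entries for user %s", "some-uuid")
    ```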
    Mahadik, Mukul Chandrakant authored and committed Jan 8, 2024 · d20011b
  2. Merge remote-tracking branch 'ttalex/purge-timeseries' into purge-restore-timeseries
    Mahadik, Mukul Chandrakant authored and committed Jan 8, 2024 · 0d0a0ba
  3. Storing data as JSON + Restore code added

    Choosing JSON instead of CSV since:
    1. CSV does not retain the nested, dict-like document structure of MongoDB documents.
    2. CSV also stores redundant empty NaN columns.
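
    An illustrative comparison with a made-up document (`pd.json_normalize` stands in for whatever flattening a CSV export would do):

    ```python
    import json
    import pandas as pd

    doc = {"user_id": "u1",
           "metadata": {"key": "background/location"},
           "data": {"ts": 1437633640.069, "loc": {"coordinates": [-122.08, 37.39]}}}

    # JSON round-trips the nested structure intact
    restored = json.loads(json.dumps([doc]))[0]
    assert restored["data"]["loc"]["coordinates"] == [-122.08, 37.39]

    # CSV flattening turns nesting into dotted columns (plus NaNs for keys
    # missing from other rows)
    print(pd.json_normalize([doc]).columns.tolist())
    # ['user_id', 'metadata.key', 'data.ts', 'data.loc.coordinates']
    ```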
    Mahadik, Mukul Chandrakant authored and committed Jan 8, 2024 · ae6eae6

Commits on Jan 9, 2024

  1. Current working code for JSON based purge/restore of data

    CSV export kept on hold for now as restoring from CSV is complicated due to loss of data structure.
    
    This commit includes working code for export as JSON file and import from JSON file.
    Mahadik, Mukul Chandrakant authored and committed Jan 9, 2024 · 78979ff
  2. Added CSV export as an option

    Default option for now is JSON, which is easier for data restore.
    
    Provided export flags as a boolean dictionary; the specific export function is called according to whichever flag is set.
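
    A minimal sketch of that dispatch pattern (function names here are placeholders, not the PR's actual ones):

    ```python
    def export_as_json(entries, prefix):
        print(f"writing {len(entries)} entries to {prefix}.json")

    def export_as_csv(entries, prefix):
        print(f"writing {len(entries)} entries to {prefix}.csv")

    # Boolean flags select which export functions run
    export_flags = {"json": True, "csv": False}
    export_fns = {"json": export_as_json, "csv": export_as_csv}

    entries = [{"_id": 1}, {"_id": 2}]
    for fmt, enabled in export_flags.items():
        if enabled:
            export_fns[fmt](entries, "user_timeseries")
    ```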
    Mahadik, Mukul Chandrakant authored and committed Jan 9, 2024 · 2c1ef44

Commits on Jan 11, 2024

  1. Added more tests for purge and restore operations

    Built on and added tests for normal data operations of purge() and restore().
    
    Added edge cases tests:
    1. Loading duplicate or already existing data by calling the restore function again.
    2. Loading from an empty JSON file containing no data.
    
    Will add additional tests if needed.
    Mahadik, Mukul Chandrakant authored and committed Jan 11, 2024 · 315510c
  2. Updated test file path

    Changed the file path of the empty JSON file used for testing to the generic /var/tmp instead of a local path.
    Mahadik, Mukul Chandrakant authored and committed Jan 11, 2024 · 479a37f
  3. Updated default directory path

    Changed from "/tmp" to the operating system's default temporary directory,
    which makes this functionality work cross-platform.
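
    In Python this is typically done with `tempfile` (a sketch; variable names are illustrative):

    ```python
    import os
    import tempfile

    # Resolves to /tmp on Linux, but to the right place on macOS/Windows too
    default_dir = tempfile.gettempdir()
    export_path = os.path.join(default_dir, "purged_user_timeseries.json")
    ```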
    Mahadik, Mukul Chandrakant authored and committed Jan 11, 2024 · d8ef5f7

Commits on Jan 19, 2024

  1. Added import options

    The import file type is now a command-line argument wired to the appropriate functions.
    Mahadik, Mukul Chandrakant authored and committed Jan 19, 2024 · 28719f4
  2. Added conditional checks before purging

    Checking for a valid cstate and a non-zero query result count before initiating purging.
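
    A sketch of those guards (assuming a pymongo collection and a cstate pipeline-state object; names are illustrative):

    ```python
    import logging

    def purge_if_valid(ts_db, cstate, query):
        # Guard 1: require a valid current state before touching data
        if cstate is None:
            logging.debug("No valid cstate, skipping purge")
            return 0
        # Guard 2: require matching entries before deleting
        if ts_db.count_documents(query) == 0:
            logging.debug("No matching entries for %s, skipping purge", query)
            return 0
        return ts_db.delete_many(query).deleted_count
    ```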
    Mahadik, Mukul Chandrakant authored and committed Jan 19, 2024 · da584e6
  3. Remove print

    Mahadik, Mukul Chandrakant authored and committed Jan 19, 2024 · 190f4d8

Commits on Jul 9, 2024

  1. Testing changes to get TestExportModule to work

    Mahadik, Mukul Chandrakant authored and committed Jul 9, 2024 · ee1aada

Commits on Jul 10, 2024

  1. Testing adding database argument to export script

    Details in this comment:
    e-mission#952 (comment)
    Mahadik, Mukul Chandrakant authored and committed Jul 10, 2024 · 3550b1e

Commits on Jul 11, 2024

  1. Trying to run export using export pipeline + Added logging statements

    Mahadik, Mukul Chandrakant authored and committed Jul 11, 2024 · 41ba8f0
  2. Executing the purge process via the export pipeline now. Added Test file as well.

    Some pointers: had to comment out logging.config.dictConfig in export_stage as it was raising a "logging has no module named config" error. Also, running the test with and without running the intake pipeline gives different results; I believe it's got to do with how the start_ts is set as part of the pipeline process. Next, will move on to purging and restoring from the .gz dump file.
    Mahadik, Mukul Chandrakant authored and committed Jul 11, 2024 · 332ba43
  3. Trying to make the entire flow work: export -> purge -> restore

    Mahadik, Mukul Chandrakant authored and committed Jul 11, 2024 · e601491
  4. Cleaned up code before pushing commits.

    Mahadik, Mukul Chandrakant authored and committed Jul 11, 2024 · d7823b4
  5. Added TODO comment to Test file

    Mahadik, Mukul Chandrakant authored and committed Jul 11, 2024 · 24548f4
  6. Fixed datetime module + Cleaned up redundant changes from other PRs

    Mahadik, Mukul Chandrakant authored and committed Jul 11, 2024 · 6a990cb

Commits on Jul 31, 2024

  1. Removing changes made to original export PR scripts.

    Mahadik, Mukul Chandrakant authored and committed Jul 31, 2024 · 2d73ef9
  2. Removed newline change from .gitignore

    Mahadik, Mukul Chandrakant authored and committed Jul 31, 2024 · 82ecd73
  3. Added purge + restore pipeline implementation + Corrected EXPORT_DATA pipeline state to use unique value.
    Mahadik, Mukul Chandrakant authored and committed Jul 31, 2024 · 01636bf
  4. Added purge + restore pipeline implementation + Corrected EXPORT_DATA pipeline state to use unique value.
    Mahadik, Mukul Chandrakant authored and committed Jul 31, 2024 · b9fc467
  5. Removed import_timeseries import - no longer using this file.

    Mahadik, Mukul Chandrakant authored and committed Jul 31, 2024 · 3bdc8cb
  6. Skipping export, purge, restore stages in pipeline test.

    Mahadik, Mukul Chandrakant authored and committed Jul 31, 2024 · 2729f2b

Commits on Aug 1, 2024

  1. Removed purge/restore standalone scripts; can reuse existing extract/load scripts.
    Mahadik, Mukul Chandrakant authored and committed Aug 1, 2024 · a600682
  2. Added flag to existing extract script to allow purging

    Mahadik, Mukul Chandrakant authored and committed Aug 1, 2024 · 1b6833d
  3. Removed continue_on_error parameter from the load timeseries function

    With the flag set, the exception handling kicked in and no data was being inserted into the database.
    Mahadik, Mukul Chandrakant authored and committed Aug 1, 2024 · 306f4de
  4. Deleting unused Test file

    Mahadik, Mukul Chandrakant authored and committed Aug 1, 2024 · 9fb7a93

Commits on Aug 2, 2024

  1. Tests added (more tests to be added) + Using continue_on_error flag for loading data
    Mahadik, Mukul Chandrakant authored and committed Aug 2, 2024 · 2097ff4
  2. Added duplicate data test + log message + returning inserted entries count
    Mahadik, Mukul Chandrakant authored and committed Aug 2, 2024 · 103537a
  3. Removed unused Test file for purge / restore

    Using newer test file in exportTests
    Mahadik, Mukul Chandrakant authored and committed Aug 2, 2024 · 33eae2a

Commits on Aug 9, 2024

  1. New logic for handling last processed entry; need to add last_trip variable.
    Mahadik, Mukul Chandrakant authored and committed Aug 9, 2024 · b65430d
  2. Using data.ts for last_processed_ts; not using last_trip_done

    Fetching the last entry using the same queries used to export and delete the data:
    index with [-1], then take the time from 'data.ts' instead of 'data.write_ts'.
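
    Roughly (a sketch; the collection, query, and sort are assumptions based on the description):

    ```python
    def get_last_processed_ts(ts_db, query):
        # Entries sorted ascending by data.ts; the last one drives last_processed_ts
        entries = list(ts_db.find(query).sort("data.ts", 1))
        return entries[-1]["data"]["ts"] if entries else None
    ```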
    Mahadik, Mukul Chandrakant authored and committed Aug 9, 2024 · 5fb6370

Commits on Aug 27, 2024

  1. Added last_processed_ts logic to restore_data by using tsdb_count

    Mahadik, Mukul Chandrakant authored and committed Aug 27, 2024 · 592636a

Commits on Aug 28, 2024

  1. Added more tests for testing purge restore pipelines + Using last_processed_ts for restore_data

    Simply using the last index value, like how it was done in purge_data.
    Mahadik, Mukul Chandrakant authored and committed Aug 28, 2024 · 0e57a76

Commits on Aug 29, 2024

  1. Draft commit - Testing 1 hour incremental export - some entries missing

    When the validate truncate function signature still has 3 arguments, it gives an error but exports all entries.
    However, when I modify it to include only the location query, it passes but does not export all entries.
    The last curr_end_ts timestamp doesn't go all the way to the last one.
    
    Committing changes before switching to another branch.
    Mahadik, Mukul Chandrakant authored and committed Aug 29, 2024 · 661a222
  2. Implemented Full + Incremental export + purge + restore

    Exporting and Purging entries in 1 hour time chunks into separate files with a defined start_ts and end_ts.
    
    Start_ts is the last_processed_ts for the user_id's Purge pipeline state.
    - If the pipeline has already been run for the user, then this would be a non-None value.
    - If pipeline hasn't been run, then it will be None; in this case, the earliest timestamp entry is chosen as the start ts. This helps avoid ValueErrors when adding 1 hour (3600 seconds) to start_ts for incremental export.
    
    End_ts differs for Full export and Incremental export:
    - Full export: the current time at which the pipeline is run is the end ts; the value returned by pipeline_queries on initiating the pipeline stage is used.
    - Incremental export: the first end_ts value is 1 hour ahead of start_ts; this value keeps being incremented by 1 hour as long as data exists for the user. If the value after adding 1 hour exceeds the current time, then end_ts is set to the current time itself. The export + delete process continues as long as there is raw timeseries data for the user (see the sketch below).
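
    A sketch of that time-range logic (assumed from the description above, not lifted from the commit):

    ```python
    import time

    ONE_HOUR = 3600  # seconds

    def get_export_ranges(last_processed_ts, earliest_ts, incremental):
        # Resume from the pipeline state if present, else from the first entry
        start_ts = last_processed_ts if last_processed_ts is not None else earliest_ts
        now = time.time()
        if not incremental:
            return [(start_ts, now)]  # full export: one range up to now
        ranges = []
        while start_ts < now:
            end_ts = min(start_ts + ONE_HOUR, now)  # cap at the current time
            ranges.append((start_ts, end_ts))
            start_ts = end_ts
        return ranges
    ```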
    
    -------
    
    But what does 1 hour's worth of data mean?
    - In any case, the purge pipeline runs up to the current time or until no more raw timeseries entries are present in the db for the user.
    - If the purge pipeline is running for the first time for a user, it will export and purge all of the user's timeseries data from its first entry (which can be really old data, so the first purge run might take a lot of time).
    - If the purge pipeline has already been run before for a user, it will set start_ts to last_processed_ts and export data from that point.
        - If the purge pipeline is run hourly, each run would eventually handle just a small subset of entries.
    
    -------
    
    Some points to consider:
    
    A. Numerous files; Less data quantity per file
    
    One observation is that the current logic is creating multiple files in 1-hour chunks, which is okay.
    But these files don't really have a lot of entries.
    What could be more efficient is to store more entries until a threshold, say 5000 or 10000 (like batch_size in load_multi_timeline_for_range), is reached.
    If this default threshold batch size isn't reached, keep adding to the same file: keep updating the end_ts while start_ts remains the same.
    
    Will attempt this next step.
    
    ------
    
    B. Right approach for Export + Purge?
    
    Approach A
    1. Export data in chunks to File
    2. Delete exported data from DB.
    3. Repeat until all data purged.
    
    Flow looks like: Export -> Delete -> Export -> Delete -> Export -> Delete
    
    ——
    
    Approach B
    1. Export data in chunks to file.
    2. Repeat until all data exported.
    3. Delete all exported data from DB.
    
    Flow looks like: Export -> Export -> Export ... -> Delete
    
    ---------------
    
    C. Do we need all 3 types of entries: locations, trips, places?
    
    For now, commented out code from export_timeseries.py.
    If we only need location entries, the code can be simplified further to work just for these entries.
    
    If these are sort of like subsets of each other (location -> trip -> place),
    then I can safely just take location.
    But this is valid only if all entries contain location, and hence ts, values.
    If only trip entries or only place entries are present, then directly choosing the latest ts is incorrect, since trips use start_ts while places use enter_ts.
    
    Searched the codebase for references to start_ts and enter_ts and read through Shankari's thesis.
    
    I'm getting hints that start_ts and enter_ts belong to analysis_timeseries entries?
    In that case, these can be ignored, since purge + restore is concerned with raw timeseries data only.
    
    Trip entries are created in emission/analysis/intake/segmentation/trip_segmentation.py
    
    ——
    
    Hint 1: Timeseries_Sample.ipynb
    
    - ct_df fetches analysis/cleaned_trip entries -> analysis DB
    
    ------
    
    Hint 2:  bin/historical/migrations/populate_local_dt.py
    
    - Looks like old code, some changes were last made 8 years ago.
    - The collection parameter refers to some non-time series databases as seen from the function calls.
    - The entry['start_ts'] or entry['enter_ts'] values are then used in the find query by setting data.ts to this value.
    
    ---------
    
    D. Is pipeline_states export needed?
    
    Remove pipeline_states export if not needed.
    Currently being used in existing export + load scripts.
    
    ---------
    Mahadik, Mukul Chandrakant authored and committed Aug 29, 2024 · ec162ad

Commits on Aug 30, 2024

  1. Revert "Added flag to existing extract script to allow purging"

    This reverts commit 1b6833d.
    Mahadik, Mukul Chandrakant authored and committed Aug 30, 2024 · 852fd09
  2. Added a new import_timeseries that replicates load_multi_timeline_for_range
    
    The main reason for this change was to avoid dealing with the pipeline_states file.
    
    -----
    
    Problem:
    
    Observed in the exported pipeline state tar file that last_processed_ts for PURGE stage was null.
    
    But from the logs, it looks like the updated pipeline state was inserted into the DB:
    ```
    INFO:root:For stage PipelineStages.PURGE_TIMESERIES_DATA, last_ts_processed = 2015-07-23T06:40:40.069000
    
    DEBUG:root:About to save object PipelineState({'_id': ObjectId('66c79deddbf93d53d0f61184'), 'user_id': UUID('222a6ab7-94f0-4fec-a48f-0471297a9644'), 'pipeline_stage': 19, 'curr_run_ts': None, 'last_processed_ts': 1437633640.069, 'last_ts_run': 1724358120.242778})
    
    DEBUG:root:After saving state PipelineState({'_id': ObjectId('66c79deddbf93d53d0f61184'), 'user_id': UUID('222a6ab7-94f0-4fec-a48f-0471297a9644'), 'pipeline_stage': 19, 'curr_run_ts': None, 'last_processed_ts': 1437633640.069, 'last_ts_run': 1724358120.242778}), list is [{'_id': ObjectId('66c79deddbf93d53d0f61184'), 'user_id': UUID('222a6ab7-94f0-4fec-a48f-0471297a9644'), 'pipeline_stage': 19, 'curr_run_ts': None, 'last_processed_ts': 1437633640.069, 'last_ts_run': 1724358120.242778}]
    ```
    ---
    
    Checked the DB; last_processed_ts is updated in the pipeline state DB.
    
    ```
    Command:
    pipeline_states_list = list(edb.get_pipeline_state_db().find({"user_id": UUID(uuid_test)}))
    for state in pipeline_states_list:
        print(state)
    
    Output:
    {'_id': ObjectId('66c79eaad2f85784cd19fd79'), 'user_id': UUID('b9f26d62-ef7b-489a-b814-154ea8e08dae'), 'pipeline_stage': 19, 'curr_run_ts': None, 'last_processed_ts': 1437633640.069, 'last_ts_run': 1724358309.836113}
    ```
    
    -----
    
    Why was it added in the original script?
    
    Commit that added it in the export PR
    e-mission@dd9ec1c
    
    These were added in Jan 2018
    Commit that added it in extract_timeline_for_day_range_and_user.py
    e-mission@b38366b
    
    Commit that added it in load_multi_timeline_for_range.py
    e-mission@06a0a4e
    
    Why do we need it? Shankari:
    
    "The raw data and the analysis results do not constitute the entire state of a
    pipeline. In particular, if we store only the raw + analysis results, and then
    we try to run the pipeline again, we will end up with two copies of the
    analysis results."
    
    -----
    
    Do we need it for purge PR?
    - The commit message states that pipelinestates were also exported / loaded so that we don't have duplicate analysis entries.
    - In the purge PR, we are strictly dealing with timeseries_db data.
    - Hence, it can be removed from the purge_restore-related code.
    
    ------
    
    Something wrong with the export_pipeline_states() in purge_data then?
    - No, I was calling export_timeseries to export pipeline states inside the run_purge_pipeline function in purge_data.
    - This was running for every export file but at this point last_processed_ts isn’t updated.
    - It is only updated once the function exits and control returns to the parent function, where the stage is marked as done.
    The final update to the pipeline state occurs on returning to the parent function purge_data():
    
    ```
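            # Note: last_processed_ts is only set after run_purge_pipeline
            # returns, which is why pipeline state files exported inside it
            # showed a null value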
            if pdp.last_processed_ts is None:
                logging.debug("After run, last_processed_ts == None, must be early return")
            espq.mark_purge_data_done(user_id, pdp.last_processed_ts)
    ```
    - Hence in all the pipeline state files, last_processed_ts has the value NULL.
    - Also, we get multiple pipeline state files since the function call to export the pipeline states is also within the run_purge_data_pipeline function that exports the timeseries data.
    
    Now, one approach to resolve this that I thought would work:
    - Move the export pipeline states call to the parent function after the stage has been marked as done.
    - Also if we move the export pipeline states to the parent function, only one pipelinestates file would be created; I tried using the earliest start ts and the latest end ts to name the file.
    - But this had a few problems.
    - First, it doesn’t seem right that after a stage is marked as done we are still performing some steps. I checked the existing intake pipeline stages and none of them do this.
    - Second, while it would work for full export, it would be difficult for incremental export where each export filename has a different start and end ts corresponding to multiple time ranges.
    - This is because load_multi_timeline_for_range calls load_pipeline_states, which uses the same file_prefix as the exported timeseries filename to load the pipeline states; it just adds "pipelinestates" to the file_prefix.
    - But then this file wasn't found by the load_multi_timeline function: it loads data from the per-range files (say start_ts_1 to end_ts_1, and so on), and when it tries to load a pipeline_states file with the same prefix, that file doesn't exist, since we now have only one file named with earliest_start_ts to latest_end_ts.
    - Hence, it gives a FileNotFoundError.
    
    ------
    Mahadik, Mukul Chandrakant authored and committed Aug 30, 2024 · 63f7985

Commits on Aug 31, 2024

  1. Reduced number of files by merging data using batch_size

    One observation is that the current logic is creating multiple files, which is okay.
    But these files don't really have a lot of entries.
    
    What could be more efficient is to store more entries until a threshold, say 5000 or 10000 (like batch_size in load_multi_timeline_for_range), is reached.
    If this default threshold batch size isn't reached, keep adding to the same file:
    keep updating the end_ts while start_ts remains the same.
    
    ----
    
    Found an edge case. Incremental export is fine.
    
    Let’s say we have chosen full export.
    
    In the sample data we have 1906 entries.
    In batch testing I’m setting batch_size_limit to 500.
    
    Now, when the code executes:
    - current_end_ts will be set to initEndTs, which is current time() - FUZZ time, as set by the pipeline queries.
    - new_entries will have all 1906 entries, which is more than the batch_size_limit.
    - BOTH the batch_size_limit check and the current_end_ts check will be TRUE.
    - It will export the oversized batch, beyond the limit, and also delete those entries.
    - While that seems fine, it will cause issues when we attempt to restore data whose size exceeds the batch size.
    
    Hence, need a way to handle this, perhaps by:
    - Setting current_end_ts to the ts value of the entry at index batch_size_limit - 1.
    - Fetching entries only up to this point.
    - Then fetching the next batch of entries.
    
    Essentially, unlike the incremental scenario where we increment current_end_ts by 3600 seconds, here we need to advance current_end_ts to the ts value of the entry at the next batch_size_limit - 1 index (see the sketch below).
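
    A sketch of that capping logic (names follow the message above; this is assumed, not the committed code):

    ```python
    def cap_batch(new_entries, batch_size_limit, current_end_ts):
        # If the fetch exceeds the limit, pull end_ts back to the ts of the
        # last entry that fits, so export and delete stay aligned on one batch
        if len(new_entries) > batch_size_limit:
            current_end_ts = new_entries[batch_size_limit - 1]["data"]["ts"]
            new_entries = new_entries[:batch_size_limit]
        return new_entries, current_end_ts
    ```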
    
    --------
    
    Working on this, but writing tests for it is still pending.
    Also, the batch size is still being exceeded.
    Mahadik, Mukul Chandrakant authored and committed Aug 31, 2024 · 34ab73d
  2. Draft commit -> Fix full export batch size limit + Trying to fix incremental
    
    But 1st batch has 501 entries.
    2nd has 500, 3rd has 500.
    4th has 405 entries.
    
    Understood the issue.
    
    Right now we are segregating based on time ranges as well as batch sizes.
    For incremental export, both are in play, and right now the logic is getting messed up.
    
    For full export, mainly batch size is in play, as end_ts would initially be set to the current time.
    But if the batch size exceeds the limit, then we set end_ts to the current batch's last entry.
    
    Now, while run_purge_data_pipeline() is able to stop at the batch size, the existing export() script is unable to do so.
    The export script just checks the timestamps and exports everything in that range.
    Similarly, the delete function doesn't care about the batch size and just deletes all matching entries within the time range.
    
    A simple fix could be to limit the entries exported and deleted.
    
    For export, just returning 500 entries for now in the export script. This works.
    
    For delete, there is no limit flag; can try deleting only the matching IDs instead (see the sketch below).
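
    A sketch of deleting by ID (assuming pymongo and that the exported entries are already in hand; illustrative only):

    ```python
    def delete_exported_entries(ts_db, exported_entries):
        # Delete exactly what was exported, not everything in the time range
        exported_ids = [e["_id"] for e in exported_entries]
        return ts_db.delete_many({"_id": {"$in": exported_ids}}).deleted_count
    ```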
    
    -------
    
    Trying to solve for incremental export.
    
    But realized that we might not need the batch size at all.
    The batch_size default in load_multi_timeline_for_range isn't a hard cutoff that limits how much data is processed; it just splits the data into batches within the script itself.
    
    No need to handle in the purge export script.
    
    ----------
    
    Also, the delete function in purge can be simplified.
    
    -------
    
    New test added for batch size
    
    ------
    
    Just committing code here for reference.
    Mahadik, Mukul Chandrakant authored and committed Aug 31, 2024 · 4ab627b
  3. Removed batch size limit

    Realized that we might not need the batch size at all.
    The batch_size default in load_multi_timeline_for_range isn't a hard cutoff that limits how much data is processed; it just splits the data into batches within the script itself.
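
    For reference, the load-side batching behaves roughly like this (a sketch; the default size and the insert step are illustrative):

    ```python
    def insert_in_batches(entries, batch_size=10000):
        # batch_size only chunks the inserts; it never truncates the input
        for i in range(0, len(entries), batch_size):
            batch = entries[i:i + batch_size]
            print(f"inserting batch of {len(batch)} entries")
    ```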
    
    ------
    
    Will clean up code in next commit.
    Mahadik, Mukul Chandrakant authored and committed Aug 31, 2024 · 02fb2ce
  4. Shortened core logic + Added tests to check file contents

    Will clean up and add more tests.
    
    Looks good for now.
    Need to update PR with queries now.
    Mahadik, Mukul Chandrakant authored and committed Aug 31, 2024 · c38b82d
  5. Draft commit - added print statements to Test; pending restore multiple times test
    Mahadik, Mukul Chandrakant authored and committed Aug 31, 2024 · 6c82123
  6. Revert "Shortened core logic + Added tests to check file contents"

    This reverts commit c38b82d.
    Mahadik, Mukul Chandrakant authored and committed Aug 31, 2024 · 0a7138b
  7. Tests for asserting a few entries after export

    This seems fine, but we can read from the DB before deletion occurs,
    then compare the DB entries with those read in from the export file.
    
    Will work on that next.
    Also pending: a temp directory for tests to store generated export files.
    Mahadik, Mukul Chandrakant authored and committed Aug 31, 2024 · 6dc72b6

Commits on Sep 1, 2024

  1. Added more tests for comparing entries from db and export files

    Added tests that assert the first and last few entries from the DB and the export files.
    Comparing object IDs only for now.
    
    Also added a temp directory for tests so that the local emission/archived directory isn't filled with export files.
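
    A sketch of that assertion, inside a unittest.TestCase (helper name and slice size are illustrative):

    ```python
    def assert_ids_match(self, db_entries, exported_entries):
        # Compare ObjectIds of the first and last few entries only
        db_ids = [str(e["_id"]) for e in db_entries]
        file_ids = [str(e["_id"]) for e in exported_entries]
        self.assertEqual(db_ids[:3], file_ids[:3])
        self.assertEqual(db_ids[-3:], file_ids[-3:])
    ```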
    Mahadik, Mukul Chandrakant authored and committed Sep 1, 2024 · 23734e5

Commits on Sep 16, 2024

  1. Cleaned up duplicate code, log statements + Refactored export.py

    Changes
    
    1. Fetching only loc-like entries from the existing export data logic as the raw timeseries entries.
    - Found a lot of references indicating that trip and place entries are part of the analysis timeseries database.
    
    Almost every place I've found uses data.start_ts for "analysis/*" metadata key entries.
    
    In bin/debug/export_participants_trips_csv.py
    ```
        ts = esta.TimeSeries.get_time_series(user_id)
        trip_time_query = estt.TimeQuery("data.start_ts", start_day_ts, end_day_ts)
        ct_df = ts.get_data_df("analysis/confirmed_trip", trip_time_query)
    
    ```
    
    ---------
    
    In bin/debug/label_stats.py
    ```
    for t in list(edb.get_analysis_timeseries_db().find({"metadata.key": "analysis/inferred_trip", "user_id": sel_uuid})):
        if t["data"]["inferred_labels"] != []:
            confirmed_trip = edb.get_analysis_timeseries_db().find_one({"user_id": t["user_id"],
                    "metadata.key": "analysis/confirmed_trip",
                    "data.start_ts": t["data"]["start_ts"]})
    ```
    
    Similarly for data.enter_ts.
    
    -----------------
    
    On the other hand, for data.ts, timeseries_db was used, since "background/*" metadata key entries were involved:
    
    In emission/analysis/intake/segmentation/section_segmentation.py
    ```
        get_loc_for_ts = lambda time: ecwl.Location(ts.get_entry_at_ts("background/filtered_location", "data.ts", time)["data"])
        trip_start_loc = get_loc_for_ts(trip_entry.data.start_ts)
        trip_end_loc = get_loc_for_ts(trip_entry.data.end_ts)
    ```
    
    ----------------
    
    In emission/analysis/intake/segmentation/trip_segmentation.py
    ```
                untracked_start_loc = ecwe.Entry(ts.get_entry_at_ts("background/filtered_location",
                                                         "data.ts", last_place_entry.data.enter_ts)).data
    
    ```
    
    --------------------------------------
    
    2. Refactored emission/export/export.py
    
    - Added a separate function that returns exported entries so that this function can be reused in the purge pipeline code.
    - This helped to remove repeated code for re-fetching exported entries.
    
    - Also using a databases parameter for exporting data from a specific DB. For the purge use case, `databases` should only contain 'timeseries_db'.
    
    --------------------------------------
    
    3. Added raw_timeseries_only parameter to load_multi_timeline_for_range.py
    - If this argument is set, pipeline_states will not be loaded, since we don't want pipeline states to be restored while restoring raw timeseries data (see the sketch after this list).
    
    --------------------------------------
    
    4. Cleaned up tests
    - Reduced repetitive code by moving assertion tests to functions that can be reused for both full and incremental export testing.
    
    --------------------------------------
    
    5. Removed export_timeseries.py and import_timeseries.py
    - No need to have duplicate code, since we now use the existing scripts in load_multi_timeline_for_range.py and export.py.
    
    --------------------------------------
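
    A sketch of the flag from item 3 (the exact option name and surrounding code in load_multi_timeline_for_range.py are assumptions):

    ```python
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--raw-timeseries-only", action="store_true",
                        help="skip loading pipeline states when restoring raw timeseries")
    args = parser.parse_args()

    # Pipeline states are restored only for full snapshots, never when
    # loading back purged raw timeseries data
    if not args.raw_timeseries_only:
        print("would call load_pipeline_states(file_prefix) here")
    ```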
    Mahadik, Mukul Chandrakant authored and committed Sep 16, 2024 · 4703f04