diff --git a/file_expiry.sh b/file_expiry.sh deleted file mode 100755 index 7747121..0000000 --- a/file_expiry.sh +++ /dev/null @@ -1,15 +0,0 @@ -#!/bin/bash - -set -e - -PYTHON=$(which python) # get python location -MAIN_SCRIPT=$(realpath "infra_file_auto_expiry/source/main.py") -STORAGE_FOLDER="" - -echo "Automatic File Expiry Tool" -echo "Removes expired files" -echo "Specify arguments folder path and the days for expiry of your files" -echo "Python location is: $PYTHON" -echo "Running $MAIN_SCRIPT" - -python $MAIN_SCRIPT "$@" \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 2a565e9..ffe491a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1 @@ -typer==0.12.3 +typer>=0.12.3 diff --git a/infra_file_auto_expiry/source/data/expiry_constants.py b/source/data/expiry_constants.py similarity index 100% rename from infra_file_auto_expiry/source/data/expiry_constants.py rename to source/data/expiry_constants.py diff --git a/infra_file_auto_expiry/source/data/tuples.py b/source/data/tuples.py similarity index 100% rename from infra_file_auto_expiry/source/data/tuples.py rename to source/data/tuples.py diff --git a/infra_file_auto_expiry/source/main.py b/source/main.py similarity index 58% rename from infra_file_auto_expiry/source/main.py rename to source/main.py index 4ce002e..efa43e5 100644 --- a/infra_file_auto_expiry/source/main.py +++ b/source/main.py @@ -4,14 +4,18 @@ app = typer.Typer() @app.command() -def collect_file_info(path: str, save_file: str = "", days_for_expire: int = 10): +def collect_file_info(path: str, save_file: str = "", days_for_expiry: int = 10): """ Collects information about the top level paths within a given folder path And dumps it into a json file, specified by the save_file flag """ scrape_time = time.time() - seconds_for_expire = int(days_for_expire) * 3600 * 24 - collect_expired_file_information(path, save_file, scrape_time, seconds_for_expire) + seconds_for_expiry = int(days_for_expiry) * 3600 * 24 + expiry_threshold = scrape_time - seconds_for_expiry + collect_expired_file_information(folder_path=path, + save_file=save_file, + scrape_time=scrape_time, + seconds_for_expiry=seconds_for_expiry) @app.command() def collect_creator_info(file_info: str, save_file: str = ""): @@ -21,7 +25,9 @@ def collect_creator_info(file_info: str, save_file: str = ""): It then dumps the new information into another json file, specified by the save_file flag """ scrape_time = time.time() - collect_creator_information(file_info, save_file, scrape_time) + collect_creator_information(path_info_file=file_info, + save_file=save_file, + scrape_time=scrape_time) if __name__ == "__main__": app() \ No newline at end of file diff --git a/infra_file_auto_expiry/source/tests/test_utils.py b/source/tests/test_utils.py similarity index 100% rename from infra_file_auto_expiry/source/tests/test_utils.py rename to source/tests/test_utils.py diff --git a/infra_file_auto_expiry/source/utils/expiry_checks.py b/source/utils/expiry_checks.py similarity index 59% rename from infra_file_auto_expiry/source/utils/expiry_checks.py rename to source/utils/expiry_checks.py index 78e7e0a..d321292 100644 --- a/infra_file_auto_expiry/source/utils/expiry_checks.py +++ b/source/utils/expiry_checks.py @@ -5,50 +5,41 @@ from data.tuples import * from utils.file_creator import * -def is_expired(path, scrape_time, seconds_for_expiry): +def is_expired(path, expiry_threshold): """ Interface function to return if a file-structure is expired or not. TODO: Provide implementation for character device files, blocks, sockets. """ path_stat = os.stat(path) if stat.S_ISREG(path_stat.st_mode): # normal file - return is_expired_filepath(path, path_stat, scrape_time, seconds_for_expiry) + return is_expired_filepath(path, path_stat, expiry_threshold) elif stat.S_ISDIR(path_stat.st_mode): # folder - return is_expired_folder(path, path_stat, scrape_time, seconds_for_expiry) + return is_expired_folder(path, path_stat, expiry_threshold) elif stat.S_ISLNK(path_stat.st_mode): # symlink - return is_expired_link(path, path_stat, scrape_time, seconds_for_expiry) + return is_expired_link(path, path_stat, expiry_threshold) elif stat.S_ISCHR(path_stat.st_mode): # character driver - return is_expired_filepath(path, path_stat, scrape_time, seconds_for_expiry) + return is_expired_filepath(path, path_stat, expiry_threshold) elif stat.S_ISBLK(path_stat.st_mode): # block - return is_expired_filepath(path, path_stat, scrape_time, seconds_for_expiry) + return is_expired_filepath(path, path_stat, expiry_threshold) elif stat.S_ISFIFO(path_stat.st_mode): # pipe - return is_expired_filepath(path, path_stat, scrape_time, seconds_for_expiry) + return is_expired_filepath(path, path_stat, expiry_threshold) elif stat.S_ISSOCK(path_stat.st_mode): # socket - return is_expired_filepath(path, path_stat, scrape_time, seconds_for_expiry) + return is_expired_filepath(path, path_stat, expiry_threshold) - -def is_expired_filepath(path, file_stat, scrape_time, seconds_for_expiry): +def is_expired_filepath(path, file_stat, expiry_threshold): """ Checks the last time a file or folder has been accessed. If it has not been accessed in the days specified, then return True. False if otherwise. - string path: The full path to the file that is being checked - int days: The amount of days since last access that indicates that a file - has expired. - - output is a tuple - output[0] = True if it is expired, false if otherwise - output[1] = tuple containing creator info (name, uid, gid) - output[2], output[3], output[4] return the days since the atime, - ctime, and mtime of the file + It will also return a tuple containing the creator name and id, along with the + file atime, ctime, and mtime """ - if os.path.islink(path): file_stat = os.lstat(path) creator = get_file_creator(path) @@ -59,63 +50,62 @@ def is_expired_filepath(path, file_stat, scrape_time, seconds_for_expiry): mtime = (file_stat.st_mtime) # If all atime, ctime, mtime are more than the expiry date limit, # then this return true, along with the other information - return expiry_tuple(check_time_stamps(atime, ctime, mtime, scrape_time, seconds_for_expiry), - {creator}, atime, ctime, mtime) - -def check_time_stamps(atime, ctime, mtime, scrape_time, seconds_for_expiry): + return expiry_tuple( + is_expired=timestamps_are_expired(atime, ctime, mtime, + expiry_threshold), + creators={creator}, + atime=atime, + ctime=ctime, + mtime=mtime) + +def timestamps_are_expired(atime, ctime, mtime, expiry_threshold): """ Checks if all atime, ctime, and mtime are expired. Returns True when all are expired. """ - return ((scrape_time - atime > seconds_for_expiry) and - (scrape_time - ctime > seconds_for_expiry) and - (scrape_time - mtime > seconds_for_expiry)) + return ((atime < expiry_threshold) and + (ctime < expiry_threshold) and + (mtime < expiry_threshold)) -def is_expired_link(path, file_stat, scrape_time, seconds_for_expiry): +def is_expired_link(path, file_stat, expiry_threshold): """ - Checks if a symlink is expired. Checks the link itself, along with the - file it points to. Returns true if both are expired. - - Output is a tuple. - output[0] = True if both are expired, false if otherwise - output[1] = tuple containing creator info (name, uid, gid) - output[2], output[3], output[4] return the days since the atime, ctime, - and mtime relating to the real path that the link points to + Checks if a symlink is expired. + It will also return a tuple containing the creator name and id, along with the + file atime, ctime, and mtime """ if not os.path.islink(path): raise Exception("Given path is not a valid link.") - #TODO: implement edge case for when the link points to a recursive directory # For now, just handle by only considering the link itself - return is_expired_filepath(path, file_stat, scrape_time, - seconds_for_expiry) + return is_expired_filepath(path=path, file_stat=file_stat, + expiry_threshold=expiry_threshold) -def is_expired_folder(folder_path, folder_stat, scrape_time, seconds_for_expiry): +def is_expired_folder(folder_path, folder_stat, expiry_threshold): """ Goes through all files in a folder. Returns true if ALL files in directory are expire. - output is a tuple - output[0] = True if it is expired, false if otherwise - output[1] = tuple containing creator info (name, uid, gid) - output[2], output[3], output[4] return the days to the most recent - atime, ctime, and mtime of any file in the entire directory + It will also return a tuple containing the creator name and id, along with the + most recent atime, ctime, and mtime """ file_creators = set() - # timestamps for the folder itself recent_atime = folder_stat.st_atime recent_ctime = folder_stat.st_ctime recent_mtime = folder_stat.st_mtime folder_creator = get_file_creator(folder_path) file_creators.add(folder_creator) - is_expired_flag = check_time_stamps(recent_atime, recent_ctime, recent_mtime, - scrape_time, seconds_for_expiry) + is_expired_flag = timestamps_are_expired(recent_atime, + recent_ctime, + recent_mtime, + expiry_threshold) if check_folder_if_known(path=folder_path): - return expiry_tuple(is_expired_flag, file_creators, recent_atime, recent_ctime, recent_mtime ) + return expiry_tuple(is_expired_flag, file_creators, recent_atime, + recent_ctime, recent_mtime ) + # Check expiry status of all files and subdirectories within the folder for member_file_name in os.listdir(folder_path): # Tracks the unique names of file creators in the directory @@ -124,7 +114,8 @@ def is_expired_folder(folder_path, folder_stat, scrape_time, seconds_for_expiry) if not os.path.exists(member_file_path) or os.path.islink(member_file_path): continue - file_expiry_information = is_expired(str(member_file_path), scrape_time, seconds_for_expiry) + file_expiry_information = is_expired(path=str(member_file_path), + expiry_threshold=expiry_threshold) if file_expiry_information.is_expired: # First val in the expiry is always the boolean true or false @@ -146,7 +137,8 @@ def is_expired_folder(folder_path, folder_stat, scrape_time, seconds_for_expiry) recent_ctime = max(recent_ctime, file_expiry_information.ctime) recent_mtime = max(recent_mtime, file_expiry_information.mtime) - return expiry_tuple(is_expired_flag, file_creators, recent_atime, recent_ctime, recent_mtime ) + return expiry_tuple(is_expired_flag, file_creators, recent_atime, + recent_ctime, recent_mtime) def check_folder_if_known(path): """ diff --git a/infra_file_auto_expiry/source/utils/file_creator.py b/source/utils/file_creator.py similarity index 79% rename from infra_file_auto_expiry/source/utils/file_creator.py rename to source/utils/file_creator.py index 6997bdf..6fe68b2 100644 --- a/infra_file_auto_expiry/source/utils/file_creator.py +++ b/source/utils/file_creator.py @@ -17,9 +17,3 @@ def get_file_creator(path): """ FIX THIS LATER""" return f"user{os.stat(path).st_uid}" return creator_tuple(username, os.stat(path).st_uid, os.stat(path).st_gid) - -def notify_file_creators(): - """ - TODO: implement proper notification system - Currently is just the code to print information to a text file - """ \ No newline at end of file diff --git a/infra_file_auto_expiry/source/utils/interface.py b/source/utils/interface.py similarity index 79% rename from infra_file_auto_expiry/source/utils/interface.py rename to source/utils/interface.py index da9cb1d..3421394 100644 --- a/infra_file_auto_expiry/source/utils/interface.py +++ b/source/utils/interface.py @@ -29,7 +29,7 @@ def notify_file_creators(): Currently is just the code to print information to a text file """ -def scan_folder_for_expired(folder_path, scrape_time, seconds_for_expiry): +def scan_folder_for_expired(folder_path, expiry_threshold): """Generator function which iterates the expired top level folders in a given directory. @@ -39,21 +39,25 @@ def scan_folder_for_expired(folder_path, scrape_time, seconds_for_expiry): """ if not os.path.isdir(folder_path) : raise Exception("Given path directory "+ folder_path) + for entry in os.scandir(folder_path): if os.path.exists(entry.path): - expiry_result = is_expired(entry.path, scrape_time, seconds_for_expiry) + expiry_result = is_expired(entry.path, expiry_threshold) print(entry.path) # path, creator tuple (name, uid, gid), atime, ctime, mtime yield entry.path, expiry_result.is_expired, expiry_result.creators, \ expiry_result.atime, expiry_result.ctime, expiry_result.mtime -def collect_expired_file_information(folder_path, save_file, scrape_time, seconds_for_expiry): +def collect_expired_file_information(folder_path, save_file, scrape_time, expiry_threshold): """ Interface function which collects which directories are 'expired' String folder_path: The folder to scan for expired files - String save_file: The jsonl file path to save the information to, ie "path_name.jsonl" - int seconds_for_expiry: The amount of days since last usage that indicates expiry + String save_file: The jsonl file path to save the information to, + ie "path_name.jsonl" + Int scrape_time: the time at the start of the information scrape + Int seconds_for_expiry: The amount of days since last usage that indicates + expiry """ if not os.path.isdir(folder_path): raise Exception("Base folder does not exist") @@ -63,7 +67,8 @@ def collect_expired_file_information(folder_path, save_file, scrape_time, second save_file = f"file_information_{str(datetime.datetime.fromtimestamp(scrape_time))}.jsonl" path_info = dict() - for path, is_expired, creators, atime, ctime, mtime in scan_folder_for_expired(folder_path, scrape_time, seconds_for_expiry): + for path, is_expired, creators, atime, ctime, mtime in scan_folder_for_expired( + folder_path, expiry_threshold): # handles generating the dictionary path_info[path] = { @@ -71,16 +76,12 @@ def collect_expired_file_information(folder_path, save_file, scrape_time, second "creators": [creator for creator in creators], "expired": is_expired, "time_variables": { - "atime_unix": atime, - "ctime_unix": ctime, - "mtime_unix": mtime, "atime_datetime": str(datetime.datetime.fromtimestamp(atime)), "ctime_datetime": str(datetime.datetime.fromtimestamp(ctime)), "mtime_datetime": str(datetime.datetime.fromtimestamp(mtime)), }} write_jsonl_information(path_info, save_file, scrape_time) - return save_file def write_jsonl_information(dict_info, file_path, scrape_time): current_time = time.time() @@ -100,7 +101,14 @@ def collect_creator_information(path_info_file, save_file, scrape_time): Must be given the return value of form similar to the output of collect_expired_file_information() - String save_file: The jsonl file path to save the information to, ie "path_name.jsonl" + String path_info_file: A jsonl file path containing information about a + certain path. This should be the result of calling the collect_file_information + function. + + String save_file: The jsonl file path to save the information to, + ie "path_name.jsonl" + + Int scrape_time: The time at the start of the information scrape. """ if not os.path.exists(path_info_file): raise Exception("Given file for path information does not exist") @@ -118,29 +126,17 @@ def collect_creator_information(path_info_file, save_file, scrape_time): path_data = json.loads(line) # check if the path is expired if path_data["expired"]: - print("woo") # take all unique creators and make a new dictionary about them for user in path_data["creators"]: + time_vars = path_data["time_variables"] if user[1] in creator_info: - time_vars = path_data["time_variables"] creator_info[user[1]]["paths"][path_data["path"]] = time_vars - creator_info[user[1]]["recent_time_days"] = min([ - time_vars["atime"], - time_vars["ctime"], - time_vars["mtime"], - creator_info[user[1]]["recent_time_days"] - ]) / SECS_PER_DAY + else: creator_info[user[1]] = { "paths": {path_data["path"]: time_vars}, "name": user[0], "uid": user[1], - "gid": user[2], - "recent_time_days": min([ - time_vars["atime"], - time_vars["ctime"], - time_vars["mtime"] - ]) / SECS_PER_DAY} + "gid": user[2]} write_jsonl_information(creator_info, save_file, scrape_time) - return save_file