Skip to content

Commit

Permalink
Cleaning Directory
Browse files Browse the repository at this point in the history
  • Loading branch information
cehune committed May 27, 2024
1 parent f7d767f commit cd25e5f
Show file tree
Hide file tree
Showing 9 changed files with 76 additions and 103 deletions.
15 changes: 0 additions & 15 deletions file_expiry.sh

This file was deleted.

2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
typer==0.12.3
typer>=0.12.3
File renamed without changes.
File renamed without changes.
14 changes: 10 additions & 4 deletions infra_file_auto_expiry/source/main.py → source/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,18 @@
app = typer.Typer()

@app.command()
def collect_file_info(path: str, save_file: str = "", days_for_expiry: int = 10):
    """
    Collects information about the top level paths within a given folder path
    and dumps it into a json file, specified by the save_file flag.

    path: the folder whose top level entries are scanned.
    save_file: output file path; passed through unchanged to
        collect_expired_file_information.
    days_for_expiry: number of days without access/change/modification
        after which a path counts as expired.
    """
    scrape_time = time.time()
    seconds_for_expiry = int(days_for_expiry) * 3600 * 24
    # Absolute cutoff: any timestamp earlier than this is considered expired.
    expiry_threshold = scrape_time - seconds_for_expiry
    # The callee takes the absolute threshold, not the raw duration, so the
    # keyword must be expiry_threshold.
    collect_expired_file_information(folder_path=path,
                                     save_file=save_file,
                                     scrape_time=scrape_time,
                                     expiry_threshold=expiry_threshold)

@app.command()
def collect_creator_info(file_info: str, save_file: str = ""):
Expand All @@ -21,7 +25,9 @@ def collect_creator_info(file_info: str, save_file: str = ""):
It then dumps the new information into another json file, specified by the save_file flag
"""
scrape_time = time.time()
collect_creator_information(file_info, save_file, scrape_time)
collect_creator_information(path_info_file=file_info,
save_file=save_file,
scrape_time=scrape_time)

if __name__ == "__main__":
app()
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -5,50 +5,41 @@
from data.tuples import *
from utils.file_creator import *

def is_expired(path, expiry_threshold):
    """ Interface function to return if a file-structure is expired or not.

    path: the path to inspect.
    expiry_threshold: timestamp cutoff forwarded to the type-specific
        helpers; timestamps earlier than it count as expired.

    Returns whatever the selected helper returns (is_expired_folder,
    is_expired_link or is_expired_filepath).

    TODO: Provide implementation for character device files, blocks, sockets.
    """
    # lstat() does not follow symlinks. With stat() the S_ISLNK test below
    # could never be true, so links were silently treated as their targets.
    path_stat = os.lstat(path)
    mode = path_stat.st_mode

    if stat.S_ISDIR(mode):  # folder
        return is_expired_folder(path, path_stat, expiry_threshold)

    if stat.S_ISLNK(mode):  # symlink
        return is_expired_link(path, path_stat, expiry_threshold)

    # Regular files, character/block devices, pipes and sockets all share
    # the plain timestamp check. Falling through here also avoids returning
    # None for any file type not listed above.
    return is_expired_filepath(path, path_stat, expiry_threshold)


def is_expired_filepath(path, file_stat, scrape_time, seconds_for_expiry):
def is_expired_filepath(path, file_stat, expiry_threshold):
"""
Checks the last time a file or folder has been accessed. If it has not
been accessed in the days specified, then return True. False if otherwise.
string path: The full path to the file that is being checked
int days: The amount of days since last access that indicates that a file
has expired.
output is a tuple
output[0] = True if it is expired, false if otherwise
output[1] = tuple containing creator info (name, uid, gid)
output[2], output[3], output[4] return the days since the atime,
ctime, and mtime of the file
It will also return a tuple containing the creator name and id, along with the
file atime, ctime, and mtime
"""

if os.path.islink(path):
file_stat = os.lstat(path)
creator = get_file_creator(path)
Expand All @@ -59,63 +50,62 @@ def is_expired_filepath(path, file_stat, scrape_time, seconds_for_expiry):
mtime = (file_stat.st_mtime)
# If all atime, ctime, mtime are more than the expiry date limit,
# then this return true, along with the other information
return expiry_tuple(check_time_stamps(atime, ctime, mtime, scrape_time, seconds_for_expiry),
{creator}, atime, ctime, mtime)

def check_time_stamps(atime, ctime, mtime, scrape_time, seconds_for_expiry):
return expiry_tuple(
is_expired=timestamps_are_expired(atime, ctime, mtime,
expiry_threshold),
creators={creator},
atime=atime,
ctime=ctime,
mtime=mtime)

def timestamps_are_expired(atime, ctime, mtime, expiry_threshold):
    """
    Checks if all atime, ctime, and mtime are expired.

    A timestamp is expired when it is strictly earlier than
    expiry_threshold. Returns True when all three are expired,
    False otherwise.
    """
    return ((atime < expiry_threshold) and
            (ctime < expiry_threshold) and
            (mtime < expiry_threshold))

def is_expired_link(path, file_stat, expiry_threshold):
    """
    Checks if a symlink is expired.

    path: path of the symlink.
    file_stat: stat result for the path, forwarded to is_expired_filepath.
    expiry_threshold: timestamp cutoff; earlier timestamps count as expired.

    Returns the result of is_expired_filepath for the link: the expiry flag
    together with the creator information and the atime, ctime, and mtime.

    Raises Exception if the given path is not a symlink.
    """
    if not os.path.islink(path):
        raise Exception("Given path is not a valid link.")

    # TODO: implement edge case for when the link points to a recursive directory
    # For now, just handle by only considering the link itself
    return is_expired_filepath(path=path, file_stat=file_stat,
                               expiry_threshold=expiry_threshold)


def is_expired_folder(folder_path, folder_stat, scrape_time, seconds_for_expiry):
def is_expired_folder(folder_path, folder_stat, expiry_threshold):
"""
Goes through all files in a folder. Returns true if ALL files in the
directory are expired.
output is a tuple
output[0] = True if it is expired, false if otherwise
output[1] = tuple containing creator info (name, uid, gid)
output[2], output[3], output[4] return the days to the most recent
atime, ctime, and mtime of any file in the entire directory
It will also return a tuple containing the creator name and id, along with the
most recent atime, ctime, and mtime
"""
file_creators = set()

# timestamps for the folder itself
recent_atime = folder_stat.st_atime
recent_ctime = folder_stat.st_ctime
recent_mtime = folder_stat.st_mtime
folder_creator = get_file_creator(folder_path)
file_creators.add(folder_creator)
is_expired_flag = check_time_stamps(recent_atime, recent_ctime, recent_mtime,
scrape_time, seconds_for_expiry)
is_expired_flag = timestamps_are_expired(recent_atime,
recent_ctime,
recent_mtime,
expiry_threshold)

if check_folder_if_known(path=folder_path):
return expiry_tuple(is_expired_flag, file_creators, recent_atime, recent_ctime, recent_mtime )
return expiry_tuple(is_expired_flag, file_creators, recent_atime,
recent_ctime, recent_mtime )

# Check expiry status of all files and subdirectories within the folder
for member_file_name in os.listdir(folder_path):
# Tracks the unique names of file creators in the directory
Expand All @@ -124,7 +114,8 @@ def is_expired_folder(folder_path, folder_stat, scrape_time, seconds_for_expiry)
if not os.path.exists(member_file_path) or os.path.islink(member_file_path):
continue

file_expiry_information = is_expired(str(member_file_path), scrape_time, seconds_for_expiry)
file_expiry_information = is_expired(path=str(member_file_path),
expiry_threshold=expiry_threshold)

if file_expiry_information.is_expired:
# First val in the expiry is always the boolean true or false
Expand All @@ -146,7 +137,8 @@ def is_expired_folder(folder_path, folder_stat, scrape_time, seconds_for_expiry)
recent_ctime = max(recent_ctime, file_expiry_information.ctime)
recent_mtime = max(recent_mtime, file_expiry_information.mtime)

return expiry_tuple(is_expired_flag, file_creators, recent_atime, recent_ctime, recent_mtime )
return expiry_tuple(is_expired_flag, file_creators, recent_atime,
recent_ctime, recent_mtime)

def check_folder_if_known(path):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,3 @@ def get_file_creator(path):
""" FIX THIS LATER"""
return f"user{os.stat(path).st_uid}"
return creator_tuple(username, os.stat(path).st_uid, os.stat(path).st_gid)

def notify_file_creators():
    """
    Placeholder for notifying file creators; there is no implementation yet.

    TODO: implement proper notification system
    """
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def notify_file_creators():
Currently is just the code to print information to a text file
"""

def scan_folder_for_expired(folder_path, scrape_time, seconds_for_expiry):
def scan_folder_for_expired(folder_path, expiry_threshold):
"""Generator function which iterates the expired top level folders
in a given directory.
Expand All @@ -39,21 +39,25 @@ def scan_folder_for_expired(folder_path, scrape_time, seconds_for_expiry):
"""
if not os.path.isdir(folder_path) :
raise Exception("Given path directory "+ folder_path)

for entry in os.scandir(folder_path):
if os.path.exists(entry.path):
expiry_result = is_expired(entry.path, scrape_time, seconds_for_expiry)
expiry_result = is_expired(entry.path, expiry_threshold)
print(entry.path)
# path, creator tuple (name, uid, gid), atime, ctime, mtime
yield entry.path, expiry_result.is_expired, expiry_result.creators, \
expiry_result.atime, expiry_result.ctime, expiry_result.mtime

def collect_expired_file_information(folder_path, save_file, scrape_time, seconds_for_expiry):
def collect_expired_file_information(folder_path, save_file, scrape_time, expiry_threshold):
"""
Interface function which collects which directories are 'expired'
String folder_path: The folder to scan for expired files
String save_file: The jsonl file path to save the information to, ie "path_name.jsonl"
int seconds_for_expiry: The amount of days since last usage that indicates expiry
String save_file: The jsonl file path to save the information to,
ie "path_name.jsonl"
Int scrape_time: the time at the start of the information scrape
Int expiry_threshold: Timestamp cutoff; a path counts as expired when its
atime, ctime and mtime are all earlier than this value
"""
if not os.path.isdir(folder_path):
raise Exception("Base folder does not exist")
Expand All @@ -63,24 +67,21 @@ def collect_expired_file_information(folder_path, save_file, scrape_time, second
save_file = f"file_information_{str(datetime.datetime.fromtimestamp(scrape_time))}.jsonl"

path_info = dict()
for path, is_expired, creators, atime, ctime, mtime in scan_folder_for_expired(folder_path, scrape_time, seconds_for_expiry):
for path, is_expired, creators, atime, ctime, mtime in scan_folder_for_expired(
folder_path, expiry_threshold):
# handles generating the dictionary

path_info[path] = {
"path": path, # storing pathname so we keep it when we transfer the dictionary to jsonl
"creators": [creator for creator in creators],
"expired": is_expired,
"time_variables": {
"atime_unix": atime,
"ctime_unix": ctime,
"mtime_unix": mtime,
"atime_datetime": str(datetime.datetime.fromtimestamp(atime)),
"ctime_datetime": str(datetime.datetime.fromtimestamp(ctime)),
"mtime_datetime": str(datetime.datetime.fromtimestamp(mtime)),
}}

write_jsonl_information(path_info, save_file, scrape_time)
return save_file

def write_jsonl_information(dict_info, file_path, scrape_time):
current_time = time.time()
Expand All @@ -100,7 +101,14 @@ def collect_creator_information(path_info_file, save_file, scrape_time):
Must be given the return value of form similar to the output of
collect_expired_file_information()
String save_file: The jsonl file path to save the information to, ie "path_name.jsonl"
String path_info_file: A jsonl file path containing information about a
certain path. This should be the result of calling the collect_file_information
function.
String save_file: The jsonl file path to save the information to,
ie "path_name.jsonl"
Int scrape_time: The time at the start of the information scrape.
"""
if not os.path.exists(path_info_file):
raise Exception("Given file for path information does not exist")
Expand All @@ -118,29 +126,17 @@ def collect_creator_information(path_info_file, save_file, scrape_time):
path_data = json.loads(line)
# check if the path is expired
if path_data["expired"]:
print("woo")
# take all unique creators and make a new dictionary about them
for user in path_data["creators"]:
time_vars = path_data["time_variables"]
if user[1] in creator_info:
time_vars = path_data["time_variables"]
creator_info[user[1]]["paths"][path_data["path"]] = time_vars
creator_info[user[1]]["recent_time_days"] = min([
time_vars["atime"],
time_vars["ctime"],
time_vars["mtime"],
creator_info[user[1]]["recent_time_days"]
]) / SECS_PER_DAY

else:
creator_info[user[1]] = {
"paths": {path_data["path"]: time_vars},
"name": user[0],
"uid": user[1],
"gid": user[2],
"recent_time_days": min([
time_vars["atime"],
time_vars["ctime"],
time_vars["mtime"]
]) / SECS_PER_DAY}
"gid": user[2]}

write_jsonl_information(creator_info, save_file, scrape_time)
return save_file

0 comments on commit cd25e5f

Please sign in to comment.