MANUAL file expiry deletion script #1

Merged
merged 40 commits into from
Jun 16, 2024
Changes from 39 commits
96c5c1f
Base folder generation and expiry date checker
cehune Feb 17, 2024
32375ae
Implemented file deletion - tested locally
cehune Feb 17, 2024
b9c9a30
Implemented unit tests for file expiry tool
cehune Feb 17, 2024
576867c
Cleaned up starting code
cehune Feb 17, 2024
f0a3298
Updating days for expiry
cehune Feb 17, 2024
2f777d9
Creating Base script for file deletion
cehune Feb 17, 2024
22209a5
Updating to only delete top level folders
cehune Feb 18, 2024
0fe7230
Separated File Deletion Function between searching and deleting
cehune Feb 18, 2024
2eb6340
Update dependencies
cehune Feb 18, 2024
b6a7fba
Updated changes, only notifys top level folders but checks all files
cehune Feb 21, 2024
f6b58c4
Fixed Notifications
cehune Feb 21, 2024
b35fee4
Remove pycache
cehune Feb 21, 2024
848f266
Updated Utils
cehune Feb 29, 2024
503ce9c
Updated utils with temporary test functions
cehune Mar 4, 2024
fb5d0ef
Updated Utils with test functions
cehune Mar 4, 2024
ad75a9a
updating utils with testing functions
cehune Mar 4, 2024
e4fa991
Manually Tested
cehune Mar 21, 2024
8d5646f
Updated function documentation
cehune Mar 21, 2024
fba50a5
Implemented accumulation of informaiton and updated unit tests
cehune Mar 29, 2024
069049b
Updating Comments
cehune Mar 29, 2024
d02c9e1
Update README
cehune Mar 29, 2024
c18cd1e
Remove INIT files
cehune Mar 29, 2024
ed7af0f
Upating file information dictionary
cehune Apr 2, 2024
d5cee78
Update main.py
cehune Apr 2, 2024
cfd08db
Implemented Typer application
cehune Apr 9, 2024
2deb4fc
Cleaning and fixing unit tests
cehune May 1, 2024
bb31cda
Adding YML for testing CI Pipeline
cehune May 1, 2024
deb64f9
Removing Unecessary Files
cehune May 1, 2024
3119581
Updating YML Workflow CI file for Typos
cehune May 1, 2024
7cbd8d6
Adding Requirements.txt
cehune May 1, 2024
6f68ffd
Removing Hard coded paths
cehune May 1, 2024
8b750c2
Updated Unit tests
cehune May 2, 2024
94e1aff
Updating dependency names in unit test functions
cehune May 2, 2024
53aae90
Split functions into seperate files
cehune May 27, 2024
f7d767f
update pipeline yml
cehune May 27, 2024
cd25e5f
Cleaning Directory
cehune May 27, 2024
e48f9ac
Optimizing imports for main.py
cehune May 27, 2024
9189aef
Fixing unit tests
cehune May 27, 2024
c5f5350
Fixing unit test file paths for pipeline yml
cehune May 27, 2024
133b942
Updating .gitignore and README
cehune Jun 5, 2024
29 changes: 29 additions & 0 deletions .github/workflows/unit_tests.yml
@@ -0,0 +1,29 @@
name: File Auto Expiry Unit Tests
on: [pull_request]

jobs:
  tests:
    runs-on: ubuntu-latest

    strategy:
      matrix:
        python-version: ["3.9", "3.10", "3.11"]

    steps:
      - uses: actions/checkout@v4
      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt

      - name: Test with pytest
        run: |
          pip install pytest pytest-cov
          pytest source/tests/test_utils.py --doctest-modules --junitxml=junit/test-results.xml --cov=com --cov-report=xml --cov-report=html


2 changes: 2 additions & 0 deletions .gitignore
@@ -0,0 +1,2 @@
/venv
/infra_file_auto_expiry/source/__pycache__
15 changes: 14 additions & 1 deletion README.md
@@ -1 +1,14 @@
# infra_file_auto_expiry
# infra_file_auto_expiry

Relating to issue: https://github.com/WATonomous/infra-config/issues/1143

This project is meant to help automatically expire and delete files. It is currently at the stage of gathering the information needed to make file deletion easier. In the future, it will need a notification system for users whose files are to be deleted, and an actual deletion system.

Currently it walks every top-level folder in a directory and checks whether each one is expired. A folder counts as expired only if every file in its directory tree is expired. As it does this, it gathers all the users who created files in that folder, and the days since the most RECENT atime, ctime, and mtime of ANY file in that folder. It only collects these for folders which have been confirmed to be expired.
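
The rule described above (a folder is expired only when every entry under it is expired, while the newest timestamps are tracked along the way) can be sketched roughly as follows. This is a minimal illustration, not the code from this PR: `summarize_tree` is a hypothetical helper, and it uses `os.walk` rather than the recursive functions in `source/utils/expiry_checks.py`.

```python
import os
import tempfile
import time


def summarize_tree(root, expiry_threshold):
    """Return (is_expired, newest_atime, newest_ctime, newest_mtime) for root.

    Hypothetical helper for illustration only; it is not part of this PR.
    """
    all_expired = True
    newest = [0.0, 0.0, 0.0]
    for dirpath, _dirnames, filenames in os.walk(root):
        # Check the directory itself, then every file directly inside it.
        for name in [dirpath] + [os.path.join(dirpath, f) for f in filenames]:
            st = os.lstat(name)
            stamps = (st.st_atime, st.st_ctime, st.st_mtime)
            # One recent timestamp anywhere keeps the whole folder alive.
            if any(t >= expiry_threshold for t in stamps):
                all_expired = False
            newest = [max(a, b) for a, b in zip(newest, stamps)]
    return (all_expired, *newest)


with tempfile.TemporaryDirectory() as root:
    open(os.path.join(root, "fresh.txt"), "w").close()
    ten_days_ago = time.time() - 10 * 86400
    # The file was created just now, so the folder is not expired.
    print(summarize_tree(root, ten_days_ago)[0])
```
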

To collect the expiry information of all top level directories in a given path:
sudo $(which python3) /path_to_directory/infra_file_auto_expiry/infra_file_auto_expiry/source/main.py collect-file-info path_to_check_expiry_of

This will return a jsonl file. You can then use this in the following command to tabulate all expired paths that are associated with a particular user.

sudo $(which python3) /path_to_directory/infra_file_auto_expiry/infra_file_auto_expiry/source/main.py collect-creator-info path_to_jsonl_file
1 change: 1 addition & 0 deletions requirements.txt
@@ -0,0 +1 @@
typer>=0.12.3
17 changes: 17 additions & 0 deletions source/data/expiry_constants.py
@@ -0,0 +1,17 @@
KNOWN_DIRECTORIES = {
    "ros/humble",

    "lib/python3.10",
    "lib/python3.9",
    "lib/python3.8",
    "lib/python3.7",

    "lib64/python3.10",
    "lib64/python3.9",
    "lib64/python3.8",
    "lib64/python3.7",

    "test_dir/one"
}

SECS_PER_DAY = 86400
4 changes: 4 additions & 0 deletions source/data/tuples.py
@@ -0,0 +1,4 @@
from collections import namedtuple

expiry_tuple = namedtuple("file_tuple", "is_expired, creators, atime, ctime, mtime")
creator_tuple = namedtuple("creator_tuple", "username, uid, gid")
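
Reviewer note: a short sketch of how these two tuples behave, since the unit tests in this PR read them by index while the utility code reads them by field name. The sample values (`tester_account`, `5111`, `1555`) are borrowed from the tests; everything else is illustrative.

```python
from collections import namedtuple

# Same field layout as source/data/tuples.py.
expiry_tuple = namedtuple("file_tuple", "is_expired, creators, atime, ctime, mtime")
creator_tuple = namedtuple("creator_tuple", "username, uid, gid")

owner = creator_tuple(username="tester_account", uid=5111, gid=1555)
result = expiry_tuple(is_expired=True, creators={owner}, atime=5, ctime=5, mtime=5)

# Field access and positional access refer to the same values.
print(result.is_expired, result[0])
print(owner.username, owner[0])

# A namedtuple of hashable fields is itself hashable, so it can live in a set.
print(owner in result.creators)

# The class name comes from the first argument, not from the variable name.
print(type(result).__name__)
```

Note that `expiry_tuple` instances report their type as `file_tuple`, which may be worth aligning if the repr ever shows up in logs.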
34 changes: 34 additions & 0 deletions source/main.py
@@ -0,0 +1,34 @@
from utils.interface import *
from data.expiry_constants import SECS_PER_DAY
import time
import typer
app = typer.Typer()

@app.command()
def collect_file_info(path: str, save_file: str = "", days_for_expiry: int = 10):
    """
    Collects information about the top level paths within a given folder path
    And dumps it into a json file, specified by the save_file flag
    """
    scrape_time = time.time()
    seconds_for_expiry = int(days_for_expiry) * SECS_PER_DAY
    expiry_threshold = scrape_time - seconds_for_expiry
    collect_expired_file_information(folder_path=path,
                                     save_file=save_file,
                                     scrape_time=scrape_time,
                                     expiry_threshold=expiry_threshold)

@app.command()
def collect_creator_info(file_info: str, save_file: str = ""):
    """
    Tabulates the paths that relate to specific users, based on a given jsonl path
    That jsonl path should be the result of calling the collect_file_info function
    It then dumps the new information into another json file, specified by the save_file flag
    """
    scrape_time = time.time()
    collect_creator_information(path_info_file=file_info,
                                save_file=save_file,
                                scrape_time=scrape_time)

if __name__ == "__main__":
    app()
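
Reviewer note: the threshold arithmetic in `collect_file_info` can be checked in isolation. The sketch below is a hypothetical standalone helper (`expiry_threshold` as a function is not in this PR), using an arbitrary fixed timestamp instead of `time.time()` so the numbers are reproducible.

```python
SECS_PER_DAY = 86400  # same value as source/data/expiry_constants.py


def expiry_threshold(scrape_time, days_for_expiry):
    """Timestamps strictly below the returned value count as expired."""
    return scrape_time - days_for_expiry * SECS_PER_DAY


scrape_time = 1_700_000_000  # arbitrary Unix timestamp, for illustration only
threshold = expiry_threshold(scrape_time, days_for_expiry=10)
print(threshold)  # 1699136000

eleven_days_old = scrape_time - 11 * SECS_PER_DAY
nine_days_old = scrape_time - 9 * SECS_PER_DAY
print(eleven_days_old < threshold)  # True: older than the 10-day window
print(nine_days_old < threshold)    # False: still inside the window
```
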
94 changes: 94 additions & 0 deletions source/tests/test_utils.py
@@ -0,0 +1,94 @@
import unittest
import os
import sys
from unittest.mock import MagicMock, patch
module_path = os.path.dirname(
os.path.dirname(os.path.abspath(__file__))
)
sys.path.append(module_path)

from utils.interface import *
from utils.expiry_checks import *

class TestUtils(unittest.TestCase):
    @patch("pwd.getpwuid")
    @patch("os.stat")
    def test_get_file_creator(self, patch_stat, patch_pwd):
        """
        Tests retrieving the user name of a file owner
        """
        # Successfully retrieves file owner
        patch_stat.return_value.st_uid=5111
        patch_stat.return_value.st_gid=1555
        patch_pwd.return_value.pw_name="tester_account"

        file_creator = get_file_creator("/home/machung/test.txt")
        self.assertEqual(file_creator[0], "tester_account")
        self.assertEqual(file_creator[1], 5111)
        self.assertEqual(file_creator[2], 1555)

    @patch('os.stat')
    def test_is_expired_filepath(self, patch_stat):
        """
        Tests the is_expired_filepath function
        """
        # The values below are small stand-in timestamps, not day counts
        time_for_expiry = 30
        patch_stat.st_atime = 5
        patch_stat.st_ctime = 5
        patch_stat.st_mtime = 5
        scrape_time = 50
        expiry_threshold = scrape_time - time_for_expiry  # 20

        # All timestamps (5) are older than the threshold (20)
        # The file should be expired
        self.assertTrue(is_expired_filepath("test_name.txt", patch_stat, expiry_threshold)[0])

        expiry_threshold = -20
        # All timestamps (5) are newer than the threshold (-20)
        # The file should not be expired
        expiry_test_result = is_expired_filepath("test_name.txt", patch_stat, expiry_threshold)
        self.assertFalse(expiry_test_result[0])
        # assertEqual, not assertTrue: assertTrue(5, x) always passes
        self.assertEqual(5, expiry_test_result[2])
        self.assertEqual(5, expiry_test_result[3])
        self.assertEqual(5, expiry_test_result[4])

    @patch('os.listdir')
    @patch("os.stat")
    @patch("utils.expiry_checks.is_expired")
    def test_is_expired_folder(self, patch_expired, patch_stat, patch_path):
        """
        Tests the is_expired_folder function. This should return
        True (is_expired) if all subdirectories and files are also expired.

        The values of atime, ctime, and mtime should be the largest timestamps
        seen from the entire folder tree. This indicates the most recent timestamp.
        In the test we just simulate those timestamps by using smaller integers.
        """
        mocked_file_expiry_results_1 = MagicMock()
        mocked_file_expiry_results_2 = MagicMock()

        # First member: expired, with the largest mtime (10000)
        mocked_file_expiry_results_1.configure_mock(
            is_expired = True, creators = ("a", 0, 0), atime = 1000,
            ctime = 2000, mtime = 10000)

        # Second member: not expired, with the largest ctime (6000)
        mocked_file_expiry_results_2.configure_mock(
            is_expired = False, creators = ("b", 1, 1), atime = 2000,
            ctime = 6000 , mtime = 5000)

        patch_expired.side_effect = [mocked_file_expiry_results_1,
                                     mocked_file_expiry_results_2]
        patch_path.return_value = ["one.txt", "two.txt"]

        # atime, ctime, mtime for the folder itself are all 3000,
        # which makes the folder's own atime the largest one seen
        patch_stat.st_atime = patch_stat.st_ctime = patch_stat.st_mtime = 3000

        res = is_expired_folder("test_path", patch_stat, 0)
        self.assertEqual(False, res[0])
        self.assertEqual(3000 , res[2])
        self.assertEqual(6000 , res[3])
        self.assertEqual(10000 , res[4])

if __name__ == '__main__':
    unittest.main()
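
Reviewer note: the tests above mock `os.stat`, which seems like a reasonable choice here. A minimal sketch of why a real-file test is awkward: `os.utime` can backdate atime and mtime, but not ctime, so a freshly created temp file cannot be made to look fully expired that way. `stamps_after_backdating` is a hypothetical helper, not part of this PR, and the behaviour shown assumes a POSIX-like filesystem.

```python
import os
import tempfile
import time


def stamps_after_backdating(days_back):
    """Create a temp file, push atime/mtime into the past, return (stat, old)."""
    with tempfile.NamedTemporaryFile(delete=False) as handle:
        path = handle.name
    try:
        old = time.time() - days_back * 86400
        os.utime(path, (old, old))  # sets atime and mtime only
        return os.stat(path), old
    finally:
        os.remove(path)


st, old = stamps_after_backdating(30)
print(st.st_atime - old, st.st_mtime - old)  # both close to zero
print(st.st_ctime > old)  # ctime still reflects the recent change
```

Since `timestamps_are_expired` requires all three timestamps to be below the threshold, the recent ctime alone would keep such a file from being reported as expired.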
160 changes: 160 additions & 0 deletions source/utils/expiry_checks.py
@@ -0,0 +1,160 @@
import os
import stat
from data.expiry_constants import *
from data.expiry_constants import KNOWN_DIRECTORIES
from data.tuples import *
from utils.file_creator import *

def is_expired(path, expiry_threshold):
    """ Interface function to return if a file-structure is expired or not.
    TODO: Provide implementation for character device files, blocks, sockets.
    """

    # lstat does not follow symlinks, so the S_ISLNK branch below is reachable;
    # os.stat would report the mode of the link target instead.
    path_stat = os.lstat(path)
    if stat.S_ISREG(path_stat.st_mode): # normal file
        return is_expired_filepath(path, path_stat, expiry_threshold)

    elif stat.S_ISDIR(path_stat.st_mode): # folder
        return is_expired_folder(path, path_stat, expiry_threshold)

    elif stat.S_ISLNK(path_stat.st_mode): # symlink
        return is_expired_link(path, path_stat, expiry_threshold)

    elif stat.S_ISCHR(path_stat.st_mode): # character driver
        return is_expired_filepath(path, path_stat, expiry_threshold)

    elif stat.S_ISBLK(path_stat.st_mode): # block
        return is_expired_filepath(path, path_stat, expiry_threshold)

    elif stat.S_ISFIFO(path_stat.st_mode): # pipe
        return is_expired_filepath(path, path_stat, expiry_threshold)

    elif stat.S_ISSOCK(path_stat.st_mode): # socket
        return is_expired_filepath(path, path_stat, expiry_threshold)

def is_expired_filepath(path, file_stat, expiry_threshold):
    """
    Checks the last time a file or folder has been accessed. If it has not
    been accessed in the days specified, then return True. False if otherwise.

    It will also return a tuple containing the creator name and id, along with the
    file atime, ctime, and mtime
    """
    if os.path.islink(path):
        file_stat = os.lstat(path)
    creator = get_file_creator(path)

    # collect the raw atime, ctime, and mtime timestamps of the file
    atime = file_stat.st_atime
    ctime = file_stat.st_ctime
    mtime = file_stat.st_mtime
    # If atime, ctime, and mtime are all older than the expiry threshold,
    # then is_expired is True; the other information is returned either way
    return expiry_tuple(
        is_expired=timestamps_are_expired(atime, ctime, mtime,
                                          expiry_threshold),
        creators={creator},
        atime=atime,
        ctime=ctime,
        mtime=mtime)

def timestamps_are_expired(atime, ctime, mtime, expiry_threshold):
    """
    Checks if all atime, ctime, and mtime are expired.
    Returns True when all are expired.
    """
    return ((atime < expiry_threshold) and
            (ctime < expiry_threshold) and
            (mtime < expiry_threshold))

def is_expired_link(path, file_stat, expiry_threshold):
    """
    Checks if a symlink is expired.
    It will also return a tuple containing the creator name and id, along with the
    file atime, ctime, and mtime
    """
    if not os.path.islink(path):
        raise Exception("Given path is not a valid link.")

    #TODO: implement edge case for when the link points to a recursive directory
    # For now, just handle by only considering the link itself
    return is_expired_filepath(path=path, file_stat=file_stat,
                               expiry_threshold=expiry_threshold)


def is_expired_folder(folder_path, folder_stat, expiry_threshold):
    """
    Goes through all files in a folder. Returns true if ALL files in directory
    are expired.

    It will also return a tuple containing the creator name and id, along with the
    most recent atime, ctime, and mtime
    """
    # Tracks the unique creators seen in the directory tree
    file_creators = set()
    # timestamps for the folder itself
    recent_atime = folder_stat.st_atime
    recent_ctime = folder_stat.st_ctime
    recent_mtime = folder_stat.st_mtime
    folder_creator = get_file_creator(folder_path)
    file_creators.add(folder_creator)
    is_expired_flag = timestamps_are_expired(recent_atime,
                                             recent_ctime,
                                             recent_mtime,
                                             expiry_threshold)

    if check_folder_if_known(path=folder_path):
        return expiry_tuple(is_expired_flag, file_creators, recent_atime,
                            recent_ctime, recent_mtime)

    # Check expiry status of all files and subdirectories within the folder
    for member_file_name in os.listdir(folder_path):
        member_file_path = os.path.join(folder_path, member_file_name)

        if not os.path.exists(member_file_path) or os.path.islink(member_file_path):
            continue

        file_expiry_information = is_expired(path=str(member_file_path),
                                             expiry_threshold=expiry_threshold)

        if not file_expiry_information.is_expired:
            # A single unexpired member means the folder is not expired
            is_expired_flag = False

        creators = file_expiry_information.creators # collects tuple of (name, uid, gid)
        # If file_expiry_information is from a folder, it should already contain a set
        # with the information of file creators
        if isinstance(creators, set):
            for user in creators:
                file_creators.add(user)
        # if file_expiry_information is from a file, and the creator is not
        # already in the set, then their information is added.
        else:
            file_creators.add(creators)

        # update atime, ctime, mtime
        recent_atime = max(recent_atime, file_expiry_information.atime)
        recent_ctime = max(recent_ctime, file_expiry_information.ctime)
        recent_mtime = max(recent_mtime, file_expiry_information.mtime)

    return expiry_tuple(is_expired_flag, file_creators, recent_atime,
                        recent_ctime, recent_mtime)

def check_folder_if_known(path):
    """
    Checks if a folder path is within a known set of directories
    that are large and typically non-edited by users.
    """
    base_name = os.path.basename(path)
    parent_path_name = os.path.basename(os.path.dirname(path))
    # Return an explicit bool instead of falling through to None
    return f"{parent_path_name}/{base_name}" in KNOWN_DIRECTORIES

def catch_link_issues(path):
    """
    Returns True if a link leads to a link or a directory
    """
    if os.path.islink(path):
        real_path = os.path.realpath(path)
        if os.path.islink(real_path) or os.path.isdir(real_path):
            return True
    return False
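
Reviewer note: `os.stat` follows symlinks while `os.lstat` does not, which decides whether a `stat.S_ISLNK` check on the resulting mode can ever be true. The sketch below illustrates this on a throwaway symlink; `link_modes` is a hypothetical helper, and it assumes a platform where `os.symlink` is permitted (for example Linux).

```python
import os
import stat
import tempfile


def link_modes(directory):
    """Return (S_ISLNK via os.stat, S_ISLNK via os.lstat) for a fresh symlink."""
    target = os.path.join(directory, "target.txt")
    link = os.path.join(directory, "link.txt")
    open(target, "w").close()
    os.symlink(target, link)
    return (stat.S_ISLNK(os.stat(link).st_mode),
            stat.S_ISLNK(os.lstat(link).st_mode))


with tempfile.TemporaryDirectory() as tmp:
    print(link_modes(tmp))  # (False, True)
```
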
19 changes: 19 additions & 0 deletions source/utils/file_creator.py
@@ -0,0 +1,19 @@
import os
import pwd
from data.tuples import *

def get_file_creator(path):
    """
    Returns a tuple including the file creator username,
    their UID, and GID in that order respectively.

    string path: The absolute path of the file
    """
    # Stat once and reuse the result for the UID and GID of the owner
    path_stat = os.stat(path)
    uid = path_stat.st_uid
    try:
        # Get the username associated with the UID
        username = pwd.getpwuid(uid).pw_name
    except KeyError:
        # TODO: FIX THIS LATER. There is no passwd entry for this UID, so fall
        # back to a placeholder name while keeping the same return type.
        username = f"user{uid}"
    return creator_tuple(username, uid, path_stat.st_gid)
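
Reviewer note: the `KeyError` branch is hard to reach in a test without a UID that is guaranteed to be missing from the passwd database. One possible approach, sketched below, is to make the lookup injectable. `username_for_uid` and its `lookup` parameter are hypothetical names for illustration and do not exist in this PR; the `pwd` module is only available on Unix-like systems.

```python
import os
import pwd


def username_for_uid(uid, lookup=pwd.getpwuid):
    """Return the account name for uid, or a placeholder if none exists."""
    try:
        return lookup(uid).pw_name
    except KeyError:
        # pwd.getpwuid raises KeyError when the UID has no passwd entry.
        return f"user{uid}"


# Real lookup for the current user; the result depends on the machine.
print(username_for_uid(os.getuid()))
```

A test can then pass a stub that raises `KeyError` and assert on the placeholder, without patching `pwd` globally.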