Skip to content

Commit

Permalink
Implement file TTL (#4)
Browse files Browse the repository at this point in the history
This PR implements a TTL for each file uploaded, configurable via the
URL parameter `ttl_s`.

Resolves #2
  • Loading branch information
ben-z authored Oct 15, 2024
1 parent 98f236a commit ee61ebc
Showing 1 changed file with 161 additions and 15 deletions.
176 changes: 161 additions & 15 deletions server/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@

set_up_logging()

TTL_FILENAME = "ttl.json"
DEFAULT_TTL_S = 7200

FILENAME_BLACKLIST = [TTL_FILENAME]

@app.command()
def init_cvmfs_repo(
repo_name: Annotated[str, typer.Argument(help="Name of the CVMFS repo. CVMFS requires this to be an FQDN.")],
Expand Down Expand Up @@ -110,8 +115,8 @@ async def fastapi_lifespan(app: FastAPI):
"""
try:
scheduler.start()
# Run garbage collection every minute
scheduler.add_job(gc, CronTrigger.from_crontab("* * * * *"))
# Run housekeeping every minute
scheduler.add_job(housekeeping, CronTrigger.from_crontab("* * * * *"))
yield
finally:
scheduler.shutdown()
Expand All @@ -121,8 +126,11 @@ async def fastapi_lifespan(app: FastAPI):
transaction_lock = Lock()

@fastapi_app.post("/repos/{repo_name}")
async def upload(repo_name: str, file: UploadFile, overwrite: bool = False):
logger.info(f"Uploading file: {file.filename} (content_type: {file.content_type})")
async def upload(repo_name: str, file: UploadFile, overwrite: bool = False, ttl_s: int = DEFAULT_TTL_S):
logger.info(f"Uploading file: {file.filename} (content_type: {file.content_type}, ttl_s: {ttl_s}) to repo: {repo_name}")

if file.filename in FILENAME_BLACKLIST:
raise HTTPException(status_code=400, detail=f"Filename {file.filename} is not allowed")

# check if repo exists
if not Path(f"/cvmfs/{repo_name}").exists():
Expand All @@ -132,6 +140,9 @@ async def upload(repo_name: str, file: UploadFile, overwrite: bool = False):
if not overwrite and file_path.exists():
raise HTTPException(status_code=409, detail=f"File {file.filename} already exists")

expires_at = time.time() + ttl_s
ttl_path = Path(f"/cvmfs/{repo_name}/{TTL_FILENAME}")

with transaction_lock:
# start transaction
subprocess.run(["cvmfs_server", "transaction", repo_name], check=True)
Expand All @@ -147,6 +158,11 @@ async def upload(repo_name: str, file: UploadFile, overwrite: bool = False):
f.write(await file.read())
upload_end = time.perf_counter()

# Update TTL
ttl_obj = json.loads(ttl_path.read_text()) if ttl_path.exists() else {}
ttl_obj[file.filename] = {"expires_at": expires_at}
ttl_path.write_text(json.dumps(ttl_obj))

logger.info(f"Uploaded file: {file.filename} (content_type: {file.content_type}). Took {upload_end - upload_start:.2f}s")
except Exception as e:
logger.error(f"Failed to upload file: {file.filename} (content_type: {file.content_type})")
Expand All @@ -160,12 +176,59 @@ async def upload(repo_name: str, file: UploadFile, overwrite: bool = False):
subprocess.run(["cvmfs_server", "publish", repo_name], check=True)
publish_end = time.perf_counter()

logger.info(f"Published transaction for repo: {repo_name} with file: {file.filename} (content_type: {file.content_type}). Took {publish_end - publish_start:.2f}s")
logger.info(f"Published transaction for repo: {repo_name} with file: {file.filename} (content_type: {file.content_type}). Took {publish_end - publish_start:.2f}s. Expires at: {expires_at}")

notify(repo_name)

return {"filename": file.filename, "content_type": file.content_type, "upload_time_s": upload_end - upload_start, "publish_time_s": publish_end - publish_start}
return {
"filename": file.filename,
"content_type": file.content_type,
"expires_at": expires_at,
"upload_time_s": upload_end - upload_start,
"publish_time_s": publish_end - publish_start,
}


@app.command()
@fastapi_app.post("/repos/{repo_name}/{file_name}/ttl")
async def update_ttl(repo_name: str, file_name: str, ttl_s: int):
logger.info(f"Updating TTL for file: {file_name} in repo: {repo_name}")

if file_name in FILENAME_BLACKLIST:
raise HTTPException(status_code=400, detail=f"Filename {file_name} is not allowed")

file_path = Path(f"/cvmfs/{repo_name}/{file_name}")
if not file_path.exists():
raise HTTPException(status_code=404, detail=f"File {file_name} does not exist in repo {repo_name}")

ttl_path = Path(f"/cvmfs/{repo_name}/{TTL_FILENAME}")

with transaction_lock:
# start transaction
subprocess.run(["cvmfs_server", "transaction", repo_name], check=True)

try:
# Update TTL
ttl_obj = json.loads(ttl_path.read_text())
ttl_obj[file_name] = {"expires_at": time.time() + ttl_s}
ttl_path.write_text(json.dumps(ttl_obj))

logger.info(f"Updated TTL for file: {file_name} in repo: {repo_name}")
except Exception as e:
logger.error(f"Failed to update TTL for file: {file_name} in repo: {repo_name}")
logger.exception(e)
# abort transaction
subprocess.run(["cvmfs_server", "abort", repo_name, "-f"], check=True)
raise HTTPException(status_code=500, detail=f"Failed to update TTL for file: {file_name}: {e}")

# publish transaction
subprocess.run(["cvmfs_server", "publish", repo_name], check=True)
notify(repo_name)

return {"filename": file_name, "ttl_s": ttl_s}


@app.command()
@fastapi_app.get("/repos/{repo_name}/{file_name}")
async def download(repo_name: str, file_name: str):
logger.info(f"Downloading file: {file_name} from repo: {repo_name}")
Expand All @@ -176,6 +239,7 @@ async def download(repo_name: str, file_name: str):

return FileResponse(file_path)

@app.command()
@fastapi_app.get("/repos/{repo_name}")
async def list_files(repo_name: str):
logger.info(f"Listing files in repo: {repo_name}")
Expand All @@ -186,21 +250,33 @@ async def list_files(repo_name: str):

return {"files": [file.name for file in repo_path.iterdir() if file.is_file()]}

@app.command()
@fastapi_app.delete("/repos/{repo_name}/{file_name}")
async def delete_file(repo_name: str, file_name: str):
logger.info(f"Deleting file: {file_name} from repo: {repo_name}")

if file_name in FILENAME_BLACKLIST:
raise HTTPException(status_code=400, detail=f"Filename {file_name} is not allowed")

file_path = Path(f"/cvmfs/{repo_name}/{file_name}")
if not file_path.exists():
raise HTTPException(status_code=404, detail=f"File {file_name} does not exist in repo {repo_name}")

ttl_path = Path(f"/cvmfs/{repo_name}/{TTL_FILENAME}")

with transaction_lock:
# start transaction
subprocess.run(["cvmfs_server", "transaction", repo_name], check=True)

try:
# Remove file
file_path.unlink()

# Update TTL
ttl_obj = json.loads(ttl_path.read_text())
del ttl_obj[file_name]
ttl_path.write_text(json.dumps(ttl_obj))

logger.info(f"Deleted file: {file_name} from repo: {repo_name}")
except Exception as e:
logger.error(f"Failed to delete file: {file_name} from repo: {repo_name}")
Expand All @@ -216,19 +292,56 @@ async def delete_file(repo_name: str, file_name: str):
return {"filename": file_name}

@app.command()
@fastapi_app.post("/gc")
def gc():
@fastapi_app.post("/repos/{repo_name}/clean")
def clean(repo_name: str):
"""
Perform garbage collection on all repos.
Clean up expired files in the repo.
"""
logger.info(f"Cleaning up expired files in repo: {repo_name}")

ttl_path = Path(f"/cvmfs/{repo_name}/{TTL_FILENAME}")
if not ttl_path.exists():
logger.info(f"No TTL file found in repo: {repo_name}. Skipping clean up.")
return {"message": "No TTL file found. Skipping clean up."}

cleaned = 0
errors = 0

with transaction_lock:
logger.info("Running garbage collection")
gc_start = time.perf_counter()
subprocess.run(["cvmfs_server", "gc", "-r", "0", "-f"], check=True)
gc_end = time.perf_counter()
logger.info(f"Garbage collection completed. Took {gc_end - gc_start:.2f}s")
return {"message": "Garbage collection completed", "gc_time_s": gc_end - gc_start}
# start transaction
subprocess.run(["cvmfs_server", "transaction", repo_name], check=True)

try:
ttl_obj = json.loads(ttl_path.read_text())
for file_name, ttl in ttl_obj.copy().items():
if ttl["expires_at"] < time.time():
file_path = Path(f"/cvmfs/{repo_name}/{file_name}")
if file_path.exists():
file_path.unlink()
cleaned += 1
else:
logger.warning(f"Trying to clean up non-existent file: {file_name} in repo: {repo_name}")
errors += 1
del ttl_obj[file_name]

ttl_path.write_text(json.dumps(ttl_obj))

logger.info(f"Cleaned up expired files in repo: {repo_name}")
except Exception as e:
logger.error(f"Failed to clean up expired files in repo: {repo_name}")
logger.exception(e)
# abort transaction
subprocess.run(["cvmfs_server", "abort", repo_name, "-f"], check=True)
raise HTTPException(status_code=500, detail=f"Failed to clean up expired files: {e}")

# publish transaction
subprocess.run(["cvmfs_server", "publish", repo_name], check=True)
notify(repo_name)

msg = f"Cleaned up {cleaned} expired files in repo: {repo_name}. Errors: {errors}"
logger.info(msg)

return {"message": msg}

@app.command()
@fastapi_app.post("/repos/{repo_name}/notify")
Expand Down Expand Up @@ -260,6 +373,39 @@ def notify(repo_name: str):
return {"message": f"Notified clients about changes in repo {repo_name}"}


@app.command()
@fastapi_app.post("/gc")
def gc():
"""
Perform garbage collection on all repos.
"""
with transaction_lock:
logger.info("Running garbage collection")
gc_start = time.perf_counter()
subprocess.run(["cvmfs_server", "gc", "-r", "0", "-f"], check=True)
gc_end = time.perf_counter()

logger.info(f"Garbage collection completed. Took {gc_end - gc_start:.2f}s")
return {"message": "Garbage collection completed", "gc_time_s": gc_end - gc_start}

@app.command()
@fastapi_app.post("/housekeeping")
def housekeeping():
"""
Clean all repos and perform garbage collection.
"""
logger.info("Running housekeeping")
housekeeping_start = time.perf_counter()
for repo_path in Path("/cvmfs").iterdir():
if repo_path.is_dir():
repo_name = repo_path.name
clean(repo_name)
gc()
housekeeping_end = time.perf_counter()

logger.info(f"Housekeeping completed. Took {housekeeping_end - housekeeping_start:.2f}s")
return {"message": "Housekeeping completed", "housekeeping_time_s": housekeeping_end - housekeeping_start}

@app.command()
def start_server(port: int = 81):
uvicorn.run(fastapi_app, host="0.0.0.0", port=port)
Expand Down

0 comments on commit ee61ebc

Please sign in to comment.