From ee61ebcf1e26e1bde6c5c38e3aa705de269bae73 Mon Sep 17 00:00:00 2001 From: Ben Zhang Date: Mon, 14 Oct 2024 22:05:00 -0700 Subject: [PATCH] Implement file TTL (#4) This PR implements a TTL for each file uploaded, configurable via the URL parameter `ttl_s`. Resolves #2 --- server/src/main.py | 176 +++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 161 insertions(+), 15 deletions(-) diff --git a/server/src/main.py b/server/src/main.py index 0fe3a23..4115416 100644 --- a/server/src/main.py +++ b/server/src/main.py @@ -22,6 +22,11 @@ set_up_logging() +TTL_FILENAME = "ttl.json" +DEFAULT_TTL_S = 7200 + +FILENAME_BLACKLIST = [TTL_FILENAME] + @app.command() def init_cvmfs_repo( repo_name: Annotated[str, typer.Argument(help="Name of the CVMFS repo. CVMFS requires this to be an FQDN.")], @@ -110,8 +115,8 @@ async def fastapi_lifespan(app: FastAPI): """ try: scheduler.start() - # Run garbage collection every minute - scheduler.add_job(gc, CronTrigger.from_crontab("* * * * *")) + # Run housekeeping every minute + scheduler.add_job(housekeeping, CronTrigger.from_crontab("* * * * *")) yield finally: scheduler.shutdown() @@ -121,8 +126,11 @@ async def fastapi_lifespan(app: FastAPI): transaction_lock = Lock() @fastapi_app.post("/repos/{repo_name}") -async def upload(repo_name: str, file: UploadFile, overwrite: bool = False): - logger.info(f"Uploading file: {file.filename} (content_type: {file.content_type})") +async def upload(repo_name: str, file: UploadFile, overwrite: bool = False, ttl_s: int = DEFAULT_TTL_S): + logger.info(f"Uploading file: {file.filename} (content_type: {file.content_type}, ttl_s: {ttl_s}) to repo: {repo_name}") + + if file.filename in FILENAME_BLACKLIST: + raise HTTPException(status_code=400, detail=f"Filename {file.filename} is not allowed") # check if repo exists if not Path(f"/cvmfs/{repo_name}").exists(): @@ -132,6 +140,9 @@ async def upload(repo_name: str, file: UploadFile, overwrite: bool = False): if not overwrite and file_path.exists(): raise HTTPException(status_code=409, detail=f"File {file.filename} already exists") + expires_at = time.time() + ttl_s + ttl_path = Path(f"/cvmfs/{repo_name}/{TTL_FILENAME}") + with transaction_lock: # start transaction subprocess.run(["cvmfs_server", "transaction", repo_name], check=True) @@ -147,6 +158,11 @@ async def upload(repo_name: str, file: UploadFile, overwrite: bool = False): f.write(await file.read()) upload_end = time.perf_counter() + # Update TTL + ttl_obj = json.loads(ttl_path.read_text()) if ttl_path.exists() else {} + ttl_obj[file.filename] = {"expires_at": expires_at} + ttl_path.write_text(json.dumps(ttl_obj)) + logger.info(f"Uploaded file: {file.filename} (content_type: {file.content_type}). Took {upload_end - upload_start:.2f}s") except Exception as e: logger.error(f"Failed to upload file: {file.filename} (content_type: {file.content_type})") @@ -160,12 +176,59 @@ async def upload(repo_name: str, file: UploadFile, overwrite: bool = False): subprocess.run(["cvmfs_server", "publish", repo_name], check=True) publish_end = time.perf_counter() - logger.info(f"Published transaction for repo: {repo_name} with file: {file.filename} (content_type: {file.content_type}). Took {publish_end - publish_start:.2f}s") + logger.info(f"Published transaction for repo: {repo_name} with file: {file.filename} (content_type: {file.content_type}). Took {publish_end - publish_start:.2f}s. Expires at: {expires_at}") notify(repo_name) - return {"filename": file.filename, "content_type": file.content_type, "upload_time_s": upload_end - upload_start, "publish_time_s": publish_end - publish_start} + return { + "filename": file.filename, + "content_type": file.content_type, + "expires_at": expires_at, + "upload_time_s": upload_end - upload_start, + "publish_time_s": publish_end - publish_start, + } + +@app.command() +@fastapi_app.post("/repos/{repo_name}/{file_name}/ttl") +async def update_ttl(repo_name: str, file_name: str, ttl_s: int): + logger.info(f"Updating TTL for file: {file_name} in repo: {repo_name}") + + if file_name in FILENAME_BLACKLIST: + raise HTTPException(status_code=400, detail=f"Filename {file_name} is not allowed") + + file_path = Path(f"/cvmfs/{repo_name}/{file_name}") + if not file_path.exists(): + raise HTTPException(status_code=404, detail=f"File {file_name} does not exist in repo {repo_name}") + + ttl_path = Path(f"/cvmfs/{repo_name}/{TTL_FILENAME}") + + with transaction_lock: + # start transaction + subprocess.run(["cvmfs_server", "transaction", repo_name], check=True) + + try: + # Update TTL + ttl_obj = json.loads(ttl_path.read_text()) + ttl_obj[file_name] = {"expires_at": time.time() + ttl_s} + ttl_path.write_text(json.dumps(ttl_obj)) + + logger.info(f"Updated TTL for file: {file_name} in repo: {repo_name}") + except Exception as e: + logger.error(f"Failed to update TTL for file: {file_name} in repo: {repo_name}") + logger.exception(e) + # abort transaction + subprocess.run(["cvmfs_server", "abort", repo_name, "-f"], check=True) + raise HTTPException(status_code=500, detail=f"Failed to update TTL for file: {file_name}: {e}") + + # publish transaction + subprocess.run(["cvmfs_server", "publish", repo_name], check=True) + notify(repo_name) + + return {"filename": file_name, "ttl_s": ttl_s} + + +@app.command() @fastapi_app.get("/repos/{repo_name}/{file_name}") async def download(repo_name: str, file_name: str): logger.info(f"Downloading file: {file_name} from repo: {repo_name}") @@ -176,6 +239,7 @@ async def download(repo_name: str, file_name: str): return FileResponse(file_path) +@app.command() @fastapi_app.get("/repos/{repo_name}") async def list_files(repo_name: str): logger.info(f"Listing files in repo: {repo_name}") @@ -186,14 +250,20 @@ async def list_files(repo_name: str): return {"files": [file.name for file in repo_path.iterdir() if file.is_file()]} +@app.command() @fastapi_app.delete("/repos/{repo_name}/{file_name}") async def delete_file(repo_name: str, file_name: str): logger.info(f"Deleting file: {file_name} from repo: {repo_name}") + if file_name in FILENAME_BLACKLIST: + raise HTTPException(status_code=400, detail=f"Filename {file_name} is not allowed") + file_path = Path(f"/cvmfs/{repo_name}/{file_name}") if not file_path.exists(): raise HTTPException(status_code=404, detail=f"File {file_name} does not exist in repo {repo_name}") + ttl_path = Path(f"/cvmfs/{repo_name}/{TTL_FILENAME}") + with transaction_lock: # start transaction subprocess.run(["cvmfs_server", "transaction", repo_name], check=True) @@ -201,6 +271,12 @@ async def delete_file(repo_name: str, file_name: str): try: # Remove file file_path.unlink() + + # Update TTL + ttl_obj = json.loads(ttl_path.read_text()) + del ttl_obj[file_name] + ttl_path.write_text(json.dumps(ttl_obj)) + logger.info(f"Deleted file: {file_name} from repo: {repo_name}") except Exception as e: logger.error(f"Failed to delete file: {file_name} from repo: {repo_name}") @@ -216,19 +292,56 @@ async def delete_file(repo_name: str, file_name: str): return {"filename": file_name} @app.command() -@fastapi_app.post("/gc") -def gc(): +@fastapi_app.post("/repos/{repo_name}/clean") +def clean(repo_name: str): """ - Perform garbage collection on all repos. + Clean up expired files in the repo. """ + logger.info(f"Cleaning up expired files in repo: {repo_name}") + + ttl_path = Path(f"/cvmfs/{repo_name}/{TTL_FILENAME}") + if not ttl_path.exists(): + logger.info(f"No TTL file found in repo: {repo_name}. Skipping clean up.") + return {"message": "No TTL file found. Skipping clean up."} + + cleaned = 0 + errors = 0 + with transaction_lock: - logger.info("Running garbage collection") - gc_start = time.perf_counter() - subprocess.run(["cvmfs_server", "gc", "-r", "0", "-f"], check=True) - gc_end = time.perf_counter() - logger.info(f"Garbage collection completed. Took {gc_end - gc_start:.2f}s") - return {"message": "Garbage collection completed", "gc_time_s": gc_end - gc_start} + # start transaction + subprocess.run(["cvmfs_server", "transaction", repo_name], check=True) + + try: + ttl_obj = json.loads(ttl_path.read_text()) + for file_name, ttl in ttl_obj.copy().items(): + if ttl["expires_at"] < time.time(): + file_path = Path(f"/cvmfs/{repo_name}/{file_name}") + if file_path.exists(): + file_path.unlink() + cleaned += 1 + else: + logger.warning(f"Trying to clean up non-existent file: {file_name} in repo: {repo_name}") + errors += 1 + del ttl_obj[file_name] + + ttl_path.write_text(json.dumps(ttl_obj)) + + logger.info(f"Cleaned up expired files in repo: {repo_name}") + except Exception as e: + logger.error(f"Failed to clean up expired files in repo: {repo_name}") + logger.exception(e) + # abort transaction + subprocess.run(["cvmfs_server", "abort", repo_name, "-f"], check=True) + raise HTTPException(status_code=500, detail=f"Failed to clean up expired files: {e}") + + # publish transaction + subprocess.run(["cvmfs_server", "publish", repo_name], check=True) + notify(repo_name) + msg = f"Cleaned up {cleaned} expired files in repo: {repo_name}. Errors: {errors}" + logger.info(msg) + + return {"message": msg} @app.command() @fastapi_app.post("/repos/{repo_name}/notify") @@ -260,6 +373,39 @@ def notify(repo_name: str): return {"message": f"Notified clients about changes in repo {repo_name}"} +@app.command() +@fastapi_app.post("/gc") +def gc(): + """ + Perform garbage collection on all repos. + """ + with transaction_lock: + logger.info("Running garbage collection") + gc_start = time.perf_counter() + subprocess.run(["cvmfs_server", "gc", "-r", "0", "-f"], check=True) + gc_end = time.perf_counter() + + logger.info(f"Garbage collection completed. Took {gc_end - gc_start:.2f}s") + return {"message": "Garbage collection completed", "gc_time_s": gc_end - gc_start} + +@app.command() +@fastapi_app.post("/housekeeping") +def housekeeping(): + """ + Clean all repos and perform garbage collection. + """ + logger.info("Running housekeeping") + housekeeping_start = time.perf_counter() + for repo_path in Path("/cvmfs").iterdir(): + if repo_path.is_dir(): + repo_name = repo_path.name + clean(repo_name) + gc() + housekeeping_end = time.perf_counter() + + logger.info(f"Housekeeping completed. Took {housekeeping_end - housekeeping_start:.2f}s") + return {"message": "Housekeeping completed", "housekeeping_time_s": housekeeping_end - housekeeping_start} + @app.command() def start_server(port: int = 81): uvicorn.run(fastapi_app, host="0.0.0.0", port=port)