feat: move core services and repositories #2793

Draft · wants to merge 21 commits into base: main
1,037 changes: 1,036 additions & 1 deletion backend/core/poetry.lock

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions backend/core/pyproject.toml
@@ -12,6 +12,15 @@
pydantic = "^2.7.4"
langchain-core = "^0.2.10"
langchain = "^0.2.6"

[tool.poetry.group.api.dependencies]
aiofiles = "^24.1.0"
python-dotenv = "^1.0.1"
fastapi = "^0.111.0"
sqlmodel = "^0.0.19"
asyncpg = "^0.29.0"
uvicorn = "^0.30.1"
supabase = { version = "^2.5.1", optional = true }

[tool.ruff]
line-length = 88
exclude = [".git", "__pycache__", ".mypy_cache", ".pytest_cache"]
Empty file.
41 changes: 41 additions & 0 deletions backend/core/quivr_core/api/celery_config.py
@@ -0,0 +1,41 @@
# celery_config.py
import os

import dotenv
from celery import Celery

dotenv.load_dotenv()

CELERY_BROKER_URL = os.getenv("CELERY_BROKER_URL", "")
CELERY_BROKER_QUEUE_NAME = os.getenv("CELERY_BROKER_QUEUE_NAME", "quivr")

celery = Celery(__name__)

if CELERY_BROKER_URL.startswith("sqs"):
    broker_transport_options = {
        CELERY_BROKER_QUEUE_NAME: {
            "my-q": {
                "url": CELERY_BROKER_URL,
            }
        }
    }
    celery = Celery(
        __name__,
        broker=CELERY_BROKER_URL,
        task_serializer="json",
        task_concurrency=4,
        worker_prefetch_multiplier=1,
        broker_transport_options=broker_transport_options,
    )
    celery.conf.task_default_queue = CELERY_BROKER_QUEUE_NAME
elif CELERY_BROKER_URL.startswith("redis"):
    celery = Celery(
        __name__,
        broker=f"{CELERY_BROKER_URL}",
        backend=f"{CELERY_BROKER_URL}",
        task_concurrency=4,
        worker_prefetch_multiplier=2,
        task_serializer="json",
    )
else:
    raise ValueError(f"Unsupported broker URL: {CELERY_BROKER_URL}")
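Reviewer note: a quick sketch of how this module resolves its broker, assuming a local Redis instance; the URL and queue name below are illustrative, not from the PR:

    import os

    # Illustrative values; any URL starting with "redis" selects the Redis branch.
    os.environ["CELERY_BROKER_URL"] = "redis://localhost:6379/0"
    os.environ["CELERY_BROKER_QUEUE_NAME"] = "quivr"

    from quivr_core.api.celery_config import celery

    print(celery.conf.broker_url)  # redis://localhost:6379/0

A worker would then presumably be started with celery -A quivr_core.api.celery_config worker.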
174 changes: 174 additions & 0 deletions backend/core/quivr_core/api/celery_worker.py
@@ -0,0 +1,174 @@
import os
from tempfile import NamedTemporaryFile
from uuid import UUID

from celery.schedules import crontab

from quivr_core.api.celery_config import celery
from quivr_core.api.logger import get_logger
from quivr_core.api.models.files import File
from quivr_core.api.models.settings import get_supabase_client
from quivr_core.api.modules.brain.service.brain_service import BrainService
from quivr_core.api.modules.brain.service.brain_vector_service import BrainVectorService
from quivr_core.api.modules.notification.dto.inputs import (
    NotificationUpdatableProperties,
)
from quivr_core.api.modules.notification.entity.notification import (
    NotificationsStatusEnum,
)
from quivr_core.api.modules.notification.service.notification_service import (
    NotificationService,
)
from quivr_core.api.packages.files.crawl.crawler import CrawlWebsite, slugify
from quivr_core.api.packages.files.parsers.github import process_github
from quivr_core.api.packages.files.processors import filter_file
from quivr_core.api.packages.utils.telemetry import maybe_send_telemetry

logger = get_logger(__name__)

notification_service = NotificationService()
brain_service = BrainService()


@celery.task(name="process_file_and_notify")
def process_file_and_notify(
file_name: str,
file_original_name: str,
brain_id,
notification_id=None,
integration=None,
delete_file=False,
):
try:
supabase_client = get_supabase_client()
tmp_name = file_name.replace("/", "_")
base_file_name = os.path.basename(file_name)
_, file_extension = os.path.splitext(base_file_name)

with NamedTemporaryFile(
suffix="_" + tmp_name, # pyright: ignore reportPrivateUsage=none
) as tmp_file:
res = supabase_client.storage.from_("quivr").download(file_name)
tmp_file.write(res)
tmp_file.flush()
file_instance = File(
file_name=base_file_name,
tmp_file_path=tmp_file.name,
bytes_content=res,
file_size=len(res),
file_extension=file_extension,
)
brain_vector_service = BrainVectorService(brain_id)
if delete_file: # TODO fix bug
brain_vector_service.delete_file_from_brain(
file_original_name, only_vectors=True
)

filter_file(
file=file_instance,
brain_id=brain_id,
original_file_name=file_original_name,
)

if notification_id:
notification_service.update_notification_by_id(
notification_id,
NotificationUpdatableProperties(
status=NotificationsStatusEnum.SUCCESS,
description="Your file has been properly uploaded!",
),
)
brain_service.update_brain_last_update_time(brain_id)

return True

except TimeoutError:
logger.error("TimeoutError")

except Exception as e:
logger.exception(e)
notification_service.update_notification_by_id(
notification_id,
NotificationUpdatableProperties(
status=NotificationsStatusEnum.ERROR,
description=f"An error occurred while processing the file: {e}",
),
)


@celery.task(name="process_crawl_and_notify")
def process_crawl_and_notify(
crawl_website_url: str,
brain_id: UUID,
notification_id=None,
):
crawl_website = CrawlWebsite(url=crawl_website_url)

if not crawl_website.checkGithub():
# Build file data
extracted_content = crawl_website.process()
extracted_content_bytes = extracted_content.encode("utf-8")
file_name = slugify(crawl_website.url) + ".txt"

with NamedTemporaryFile(
suffix="_" + file_name, # pyright: ignore reportPrivateUsage=none
) as tmp_file:
tmp_file.write(extracted_content_bytes)
tmp_file.flush()
file_instance = File(
file_name=file_name,
tmp_file_path=tmp_file.name,
bytes_content=extracted_content_bytes,
file_size=len(extracted_content),
file_extension=".txt",
)
filter_file(
file=file_instance,
brain_id=brain_id,
original_file_name=crawl_website_url,
)
notification_service.update_notification_by_id(
notification_id,
NotificationUpdatableProperties(
status=NotificationsStatusEnum.SUCCESS,
description="Your URL has been properly crawled!",
),
)
else:
process_github(
repo=crawl_website.url,
brain_id=brain_id,
)

if notification_id:
notification_service.update_notification_by_id(
notification_id,
NotificationUpdatableProperties(
status=NotificationsStatusEnum.SUCCESS,
description="Your file has been properly uploaded!",
),
)

brain_service.update_brain_last_update_time(brain_id)
return True


@celery.task
def ping_telemetry():
    maybe_send_telemetry("ping", {"ping": "pong"})


celery.conf.beat_schedule = {
    "ping_telemetry": {
        "task": f"{__name__}.ping_telemetry",
        "schedule": crontab(minute="*/30", hour="*"),
    },
    "process_sync_active": {
        "task": "process_sync_active",
        "schedule": crontab(minute="*/1", hour="*"),
    },
    "process_premium_users": {
        "task": "check_if_is_premium_user",
        "schedule": crontab(minute="*/1", hour="*"),
    },
}
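Reviewer note: a hedged sketch of how a producer would enqueue one of these tasks by its registered name; the storage key and brain id are illustrative, not from the PR:

    from uuid import uuid4

    from quivr_core.api.celery_config import celery

    # Dispatch by registered task name so the caller never imports this module.
    celery.send_task(
        "process_file_and_notify",
        kwargs={
            "file_name": "user-uploads/report.pdf",  # illustrative storage key
            "file_original_name": "report.pdf",
            "brain_id": str(uuid4()),  # illustrative; sent as str since tasks use JSON
        },
    )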
45 changes: 45 additions & 0 deletions backend/core/quivr_core/api/logger.py
@@ -0,0 +1,45 @@
import logging
import os
from logging.handlers import RotatingFileHandler

from colorlog import (
    ColoredFormatter,
)


def get_logger(logger_name, log_file="application.log"):
    log_level = os.getenv("LOG_LEVEL", "WARNING").upper()
    logger = logging.getLogger(logger_name)
    logger.setLevel(log_level)
    logger.propagate = False  # Prevent log propagation to avoid double logging

    formatter = logging.Formatter(
        "[%(levelname)s] %(name)s [%(filename)s:%(lineno)d]: %(message)s"
    )

    color_formatter = ColoredFormatter(
        "%(log_color)s[%(levelname)s]%(reset)s %(name)s [%(filename)s:%(lineno)d]: %(message)s",
        log_colors={
            "DEBUG": "cyan",
            "INFO": "green",
            "WARNING": "yellow",
            "ERROR": "red",
            "CRITICAL": "red,bg_white",
        },
        reset=True,
        style="%",
    )

    console_handler = logging.StreamHandler()
    console_handler.setFormatter(color_formatter)

    file_handler = RotatingFileHandler(
        log_file, maxBytes=5000000, backupCount=5
    )  # 5MB file
    file_handler.setFormatter(formatter)

    if not logger.handlers:
        logger.addHandler(console_handler)
        logger.addHandler(file_handler)

    return logger
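Reviewer note: usage of the helper is straightforward; a minimal sketch:

    from quivr_core.api.logger import get_logger

    logger = get_logger(__name__)
    logger.warning("cache miss")  # colored on the console, plain in application.log

The handlers check above keeps repeated get_logger calls from attaching duplicate handlers to the same logger.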
40 changes: 40 additions & 0 deletions backend/core/quivr_core/api/main.py
@@ -0,0 +1,40 @@
import logging

from fastapi import FastAPI
from quivr_core.api.modules.brain.controller import brain_router
from quivr_core.api.modules.chat.controller import chat_router
from quivr_core.api.modules.knowledge.controller import knowledge_router
from quivr_core.api.modules.prompt.controller import prompt_router
from quivr_core.api.modules.upload.controller import upload_router
from quivr_core.api.modules.user.controller import user_router

logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("LiteLLM").setLevel(logging.WARNING)
logging.getLogger("litellm").setLevel(logging.WARNING)

app = FastAPI()


app.include_router(brain_router)
app.include_router(chat_router)
app.include_router(upload_router)
app.include_router(user_router)
app.include_router(prompt_router)
app.include_router(knowledge_router)


@app.get("/")
async def root():
return {"status": "OK"}


@app.get("/healthz", tags=["Health"])
async def healthz():
return {"status": "ok"}


if __name__ == "__main__":
# run main.py to debug backend
import uvicorn

uvicorn.run(app, host="0.0.0.0", port=5050, log_level="debug", access_log=False)
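Reviewer note: a minimal smoke test for the two probes, assuming httpx is installed (FastAPI's TestClient requires it):

    from fastapi.testclient import TestClient

    from quivr_core.api.main import app

    client = TestClient(app)
    assert client.get("/").json() == {"status": "OK"}
    assert client.get("/healthz").json() == {"status": "ok"}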
Empty file.
32 changes: 32 additions & 0 deletions backend/core/quivr_core/api/models/file_repository.py
@@ -0,0 +1,32 @@
from uuid import UUID

from quivr_core.api.models.settings import get_supabase_client


class FileRepository:
    def __init__(self):
        self.db = get_supabase_client()

    def set_file_vectors_ids(self, file_sha1: bytes):
        response = (
            self.db.table("vectors")
            .select("id")
            .filter("file_sha1", "eq", str(file_sha1))
            .execute()
        )
        return response.data

    def get_brain_vectors_by_brain_id_and_file_sha1(
        self, brain_id: UUID, file_sha1: bytes
    ):
        self.set_file_vectors_ids(file_sha1)
        # Check if file exists in that brain
        response = (
            self.db.table("brains_vectors")
            .select("brain_id, vector_id")
            .filter("brain_id", "eq", str(brain_id))
            .filter("file_sha1", "eq", str(file_sha1))
            .execute()
        )

        return response
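Reviewer note: a usage sketch, assuming get_supabase_client is configured and the tables exist; the arguments are illustrative, not from the PR:

    from uuid import uuid4

    from quivr_core.api.models.file_repository import FileRepository

    repo = FileRepository()
    # Illustrative arguments; real callers pass an existing brain UUID and the
    # file's sha1 digest bytes.
    rows = repo.get_brain_vectors_by_brain_id_and_file_sha1(uuid4(), b"\x12\x34")
    print(rows.data)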