-
Notifications
You must be signed in to change notification settings - Fork 0
/
app.py
66 lines (55 loc) · 2.16 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# app.py
import os

import streamlit as st
from unstructured.partition.pdf import partition_pdf

from init_collection import get_client, create_collection
# can separate into multiple functions for cleaner code.
# Connect to Weaviate. `client` is the module-level handle that
# insert_chunks_from_file() and perform_search() below operate on.
client = get_client()
# NOTE(review): presumably this sets up the "Document" collection that the
# functions below use — confirm in init_collection.
create_collection(client) # Ensure the collection is created
def insert_chunks_from_file(pdf_path):
    """Partition a PDF and store each element in the "Document" collection.

    Every element returned by ``partition_pdf`` is converted to its string
    form and inserted as one object with two properties: ``content`` (the
    element text) and ``source`` (the path the PDF was read from).

    Args:
        pdf_path: Filesystem path of the PDF to ingest.
    """
    partitions = partition_pdf(pdf_path)

    # Use the collection-level batch API so that ingestion goes through the
    # same ``client.collections`` interface that perform_search() relies on;
    # the legacy ``client.batch.add_data_object`` / ``flush`` calls are not
    # part of that client API.
    documents = client.collections.get("Document")

    # The context manager sends any remaining buffered objects on exit,
    # so no explicit flush is required.
    with documents.batch.dynamic() as batch:
        for part in partitions:
            batch.add_object(
                properties={"content": str(part), "source": pdf_path},
            )
# def perform_search(query, prompt):
# response = client.query.get("Document", ["content", "source"]) \
# .with_near_text({"concepts": [query]}) \
# .with_additional("generate(single_prompt: $prompt)") \
# .with_limit(2) \
# .do()
# return response
# Perform the search with Weaviate generative search (grouped task); a single-prompt task could be used instead.
def perform_search(query, prompt):
    """Run a generative near-text search against the "Document" collection.

    Args:
        query: Text used as the near-text search query.
        prompt: Instruction passed to the generative step as the grouped task.

    Returns:
        The response object produced by ``generate.near_text``; the search
        is limited to 2 results.
    """
    return client.collections.get("Document").generate.near_text(
        query=query,
        grouped_task=prompt,
        limit=2,
    )
# Load credentials from the "default" section of Streamlit secrets.
# They are not referenced again in this part of the script.
openai_key = st.secrets["default"]["OPENAI_API_KEY"]
wcd_api_key = st.secrets["default"]["WCD_API_KEY"]
wcd_url = st.secrets["default"]["WCD_URL"]

try:
    # --- Upload and ingest a PDF -------------------------------------------
    st.header("Załaduj i przetwórz PDF")
    uploaded_file = st.file_uploader("Wybierz plik PDF", type="pdf")
    if uploaded_file:
        # Streamlit re-executes this script on every widget interaction while
        # the uploader keeps returning the same file. Remember what has been
        # ingested in this session so the PDF is not re-partitioned and
        # re-inserted (as duplicates) on each rerun.
        if "ingested_files" not in st.session_state:
            st.session_state["ingested_files"] = set()
        file_key = (uploaded_file.name, uploaded_file.size)

        if file_key not in st.session_state["ingested_files"]:
            # The file name is supplied by the client; keep only its final
            # component so the upload cannot be written outside /tmp.
            save_path = f"/tmp/{os.path.basename(uploaded_file.name)}"
            with open(save_path, "wb") as f:
                f.write(uploaded_file.getbuffer())
            insert_chunks_from_file(save_path)
            st.session_state["ingested_files"].add(file_key)

        st.success(f"Inserted chunks from {uploaded_file.name}")

    # --- Search the ingested documents -------------------------------------
    st.header("Przeszukaj dokument PDF")
    query = st.text_input("Wpisz swoje zapytanie")
    prompt = st.text_input("Wpisz prompt, któgo poszukujesz")
    if st.button("Szukaj"):
        if query and prompt:
            search_results = perform_search(query, prompt)
            st.write("Wyniki wyszukiwania:")
            st.write(search_results.generated)
        else:
            st.error("Prosze podać zapytanie oraz promt.")
finally:
    # Close the connection only after the search section has run; closing it
    # right after ingestion left perform_search() with a closed client.
    # NOTE(review): assumes get_client() returns a fresh connection for each
    # script run (i.e. it is not cached across reruns) — confirm.
    client.close()