Merge pull request microsoft#40 from prateejain-linked/query_cli_kusto2
fix kusto cli
gbarroutlook authored Sep 9, 2024
2 parents beb0285 + 6a13d55 commit dc4a1fb
Showing 1 changed file with 77 additions and 54 deletions: graphrag/query/cli.py
@@ -164,85 +164,108 @@ def run_local_search(
        data_dir, root_dir, config_dir
    )

    # TODO: the loading stage here must be limited to the default LanceDB only.

    # For the purpose of the POC, the input artifacts blob, the output artifacts
    # blob and the input query blob storage are the same.
    if config.storage.type == StorageType.memory:
        raise ValueError("Memory storage is not supported")

    vector_store_args = (
        config.embeddings.vector_store if config.embeddings.vector_store else {}
    )

    reporter.info(f"Vector Store Args: {vector_store_args}")
    vector_store_type = vector_store_args.get("type", VectorStoreType.LanceDB)

    entities = []
    text_units = []
    covariates = []
    reports = []
    final_relationships = []
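    # NOTE: these empty defaults are what the non-LanceDB (Kusto) path ends up
    # passing to the search engine below; with a KustoVectorStore the entities
    # (and, optionally, the community reports) are served from Kusto at query
    # time instead of being loaded here.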

    if config.storage.type == StorageType.blob:
        if config.storage.container_name is not None:
            input_storage_client: PipelineStorage = BlobPipelineStorage(
                connection_string=config.storage.connection_string,
                container_name=config.storage.container_name,
                storage_account_blob_url=config.storage.storage_account_blob_url,
            )
            output_storage_client: PipelineStorage = BlobPipelineStorage(
                connection_string=config.storage.connection_string,
                container_name=config.storage.container_name,
                storage_account_blob_url=config.storage.storage_account_blob_url,
            )
        else:
            raise ValueError("Storage type is Blob but container name is invalid")
    elif config.storage.type == StorageType.file:
        input_storage_client: PipelineStorage = FilePipelineStorage(config.root_dir)
        output_storage_client: PipelineStorage = FilePipelineStorage(config.root_dir)

    data_paths = get_files_by_contextid(config, context_id)
    final_nodes = pd.DataFrame()
    final_community_reports = pd.DataFrame()
    final_text_units = pd.DataFrame()
    final_relationships = pd.DataFrame()
    final_entities = pd.DataFrame()
    final_covariates = pd.DataFrame()
    if config.graphdb.enabled:
        graph_db_client = GraphDBClient(config.graphdb, context_id)
    for data_path in data_paths:
        # Check the output storage type from the config, then read the data from that storage.

        # GraphDB: we may need to change the reads below to pull the nodes data from Graph DB.
        final_nodes = pd.concat([final_nodes, read_paraquet_file(input_storage_client, data_path + "/create_final_nodes.parquet")])
        final_community_reports = pd.concat([final_community_reports, read_paraquet_file(input_storage_client, data_path + "/create_final_community_reports.parquet")])  # KustoDB: final_entities, final_nodes and final_community_reports should be merged and inserted into Kusto.
        final_text_units = pd.concat([final_text_units, read_paraquet_file(input_storage_client, data_path + "/create_final_text_units.parquet")])  # The LanceDB search needs this for the embedding mapping; the embeddings live in the entities, so we should use them from there. KustoDB must already have this covered.

        if not optimized_search:
            final_covariates = pd.concat([final_covariates, read_paraquet_file(input_storage_client, data_path + "/create_final_covariates.parquet")])
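    # Each data_path above is one context's artifact folder; the per-context
    # parquet outputs are accumulated into single DataFrames across all paths.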
    ##### LEGACY #######################

    if vector_store_type == VectorStoreType.LanceDB:
        # For the purpose of the POC, the input artifacts blob, the output
        # artifacts blob and the input query blob storage are the same.
        if config.storage.type == StorageType.memory:
            raise ValueError("Memory storage is not supported")
        if config.storage.type == StorageType.blob:
            if config.storage.container_name is not None:
                input_storage_client: PipelineStorage = BlobPipelineStorage(
                    connection_string=config.storage.connection_string,
                    container_name=config.storage.container_name,
                    storage_account_blob_url=config.storage.storage_account_blob_url,
                )
            else:
                raise ValueError("Storage type is Blob but container name is invalid")
        if config.storage.type == StorageType.file:
            input_storage_client: PipelineStorage = FilePipelineStorage(config.root_dir)


        data_paths = get_files_by_contextid(config, context_id)
        final_nodes = pd.DataFrame()
        final_community_reports = pd.DataFrame()
        final_text_units = pd.DataFrame()
        final_relationships = pd.DataFrame()
        final_entities = pd.DataFrame()
        final_covariates = pd.DataFrame()

        for data_path in data_paths:
            # Check the output storage type from the config, then read the data from that storage.

            # GraphDB: we may need to change the reads below to pull the nodes data from Graph DB.
            final_nodes = pd.concat([final_nodes, read_paraquet_file(input_storage_client, data_path + "/create_final_nodes.parquet")])
            final_community_reports = pd.concat([final_community_reports, read_paraquet_file(input_storage_client, data_path + "/create_final_community_reports.parquet")])  # KustoDB: final_entities, final_nodes and final_community_reports should be merged and inserted into Kusto.
            final_text_units = pd.concat([final_text_units, read_paraquet_file(input_storage_client, data_path + "/create_final_text_units.parquet")])  # The LanceDB search needs this for the embedding mapping; the embeddings live in the entities, so we should use them from there. KustoDB must already have this covered.
            final_relationships = pd.concat([final_relationships, read_paraquet_file(input_storage_client, data_path + "/create_final_relationships.parquet")])

            if not optimized_search:
                final_covariates = pd.concat([final_covariates, read_paraquet_file(input_storage_client, data_path + "/create_final_covariates.parquet")])

            if config.graphdb.enabled:
                final_entities = pd.concat([final_entities, graph_db_client.query_vertices(context_id)])
            else:
                final_entities = pd.concat([final_entities, read_paraquet_file(input_storage_client, data_path + "/create_final_entities.parquet")])
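            # With Graph DB enabled, the entity rows for this context come from
            # the graph vertices instead of the parquet artifact, so both
            # sources are expected to produce the same entity schema.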

        ############# End of the for loop

        if config.graphdb.enabled:
            graph_db_client._client.close()
        entities = read_indexer_entities(final_nodes, final_entities, community_level)  # KustoDB: read the final nodes and entities data and merge them.
        reports = read_indexer_reports(
            final_community_reports, final_nodes, community_level
        )

        covariates = (
            read_indexer_covariates(final_covariates)
            if not final_covariates.empty
            else []
        )
        text_units = read_indexer_text_units(final_text_units)
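        # The read_indexer_* helpers convert the raw parquet DataFrames into the
        # typed entity/report/covariate/text-unit objects the search engine consumes.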

    elif not use_kusto_community_reports:
        print("\n\n[!] WARNING: Passing empty reports.\n\n")

    ########################################################################################

    description_embedding_store = __get_embedding_description_store(
        entities=entities,
        vector_store_type=vector_store_type,
        config_args=vector_store_args,
        context_id=context_id,
    )
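    # __get_embedding_description_store selects the vector store implementation
    # (LanceDB by default, Kusto when configured) that serves the entity
    # description embeddings during the search.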

    if isinstance(description_embedding_store, KustoVectorStore):
        entities = []
        if use_kusto_community_reports:
            reports = []

    # *** If KUSTO is enabled, both entities and final_relationships must be empty.
    search_engine = get_local_search_engine(
        config,
        reports=reports,
        text_units=text_units,
        entities=entities,
        relationships=final_relationships,
        covariates={"claims": covariates},
        description_embedding_store=description_embedding_store,
        response_type=response_type,

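The hunk is truncated above, so the tail of run_local_search is not part of this diff. As a rough sketch only, assuming the function keeps the upstream graphrag CLI shape (where the user's query string and the reporter are available in scope), the constructed engine would then be driven along these lines:

    # Hypothetical usage sketch, not shown in this commit: run the local
    # search and surface the response through the progress reporter.
    result = search_engine.search(query=query)
    reporter.success(f"Local Search Response: {result.response}")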