Skip to content

Commit

Permalink
Integrate with ingestor and queryexecutor; add optional searchpipelin…
Browse files Browse the repository at this point in the history
…e on searchIndex()

Signed-off-by: Tyler Ohlsen <[email protected]>
  • Loading branch information
ohltyler committed Apr 22, 2024
1 parent e4b21df commit 75d8c8f
Show file tree
Hide file tree
Showing 8 changed files with 113 additions and 25 deletions.
2 changes: 2 additions & 0 deletions public/pages/workflow_detail/prototype/ingestor.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ function getDocGeneratorFn(workflow: Workflow): DocGeneratorFn {
switch (workflow.use_case) {
case USE_CASE.SEMANTIC_SEARCH:
case USE_CASE.NEURAL_SPARSE_SEARCH:
case USE_CASE.HYBRID_SEARCH:
default: {
fn = () => generateNeuralSearchDoc;
}
Expand All @@ -198,6 +199,7 @@ function getWorkflowValues(workflow: Workflow): WorkflowValues {
switch (workflow.use_case) {
case USE_CASE.SEMANTIC_SEARCH:
case USE_CASE.NEURAL_SPARSE_SEARCH:
case USE_CASE.HYBRID_SEARCH:
default: {
values = getNeuralSearchValues(workflow);
}
Expand Down
58 changes: 55 additions & 3 deletions public/pages/workflow_detail/prototype/query_executor.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import {
import { searchIndex, useAppDispatch } from '../../../store';
import { getCore } from '../../../services';
import {
HybridSearchValues,
NeuralSparseValues,
SemanticSearchValues,
WorkflowValues,
Expand Down Expand Up @@ -83,7 +84,13 @@ export function QueryExecutor(props: QueryExecutorProps) {

//
function onExecuteSearch() {
dispatch(searchIndex({ index: indexName, body: queryObj }))
dispatch(
searchIndex({
index: indexName,
body: queryObj,
searchPipeline: workflowValues?.searchPipelineId,
})
)
.unwrap()
.then(async (result) => {
setResultHits(result.hits.hits);
Expand Down Expand Up @@ -187,9 +194,16 @@ function getQueryGeneratorFn(workflow: Workflow): QueryGeneratorFn {
fn = () => generateSemanticSearchQuery;
break;
}
case USE_CASE.NEURAL_SPARSE_SEARCH:
default: {
case USE_CASE.NEURAL_SPARSE_SEARCH: {
fn = () => generateNeuralSparseQuery;
break;
}
case USE_CASE.HYBRID_SEARCH: {
fn = () => generateHybridSearchQuery;
break;
}
default: {
fn = () => () => {};
}
}
return fn;
Expand All @@ -200,6 +214,8 @@ function getWorkflowValues(workflow: Workflow): WorkflowValues {
let values;
switch (workflow.use_case) {
case USE_CASE.SEMANTIC_SEARCH:
case USE_CASE.NEURAL_SPARSE_SEARCH:
case USE_CASE.HYBRID_SEARCH:
default: {
values = getNeuralSearchValues(workflow);
}
Expand Down Expand Up @@ -251,6 +267,42 @@ function generateNeuralSparseQuery(
};
}

/**
 * Utility fn to generate a hybrid search query.
 *
 * The returned request body holds two sub-queries under
 * `query.hybrid.queries`, both driven by the same `queryText`:
 *  1. a `match` query against `workflowValues.inputField`
 *  2. a `neural` query against `workflowValues.vectorField`, using
 *     `workflowValues.modelId`
 *
 * @param queryText - the text to search for; fed to both sub-queries
 * @param workflowValues - field names and model ID used to build the query
 * @param k - value sent as `k` in the neural sub-query. Defaults to 5, the
 *   value that was previously hard-coded, so existing two-argument callers
 *   get the same query as before.
 * @returns the search request body
 */
function generateHybridSearchQuery(
  queryText: string,
  workflowValues: HybridSearchValues,
  k = 5
): {} {
  return {
    // TODO: can make this configurable
    // The vector field is excluded from the returned `_source`.
    _source: {
      excludes: [`${workflowValues.vectorField}`],
    },
    query: {
      hybrid: {
        queries: [
          {
            match: {
              [workflowValues.inputField]: {
                query: queryText,
              },
            },
          },
          {
            neural: {
              [workflowValues.vectorField]: {
                query_text: queryText,
                model_id: workflowValues.modelId,
                k,
              },
            },
          },
        ],
      },
    },
  };
}

// Reduces each search hit to its `_source` value, keeping the original order.
function processHits(hits: any[]): {}[] {
  const extractSource = ({ _source }: any) => _source;
  return hits.map(extractSource);
}
4 changes: 3 additions & 1 deletion public/pages/workflow_detail/prototype/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,7 @@ export type SemanticSearchValues = WorkflowValues & {
inputField: string;
vectorField: string;
};

// Neural sparse search uses exactly the same value shape as semantic search.
export type NeuralSparseValues = SemanticSearchValues;
// Hybrid search uses the semantic search values plus the ID of a search
// pipeline, which the query executor forwards to searchIndex() when running a query.
// NOTE(review): getNeuralSearchValues() in data_extractor_utils.ts types
// searchPipelineId as optional, while it is required here - confirm which is intended.
export type HybridSearchValues = SemanticSearchValues & {
searchPipelineId: string;
};
16 changes: 14 additions & 2 deletions public/pages/workflow_detail/utils/data_extractor_utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {
WORKFLOW_RESOURCE_TYPE,
WorkflowResource,
NODE_CATEGORY,
WORKFLOW_STEP_TYPE,
} from '../../../../common';
import { getNodesAndEdgesUnderParent } from './workflow_to_template_utils';

Expand All @@ -38,15 +39,26 @@ export function getIndexName(workflow: Workflow): string | undefined {
// persist the same values to use during ingest and search, so we keep the naming general
/**
 * Extracts the values a neural-search-style workflow needs from a workflow.
 *
 * @param workflow - the workflow to read the model ID, transformer form
 *   values, and created resources from
 * @returns the model ID, the transformer's `inputField` / `vectorField`, and
 *   `searchPipelineId`: the `id` of the first entry in
 *   `workflow.resourcesCreated` whose step type is
 *   `CREATE_SEARCH_PIPELINE_STEP_TYPE`, or `undefined` when there is no such
 *   resource (or no `resourcesCreated` at all)
 */
export function getNeuralSearchValues(
  workflow: Workflow
): {
  modelId: string;
  inputField: string;
  vectorField: string;
  searchPipelineId?: string;
} {
  // NOTE(review): the `as` casts below are unchecked. If the workflow has no
  // model ID or transformer component, `modelId` may be undefined at runtime
  // and `transformerComponent.data` will throw - confirm callers guarantee both.
  const modelId = getModelId(workflow) as string;
  const transformerComponent = getTransformerComponent(
    workflow
  ) as ReactFlowComponent;
  const { inputField, vectorField } = componentDataToFormik(
    transformerComponent.data
  ) as { inputField: string; vectorField: string };

  // The search pipeline is optional: it is only set when a create-search-pipeline
  // step produced a resource for this workflow.
  const searchPipelineId = workflow.resourcesCreated?.find(
    (resource) =>
      resource.stepType === WORKFLOW_STEP_TYPE.CREATE_SEARCH_PIPELINE_STEP_TYPE
  )?.id;

  return { modelId, inputField, vectorField, searchPipelineId };
}

function getFormValues(workflow: Workflow): WorkspaceFormValues | undefined {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,7 @@ export function getNodesAndEdgesUnderParent(
): { nodes: ReactFlowComponent[]; edges: ReactFlowEdge[] } {
const parentId = allNodes.find((node) => node.type === parentGroup)
?.id as string;
const nodes = allNodes.filter(
(node) =>
node.parentNode === parentId &&
// TODO: hardcoding that only create fields will be eligible for parsing. Make more generic in the future
node.data.createFields !== undefined &&
node.data.createFields.length > 0
);
const nodes = allNodes.filter((node) => node.parentNode === parentId);
const nodeIds = nodes.map((node) => node.id);
const edges = allEdges.filter(
(edge) => nodeIds.includes(edge.source) || nodeIds.includes(edge.target)
Expand Down
21 changes: 13 additions & 8 deletions public/route_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,11 @@ export interface RouteService {
deleteWorkflow: (workflowId: string) => Promise<any | HttpFetchError>;
getWorkflowPresets: () => Promise<any | HttpFetchError>;
catIndices: (pattern: string) => Promise<any | HttpFetchError>;
searchIndex: (index: string, body: {}) => Promise<any | HttpFetchError>;
searchIndex: (
index: string,
body: {},
searchPipeline?: string
) => Promise<any | HttpFetchError>;
ingest: (index: string, doc: {}) => Promise<any | HttpFetchError>;
searchModels: (body: {}) => Promise<any | HttpFetchError>;
}
Expand Down Expand Up @@ -161,14 +165,15 @@ export function configureRoutes(core: CoreStart): RouteService {
return e as HttpFetchError;
}
},
searchIndex: async (index: string, body: {}) => {
searchIndex: async (index: string, body: {}, searchPipeline?: string) => {
try {
const response = await core.http.post<{ respString: string }>(
`${SEARCH_INDEX_NODE_API_PATH}/${index}`,
{
body: JSON.stringify(body),
}
);
const basePath = `${SEARCH_INDEX_NODE_API_PATH}/${index}`;
const path = searchPipeline
? `${basePath}/${searchPipeline}`
: basePath;
const response = await core.http.post<{ respString: string }>(path, {
body: JSON.stringify(body),
});
return response;
} catch (e: any) {
return e as HttpFetchError;
Expand Down
10 changes: 7 additions & 3 deletions public/store/reducers/opensearch_reducer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,15 @@ export const catIndices = createAsyncThunk(

export const searchIndex = createAsyncThunk(
SEARCH_INDEX_ACTION,
async (searchIndexInfo: { index: string; body: {} }, { rejectWithValue }) => {
const { index, body } = searchIndexInfo;
async (
searchIndexInfo: { index: string; body: {}; searchPipeline?: string },
{ rejectWithValue }
) => {
const { index, body, searchPipeline } = searchIndexInfo;
const response: any | HttpFetchError = await getRouteService().searchIndex(
index,
body
body,
searchPipeline
);
if (response instanceof HttpFetchError) {
return rejectWithValue('Error searching index: ' + response.body.message);
Expand Down
19 changes: 18 additions & 1 deletion server/routes/opensearch_routes_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,19 @@ export function registerOpenSearchRoutes(
},
opensearchRoutesService.searchIndex
);
router.post(
{
path: `${SEARCH_INDEX_NODE_API_PATH}/{index}/{search_pipeline}`,
validate: {
params: schema.object({
index: schema.string(),
search_pipeline: schema.string(),
}),
body: schema.any(),
},
},
opensearchRoutesService.searchIndex
);
router.put(
{
path: `${INGEST_NODE_API_PATH}/{index}`,
Expand Down Expand Up @@ -103,14 +116,18 @@ export class OpenSearchRoutesService {
req: OpenSearchDashboardsRequest,
res: OpenSearchDashboardsResponseFactory
): Promise<IOpenSearchDashboardsResponse<any>> => {
const { index } = req.params as { index: string };
const { index, search_pipeline } = req.params as {
index: string;
search_pipeline: string | undefined;
};
const body = req.body;
try {
const response = await this.client
.asScoped(req)
.callAsCurrentUser('search', {
index,
body,
search_pipeline,
});

return res.ok({ body: response });
Expand Down

0 comments on commit 75d8c8f

Please sign in to comment.