
[RFC] OpenSearch and Apache Spark Integration #4

Open · penghuo opened this issue Nov 29, 2022 · 16 comments

@penghuo (Collaborator) commented Nov 29, 2022

Introduction

We received a feature request for query execution on object stores in OpenSearch.

We investigated the possibility of building a new solution for OpenSearch that leverages an object store as storage.

We found the main challenges to be:

  • The OpenSearch aggregation framework is a simplified MPP framework and does not support a shuffle stage.
  • The OpenSearch query framework is missing key features, e.g. JOIN and subqueries.

We found these problems have already been solved by general-purpose data processing systems, e.g. Presto, Spark, and Trino, and building such a platform from scratch would take years to mature.

Idea

The initial idea is:

  1. Use SQL as the interface.
  2. Leverage Spark as the query/compute execution engine.

High-level diagram:

[Image: high-level architecture diagram]

User Experience

  1. The user configures a Spark cluster as the computation resource, e.g. https://SPARK:7707.
  2. The user submits SQL to the OpenSearch cluster using the _plugins/_sql REST API.
    1. The SQL engine parses and analyzes the SQL query.
    2. The SQL engine decides whether to route the query to the Spark cluster or run it locally (an example routed query follows this list).
  3. In phase 1, we provide an interface that lets the user create a derived dataset from data on the object store and store it in OpenSearch. Queries are then optimized automatically at query time based on the derived dataset.
  4. In phase 2, we provide an opt-in optimization choice for the user: derived datasets are created automatically based on query patterns.
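
For illustration, here is the kind of query we expect to route to Spark: it joins data on the object store, which the OpenSearch query framework cannot execute locally (see the challenges above). The table names (http_logs on the object store, endpoints in OpenSearch) are hypothetical:

SELECT l.status, COUNT(*) AS hits
FROM http_logs l                      -- hypothetical table backed by the object store
JOIN endpoints e ON l.url = e.url     -- JOIN/subqueries are unsupported by OpenSearch DSL
WHERE l.day >= '2022-11-01'
GROUP BY l.status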

Epic

@penghuo added the enhancement (New feature or request) label on Nov 29, 2022

@anirudha (Collaborator)

This enables Spark as a compute connector to OpenSearch data, correct?
Can we set this up as a remote compute connection, similar to a data source?

  • Is the query from the Spark cluster to the index SQL?

@penghuo (Collaborator, Author) commented Nov 29, 2022

> This enables Spark as a compute connector to OpenSearch data, correct?

Yes, but it is not limited to OpenSearch data.

> Can we set this up as a remote compute connection, similar to a data source?

It is one option, but I feel we should make it more generic; ML could also leverage Spark as a computation engine.

> Is the query from the Spark cluster to the index SQL?

  • The OpenSearch SQL engine will submit the job to the Spark cluster, e.g. a job that leverages OpenSearch to store a materialized view and accelerate the query.
  • Potentially, we could also leverage a Spark job to query/load cold Lucene segments on the object store, providing an alternative solution for the UltraWarm query path.

@YANG-DB (Member) commented Dec 8, 2022

Will the OpenSearch SQL engine be responsible for analyzing the query and dispatching all queries to the MPP engine?
Or will it have the ability to execute parts of the query itself (for OpenSearch indices) and delegate other parts to Spark? Will this require adding rules to Catalyst?

@dai-chen (Collaborator)

Just some thoughts for discussion and a later PoC: we need to verify and confirm the role of Spark RDD (with/without Spark SQL) in OpenSearch:

  1. Provide the capability to query the object store only, via Spark RDD: this is the immediate requirement, and our own query engine is still needed for querying OpenSearch indices and planning execution jobs for Spark.
  2. Provide the capability to query OpenSearch indices as well, via Spark SQL + RDD: higher complexity, and it needs to get Spark SQL/RDD working with OpenSearch DSL, the OpenSearch index, or the Lucene index directly, and thus:
    2a. Provide a faster execution path or remove limitations in OpenSearch aggregation/join.
    2b. Replace the OpenSearch DSL query.

@dai-chen (Collaborator) commented Dec 19, 2022

As discussed, Spark SQL and RDD are only for purpose 1 above. Leveraging them for querying OpenSearch indices is a totally different story and not our current goal. So the question for introducing Spark SQL is: do we need it for optimizing and planning the Spark RDD jobs that query the object store?

Implementation options:

  1. Introduce Spark SQL as a library.
  2. Introduce only part of it, such as the Catalyst optimizer.
  3. Copy the source code and make the required changes.
  4. Reuse our own engine to plan RDD jobs.

Research items:

  1. Metastore: how/where to manage table metadata for Spark SQL.
  2. Fault tolerance: get the WAL and intermediate data store working for streaming.
  3. Thread pool: check whether there are blocking operations in Spark SQL.
  4. Data source integration: pass credentials from the Data Source introduced in 2.4 to the file system reader.
  5. Plugin settings: for example, the response size limit needs to work as well.

@anirudha changed the title from [FEATURE] OpenSearch SQL on Spark to [FEATURE] OpenSearch and Spark Integration on Jan 31, 2023
@anirudha changed the title from [FEATURE] OpenSearch and Spark Integration to [FEATURE] OpenSearch and Apace Spark Integration on Jan 31, 2023
@anirudha changed the title from [FEATURE] OpenSearch and Apace Spark Integration to [FEATURE] OpenSearch and Apache Spark Integration on Jan 31, 2023
@anirudha (Collaborator) commented Mar 3, 2023

[Video attachment: os-sql-viz-final.mov]

@ps48 (Member) commented Mar 3, 2023

[Video attachment: OS-SQL-SPARK.mp4]

@ryn9 commented Mar 9, 2023

Amazing stuff!

How will you support filtering (e.g. timestamp ranges and/or keywords) in relation to the S3 path schema?

For example, if using Fluent Bit's S3 output with s3_key_format /$TAG[2]/$TAG[0]/%Y/%m/%d/%H_%M_%S/$UUID.gz, how will we map a keyword so that we pull only the objects matching the tags in a supplied filter and the desired time range?

ref: https://docs.fluentbit.io/manual/pipeline/outputs/s3/

@dai-chen (Collaborator)

> Amazing stuff!
>
> How will you support filtering (e.g. timestamp ranges and/or keywords) in relation to the S3 path schema?
>
> For example, if using Fluent Bit's S3 output with s3_key_format /$TAG[2]/$TAG[0]/%Y/%m/%d/%H_%M_%S/$UUID.gz, how will we map a keyword so that we pull only the objects matching the tags in a supplied filter and the desired time range?
>
> ref: https://docs.fluentbit.io/manual/pipeline/outputs/s3/

@ryn9 As with optimizations in other query engines, we can leverage partition pruning and data skipping on your data (path or content). Please see a general example of data skipping in opensearch-project/sql#1379 (comment). We may look into Fluent Bit later. Thanks!
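
As a rough sketch of what partition pruning over such a path layout could look like, assuming the log objects are exposed to Spark SQL as a partitioned table (all names here are hypothetical; this also assumes the S3 layout is in, or rewritten into, a partition-discoverable form, or that partitions are registered explicitly):

CREATE TABLE app_logs (message STRING, tag STRING, year INT, month INT, day INT)
USING json
PARTITIONED BY (tag, year, month, day)
LOCATION 's3://my-bucket/logs/';

-- Only partitions matching the filter are listed and scanned, so objects
-- outside the requested tag and time range are never read from S3.
SELECT count(*)
FROM app_logs
WHERE tag = 'app-a' AND year = 2023 AND month = 3 AND day = 9;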

@anirudha (Collaborator)

Decorators will be available in Fluent Bit / Data Prepper / otel-exporter.


@muralikpbhat
Great initiative. I really like the price-performance trade-off that this solution will bring. A few questions below:

  1. Can we think about and call out the downsides of doing query planning in Spark? Will it restrict some existing OpenSearch features? Which ones? A few pointers:
    a. What types of queries don't work with SQL today?
    b. How will DLS/FLS work?
    c. How will document-level alerting/percolator work?
  2. How are we thinking about lifecycle management of materialised views? We need the ability to delete old MVs. (Assuming the maximus table and skipping indices don't need that, as they will not be very large.)
  3. Are we using data streams for MVs so that we don't need explicit index rotation?
  4. Can we think of on-demand materialised views instead of keeping them up to date (cost reduction)?
  5. In the case of an MV, can a query span the MV and raw data? (The case where one data file is projected completely and another is not.)
  6. Similarly, can a query span fields in the MV and raw data for the same document? (Not for fields in the skipping index, but for fields in the MV's covered index.)

@dai-chen (Collaborator)

@muralikpbhat Thanks for all the comments! Please find my answers inline below.

> 1. Can we think about and call out the downsides of doing query planning in Spark? Will it restrict some existing OpenSearch features? Which ones?

In our demo, we use Spark SQL mostly for building the skipping index and MV into an OpenSearch index; all queries and dashboards then work against that index as before.

Since you ask below, I assume we're talking about Spark SQL queries with an OS index involved; if so, there are limitations:

> a. What types of queries don't work with SQL today?

OpenSearch functions, including full-text search and aggregations: this may be solved by either improving OpenSearch-Hadoop or introducing our OS SQL plugin into Spark.

> b. How will DLS/FLS work?

I think we need separate AuthN/Z for raw data on S3. If you're talking about an OS index, the query sent to OS is still DSL, which may work; we need a deep dive.

> c. How will document-level alerting/percolator work?

I think all OS features can work with the MV. For raw data, I'm not sure; we need to understand the use case and workflow.


> 2. How are we thinking about lifecycle management of materialised views? We need the ability to delete old MVs.

Yes, we're considering the MV as a second-level, on-demand acceleration strategy. We will provide a standard SQL API for higher-level applications to use, such as SHOW/DROP MV.
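
As an illustration of that lifecycle (the exact DDL surface is not finalized in this RFC; the view name and statements below are hypothetical):

-- Create an MV whose results are stored in an OpenSearch index.
CREATE MATERIALIZED VIEW top_pages_daily
AS SELECT page_id, COUNT(*) AS views FROM page_views GROUP BY page_id;

-- Inspect and delete old MVs through the same SQL API.
SHOW MATERIALIZED VIEWS;
DROP MATERIALIZED VIEW top_pages_daily;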


> 3. Are we using data streams for MVs so that we don't need explicit index rotation?

As shown in the demo above, the sink (destination) of the streaming job behind the MV is a regular OpenSearch index. I think we can make it any OpenSearch object, as long as the OpenSearch-Hadoop connector supports it.


> 4. Can we think of on-demand materialised views instead of keeping them up to date (cost reduction)?

Yes, that's what we're doing in the demo. We intentionally relax strong consistency between the MV and its source.


> 5. In the case of an MV, can a query span the MV and raw data? (The case where one data file is projected completely and another is not.)

Yes, because the MV itself is a table too, so the user can use it in any query together with raw data. We didn't do this in the demo because OS-Hadoop doesn't currently extend the Spark Catalog, so effort is required to register an MV, or any OS index, in the Spark catalog.

Meanwhile, I'm not sure what specific use case or query you're referring to. We are also considering this, and may need it in the future, for a Hybrid Scan capability: hybrid scan will union the MV data with the latest raw data, which is helpful for customers who want strong consistency.
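
Until then, a minimal sketch of the manual registration, assuming the opensearch-hadoop Spark SQL data source (org.opensearch.spark.sql) with a resource option naming the index; all names are hypothetical and option keys may differ by connector version:

-- Register an OS index (here, an MV) as a temporary view in Spark SQL.
CREATE TEMPORARY VIEW top_pages_daily
USING org.opensearch.spark.sql
OPTIONS (resource 'top_pages_daily', nodes 'https://opensearch-host:9200');

-- Once registered, the MV can be joined with raw data on the object store.
SELECT r.page_id, r.url, v.views
FROM raw_pages r JOIN top_pages_daily v ON r.page_id = v.page_id;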


> 6. Similarly, can a query span fields in the MV and raw data for the same document?

I'm not entirely sure what that query looks like. I think it's possible as long as there is a primary-key field in the MV correlated to rows in the raw data.

@dai-chen changed the title from [FEATURE] OpenSearch and Apache Spark Integration to [RFC] OpenSearch and Apache Spark Integration on Apr 3, 2023
@sathishbaskar

Would joins involve pulling data into RDDs?

@penghuo (Collaborator, Author) commented Jul 3, 2023

> Would joins involve pulling data into RDDs?

Could you elaborate more? Do you mean joining an OpenSearch index and S3?

@sathishbaskar

> Could you elaborate more? Do you mean joining an OpenSearch index and S3?

An example would help explain this better. Consider the following datasets:

users [20 billion docs, ~2 TB]
  user_id, user_name, user_location

pages [1 trillion docs, ~90 TB]
  page_id, website_id

page_views [10 trillion docs, over 1 PB]
  hour_timestamp, user_id, page_id

If I have to prepare a report every day summarizing the page-view pattern over the last 7 days (top 100 pages and top 100 locations), with the following result schemas:

  1. day, hour, page_id, website_id, views
  2. day, hour, user_location, views

SELECT
  DATE(pv.hour_timestamp) AS day, HOUR(pv.hour_timestamp) AS hour, pv.page_id, p.website_id, COUNT(*) AS views
FROM
  page_views pv JOIN pages p ON pv.page_id = p.page_id
WHERE
  pv.hour_timestamp >= date_sub(current_date(), 7)
GROUP BY
  day, hour, pv.page_id, p.website_id
ORDER BY views DESC
LIMIT 100

SELECT
  DATE(pv.hour_timestamp) AS day, HOUR(pv.hour_timestamp) AS hour, u.user_location, COUNT(*) AS views
FROM
  page_views pv JOIN users u ON pv.user_id = u.user_id
WHERE
  pv.hour_timestamp >= date_sub(current_date(), 7)
GROUP BY
  day, hour, u.user_location
ORDER BY views DESC
LIMIT 100

Assuming users and pages are completely available in OpenSearch storage in a reasonably large cluster, and page_views is a materialized view with most data in S3, I'd like to understand how we plan to make the joins work. Would Spark DataFrames be loaded with data fetched from the OpenSearch index and OpenSearch materialized views, and then processed within the Spark runtime? And do we intend to push down some of the compute to OpenSearch, since that could avoid a good amount of network transfer?
