[FEATURE] Cost-effective materialized view for high cardinality data #765

Open
dai-chen opened this issue Oct 10, 2024 · 2 comments
Labels
Core:MV enhancement New feature or request

Comments

dai-chen commented Oct 10, 2024

Is your feature request related to a problem?

When dealing with high-cardinality data, Materialized Views (MVs) can become excessively large and inefficient, leading to significant performance and cost challenges. Take the VPC Flow dashboard as an example: high-cardinality fields such as source and destination IP pairs create significant storage challenges when using MVs. At terabyte (TB) scale, each 1-minute window can produce hundreds of millions of rows after grouping.

Screenshot 2024-11-05 at 9 23 44 AM  

Materialized view definition for VPC flow dashboard:

CREATE MATERIALIZED VIEW vpc_flow_log_mv
AS
SELECT
  window.start AS startTime,
  activity_name,
  protocol,
  src_endpoint.ip AS src_ip,
  dst_endpoint.ip AS dst_ip,
  dst_endpoint.port AS dst_port,
  COUNT(*) AS total_count,
  SUM(traffic.bytes) AS total_bytes,
  SUM(traffic.packets) AS total_packets
FROM vpc_flow_logs
GROUP BY
  TUMBLE(eventTime, '1 Minute'),
  activity_name,
  protocol,
  src_endpoint.ip,
  dst_endpoint.ip,
  dst_endpoint.port

Cost Breakdown

The cost of maintaining MVs for high-cardinality data primarily arises from three areas:

  1. Creation Cost
    • a) Computation: High-cardinality fields demand intensive computation for data aggregation, driving up processing cost.
    • b) Indexing: Writing to complex storage formats, such as Lucene indexing, adds further cost due to the resources required.
    • Key factors: Complexity of MV definition and choice of target file format.
  2. Storage Cost
    • Large data volumes in high-cardinality views increase storage expenses and lead to inefficiencies in data retrieval and maintenance.
    • Key factors: Volume of MV data and choice of storage medium.
  3. Query Cost
    • Querying high-cardinality views can be costly, as the size and complexity of the data require more resources to maintain responsiveness for real-time analytics.
    • Key factors: Frequency of dashboard queries on MV data and its computational complexity.

What solution would you like?

Solution Overview

To address the challenges associated with high-cardinality data and dashboard visualizations, we are considering three primary solutions: Aggregate MV, Approximate MV, and Direct Querying. The table below provides a baseline comparison of the cost implications for each solution without additional optimizations.

Screenshot 2024-11-06 at 2 43 56 PM

Design Goals

Our primary objective is to reduce the overall cost of maintaining MVs by optimizing these three aspects — creation, storage, and query — specifically for high-cardinality fields and dashboard visualizations.

Beyond cost reduction, we aim for a balanced solution that effectively manages the trade-offs between cost, latency, and accuracy. This involves addressing the specific challenges of each approach, such as reducing storage requirements in Aggregate MV, enhancing accuracy in Approximate MV, and improving query latency in Direct Querying.

Screenshot 2024-11-04 at 9 49 55 AM

Proposed Path Forward [TBD]

This recommended path forward includes both short-term enhancements to the existing aggregate MV and longer-term solutions that better align with product goals. For more details on each approach, refer to the following Additional Context section.

  • Short-Term (Aggregate MV)

    • Implement partial materialization with optional on-demand backfilling.
    • Optimize OpenSearch index to reduce indexing resource usage.
  • Long-Term

    • MV on Object Store: Explore the possibility of using a cost-effective object store to hold an MV.
  • Long-Term (Product Alignment Required)

    • Approximate MV: Transition to an approximate approach, which may involve adjustments to the product definition.
    • Direct Querying: Evaluate direct querying from the dashboard on the source dataset, which requires product validation to confirm that the experience aligns with user expectations.

Note: The following section provides detailed technical context. Continue if you’re interested in specifics ...

Do you have any additional context?

(I) Aggregate Materialized View

The most straightforward approach follows the core concept of an aggregate materialized view, with further optimizations reducing its size and cost while maintaining accuracy.

  • a) Partial Materialization: Generate the MV only for time ranges selected by users. For data outside these ranges, utilize direct querying or backfilling from the source to minimize storage needs.
  • b) Optimize OpenSearch Storage: Apply techniques such as selective field indexing and compression to reduce the storage footprint within OpenSearch.
  • c) Lower Cost Storage: Store MV data in cost-effective storage solutions, like object stores (e.g. S3). Use either OpenSearch-compatible formats (e.g., searchable snapshots) or alternative formats accessible via Spark.
  • d) Hybrid Aggregation with Direct Querying: Materialize only low-cardinality columns in the MV, while using direct queries for high-cardinality data.

a) Partial Materialized View with Backfilling

Screenshot 2024-11-06 at 3 06 20 PM

  • User Workflow

    • Users select an initial time range for real-time analytics, prompting an auto-refresh MV to refresh data from the source.
    • When users extend the time range to an earlier start date, it triggers backfill loading, pulling older data into the MV.
  • Cons

    • Limited Benefits for Long-Term Analysis: This approach is beneficial for users focused on analyzing recent data. However, it may be less effective for those requiring analysis across the entire dataset or over longer time ranges, such as a full year.
    • Potential Dashboard Delays: Users may experience delays when using the dashboard, even after the initial bootstrap, as backfill operations may happen during their analytics. This could impact the real-time responsiveness of the dashboard.
  • Technical Challenges

    • Backfilling Job Management
      • Abstract the backfill loading process as part of the auto-refresh MV, reflecting it in index state and metadata. For instance, update the MV start time upon backfill completion, which is required for future query optimization.
      • Avoid or manage multiple backfill jobs triggered in parallel to prevent resource contention.
    • Overlap Time Window Handling
      • Handle overlaps between time windows loaded by both auto-refresh and backfill processes to ensure data consistency and accuracy.
    • Similar challenges discussed in
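
The overlap handling above can be sketched as follows. This is a minimal illustration, assuming the MV metadata tracks one contiguous covered range and that backfill only ever extends it backwards; `backfill_ranges`, `extend_covered`, and the range representation are hypothetical names, not existing Flint APIs.

```python
from datetime import datetime, timedelta
from typing import List, Optional, Tuple

TimeRange = Tuple[datetime, datetime]  # half-open interval [start, end)
EPOCH = datetime(1970, 1, 1)


def align_down(ts: datetime, window: timedelta) -> datetime:
    """Round a timestamp down to the start of its tumbling window."""
    return ts - ((ts - EPOCH) % window)


def backfill_ranges(covered: Optional[TimeRange],
                    requested: TimeRange,
                    window: timedelta) -> List[TimeRange]:
    """Return the window-aligned range that must be backfilled, if any.

    Assumption: data newer than the covered range is loaded by
    auto-refresh, so only the gap before the covered start is returned.
    """
    req_start = align_down(requested[0], window)
    if covered is None:
        return [(req_start, requested[1])]  # nothing materialized yet
    cov_start, _ = covered
    if req_start >= cov_start:
        return []  # the MV already holds everything older that was requested
    # Backfill up to exactly the covered start: the range stays contiguous
    # and no window is loaded by both backfill and auto-refresh.
    return [(req_start, cov_start)]


def extend_covered(covered: TimeRange, backfilled: TimeRange) -> TimeRange:
    """Move the MV start time back once a backfill job has completed."""
    return (min(covered[0], backfilled[0]), covered[1])
```

Serializing calls to `backfill_ranges` per MV (for example behind a lock or a single job queue) would be one way to avoid the parallel backfill jobs mentioned above, since the second caller then sees the already extended range.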

b) OpenSearch Index Optimizations

Regardless of the chosen approach, additional optimizations can be applied within the OpenSearch index to further reduce both indexing computation and storage requirements. It's essential to validate these optimizations through real testing to ensure they translate into measurable cost reductions.

  • Disable _source: Disabling the _source field can reduce the index size by approximately 40%, though it comes with side effects such as the loss of certain search functionalities like highlighting. This approach sacrifices some flexibility for better storage efficiency.

  • Disable doc_values for non-aggregated fields: For fields used only for filtering on the dashboard, doc_values can be disabled to save storage, because these fields are never used in aggregations.

  • Disable inverted index on non-dimension fields: If the index is primarily used to serve pre-canned dashboards, you can consider disabling the inverted index on non-dimension fields to save storage. This is particularly useful if these fields do not need to be used in filters, though they can still be used in aggregations.

  • Store by the most appropriate field type: Storing fields with the best-fitting data type can significantly reduce storage size. For example, use the ip type instead of keyword for IPv4 addresses, which are very common in VPC flow log, CloudTrail, and WAF datasets.

  • Change compression rate: Adjusting the compression settings in OpenSearch from the default to the best compression option can further reduce the index size, at the cost of slightly slower indexing speeds.

The last item can be configured in the index_settings option, while the first two will be configurable once support for #772 is implemented.
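
As a rough illustration, the optimizations listed above could be combined into a single index creation body like the one built below. The field names follow the MV columns used elsewhere in this issue. Treating `activity` as a filter-only field is an assumption made purely for the example, and the helper name `build_mv_index_body` is hypothetical.

```python
import json


def build_mv_index_body() -> dict:
    """Build an OpenSearch create-index body applying the optimizations above."""
    return {
        "settings": {
            # Trade slightly slower indexing for a smaller index.
            "index": {"codec": "best_compression"}
        },
        "mappings": {
            # Drop the stored original document (loses highlighting etc.).
            "_source": {"enabled": False},
            "properties": {
                "startTime": {"type": "date"},
                # Assumed filter-only field: keep the inverted index, drop doc_values.
                "activity": {"type": "keyword", "doc_values": False},
                # Dimension fields stored with the most specific type.
                "src_ip": {"type": "ip"},
                "dest_ip": {"type": "ip"},
                # Metric fields: aggregated but never filtered on,
                # so the index structure is disabled.
                "total_count": {"type": "long", "index": False},
                "total_bytes": {"type": "long", "index": False},
                "total_packets": {"type": "long", "index": False},
            },
        },
    }


print(json.dumps(build_mv_index_body(), indent=2))
```

Whether each switch actually pays off depends on the dashboard queries, so the body above is a starting point for the validation testing mentioned earlier rather than a recommended default.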

c) Materialized View on Cost-Effective Object Store

Screenshot 2024-11-06 at 3 06 31 PM

  • Cons

    • Warmup Latency: Initial queries on uncached segments may experience higher latency, as data segments are loaded on demand from the object store.
    • Read-Only Limitations: Searchable snapshots create a read-only index, limiting flexibility for any updates or modifications to the MV.
  • Technical Challenges

    • Index Cache Tuning: Efficiently managing the LRU cache is essential to balance memory usage and query performance, including tuning cache algorithm, size and eviction policies.
    • Snapshot Format Compatibility: Writing data from Spark jobs compatible with OpenSearch’s snapshot format presents an open question.

(II) Approximate Materialized View

The basic idea is to create MVs that store approximate or partial data, significantly reducing storage requirements by capturing only the most essential information while accepting trade-offs in accuracy or completeness.

  • a) Approximate Aggregation: Employ approximate data structures to handle high cardinality data. Specifically, these stream-friendly SQL functions process data without requiring sorting and compress the data by storing approximate values, such as Top K by frequency, or Top K by sum.
  • b) Iceberg-Cube Aggregation: This approach groups the source data by different dimension combinations (e.g., source IP, destination IP, source-destination IP pair) and selectively materializes only a subset of the cells in these large groups. By applying a threshold such as Top K most frequent, it can decide which cells to materialize, significantly reducing storage while still capturing the most relevant data for querying.
  • c) Cluster-based Aggregation: This approach reduces the cardinality of high-cardinality columns by grouping similar values into clusters, such as clustering specific IPs into IP ranges or geographical locations (e.g., countries or regions). By aggregating data into these broader clusters, the materialized view significantly reduces the number of rows, optimizing storage while still providing meaningful data for analysis.
Screenshot 2024-11-05 at 10 43 37 AM
  • Cons

    • Reduced Accuracy over Aggregated Windows: Approximate or partial results are stored for fine-grained windows, such as 1-minute intervals. However, dashboards often need to aggregate this data into larger windows, such as 5-minute intervals. Aggregating such results over these larger windows can introduce errors or reduce the precision of the data, making it harder to trust the insights.
    • Overhead on small datasets: High-cardinality columns are duplicated in each approximate aggregate result or cube cell (e.g., Top K src_ip by count and Top K src_ip by bytes). This duplication adds overhead, especially for small datasets, because the same column values are stored multiple times, which can inflate the data size and erode the storage savings.
  • Technical Challenges

    • Complex Computation Limitations in Spark Streaming: The cubing approach in (II) b requires advanced operations, such as window functions to calculate top-k per group. However, Spark Structured Streaming currently lacks support for these complex computations, and how to make it work remains an open question.
    • Lack of Built-in Approximate Top-K Function: Spark does not have a built-in approximate top-k function, so this functionality would need to be implemented from scratch, adding complexity and development effort.
    • Frontend Adaptation for New MV Structure: The new MV structure, which may include top-k results in nested fields or sparse indexing from data cubing, will require corresponding changes in the frontend code to handle these structures properly.
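
To make the cluster-based aggregation in (II) c concrete, here is a minimal sketch that collapses individual source IPs into fixed-size network ranges before aggregating. The /24 prefix length, the row shape, and the function names are assumptions for illustration. A real implementation might cluster by geography or by customer-defined ranges instead.

```python
import ipaddress
from collections import defaultdict
from typing import Dict, Iterable, Tuple


def cluster_key(ip: str, prefix_len: int = 24) -> str:
    """Map an IP address to the network range that contains it."""
    # strict=False lets us pass a host address and get its enclosing network.
    return str(ipaddress.ip_network(f"{ip}/{prefix_len}", strict=False))


def aggregate_by_cluster(rows: Iterable[Tuple[str, int]],
                         prefix_len: int = 24) -> Dict[str, Dict[str, int]]:
    """Aggregate (src_ip, bytes) rows into one row per IP range."""
    result: Dict[str, Dict[str, int]] = defaultdict(
        lambda: {"total_count": 0, "total_bytes": 0})
    for ip, nbytes in rows:
        cell = result[cluster_key(ip, prefix_len)]
        cell["total_count"] += 1
        cell["total_bytes"] += nbytes
    return dict(result)


rows = [("192.168.0.100", 10), ("192.168.0.7", 5), ("10.0.0.1", 1)]
print(aggregate_by_cluster(rows))
```

The row count is now bounded by the number of distinct ranges rather than distinct IPs, at the price of no longer being able to answer questions about a single address from the MV alone.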

(III) Direct Querying Dashboard

Perform direct queries on the source data for real-time analysis, similar to the Grafana and CloudWatch Logs Insights experience, without relying on a pre-computed MV.

Screenshot 2024-11-06 at 3 06 44 PM

  • Pros

    • Realtime Data Access: Users can query the most up-to-date data directly from the source.
    • Reduced Storage Costs: Eliminates the need for persistent storage of MVs.
    • Flexibility: Allows for a wide range of queries without being limited by the predefined structure of MVs.
  • Cons

    • Query Latency: Direct querying can result in slower response times, especially with large datasets or complex queries, impacting the responsiveness of the dashboard.
    • Resource Intensity: Frequent querying can place a higher load on the source data query engine and storage.
  • Technical Challenges

    • Automatic Acceleration: Leverage Query Result Cache for repeated, deterministic queries (e.g., same query with varying time ranges) or MV created dynamically based on workload.
    • Progressive Execution: Deliver incremental, partial results to the dashboard, improving responsiveness by displaying data as it’s processed.
    • Approximate Functions: Apply approximate calculations (e.g., approximate counts or top-k) to speed up response times, similar to the techniques used in approximate MVs.
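
Progressive execution can be pictured as a running aggregate that is re-emitted after each processed chunk of source data. The sketch below assumes the engine can hand over the source in chunks (for example one file or partition at a time). `progressive_top_k` is a hypothetical name and not an existing API.

```python
from collections import Counter
from typing import Iterable, Iterator, List, Tuple


def progressive_top_k(chunks: Iterable[Iterable[str]],
                      k: int) -> Iterator[List[Tuple[str, int]]]:
    """Yield the current top-k after each chunk so a dashboard can refresh early."""
    totals: Counter = Counter()
    for chunk in chunks:
        totals.update(chunk)          # count every value in this chunk
        yield totals.most_common(k)   # partial result based on data seen so far


chunks = [["a", "b", "a"], ["b", "b", "c"]]
for partial in progressive_top_k(chunks, k=2):
    print(partial)
```

Each yielded list is only a snapshot: the ranking can change as later chunks arrive, so the frontend would need to signal that intermediate results are provisional.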

dai-chen commented Oct 11, 2024

Proof of Concept: Approximate Aggregation Approach

Goals

Verify the feasibility of the Approximate Aggregation MV approach and evaluate its impact on storage, performance and cost, specifically including:

  • Implement approximate aggregate functions such as approx_top_count and approx_top_sum.
  • Compare actual storage in the OpenSearch index between traditional aggregate MV and approximate aggregate MV.
  • Benchmark MV build and dashboard query performance between traditional aggregate MV and approximate aggregate MV.
  • Evaluate EMR-S job costs for building the materialized view using traditional aggregation versus approximate aggregation.

Design

Syntax:

CREATE MATERIALIZED VIEW vpc_flow_log_mv
AS
SELECT
  window.start AS startTime,
  activity_name AS activity,
  APPROX_TOP_COUNT(src_endpoint.ip, 100) AS top_k_src_ip_by_count,
  APPROX_TOP_COUNT(dst_endpoint.ip, 100) AS top_k_dst_ip_by_count,
  APPROX_TOP_SUM(src_endpoint.ip, 100) AS top_k_src_ip_by_sum,
  APPROX_TOP_SUM(dst_endpoint.ip, 100) AS top_k_dst_ip_by_sum,
  APPROX_TOP_COUNT(ARRAY(src_endpoint.ip, dst_endpoint.ip), 100) AS top_k_src_dst_ip_by_count,
  COUNT(*) AS total_count,
  SUM(traffic.bytes) AS total_bytes,
  SUM(traffic.packets) AS total_packets
FROM vpc_flow_logs
GROUP BY
  TUMBLE(eventTime, '1 Day'),
  activity_name

Materialized view data:

    "_source": {
          "startTime": "2024-10-01 12:00:00",
          "activity": "Traffic",
          "top_k_src_ip_by_count": [
            {
              "ip": "192.168.0.100",
              "count": 23205
            },
            ...
          ],
          "top_k_dst_ip_by_count": [
            {
              "ip": "127.0.0.1",
              "count": 238
            },
            ...
          ]
        },

OpenSearch DSL query:

POST /vpc_flow_log_approx_mv/_search
{
  "size": 0,
  "aggs": {
    "top_ips": {
      "nested": {
        "path": "top_k_src_ip_by_count"
      },
      "aggs": {
        "ip_buckets": {
          "terms": {
            "field": "top_k_src_ip_by_count.ip",
            "size": 100,
            "order": {
              "total_count": "desc"
            }
          },
          "aggs": {
            "total_count": {
              "sum": {
                "field": "top_k_src_ip_by_count.count"
              }
            }
          }
        }
      }
    }
  }
}
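
The query above sums the stored per-window counts for each IP and ranks the result. The following sketch mirrors that logic in plain Python and shows why the merged ranking can differ from the exact one: an IP that narrowly misses the top K in every window never reaches the MV. The data is synthetic, K=2 is chosen only to keep the example small, and exact per-window counts are used so that the error shown comes from truncation alone.

```python
from collections import Counter
from typing import Dict, List


def top_k(counts: Dict[str, int], k: int) -> List[dict]:
    """Keep only the k most frequent IPs of one window, as stored in the MV."""
    ranked = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)
    return [{"ip": ip, "count": c} for ip, c in ranked[:k]]


def merge_top_k(docs: List[List[dict]], size: int) -> List[tuple]:
    """Sum stored counts per IP across windows, like the nested terms/sum query."""
    totals: Counter = Counter()
    for entries in docs:
        for entry in entries:
            totals[entry["ip"]] += entry["count"]
    return totals.most_common(size)


# Two synthetic 1-minute windows with exact per-IP counts.
window1 = {"a": 10, "b": 10, "c": 9}
window2 = {"d": 10, "e": 10, "c": 9}

stored = [top_k(window1, 2), top_k(window2, 2)]
merged = merge_top_k(stored, size=2)
exact = (Counter(window1) + Counter(window2)).most_common(1)

print(merged)  # ranking computed from the MV
print(exact)   # ranking computed from the source data
```

Here "c" has the highest exact total (18) but is absent from the merged result because it ranked third in both windows. Storing more entries per window than the dashboard displays is one way to reduce this effect, at the cost of a larger MV.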

Implementation Tasks

  1. Implement approx_top_count function: Create a function to compute approximate top K counts for high cardinality fields.
  2. Implement approx_top_sum function: Develop a similar function for approximate top K sum calculations.
  3. Support nested fields in MV output: Ensure the materialized view (MV) can output nested fields to store approximate aggregation results.
  4. Create a dashboard on MV data: Build a dashboard for visualizing the results from the MV, using approximate aggregation for top K values.

Testing Tasks

  1. Storage comparison: Compare the actual storage usage in the OpenSearch index for aggregate MV and approximate aggregate MV.
  2. Performance benchmark: Measure and compare the performance of building and querying aggregate MV and approximate aggregate MV.
  3. EMR-S cost evaluation: Evaluate the overall cost on EMR-S for building aggregate MV versus approximate aggregate MV.

dai-chen commented Nov 1, 2024

Proof of Concept: OpenSearch Index Optimizations

TODO: A quick test showed a 58% reduction in index size by disabling _source and unnecessary inverted indexes.

health status index              uuid                 pri rep docs.count docs.deleted store.size pri.store.size
              vpc_flow_logs_mv_3 oG5_m5IB0HCHA9l1-54T            1000000            0     82.3mb         82.3mb
              vpc_flow_logs_mv_5 o26Em5IB0HCHA9l1F56g            1000000            0     59.2mb         59.2mb
              vpc_flow_logs_mv_1 _xJzm5IB1Vv8TQuVfOXQ            1000000            0    140.9mb        140.9mb

# Current schema (baseline)
PUT vpc_flow_logs_mv_1
{
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 2
  }, 
  "mappings": {
    "properties": {
      "startTime": { "type": "date" },
      "activity": { "type": "keyword" },
      "src_ip": { "type": "keyword" },
      "dest_ip": { "type": "keyword" },
      "total_count": { "type": "long" },
      "total_bytes": { "type": "long" },
      "total_packets": { "type": "long" }
    }
  }
}

# Disable source
PUT vpc_flow_logs_mv_3
{
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 2
  }, 
  "mappings": {
    "_source": {
      "enabled": false
    },
    "properties": {
      "startTime": { "type": "date" },
      "activity": { "type": "keyword" },
      "src_ip": { "type": "keyword" },
      "dest_ip": { "type": "keyword" },
      "total_count": { "type": "long" },
      "total_bytes": { "type": "long" },
      "total_packets": { "type": "long" }
    }
  }
}

# Combine all
PUT vpc_flow_logs_mv_5
{
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 2
  }, 
  "mappings": {
    "_source": {
      "enabled": false
    }, 
    "properties": {
      "startTime": { "type": "date" },
      "activity": { "type": "keyword" },
      "src_ip": { "type": "ip" },
      "dest_ip": { "type": "ip" },
      "total_count": { "type": "long", "index": false },
      "total_bytes": { "type": "long", "index": false },
      "total_packets": { "type": "long", "index": false }
    }
  }
}
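
The reductions can be reproduced from the store.size column listed above. The snippet only redoes the arithmetic on those three reported sizes and adds no new measurements.

```python
# store.size values (in MB) copied from the index listing above.
sizes_mb = {
    "vpc_flow_logs_mv_1": 140.9,  # current schema (baseline)
    "vpc_flow_logs_mv_3": 82.3,   # _source disabled
    "vpc_flow_logs_mv_5": 59.2,   # all optimizations combined
}


def reduction_pct(baseline: float, optimized: float) -> float:
    """Percentage of the baseline size saved by the optimized index."""
    return round((baseline - optimized) / baseline * 100, 1)


baseline = sizes_mb["vpc_flow_logs_mv_1"]
for name in ("vpc_flow_logs_mv_3", "vpc_flow_logs_mv_5"):
    print(name, reduction_pct(baseline, sizes_mb[name]), "%")
```

This gives roughly 41.6% for disabling _source alone, in line with the approximately 40% estimate mentioned in the index optimization list, and 58.0% for the combined schema.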

@dai-chen dai-chen changed the title [FEATURE] Efficient storage of high cardinality data in materialized view [FEATURE] Cost-effective materialized view for high cardinality data Nov 4, 2024