
SPARK BATCH ETL PIPELINE

Data Pipeline Architecture Documentation

Overview

This document provides a comprehensive overview of the data pipeline architecture, designed to ingest, process, and deliver data for a variety of analytical workloads. The pipeline leverages industry-leading technologies to ensure high performance, scalability, reliability, and flexibility, enabling insightful data-driven decision-making.

Architecture Explanation

Data Ingestion

  • JSON log files are initially stored in MinIO, an S3-compatible object store.
  • Apache Spark extracts the data from S3, initiating the ETL process.
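
For reference, here is a minimal sketch of this ingestion step with PySpark; the MinIO endpoint, credentials, and bucket path are placeholder assumptions, not values from the repository.

# A minimal sketch of reading the raw JSON logs from MinIO with Spark.
from pyspark.sql import SparkSession

# Point the Hadoop S3A client at the MinIO deployment (placeholder values).
spark = (
    SparkSession.builder.appName("log-ingestion")
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
    .config("spark.hadoop.fs.s3a.access.key", "minio")
    .config("spark.hadoop.fs.s3a.secret.key", "minio123")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .getOrCreate()
)

# Raw, nested JSON log files are read as-is to start the bronze stage.
raw_logs = spark.read.json("s3a://lakehouse/raw/log_content/*.json")
raw_logs.printSchema()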

Data Processing

  • Spark performs ETL operations, adhering to the Medallion architecture for robust data pipeline design.
  • Data is transformed and stored in Delta Lake format, ensuring ACID transactions, schema evolution, and time travel capabilities.
  • Delta Lake compaction and Z-Ordering are applied to the tables to reduce small-file overhead and speed up both the ETL and downstream queries (see the sketch after this list).
  • Pandera serves as the data validation tool, ensuring data quality before tables are published.
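
For illustration, the compaction and Z-Ordering step might look like the following with the delta-spark Python API; the table path and the Z-Order column are assumptions, and spark is a SparkSession configured as in the ingestion sketch above.

# A minimal sketch of Delta Lake compaction and Z-Ordering (delta-spark >= 2.0).
from delta.tables import DeltaTable

table = DeltaTable.forPath(spark, "s3a://lakehouse/silver/user_behaviour")

# Compact small files and co-locate rows by the column most used in filters and joins.
table.optimize().executeZOrderBy("Contract")

# Optionally clean up files no longer referenced by the table (default retention applies).
table.vacuum()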

Data Storage and Querying

Processed data in Delta Lake format is accessible to multiple query engines for diverse analytical needs:

  • ClickHouse: Serves as the OLAP database. It loads Delta Lake tables directly from MinIO using ClickHouse's native Delta Lake engine, enabling real-time analytics and low-latency queries.
  • StarRocks: Serves as the ad-hoc interactive query engine, providing high-performance OLAP capabilities for complex analytical queries.
    • StarRocks Architecture:
      • Frontend (FE) Node: Optimizes and plans query execution for efficient performance.
      • Backend (BE) Node: Stores and manages data, executing queries to retrieve insights.
      • Broker Node: Bridges StarRocks to external storage systems (such as HDFS or S3-compatible stores) so that data can be loaded into the BE nodes.

Metadata Management

Hive Metastore serves as a centralized repository for metadata, enabling efficient management of table schemas, partitions, and data lineage.

Key Features and Benefits

  • Scalability: The pipeline architecture supports seamless growth and accommodates increasing data volumes.
  • Performance: StarRocks and ClickHouse deliver exceptional query performance, empowering timely insights.
  • Reliability: Delta Lake guarantees data integrity and resilience to failures, ensuring data trustworthiness.
  • Flexibility: The pipeline supports multiple query engines, catering to diverse analytical requirements.
  • Manageability: Hive Metastore simplifies metadata management, streamlining data governance and administration.

Data Processing Detail

Log Data and Multi-hop Architecture

The raw data used in this project is log data tracking user behaviour for a telecom company over one month of 2023, stored in S3-compatible object storage (MinIO). The data is processed following the medallion (multi-hop) architecture, which logically organizes data in the lakehouse with the goal of incrementally improving its structure and quality as it flows through each layer: Bronze (raw integration) -> Silver (filtered, cleaned, augmented) -> Gold (business-level aggregates).

  1. Bronze layer: This layer stores data from upstream sources as-is. Since the data initially arrives as nested JSON, it is unpacked into a wide, flat table.

    Bronze layer schema
    |-- Index: string (nullable = true)
    |-- Type: string (nullable = true)
    |-- Id: string (nullable = true)
    |-- Score: long (nullable = true)
    |-- AppName: string (nullable = true)
    |-- Contract: string (nullable = true)
    |-- Mac: string (nullable = true)
    |-- TotalDuration: long (nullable = true)
    |-- Date: date (nullable = true)
    
  2. Silver layer: Data from the bronze layer is transformed with custom logic that analyses user behaviour over the month, following requirements such as:

    • How much time does each customer spend watching each type of channel?
    • Which categories are most watched by each customer?
    • Based on Customer 360 principles, segment customers into different groups using RFM analysis
    Silver layer schema
     |-- Contract: string (nullable = true)
     |-- TVDuration: long (nullable = true)
     |-- ChildDuration: long (nullable = true)
     |-- MovieDuration: long (nullable = true)
     |-- RelaxDuration: long (nullable = true)
     |-- SportDuration: long (nullable = true)
     |-- MostWatch: string (nullable = true)
     |-- SecondMostWatch: string (nullable = true)
     |-- ThirdMostWatch: string (nullable = true)
    
  3. Gold layer: Data from the silver and bronze layers is combined to form datasets that map directly to the end-user use cases. (A PySpark sketch of the silver and gold transformations is shown at the end of this section.)

    Gold layer schema
    |-- Contract: string (nullable = true)
    |-- TVDuration: long (nullable = true)
    |-- ChildDuration: long (nullable = true)
    |-- MovieDuration: long (nullable = true)
    |-- RelaxDuration: long (nullable = true)
    |-- SportDuration: long (nullable = true)
    |-- MostWatch: string (nullable = true)
    |-- SecondMostWatch: string (nullable = true)
    |-- ThirdMostWatch: string (nullable = true)
    |-- RecencyScore: integer (nullable = true)
    |-- FrequencyScore: integer (nullable = true)
    |-- MonetaryDurationScore: integer (nullable = true)
    

Log data is available upon request; please contact the repository owner.
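
To make the layer descriptions above concrete, here is a hedged PySpark sketch of the silver and gold transformations. The column names follow the schemas above, but the AppName-to-category mapping, the RFM definitions (quartiles via ntile), and the storage paths are illustrative assumptions rather than the repository's actual logic.

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

# Assumes a Delta- and MinIO-enabled SparkSession (see the configuration sketches
# in the ingestion step and the Hive Metastore note).
spark = SparkSession.builder.appName("log-etl-sketch").getOrCreate()

# Bronze: the flattened log table (path is a placeholder).
bronze = spark.read.format("delta").load("s3a://lakehouse/bronze/log_data")

# Hypothetical mapping from AppName to a viewing category.
category = (
    F.when(F.col("AppName").isin("KPLUS", "IPTV"), "TV")
    .when(F.col("AppName") == "CHILD", "Child")
    .when(F.col("AppName").isin("FIMS", "VOD"), "Movie")
    .when(F.col("AppName") == "RELAX", "Relax")
    .when(F.col("AppName") == "SPORT", "Sport")
    .otherwise("Other")
)
cats = ["TV", "Child", "Movie", "Relax", "Sport"]

# Total watch time per contract and category.
per_cat = (
    bronze.withColumn("Category", category)
    .groupBy("Contract", "Category")
    .agg(F.sum("TotalDuration").alias("Duration"))
)

# Silver, part 1: pivot the categories into wide duration columns.
durations = per_cat.groupBy("Contract").pivot("Category", cats).agg(F.first("Duration"))
for c in cats:
    durations = durations.withColumnRenamed(c, f"{c}Duration")

# Silver, part 2: rank categories per contract to find the top three watched.
rank_window = Window.partitionBy("Contract").orderBy(F.col("Duration").desc())
top3 = (
    per_cat.withColumn("rank", F.row_number().over(rank_window))
    .filter(F.col("rank") <= 3)
    .groupBy("Contract")
    .pivot("rank", [1, 2, 3])
    .agg(F.first("Category"))
    .withColumnRenamed("1", "MostWatch")
    .withColumnRenamed("2", "SecondMostWatch")
    .withColumnRenamed("3", "ThirdMostWatch")
)
silver = durations.join(top3, "Contract", "left")

# Gold: add RFM scores; here each score is a quartile (1-4) computed with ntile
# over an unpartitioned window (acceptable for a sketch, not for large data).
rfm = bronze.groupBy("Contract").agg(
    F.max("Date").alias("LastActive"),
    F.countDistinct("Date").alias("ActiveDays"),
    F.sum("TotalDuration").alias("TotalWatch"),
)
gold = (
    rfm.withColumn("RecencyScore", F.ntile(4).over(Window.orderBy(F.col("LastActive").desc())))
    .withColumn("FrequencyScore", F.ntile(4).over(Window.orderBy(F.col("ActiveDays").desc())))
    .withColumn("MonetaryDurationScore", F.ntile(4).over(Window.orderBy(F.col("TotalWatch").desc())))
    .join(silver, "Contract")
    .drop("LastActive", "ActiveDays", "TotalWatch")
)
gold.write.format("delta").mode("overwrite").save("s3a://lakehouse/gold/customer360")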

Coding Pattern

A factory pattern is implemented in this project. An abstract class StandardETL defines the processing pattern, which is inherited by the LogETL class and by other pipelines if needed. Although this pattern requires more maintenance, it lets many different pipelines share an identical process without repeating themselves.

Moreover, the functions that read and write data (the publish_data method) are kept separate from the transformation logic (etl_bronze_layer, etc.). This enables easier testing of the transformation logic, simplifies debugging, and follows functional design principles. A minimal sketch of this structure is shown below.
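
The sketch below mirrors the description above; the method names follow the text, while the bodies are placeholders rather than the repository's actual code.

# A minimal sketch of the StandardETL / LogETL structure described above.
from abc import ABC, abstractmethod
from pyspark.sql import DataFrame, SparkSession


class StandardETL(ABC):
    def __init__(self, spark: SparkSession):
        self.spark = spark

    @abstractmethod
    def etl_bronze_layer(self) -> DataFrame: ...

    @abstractmethod
    def etl_silver_layer(self, bronze: DataFrame) -> DataFrame: ...

    @abstractmethod
    def etl_gold_layer(self, silver: DataFrame) -> DataFrame: ...

    def validate_data(self, df: DataFrame) -> DataFrame:
        # Hook for Pandera checks (see the Data validation section).
        return df

    def publish_data(self, df: DataFrame, path: str) -> None:
        # I/O is isolated here so the transformation methods stay pure and testable.
        df.write.format("delta").mode("overwrite").save(path)

    def run(self) -> None:
        # The shared processing pattern that every concrete pipeline inherits.
        bronze = self.etl_bronze_layer()
        silver = self.validate_data(self.etl_silver_layer(bronze))
        gold = self.validate_data(self.etl_gold_layer(silver))
        self.publish_data(gold, "s3a://lakehouse/gold/customer360")  # placeholder path


class LogETL(StandardETL):
    def etl_bronze_layer(self) -> DataFrame:
        return self.spark.read.json("s3a://lakehouse/raw/log_content/*.json")

    def etl_silver_layer(self, bronze: DataFrame) -> DataFrame:
        return bronze  # custom behaviour analysis goes here

    def etl_gold_layer(self, silver: DataFrame) -> DataFrame:
        return silver  # RFM scoring and final aggregates go here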

Data validation

When building a pipeline, it is critical to ensure the quality of input and output data. If downstream consumers use bad data, it can be disastrous.

To prevent such a nightmare, a validate_data method is implemented using Pandera to validate data before publishing it. Validation can be time-consuming and expensive, since it requires the cluster to re-load and count the datasets; thankfully, Pandera has a cache mechanism that reduces this overhead. Readers can refer to workspaces/scripts/table_schema.py for the validation details; a minimal sketch of such a schema is shown below.
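
For illustration only, here is a minimal sketch of such a check using Pandera's DataFrameSchema API on a pandas sample of the gold table; the specific checks are assumptions, and the repository's actual schemas live in workspaces/scripts/table_schema.py.

import pandera as pa

# Column names follow the gold schema above; the checks are illustrative assumptions.
gold_schema = pa.DataFrameSchema(
    {
        "Contract": pa.Column(str, nullable=True),
        "TVDuration": pa.Column(checks=pa.Check.ge(0), nullable=True),
        "MostWatch": pa.Column(str, nullable=True),
        "RecencyScore": pa.Column(checks=pa.Check.in_range(1, 4), nullable=True),
        "FrequencyScore": pa.Column(checks=pa.Check.in_range(1, 4), nullable=True),
        "MonetaryDurationScore": pa.Column(checks=pa.Check.in_range(1, 4), nullable=True),
    },
    strict=False,  # the remaining duration columns are omitted here for brevity
)

# Validate a sample before publishing; lazy=True reports all failures at once.
sample = gold.limit(10_000).toPandas()  # `gold` is the DataFrame from the sketch above
gold_schema.validate(sample, lazy=True)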

Reproducing pipeline

First, clone this project with Git. A Makefile is included to simplify the reproduction steps.

make up     # run the Docker containers; the first run takes a few minutes
make start  # start the pipeline
make down   # shut down the containers after finishing

Note

On Hive Metastore

Spark has two ways to store metadata. The first is the built-in Apache Derby RDBMS, but this option does not support multiple SparkSessions at a time. The second is the Hive Metastore, which allows multiple concurrent sessions. The Hive Metastore also acts as a centralized metadata store for other services, enabling a composable lakehouse. A minimal configuration sketch is shown below.
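
A minimal sketch of pointing Spark at an external Hive Metastore; the thrift URI follows a typical docker-compose service name and is an assumption, while the Delta settings are the standard delta-spark ones.

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("log-etl")
    # External Hive Metastore instead of the embedded Derby database.
    .config("hive.metastore.uris", "thrift://hive-metastore:9083")
    # Standard delta-spark settings so Delta tables go through the Delta catalog.
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .enableHiveSupport()
    .getOrCreate()
)

# Tables registered here are visible to every engine that reads the same metastore.
spark.sql("SHOW DATABASES").show()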

On ClickHouse

Instead of pushing data from MinIO to ClickHouse with the ClickHouse-Spark connector, this project relies on ClickHouse's native Delta Lake engine for ingestion. Readers can follow the steps in workspaces/scripts/clickhouse_ddl.sql.

  • Why not use the ClickHouse-Spark connector? The connector would put additional workload on the Spark cluster to transfer data from MinIO, or from in-memory DataFrames, into ClickHouse. To avoid that, I use ClickHouse's Delta Lake engine, which keeps this load off the Spark cluster; in testing, it also took less time than the ClickHouse-Spark connector. A minimal sketch is shown below.
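
For illustration, the general idea looks like the following when driven from the clickhouse-connect Python client; the host, credentials, and table URL are assumptions, and the repository's actual DDL lives in workspaces/scripts/clickhouse_ddl.sql.

import clickhouse_connect

# Connect to the ClickHouse HTTP interface (host/port/credentials are placeholders).
client = clickhouse_connect.get_client(host="localhost", port=8123, username="default")

# Attach the gold Delta table stored on MinIO through the DeltaLake table engine;
# the URL and the MinIO access/secret keys are placeholders.
client.command(
    """
    CREATE TABLE IF NOT EXISTS gold_customer360
    ENGINE = DeltaLake('http://minio:9000/lakehouse/gold/customer360/', 'minio', 'minio123')
    """
)

# ClickHouse reads the Delta transaction log directly from MinIO; Spark is not involved.
result = client.query("SELECT Contract, MostWatch FROM gold_customer360 LIMIT 5")
print(result.result_rows)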

On StarRocks

Trino, a leading query engine, is often used for interactive queries. However, for this project I opted for StarRocks for several reasons:

  1. StarRocks has a C++ vectorized execution engine which provides faster query performance than Trino. In short, the vectorized engine processes data in column batches, operating on many values at once.
  2. StarRocks scales horizontally well by adding more nodes to the cluster. In this project, a shared-nothing architecture with 1 FE node, 1 BE node and 1 Broker node is used, but it can be scaled out if computational resources allow. The Broker node supports both synchronous and asynchronous loading, and it can load Parquet, ORC and CSV files, which opens up more loading use cases and patterns.
  3. A shared-data architecture, with FE nodes and CN (Compute Node) nodes, is planned. This architecture reduces storage costs, ensures better resource isolation, and offers high elasticity and scalability.
  4. Compared to Trino, StarRocks has a built-in cache mechanism which reduces the latency of querying data from MinIO and improves resource utilization. Trino does not have a built-in cache, but it can leverage external caching solutions like Alluxio for similar benefits; however, this introduces additional complexity and requires separate configuration and management.

Readers can refer to workspaces/scripts/starrocks_query.sql for examples of data lake querying with StarRocks; a minimal sketch of such a session is shown below.
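
For illustration, a querying session over the MySQL protocol might look like the sketch below; the hostnames, credentials, catalog properties, and database/table names are assumptions based on the StarRocks external-catalog documentation and may differ by version, while the repository's actual statements live in workspaces/scripts/starrocks_query.sql.

import pymysql

# Connect to the StarRocks FE node over the MySQL protocol (placeholder credentials).
conn = pymysql.connect(host="localhost", port=9030, user="root", password="")
cur = conn.cursor()

# An external catalog lets StarRocks query Delta tables registered in the Hive Metastore;
# the property names follow the StarRocks documentation and may differ by version.
cur.execute(
    """
    CREATE EXTERNAL CATALOG delta_catalog
    PROPERTIES (
        "type" = "deltalake",
        "hive.metastore.type" = "hive",
        "hive.metastore.uris" = "thrift://hive-metastore:9083",
        "aws.s3.endpoint" = "http://minio:9000",
        "aws.s3.access_key" = "minio",
        "aws.s3.secret_key" = "minio123",
        "aws.s3.enable_path_style_access" = "true"
    )
    """
)

# Database and table names below are placeholders for the published gold table.
cur.execute("SELECT Contract, RecencyScore FROM delta_catalog.gold.customer360 LIMIT 5")
for row in cur.fetchall():
    print(row)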
