---
layout: post
title: "Step-by-step: Creating a new database integration using Data Prepper"
authors:
  - tylgry
  - dinujoh
date: 2022-11-05 10:00:00 -0500
categories:
  - technical-post
meta_keywords: Data Prepper, database integration, OpenSearch data collector, data pipeline, mongodb
meta_description: Use this step-by-step guide to set up and test a Data Prepper pipeline. Create a new database integration that streams data from MongoDB to OpenSearch using source coordination.
twittercard:
  description: "Data Prepper offers a flexible framework for database migration, supporting sources like MongoDB and Amazon DynamoDB. You can extend this capability to new databases by implementing a Data Prepper source plugin."
---
Data Prepper, an open-source data collector, enables you to collect, filter, enrich, and aggregate trace and log data. With Data Prepper, you can prepare your data for downstream analysis and visualization in OpenSearch.

Data Prepper pipelines consist of three main components: a source, an optional set of processors, and one or more sinks. For more information, see [Data Prepper key concepts and fundamentals](https://opensearch.org/docs/latest/data-prepper/#key-concepts-and-fundamentals). The following sections outline the steps necessary for implementing a new database source integration within Data Prepper.

### Understanding push-based and pull-based sources

Data Prepper source plugins fall into two categories: push based and pull based.

_Push-based sources_, such as HTTP and OpenTelemetry (OTel), receive data sent to Data Prepper and rely on load-balancing solutions, such as Kubernetes, NGINX, or Docker Swarm, to distribute the workload across Data Prepper containers.

Unlike push-based sources, _pull-based sources_ retrieve data from an external system and use [source coordination](https://opensearch.org/docs/latest/data-prepper/managing-data-prepper/source-coordination/) to achieve scalability and work distribution across multiple containers. Source coordination uses an external store functioning as a lease table, similar to the approach used by the [Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/shared-throughput-kcl-consumers.html).
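To make the lease-table analogy concrete, a single partition item in the coordination store might look roughly like the following. The field names here are illustrative, not the exact store schema:

```json
{
  "sourceIdentifier": "sample-source-pipeline|sample_source",
  "sourcePartitionKey": "/data/files/file-0001.db",
  "partitionOwner": "data-prepper-node-1",
  "sourcePartitionStatus": "ASSIGNED",
  "partitionOwnershipTimeout": "2024-09-24T10:15:00Z",
  "partitionProgressState": "{\"recordsProcessed\": 1200}"
}
```

Each Data Prepper node attempts to take ownership of unassigned items, and an ownership timeout allows other nodes to reclaim a partition if its owner crashes.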
### Defining work partitions for source coordination

Data Prepper uses source coordination to distribute work partitions across Data Prepper containers.

For new Data Prepper sources that use source coordination, identifying and delineating work partitions is a fundamental first step.

Data Prepper defines work partitions differently for different sources. In the `s3` source, each Amazon Simple Storage Service (Amazon S3) object represents a partition. In the `opensearch` source, an index serves as a partition. The Amazon DynamoDB source has two partition types: S3 data files for exports and shards for stream processing.
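As a sketch, a partition for a hypothetical database-file source could be modeled as a small value type whose partition key is the file path. This is illustrative only; in an actual plugin, the partition class would extend `EnhancedSourcePartition` from `data-prepper-api` rather than stand alone:

```java
// Illustrative sketch of a work partition for a hypothetical database-file
// source. A real plugin would extend EnhancedSourcePartition from
// data-prepper-api instead of defining a standalone class.
public class DatabaseFilePartition {
    // All partitions created by this source share one partition type.
    public static final String PARTITION_TYPE = "DATABASE_FILE";

    // The partition key uniquely identifies the unit of work -- here, one database file.
    private final String filePath;

    public DatabaseFilePartition(final String filePath) {
        this.filePath = filePath;
    }

    public String getPartitionType() {
        return PARTITION_TYPE;
    }

    public String getPartitionKey() {
        return filePath;
    }
}
```

The partition type lets a worker request only the kind of work it knows how to process, while the partition key guarantees that two nodes never process the same file.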
### Creating a source-coordination-enabled Data Prepper plugin

A source coordination plugin consists of two classes: the main plugin class and a configuration class. The configuration class specifies all required user inputs, including data endpoints, authorization details, and performance tuning parameters.

For a practical starting point, refer to the [sample source code](https://github.com/graytaylor0/data-prepper/blob/SourceCoordinationSampleSource/data-prepper-plugins/sample-source-coordination-source/src/main/java/SampleSource.java) in the Data Prepper repository.

This example demonstrates a basic configuration for a [hypothetical database source](https://github.com/graytaylor0/data-prepper/blob/SourceCoordinationSampleSource/data-prepper-plugins/sample-source-coordination-source/src/main/java/SampleSourceConfig.java), requiring only `database_name`, `username`, and `password`. The plugin name and configuration class are defined in the `@DataPrepperPlugin` annotation.
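A minimal configuration class along those lines might look like the following sketch. The class and field names are hypothetical; in an actual Data Prepper plugin, fields are bound from `pipeline.yaml` keys, typically through Jackson annotations such as `@JsonProperty("database_name")`. Plain fields and a constructor are used here to keep the sketch self-contained:

```java
// Illustrative sketch of a plugin configuration class; names are hypothetical.
// In a real plugin, these fields would be deserialized from pipeline.yaml
// rather than passed through a constructor.
public class SampleSourceConfig {
    private final String databaseName; // bound from "database_name"
    private final String username;     // bound from "username"
    private final String password;     // bound from "password"

    public SampleSourceConfig(final String databaseName, final String username, final String password) {
        this.databaseName = databaseName;
        this.username = username;
        this.password = password;
    }

    public String getDatabaseName() { return databaseName; }

    public String getUsername() { return username; }

    public String getPassword() { return password; }
}
```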
The `pipeline.yaml` file for running this source in Data Prepper would be structured as follows:

```yaml
version: 2
sample-source-pipeline:
  source:
    sample_source:
      database_name: "my-database"
      username: "my-username"
      password: "my-password"
  sink:
    - stdout:
```
### Using the source coordination APIs

The [source coordination interface](https://github.com/opensearch-project/data-prepper/blob/main/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/enhanced/EnhancedSourceCoordinator.java) defines the methods available for interacting with the [source coordination store](https://github.com/opensearch-project/data-prepper/blob/main/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/SourceCoordinationStore.java).

These methods support partition CRUD operations as well as getting the next available partition using `acquireAvailablePartition(String partitionType)`. A common source coordination pattern assigns a "leader" Data Prepper container for partition discovery and creation. This is done by initializing a "leader partition" at startup and using `acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)` to assign partition management responsibilities.

The following code snippet shows a basic source coordination workflow, using a hypothetical database in which each partition represents an individual database file. See the [full code](https://github.com/graytaylor0/data-prepper/blob/6e38dead8e9beca089381519654f329b82524b9d/data-prepper-plugins/sample-source-coordination-source/src/main/java/DatabaseWorker.java#L40) on GitHub.
```java
public void run() {

    while (!Thread.currentThread().isInterrupted()) {
        try {

            // 1 - Check if this node is already the leader. If it is not, then try to acquire leadership in case the leader node has crashed
            if (leaderPartition == null) {
                final Optional<EnhancedSourcePartition> sourcePartition = sourceCoordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE);
                if (sourcePartition.isPresent()) {
                    LOG.info("Running as a LEADER that will discover new database files and create partitions");
                    leaderPartition = (LeaderPartition) sourcePartition.get();
                }
            }

            // 2 - If this node is the leader, run discovery of new database files and create partitions
            if (leaderPartition != null) {
                final List<EnhancedSourcePartition<DatabaseFilePartitionProgressState>> databaseFilePartitions = discoverDatabaseFilePartitions();
                LOG.info("Discovered {} new database file partitions", databaseFilePartitions.size());

                databaseFilePartitions.forEach(databaseFilePartition -> {
                    sourceCoordinator.createPartition(databaseFilePartition);
                });

                LOG.info("Created {} database file partitions in the source coordination store", databaseFilePartitions.size());
            }

            // 3 - Grab a database file partition, process it by writing to the buffer, and mark that database file partition as completed
            final Optional<EnhancedSourcePartition> databaseFilePartition = sourceCoordinator.acquireAvailablePartition(DatabaseFilePartition.PARTITION_TYPE);

            // 4 - If it's empty, there are no more database files to process for now. If it's not empty, the database file is processed and then marked as COMPLETED in the source coordination store
            if (databaseFilePartition.isPresent()) {
                processDataFile(databaseFilePartition.get().getPartitionKey());
                sourceCoordinator.completePartition(databaseFilePartition.get());
            }

        } catch (final Exception e) {
            LOG.error("Received an exception in DatabaseWorker loop, retrying", e);
        }
    }
}
```
The key components and code workflow for implementing source coordination in Data Prepper are as follows:

1. Upon starting Data Prepper, a leader partition is established (see the [code reference](https://github.com/graytaylor0/data-prepper/blob/6e38dead8e9beca089381519654f329b82524b9d/data-prepper-plugins/sample-source-coordination-source/src/main/java/SampleSource.java#L41)). The single leader partition is assigned to the Data Prepper node that successfully calls `acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)`, assigning it the task of identifying new database files.
2. When a Data Prepper node owns the leader partition, it queries the hypothetical database and creates new partitions in the source coordination store, enabling all nodes running this source to access these database file partitions.
3. A database file partition is acquired for processing. In cases where no partitions need processing, an empty `Optional` is returned.
4. The database file undergoes processing, with its records written to the Data Prepper buffer as individual `Events`. Once all records have been written to the buffer, the source coordination store marks the database file partition as `COMPLETED`, ensuring that it is not processed again.

### Running and testing Data Prepper using source coordination

Before creating a new plugin, you must set up and run Data Prepper locally. The following steps guide you through configuring Data Prepper to stream documents from MongoDB to OpenSearch using source coordination. While this example uses a single Data Prepper instance, source coordination allows for scalability when running multiple instances with identical pipeline configurations and shared source coordination store settings defined in `data-prepper-config.yaml`.
#### Step 1: Set up Data Prepper for local development

The [OpenSearch Data Prepper Developer Guide](https://github.com/opensearch-project/data-prepper/blob/main/docs/developer_guide.md) provides a complete overview of running Data Prepper in various environments.

Creating a new source plugin requires cloning the Data Prepper repository and building it from source using the following commands:

- Clone the Data Prepper repository:

  ```
  git clone https://github.com/opensearch-project/data-prepper.git
  ```

- Build Data Prepper from source:

  ```
  ./gradlew assemble
  ```
#### Step 2: Set up MongoDB locally

First, install and configure MongoDB using the [MongoDB installation guide](https://www.mongodb.com/docs/manual/installation/). Before running MongoDB, enable [MongoDB change streams](https://www.mongodb.com/docs/manual/changeStreams/) by following the instructions in [Convert a Standalone Self-Managed mongod to a Replica Set](https://www.mongodb.com/docs/manual/tutorial/convert-standalone-to-replica-set/).

Next, launch the MongoDB shell by running `mongosh`, and then create a new user and password within the shell using the following syntax. The username and password will later be required by the Data Prepper `pipeline.yaml` file. For more information about MongoDB user creation, see [db.createUser](https://www.mongodb.com/docs/manual/reference/method/db.createUser/).

```
use admin
db.createUser({"user": "dbuser", "pwd": "admin1234", "roles": []});
```

Then, create a new database named `demo`:

```
use demo
```

Next, create a new MongoDB collection named `demo_collection` in your `demo` database with this syntax:

```
db.createCollection("demo_collection")
```

Finally, add sample records to the collection using the following syntax. These records are processed during the MongoDB pipeline's export phase:

```
db.demo_collection.insertOne({"key-one": "value-one"})
db.demo_collection.insertOne({"key-two": "value-two"})
db.demo_collection.insertOne({"key-three": "value-three"})
```
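Optionally, you can confirm that the records were inserted by counting the documents in the collection from the `mongosh` shell, which should return `3`:

```
db.demo_collection.countDocuments()
```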
#### Step 3: Set up OpenSearch locally

To run OpenSearch locally, follow the steps in the [Installation quickstart](https://opensearch.org/docs/latest/getting-started/quickstart/).
#### Step 4: Create an Amazon S3 bucket

Follow the steps in [Create a new S3 bucket](https://docs.aws.amazon.com/AmazonS3/latest/userguide/creating-bucket.html). You can skip this step if you have an existing bucket. This S3 bucket enables parallel processing and writing to OpenSearch across multiple Data Prepper containers in a multi-node setup because only one node can read from MongoDB change streams at a time.
#### Step 5: Get AWS credentials for DynamoDB and S3 access

Set up an AWS role with the following policy permissions to enable Data Prepper to interact with the DynamoDB source coordination store and the S3 bucket from step 4. Make sure to replace `MONGODB_BUCKET`, `REGION`, and `AWS_ACCOUNT_ID` with your unique values.

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "s3Access",
      "Effect": "Allow",
      "Action": [
        "s3:PutObject"
      ],
      "Resource": [ "arn:aws:s3:::{{MONGODB_BUCKET}}/*" ]
    },
    {
      "Sid": "allowReadingFromS3Buckets",
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:DeleteObject",
        "s3:GetBucketLocation",
        "s3:ListBucket"
      ],
      "Resource": [
        "arn:aws:s3:::{{MONGODB_BUCKET}}",
        "arn:aws:s3:::{{MONGODB_BUCKET}}/*"
      ]
    },
    {
      "Sid": "allowListAllMyBuckets",
      "Effect": "Allow",
      "Action": "s3:ListAllMyBuckets",
      "Resource": "arn:aws:s3:::*"
    },
    {
      "Sid": "ReadWriteSourceCoordinationDynamoStore",
      "Effect": "Allow",
      "Action": [
        "dynamodb:DescribeTimeToLive",
        "dynamodb:UpdateTimeToLive",
        "dynamodb:DescribeTable",
        "dynamodb:CreateTable",
        "dynamodb:GetItem",
        "dynamodb:DeleteItem",
        "dynamodb:PutItem",
        "dynamodb:Query"
      ],
      "Resource": [
        "arn:aws:dynamodb:{{REGION}}:{{AWS_ACCOUNT_ID}}:table/DataPrepperSourceCoordinationStore",
        "arn:aws:dynamodb:{{REGION}}:{{AWS_ACCOUNT_ID}}:table/DataPrepperSourceCoordinationStore/index/source-status"
      ]
    }
  ]
}
```
Run the following command, and then enter the `Access Key Id` and `Secret Access Key` associated with the credentials that correspond to the previously defined role:

```
aws configure
```

Then, set the following environment variables:

```
export AWS_REGION="{{REGION}}"
export SOURCE_COORDINATION_PIPELINE_IDENTIFIER="test-mongodb"
```

The `SOURCE_COORDINATION_PIPELINE_IDENTIFIER` must correspond to the `partition_prefix` that you will define in the `data-prepper-config.yaml` file in step 6.
#### Step 6: Create the data-prepper-config.yaml file

Configure the source coordination store for Data Prepper using the `data-prepper-config.yaml` file. Currently, this store exclusively supports DynamoDB.

In the `data-prepper/release/archives/linux/build/install/opensearch-data-prepper-$VERSION-linux-x64/config/` directory, create a file named `data-prepper-config.yaml`. Insert the following content, replacing `REGION` with your desired DynamoDB table region and `ROLE_ARN_FROM_STEP_5` with the appropriate role Amazon Resource Name (ARN):

```yaml
ssl: false
source_coordination:
  partition_prefix: "test-mongodb"
  store:
    dynamodb:
      sts_role_arn: "{{ROLE_ARN_FROM_STEP_5}}"
      table_name: "DataPrepperSourceCoordinationStore"
      region: "{{REGION}}"
      skip_table_creation: false
```

The `skip_table_creation` parameter is set to `false`, instructing Data Prepper to create the table on startup if it is missing. For subsequent runs, you can set this flag to `true` to speed up startup.

The `partition_prefix` enables soft resets of the pipeline in the source coordination store. When testing a new source plugin, incrementing this prefix (for example, `test-mongodb-1`, `test-mongodb-2`) ensures that Data Prepper ignores DynamoDB items from previous test runs.
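For example, a second test run against a clean slate might use the following setting. Remember that the `SOURCE_COORDINATION_PIPELINE_IDENTIFIER` environment variable from step 5 must be updated to match the new prefix:

```yaml
source_coordination:
  partition_prefix: "test-mongodb-2"
```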
#### Step 7: Create the `pipeline.yaml` file

In the `data-prepper/release/archives/linux/build/install/opensearch-data-prepper-$VERSION-linux-x64/pipelines/` directory, create a `pipeline.yaml` file containing the following content. Make sure to update `S3_BUCKET_NAME`, `S3_BUCKET_REGION`, `ROLE_ARN_FROM_STEP_5`, and your OpenSearch password.
```yaml
pipeline:
  workers: 2
  delay: 0
  buffer:
    bounded_blocking:
      batch_size: 125000
      buffer_size: 1000000
  source:
    mongodb:
      host: "localhost"
      port: 27017
      acknowledgments: true
      s3_bucket: "{{S3_BUCKET_NAME}}"
      s3_region: "{{S3_BUCKET_REGION}}"
      s3_prefix: "mongodb-opensearch"
      insecure: "true"
      ssl_insecure_disable_verification: "true"
      authentication:
        username: "dbuser"
        password: "admin1234"
      collections:
        - collection: "demo.demo_collection"
          export: true
          stream: true
      aws:
        sts_role_arn: "{{ROLE_ARN_FROM_STEP_5}}"
  sink:
    - opensearch:
        hosts: [ "http://localhost:9200" ]
        index: "mongodb-index"
        document_id: "${getMetadata(\"primary_key\")}"
        action: "${getMetadata(\"opensearch_action\")}"
        document_version: "${getMetadata(\"document_version\")}"
        document_version_type: "external"
        # Default username
        username: "admin"
        # Change to your OpenSearch password if needed. When running OpenSearch with Docker Compose, this is set by the OPENSEARCH_INITIAL_ADMIN_PASSWORD environment variable.
        password: "OpenSearchMongoDB1#"
        flush_timeout: -1
```
#### Step 8: Run the pipeline

With AWS credentials configured and both MongoDB and OpenSearch running on your local machine, you can launch the pipeline.

First, navigate to the directory containing the Data Prepper binaries:

```
cd data-prepper/release/archives/linux/build/install/opensearch-data-prepper-$VERSION-linux-x64
```

Then, start Data Prepper using the following command:

```
bin/data-prepper
```
#### Step 9: Observe the export documents in OpenSearch

Wait for the export to complete, which may take a minute or so. Once Data Prepper displays a log message such as `org.opensearch.dataprepper.plugins.source.s3.ScanObjectWorker - Received all acknowledgments for folder partition`, open `http://localhost:5601` to access OpenSearch Dashboards.

Go to the **Dev Tools** application and enter `GET mongodb-index/_search` in the console editor to retrieve the MongoDB documents you created in step 2.
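If the export succeeded, the search response should include the three documents inserted in step 2. A single hit might look roughly like the following; the `_id` value shown here is a placeholder because the actual value is the document's MongoDB primary key:

```json
{
  "_index": "mongodb-index",
  "_id": "66f2d3e8a1b2c3d4e5f60708",
  "_source": {
    "key-one": "value-one"
  }
}
```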
#### Step 10: Add sample documents to MongoDB

Add sample documents to MongoDB using the following commands:

```
db.demo_collection.insertOne({"key-four": "value-four"})
db.demo_collection.insertOne({"key-five": "value-five"})
db.demo_collection.insertOne({"key-six": "value-six"})
```

The MongoDB source in Data Prepper will now extract these documents from the MongoDB change streams.
#### Step 11: Observe the stream documents in OpenSearch

As soon as Data Prepper generates another log message such as `org.opensearch.dataprepper.plugins.source.s3.ScanObjectWorker - Received all acknowledgments for folder partition`, return to **Dev Tools** and run another search on the index using `GET mongodb-index/_search`.
#### Step 12: Clean up resources

When you complete this process, make sure to delete the DynamoDB source coordination store and the S3 bucket and to stop the Data Prepper, MongoDB, and OpenSearch instances.
### Summary

We hope this guide deepens your knowledge of the Data Prepper architecture and the process of creating scalable plugins with source coordination. For any suggestions regarding new database plugins, assistance with plugin creation, or general Data Prepper questions, [create a new discussion](https://github.com/opensearch-project/data-prepper/discussions) in the Data Prepper repository. The Data Prepper community and maintenance team are committed to supporting your efforts.