Problem statement:
- Problem: This project provides a solution for handling sudden spikes in workload in Apache Flink applications. Apache Flink is a distributed stream processing framework designed to handle large amounts of data in real time; however, when the input rate exceeds the processing capacity, the system becomes overwhelmed, leading to increased latency, degraded performance, or even failures.
- Technique: Cloud bursting is an alternative to back-pressure, the mechanism that slows down the input rate when the system is overwhelmed. Instead of throttling the input, cloud bursting offloads the excess workload to a cloud provider, such as AWS, where it is processed in parallel. This helps the system continue to operate at a high level of performance even during a sudden spike in workload.
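To make the idea concrete, here is a minimal, self-contained sketch of the bursting decision; the class, field names, thresholds, and URL below are illustrative placeholders, not the project's actual operator:

```java
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of cloud bursting: under normal load events take the
// local path; once a load metric crosses a threshold, batches are POSTed to
// a Lambda behind an API Gateway URL instead of queueing up locally.
public class CloudBurstRouter {

    // Hypothetical values; the project tunes these experimentally (see below).
    private static final double BUSY_TIME_THRESHOLD = 0.4;
    private static final int BATCH_SIZE = 10;
    private static final String LAMBDA_URL =
            "https://example.execute-api.us-east-1.amazonaws.com/query"; // placeholder

    private final List<String> batch = new ArrayList<>();

    /** Route one serialized event: process locally, or buffer and offload to the cloud. */
    public void route(String event, double busyTime) throws Exception {
        if (busyTime < BUSY_TIME_THRESHOLD) {
            processLocally(event);
        } else {
            batch.add(event);
            if (batch.size() >= BATCH_SIZE) {
                invokeLambda(String.join("\n", batch));
                batch.clear();
            }
        }
    }

    private void processLocally(String event) {
        // normal downstream Flink processing
    }

    private void invokeLambda(String payload) throws Exception {
        HttpURLConnection conn = (HttpURLConnection) new URL(LAMBDA_URL).openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(payload.getBytes(StandardCharsets.UTF_8));
        }
        conn.getResponseCode(); // block until the Lambda responds
        conn.disconnect();
    }
}
```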
Source:
Nexmark is a benchmarking suite for queries over continuous data streams. This project is inspired by the NEXMark research paper and Apache Beam Nexmark.
These are multiple queries over a three-entity model representing an online auction system:
- Person represents a person submitting an item for auction and/or making a bid on an auction.
- Auction represents an item under auction.
- Bid represents a bid for an item under auction.
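For orientation, a hypothetical sketch of the three entities as plain Java classes; the field names follow the NEXMark paper and may differ from the classes in this repo:

```java
// Hypothetical shape of the three Nexmark entities; field names follow the
// NEXMark paper and are not copied from this repo.
public class NexmarkModel {
    public static class Person {
        public long id;
        public String name;
        public String emailAddress;
    }

    public static class Auction {
        public long id;
        public long sellerId;   // references Person.id
        public long initialBid;
        public long expires;    // auction close time (epoch millis)
    }

    public static class Bid {
        public long auctionId;  // references Auction.id
        public long bidderId;   // references Person.id
        public long price;
        public long dateTime;   // bid time (epoch millis)
    }
}
```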
Basic Architecture:
Demo:
Please view the recording here.
Run instructions:
- Environment:
  - Java 8
  - Flink version - 1.16.0
  - IntelliJ
- Using IntelliJ:
  - Build your Maven project using the command `mvn clean package`, which will generate a JAR file for the Flink pipeline, or create a Maven run configuration in IntelliJ to run the command `clean package`.
  - Install all dependencies using Maven.
  - Create another application run configuration with the main class `operator.FlinkPipeline`. Select the `Add dependencies with 'provided' scope to classpath` option. Set the working directory to `team-6/Code/deliverable-2`.
  - Build the project using the application run configuration to generate the jar files in the `target` folder.
- Using Command Line (macOS):
  - Install Maven: `brew install maven`
  - Create a `~/.mavenrc` file and add the line `export JAVA_HOME=<path>`. You can get the path using `/usr/libexec/java_home`. We have built the project with Java 8, so that is the version we can vouch for.
  - Go to the `Code/deliverable-2` directory and run `mvn clean package`.
  - This should create the `flink-java-project-0.1.jar` file under the `target` folder.
- To run on the Flink web UI:
  - Start the Flink cluster (`./start-cluster.sh`)
  - Go to http://localhost:8081 in your browser
  - Upload the jar file to the Flink web UI under the `Submit New Job` tab
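  - Alternatively, assuming the `operator.FlinkPipeline` main class above, the job can also be submitted from the command line with `./bin/flink run -c operator.FlinkPipeline target/flink-java-project-0.1.jar`.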
- To switch between Java and Python AWS Lambda functions:
  - In the file `Code/deliverable-2/src/main/java/lambda/LambdaInvokerUsingURLPayload.java`, update line 18 with `configs/JavaConfig.json` for invoking the AWS Lambda written in Java, or `configs/PythonConfig.json` for invoking the AWS Lambda written in Python (see the sketch after this list).
  - In the file `Code/deliverable-2/src/main/java/operator/InvokeOperator.java`, change line 131 to `getResultFromJsonPython` while using `configs/PythonConfig.json`, or to `getResultFromJsonJava` while using `configs/JavaConfig.json`.
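For orientation, the line-18 change amounts to swapping one config path; a hypothetical sketch (the real field name in `LambdaInvokerUsingURLPayload.java` may differ):

```java
// Hypothetical sketch of the line-18 swap; the real field name may differ.
public class LambdaConfigChoice {
    static final String CONFIG_PATH = "configs/JavaConfig.json";      // Java Lambda
    // static final String CONFIG_PATH = "configs/PythonConfig.json"; // Python Lambda
}
```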
- To test for input rates greater than those documented in the experiments:
  - In the file `Code/deliverable-2/src/main/java/operator/InvokeOperator.java`, comment out lines 110 to 119 and uncomment line 120 to use a standard input threshold of `0.4` in the logic (see the sketch after this list).
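In spirit, that edit replaces an experimentally tuned threshold with a fixed cutoff; a hypothetical sketch, with names and the adaptive formula invented for illustration:

```java
// Hypothetical sketch of the lines 110-120 switch in InvokeOperator.java;
// names and the adaptive formula are illustrative, not the repo's code.
public class ThresholdChoice {
    static double inputThreshold(double observedRate, double calibratedMaxRate) {
        // Adaptive threshold (in spirit, what the commented-out lines 110-119
        // compute), valid only for the input rates covered by our experiments:
        // return Math.min(0.9, observedRate / calibratedMaxRate);

        // Fixed threshold (line 120) for input rates beyond the documented runs:
        return 0.4;
    }
}
```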
Source Configurations:
To change configurations at the source, navigate to the file `NexmarkConfiguration.java` under `team-6/Code/deliverable-2`. For instance, to change the input rate, change the configuration variables `firstEventRate` and `nextEventRate`; to change the duration of the spike, change the value of the variable `ratePeriodSec`.
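The relevant knobs look roughly like this; the variable names come from the instructions above, but the types and default values shown are only illustrative:

```java
// Sketch of the source-rate knobs in NexmarkConfiguration.java; variable names
// are from the instructions above, the values shown are illustrative only.
public class NexmarkConfiguration {
    public int firstEventRate = 500;  // events/sec at the start
    public int nextEventRate  = 1000; // events/sec after the rate change (the spike)
    public int ratePeriodSec  = 60;   // how long each rate period, i.e. the spike, lasts
}
```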
Lambda Deployments:
Although the Lambda functions are already deployed with an exposed API Gateway, the following are the steps to deploy them in your own account.
- Setup IAM:
  - Go to AWS account > IAM.
  - Click on `Add user` and add a new user with `Programmatic Access`.
  - Add permissions for `AWSLambdaFullAccess`.
  - Save the `Access Key ID` and `Secret Access Key` for the new user.
- Python:
  - Run `npm i -g serverless` in a terminal.
  - Navigate to `team-6/Code/PythonCloudBurstOperator/cloud-burst-operator-python`.
  - Run `serverless config credentials --provider aws --key <access-id> --secret <secret-access-key>`.
  - Run `serverless deploy`.
- Java:
  - Install the `AWS SDK` tools for Java in IntelliJ.
  - Ensure `Docker` is installed and running properly.
  - Navigate to `team-6/Code/CloudBurstOperator`.
  - Right-click on the project root in IntelliJ and click on the `Serverless Application sync` option.
Experimental Setup:
- Pipelines:
  - Apart from the solution pipeline in Figure 1, for comparison, we also ran our experiments on two baseline pipelines.
- Measurements:
  - Parallelism:
    - The input rate is set to 500 events/sec and 100,000 events are generated from the source.
    - The job parallelism is set to 1, 2, 4, and 8, and the completion time is measured (see the parallelism sketch after this list).
    - Configuration values kept constant:
      - Input Rate = 1000 events/s
      - Duration of Experiment = 5 mins
      - Metric Call Latency = 800 ms
      - Max Records Sent = 100000
  - Backpressure duration:
    - Each experiment runs for a duration of 5 mins.
    - Input rates tested: 100, 250, 500, 750, 1000 events/sec.
    - Configuration values kept constant:
      - Duration of Experiment = 5 mins
      - Parallelism = 1
      - Max Records Sent = unlimited
      - Batch size changes every 8 secs
      - Metric Call Latency = 1000 ms
  - Metric calculation interval:
    - The input rate is set to 500 events/sec and 100,000 events are generated from the source.
    - Metric calculation delay values tested: 700 ms, 800 ms, 1000 ms, 1100 ms, 1200 ms.
    - The completion time and the accumulated backpressure are measured (see the metric-polling sketch after this list).
    - We found 800 ms to perform best in our experiments.
    - Configuration values kept constant:
      - Input Rate = 1000 events/s
      - Duration of Experiment = 5 mins
      - Parallelism = 1
      - Max Records Sent = unlimited
      - Batch size changes every 8 secs
  - Lambda:
    - The pipeline is run from the source to the invoker operator, removing all the other operators.
    - Batch sizes:
      - Java: 1-10, step size = 1
      - Python: 1-10, step size = 1, and 20-100, step size = 10
    - 5 experiments are run for each batch size.
    - Latency is measured for each batch, and throughput is measured over all the data.
  - Latency:
    - Add the current timestamp when the event enters the pipeline. Before writing to the sink, subtract the timestamp stored in the event from the current timestamp to get the latency for that record (see the latency sketch after this list).
    - Performed on all 3 pipelines: vanilla, modified pipeline (without Lambda), and the proposed solution.
    - Input rates tested: 100, 250, 500, 750, 1000, 1250, 1500 events/sec, each for 5 minutes.
    - 3 experiments are run for each input rate.
    - For the proposed solution, configuration values:
      - MEASURE_INTERVAL_MS = 800 ms
      - INPUT_THRESHOLD = 500
      - BUSY_TIME_THRESHOLD = 0.06
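For reference on the parallelism experiments, a minimal sketch of how job parallelism is set on a Flink 1.16 streaming pipeline (the job name is a placeholder):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Minimal sketch: setting the job parallelism (1, 2, 4, or 8 in our runs)
// on a Flink streaming pipeline.
public class ParallelismExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4); // swap in 1, 2, or 8 for the other runs
        // ... build the Nexmark pipeline here, then:
        // env.execute("nexmark-cloud-burst");
    }
}
```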
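For the metric calculation interval, a hypothetical sketch that samples Flink's `busyTimeMsPerSecond` metric over the REST API every `MEASURE_INTERVAL_MS`; the job and vertex IDs are passed as arguments, and this is not necessarily how `InvokeOperator` obtains its metric:

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: poll busyTimeMsPerSecond from Flink's REST API at a
// fixed interval. Pass the job ID and vertex ID as program arguments.
public class MetricPoller {
    static final long MEASURE_INTERVAL_MS = 800;

    public static void main(String[] args) {
        String endpoint = "http://localhost:8081/jobs/" + args[0]
                + "/vertices/" + args[1] + "/metrics?get=busyTimeMsPerSecond";
        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
            try {
                HttpURLConnection conn = (HttpURLConnection) new URL(endpoint).openConnection();
                try (BufferedReader in = new BufferedReader(
                        new InputStreamReader(conn.getInputStream()))) {
                    // e.g. [{"id":"busyTimeMsPerSecond","value":"412"}]
                    System.out.println(in.readLine());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, 0, MEASURE_INTERVAL_MS, TimeUnit.MILLISECONDS);
    }
}
```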
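For the latency measurement, a minimal sketch of the stamp-then-subtract scheme described above; the `Event` type and the placement as map functions are assumptions:

```java
import org.apache.flink.api.common.functions.MapFunction;

// Minimal sketch of the latency measurement: stamp events on entry, subtract
// the stamp from the current time just before the sink. The Event type is ours.
public class LatencyStamping {
    public static class Event {
        public long ingestTimestamp; // set when the event enters the pipeline
        public long latencyMs;       // computed just before the sink
    }

    // At the source side: record when the event entered the pipeline.
    public static final MapFunction<Event, Event> STAMP = e -> {
        e.ingestTimestamp = System.currentTimeMillis();
        return e;
    };

    // Just before the sink: latency = now minus the entry timestamp.
    public static final MapFunction<Event, Event> MEASURE = e -> {
        e.latencyMs = System.currentTimeMillis() - e.ingestTimestamp;
        return e;
    };
}
```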
Experiments Result Plots:
- [1] Refer to `experiments/latency` for sample data collection files. For complete data collected during the latency experiments, go here.
- [2] Refer to `experiments/metricdelay-backpressure-parallelism` for sample data collection files.
- [3] Refer to `experiments/invoker/java` for sample data collection files.
- [4] Refer to `experiments/invoker/python` for sample data collection files.
- Refer to `assets/` for image files of all the plots for a closer view.
Future work:
- Handling stateful operations/operators.
- Security in AWS Lambda
- Fault tolerance - Currently the project relies on Flink's and AWS's out-of-the-box fault tolerance, but the plan is to incorporate systems like Kafka into the pipeline to ensure no packets are lost between Flink and AWS Lambda.