Home
Pulsar is an open source real-time analytics platform. It can be used to collect and process user behavior events in real time to provide key insights and enable systems to react to user activities within seconds. Pulsar provides real-time sessionization, multi-dimensional metrics aggregation over time windows, and custom stream creation through data enrichment, mutation and filtering using a SQL-like event processing language. Pulsar scales to millions of events per second with high availability. It can be easily integrated with metrics stores like Cassandra and Druid.
eBay provides a platform where millions of buyers and sellers conduct commerce transactions. We analyze user interactions and behaviors to help improve and optimize the eBay experience for our end users. Over the past years, batch-oriented data platforms like Hadoop have been used successfully for user behavior analytics. More recently, we have a newer set of use cases that demand near real-time (within seconds) collection and processing of vast amounts of events in order to derive actionable insights and generate signals for immediate action. Examples of use cases include:
- Real-time reporting and dashboard
- Business activity monitoring
- Personalization
- Marketing and advertising
- Fraud and bot detection
We identified a set of systemic qualities that are important to support the above large-scale real-time analytics use cases:
- Scalability - Scale to millions of events per second
- Latency - Sub-second processing and delivery of events
- Availability - No cluster downtime during software upgrades or changes to stream processing rules and topology
- Flexibility - Easy to define and change processing logic, event routing and pipeline topology
- Productivity - Support for complex event processing (CEP) and a 4GL language for data filtering, mutation, aggregation and stateful processing
- Data Accuracy - 99.9% data delivery
- Cloud Deployable - Can distribute nodes across data centers on standard cloud infrastructure
Given our unique set of requirements, we decided to develop our own distributed CEP framework. Pulsar CEP provides a Java-based CEP framework and tooling to build, deploy and manage CEP applications in a cloud environment. On top of this CEP framework we implemented the Pulsar real-time analytics data pipeline. Pulsar CEP provides the following capabilities:
- Declaratively define processing logic in SQL
- Hot deploy SQL without restarting applications
- Annotation plugin framework to extend SQL functionality
- Pipeline flow routing using SQL
- Dynamically create stream affinity using SQL
- CEP capabilities through Esper integration (a sketch follows this list)
- Clustering with elastic scaling
- Cloud deployment
- Publish-Subscribe messaging with both push and pull models
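To make the hot-deploy and Esper bullets concrete: Pulsar's CEP layer builds on Esper, and at the engine level hot deploying SQL amounts to creating and destroying EPL statements inside a running JVM. The following is a minimal sketch against the plain Esper 5.x API with a made-up, map-backed event type; it illustrates the mechanism only, not Pulsar's actual tooling, which distributes new SQL to running cells through configuration.

```java
import java.util.HashMap;
import java.util.Map;

import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPServiceProviderManager;
import com.espertech.esper.client.EPStatement;

// Minimal sketch (Esper 5.x API): deploy and retire an EPL statement at
// runtime without restarting the JVM. Event type and EPL are illustrative.
public class HotDeployExample {
    public static void main(String[] args) {
        EPServiceProvider engine = EPServiceProviderManager.getDefaultProvider();

        // Register a map-backed event type standing in for the raw stream.
        Map<String, Object> schema = new HashMap<>();
        schema.put("D1", Integer.class);
        schema.put("D2", Integer.class);
        engine.getEPAdministrator().getConfiguration().addEventType("RAWSTREAM", schema);

        // Hot deploy: create a filtering statement while the engine is running.
        EPStatement stmt = engine.getEPAdministrator().createEPL(
            "insert into SUBSTREAM select D1, D2 from RAWSTREAM where D1 = 2045573");

        // Feed an event; deployed statements see it immediately.
        Map<String, Object> event = new HashMap<>();
        event.put("D1", 2045573);
        event.put("D2", 42);
        engine.getEPRuntime().sendEvent(event, "RAWSTREAM");

        // Hot undeploy: retire the statement, again without a restart.
        stmt.destroy();
    }
}
```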
Pulsar's real-time analytics data pipeline consists of loosely coupled stages. Each stage is functionally separate from its neighboring stages, and events are transported asynchronously between them. This loose coupling yields higher reliability and a better scaling model: each stage can be built and operated independently of its neighbors and can adopt its own deployment and release cycles, and the topology can be changed without restarting the cluster.
Here are some of the processing steps we perform in our real-time analytics pipeline:
- Enrichment - Decorate events with additional attributes. For example, we can add Geo location information to user interaction events based on the IP address range
- Filtering and Mutation - Filter out irrelevant attributes or events, or transform the content of an event
- Aggregation - Count number of events or add up metrics along a set of dimensions over a time window
- Stateful Processing - Group multiple events into one, or generate a new event based on a sequence of events and processing rules. An example is our sessionization stage, which groups sequences of user interaction events into web sessions while keeping track of session-based metrics (a simplified sketch follows this list)
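To make the stateful processing idea concrete, here is a minimal, self-contained sketch of inactivity-based sessionization. It is not Pulsar's actual sessionization code; the 30-minute timeout, the class names and the emitted session summary are illustrative assumptions.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of inactivity-based sessionization (not Pulsar's implementation):
// group each user's events into a session and close the session once no
// event arrives within the timeout.
public class Sessionizer {
    private static final long TIMEOUT_MS = 30 * 60 * 1000; // assumed 30 min

    private static class Session {
        long firstEventTime;
        long lastEventTime;
        int eventCount;
    }

    private final Map<String, Session> openSessions = new HashMap<>();

    // Called for each incoming user interaction event.
    public void onEvent(String userId, long timestampMs) {
        Session s = openSessions.get(userId);
        if (s == null || timestampMs - s.lastEventTime > TIMEOUT_MS) {
            if (s != null) {
                emitSession(userId, s); // previous session expired: emit summary
            }
            s = new Session();
            s.firstEventTime = timestampMs;
            openSessions.put(userId, s);
        }
        s.lastEventTime = timestampMs;
        s.eventCount++;
    }

    // Generate one summary event for the closed session.
    private void emitSession(String userId, Session s) {
        System.out.printf("session user=%s duration=%dms events=%d%n",
                userId, s.lastEventTime - s.firstEventTime, s.eventCount);
    }
}
```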
The Pulsar pipeline can be integrated with different systems. For example, summarized events can be sent to persistent metrics stores to support ad-hoc queries. Events can also be sent to a visualization dashboard for real-time reporting, or to backend systems that can react to event signals.
In Pulsar, our approach is to treat the event stream like a database table. We apply SQL queries and annotations on live streams to extract summary data as events are moving.
Here are a few examples of how common processing can be expressed in Pulsar:
Event Filtering and Routing
insert into SUBSTREAM select D1, D2, D3, D4
from RAWSTREAM where D1 = 2045573 or D2 = 2047936 or D3 = 2051457 or D4 = 2053742; // filtering
@PublishOn(topics="TOPIC1") // publish the sub-stream on TOPIC1
@OutputTo("OutboundMessageChannel")
@ClusterAffinityTag(column = D1) // partition key based on column D1
select * from SUBSTREAM;
Aggregate Computation
// create 10 second time window context
create context MCContext start @now end pattern [timer:interval(10)];
// aggregate event counts along dimensions D1 and D2 within the 10 second time window
context MCContext insert into AGGREGATE select count(*) as METRIC1, D1, D2 from RAWSTREAM group by D1, D2 output snapshot when terminated;
select * from AGGREGATE;
TopN Computation
// create 60 second time window context
create context MCContext start @now end pattern [timer:interval(60)];
// sort to find top 10 event counts along dimensions D1, D2 and D3
// with 60 second time window
context MCContext insert into TOPITEMS select count(*) as totalCount, D1, D2, D3 from RAWSTREAM group by D1, D2, D3 output snapshot when terminated order by count(*) limit 10;
select * from TOPITEMS;
Pulsar CEP processing logic is deployed on many nodes (CEP cells) across data centers. Each CEP cell is configured with an inbound channel, an outbound channel and processing logic. Events are typically partitioned based on a key such as user id, and all events with the same partition key are routed to the same CEP cell. In each stage, events can be partitioned on a different key, enabling aggregation across different dimensions. To scale to more events, we just add more CEP cells to the pipeline. Using ZooKeeper, Pulsar CEP automatically detects the new cells and rebalances the event traffic. Similarly, if a CEP cell goes down, Pulsar CEP reroutes traffic to the other nodes.
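The routing just described can be pictured as hashing the partition key over the set of live cells. Below is a simplified stand-in for Pulsar's ZooKeeper-coordinated scheme (the class name and the 16-replicas-per-cell choice are assumptions): a consistent-hash ring keeps all events with the same key on the same cell and remaps only a fraction of the keys when a cell joins or leaves.

```java
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

// Simplified sketch of key-based routing to CEP cells. Pulsar's actual
// scheme is coordinated through ZooKeeper; this only shows why events with
// the same partition key land on the same cell, and why adding or removing
// a cell remaps just a fraction of the keys.
public class CellRouter {
    private static final int REPLICAS = 16; // points per cell on the ring

    private final SortedMap<Integer, String> ring = new TreeMap<>();

    public CellRouter(List<String> cells) {
        for (String cell : cells) {
            addCell(cell);
        }
    }

    // Place each cell at several points on the hash ring for smoother balance.
    public void addCell(String cell) {
        for (int r = 0; r < REPLICAS; r++) {
            ring.put((cell + "#" + r).hashCode(), cell);
        }
    }

    public void removeCell(String cell) {
        for (int r = 0; r < REPLICAS; r++) {
            ring.remove((cell + "#" + r).hashCode());
        }
    }

    // Route an event by its partition key, e.g. a user id.
    public String route(String partitionKey) {
        SortedMap<Integer, String> tail = ring.tailMap(partitionKey.hashCode());
        int point = tail.isEmpty() ? ring.firstKey() : tail.firstKey();
        return ring.get(point);
    }
}
```

Repartitioning between stages then simply means routing on a different key, which is what the @ClusterAffinityTag annotation in the EPL examples above expresses declaratively.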
Pulsar CEP supports multiple messaging models to move events between stages. For low delivery latency, we recommend the push model, where events are sent from a producer to a consumer with at-most-once delivery semantics. If a consumer goes down or cannot keep up with the event traffic, it can signal the producer to temporarily push events into a persistent queue like Kafka, from which the events can subsequently be replayed. Pulsar CEP can also be configured to support a pull model with at-least-once delivery semantics; in this case, all events are written into Kafka and consumers pull from Kafka.
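As an illustration of the pull model, a consumer stage could read events from Kafka with the standard Kafka consumer API, as in the sketch below. The topic name, group id and string serialization are assumptions; committing offsets only after events are processed is what yields the at-least-once semantics described above.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Sketch of the pull model: a stage consumes events from Kafka at its own
// pace. Offsets are committed only after processing, giving at-least-once
// delivery. Topic and group names are made up for illustration.
public class PullModelConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "pulsar-stage-consumer");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("pulsar.raw.events"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    process(record.value()); // hand the event to the CEP engine
                }
                consumer.commitSync(); // commit only after successful processing
            }
        }
    }

    private static void process(String event) { /* ... */ }
}
```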
Pulsar has been deployed in production at eBay and is processing all user behavior events. We have open sourced Pulsar and plan to continue to develop Pulsar in the open. We welcome everyone to contribute to Pulsar. Here are some features we are working on and would love to get your help and suggestions:
- Real-time reporting API and dashboard
- Integration with Druid or other metrics stores
- Persistent session store integration
- Support for long rolling-window aggregation
[Get started](Get Started) to explore the Pulsar pipeline and start contributing.
Please feel free to reach out to [email protected] for any Pulsar-related questions.
- Sharad Murthy
- Tony Ng
- Bhaven Avalani
- Xinglang Wang
- Ken Wang
- Yanger Zhang
- Kevin Fang
- Anand Gangadharan
- Nemo Chen
- Lisa Li
- Xiaoju Wu
- Warren Jin
- Robin Qiao
- Bopher Lu
- Web Site: http://gopulsar.io
- Google Group: Pulsar Google Group
- Developer Mail: [email protected]
- White Paper: Pulsar White Paper