diff --git a/cdap-app-fabric/pom.xml b/cdap-app-fabric/pom.xml
index e9793a1f6c7f..b43fee7793c3 100644
--- a/cdap-app-fabric/pom.xml
+++ b/cdap-app-fabric/pom.xml
@@ -1,6 +1,6 @@
+
+ feature.event.reader.enabled
+ false
+ false
+
+ Enable event subscribing in CDAP
+
+
+
artifact.fetcher.bind.port
diff --git a/cdap-features/src/main/java/io/cdap/cdap/features/Feature.java b/cdap-features/src/main/java/io/cdap/cdap/features/Feature.java
index b81ceb506d6d..eb2377979a20 100644
--- a/cdap-features/src/main/java/io/cdap/cdap/features/Feature.java
+++ b/cdap-features/src/main/java/io/cdap/cdap/features/Feature.java
@@ -28,6 +28,7 @@
public enum Feature {
REPLICATION_TRANSFORMATIONS("6.6.0"),
EVENT_PUBLISH("6.7.0", false),
+ EVENT_READER("6.10.0", false),
PIPELINE_COMPOSITE_TRIGGERS("6.8.0"),
PUSHDOWN_TRANSFORMATION_GROUPBY("6.7.0"),
PUSHDOWN_TRANSFORMATION_DEDUPLICATE("6.7.0"),
@@ -72,7 +73,9 @@ public boolean isEnabled(FeatureFlagsProvider featureFlagsProvider) {
}
/**
- * @return string that identifies the feature flag.
+ * Retrieve the string that identifies the feature flag.
+ *
+ * @return feature flag string
*/
public String getFeatureFlagString() {
return featureFlagString;
diff --git a/cdap-standalone/src/main/java/io/cdap/cdap/StandaloneMain.java b/cdap-standalone/src/main/java/io/cdap/cdap/StandaloneMain.java
index 35bdf6593b91..223a2a799d38 100644
--- a/cdap-standalone/src/main/java/io/cdap/cdap/StandaloneMain.java
+++ b/cdap-standalone/src/main/java/io/cdap/cdap/StandaloneMain.java
@@ -70,6 +70,7 @@
import io.cdap.cdap.internal.app.services.AppFabricServer;
import io.cdap.cdap.internal.app.worker.sidecar.ArtifactLocalizerService;
import io.cdap.cdap.internal.events.EventPublishManager;
+import io.cdap.cdap.internal.events.EventSubscriberManager;
import io.cdap.cdap.logging.LoggingUtil;
import io.cdap.cdap.logging.appender.LogAppenderInitializer;
import io.cdap.cdap.logging.framework.LogPipelineLoader;
@@ -155,6 +156,7 @@ public class StandaloneMain {
private final RuntimeServer runtimeServer;
private final ArtifactLocalizerService artifactLocalizerService;
private final EventPublishManager eventPublishManager;
+ private final EventSubscriberManager eventSubscriberManager;
private ExternalAuthenticationServer externalAuthenticationServer;
@@ -187,6 +189,7 @@ private StandaloneMain(List modules, CConfiguration cConf) {
cConf.setInt(Constants.ArtifactLocalizer.PORT, 0);
artifactLocalizerService = injector.getInstance(ArtifactLocalizerService.class);
eventPublishManager = injector.getInstance(EventPublishManager.class);
+ eventSubscriberManager = injector.getInstance(EventSubscriberManager.class);
if (cConf.getBoolean(Constants.Transaction.TX_ENABLED)) {
txService = injector.getInstance(InMemoryTransactionService.class);
@@ -301,6 +304,7 @@ public void startUp() throws Exception {
secureStoreService.startAndWait();
supportBundleInternalService.startAndWait();
eventPublishManager.startAndWait();
+ eventSubscriberManager.startAndWait();
String protocol = sslEnabled ? "https" : "http";
int dashboardPort = sslEnabled
@@ -339,6 +343,7 @@ public void shutDown() {
previewHttpServer.stopAndWait();
artifactLocalizerService.stopAndWait();
eventPublishManager.stopAndWait();
+ eventSubscriberManager.stopAndWait();
// app fabric will also stop all programs
appFabricServer.stopAndWait();
runtimeServer.stopAndWait();