From b412a2644543e8473953abd415a4d8389c3c3fc3 Mon Sep 17 00:00:00 2001 From: Peter Chacko Date: Fri, 7 Jul 2023 23:29:26 +0000 Subject: [PATCH] Event Subscriber Infrastructure --- cdap-app-fabric/pom.xml | 7 +- .../guice/AppFabricServiceRuntimeModule.java | 7 +- .../events/DefaultEventReaderContext.java | 47 ++++++++++ .../internal/events/EventReaderProvider.java | 33 +++++++ .../cdap/internal/events/EventSubscriber.java | 33 +++++++ .../events/EventSubscriberManager.java | 72 +++++++++++++++ .../events/dummy/DummyEventReader.java | 87 +++++++++++++++++++ .../DummyEventReaderExtensionProvider.java | 45 ++++++++++ .../io/cdap/cdap/common/conf/Constants.java | 2 + .../src/main/resources/cdap-default.xml | 10 +++ .../java/io/cdap/cdap/features/Feature.java | 5 +- .../java/io/cdap/cdap/StandaloneMain.java | 5 ++ 12 files changed, 350 insertions(+), 3 deletions(-) create mode 100644 cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/DefaultEventReaderContext.java create mode 100644 cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/EventReaderProvider.java create mode 100644 cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/EventSubscriber.java create mode 100644 cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/EventSubscriberManager.java create mode 100644 cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/dummy/DummyEventReader.java create mode 100644 cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/dummy/DummyEventReaderExtensionProvider.java diff --git a/cdap-app-fabric/pom.xml b/cdap-app-fabric/pom.xml index e9793a1f6c7f..b43fee7793c3 100644 --- a/cdap-app-fabric/pom.xml +++ b/cdap-app-fabric/pom.xml @@ -1,6 +1,6 @@ + + feature.event.reader.enabled + false + false + + Enable event subscribing in CDAP + + + artifact.fetcher.bind.port diff --git a/cdap-features/src/main/java/io/cdap/cdap/features/Feature.java b/cdap-features/src/main/java/io/cdap/cdap/features/Feature.java index e604c573db11..ba0e38d98387 100644 --- a/cdap-features/src/main/java/io/cdap/cdap/features/Feature.java +++ b/cdap-features/src/main/java/io/cdap/cdap/features/Feature.java @@ -28,6 +28,7 @@ public enum Feature { REPLICATION_TRANSFORMATIONS("6.6.0"), EVENT_PUBLISH("6.7.0", false), + EVENT_READER("6.10.0", false), PIPELINE_COMPOSITE_TRIGGERS("6.8.0"), PUSHDOWN_TRANSFORMATION_GROUPBY("6.7.0"), PUSHDOWN_TRANSFORMATION_DEDUPLICATE("6.7.0"), @@ -71,7 +72,9 @@ public boolean isEnabled(FeatureFlagsProvider featureFlagsProvider) { } /** - * @return string that identifies the feature flag. + * Retrieve the string that identifies the feature flag. + * + * @return feature flag string */ public String getFeatureFlagString() { return featureFlagString; diff --git a/cdap-standalone/src/main/java/io/cdap/cdap/StandaloneMain.java b/cdap-standalone/src/main/java/io/cdap/cdap/StandaloneMain.java index ad1f8c8651e6..ee618ea4e978 100644 --- a/cdap-standalone/src/main/java/io/cdap/cdap/StandaloneMain.java +++ b/cdap-standalone/src/main/java/io/cdap/cdap/StandaloneMain.java @@ -70,6 +70,7 @@ import io.cdap.cdap.internal.app.services.AppFabricServer; import io.cdap.cdap.internal.app.worker.sidecar.ArtifactLocalizerService; import io.cdap.cdap.internal.events.EventPublishManager; +import io.cdap.cdap.internal.events.EventSubscriberManager; import io.cdap.cdap.logging.LoggingUtil; import io.cdap.cdap.logging.appender.LogAppenderInitializer; import io.cdap.cdap.logging.framework.LogPipelineLoader; @@ -155,6 +156,7 @@ public class StandaloneMain { private final RuntimeServer runtimeServer; private final ArtifactLocalizerService artifactLocalizerService; private final EventPublishManager eventPublishManager; + private final EventSubscriberManager eventSubscriberManager; private ExternalAuthenticationServer externalAuthenticationServer; @@ -187,6 +189,7 @@ private StandaloneMain(List modules, CConfiguration cConf) { cConf.setInt(Constants.ArtifactLocalizer.PORT, 0); artifactLocalizerService = injector.getInstance(ArtifactLocalizerService.class); eventPublishManager = injector.getInstance(EventPublishManager.class); + eventSubscriberManager = injector.getInstance(EventSubscriberManager.class); if (cConf.getBoolean(Constants.Transaction.TX_ENABLED)) { txService = injector.getInstance(InMemoryTransactionService.class); @@ -301,6 +304,7 @@ public void startUp() throws Exception { secureStoreService.startAndWait(); supportBundleInternalService.startAndWait(); eventPublishManager.startAndWait(); + eventSubscriberManager.startAndWait(); String protocol = sslEnabled ? "https" : "http"; int dashboardPort = sslEnabled @@ -339,6 +343,7 @@ public void shutDown() { previewHttpServer.stopAndWait(); artifactLocalizerService.stopAndWait(); eventPublishManager.stopAndWait(); + eventSubscriberManager.stopAndWait(); // app fabric will also stop all programs appFabricServer.stopAndWait(); runtimeServer.stopAndWait();