diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java index b201bd84a6f0..70e6e3d5d24b 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java @@ -120,8 +120,8 @@ import io.cdap.cdap.internal.credential.handler.CredentialProviderHttpHandlerInternal; import io.cdap.cdap.internal.events.EventPublishManager; import io.cdap.cdap.internal.events.EventPublisher; -import io.cdap.cdap.internal.events.EventReaderHandler; -import io.cdap.cdap.internal.events.EventReaderHandlerManager; +import io.cdap.cdap.internal.events.EventSubscriber; +import io.cdap.cdap.internal.events.EventSubscriberManager; import io.cdap.cdap.internal.events.EventWriterExtensionProvider; import io.cdap.cdap.internal.events.EventWriterProvider; import io.cdap.cdap.internal.events.MetricsProvider; @@ -400,9 +400,9 @@ protected void configure() { Multibinder.newSetBinder(binder(), EventPublisher.class); eventPublishersBinder.addBinding().to(ProgramStatusEventPublisher.class); bind(EventPublishManager.class).in(Scopes.SINGLETON); - Multibinder eventHandlerBinder = - Multibinder.newSetBinder(binder(), EventReaderHandler.class); - bind(EventReaderHandlerManager.class).in(Scopes.SINGLETON); + Multibinder eventSubscribersBinder = + Multibinder.newSetBinder(binder(), EventSubscriber.class); + bind(EventSubscriberManager.class).in(Scopes.SINGLETON); bind(EventWriterProvider.class).to(EventWriterExtensionProvider.class); bind(MetricsProvider.class).to(SparkProgramStatusMetricsProvider.class); diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/EventReaderHandler.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/EventSubscriber.java similarity index 81% rename from cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/EventReaderHandler.java rename to cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/EventSubscriber.java index 5a190a5b7f73..9f54c3accb87 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/EventReaderHandler.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/EventSubscriber.java @@ -19,10 +19,10 @@ import com.google.common.util.concurrent.AbstractScheduledService; /** - * Abstract class for an event reader Handler. - * Each EventReaderHandler will receive and process incoming events. + * Abstract class for an event subscriber. + * Each EventSubscriber will receive and process incoming events. */ -public abstract class EventReaderHandler extends AbstractScheduledService { +public abstract class EventSubscriber extends AbstractScheduledService { /** * Initialize this handler. diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/EventReaderHandlerManager.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/EventSubscriberManager.java similarity index 58% rename from cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/EventReaderHandlerManager.java rename to cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/EventSubscriberManager.java index 91a9651e5817..3d7cac2d63e2 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/EventReaderHandlerManager.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/EventSubscriberManager.java @@ -26,19 +26,19 @@ import org.slf4j.LoggerFactory; /** - * EventReaderHandlerManager is responsible for starting all the event reader handler threads. + * EventSubscriberManager is responsible for starting all the event subscriber threads. */ -public class EventReaderHandlerManager extends AbstractIdleService { +public class EventSubscriberManager extends AbstractIdleService { - private static final Logger LOG = LoggerFactory.getLogger(EventReaderHandlerManager.class); + private static final Logger LOG = LoggerFactory.getLogger(EventSubscriberManager.class); private final boolean enabled; - private final Set eventReaderHandlers; + private final Set eventSubscribers; @Inject - EventReaderHandlerManager(CConfiguration cConf, Set eventReaderHandlers) { + EventSubscriberManager(CConfiguration cConf, Set eventSubscribers) { this.enabled = Feature.EVENT_READER.isEnabled(new DefaultFeatureFlagsProvider(cConf)); - this.eventReaderHandlers = eventReaderHandlers; + this.eventSubscribers = eventSubscribers; } @Override @@ -46,16 +46,16 @@ protected void startUp() throws Exception { if (!enabled) { return; // If not enabled, don't start } - eventReaderHandlers.forEach(eventReaderHandler -> { - // Loading the event writers from provider - // Initialize the event publisher with all the event writers provided by provider - if (eventReaderHandler.initialize()) { - eventReaderHandler.startAndWait(); - LOG.info("Successfully initialized eventReaderHandler: {}", - eventReaderHandler.getClass().getSimpleName()); + eventSubscribers.forEach(eventSubscriber -> { + // Loading the event readers from provider + // Initialize the event subscribers with all the event readers provided by provider + if (eventSubscriber.initialize()) { + eventSubscriber.startAndWait(); + LOG.info("Successfully initialized eventSubscriber: {}", + eventSubscriber.getClass().getSimpleName()); } else { - LOG.info("Failed to initialize eventReaderHandler: {}", - eventReaderHandler.getClass().getSimpleName()); + LOG.info("Failed to initialize eventSubscriber: {}", + eventSubscriber.getClass().getSimpleName()); } }); } @@ -65,8 +65,8 @@ protected void shutDown() throws Exception { if (!enabled) { return; // If not enabled, don't shut down } - eventReaderHandlers.forEach(eventReaderHandler -> { - eventReaderHandler.stopAndWait(); + eventSubscribers.forEach(eventSubscriber -> { + eventSubscriber.stopAndWait(); }); } } diff --git a/cdap-standalone/src/main/java/io/cdap/cdap/StandaloneMain.java b/cdap-standalone/src/main/java/io/cdap/cdap/StandaloneMain.java index 01e6ed273b94..d1bb9b22d2e9 100644 --- a/cdap-standalone/src/main/java/io/cdap/cdap/StandaloneMain.java +++ b/cdap-standalone/src/main/java/io/cdap/cdap/StandaloneMain.java @@ -69,7 +69,7 @@ import io.cdap.cdap.internal.app.runtime.monitor.RuntimeServer; import io.cdap.cdap.internal.app.services.AppFabricServer; import io.cdap.cdap.internal.app.worker.sidecar.ArtifactLocalizerService; -import io.cdap.cdap.internal.events.EventReaderHandlerManager; +import io.cdap.cdap.internal.events.EventSubscriberManager; import io.cdap.cdap.internal.events.EventPublishManager; import io.cdap.cdap.logging.LoggingUtil; import io.cdap.cdap.logging.appender.LogAppenderInitializer; @@ -156,7 +156,7 @@ public class StandaloneMain { private final RuntimeServer runtimeServer; private final ArtifactLocalizerService artifactLocalizerService; private final EventPublishManager eventPublishManager; - private final EventReaderHandlerManager eventReaderHandlerManager; + private final EventSubscriberManager eventSubscriberManager; private ExternalAuthenticationServer externalAuthenticationServer; @@ -189,7 +189,7 @@ private StandaloneMain(List modules, CConfiguration cConf) { cConf.setInt(Constants.ArtifactLocalizer.PORT, 0); artifactLocalizerService = injector.getInstance(ArtifactLocalizerService.class); eventPublishManager = injector.getInstance(EventPublishManager.class); - eventReaderHandlerManager = injector.getInstance(EventReaderHandlerManager.class); + eventSubscriberManager = injector.getInstance(EventSubscriberManager.class); if (cConf.getBoolean(Constants.Transaction.TX_ENABLED)) { txService = injector.getInstance(InMemoryTransactionService.class); @@ -304,7 +304,7 @@ public void startUp() throws Exception { secureStoreService.startAndWait(); supportBundleInternalService.startAndWait(); eventPublishManager.startAndWait(); - eventReaderHandlerManager.startAndWait(); + eventSubscriberManager.startAndWait(); String protocol = sslEnabled ? "https" : "http"; int dashboardPort = sslEnabled @@ -343,7 +343,7 @@ public void shutDown() { previewHttpServer.stopAndWait(); artifactLocalizerService.stopAndWait(); eventPublishManager.stopAndWait(); - eventReaderHandlerManager.stopAndWait(); + eventSubscriberManager.stopAndWait(); // app fabric will also stop all programs appFabricServer.stopAndWait(); runtimeServer.stopAndWait();