From abf8fa2f58c44562a3991751b4a0984a01b93980 Mon Sep 17 00:00:00 2001 From: Peter Chacko Date: Tue, 11 Jul 2023 18:10:48 +0000 Subject: [PATCH] Rename EventHandler to EventReaderHandler --- .../guice/AppFabricServiceRuntimeModule.java | 10 +++---- ...ntHandler.java => EventReaderHandler.java} | 5 ++-- ...er.java => EventReaderHandlerManager.java} | 26 +++++++++---------- .../java/io/cdap/cdap/StandaloneMain.java | 10 +++---- 4 files changed, 26 insertions(+), 25 deletions(-) rename cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/{EventHandler.java => EventReaderHandler.java} (81%) rename cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/{EventHandlerManager.java => EventReaderHandlerManager.java} (69%) diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java index f0027e6e5cbd..b201bd84a6f0 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java @@ -118,10 +118,10 @@ import io.cdap.cdap.internal.capability.CapabilityModule; import io.cdap.cdap.internal.credential.handler.CredentialProviderHttpHandler; import io.cdap.cdap.internal.credential.handler.CredentialProviderHttpHandlerInternal; -import io.cdap.cdap.internal.events.EventHandler; -import io.cdap.cdap.internal.events.EventHandlerManager; import io.cdap.cdap.internal.events.EventPublishManager; import io.cdap.cdap.internal.events.EventPublisher; +import io.cdap.cdap.internal.events.EventReaderHandler; +import io.cdap.cdap.internal.events.EventReaderHandlerManager; import io.cdap.cdap.internal.events.EventWriterExtensionProvider; import io.cdap.cdap.internal.events.EventWriterProvider; import io.cdap.cdap.internal.events.MetricsProvider; @@ -400,9 +400,9 @@ protected void configure() { Multibinder.newSetBinder(binder(), EventPublisher.class); eventPublishersBinder.addBinding().to(ProgramStatusEventPublisher.class); bind(EventPublishManager.class).in(Scopes.SINGLETON); - Multibinder eventHandlerBinder = - Multibinder.newSetBinder(binder(), EventHandler.class); - bind(EventHandlerManager.class).in(Scopes.SINGLETON); + Multibinder eventHandlerBinder = + Multibinder.newSetBinder(binder(), EventReaderHandler.class); + bind(EventReaderHandlerManager.class).in(Scopes.SINGLETON); bind(EventWriterProvider.class).to(EventWriterExtensionProvider.class); bind(MetricsProvider.class).to(SparkProgramStatusMetricsProvider.class); diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/EventHandler.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/EventReaderHandler.java similarity index 81% rename from cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/EventHandler.java rename to cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/EventReaderHandler.java index 2c20c14f921d..5a190a5b7f73 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/EventHandler.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/EventReaderHandler.java @@ -19,9 +19,10 @@ import com.google.common.util.concurrent.AbstractScheduledService; /** - * Abstract class for an event Handler. Each EventHandler will receive and process incoming events + * Abstract class for an event reader Handler. + * Each EventReaderHandler will receive and process incoming events. */ -public abstract class EventHandler extends AbstractScheduledService { +public abstract class EventReaderHandler extends AbstractScheduledService { /** * Initialize this handler. diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/EventHandlerManager.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/EventReaderHandlerManager.java similarity index 69% rename from cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/EventHandlerManager.java rename to cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/EventReaderHandlerManager.java index a223e0413e13..b89b1d90eef7 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/EventHandlerManager.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/EventReaderHandlerManager.java @@ -26,19 +26,19 @@ import org.slf4j.LoggerFactory; /** - * EventHandlerManager is responsible for starting all the event handler threads. + * EventReaderHandlerManager is responsible for starting all the event reader handler threads. */ -public class EventHandlerManager extends AbstractIdleService { +public class EventReaderHandlerManager extends AbstractIdleService { - private static final Logger LOG = LoggerFactory.getLogger(EventHandlerManager.class); + private static final Logger LOG = LoggerFactory.getLogger(EventReaderHandlerManager.class); private final boolean enabled; - private final Set eventHandlers; + private final Set eventReaderHandlers; @Inject - EventHandlerManager(CConfiguration cConf, Set eventHandlers) { + EventReaderHandlerManager(CConfiguration cConf, Set eventReaderHandlers) { this.enabled = Feature.EVENT_READER.isEnabled(new DefaultFeatureFlagsProvider(cConf)); - this.eventHandlers = eventHandlers; + this.eventReaderHandlers = eventReaderHandlers; } @Override @@ -46,16 +46,16 @@ protected void startUp() throws Exception { if (!enabled) { return; // If not enabled, don't start } - eventHandlers.forEach(eventHandler -> { + eventReaderHandlers.forEach(eventReaderHandler -> { // Loading the event writers from provider // Initialize the event publisher with all the event writers provided by provider - if (eventHandler.initialize()) { - eventHandler.startAndWait(); + if (eventReaderHandler.initialize()) { + eventReaderHandler.startAndWait(); LOG.info("Successfully initialized eventReaderHandler: {}", - eventHandler.getClass().getSimpleName()); + eventReaderHandler.getClass().getSimpleName()); } else { LOG.error("Failed to initialize eventReaderHandler: {}", - eventHandler.getClass().getSimpleName()); + eventReaderHandler.getClass().getSimpleName()); } }); } @@ -65,8 +65,8 @@ protected void shutDown() throws Exception { if (!enabled) { return; // If not enabled, don't shut down } - eventHandlers.forEach(eventHandler -> { - eventHandler.stopAndWait(); + eventReaderHandlers.forEach(eventReaderHandler -> { + eventReaderHandler.stopAndWait(); }); } } diff --git a/cdap-standalone/src/main/java/io/cdap/cdap/StandaloneMain.java b/cdap-standalone/src/main/java/io/cdap/cdap/StandaloneMain.java index 431a0a83d59e..01e6ed273b94 100644 --- a/cdap-standalone/src/main/java/io/cdap/cdap/StandaloneMain.java +++ b/cdap-standalone/src/main/java/io/cdap/cdap/StandaloneMain.java @@ -69,7 +69,7 @@ import io.cdap.cdap.internal.app.runtime.monitor.RuntimeServer; import io.cdap.cdap.internal.app.services.AppFabricServer; import io.cdap.cdap.internal.app.worker.sidecar.ArtifactLocalizerService; -import io.cdap.cdap.internal.events.EventHandlerManager; +import io.cdap.cdap.internal.events.EventReaderHandlerManager; import io.cdap.cdap.internal.events.EventPublishManager; import io.cdap.cdap.logging.LoggingUtil; import io.cdap.cdap.logging.appender.LogAppenderInitializer; @@ -156,7 +156,7 @@ public class StandaloneMain { private final RuntimeServer runtimeServer; private final ArtifactLocalizerService artifactLocalizerService; private final EventPublishManager eventPublishManager; - private final EventHandlerManager eventHandlerManager; + private final EventReaderHandlerManager eventReaderHandlerManager; private ExternalAuthenticationServer externalAuthenticationServer; @@ -189,7 +189,7 @@ private StandaloneMain(List modules, CConfiguration cConf) { cConf.setInt(Constants.ArtifactLocalizer.PORT, 0); artifactLocalizerService = injector.getInstance(ArtifactLocalizerService.class); eventPublishManager = injector.getInstance(EventPublishManager.class); - eventHandlerManager = injector.getInstance(EventHandlerManager.class); + eventReaderHandlerManager = injector.getInstance(EventReaderHandlerManager.class); if (cConf.getBoolean(Constants.Transaction.TX_ENABLED)) { txService = injector.getInstance(InMemoryTransactionService.class); @@ -304,7 +304,7 @@ public void startUp() throws Exception { secureStoreService.startAndWait(); supportBundleInternalService.startAndWait(); eventPublishManager.startAndWait(); - eventHandlerManager.startAndWait(); + eventReaderHandlerManager.startAndWait(); String protocol = sslEnabled ? "https" : "http"; int dashboardPort = sslEnabled @@ -343,7 +343,7 @@ public void shutDown() { previewHttpServer.stopAndWait(); artifactLocalizerService.stopAndWait(); eventPublishManager.stopAndWait(); - eventHandlerManager.stopAndWait(); + eventReaderHandlerManager.stopAndWait(); // app fabric will also stop all programs appFabricServer.stopAndWait(); runtimeServer.stopAndWait();