Skip to content

Commit

Permalink
Renamed EventHandler to EventReaderHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
codeNinjaDev committed Jul 10, 2023
1 parent a842b7e commit fb5c882
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@
import io.cdap.cdap.internal.capability.CapabilityModule;
import io.cdap.cdap.internal.credential.handler.CredentialProviderHttpHandler;
import io.cdap.cdap.internal.credential.handler.CredentialProviderHttpHandlerInternal;
import io.cdap.cdap.internal.events.EventHandlerManager;
import io.cdap.cdap.internal.events.EventReaderHandlerManager;
import io.cdap.cdap.internal.events.EventPublishManager;
import io.cdap.cdap.internal.events.EventPublisher;
import io.cdap.cdap.internal.events.EventWriterExtensionProvider;
Expand Down Expand Up @@ -399,7 +399,7 @@ protected void configure() {
Multibinder.newSetBinder(binder(), EventPublisher.class);
eventPublishersBinder.addBinding().to(ProgramStatusEventPublisher.class);
bind(EventPublishManager.class).in(Scopes.SINGLETON);
bind(EventHandlerManager.class).in(Scopes.SINGLETON);
bind(EventReaderHandlerManager.class).in(Scopes.SINGLETON);
bind(EventWriterProvider.class).to(EventWriterExtensionProvider.class);
bind(MetricsProvider.class).to(SparkProgramStatusMetricsProvider.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ public final ProgramController run(final Program program, ProgramOptions oldOpti
// Also extra files to the resources jar so that the TwillAppLifecycleEventHandler,
// which runs in the AM container, can get them.
// This can be removed when TWILL-246 is fixed.
// Only program running in Hadoop will be using EventHandler
// Only program running in Hadoop will be using EventReaderHandler
twillPreparer.withResources(localizeResources.get(CDAP_CONF_FILE_NAME).getURI(),
localizeResources.get(HADOOP_CONF_FILE_NAME).getURI(),
localizeResources.get(PROGRAM_OPTIONS_FILE_NAME).getURI());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@
import com.google.common.util.concurrent.AbstractScheduledService;

/**
* Abstract class for an event Handler. Each EventHandler will receive and process incoming events
* Abstract class for an event reader Handler.
* Each EventReaderHandler will receive and process incoming events.
*/
public abstract class EventHandler extends AbstractScheduledService {
public abstract class EventReaderHandler extends AbstractScheduledService {

/**
* Initialize this handler.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,31 +26,29 @@
import org.slf4j.LoggerFactory;

/**
* EventHandlerManager is responsible for starting all the event handler threads.
* EventReaderHandlerManager is responsible for starting all the event reader handler threads.
*/
public class EventHandlerManager extends AbstractIdleService {
public class EventReaderHandlerManager extends AbstractIdleService {

private static final Logger LOG = LoggerFactory.getLogger(EventHandlerManager.class);
private static final Logger LOG = LoggerFactory.getLogger(EventReaderHandlerManager.class);

private final boolean enabled;
private final Set<EventHandler> eventHandlers;
private final Set<EventReaderHandler> eventReaderHandlers;

@Inject
EventHandlerManager(CConfiguration cConf, Set<EventHandler> eventHandlers) {
EventReaderHandlerManager(CConfiguration cConf, Set<EventReaderHandler> eventReaderHandlers) {
this.enabled = Feature.EVENT_READ.isEnabled(new DefaultFeatureFlagsProvider(cConf));
this.eventHandlers = eventHandlers;
this.eventReaderHandlers = eventReaderHandlers;
}

@Override
protected void startUp() throws Exception {
if (!enabled) {
return; // If not enabled, don't start
}
eventHandlers.forEach(eventHandler -> {
// Loading the event writers from provider
// Initialize the event publisher with all the event writers provided by provider
if (eventHandler.initialize()) {
eventHandler.startAndWait();
eventReaderHandlers.forEach(eventReaderHandler -> {
if (eventReaderHandler.initialize()) {
eventReaderHandler.startAndWait();
}
});
}
Expand All @@ -60,8 +58,8 @@ protected void shutDown() throws Exception {
if (!enabled) {
return; // If not enabled, don't shut down
}
eventHandlers.forEach(eventHandler -> {
eventHandler.stopAndWait();
eventReaderHandlers.forEach(eventReaderHandler -> {
eventReaderHandler.stopAndWait();
});
}
}
10 changes: 5 additions & 5 deletions cdap-standalone/src/main/java/io/cdap/cdap/StandaloneMain.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
import io.cdap.cdap.internal.app.runtime.monitor.RuntimeServer;
import io.cdap.cdap.internal.app.services.AppFabricServer;
import io.cdap.cdap.internal.app.worker.sidecar.ArtifactLocalizerService;
import io.cdap.cdap.internal.events.EventHandlerManager;
import io.cdap.cdap.internal.events.EventReaderHandlerManager;
import io.cdap.cdap.internal.events.EventPublishManager;
import io.cdap.cdap.logging.LoggingUtil;
import io.cdap.cdap.logging.appender.LogAppenderInitializer;
Expand Down Expand Up @@ -156,7 +156,7 @@ public class StandaloneMain {
private final RuntimeServer runtimeServer;
private final ArtifactLocalizerService artifactLocalizerService;
private final EventPublishManager eventPublishManager;
private final EventHandlerManager eventHandlerManager;
private final EventReaderHandlerManager eventReaderHandlerManager;

private ExternalAuthenticationServer externalAuthenticationServer;

Expand Down Expand Up @@ -189,7 +189,7 @@ private StandaloneMain(List<Module> modules, CConfiguration cConf) {
cConf.setInt(Constants.ArtifactLocalizer.PORT, 0);
artifactLocalizerService = injector.getInstance(ArtifactLocalizerService.class);
eventPublishManager = injector.getInstance(EventPublishManager.class);
eventHandlerManager = injector.getInstance(EventHandlerManager.class);
eventReaderHandlerManager = injector.getInstance(EventReaderHandlerManager.class);

if (cConf.getBoolean(Constants.Transaction.TX_ENABLED)) {
txService = injector.getInstance(InMemoryTransactionService.class);
Expand Down Expand Up @@ -304,7 +304,7 @@ public void startUp() throws Exception {
secureStoreService.startAndWait();
supportBundleInternalService.startAndWait();
eventPublishManager.startAndWait();
eventHandlerManager.startAndWait();
eventReaderHandlerManager.startAndWait();

String protocol = sslEnabled ? "https" : "http";
int dashboardPort = sslEnabled
Expand Down Expand Up @@ -343,7 +343,7 @@ public void shutDown() {
previewHttpServer.stopAndWait();
artifactLocalizerService.stopAndWait();
eventPublishManager.stopAndWait();
eventHandlerManager.stopAndWait();
eventReaderHandlerManager.stopAndWait();
// app fabric will also stop all programs
appFabricServer.stopAndWait();
runtimeServer.stopAndWait();
Expand Down

0 comments on commit fb5c882

Please sign in to comment.