Skip to content

Commit

Permalink
Renamed EventReaderHandler to EventSubscriber
Browse files Browse the repository at this point in the history
  • Loading branch information
codeNinjaDev committed Jul 11, 2023
1 parent adbc549 commit baad386
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@
import io.cdap.cdap.internal.credential.handler.CredentialProviderHttpHandlerInternal;
import io.cdap.cdap.internal.events.EventPublishManager;
import io.cdap.cdap.internal.events.EventPublisher;
import io.cdap.cdap.internal.events.EventReaderHandler;
import io.cdap.cdap.internal.events.EventReaderHandlerManager;
import io.cdap.cdap.internal.events.EventSubscriber;
import io.cdap.cdap.internal.events.EventSubscriberManager;
import io.cdap.cdap.internal.events.EventWriterExtensionProvider;
import io.cdap.cdap.internal.events.EventWriterProvider;
import io.cdap.cdap.internal.events.MetricsProvider;
Expand Down Expand Up @@ -400,9 +400,9 @@ protected void configure() {
Multibinder.newSetBinder(binder(), EventPublisher.class);
eventPublishersBinder.addBinding().to(ProgramStatusEventPublisher.class);
bind(EventPublishManager.class).in(Scopes.SINGLETON);
Multibinder<EventReaderHandler> eventHandlerBinder =
Multibinder.newSetBinder(binder(), EventReaderHandler.class);
bind(EventReaderHandlerManager.class).in(Scopes.SINGLETON);
Multibinder<EventSubscriber> eventSubscribersBinder =
Multibinder.newSetBinder(binder(), EventSubscriber.class);
bind(EventSubscriberManager.class).in(Scopes.SINGLETON);
bind(EventWriterProvider.class).to(EventWriterExtensionProvider.class);
bind(MetricsProvider.class).to(SparkProgramStatusMetricsProvider.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
import com.google.common.util.concurrent.AbstractScheduledService;

/**
* Abstract class for an event reader Handler.
* Each EventReaderHandler will receive and process incoming events.
* Abstract class for an event subscriber.
* Each EventSubscriber will receive and process incoming events.
*/
public abstract class EventReaderHandler extends AbstractScheduledService {
public abstract class EventSubscriber extends AbstractScheduledService {

/**
* Initialize this handler.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,36 +26,36 @@
import org.slf4j.LoggerFactory;

/**
* EventReaderHandlerManager is responsible for starting all the event reader handler threads.
* EventSubscriberManager is responsible for starting all the event subscriber threads.
*/
public class EventReaderHandlerManager extends AbstractIdleService {
public class EventSubscriberManager extends AbstractIdleService {

private static final Logger LOG = LoggerFactory.getLogger(EventReaderHandlerManager.class);
private static final Logger LOG = LoggerFactory.getLogger(EventSubscriberManager.class);

private final boolean enabled;
private final Set<EventReaderHandler> eventReaderHandlers;
private final Set<EventSubscriber> eventSubscribers;

@Inject
EventReaderHandlerManager(CConfiguration cConf, Set<EventReaderHandler> eventReaderHandlers) {
EventSubscriberManager(CConfiguration cConf, Set<EventSubscriber> eventSubscribers) {
this.enabled = Feature.EVENT_READER.isEnabled(new DefaultFeatureFlagsProvider(cConf));
this.eventReaderHandlers = eventReaderHandlers;
this.eventSubscribers = eventSubscribers;
}

@Override
protected void startUp() throws Exception {
if (!enabled) {
return; // If not enabled, don't start
}
eventReaderHandlers.forEach(eventReaderHandler -> {
// Loading the event writers from provider
// Initialize the event publisher with all the event writers provided by provider
if (eventReaderHandler.initialize()) {
eventReaderHandler.startAndWait();
LOG.info("Successfully initialized eventReaderHandler: {}",
eventReaderHandler.getClass().getSimpleName());
eventSubscribers.forEach(eventSubscriber -> {
// Loading the event readers from provider
// Initialize the event subscribers with all the event readers provided by provider
if (eventSubscriber.initialize()) {
eventSubscriber.startAndWait();
LOG.info("Successfully initialized eventSubscriber: {}",
eventSubscriber.getClass().getSimpleName());
} else {
LOG.info("Failed to initialize eventReaderHandler: {}",
eventReaderHandler.getClass().getSimpleName());
LOG.info("Failed to initialize eventSubscriber: {}",
eventSubscriber.getClass().getSimpleName());
}
});
}
Expand All @@ -65,8 +65,8 @@ protected void shutDown() throws Exception {
if (!enabled) {
return; // If not enabled, don't shut down
}
eventReaderHandlers.forEach(eventReaderHandler -> {
eventReaderHandler.stopAndWait();
eventSubscribers.forEach(eventSubscriber -> {
eventSubscriber.stopAndWait();
});
}
}
10 changes: 5 additions & 5 deletions cdap-standalone/src/main/java/io/cdap/cdap/StandaloneMain.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
import io.cdap.cdap.internal.app.runtime.monitor.RuntimeServer;
import io.cdap.cdap.internal.app.services.AppFabricServer;
import io.cdap.cdap.internal.app.worker.sidecar.ArtifactLocalizerService;
import io.cdap.cdap.internal.events.EventReaderHandlerManager;
import io.cdap.cdap.internal.events.EventSubscriberManager;
import io.cdap.cdap.internal.events.EventPublishManager;

Check warning on line 73 in cdap-standalone/src/main/java/io/cdap/cdap/StandaloneMain.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.imports.CustomImportOrderCheck

Wrong lexicographical order for 'io.cdap.cdap.internal.events.EventPublishManager' import. Should be before 'io.cdap.cdap.internal.events.EventSubscriberManager'.
import io.cdap.cdap.logging.LoggingUtil;
import io.cdap.cdap.logging.appender.LogAppenderInitializer;
Expand Down Expand Up @@ -156,7 +156,7 @@ public class StandaloneMain {
private final RuntimeServer runtimeServer;
private final ArtifactLocalizerService artifactLocalizerService;
private final EventPublishManager eventPublishManager;
private final EventReaderHandlerManager eventReaderHandlerManager;
private final EventSubscriberManager eventSubscriberManager;

private ExternalAuthenticationServer externalAuthenticationServer;

Expand Down Expand Up @@ -189,7 +189,7 @@ private StandaloneMain(List<Module> modules, CConfiguration cConf) {
cConf.setInt(Constants.ArtifactLocalizer.PORT, 0);
artifactLocalizerService = injector.getInstance(ArtifactLocalizerService.class);
eventPublishManager = injector.getInstance(EventPublishManager.class);
eventReaderHandlerManager = injector.getInstance(EventReaderHandlerManager.class);
eventSubscriberManager = injector.getInstance(EventSubscriberManager.class);

if (cConf.getBoolean(Constants.Transaction.TX_ENABLED)) {
txService = injector.getInstance(InMemoryTransactionService.class);
Expand Down Expand Up @@ -304,7 +304,7 @@ public void startUp() throws Exception {
secureStoreService.startAndWait();
supportBundleInternalService.startAndWait();
eventPublishManager.startAndWait();
eventReaderHandlerManager.startAndWait();
eventSubscriberManager.startAndWait();

String protocol = sslEnabled ? "https" : "http";
int dashboardPort = sslEnabled
Expand Down Expand Up @@ -343,7 +343,7 @@ public void shutDown() {
previewHttpServer.stopAndWait();
artifactLocalizerService.stopAndWait();
eventPublishManager.stopAndWait();
eventReaderHandlerManager.stopAndWait();
eventSubscriberManager.stopAndWait();
// app fabric will also stop all programs
appFabricServer.stopAndWait();
runtimeServer.stopAndWait();
Expand Down

0 comments on commit baad386

Please sign in to comment.