Skip to content

Commit

Permalink
Merge pull request #15223 from cdapio/feature/cdap-20678-event-pr
Browse files Browse the repository at this point in the history
Event Subscriber Infrastructure
  • Loading branch information
codeNinjaDev authored Jul 25, 2023
2 parents c9d7eeb + 5846b08 commit f3d7b36
Show file tree
Hide file tree
Showing 9 changed files with 216 additions and 3 deletions.
7 changes: 6 additions & 1 deletion cdap-app-fabric/pom.xml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright © 2014-2021 Cask Data, Inc.
Copyright © 2014-2023 Cask Data, Inc.
Licensed under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with the License. You may obtain a copy of
Expand Down Expand Up @@ -269,6 +269,11 @@
<artifactId>cdap-source-control</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.cdap.cdap</groupId>
<artifactId>cdap-event-reader-spi</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@
import io.cdap.cdap.internal.credential.handler.CredentialProviderHttpHandlerInternal;
import io.cdap.cdap.internal.events.EventPublishManager;
import io.cdap.cdap.internal.events.EventPublisher;
import io.cdap.cdap.internal.events.EventSubscriber;
import io.cdap.cdap.internal.events.EventSubscriberManager;
import io.cdap.cdap.internal.events.EventWriterExtensionProvider;
import io.cdap.cdap.internal.events.EventWriterProvider;
import io.cdap.cdap.internal.events.MetricsProvider;
Expand Down Expand Up @@ -402,6 +404,9 @@ protected void configure() {
Multibinder.newSetBinder(binder(), EventPublisher.class);
eventPublishersBinder.addBinding().to(ProgramStatusEventPublisher.class);
bind(EventPublishManager.class).in(Scopes.SINGLETON);
Multibinder<EventSubscriber> eventSubscribersBinder =
Multibinder.newSetBinder(binder(), EventSubscriber.class);
bind(EventSubscriberManager.class).in(Scopes.SINGLETON);
bind(EventWriterProvider.class).to(EventWriterExtensionProvider.class);
bind(MetricsProvider.class).to(SparkProgramStatusMetricsProvider.class);

Expand Down Expand Up @@ -492,7 +497,7 @@ public synchronized org.quartz.Scheduler get() {
* @return an instance of {@link org.quartz.Scheduler}
*/
private org.quartz.Scheduler getScheduler(JobStore store,
CConfiguration cConf) throws SchedulerException {
CConfiguration cConf) throws SchedulerException {

int threadPoolSize = cConf.getInt(Constants.Scheduler.CFG_SCHEDULER_MAX_THREAD_POOL_SIZE);
ExecutorThreadPool threadPool = new ExecutorThreadPool(threadPoolSize);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright © 2023 Cask Data, Inc.
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.events;

import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.spi.events.EventReaderContext;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
* Provides an initialized default context for EventReader implementing {@link EventReaderContext}.
*/
public class DefaultEventReaderContext implements EventReaderContext {

private final Map<String, String> properties;

/**
* Construct the default Event reader context.
*
* @param prefix prefix for specific event reader
* @param cConf An instance of an injected ${@link CConfiguration}.
*/
DefaultEventReaderContext(String prefix, CConfiguration cConf) {
Map<String, String> mutableProperties = new HashMap<>(cConf.getPropsWithPrefix(prefix));
mutableProperties.put(Constants.INSTANCE_NAME, cConf.get(Constants.INSTANCE_NAME));
this.properties = Collections.unmodifiableMap(mutableProperties);
}

@Override
public Map<String, String> getProperties() {
return properties;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.events;

import io.cdap.cdap.spi.events.Event;
import io.cdap.cdap.spi.events.EventReader;
import java.util.Map;

/**
* This interface provides Event Readers ({@link EventReader}) in order to load them as an
* extension.
*/
public interface EventReaderProvider<T extends Event> {

/**
* Method which retrieve a {@link Map} of {@link EventReader} and the Extension Type.
*/
Map<String, EventReader<T>> loadEventReaders();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.events;

import com.google.common.util.concurrent.AbstractScheduledService;

/**
* Abstract class for an event subscriber.
* Each EventSubscriber will receive and process incoming events.
*/
public abstract class EventSubscriber extends AbstractScheduledService {

/**
* Initialize this handler.
*/
public abstract void initialize();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.events;

import com.google.common.util.concurrent.AbstractIdleService;
import com.google.inject.Inject;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.feature.DefaultFeatureFlagsProvider;
import io.cdap.cdap.features.Feature;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* EventSubscriberManager is responsible for starting all the event subscriber threads.
*/
public class EventSubscriberManager extends AbstractIdleService {

private static final Logger LOG = LoggerFactory.getLogger(EventSubscriberManager.class);

private final boolean enabled;
private final Set<EventSubscriber> eventSubscribers;

@Inject
EventSubscriberManager(CConfiguration cConf, Set<EventSubscriber> eventSubscribers) {
this.enabled = Feature.EVENT_READER.isEnabled(new DefaultFeatureFlagsProvider(cConf));
this.eventSubscribers = eventSubscribers;
}

@Override
protected void startUp() throws Exception {
if (!enabled) {
return; // If not enabled, don't start
}
eventSubscribers.forEach(eventSubscriber -> {
// Initialize the event subscribers with all the event readers provided by provider
try {
eventSubscriber.initialize();
eventSubscriber.startAndWait();
LOG.info("Successfully initialized eventSubscriber: {}",
eventSubscriber.getClass().getSimpleName());
} catch (Exception e) {
LOG.error("Failed to initialize eventSubscriber: {}",
eventSubscriber.getClass().getSimpleName(), e);
}
});
}

@Override
protected void shutDown() throws Exception {
if (!enabled) {
return; // If not enabled, don't shut down
}
eventSubscribers.forEach(eventSubscriber -> {
try {
eventSubscriber.stopAndWait();
} catch (Exception e) {
LOG.error("Failed to stop subscriber", e);
}
});
}
}
10 changes: 10 additions & 0 deletions cdap-common/src/main/resources/cdap-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5077,6 +5077,16 @@
</description>
</property>

<!-- event subscriber -->
<property>
<name>feature.event.reader.enabled</name>
<value>false</value>
<final>false</final>
<description>
Enable event subscribing in CDAP
</description>
</property>

<!-- Spark on k8s -->
<property>
<name>artifact.fetcher.bind.port</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
public enum Feature {
REPLICATION_TRANSFORMATIONS("6.6.0"),
EVENT_PUBLISH("6.7.0", false),
EVENT_READER("6.10.0", false),
PIPELINE_COMPOSITE_TRIGGERS("6.8.0"),
PUSHDOWN_TRANSFORMATION_GROUPBY("6.7.0"),
PUSHDOWN_TRANSFORMATION_DEDUPLICATE("6.7.0"),
Expand Down Expand Up @@ -72,7 +73,9 @@ public boolean isEnabled(FeatureFlagsProvider featureFlagsProvider) {
}

/**
* @return string that identifies the feature flag.
* Retrieve the string that identifies the feature flag.
*
* @return feature flag string
*/
public String getFeatureFlagString() {
return featureFlagString;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import io.cdap.cdap.internal.app.services.AppFabricServer;
import io.cdap.cdap.internal.app.worker.sidecar.ArtifactLocalizerService;
import io.cdap.cdap.internal.events.EventPublishManager;
import io.cdap.cdap.internal.events.EventSubscriberManager;
import io.cdap.cdap.logging.LoggingUtil;
import io.cdap.cdap.logging.appender.LogAppenderInitializer;
import io.cdap.cdap.logging.framework.LogPipelineLoader;
Expand Down Expand Up @@ -155,6 +156,7 @@ public class StandaloneMain {
private final RuntimeServer runtimeServer;
private final ArtifactLocalizerService artifactLocalizerService;
private final EventPublishManager eventPublishManager;
private final EventSubscriberManager eventSubscriberManager;

private ExternalAuthenticationServer externalAuthenticationServer;

Expand Down Expand Up @@ -187,6 +189,7 @@ private StandaloneMain(List<Module> modules, CConfiguration cConf) {
cConf.setInt(Constants.ArtifactLocalizer.PORT, 0);
artifactLocalizerService = injector.getInstance(ArtifactLocalizerService.class);
eventPublishManager = injector.getInstance(EventPublishManager.class);
eventSubscriberManager = injector.getInstance(EventSubscriberManager.class);

if (cConf.getBoolean(Constants.Transaction.TX_ENABLED)) {
txService = injector.getInstance(InMemoryTransactionService.class);
Expand Down Expand Up @@ -301,6 +304,7 @@ public void startUp() throws Exception {
secureStoreService.startAndWait();
supportBundleInternalService.startAndWait();
eventPublishManager.startAndWait();
eventSubscriberManager.startAndWait();

String protocol = sslEnabled ? "https" : "http";
int dashboardPort = sslEnabled
Expand Down Expand Up @@ -339,6 +343,7 @@ public void shutDown() {
previewHttpServer.stopAndWait();
artifactLocalizerService.stopAndWait();
eventPublishManager.stopAndWait();
eventSubscriberManager.stopAndWait();
// app fabric will also stop all programs
appFabricServer.stopAndWait();
runtimeServer.stopAndWait();
Expand Down

0 comments on commit f3d7b36

Please sign in to comment.