Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Event Handling Infrastructure #15223

Merged
merged 1 commit into from
Jul 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion cdap-app-fabric/pom.xml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright © 2014-2021 Cask Data, Inc.
Copyright © 2014-2023 Cask Data, Inc.

Licensed under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with the License. You may obtain a copy of
Expand Down Expand Up @@ -269,6 +269,11 @@
<artifactId>cdap-source-control</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.cdap.cdap</groupId>
<artifactId>cdap-event-reader-spi</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@
import io.cdap.cdap.internal.credential.handler.CredentialProviderHttpHandlerInternal;
import io.cdap.cdap.internal.events.EventPublishManager;
import io.cdap.cdap.internal.events.EventPublisher;
import io.cdap.cdap.internal.events.EventSubscriber;
import io.cdap.cdap.internal.events.EventSubscriberManager;
import io.cdap.cdap.internal.events.EventWriterExtensionProvider;
import io.cdap.cdap.internal.events.EventWriterProvider;
import io.cdap.cdap.internal.events.MetricsProvider;
Expand Down Expand Up @@ -398,6 +400,9 @@ protected void configure() {
Multibinder.newSetBinder(binder(), EventPublisher.class);
eventPublishersBinder.addBinding().to(ProgramStatusEventPublisher.class);
bind(EventPublishManager.class).in(Scopes.SINGLETON);
Multibinder<EventSubscriber> eventSubscribersBinder =
Multibinder.newSetBinder(binder(), EventSubscriber.class);
bind(EventSubscriberManager.class).in(Scopes.SINGLETON);
bind(EventWriterProvider.class).to(EventWriterExtensionProvider.class);
bind(MetricsProvider.class).to(SparkProgramStatusMetricsProvider.class);

Expand Down Expand Up @@ -488,7 +493,7 @@ public synchronized org.quartz.Scheduler get() {
* @return an instance of {@link org.quartz.Scheduler}
*/
private org.quartz.Scheduler getScheduler(JobStore store,
CConfiguration cConf) throws SchedulerException {
CConfiguration cConf) throws SchedulerException {

int threadPoolSize = cConf.getInt(Constants.Scheduler.CFG_SCHEDULER_MAX_THREAD_POOL_SIZE);
ExecutorThreadPool threadPool = new ExecutorThreadPool(threadPoolSize);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright © 2023 Cask Data, Inc.
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.events;

import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.spi.events.EventReaderContext;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
* Provides an initialized default context for EventReader implementing {@link EventReaderContext}.
*/
public class DefaultEventReaderContext implements EventReaderContext {

private final Map<String, String> properties;

/**
* Construct the default Event reader context.
*
* @param prefix prefix for specific event reader
* @param cConf An instance of an injected ${@link CConfiguration}.
*/
DefaultEventReaderContext(String prefix, CConfiguration cConf) {
codeNinjaDev marked this conversation as resolved.
Show resolved Hide resolved
Map<String, String> mutableProperties = new HashMap<>(cConf.getPropsWithPrefix(prefix));
mutableProperties.put(Constants.INSTANCE_NAME, cConf.get(Constants.INSTANCE_NAME));
greeshmaswaminathan marked this conversation as resolved.
Show resolved Hide resolved
this.properties = Collections.unmodifiableMap(mutableProperties);
codeNinjaDev marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public Map<String, String> getProperties() {
return properties;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.events;

import io.cdap.cdap.spi.events.Event;
import io.cdap.cdap.spi.events.EventReader;
import java.util.Map;

/**
* This interface provides Event Readers ({@link EventReader}) in order to load them as an
* extension.
*/
public interface EventReaderProvider<T extends Event> {

/**
* Method which retrieve a {@link Map} of {@link EventReader} and the Extension Type.
*/
Map<String, EventReader<T>> loadEventReaders();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.events;

import com.google.common.util.concurrent.AbstractScheduledService;

/**
* Abstract class for an event subscriber.
* Each EventSubscriber will receive and process incoming events.
*/
public abstract class EventSubscriber extends AbstractScheduledService {

/**
* Initialize this handler.
*/
public abstract void initialize();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.events;

import com.google.common.util.concurrent.AbstractIdleService;
import com.google.inject.Inject;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.feature.DefaultFeatureFlagsProvider;
import io.cdap.cdap.features.Feature;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* EventSubscriberManager is responsible for starting all the event subscriber threads.
*/
public class EventSubscriberManager extends AbstractIdleService {

private static final Logger LOG = LoggerFactory.getLogger(EventSubscriberManager.class);

private final boolean enabled;
private final Set<EventSubscriber> eventSubscribers;

@Inject
EventSubscriberManager(CConfiguration cConf, Set<EventSubscriber> eventSubscribers) {
this.enabled = Feature.EVENT_READER.isEnabled(new DefaultFeatureFlagsProvider(cConf));
this.eventSubscribers = eventSubscribers;
}

@Override
protected void startUp() throws Exception {
if (!enabled) {
return; // If not enabled, don't start
}
eventSubscribers.forEach(eventSubscriber -> {
// Initialize the event subscribers with all the event readers provided by provider
try {
eventSubscriber.initialize();
eventSubscriber.startAndWait();
LOG.info("Successfully initialized eventSubscriber: {}",
codeNinjaDev marked this conversation as resolved.
Show resolved Hide resolved
eventSubscriber.getClass().getSimpleName());
codeNinjaDev marked this conversation as resolved.
Show resolved Hide resolved
} catch (Exception e) {
LOG.error("Failed to initialize eventSubscriber: {}",
eventSubscriber.getClass().getSimpleName(), e);
}
});
}

@Override
protected void shutDown() throws Exception {
if (!enabled) {
return; // If not enabled, don't shut down
}
eventSubscribers.forEach(eventSubscriber -> {
try {
eventSubscriber.stopAndWait();
} catch (Exception e) {
LOG.error("Failed to stop subscriber", e);
}
});
}
}
10 changes: 10 additions & 0 deletions cdap-common/src/main/resources/cdap-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5077,6 +5077,16 @@
</description>
</property>

<!-- event subscriber -->
<property>
<name>feature.event.reader.enabled</name>
<value>false</value>
<final>false</final>
<description>
Enable event subscribing in CDAP
</description>
</property>

<!-- Spark on k8s -->
<property>
<name>artifact.fetcher.bind.port</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
public enum Feature {
REPLICATION_TRANSFORMATIONS("6.6.0"),
EVENT_PUBLISH("6.7.0", false),
EVENT_READER("6.10.0", false),
PIPELINE_COMPOSITE_TRIGGERS("6.8.0"),
PUSHDOWN_TRANSFORMATION_GROUPBY("6.7.0"),
PUSHDOWN_TRANSFORMATION_DEDUPLICATE("6.7.0"),
Expand Down Expand Up @@ -72,7 +73,9 @@ public boolean isEnabled(FeatureFlagsProvider featureFlagsProvider) {
}

/**
* @return string that identifies the feature flag.
* Retrieve the string that identifies the feature flag.
*
* @return feature flag string
*/
public String getFeatureFlagString() {
return featureFlagString;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import io.cdap.cdap.internal.app.services.AppFabricServer;
import io.cdap.cdap.internal.app.worker.sidecar.ArtifactLocalizerService;
import io.cdap.cdap.internal.events.EventPublishManager;
import io.cdap.cdap.internal.events.EventSubscriberManager;
import io.cdap.cdap.logging.LoggingUtil;
import io.cdap.cdap.logging.appender.LogAppenderInitializer;
import io.cdap.cdap.logging.framework.LogPipelineLoader;
Expand Down Expand Up @@ -155,6 +156,7 @@ public class StandaloneMain {
private final RuntimeServer runtimeServer;
private final ArtifactLocalizerService artifactLocalizerService;
private final EventPublishManager eventPublishManager;
private final EventSubscriberManager eventSubscriberManager;

private ExternalAuthenticationServer externalAuthenticationServer;

Expand Down Expand Up @@ -187,6 +189,7 @@ private StandaloneMain(List<Module> modules, CConfiguration cConf) {
cConf.setInt(Constants.ArtifactLocalizer.PORT, 0);
artifactLocalizerService = injector.getInstance(ArtifactLocalizerService.class);
eventPublishManager = injector.getInstance(EventPublishManager.class);
eventSubscriberManager = injector.getInstance(EventSubscriberManager.class);

if (cConf.getBoolean(Constants.Transaction.TX_ENABLED)) {
txService = injector.getInstance(InMemoryTransactionService.class);
Expand Down Expand Up @@ -301,6 +304,7 @@ public void startUp() throws Exception {
secureStoreService.startAndWait();
supportBundleInternalService.startAndWait();
eventPublishManager.startAndWait();
eventSubscriberManager.startAndWait();

String protocol = sslEnabled ? "https" : "http";
int dashboardPort = sslEnabled
Expand Down Expand Up @@ -339,6 +343,7 @@ public void shutDown() {
previewHttpServer.stopAndWait();
artifactLocalizerService.stopAndWait();
eventPublishManager.stopAndWait();
eventSubscriberManager.stopAndWait();
// app fabric will also stop all programs
appFabricServer.stopAndWait();
runtimeServer.stopAndWait();
Expand Down