Skip to content

Commit

Permalink
Event Subscriber Infrastructure
Browse files Browse the repository at this point in the history
  • Loading branch information
codeNinjaDev committed Jul 17, 2023
1 parent b85ed13 commit f884aa8
Show file tree
Hide file tree
Showing 12 changed files with 357 additions and 3 deletions.
7 changes: 6 additions & 1 deletion cdap-app-fabric/pom.xml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright © 2014-2021 Cask Data, Inc.
Copyright © 2014-2023 Cask Data, Inc.
Licensed under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with the License. You may obtain a copy of
Expand Down Expand Up @@ -269,6 +269,11 @@
<artifactId>cdap-source-control</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.cdap.cdap</groupId>
<artifactId>cdap-event-reader-spi</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@
import io.cdap.cdap.internal.credential.handler.CredentialProviderHttpHandlerInternal;
import io.cdap.cdap.internal.events.EventPublishManager;
import io.cdap.cdap.internal.events.EventPublisher;
import io.cdap.cdap.internal.events.EventSubscriber;
import io.cdap.cdap.internal.events.EventSubscriberManager;
import io.cdap.cdap.internal.events.EventWriterExtensionProvider;
import io.cdap.cdap.internal.events.EventWriterProvider;
import io.cdap.cdap.internal.events.MetricsProvider;
Expand Down Expand Up @@ -398,6 +400,9 @@ protected void configure() {
Multibinder.newSetBinder(binder(), EventPublisher.class);
eventPublishersBinder.addBinding().to(ProgramStatusEventPublisher.class);
bind(EventPublishManager.class).in(Scopes.SINGLETON);
Multibinder<EventSubscriber> eventSubscribersBinder =
Multibinder.newSetBinder(binder(), EventSubscriber.class);
bind(EventSubscriberManager.class).in(Scopes.SINGLETON);
bind(EventWriterProvider.class).to(EventWriterExtensionProvider.class);
bind(MetricsProvider.class).to(SparkProgramStatusMetricsProvider.class);

Expand Down Expand Up @@ -488,7 +493,7 @@ public synchronized org.quartz.Scheduler get() {
* @return an instance of {@link org.quartz.Scheduler}
*/
private org.quartz.Scheduler getScheduler(JobStore store,
CConfiguration cConf) throws SchedulerException {
CConfiguration cConf) throws SchedulerException {

int threadPoolSize = cConf.getInt(Constants.Scheduler.CFG_SCHEDULER_MAX_THREAD_POOL_SIZE);
ExecutorThreadPool threadPool = new ExecutorThreadPool(threadPoolSize);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright © 2023 Cask Data, Inc.
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.events;

import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.spi.events.EventReaderContext;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
* Provides an initialized default context for EventReader implementing {@link EventReaderContext}.
*/
public class DefaultEventReaderContext implements EventReaderContext {

private final Map<String, String> properties;

/**
* Construct the default Event reader context.
*
* @param prefix prefix for specific event reader (e.g start.simpleclassname)
* @param cConf An instance of an injected ${@link CConfiguration}.
*/
DefaultEventReaderContext(String prefix, CConfiguration cConf) {
String completePrefix = String.format("%s.%s", Constants.Event.EVENTS_READER_PREFIX, prefix);

// Calculate the maximum TMS retry timeout
int retryTimeout = cConf.getInt(Constants.AppFabric.PROGRAM_STATUS_RETRY_STRATEGY_PREFIX
+ Constants.Retry.MAX_TIME_SECS);
// Set the acknowledgement deadline to be greater than the retry timeout by configurable amount
int ackDeadline = retryTimeout + cConf.getInt(Constants.Event.EVENTS_READER_ACK_BUFFER);
Map<String, String> mutableProperties = new HashMap<>(cConf.getPropsWithPrefix(completePrefix));
mutableProperties.put("ackDeadline", String.valueOf(ackDeadline));
this.properties = Collections.unmodifiableMap(mutableProperties);
}

@Override
public Map<String, String> getProperties() {
return properties;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.events;

import io.cdap.cdap.spi.events.Event;
import io.cdap.cdap.spi.events.EventReader;
import java.util.Map;

/**
* This interface provides Event Readers ({@link EventReader}) in order to load them as an
* extension.
*/
public interface EventReaderProvider<T extends Event> {

/**
* Method which retrieve a {@link Map} of {@link EventReader} and the Extension Type.
*/
Map<String, EventReader<T>> loadEventReaders();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.events;

import com.google.common.util.concurrent.AbstractScheduledService;

/**
* Abstract class for an event subscriber.
* Each EventSubscriber will receive and process incoming events.
*/
public abstract class EventSubscriber extends AbstractScheduledService {

/**
* Initialize this handler.
*
* @return true if successful
*/
public abstract boolean initialize();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.events;

import com.google.common.util.concurrent.AbstractIdleService;
import com.google.inject.Inject;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.feature.DefaultFeatureFlagsProvider;
import io.cdap.cdap.features.Feature;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* EventSubscriberManager is responsible for starting all the event subscriber threads.
*/
public class EventSubscriberManager extends AbstractIdleService {

private static final Logger LOG = LoggerFactory.getLogger(EventSubscriberManager.class);

private final boolean enabled;
private final Set<EventSubscriber> eventSubscribers;

@Inject
EventSubscriberManager(CConfiguration cConf, Set<EventSubscriber> eventSubscribers) {
this.enabled = Feature.EVENT_READER.isEnabled(new DefaultFeatureFlagsProvider(cConf));
this.eventSubscribers = eventSubscribers;
}

@Override
protected void startUp() throws Exception {
if (!enabled) {
return; // If not enabled, don't start
}
eventSubscribers.forEach(eventSubscriber -> {
// Loading the event readers from provider
// Initialize the event subscribers with all the event readers provided by provider
if (eventSubscriber.initialize()) {
eventSubscriber.startAndWait();
LOG.info("Successfully initialized eventSubscriber: {}",
eventSubscriber.getClass().getSimpleName());
} else {
LOG.error("Failed to initialize eventSubscriber: {}",
eventSubscriber.getClass().getSimpleName());
}
});
}

@Override
protected void shutDown() throws Exception {
if (!enabled) {
return; // If not enabled, don't shut down
}
eventSubscribers.forEach(eventSubscriber -> {
eventSubscriber.stopAndWait();
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright © 2023 Cask Data, Inc.
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.events.dummy;

import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject;
import io.cdap.cdap.spi.events.Event;
import io.cdap.cdap.spi.events.EventReader;
import io.cdap.cdap.spi.events.EventReaderContext;
import io.cdap.cdap.spi.events.EventResult;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Dummy implementation of {@link EventReader} mainly for test proposals.
*/
public class DummyEventReader<T extends Event> implements EventReader<T> {

private static final Logger logger = LoggerFactory.getLogger(DummyEventReader.class);

@Inject
public DummyEventReader() {
}

@Override
public void initialize(EventReaderContext eventReaderContext) {
logger.info("Initializing DummyEventReader...");
}

@Override
public EventResult<T> pull(int maxMessages) {
ArrayList<T> sentMessages = new ArrayList<>();
Iterator<T> it = getMessages().iterator();
int i = 0;
while (i < maxMessages && it.hasNext()) {
sentMessages.add(it.next());
i++;
}
return new DummyEventResult(sentMessages);
}

@VisibleForTesting
public Collection<T> getMessages() {
return new ArrayList<>();
}

@Override
public void close() throws Exception {
logger.info("Closing dummy reader");
}

class DummyEventResult implements EventResult<T> {
final Collection<T> events;

DummyEventResult(Collection<T> events) {
this.events = events;
}

@Override
public void consumeEvents(Consumer<T> consumer) {
for (T evt : events) {
try {
consumer.accept(evt);
} catch (Exception e) {
logger.error("Error on consuming event {}", evt.getVersion(), e);
}
}
}

@Override
public void close() throws Exception {}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.events.dummy;

import com.google.inject.Inject;
import io.cdap.cdap.internal.events.EventReaderProvider;
import io.cdap.cdap.spi.events.Event;
import io.cdap.cdap.spi.events.EventReader;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
* Dummy implementation for {@link EventReaderProvider} for test proposals.
*/
public class DummyEventReaderExtensionProvider<T extends Event> implements EventReaderProvider {

private final DummyEventReader eventReader;

@Inject
public DummyEventReaderExtensionProvider(DummyEventReader eventReader) {
this.eventReader = eventReader;
}

@Override
public Map<String, EventReader> loadEventReaders() {
Map<String, EventReader> map = new HashMap<>();
map.put(this.eventReader.getClass().getName(), this.eventReader);
return Collections.unmodifiableMap(map);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2219,6 +2219,10 @@ public static final class Event {
public static final String EVENTS_WRITER_EXTENSIONS_DIR = "events.writer.extensions.dir";

public static final String EVENTS_WRITER_EXTENSIONS_ENABLED_LIST = "events.writer.extensions.enabled.list";

public static final String EVENTS_READER_PREFIX = "event.reader";

public static final String EVENTS_READER_ACK_BUFFER = "events.reader.buffer";
}

/**
Expand Down
Loading

0 comments on commit f884aa8

Please sign in to comment.