Skip to content

Commit

Permalink
Event Handling Infrastructure
Browse files Browse the repository at this point in the history
  • Loading branch information
codeNinjaDev committed Jul 11, 2023
1 parent b85ed13 commit 6e189b8
Show file tree
Hide file tree
Showing 11 changed files with 344 additions and 4 deletions.
8 changes: 7 additions & 1 deletion cdap-app-fabric/pom.xml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright © 2014-2021 Cask Data, Inc.
Copyright © 2014-2023 Cask Data, Inc.
Licensed under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with the License. You may obtain a copy of
Expand Down Expand Up @@ -269,6 +269,12 @@
<artifactId>cdap-source-control</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.cdap.cdap</groupId>
<artifactId>cdap-event-reader-spi</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@
import io.cdap.cdap.internal.capability.CapabilityModule;
import io.cdap.cdap.internal.credential.handler.CredentialProviderHttpHandler;
import io.cdap.cdap.internal.credential.handler.CredentialProviderHttpHandlerInternal;
import io.cdap.cdap.internal.events.EventHandler;
import io.cdap.cdap.internal.events.EventHandlerManager;
import io.cdap.cdap.internal.events.EventPublishManager;
import io.cdap.cdap.internal.events.EventPublisher;
import io.cdap.cdap.internal.events.EventWriterExtensionProvider;
Expand Down Expand Up @@ -398,6 +400,9 @@ protected void configure() {
Multibinder.newSetBinder(binder(), EventPublisher.class);
eventPublishersBinder.addBinding().to(ProgramStatusEventPublisher.class);
bind(EventPublishManager.class).in(Scopes.SINGLETON);
Multibinder<EventHandler> eventHandlerBinder =
Multibinder.newSetBinder(binder(), EventHandler.class);
bind(EventHandlerManager.class).in(Scopes.SINGLETON);
bind(EventWriterProvider.class).to(EventWriterExtensionProvider.class);
bind(MetricsProvider.class).to(SparkProgramStatusMetricsProvider.class);

Expand Down Expand Up @@ -488,7 +493,7 @@ public synchronized org.quartz.Scheduler get() {
* @return an instance of {@link org.quartz.Scheduler}
*/
private org.quartz.Scheduler getScheduler(JobStore store,
CConfiguration cConf) throws SchedulerException {
CConfiguration cConf) throws SchedulerException {

int threadPoolSize = cConf.getInt(Constants.Scheduler.CFG_SCHEDULER_MAX_THREAD_POOL_SIZE);
ExecutorThreadPool threadPool = new ExecutorThreadPool(threadPoolSize);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright © 2023 Cask Data, Inc.
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.events;

import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.spi.events.EventReaderContext;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
* Provides an initialized default context for EventReader implementing {@link EventReaderContext}.
*/
public class DefaultEventReaderContext implements EventReaderContext {

private final Map<String, String> properties;

/**
* Construct the default Event reader context.
*
* @param prefix prefix for specific event reader (e.g start.<simpleclassname>)
* @param cConf An instance of an injected ${@link CConfiguration}.
*/
DefaultEventReaderContext(String prefix, CConfiguration cConf) {
String completePrefix = String.format("%s.%s", Constants.Event.EVENTS_READER_PREFIX, prefix);

// Calculate the maximum TMS retry timeout
int retryTimeout = cConf.getInt(Constants.AppFabric.PROGRAM_STATUS_RETRY_STRATEGY_PREFIX
+ Constants.Retry.MAX_TIME_SECS);
// Set the acknowledgement deadline to be greater than the retry timeout by configurable amount
int ackDeadline = retryTimeout + cConf.getInt(Constants.Event.EVENTS_READER_ACK_BUFFER);
Map<String, String> mutableProperties = new HashMap<>(cConf.getPropsWithPrefix(completePrefix));
mutableProperties.put("ackDeadline", String.valueOf(ackDeadline));
this.properties = Collections.unmodifiableMap(mutableProperties);
}

@Override
public Map<String, String> getProperties() {
return properties;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.events;

import com.google.common.util.concurrent.AbstractScheduledService;

/**
* Abstract class for an event Handler. Each EventHandler will receive and process incoming events
*/
public abstract class EventHandler extends AbstractScheduledService {

/**
* Initialize this handler.
*
* @return true if successful
*/
public abstract boolean initialize();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.events;

import com.google.common.util.concurrent.AbstractIdleService;
import com.google.inject.Inject;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.feature.DefaultFeatureFlagsProvider;
import io.cdap.cdap.features.Feature;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* EventHandlerManager is responsible for starting all the event handler threads.
*/
public class EventHandlerManager extends AbstractIdleService {

private static final Logger LOG = LoggerFactory.getLogger(EventHandlerManager.class);

private final boolean enabled;
private final Set<EventHandler> eventHandlers;

@Inject
EventHandlerManager(CConfiguration cConf, Set<EventHandler> eventHandlers) {
this.enabled = Feature.EVENT_READER.isEnabled(new DefaultFeatureFlagsProvider(cConf));
this.eventHandlers = eventHandlers;
}

@Override
protected void startUp() throws Exception {
if (!enabled) {
return; // If not enabled, don't start
}
eventHandlers.forEach(eventHandler -> {
// Loading the event writers from provider
// Initialize the event publisher with all the event writers provided by provider
if (eventHandler.initialize()) {
eventHandler.startAndWait();
LOG.info("Successfully initialized eventReaderHandler: {}",
eventHandler.getClass().getSimpleName());
} else {
LOG.error("Failed to initialize eventReaderHandler: {}",
eventHandler.getClass().getSimpleName());
}
});
}

@Override
protected void shutDown() throws Exception {
if (!enabled) {
return; // If not enabled, don't shut down
}
eventHandlers.forEach(eventHandler -> {
eventHandler.stopAndWait();
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.events;

import io.cdap.cdap.spi.events.Event;
import io.cdap.cdap.spi.events.EventReader;
import java.util.Map;

/**
* This interface provides Event Readers ({@link EventReader}) in order to load them as an
* extension.
*/
public interface EventReaderProvider<T extends Event> {

/**
* Method which retrieve a {@link Map} of {@link EventReader} and the Extension Type.
*/
Map<String, EventReader<T>> loadEventReaders();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright © 2023 Cask Data, Inc.
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.events.dummy;

import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject;
import io.cdap.cdap.spi.events.Event;
import io.cdap.cdap.spi.events.EventReader;
import io.cdap.cdap.spi.events.EventReaderContext;
import io.cdap.cdap.spi.events.EventResult;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Dummy implementation of {@link EventReader} mainly for test proposals.
*/
public class DummyEventReader<T extends Event> implements EventReader<T> {

private static final Logger logger = LoggerFactory.getLogger(DummyEventReader.class);

@Inject
public DummyEventReader() {
}

@Override
public void initialize(EventReaderContext eventReaderContext) {
logger.info("Initializing DummyEventReader...");
}

@Override
public EventResult<T> pull(int maxMessages) {
ArrayList<T> sentMessages = new ArrayList<>();
Iterator<T> it = getMessages().iterator();
int i = 0;
while (i < maxMessages && it.hasNext()) {
sentMessages.add(it.next());
i++;
}
return new DummyEventResult(sentMessages);
}

@VisibleForTesting
public Collection<T> getMessages() {
return new ArrayList<>();
}

@Override
public void close() throws Exception {
logger.info("Closing dummy reader");
}

class DummyEventResult implements EventResult<T> {
final Collection<T> events;

DummyEventResult(Collection<T> events) {
this.events = events;
}

@Override
public void consumeEvents(Consumer<T> consumer) {
for (T evt : events) {
try {
consumer.accept(evt);
} catch (Exception e) {
logger.error("Error on consuming event {}", evt.getVersion(), e);
}
}
}

@Override
public void close() throws Exception {}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.events.dummy;

import com.google.inject.Inject;
import io.cdap.cdap.internal.events.EventReaderProvider;
import io.cdap.cdap.spi.events.Event;
import io.cdap.cdap.spi.events.EventReader;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
* Dummy implementation for {@link EventReaderProvider} for test proposals.
*/
public class DummyEventReaderExtensionProvider<T extends Event> implements EventReaderProvider {

private final DummyEventReader eventReader;

@Inject
public DummyEventReaderExtensionProvider(DummyEventReader eventReader) {
this.eventReader = eventReader;
}

@Override
public Map<String, EventReader> loadEventReaders() {
Map<String, EventReader> map = new HashMap<>();
map.put(this.eventReader.getClass().getName(), this.eventReader);
return Collections.unmodifiableMap(map);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2208,17 +2208,19 @@ public static final class Event {

public static final String PROGRAM_STATUS_POLL_INTERVAL_SECONDS = "event.program.status.poll.interval.seconds";


public static final String PROGRAM_STATUS_FETCH_SIZE = "event.program.status.fetch.size";

public static final String INSTANCE_NAME = "event.instance.name";

public static final String PROJECT_NAME = "event.project.name";

public static final String EVENTS_WRITER_PREFIX = "event.writer";

public static final String EVENTS_WRITER_EXTENSIONS_DIR = "events.writer.extensions.dir";

public static final String EVENTS_WRITER_EXTENSIONS_ENABLED_LIST = "events.writer.extensions.enabled.list";

public static final String EVENTS_READER_PREFIX = "event.reader";
public static final String EVENTS_READER_ACK_BUFFER = "events.reader.buffer";
}

/**
Expand Down
Loading

0 comments on commit 6e189b8

Please sign in to comment.