Skip to content

Commit

Permalink
reformated code and cleaned up tests
Browse files Browse the repository at this point in the history
  • Loading branch information
codeNinjaDev committed Jul 21, 2023
1 parent b4949f5 commit 155900d
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -39,8 +38,8 @@
* extending from {@link AbstractExtensionLoader}
*/
public class StartProgramEventReaderExtensionProvider
extends AbstractExtensionLoader<String, EventReader<StartProgramEvent>>
implements EventReaderProvider<StartProgramEvent> {
extends AbstractExtensionLoader<String, EventReader<StartProgramEvent>>
implements EventReaderProvider<StartProgramEvent> {

private static final Logger LOG = LoggerFactory.getLogger(StartProgramEventReaderExtensionProvider.class);
private static final Set<String> ALLOWED_RESOURCES = createAllowedResources();
Expand All @@ -56,7 +55,7 @@ public class StartProgramEventReaderExtensionProvider
public StartProgramEventReaderExtensionProvider(CConfiguration cConf) {
super(cConf.get(Constants.Event.START_EVENTS_READER_EXTENSIONS_DIR, ""));
this.enabledEventReaders = cConf.getStringCollection(
Constants.Event.START_EVENTS_READER_EXTENSIONS_ENABLED_LIST);
Constants.Event.START_EVENTS_READER_EXTENSIONS_ENABLED_LIST);
if (this.enabledEventReaders == null || this.enabledEventReaders.isEmpty()) {
return;
}
Expand All @@ -66,10 +65,10 @@ public StartProgramEventReaderExtensionProvider(CConfiguration cConf) {
private static Set<String> createAllowedResources() {
try {
return ClassPathResources.getResourcesWithDependencies(EventReader.class.getClassLoader(),
EventReader.class);
EventReader.class);
} catch (IOException e) {
throw new RuntimeException("Failed to trace dependencies for reader extension. "
+ "Usage of events reader might fail.", e);
+ "Usage of events reader might fail.", e);
}
}

Expand All @@ -80,9 +79,9 @@ public Map<String, EventReader<StartProgramEvent>> loadEventReaders() {
@Override
protected Set<String> getSupportedTypesForProvider(EventReader eventReader) {
if (enabledEventReaders == null ||
!enabledEventReaders.contains(eventReader.getId())) {
!enabledEventReaders.contains(eventReader.getId())) {
LOG.debug("{} is not present in the allowed list of event readers.",
eventReader.getId());
eventReader.getId());
return Collections.emptySet();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ public class StartProgramEventSubscriber extends EventSubscriber {
/**
* Create instance that handles StartProgramEvents.
*
* @param cConf CDAP configuration
* @param cConf CDAP configuration
* @param extensionProvider eventReaderProvider for StartProgramEvent Readers
* @param lifecycleService to publish start programs to TMS
* @param lifecycleService to publish start programs to TMS
*/
@Inject
StartProgramEventSubscriber(CConfiguration cConf, EventReaderProvider<StartProgramEvent> extensionProvider,
Expand All @@ -76,28 +76,28 @@ public class StartProgramEventSubscriber extends EventSubscriber {
public void initialize() {
readers = new HashSet<>(extensionProvider.loadEventReaders().values());
Iterator<EventReader<StartProgramEvent>> eventReaderIterator
= readers.iterator();
= readers.iterator();
while (eventReaderIterator.hasNext()) {
EventReader<StartProgramEvent> reader = eventReaderIterator.next();
try {
reader.initialize(
new DefaultEventReaderContext(String.format("%s.%s.",
Constants.Event.START_EVENT_PREFIX, reader.getId()), cConf));
new DefaultEventReaderContext(String.format("%s.%s.",
Constants.Event.START_EVENT_PREFIX, reader.getId()), cConf));
} catch (Exception e) {
LOG.error("Failed to initialize reader: {}", reader.getId(), e);
eventReaderIterator.remove();
}
}
if (!readers.isEmpty()) {
threadPoolExecutor = new ThreadPoolExecutor(readers.size(), readers.size(), 60,
TimeUnit.SECONDS, new LinkedBlockingQueue<>());
TimeUnit.SECONDS, new LinkedBlockingQueue<>());
}
}

@Override
protected Scheduler scheduler() {
return Scheduler.newFixedDelaySchedule(0,
cConf.getInt(Constants.Event.START_PROGRAM_EVENT_READER_POLL_DELAY), TimeUnit.SECONDS);
cConf.getInt(Constants.Event.START_PROGRAM_EVENT_READER_POLL_DELAY), TimeUnit.SECONDS);
}

@Override
Expand Down Expand Up @@ -125,18 +125,18 @@ void processEvents(EventReader<StartProgramEvent> reader) {
* @throws RuntimeException if starting program fails
*/
private void startProgram(StartProgramEvent event) {
StartProgramEventDetails eventDetails = event.getEventDetails();
StartProgramEventDetails eventDetails = event.getEventDetails();
try {
ProgramType programType = ProgramType.valueOfCategoryName(eventDetails.getProgramType());
ProgramReference programReference = new ProgramReference(eventDetails.getNamespaceId(),
eventDetails.getAppId(), programType,
eventDetails.getProgramId());
eventDetails.getAppId(), programType,
eventDetails.getProgramId());
LOG.debug("Starting pipeline {}, with args: {}, programReference: {}",
eventDetails.getAppId(), eventDetails.getArgs(), programReference);
eventDetails.getAppId(), eventDetails.getArgs(), programReference);
RunId runId = lifecycleService.run(programReference, eventDetails.getArgs(), true);
ProgramRunId programRunId = new ProgramRunId(
programReference.getNamespace(), programReference.getApplication(), programType,
programReference.getProgram(), runId.getId());
programReference.getNamespace(), programReference.getApplication(), programType,
programReference.getProgram(), runId.getId());
LOG.info("Started pipeline, ProgramRunId: {}", programRunId.toMetadataEntity());
} catch (Exception e) {
throw new RuntimeException(e);
Expand All @@ -146,8 +146,8 @@ private void startProgram(StartProgramEvent event) {
@Override
protected final ScheduledExecutorService executor() {
executor =
Executors.newSingleThreadScheduledExecutor(
Threads.createDaemonThreadFactory("start-program-event-handler-scheduler"));
Executors.newSingleThreadScheduledExecutor(
Threads.createDaemonThreadFactory("start-program-event-subscriber-scheduler"));
return executor;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@

package io.cdap.cdap.internal.events.dummy;

import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject;
import io.cdap.cdap.spi.events.Event;
import io.cdap.cdap.spi.events.EventReader;
import io.cdap.cdap.spi.events.EventReaderContext;
Expand All @@ -31,9 +29,10 @@
public class DummyEventReader<T extends Event> implements EventReader<T> {

private static final Logger logger = LoggerFactory.getLogger(DummyEventReader.class);
private final Collection<T> messages;

@Inject
public DummyEventReader() {
public DummyEventReader(Collection<T> messages) {
this.messages = messages;
}

@Override
Expand All @@ -49,7 +48,7 @@ public void initialize(EventReaderContext eventReaderContext) {
@Override
public EventResult<T> pull(int maxMessages) {
ArrayList<T> sentMessages = new ArrayList<>();
Iterator<T> it = getMessages().iterator();
Iterator<T> it = messages.iterator();
int i = 0;
while (i < maxMessages && it.hasNext()) {
sentMessages.add(it.next());
Expand All @@ -58,11 +57,6 @@ public EventResult<T> pull(int maxMessages) {
return new DummyEventResult(sentMessages);
}

@VisibleForTesting
public Collection<T> getMessages() {
return new ArrayList<>();
}

@Override
public void close() throws Exception {
logger.info("Closing dummy reader");
Expand All @@ -87,6 +81,7 @@ public void consumeEvents(Consumer<T> consumer) {
}

@Override
public void close() throws Exception {}
public void close() throws Exception {
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.internal.events.dummy.DummyEventReader;
import io.cdap.cdap.spi.events.EventReader;
import java.util.ArrayList;
import java.util.Set;
import org.junit.Assert;
import org.junit.Test;
Expand All @@ -32,18 +33,18 @@ public class StartProgramEventReaderExtensionProviderTest {

@Test
public void testEnabledEventReaderFilter() {
EventReader mockReader = new DummyEventReader();
EventReader mockReader = new DummyEventReader(new ArrayList());
CConfiguration cConf = CConfiguration.create();

StartProgramEventReaderExtensionProvider readerExtensionProvider1
= new StartProgramEventReaderExtensionProvider(cConf);
= new StartProgramEventReaderExtensionProvider(cConf);
Set<String> test1 = readerExtensionProvider1.getSupportedTypesForProvider(mockReader);
Assert.assertTrue(test1.isEmpty());

//Test with reader class name enabled
cConf.setStrings(Constants.Event.START_EVENTS_READER_EXTENSIONS_ENABLED_LIST, mockReader.getId());
StartProgramEventReaderExtensionProvider readerExtensionProvider2
= new StartProgramEventReaderExtensionProvider(cConf);
= new StartProgramEventReaderExtensionProvider(cConf);
Set<String> test2 = readerExtensionProvider2.getSupportedTypesForProvider(mockReader);
Assert.assertTrue(test2.contains(mockReader.getId()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,12 @@
import io.cdap.cdap.internal.events.dummy.DummyEventReader;
import io.cdap.cdap.internal.events.dummy.DummyEventReaderExtensionProvider;
import io.cdap.cdap.proto.id.ProgramReference;
import io.cdap.cdap.spi.events.EventReader;
import io.cdap.cdap.spi.events.StartProgramEvent;
import io.cdap.cdap.spi.events.StartProgramEventDetails;
import java.util.ArrayList;
import java.util.Collection;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -49,23 +46,22 @@
*/
public class StartProgramEventSubscriberTest extends AppFabricTestBase {
private static final Logger LOG = LoggerFactory.getLogger(StartProgramEventSubscriberTest.class);

private StartProgramEventSubscriber eventHandler;
private ProgramLifecycleService lifecycleService = Mockito.mock(ProgramLifecycleService.class);
private DummyEventReader<StartProgramEvent> eventReader = Mockito.mock(DummyEventReader.class);
private CConfiguration cConf = Mockito.mock(CConfiguration.class);
private final ProgramLifecycleService lifecycleService = Mockito.mock(ProgramLifecycleService.class);
private final DummyEventReader<StartProgramEvent> eventReader = new DummyEventReader<>(mockedEvents());
private final CConfiguration cConf = Mockito.mock(CConfiguration.class);
private final Injector injector = Guice.createInjector(new AbstractModule() {
@Override
protected void configure() {
bind(ProgramLifecycleService.class).toInstance(lifecycleService);
bind(CConfiguration.class).toInstance(cConf);
cConf.setInt(Constants.Event.START_PROGRAM_EVENT_FETCH_SIZE, 1);
bind(EventReader.class).toInstance(eventReader);
bind(new TypeLiteral<EventReaderProvider<StartProgramEvent>>() {})
.toInstance(new DummyEventReaderExtensionProvider<StartProgramEvent>(eventReader));
bind(new TypeLiteral<EventReaderProvider<StartProgramEvent>>() {
})
.toInstance(new DummyEventReaderExtensionProvider<StartProgramEvent>(eventReader));
bind(StartProgramEventSubscriber.class).in(Scopes.SINGLETON);
}
});
private StartProgramEventSubscriber eventHandler;

@Before
public void setup() {
Expand All @@ -76,46 +72,25 @@ public void setup() {

@Test
public void testInitialize() {
try {
eventHandler.initialize();
} catch (Exception ex) {
LOG.error("Error during Event Handler initialization.", ex);
Assert.fail("Error while initializing Event Handler");
}
eventHandler.initialize();
}

@Test
public void testMessageWorkflow() throws Exception {
assert (lifecycleService != null);
Mockito.doReturn(RunIds.generate()).when(lifecycleService).run((ProgramReference) any(), any(),
Mockito.anyBoolean());
Mockito.when(eventReader.getMessages()).thenReturn(mockedEvents());

Mockito.doCallRealMethod().when(eventReader).pull(1);
Mockito.doCallRealMethod().when(eventReader).initialize(any());

try {
eventHandler.initialize();
} catch (Exception ex) {
LOG.error("Error during Event Handler initialization.", ex);
Assert.fail("Error while initializing Event Handler");
}

try {
eventHandler.processEvents(eventReader);
} catch (Exception e) {
LOG.error("Error during message process.", e);
Assert.fail("Error during message process");
}
Mockito.anyBoolean());

eventHandler.initialize();
eventHandler.processEvents(eventReader);
Mockito.verify(lifecycleService).run((ProgramReference) any(), any(), Mockito.anyBoolean());

}

private Collection<StartProgramEvent> mockedEvents() {
ArrayList<StartProgramEvent> eventList = new ArrayList<>();
eventList.add(new StartProgramEvent(1, "1",
new StartProgramEventDetails("app1",
new StartProgramEventDetails("app1",
"namespace1", "id1", "workflows", null)));
return eventList;
}
Expand Down

0 comments on commit 155900d

Please sign in to comment.