Skip to content

Commit

Permalink
Cleaning up.
Browse files Browse the repository at this point in the history
  • Loading branch information
sahusanket committed Oct 24, 2024
1 parent 2022069 commit eee5244
Show file tree
Hide file tree
Showing 28 changed files with 136 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@
/**
* AppFabric Service Runtime Module.
*/
public final class AppFabricServiceRuntimeModule extends RuntimeModule {
public final class AppFabricServiceRuntimeModule extends RuntimeModule {

public static final String NOAUTH_ARTIFACT_REPO = "noAuthArtifactRepo";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.inject.Scopes;
import com.google.inject.name.Named;
import com.google.inject.util.Modules;
import io.cdap.cdap.api.auditlogging.AuditLogWriter;
import io.cdap.cdap.api.security.store.SecureStore;
import io.cdap.cdap.app.guice.ProgramRunnerRuntimeModule;
import io.cdap.cdap.common.NotFoundException;
Expand Down Expand Up @@ -54,6 +55,7 @@
import io.cdap.cdap.metadata.MetadataReaderWriterModules;
import io.cdap.cdap.metrics.guice.MetricsClientRuntimeModule;
import io.cdap.cdap.proto.id.ApplicationId;
import io.cdap.cdap.security.auth.MessagingAuditLogWriter;
import io.cdap.cdap.security.auth.context.AuthenticationContextModules;
import io.cdap.cdap.security.guice.CoreSecurityRuntimeModule;
import io.cdap.cdap.security.guice.preview.PreviewSecureStoreModule;
Expand Down Expand Up @@ -207,6 +209,7 @@ protected void configure() {
new AbstractModule() {
@Override
protected void configure() {
bind(AuditLogWriter.class).to(MessagingAuditLogWriter.class).in(Scopes.SINGLETON);
}

@Provides
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@ protected void configure() {
modules.add(new RemoteLogAppenderModule());
modules.add(new LocalLocationModule());


if (coreSecurityModule.requiresZKClient()) {
modules.add(new ZkClientModule());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.security.auth;

import com.google.common.collect.ImmutableMap;
Expand All @@ -24,24 +40,26 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;

/**
* This class subscribes to the audit log messaging topic, reads and processes the messages into {@link AuditLogContext}
* and delegates the batch of AuditLogContexts to External Auth service using {@link AuditLoggerSpi}, which would
* further publish as configured.
*/
public class AuditLogSubscriberService extends AbstractMessagingSubscriberService<AuditLogContext> {


private static final Logger LOG = LoggerFactory.getLogger(AuditLogSubscriberService.class);
private static final Gson GSON = new Gson();

private final MultiThreadMessagingContext messagingContext;
private final TransactionRunner transactionRunner;
private final AccessControllerInstantiator accessControllerInstantiator;


@Inject
AuditLogSubscriberService(CConfiguration cConf, MessagingService messagingService,
MetricsCollectionService metricsCollectionService,
Expand Down Expand Up @@ -77,11 +95,6 @@ protected TransactionRunner getTransactionRunner() {
/**
* Loads last persisted message id. This method will be called from a transaction. The returned
* message id will be used as the starting message id (exclusive) for the first fetch.
*
* @param context the {@link StructuredTableContext} for getting dataset instances.
* @return the last persisted message id or {@code null} to have first fetch starts from the first
* available message in the topic.
* @throws Exception if failed to load the message id
*/
@Nullable
@Override
Expand All @@ -94,12 +107,6 @@ protected String loadMessageId(StructuredTableContext context) throws Exception
/**
* Persists the given message id. This method will be called from a transaction, which is the same
* transaction for the call to {@link #processMessages(StructuredTableContext, Iterator)}.
*
* @param context the {@link StructuredTableContext} for getting dataset instances
* @param messageId the message id that the
* {@link #processMessages(StructuredTableContext, Iterator)} has been processed up to.
* @throws Exception if failed to persist the message id
* @see #processMessages(StructuredTableContext, Iterator)
*/
@Override
protected void storeMessageId(StructuredTableContext context, String messageId) throws Exception {
Expand All @@ -113,16 +120,11 @@ protected void storeMessageId(StructuredTableContext context, String messageId)
* the {@link #storeMessageId(StructuredTableContext, String)} call. If {@link Exception} is
* raised from this method, the messages as provided through the {@code messages} parameter will
* be replayed in the next call.
*
* @throws Exception if failed to process the messages
* @see #storeMessageId(StructuredTableContext, String)
*/
@Override
protected void processMessages(StructuredTableContext structuredTableContext,
Iterator<ImmutablePair<String, AuditLogContext>> messages) throws Exception {

LOG.warn("SANKET_TEST : processMessages ");

Queue<AuditLogContext> auditLogContextQueue = new LinkedBlockingDeque<>();

int count = 0 ;
Expand All @@ -136,16 +138,16 @@ protected void processMessages(StructuredTableContext structuredTableContext,
}

if (!auditLogContextQueue.isEmpty()) {
LOG.warn("SANKET_TEST : processMessages {}", auditLogContextQueue.size());
LOG.debug("Publishing a queue of Audit Log events of size {} events.", auditLogContextQueue.size());
AuditLoggerSpi.PublishStatus publishStatus =
this.accessControllerInstantiator.get().publish(auditLogContextQueue);

// TODO : This logic can change based on how Auth Ext publishes a batch.
if (publishStatus.equals(AuditLoggerSpi.PublishStatus.UNSUCCESSFUL)) {
throw new IOException();
throw new Exception("The publishing of audit log events Failed.");
}
}

LOG.warn("SANKET_TEST : processedMessages for {} msgs" , count);
LOG.trace("Publishing a queue of Audit Log events of size {} events is successful.", auditLogContextQueue.size());
}

/**
Expand All @@ -157,12 +159,7 @@ protected MessagingContext getMessagingContext() {
}

/**
* Decodes the raw {@link Message} into an object of type {@code T}.
*
* @param message the {@link Message} to decode
* @return an object of type {@code T}
* @throws Exception if the decode failed and the given message will be skipped for
* processing
* Decodes the raw {@link Message} into an object of type {@link AuditLogContext}.
*/
@Override
protected AuditLogContext decodeMessage(Message message) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.security.auth;

import com.google.common.base.Throwables;
Expand All @@ -15,7 +31,6 @@
import io.cdap.cdap.messaging.DefaultTopicMetadata;
import io.cdap.cdap.messaging.client.StoreRequestBuilder;
import io.cdap.cdap.messaging.spi.MessagingService;
import io.cdap.cdap.messaging.spi.StoreRequest;
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.cdap.proto.id.TopicId;
import io.cdap.cdap.security.spi.authorization.AuditLogContext;
Expand All @@ -27,7 +42,11 @@
import java.util.Queue;
import java.util.concurrent.TimeUnit;

public class MessagingAuditLogWriter implements AuditLogWriter {
/**
* This class receives a collection of {@link AuditLogContext} and writes them in order to a
* messaging service / topic ( ex - tms )
*/
public class MessagingAuditLogWriter implements AuditLogWriter {

private static final Logger LOG = LoggerFactory.getLogger(MessagingAuditLogWriter.class);
private static final Gson GSON = new Gson();
Expand Down Expand Up @@ -67,7 +86,6 @@ public void publish(Queue<AuditLogContext> auditLogContexts) throws IOException
messagingService.publish(StoreRequestBuilder.of(topic)
.addPayload(GSON.toJson(auditLogContexts.remove()))
.build());
LOG.warn("SANKET_TEST : Published audit log to TMS ");
}
done = true;
} catch (IOException | AccessException e) {
Expand Down Expand Up @@ -105,7 +123,7 @@ public void publish(Queue<AuditLogContext> auditLogContexts) throws IOException
private void createTopicIfNeeded() throws IOException {
try {
messagingService.createTopic(new DefaultTopicMetadata(topic, Collections.emptyMap()));
LOG.warn("Created topic {}", topic.getTopic());
LOG.info("Created topic {}", topic.getTopic());
} catch (TopicAlreadyExistsException ex) {
// no-op
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import io.cdap.cdap.proto.security.StandardPermission;
import io.cdap.cdap.security.auth.context.MasterAuthenticationContext;
import io.cdap.cdap.security.authorization.AuthorizationContextFactory;
//import io.cdap.cdap.security.authorization.InMemoryAccessController;
import io.cdap.cdap.security.authorization.InMemoryPermissionManager;
import io.cdap.cdap.security.authorization.InMemoryRoleController;
import io.cdap.cdap.security.authorization.NoOpAuthorizationContextFactory;
Expand Down Expand Up @@ -85,7 +84,8 @@ public void setUp() throws Exception {
final InMemoryPermissionManager auth = new InMemoryPermissionManager();
final InMemoryRoleController inMemoryRoleController = new InMemoryRoleController();
// auth.initialize(FACTORY.create(properties)); //Will be used on migration to SPI implementation
service = new CommonNettyHttpServiceBuilder(conf, getClass().getSimpleName(), new NoOpMetricsCollectionService())
service = new CommonNettyHttpServiceBuilder(conf, getClass().getSimpleName(), new NoOpMetricsCollectionService(),
auditLogContexts -> {})
.setHttpHandlers(new AuthorizationHandler(auth, conf, new MasterAuthenticationContext(), inMemoryRoleController))
.setChannelPipelineModifier(new ChannelPipelineModifier() {
@Override
Expand Down Expand Up @@ -134,7 +134,8 @@ private void testDisabled(CConfiguration cConf, FeatureDisabledException.Feature
final InMemoryPermissionManager accessController = new InMemoryPermissionManager();
final InMemoryRoleController inMemoryRoleController = new InMemoryRoleController();
NettyHttpService service = new CommonNettyHttpServiceBuilder(cConf, getClass().getSimpleName(),
new NoOpMetricsCollectionService())
new NoOpMetricsCollectionService(),
auditLogContexts -> {})
.setHttpHandlers(new AuthorizationHandler(
accessController, cConf, new MasterAuthenticationContext(), inMemoryRoleController))
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ public static void init() throws Exception {
InMemoryDiscoveryService discoveryService = new InMemoryDiscoveryService();
remoteClientFactory = new RemoteClientFactory(discoveryService,
new DefaultInternalAuthenticator(new AuthenticationTestContext()));
httpService = new CommonNettyHttpServiceBuilder(cConf, "test", new NoOpMetricsCollectionService())
httpService = new CommonNettyHttpServiceBuilder(cConf, "test", new NoOpMetricsCollectionService(),
auditLogContexts -> {})
.setHttpHandlers(
new TaskWorkerHttpHandlerInternal(cConf, discoveryService, discoveryService, className -> { },
new NoOpMetricsCollectionService()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ public static void setup() throws Exception {
namespaceAdmin.create(testNameSpace);
remoteClientFactory = new RemoteClientFactory(discoveryService,
new DefaultInternalAuthenticator(new AuthenticationTestContext()));
httpService = new CommonNettyHttpServiceBuilder(cConf, "appfabric", new NoOpMetricsCollectionService())
httpService = new CommonNettyHttpServiceBuilder(cConf, "appfabric", new NoOpMetricsCollectionService(),
auditLogContexts -> {})
.setHttpHandlers(new AppStateHandler(applicationLifecycleService, namespaceAdmin)).build();
httpService.start();
cancellable = discoveryService
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ public static void teardown() throws Exception {
@Before
public void setUp() throws Exception {
NettyHttpService service = new CommonNettyHttpServiceBuilder(CConfiguration.create(), getClass().getSimpleName(),
new NoOpMetricsCollectionService())
new NoOpMetricsCollectionService(),
auditLogContexts -> {})
.setHttpHandlers(new AppStateHandler(applicationLifecycleService, namespaceAdmin))
.build();
service.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ protected void publish(Iterator<MetricValues> metrics) {
InMemoryDiscoveryService discoveryService = new InMemoryDiscoveryService();
taskWorkerService = new TaskWorkerService(cConf, sConf, discoveryService, discoveryService,
mockMetricsCollector,
new CommonNettyHttpServiceFactory(cConf, mockMetricsCollector));
new CommonNettyHttpServiceFactory(cConf, mockMetricsCollector,
auditLogContexts -> {}));
taskWorkerStateFuture = TaskWorkerTestUtil.getServiceCompletionFuture(taskWorkerService);
// start the service
taskWorkerService.startAndWait();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public void beforeTest() {
TaskWorkerService taskWorkerService = new TaskWorkerService(
cConf, sConf, discoveryService, discoveryService,
metricsCollectionService,
new CommonNettyHttpServiceFactory(cConf, metricsCollectionService));
new CommonNettyHttpServiceFactory(cConf, metricsCollectionService, auditLogContexts -> {}));
serviceCompletionFuture = TaskWorkerTestUtil.getServiceCompletionFuture(
taskWorkerService);
// start the service
Expand All @@ -123,7 +123,7 @@ public void testPeriodicRestart() {
TaskWorkerService taskWorkerService = new TaskWorkerService(
cConf, sConf, discoveryService, discoveryService,
metricsCollectionService,
new CommonNettyHttpServiceFactory(cConf, metricsCollectionService));
new CommonNettyHttpServiceFactory(cConf, metricsCollectionService, auditLogContexts -> {}));
serviceCompletionFuture = TaskWorkerTestUtil.getServiceCompletionFuture(
taskWorkerService);
// start the service
Expand All @@ -144,7 +144,7 @@ public void testPeriodicRestartWithInflightRequest() throws IOException {
TaskWorkerService taskWorkerService = new TaskWorkerService(
cConf, sConf, discoveryService, discoveryService,
metricsCollectionService,
new CommonNettyHttpServiceFactory(cConf, metricsCollectionService));
new CommonNettyHttpServiceFactory(cConf, metricsCollectionService, auditLogContexts -> {}));
serviceCompletionFuture = TaskWorkerTestUtil.getServiceCompletionFuture(
taskWorkerService);
// start the service
Expand Down Expand Up @@ -187,7 +187,7 @@ public void testPeriodicRestartWithNeverEndingInflightRequest() {
discoveryService,
discoveryService,
metricsCollectionService,
new CommonNettyHttpServiceFactory(cConf, metricsCollectionService));
new CommonNettyHttpServiceFactory(cConf, metricsCollectionService, auditLogContexts -> {}));
serviceCompletionFuture = TaskWorkerTestUtil.getServiceCompletionFuture(
taskWorkerService);
// start the service
Expand Down Expand Up @@ -232,7 +232,7 @@ public void testRestartAfterMultipleExecutions() throws IOException {
TaskWorkerService taskWorkerService = new TaskWorkerService(
cConf, sConf, discoveryService, discoveryService,
metricsCollectionService,
new CommonNettyHttpServiceFactory(cConf, metricsCollectionService));
new CommonNettyHttpServiceFactory(cConf, metricsCollectionService, auditLogContexts -> {}));
serviceCompletionFuture = TaskWorkerTestUtil.getServiceCompletionFuture(
taskWorkerService);
// start the service
Expand Down Expand Up @@ -361,7 +361,7 @@ public void testConcurrentRequestsWithIsolationDisabled() throws Exception {
TaskWorkerService taskWorkerService = new TaskWorkerService(cConf,
createSConf(), discoveryService, discoveryService,
metricsCollectionService,
new CommonNettyHttpServiceFactory(cConf, metricsCollectionService));
new CommonNettyHttpServiceFactory(cConf, metricsCollectionService, auditLogContexts -> {}));
taskWorkerService.startAndWait();
InetSocketAddress addr = taskWorkerService.getBindAddress();
URI uri = URI.create(
Expand Down Expand Up @@ -422,7 +422,7 @@ public void testRestartWithConcurrentRequests() throws Exception {
TaskWorkerService taskWorkerService = new TaskWorkerService(cConf,
createSConf(), discoveryService, discoveryService,
metricsCollectionService,
new CommonNettyHttpServiceFactory(cConf, metricsCollectionService));
new CommonNettyHttpServiceFactory(cConf, metricsCollectionService, auditLogContexts -> {}));
serviceCompletionFuture = TaskWorkerTestUtil.getServiceCompletionFuture(
taskWorkerService);
taskWorkerService.startAndWait();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ private ArtifactLocalizerService setupArtifactLocalizerService(CConfiguration cC
ArtifactLocalizerService artifactLocalizerService = new ArtifactLocalizerService(
cConf, new ArtifactLocalizer(cConf, remoteClientFactory, (namespaceId, retryStrategy) -> {
return new NoOpArtifactManager();
}), new CommonNettyHttpServiceFactory(cConf, new NoOpMetricsCollectionService()),
}), new CommonNettyHttpServiceFactory(cConf, new NoOpMetricsCollectionService(), auditLogContexts -> {}),
remoteClientFactory, new NoOpRemoteAuthenticator());
// start the service
artifactLocalizerService.startAndWait();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public static void init() throws Exception {

RemoteClientFactory remoteClientFactory = Mockito.mock(RemoteClientFactory.class);
httpService = new CommonNettyHttpServiceBuilder(cConf, "test",
new NoOpMetricsCollectionService())
new NoOpMetricsCollectionService(), auditLogContexts -> {})
.setHttpHandlers(
new GcpMetadataHttpHandlerInternal(cConf, remoteClientFactory,
new NoOpRemoteAuthenticator())
Expand Down
Loading

0 comments on commit eee5244

Please sign in to comment.