diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java index 972c11d68b6b..24c028a0471d 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java @@ -181,7 +181,7 @@ /** * AppFabric Service Runtime Module. */ -public final class AppFabricServiceRuntimeModule extends RuntimeModule { +public final class AppFabricServiceRuntimeModule extends RuntimeModule { public static final String NOAUTH_ARTIFACT_REPO = "noAuthArtifactRepo"; diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/DefaultPreviewRunnerManager.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/DefaultPreviewRunnerManager.java index d30b83c195b0..87d2f2ada6c1 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/DefaultPreviewRunnerManager.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/DefaultPreviewRunnerManager.java @@ -27,6 +27,7 @@ import com.google.inject.Scopes; import com.google.inject.name.Named; import com.google.inject.util.Modules; +import io.cdap.cdap.api.auditlogging.AuditLogWriter; import io.cdap.cdap.api.security.store.SecureStore; import io.cdap.cdap.app.guice.ProgramRunnerRuntimeModule; import io.cdap.cdap.common.NotFoundException; @@ -54,6 +55,7 @@ import io.cdap.cdap.metadata.MetadataReaderWriterModules; import io.cdap.cdap.metrics.guice.MetricsClientRuntimeModule; import io.cdap.cdap.proto.id.ApplicationId; +import io.cdap.cdap.security.auth.MessagingAuditLogWriter; import io.cdap.cdap.security.auth.context.AuthenticationContextModules; import io.cdap.cdap.security.guice.CoreSecurityRuntimeModule; import io.cdap.cdap.security.guice.preview.PreviewSecureStoreModule; @@ -207,6 +209,7 @@ protected void configure() { new AbstractModule() { @Override protected void configure() { + bind(AuditLogWriter.class).to(MessagingAuditLogWriter.class).in(Scopes.SINGLETON); } @Provides diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/worker/sidecar/ArtifactLocalizerTwillRunnable.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/worker/sidecar/ArtifactLocalizerTwillRunnable.java index 8532499c1559..0568c24ef8e3 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/worker/sidecar/ArtifactLocalizerTwillRunnable.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/worker/sidecar/ArtifactLocalizerTwillRunnable.java @@ -142,7 +142,6 @@ protected void configure() { modules.add(new RemoteLogAppenderModule()); modules.add(new LocalLocationModule()); - if (coreSecurityModule.requiresZKClient()) { modules.add(new ZkClientModule()); } diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/security/auth/AuditLogSubscriberService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/security/auth/AuditLogSubscriberService.java index 4f0e9270e6e8..1a78a7fb8aa5 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/security/auth/AuditLogSubscriberService.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/security/auth/AuditLogSubscriberService.java @@ -1,3 +1,19 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + package io.cdap.cdap.security.auth; import com.google.common.collect.ImmutableMap; @@ -24,16 +40,19 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.Iterator; import java.util.Queue; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; +/** + * This class subscribes to the audit log messaging topic, reads and processes the messages into {@link AuditLogContext} + * and delegates the batch of AuditLogContexts to External Auth service using {@link AuditLoggerSpi}, which would + * further publish as configured. + */ public class AuditLogSubscriberService extends AbstractMessagingSubscriberService { - private static final Logger LOG = LoggerFactory.getLogger(AuditLogSubscriberService.class); private static final Gson GSON = new Gson(); @@ -41,7 +60,6 @@ public class AuditLogSubscriberService extends AbstractMessagingSubscriberServic private final TransactionRunner transactionRunner; private final AccessControllerInstantiator accessControllerInstantiator; - @Inject AuditLogSubscriberService(CConfiguration cConf, MessagingService messagingService, MetricsCollectionService metricsCollectionService, @@ -77,11 +95,6 @@ protected TransactionRunner getTransactionRunner() { /** * Loads last persisted message id. This method will be called from a transaction. The returned * message id will be used as the starting message id (exclusive) for the first fetch. - * - * @param context the {@link StructuredTableContext} for getting dataset instances. - * @return the last persisted message id or {@code null} to have first fetch starts from the first - * available message in the topic. - * @throws Exception if failed to load the message id */ @Nullable @Override @@ -94,12 +107,6 @@ protected String loadMessageId(StructuredTableContext context) throws Exception /** * Persists the given message id. This method will be called from a transaction, which is the same * transaction for the call to {@link #processMessages(StructuredTableContext, Iterator)}. - * - * @param context the {@link StructuredTableContext} for getting dataset instances - * @param messageId the message id that the - * {@link #processMessages(StructuredTableContext, Iterator)} has been processed up to. - * @throws Exception if failed to persist the message id - * @see #processMessages(StructuredTableContext, Iterator) */ @Override protected void storeMessageId(StructuredTableContext context, String messageId) throws Exception { @@ -113,16 +120,11 @@ protected void storeMessageId(StructuredTableContext context, String messageId) * the {@link #storeMessageId(StructuredTableContext, String)} call. If {@link Exception} is * raised from this method, the messages as provided through the {@code messages} parameter will * be replayed in the next call. - * - * @throws Exception if failed to process the messages - * @see #storeMessageId(StructuredTableContext, String) */ @Override protected void processMessages(StructuredTableContext structuredTableContext, Iterator> messages) throws Exception { - LOG.warn("SANKET_TEST : processMessages "); - Queue auditLogContextQueue = new LinkedBlockingDeque<>(); int count = 0 ; @@ -136,16 +138,16 @@ protected void processMessages(StructuredTableContext structuredTableContext, } if (!auditLogContextQueue.isEmpty()) { - LOG.warn("SANKET_TEST : processMessages {}", auditLogContextQueue.size()); + LOG.debug("Publishing a queue of Audit Log events of size {} events.", auditLogContextQueue.size()); AuditLoggerSpi.PublishStatus publishStatus = this.accessControllerInstantiator.get().publish(auditLogContextQueue); - + // TODO : This logic can change based on how Auth Ext publishes a batch. if (publishStatus.equals(AuditLoggerSpi.PublishStatus.UNSUCCESSFUL)) { - throw new IOException(); + throw new Exception("The publishing of audit log events Failed."); } } - LOG.warn("SANKET_TEST : processedMessages for {} msgs" , count); + LOG.trace("Publishing a queue of Audit Log events of size {} events is successful.", auditLogContextQueue.size()); } /** @@ -157,12 +159,7 @@ protected MessagingContext getMessagingContext() { } /** - * Decodes the raw {@link Message} into an object of type {@code T}. - * - * @param message the {@link Message} to decode - * @return an object of type {@code T} - * @throws Exception if the decode failed and the given message will be skipped for - * processing + * Decodes the raw {@link Message} into an object of type {@link AuditLogContext}. */ @Override protected AuditLogContext decodeMessage(Message message) throws Exception { diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/security/auth/MessagingAuditLogWriter.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/security/auth/MessagingAuditLogWriter.java index cb77fdff8989..ecaee462827a 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/security/auth/MessagingAuditLogWriter.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/security/auth/MessagingAuditLogWriter.java @@ -1,3 +1,19 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + package io.cdap.cdap.security.auth; import com.google.common.base.Throwables; @@ -15,7 +31,6 @@ import io.cdap.cdap.messaging.DefaultTopicMetadata; import io.cdap.cdap.messaging.client.StoreRequestBuilder; import io.cdap.cdap.messaging.spi.MessagingService; -import io.cdap.cdap.messaging.spi.StoreRequest; import io.cdap.cdap.proto.id.NamespaceId; import io.cdap.cdap.proto.id.TopicId; import io.cdap.cdap.security.spi.authorization.AuditLogContext; @@ -27,7 +42,11 @@ import java.util.Queue; import java.util.concurrent.TimeUnit; -public class MessagingAuditLogWriter implements AuditLogWriter { +/** + * This class receives a collection of {@link AuditLogContext} and writes them in order to a + * messaging service / topic ( ex - tms ) + */ +public class MessagingAuditLogWriter implements AuditLogWriter { private static final Logger LOG = LoggerFactory.getLogger(MessagingAuditLogWriter.class); private static final Gson GSON = new Gson(); @@ -67,7 +86,6 @@ public void publish(Queue auditLogContexts) throws IOException messagingService.publish(StoreRequestBuilder.of(topic) .addPayload(GSON.toJson(auditLogContexts.remove())) .build()); - LOG.warn("SANKET_TEST : Published audit log to TMS "); } done = true; } catch (IOException | AccessException e) { @@ -105,7 +123,7 @@ public void publish(Queue auditLogContexts) throws IOException private void createTopicIfNeeded() throws IOException { try { messagingService.createTopic(new DefaultTopicMetadata(topic, Collections.emptyMap())); - LOG.warn("Created topic {}", topic.getTopic()); + LOG.info("Created topic {}", topic.getTopic()); } catch (TopicAlreadyExistsException ex) { // no-op } diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/gateway/handlers/AuthorizationHandlerTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/gateway/handlers/AuthorizationHandlerTest.java index 6ffe643c4587..354b6dea7f9c 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/gateway/handlers/AuthorizationHandlerTest.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/gateway/handlers/AuthorizationHandlerTest.java @@ -40,7 +40,6 @@ import io.cdap.cdap.proto.security.StandardPermission; import io.cdap.cdap.security.auth.context.MasterAuthenticationContext; import io.cdap.cdap.security.authorization.AuthorizationContextFactory; -//import io.cdap.cdap.security.authorization.InMemoryAccessController; import io.cdap.cdap.security.authorization.InMemoryPermissionManager; import io.cdap.cdap.security.authorization.InMemoryRoleController; import io.cdap.cdap.security.authorization.NoOpAuthorizationContextFactory; @@ -85,7 +84,8 @@ public void setUp() throws Exception { final InMemoryPermissionManager auth = new InMemoryPermissionManager(); final InMemoryRoleController inMemoryRoleController = new InMemoryRoleController(); // auth.initialize(FACTORY.create(properties)); //Will be used on migration to SPI implementation - service = new CommonNettyHttpServiceBuilder(conf, getClass().getSimpleName(), new NoOpMetricsCollectionService()) + service = new CommonNettyHttpServiceBuilder(conf, getClass().getSimpleName(), new NoOpMetricsCollectionService(), + auditLogContexts -> {}) .setHttpHandlers(new AuthorizationHandler(auth, conf, new MasterAuthenticationContext(), inMemoryRoleController)) .setChannelPipelineModifier(new ChannelPipelineModifier() { @Override @@ -134,7 +134,8 @@ private void testDisabled(CConfiguration cConf, FeatureDisabledException.Feature final InMemoryPermissionManager accessController = new InMemoryPermissionManager(); final InMemoryRoleController inMemoryRoleController = new InMemoryRoleController(); NettyHttpService service = new CommonNettyHttpServiceBuilder(cConf, getClass().getSimpleName(), - new NoOpMetricsCollectionService()) + new NoOpMetricsCollectionService(), + auditLogContexts -> {}) .setHttpHandlers(new AuthorizationHandler( accessController, cConf, new MasterAuthenticationContext(), inMemoryRoleController)) .build(); diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/deploy/RemoteConfiguratorTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/deploy/RemoteConfiguratorTest.java index 4e93db8b031b..f465ac4de30f 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/deploy/RemoteConfiguratorTest.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/deploy/RemoteConfiguratorTest.java @@ -118,7 +118,8 @@ public static void init() throws Exception { InMemoryDiscoveryService discoveryService = new InMemoryDiscoveryService(); remoteClientFactory = new RemoteClientFactory(discoveryService, new DefaultInternalAuthenticator(new AuthenticationTestContext())); - httpService = new CommonNettyHttpServiceBuilder(cConf, "test", new NoOpMetricsCollectionService()) + httpService = new CommonNettyHttpServiceBuilder(cConf, "test", new NoOpMetricsCollectionService(), + auditLogContexts -> {}) .setHttpHandlers( new TaskWorkerHttpHandlerInternal(cConf, discoveryService, discoveryService, className -> { }, new NoOpMetricsCollectionService()), diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/runtime/RemoteAppStateStoreTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/runtime/RemoteAppStateStoreTest.java index 111e6a7b395d..8085c0b07604 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/runtime/RemoteAppStateStoreTest.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/runtime/RemoteAppStateStoreTest.java @@ -91,7 +91,8 @@ public static void setup() throws Exception { namespaceAdmin.create(testNameSpace); remoteClientFactory = new RemoteClientFactory(discoveryService, new DefaultInternalAuthenticator(new AuthenticationTestContext())); - httpService = new CommonNettyHttpServiceBuilder(cConf, "appfabric", new NoOpMetricsCollectionService()) + httpService = new CommonNettyHttpServiceBuilder(cConf, "appfabric", new NoOpMetricsCollectionService(), + auditLogContexts -> {}) .setHttpHandlers(new AppStateHandler(applicationLifecycleService, namespaceAdmin)).build(); httpService.start(); cancellable = discoveryService diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/store/state/AppStateHandlerTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/store/state/AppStateHandlerTest.java index 76be4472ad7a..ead87c786416 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/store/state/AppStateHandlerTest.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/store/state/AppStateHandlerTest.java @@ -110,7 +110,8 @@ public static void teardown() throws Exception { @Before public void setUp() throws Exception { NettyHttpService service = new CommonNettyHttpServiceBuilder(CConfiguration.create(), getClass().getSimpleName(), - new NoOpMetricsCollectionService()) + new NoOpMetricsCollectionService(), + auditLogContexts -> {}) .setHttpHandlers(new AppStateHandler(applicationLifecycleService, namespaceAdmin)) .build(); service.start(); diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/worker/TaskWorkerMetricsTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/worker/TaskWorkerMetricsTest.java index 53f40545da4b..a1c4eedfda42 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/worker/TaskWorkerMetricsTest.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/worker/TaskWorkerMetricsTest.java @@ -86,7 +86,8 @@ protected void publish(Iterator metrics) { InMemoryDiscoveryService discoveryService = new InMemoryDiscoveryService(); taskWorkerService = new TaskWorkerService(cConf, sConf, discoveryService, discoveryService, mockMetricsCollector, - new CommonNettyHttpServiceFactory(cConf, mockMetricsCollector)); + new CommonNettyHttpServiceFactory(cConf, mockMetricsCollector, + auditLogContexts -> {})); taskWorkerStateFuture = TaskWorkerTestUtil.getServiceCompletionFuture(taskWorkerService); // start the service taskWorkerService.startAndWait(); diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/worker/TaskWorkerServiceTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/worker/TaskWorkerServiceTest.java index 5ab6d604b40a..0f757e5e4ea7 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/worker/TaskWorkerServiceTest.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/worker/TaskWorkerServiceTest.java @@ -96,7 +96,7 @@ public void beforeTest() { TaskWorkerService taskWorkerService = new TaskWorkerService( cConf, sConf, discoveryService, discoveryService, metricsCollectionService, - new CommonNettyHttpServiceFactory(cConf, metricsCollectionService)); + new CommonNettyHttpServiceFactory(cConf, metricsCollectionService, auditLogContexts -> {})); serviceCompletionFuture = TaskWorkerTestUtil.getServiceCompletionFuture( taskWorkerService); // start the service @@ -123,7 +123,7 @@ public void testPeriodicRestart() { TaskWorkerService taskWorkerService = new TaskWorkerService( cConf, sConf, discoveryService, discoveryService, metricsCollectionService, - new CommonNettyHttpServiceFactory(cConf, metricsCollectionService)); + new CommonNettyHttpServiceFactory(cConf, metricsCollectionService, auditLogContexts -> {})); serviceCompletionFuture = TaskWorkerTestUtil.getServiceCompletionFuture( taskWorkerService); // start the service @@ -144,7 +144,7 @@ public void testPeriodicRestartWithInflightRequest() throws IOException { TaskWorkerService taskWorkerService = new TaskWorkerService( cConf, sConf, discoveryService, discoveryService, metricsCollectionService, - new CommonNettyHttpServiceFactory(cConf, metricsCollectionService)); + new CommonNettyHttpServiceFactory(cConf, metricsCollectionService, auditLogContexts -> {})); serviceCompletionFuture = TaskWorkerTestUtil.getServiceCompletionFuture( taskWorkerService); // start the service @@ -187,7 +187,7 @@ public void testPeriodicRestartWithNeverEndingInflightRequest() { discoveryService, discoveryService, metricsCollectionService, - new CommonNettyHttpServiceFactory(cConf, metricsCollectionService)); + new CommonNettyHttpServiceFactory(cConf, metricsCollectionService, auditLogContexts -> {})); serviceCompletionFuture = TaskWorkerTestUtil.getServiceCompletionFuture( taskWorkerService); // start the service @@ -232,7 +232,7 @@ public void testRestartAfterMultipleExecutions() throws IOException { TaskWorkerService taskWorkerService = new TaskWorkerService( cConf, sConf, discoveryService, discoveryService, metricsCollectionService, - new CommonNettyHttpServiceFactory(cConf, metricsCollectionService)); + new CommonNettyHttpServiceFactory(cConf, metricsCollectionService, auditLogContexts -> {})); serviceCompletionFuture = TaskWorkerTestUtil.getServiceCompletionFuture( taskWorkerService); // start the service @@ -361,7 +361,7 @@ public void testConcurrentRequestsWithIsolationDisabled() throws Exception { TaskWorkerService taskWorkerService = new TaskWorkerService(cConf, createSConf(), discoveryService, discoveryService, metricsCollectionService, - new CommonNettyHttpServiceFactory(cConf, metricsCollectionService)); + new CommonNettyHttpServiceFactory(cConf, metricsCollectionService, auditLogContexts -> {})); taskWorkerService.startAndWait(); InetSocketAddress addr = taskWorkerService.getBindAddress(); URI uri = URI.create( @@ -422,7 +422,7 @@ public void testRestartWithConcurrentRequests() throws Exception { TaskWorkerService taskWorkerService = new TaskWorkerService(cConf, createSConf(), discoveryService, discoveryService, metricsCollectionService, - new CommonNettyHttpServiceFactory(cConf, metricsCollectionService)); + new CommonNettyHttpServiceFactory(cConf, metricsCollectionService, auditLogContexts -> {})); serviceCompletionFuture = TaskWorkerTestUtil.getServiceCompletionFuture( taskWorkerService); taskWorkerService.startAndWait(); diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/worker/sidecar/ArtifactLocalizerServiceTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/worker/sidecar/ArtifactLocalizerServiceTest.java index ea9f953f718e..7fcd41bcab16 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/worker/sidecar/ArtifactLocalizerServiceTest.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/worker/sidecar/ArtifactLocalizerServiceTest.java @@ -85,7 +85,7 @@ private ArtifactLocalizerService setupArtifactLocalizerService(CConfiguration cC ArtifactLocalizerService artifactLocalizerService = new ArtifactLocalizerService( cConf, new ArtifactLocalizer(cConf, remoteClientFactory, (namespaceId, retryStrategy) -> { return new NoOpArtifactManager(); - }), new CommonNettyHttpServiceFactory(cConf, new NoOpMetricsCollectionService()), + }), new CommonNettyHttpServiceFactory(cConf, new NoOpMetricsCollectionService(), auditLogContexts -> {}), remoteClientFactory, new NoOpRemoteAuthenticator()); // start the service artifactLocalizerService.startAndWait(); diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/worker/sidecar/GcpMetadataHttpHandlerInternalTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/worker/sidecar/GcpMetadataHttpHandlerInternalTest.java index 0e7eac4d507c..555e7ba13e14 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/worker/sidecar/GcpMetadataHttpHandlerInternalTest.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/worker/sidecar/GcpMetadataHttpHandlerInternalTest.java @@ -68,7 +68,7 @@ public static void init() throws Exception { RemoteClientFactory remoteClientFactory = Mockito.mock(RemoteClientFactory.class); httpService = new CommonNettyHttpServiceBuilder(cConf, "test", - new NoOpMetricsCollectionService()) + new NoOpMetricsCollectionService(), auditLogContexts -> {}) .setHttpHandlers( new GcpMetadataHttpHandlerInternal(cConf, remoteClientFactory, new NoOpRemoteAuthenticator()) diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/worker/system/SystemWorkerServiceTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/worker/system/SystemWorkerServiceTest.java index 7238db9e104a..c5c938d33de5 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/worker/system/SystemWorkerServiceTest.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/worker/system/SystemWorkerServiceTest.java @@ -105,7 +105,7 @@ public void beforeTest() throws IOException { InMemoryDiscoveryService discoveryService = new InMemoryDiscoveryService(); SystemWorkerService service = new SystemWorkerService(cConf, sConf, discoveryService, metricsCollectionService, - new CommonNettyHttpServiceFactory(cConf, metricsCollectionService), + new CommonNettyHttpServiceFactory(cConf, metricsCollectionService, auditLogContexts -> {}), injector.getInstance(TokenManager.class), new NoopTwillRunnerService(), new NoopTwillRunnerService(), getInjector().getInstance(ProvisioningService.class), diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/tethering/ArtifactCacheServiceTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/tethering/ArtifactCacheServiceTest.java index 32e95e3f6ab7..e97a279be9e4 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/tethering/ArtifactCacheServiceTest.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/tethering/ArtifactCacheServiceTest.java @@ -79,7 +79,7 @@ public void setUp() throws Exception { DiscoveryService discoveryService = getInjector().getInstance(DiscoveryService.class); artifactCacheService = new ArtifactCacheService( cConf, artifactCache, tetheringStore, null, discoveryService, - new CommonNettyHttpServiceFactory(cConf, new NoOpMetricsCollectionService())); + new CommonNettyHttpServiceFactory(cConf, new NoOpMetricsCollectionService(), auditLogContexts -> {})); artifactCacheService.startAndWait(); getInjector().getInstance(ArtifactRepository.class).clear(NamespaceId.DEFAULT); LocationFactory locationFactory = getInjector().getInstance(LocationFactory.class); diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/tethering/TetheringClientHandlerTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/tethering/TetheringClientHandlerTest.java index bf65de227545..c07973e928b9 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/tethering/TetheringClientHandlerTest.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/tethering/TetheringClientHandlerTest.java @@ -213,7 +213,7 @@ public void setUp() throws Exception { CConfiguration conf = CConfiguration.create(); serverHandler = new MockTetheringServerHandler(); serverService = new CommonNettyHttpServiceBuilder(conf, getClass().getSimpleName() + "_server", - new NoOpMetricsCollectionService()) + new NoOpMetricsCollectionService(), auditLogContexts -> {}) .setHttpHandlers(serverHandler).build(); serverService.start(); serverConfig = ClientConfig.builder() @@ -238,7 +238,7 @@ public void setUp() throws Exception { messagingService = injector.getInstance(MessagingService.class); clientService = new CommonNettyHttpServiceBuilder(conf, getClass().getSimpleName() + "_client", - new NoOpMetricsCollectionService()) + new NoOpMetricsCollectionService(), auditLogContexts -> {}) .setHttpHandlers(new TetheringClientHandler(cConf, tetheringStore, contextAccessEnforcer, namespaceAdmin, injector.getInstance(RemoteAuthenticator.class), messagingService), new TetheringHandler(cConf, tetheringStore, messagingService, profileService)) diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/tethering/TetheringServerHandlerTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/tethering/TetheringServerHandlerTest.java index 202f02eb9246..95288d102a7f 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/tethering/TetheringServerHandlerTest.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/tethering/TetheringServerHandlerTest.java @@ -212,7 +212,7 @@ public void setUp() throws Exception { new CommonNettyHttpServiceBuilder( CConfiguration.create(), getClass().getSimpleName(), - new NoOpMetricsCollectionService()) + new NoOpMetricsCollectionService(), auditLogContexts -> {}) .setHttpHandlers( new TetheringServerHandler( cConf, diff --git a/cdap-common/src/main/java/io/cdap/cdap/api/auditlogging/AuditLogWriter.java b/cdap-common/src/main/java/io/cdap/cdap/api/auditlogging/AuditLogWriter.java index dc46d0582328..326e709bffbd 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/api/auditlogging/AuditLogWriter.java +++ b/cdap-common/src/main/java/io/cdap/cdap/api/auditlogging/AuditLogWriter.java @@ -24,7 +24,6 @@ /** * An interface to write/ persist a collection of {@link AuditLogContext} to a * messaging service / topic ( ex - tms ) - * */ public interface AuditLogWriter { diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/http/AuthenticationChannelHandler.java b/cdap-common/src/main/java/io/cdap/cdap/common/http/AuthenticationChannelHandler.java index d50c4ebd5651..5cd7aaf1edd2 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/http/AuthenticationChannelHandler.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/http/AuthenticationChannelHandler.java @@ -128,6 +128,11 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception ctx.fireChannelRead(msg); } + /** + * If Audit logging is enabled then it sends the collection of audit events stored in {@link SecurityRequestContext} + * to get stored in a messaging system. + */ + @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { try { if (auditLoggingEnabled && msg instanceof HttpResponse) { diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/http/CommonNettyHttpServiceBuilder.java b/cdap-common/src/main/java/io/cdap/cdap/common/http/CommonNettyHttpServiceBuilder.java index c816294f4fd2..690085f14bff 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/http/CommonNettyHttpServiceBuilder.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/http/CommonNettyHttpServiceBuilder.java @@ -67,12 +67,6 @@ public void modify(ChannelPipeline pipeline) { new MetricsReporterHook(cConf, metricsCollectionService, serviceName))); } - //TODO : Remove , this is for compiling test classes - public CommonNettyHttpServiceBuilder(CConfiguration cConf, String serviceName, - MetricsCollectionService metricsCollectionService) { - this(cConf, serviceName, metricsCollectionService, null); - } - /** * Sets pipeline modifier, preserving the security one installed in constructor. */ diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/http/CommonNettyHttpServiceFactory.java b/cdap-common/src/main/java/io/cdap/cdap/common/http/CommonNettyHttpServiceFactory.java index 137e9ba29f0b..0227386ac724 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/http/CommonNettyHttpServiceFactory.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/http/CommonNettyHttpServiceFactory.java @@ -40,14 +40,6 @@ public CommonNettyHttpServiceFactory(CConfiguration cConf, this.auditLogWriter = auditLogWriter; } - //TODO : remove - public CommonNettyHttpServiceFactory(CConfiguration cConf, - MetricsCollectionService metricsCollectionService) { - this.cConf = cConf; - this.metricsCollectionService = metricsCollectionService; - this.auditLogWriter = null; - } - /** * Creates a {@link CommonNettyHttpServiceBuilder} with serviceName * diff --git a/cdap-common/src/test/java/io/cdap/cdap/common/internal/remote/RemoteTaskExecutorTest.java b/cdap-common/src/test/java/io/cdap/cdap/common/internal/remote/RemoteTaskExecutorTest.java index 28325746931a..d4d022cb8920 100644 --- a/cdap-common/src/test/java/io/cdap/cdap/common/internal/remote/RemoteTaskExecutorTest.java +++ b/cdap-common/src/test/java/io/cdap/cdap/common/internal/remote/RemoteTaskExecutorTest.java @@ -17,6 +17,7 @@ package io.cdap.cdap.common.internal.remote; import com.google.common.util.concurrent.ListenableFuture; +import io.cdap.cdap.api.auditlogging.AuditLogWriter; import io.cdap.cdap.api.metrics.MetricsCollectionService; import io.cdap.cdap.api.metrics.MetricsContext; import io.cdap.cdap.api.service.worker.RemoteExecutionException; @@ -28,14 +29,18 @@ import io.cdap.cdap.common.discovery.URIScheme; import io.cdap.cdap.common.http.CommonNettyHttpServiceBuilder; import io.cdap.cdap.common.metrics.NoOpMetricsCollectionService; +import io.cdap.cdap.security.spi.authorization.AuditLogContext; import io.cdap.http.ChannelPipelineModifier; import io.cdap.http.NettyHttpService; import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.http.HttpContentDecompressor; + +import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Queue; import java.util.concurrent.Executor; import org.apache.twill.common.Cancellable; import org.apache.twill.discovery.InMemoryDiscoveryService; @@ -65,7 +70,8 @@ public static void init() throws Exception { discoveryService = new InMemoryDiscoveryService(); remoteClientFactory = new RemoteClientFactory(discoveryService, new NoOpInternalAuthenticator()); InMemoryDiscoveryService discoveryService = new InMemoryDiscoveryService(); - httpService = new CommonNettyHttpServiceBuilder(cConf, "test", new NoOpMetricsCollectionService()) + httpService = new CommonNettyHttpServiceBuilder(cConf, "test", new NoOpMetricsCollectionService(), + auditLogContexts -> {}) .setHttpHandlers( new TaskWorkerHttpHandlerInternal(cConf, discoveryService, discoveryService, className -> { }, new NoOpMetricsCollectionService()) diff --git a/cdap-security-spi/src/main/java/io/cdap/cdap/security/spi/authorization/AuditLoggerSpi.java b/cdap-security-spi/src/main/java/io/cdap/cdap/security/spi/authorization/AuditLoggerSpi.java index f28b91886f63..e4d61f4340c2 100644 --- a/cdap-security-spi/src/main/java/io/cdap/cdap/security/spi/authorization/AuditLoggerSpi.java +++ b/cdap-security-spi/src/main/java/io/cdap/cdap/security/spi/authorization/AuditLoggerSpi.java @@ -1,3 +1,19 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + package io.cdap.cdap.security.spi.authorization; import io.cdap.cdap.api.annotation.Beta; @@ -21,9 +37,9 @@ enum PublishStatus { /** * TODO : THIS IS WIP : Needs to be modified based on how auth extension works. * Specially w.r.t to Retry. - * IF the auth ext is able to publish a batch all together vs needs to publish one by one. + * If the auth ext is able to publish a batch all together vs needs to publish one by one. * @return {@link PublishStatus} */ PublishStatus publish(Queue auditLogContexts); -} \ No newline at end of file +} diff --git a/cdap-security/src/main/java/io/cdap/cdap/security/auth/context/AuthenticationContextModules.java b/cdap-security/src/main/java/io/cdap/cdap/security/auth/context/AuthenticationContextModules.java index c451542c0415..355e56e11a33 100644 --- a/cdap-security/src/main/java/io/cdap/cdap/security/auth/context/AuthenticationContextModules.java +++ b/cdap-security/src/main/java/io/cdap/cdap/security/auth/context/AuthenticationContextModules.java @@ -24,6 +24,7 @@ import com.google.inject.PrivateModule; import com.google.inject.Provider; import com.google.inject.TypeLiteral; +import io.cdap.cdap.api.auditlogging.AuditLogWriter; import io.cdap.cdap.common.conf.CConfiguration; import io.cdap.cdap.common.conf.Constants; import io.cdap.cdap.common.internal.remote.DefaultInternalAuthenticator; @@ -39,6 +40,9 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.Queue; + +import io.cdap.cdap.security.spi.authorization.AuditLogContext; import org.apache.hadoop.security.UserGroupInformation; /** @@ -175,6 +179,7 @@ public Module getNoOpModule() { protected void configure() { bind(AuthenticationContext.class).to(AuthenticationTestContext.class); bind(InternalAuthenticator.class).toProvider(InternalAuthenticatorProvider.class); + bind(AuditLogWriter.class).to(NoOpAuditLogWriter.class); } }; } @@ -231,4 +236,20 @@ public AuthenticationContext get() { return injector.getInstance(MasterAuthenticationContext.class); } } + + /** + * A NO OP implementation for tests + */ + private static final class NoOpAuditLogWriter implements AuditLogWriter { + + /** + * pushes the log entry to respective messaging topic + * + * @param auditLogContexts + */ + @Override + public void publish(Queue auditLogContexts) throws IOException { + + } + } } diff --git a/cdap-security/src/test/java/io/cdap/cdap/security/auth/context/RemoteClientAuthenticatorTest.java b/cdap-security/src/test/java/io/cdap/cdap/security/auth/context/RemoteClientAuthenticatorTest.java index 0fff49b22494..4096e09aceba 100644 --- a/cdap-security/src/test/java/io/cdap/cdap/security/auth/context/RemoteClientAuthenticatorTest.java +++ b/cdap-security/src/test/java/io/cdap/cdap/security/auth/context/RemoteClientAuthenticatorTest.java @@ -83,7 +83,8 @@ protected void configure() { // Setup test HTTP handler and register the service. testHttpHandler = new TestHttpHandler(); - httpService = new CommonNettyHttpServiceBuilder(cConf, TEST_SERVICE, new NoOpMetricsCollectionService()) + httpService = new CommonNettyHttpServiceBuilder(cConf, TEST_SERVICE, new NoOpMetricsCollectionService(), + auditLogContexts -> {}) .setHttpHandlers(testHttpHandler).build(); httpService.start(); discoveryService.register(new Discoverable(TEST_SERVICE, httpService.getBindAddress())); diff --git a/cdap-security/src/test/java/io/cdap/cdap/security/store/SecureStoreTest.java b/cdap-security/src/test/java/io/cdap/cdap/security/store/SecureStoreTest.java index b1b0116ce685..3ea4f11d2ec8 100644 --- a/cdap-security/src/test/java/io/cdap/cdap/security/store/SecureStoreTest.java +++ b/cdap-security/src/test/java/io/cdap/cdap/security/store/SecureStoreTest.java @@ -107,7 +107,7 @@ protected void configure() { injector.getInstance(NamespaceAdmin.class).create(NamespaceMeta.DEFAULT); httpServer = new CommonNettyHttpServiceBuilder(injector.getInstance(CConfiguration.class), "SecureStore", - new NoOpMetricsCollectionService()) + new NoOpMetricsCollectionService(), auditLogContexts -> {}) .setHttpHandlers(Collections.singleton(injector.getInstance(SecureStoreHandler.class))) .build(); httpServer.start(); diff --git a/cdap-source-control/src/test/java/io/cdap/cdap/sourcecontrol/operationrunner/RemoteSourceControlOperationRunnerTest.java b/cdap-source-control/src/test/java/io/cdap/cdap/sourcecontrol/operationrunner/RemoteSourceControlOperationRunnerTest.java index 16555ca795c1..825667cf8cde 100644 --- a/cdap-source-control/src/test/java/io/cdap/cdap/sourcecontrol/operationrunner/RemoteSourceControlOperationRunnerTest.java +++ b/cdap-source-control/src/test/java/io/cdap/cdap/sourcecontrol/operationrunner/RemoteSourceControlOperationRunnerTest.java @@ -136,7 +136,8 @@ public static void init() throws Exception { FileSecureStoreService.CURRENT_CODEC .newInstance()); - httpService = new CommonNettyHttpServiceBuilder(cConf, "test", new NoOpMetricsCollectionService()) + httpService = new CommonNettyHttpServiceBuilder(cConf, "test", new NoOpMetricsCollectionService(), + auditLogContexts -> {}) .setHttpHandlers( new TaskWorkerHttpHandlerInternal(cConf, discoveryService, discoveryService, className -> { }, new NoOpMetricsCollectionService()), diff --git a/cdap-tms/src/test/java/io/cdap/cdap/messaging/distributed/LeaderElectionMessagingServiceTest.java b/cdap-tms/src/test/java/io/cdap/cdap/messaging/distributed/LeaderElectionMessagingServiceTest.java index 2c7245d7919e..c0137a6bc05f 100644 --- a/cdap-tms/src/test/java/io/cdap/cdap/messaging/distributed/LeaderElectionMessagingServiceTest.java +++ b/cdap-tms/src/test/java/io/cdap/cdap/messaging/distributed/LeaderElectionMessagingServiceTest.java @@ -50,6 +50,7 @@ import io.cdap.cdap.messaging.store.leveldb.LevelDBTableFactory; import io.cdap.cdap.proto.id.NamespaceId; import io.cdap.cdap.proto.id.TopicId; +import io.cdap.cdap.security.auth.context.AuthenticationContextModules; import io.cdap.cdap.security.authorization.AuthorizationEnforcementModule; import io.cdap.cdap.security.spi.authorization.UnauthorizedException; import java.io.IOException; @@ -264,6 +265,7 @@ private Injector createInjector(int instanceId) { new ZkDiscoveryModule(), new AuthorizationEnforcementModule().getNoOpModules(), new DFSLocationModule(), + new AuthenticationContextModules().getNoOpModule(), new AbstractModule() { @Override protected void configure() {