Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support datastreams as an AuditLog Sink #4257

Merged
merged 6 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1591,6 +1591,50 @@ public List<Setting<?>> getSettings() {
)
);

// Internal OpenSearch DataStream
settings.add(
Setting.simpleString(
ConfigConstants.SECURITY_AUDIT_CONFIG_DEFAULT_PREFIX + ConfigConstants.SECURITY_AUDIT_OPENSEARCH_DATASTREAM_NAME,
Property.NodeScope,
Property.Filtered
)
);
settings.add(
Setting.boolSetting(
ConfigConstants.SECURITY_AUDIT_CONFIG_DEFAULT_PREFIX
+ ConfigConstants.SECURITY_AUDIT_OPENSEARCH_DATASTREAM_TEMPLATE_MANAGE,
true,
Property.NodeScope,
Property.Filtered
)
);
settings.add(
Setting.simpleString(
ConfigConstants.SECURITY_AUDIT_CONFIG_DEFAULT_PREFIX
+ ConfigConstants.SECURITY_AUDIT_OPENSEARCH_DATASTREAM_TEMPLATE_NAME,
Property.NodeScope,
Property.Filtered
)
);
settings.add(
Setting.intSetting(
ConfigConstants.SECURITY_AUDIT_CONFIG_DEFAULT_PREFIX
+ ConfigConstants.SECURITY_AUDIT_OPENSEARCH_DATASTREAM_TEMPLATE_NUMBER_OF_SHARDS,
1,
Property.NodeScope,
Property.Filtered
)
);
settings.add(
Setting.intSetting(
ConfigConstants.SECURITY_AUDIT_CONFIG_DEFAULT_PREFIX
+ ConfigConstants.SECURITY_AUDIT_OPENSEARCH_DATASTREAM_TEMPLATE_NUMBER_OF_REPLICAS,
0,
Property.NodeScope,
Property.Filtered
)
);

// External OpenSearch
settings.add(
Setting.listSetting(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.security.auditlog.sink;

import java.io.IOException;

import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.support.WriteRequest.RefreshPolicy;
import org.opensearch.client.Client;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ThreadContext.StoredContext;
import org.opensearch.security.auditlog.impl.AuditMessage;
import org.opensearch.security.support.ConfigConstants;
import org.opensearch.security.support.HeaderHelper;
import org.opensearch.threadpool.ThreadPool;

public abstract class AbstractInternalOpenSearchSink extends AuditLogSink {

protected final Client clientProvider;
private final ThreadPool threadPool;
private final DocWriteRequest.OpType storeOpType;

public AbstractInternalOpenSearchSink(
final String name,
final Settings settings,
final String settingsPrefix,
final Client clientProvider,
ThreadPool threadPool,
AuditLogSink fallbackSink,
DocWriteRequest.OpType storeOpType
) {
super(name, settings, settingsPrefix, fallbackSink);
this.clientProvider = clientProvider;
this.threadPool = threadPool;
this.storeOpType = storeOpType;
}

@Override
public void close() throws IOException {

}

Check warning on line 52 in src/main/java/org/opensearch/security/auditlog/sink/AbstractInternalOpenSearchSink.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/security/auditlog/sink/AbstractInternalOpenSearchSink.java#L52

Added line #L52 was not covered by tests

public boolean doStore(final AuditMessage msg, String indexName) {

if (Boolean.parseBoolean(
HeaderHelper.getSafeFromHeader(threadPool.getThreadContext(), ConfigConstants.OPENDISTRO_SECURITY_CONF_REQUEST_HEADER)
)) {
if (log.isTraceEnabled()) {
log.trace("audit log of audit log will not be executed");

Check warning on line 60 in src/main/java/org/opensearch/security/auditlog/sink/AbstractInternalOpenSearchSink.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/security/auditlog/sink/AbstractInternalOpenSearchSink.java#L60

Added line #L60 was not covered by tests
}
return true;

Check warning on line 62 in src/main/java/org/opensearch/security/auditlog/sink/AbstractInternalOpenSearchSink.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/security/auditlog/sink/AbstractInternalOpenSearchSink.java#L62

Added line #L62 was not covered by tests
}

try (StoredContext ctx = threadPool.getThreadContext().stashContext()) {
try {
final IndexRequestBuilder irb = clientProvider.prepareIndex(indexName)
.setRefreshPolicy(RefreshPolicy.IMMEDIATE)
.setSource(msg.getAsMap());
threadPool.getThreadContext().putHeader(ConfigConstants.OPENDISTRO_SECURITY_CONF_REQUEST_HEADER, "true");
irb.setTimeout(TimeValue.timeValueMinutes(1));
if (this.storeOpType != null) {
irb.setOpType(this.storeOpType);
}
irb.execute().actionGet();
return true;
} catch (final Exception e) {
log.error("Unable to index audit log {} due to", msg, e);
return false;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.security.auditlog.sink;

// CS-SUPPRESS-SINGLE: RegexpSingleline https://github.com/opensearch-project/OpenSearch/issues/3663
import java.io.IOException;
import java.nio.file.Path;
import java.util.List;

import org.opensearch.ResourceAlreadyExistsException;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.admin.indices.datastream.CreateDataStreamAction;
import org.opensearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.ComposableIndexTemplate;
import org.opensearch.cluster.metadata.DataStream;
import org.opensearch.cluster.metadata.Template;
import org.opensearch.common.settings.Settings;
import org.opensearch.security.auditlog.impl.AuditMessage;
import org.opensearch.security.support.ConfigConstants;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.RemoteTransportException;

public final class InternalOpenSearchDataStreamSink extends AbstractInternalOpenSearchSink {

String dataStreamName;
private boolean dataStreamInitialized = false;

public InternalOpenSearchDataStreamSink(
final String name,
final Settings settings,
final String settingsPrefix,
final Path configPath,
final Client clientProvider,
ThreadPool threadPool,
AuditLogSink fallbackSink
) {
super(name, settings, settingsPrefix, clientProvider, threadPool, fallbackSink, DocWriteRequest.OpType.CREATE);
Settings sinkSettings = getSinkSettings(settingsPrefix);

this.dataStreamName = sinkSettings.get(ConfigConstants.SECURITY_AUDIT_OPENSEARCH_DATASTREAM_NAME, "opensearch-security-auditlog");

// Node is no ready yet... this.initDataStream() must be called later (in method doStore())
}

private boolean initDataStream() {

if (this.dataStreamInitialized) {
return true;
}

Settings sinkSettings = getSinkSettings(settingsPrefix);

final boolean templateManage = sinkSettings.getAsBoolean(
ConfigConstants.SECURITY_AUDIT_OPENSEARCH_DATASTREAM_TEMPLATE_MANAGE,
true
);

// Create datastream template
if (templateManage) {

final String templateName = sinkSettings.get(
ConfigConstants.SECURITY_AUDIT_OPENSEARCH_DATASTREAM_TEMPLATE_NAME,
"opensearch-security-auditlog"
);
final Integer numberOfReplicas = sinkSettings.getAsInt(
ConfigConstants.SECURITY_AUDIT_OPENSEARCH_DATASTREAM_TEMPLATE_NUMBER_OF_REPLICAS,
0
);
final Integer numberOfShards = sinkSettings.getAsInt(
ConfigConstants.SECURITY_AUDIT_OPENSEARCH_DATASTREAM_TEMPLATE_NUMBER_OF_SHARDS,
1
);

ComposableIndexTemplate template = new ComposableIndexTemplate(
List.of(dataStreamName),
new Template(
Settings.builder().put("number_of_shards", numberOfShards).put("number_of_replicas", numberOfReplicas).build(),
null,
null
),
null,
null,
null,
null,
new ComposableIndexTemplate.DataStreamTemplate(new DataStream.TimestampField("@timestamp"))
);

try {
PutComposableIndexTemplateAction.Request request = new PutComposableIndexTemplateAction.Request(templateName);
request.indexTemplate(template);
AcknowledgedResponse response = clientProvider.execute(PutComposableIndexTemplateAction.INSTANCE, request).get();
if (!response.isAcknowledged()) {
log.error("Failed to create index template {}", templateName);
return false;

Check warning on line 105 in src/main/java/org/opensearch/security/auditlog/sink/InternalOpenSearchDataStreamSink.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/security/auditlog/sink/InternalOpenSearchDataStreamSink.java#L104-L105

Added lines #L104 - L105 were not covered by tests
}
} catch (final Exception e) {
log.error("Cannot create index template {} due to", templateName, e);
return false;

Check warning on line 109 in src/main/java/org/opensearch/security/auditlog/sink/InternalOpenSearchDataStreamSink.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/security/auditlog/sink/InternalOpenSearchDataStreamSink.java#L107-L109

Added lines #L107 - L109 were not covered by tests
}
}

CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName);
try {
AcknowledgedResponse response = clientProvider.admin().indices().createDataStream(createDataStreamRequest).get();
if (!response.isAcknowledged()) {
log.error("Failed to create datastream {}", dataStreamName);

Check warning on line 117 in src/main/java/org/opensearch/security/auditlog/sink/InternalOpenSearchDataStreamSink.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/security/auditlog/sink/InternalOpenSearchDataStreamSink.java#L117

Added line #L117 was not covered by tests
}
this.dataStreamInitialized = true;
} catch (final Exception e) {
if (e.getCause() instanceof ResourceAlreadyExistsException
|| (e.getCause() instanceof RemoteTransportException
&& e.getCause().getCause() instanceof ResourceAlreadyExistsException)) {
log.trace("Datastream {} already exists", dataStreamName);
this.dataStreamInitialized = true;
} else {
log.error("Cannot create datastream {} due to", dataStreamName, e);
return false;
}
}

return this.dataStreamInitialized;
}

@Override
public void close() throws IOException {

}

public boolean doStore(final AuditMessage msg) {

if (!this.initDataStream()) {
log.error("Datastream initializaten failed. Cannot write to auditlog");
return false;
}

return super.doStore(msg, this.dataStreamName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,20 @@
import java.io.IOException;
import java.nio.file.Path;

import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.support.WriteRequest.RefreshPolicy;
import org.opensearch.client.Client;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ThreadContext.StoredContext;
import org.opensearch.security.auditlog.impl.AuditMessage;
import org.opensearch.security.support.ConfigConstants;
import org.opensearch.security.support.HeaderHelper;
import org.opensearch.threadpool.ThreadPool;

import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

public final class InternalOpenSearchSink extends AuditLogSink {
public final class InternalOpenSearchSink extends AbstractInternalOpenSearchSink {

private final Client clientProvider;
final String index;
final String type;
private DateTimeFormatter indexPattern;
private final ThreadPool threadPool;

public InternalOpenSearchSink(
final String name,
Expand All @@ -45,14 +38,12 @@ public InternalOpenSearchSink(
ThreadPool threadPool,
AuditLogSink fallbackSink
) {
super(name, settings, settingsPrefix, fallbackSink);
this.clientProvider = clientProvider;
Settings sinkSettings = getSinkSettings(settingsPrefix);
super(name, settings, settingsPrefix, clientProvider, threadPool, fallbackSink, null);

Settings sinkSettings = getSinkSettings(settingsPrefix);
this.index = sinkSettings.get(ConfigConstants.SECURITY_AUDIT_OPENSEARCH_INDEX, "'security-auditlog-'YYYY.MM.dd");
this.type = sinkSettings.get(ConfigConstants.SECURITY_AUDIT_OPENSEARCH_TYPE, null);

this.threadPool = threadPool;
try {
this.indexPattern = DateTimeFormat.forPattern(index);
} catch (IllegalArgumentException e) {
Expand All @@ -69,29 +60,6 @@ public void close() throws IOException {
}

public boolean doStore(final AuditMessage msg) {

if (Boolean.parseBoolean(
HeaderHelper.getSafeFromHeader(threadPool.getThreadContext(), ConfigConstants.OPENDISTRO_SECURITY_CONF_REQUEST_HEADER)
)) {
if (log.isTraceEnabled()) {
log.trace("audit log of audit log will not be executed");
}
return true;
}

try (StoredContext ctx = threadPool.getThreadContext().stashContext()) {
try {
final IndexRequestBuilder irb = clientProvider.prepareIndex(getExpandedIndexName(indexPattern, index))
.setRefreshPolicy(RefreshPolicy.IMMEDIATE)
.setSource(msg.getAsMap());
threadPool.getThreadContext().putHeader(ConfigConstants.OPENDISTRO_SECURITY_CONF_REQUEST_HEADER, "true");
irb.setTimeout(TimeValue.timeValueMinutes(1));
irb.execute().actionGet();
return true;
} catch (final Exception e) {
log.error("Unable to index audit log {} due to", msg, e);
return false;
}
}
return super.doStore(msg, getExpandedIndexName(this.indexPattern, this.index));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,17 @@ private final AuditLogSink createSink(final String name, final String type, fina
case "internal_opensearch":
sink = new InternalOpenSearchSink(name, settings, settingsPrefix, configPath, clientProvider, threadPool, fallbackSink);
break;
case "internal_opensearch_data_stream":
sink = new InternalOpenSearchDataStreamSink(
name,
settings,
settingsPrefix,
configPath,
clientProvider,
threadPool,
fallbackSink
);
break;
case "external_opensearch":
try {
sink = new ExternalOpenSearchSink(name, settings, settingsPrefix, configPath, fallbackSink);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,13 @@ public class ConfigConstants {

public static final String SECURITY_AUDIT_CONFIG_DEFAULT_PREFIX = "plugins.security.audit.config.";

// Internal Opensearch data_stream
public static final String SECURITY_AUDIT_OPENSEARCH_DATASTREAM_NAME = "data_stream.name";
public static final String SECURITY_AUDIT_OPENSEARCH_DATASTREAM_TEMPLATE_MANAGE = "data_stream.template.manage";
public static final String SECURITY_AUDIT_OPENSEARCH_DATASTREAM_TEMPLATE_NAME = "data_stream.template.name";
public static final String SECURITY_AUDIT_OPENSEARCH_DATASTREAM_TEMPLATE_NUMBER_OF_REPLICAS = "data_stream.template.number_of_replicas";
public static final String SECURITY_AUDIT_OPENSEARCH_DATASTREAM_TEMPLATE_NUMBER_OF_SHARDS = "data_stream.template.number_of_shards";

// Internal / External OpenSearch
public static final String SECURITY_AUDIT_OPENSEARCH_INDEX = "index";
public static final String SECURITY_AUDIT_OPENSEARCH_TYPE = "type";
Expand Down
Loading
Loading