Skip to content

Commit

Permalink
abstract internal opensearch class and some code style fixes
Browse files Browse the repository at this point in the history
Signed-off-by: tmanninger <[email protected]>
  • Loading branch information
tmanninger committed Sep 24, 2024
1 parent 1ea16c4 commit 12f13e0
Showing 1 changed file with 83 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.security.auditlog.sink;

import java.io.IOException;

import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.support.WriteRequest.RefreshPolicy;
import org.opensearch.client.Client;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ThreadContext.StoredContext;
import org.opensearch.security.auditlog.impl.AuditMessage;
import org.opensearch.security.support.ConfigConstants;
import org.opensearch.security.support.HeaderHelper;
import org.opensearch.threadpool.ThreadPool;

public abstract class AbstractInternalOpenSearchSink extends AuditLogSink {

protected final Client clientProvider;
private final ThreadPool threadPool;
private final DocWriteRequest.OpType storeOpType;

public AbstractInternalOpenSearchSink(
final String name,
final Settings settings,
final String settingsPrefix,
final Client clientProvider,
ThreadPool threadPool,
AuditLogSink fallbackSink,
DocWriteRequest.OpType storeOpType
) {
super(name, settings, settingsPrefix, fallbackSink);
this.clientProvider = clientProvider;
this.threadPool = threadPool;
this.storeOpType = storeOpType;
}

@Override
public void close() throws IOException {

}

public boolean doStore(final AuditMessage msg, String indexName) {

if (Boolean.parseBoolean(
HeaderHelper.getSafeFromHeader(threadPool.getThreadContext(), ConfigConstants.OPENDISTRO_SECURITY_CONF_REQUEST_HEADER)
)) {
if (log.isTraceEnabled()) {
log.trace("audit log of audit log will not be executed");
}
return true;
}

try (StoredContext ctx = threadPool.getThreadContext().stashContext()) {
try {
final IndexRequestBuilder irb = clientProvider.prepareIndex(indexName)
.setRefreshPolicy(RefreshPolicy.IMMEDIATE)
.setSource(msg.getAsMap());
threadPool.getThreadContext().putHeader(ConfigConstants.OPENDISTRO_SECURITY_CONF_REQUEST_HEADER, "true");
irb.setTimeout(TimeValue.timeValueMinutes(1));
if (this.storeOpType != null) {
irb.setOpType(this.storeOpType);
}
irb.execute().actionGet();
return true;
} catch (final Exception e) {
log.error("Unable to index audit log {} due to", msg, e);
return false;
}
}
}
}

0 comments on commit 12f13e0

Please sign in to comment.