Skip to content

Commit

Permalink
fix-issue-434 To add proxy related settings to splunk connector
Browse files Browse the repository at this point in the history
Signed-off-by: Wang, Shu <[email protected]>
  • Loading branch information
wangshu3000 committed Oct 28, 2024
1 parent 11e3498 commit b45a55d
Show file tree
Hide file tree
Showing 12 changed files with 132 additions and 12 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ Use the below schema to configure Splunk Connect for Kafka
"splunk.hec.ack.poll.threads": "<number of threads used to poll event acks>",
"splunk.hec.ssl.validate.certs": "<true|false>",
"splunk.hec.http.keepalive": "<true|false>",
"splunk.hec.http.proxy.host": "<the http proxy host name>",
"splunk.hec.http.proxy.port": "<the http proxy port number>",
"splunk.hec.max.http.connection.per.channel": "<max number of http connections per channel>",
"splunk.hec.total.channels": "<total number of channels>",
"splunk.hec.max.batch.size": "<max number of kafka records post in one batch>",
Expand Down Expand Up @@ -167,6 +169,8 @@ Use the below schema to configure Splunk Connect for Kafka
| `splunk.validation.disable` | Disable validating splunk configurations before creating task. | `false` |
| `splunk.hec.ssl.validate.certs` | Valid settings are `true` or `false`. Enables or disables HTTPS certification validation. |`true`|
| `splunk.hec.http.keepalive` | Valid settings are `true` or `false`. Enables or disables HTTP connection keep-alive. |`true`|
| `splunk.hec.http.proxy.host` | This setting is the http proxy server hostname. Configure it to use connector level proxy when connecting to HEC endpoint, otherwise, it'll use JVM level proxy setting in JVM_OPTS. | `""` |
| `splunk.hec.http.proxy.port` | This setting is the http proxy server port. Configure it to use connector level proxy when connecting to HEC endpoint, otherwise, it'll use JVM level proxy setting in JVM_OPTS. | `0` |
| `splunk.hec.max.http.connection.per.channel` | Controls how many HTTP connections will be created and cached in the HTTP pool for one HEC channel. |`2`|
| `splunk.hec.total.channels` | Controls the total channels created to perform HEC event POSTs. See the Load balancer section for more details. |`2`|
| `splunk.hec.max.batch.size` | Maximum batch size when posting events to Splunk. The size is the actual number of Kafka events, and not byte size. |`500`|
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/com/splunk/hecclient/Hec.java
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,8 @@ public static CloseableHttpClient createHttpClient(final HecConfig config) {
return new HttpClientBuilder().setDisableSSLCertVerification(config.getDisableSSLCertVerification())
.setMaxConnectionPoolSizePerDestination(poolSizePerDest)
.setMaxConnectionPoolSize(poolSizePerDest * config.getUris().size())
.setHttpProxyHost(config.getHttpProxyHost())
.setHttpProxyPort(config.getHttpProxyPort())
.build();
}

Expand All @@ -295,6 +297,8 @@ public static CloseableHttpClient createHttpClient(final HecConfig config) {
.setMaxConnectionPoolSizePerDestination(poolSizePerDest)
.setMaxConnectionPoolSize(poolSizePerDest * config.getUris().size())
.setSslContext(context)
.setHttpProxyHost(config.getHttpProxyHost())
.setHttpProxyPort(config.getHttpProxyPort())
.build();
}
else {
Expand Down
20 changes: 20 additions & 0 deletions src/main/java/com/splunk/hecclient/HecConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public final class HecConfig {
private String kerberosKeytabPath;
private int concurrentHecQueueCapacity = 100;
private Boolean autoExtractTimestamp;
private String httpProxyHost;
private int httpProxyPort = 0;

public HecConfig(List<String> uris, String token) {
this.uris = uris;
Expand All @@ -63,6 +65,14 @@ public boolean getHttpKeepAlive() {
return httpKeepAlive;
}

public String getHttpProxyHost() {
return httpProxyHost;
}

public int getHttpProxyPort() {
return httpProxyPort;
}

public int getSocketTimeout() {
return socketTimeout;
}
Expand Down Expand Up @@ -127,6 +137,16 @@ public HecConfig setHttpKeepAlive(boolean keepAlive) {
return this;
}

public HecConfig setHttpProxyHost(final String httpProxyHost) {
this.httpProxyHost = httpProxyHost;
return this;
}

public HecConfig setHttpProxyPort(final int httpProxyPort) {
this.httpProxyPort = httpProxyPort;
return this;
}

public HecConfig setSocketTimeout(int timeout /*seconds*/) {
socketTimeout = timeout;
return this;
Expand Down
24 changes: 21 additions & 3 deletions src/main/java/com/splunk/hecclient/HttpClientBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.Principal;

import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthSchemeProvider;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.Credentials;
Expand Down Expand Up @@ -50,6 +53,8 @@ public final class HttpClientBuilder {
private int socketTimeout = 60; // in seconds
private int socketSendBufferSize = 8 * 1024 * 1024; // in bytes
private boolean disableSSLCertVerification = false;
private String httpProxyHost;
private int httpProxyPort;
private SSLContext sslContext = null;

public HttpClientBuilder setMaxConnectionPoolSizePerDestination(int connections) {
Expand Down Expand Up @@ -77,6 +82,16 @@ public HttpClientBuilder setDisableSSLCertVerification(boolean disableVerificati
return this;
}

public HttpClientBuilder setHttpProxyHost(final String httpProxyHost) {
this.httpProxyHost = httpProxyHost;
return this;
}

public HttpClientBuilder setHttpProxyPort(final int httpProxyPort) {
this.httpProxyPort = httpProxyPort;
return this;
}

public HttpClientBuilder setSslContext(SSLContext context) {
this.sslContext = context;
return this;
Expand All @@ -88,9 +103,12 @@ public CloseableHttpClient build() {
.setSndBufSize(socketSendBufferSize)
.setSoTimeout(socketTimeout * 1000)
.build();
RequestConfig requestConfig = RequestConfig.custom()
.setCookieSpec(CookieSpecs.STANDARD)
.build();
RequestConfig.Builder requestConfigBuilder = RequestConfig.custom()
.setCookieSpec(CookieSpecs.STANDARD);
if (StringUtils.isNotEmpty(this.httpProxyHost) && this.httpProxyPort != 0) {
requestConfigBuilder.setProxy(new HttpHost(this.httpProxyHost, this.httpProxyPort));
}
RequestConfig requestConfig = requestConfigBuilder.build();

return HttpClients.custom()
.useSystemProperties()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
static final String MAX_HTTP_CONNECTION_PER_CHANNEL_CONF = "splunk.hec.max.http.connection.per.channel";
static final String MAX_BATCH_SIZE_CONF = "splunk.hec.max.batch.size"; // record count
static final String HTTP_KEEPALIVE_CONF = "splunk.hec.http.keepalive";
static final String HTTP_PROXY_HOST_CONF = "splunk.hec.http.proxy.host";
static final String HTTP_PROXY_PORT_CONF = "splunk.hec.http.proxy.port";
static final String HEC_THREDS_CONF = "splunk.hec.threads";
static final String SOCKET_TIMEOUT_CONF = "splunk.hec.socket.timeout"; // seconds
static final String SSL_VALIDATE_CERTIFICATES_CONF = "splunk.hec.ssl.validate.certs";
Expand Down Expand Up @@ -128,6 +130,10 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
+ "Kafka events not the byte size. By default, this is set to 100.";
static final String HTTP_KEEPALIVE_DOC = "Valid settings are true or false. Enables or disables HTTP connection "
+ "keep-alive. By default, this is set to true";
static final String HTTP_PROXY_HOST_DOC = "This setting is the http proxy server hostname. Configure it to use connector "
+ "level proxy when connecting to HEC endpoint, otherwise, it'll use JVM level proxy setting in JVM_OPTS.";
static final String HTTP_PROXY_PORT_DOC = "This setting is the http proxy server port. Configure it to use connector "
+ "level proxy when connecting to HEC endpoint, otherwise, it'll use JVM level proxy setting in JVM_OPTS.";
static final String HEC_THREADS_DOC = "Controls how many threads are spawned to do data injection via HEC in a single "
+ "connector task. By default, this is set to 1.";
static final String SOCKET_TIMEOUT_DOC = "Max duration in seconds to read / write data to network before internal TCP "
Expand Down Expand Up @@ -225,6 +231,8 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
final int maxHttpConnPerChannel;
final int maxBatchSize;
final boolean httpKeepAlive;
final String httpProxyHost;
final int httpProxyPort;
final int numberOfThreads;
final int socketTimeout;
final boolean validateCertificates;
Expand Down Expand Up @@ -280,6 +288,8 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
sourcetypes = getString(SOURCETYPE_CONF);
sources = getString(SOURCE_CONF);
httpKeepAlive = getBoolean(HTTP_KEEPALIVE_CONF);
httpProxyHost = getString(HTTP_PROXY_HOST_CONF);
httpProxyPort = getInt(HTTP_PROXY_PORT_CONF);
validateCertificates = getBoolean(SSL_VALIDATE_CERTIFICATES_CONF);
trustStorePath = getString(SSL_TRUSTSTORE_PATH_CONF);
hasTrustStorePath = StringUtils.isNotBlank(trustStorePath);
Expand Down Expand Up @@ -330,7 +340,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
autoExtractTimestamp = getBoolean(AUTO_EXTRACT_TIMESTAMP_CONF);
}


public static ConfigDef conf() {
return new ConfigDef()
.define(TOKEN_CONF, ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH, TOKEN_DOC)
Expand All @@ -341,6 +351,8 @@ public static ConfigDef conf() {
.define(SOURCETYPE_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, SOURCETYPE_DOC)
.define(SOURCE_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, SOURCE_DOC)
.define(HTTP_KEEPALIVE_CONF, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.MEDIUM, HTTP_KEEPALIVE_DOC)
.define(HTTP_PROXY_HOST_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, HTTP_PROXY_HOST_DOC)
.define(HTTP_PROXY_PORT_CONF, ConfigDef.Type.INT, 0, ConfigDef.Importance.HIGH, HTTP_PROXY_PORT_DOC)
.define(SSL_VALIDATE_CERTIFICATES_CONF, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.MEDIUM, SSL_VALIDATE_CERTIFICATES_DOC)
.define(SSL_TRUSTSTORE_PATH_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, SSL_TRUSTSTORE_PATH_DOC)
.define(SSL_TRUSTSTORE_TYPE_CONF, ConfigDef.Type.STRING, "JKS", ConfigDef.Importance.LOW, SSL_TRUSTSTORE_TYPE_DOC)
Expand Down Expand Up @@ -392,6 +404,8 @@ public HecConfig getHecConfig() {
.setTotalChannels(totalHecChannels)
.setEventBatchTimeout(eventBatchTimeout)
.setHttpKeepAlive(httpKeepAlive)
.setHttpProxyHost(httpProxyHost)
.setHttpProxyPort(httpProxyPort)
.setAckPollInterval(ackPollInterval)
.setlbPollInterval(lbPollInterval)
.setAckPollThreads(ackPollThreads)
Expand Down Expand Up @@ -424,6 +438,8 @@ public String toString() {
+ "headerSupport:" + headerSupport + ", "
+ "headerCustom:" + headerCustom + ", "
+ "httpKeepAlive:" + httpKeepAlive + ", "
+ "httpProxyHost:" + httpProxyHost + ", "
+ "httpProxyPort:" + httpProxyPort + ", "
+ "validateCertificates:" + validateCertificates + ", "
+ "trustStorePath:" + trustStorePath + ", "
+ "trustStoreType:" + trustStoreType + ", "
Expand Down
4 changes: 4 additions & 0 deletions src/test/java/com/splunk/hecclient/HecConfigTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public void getterSetter() {
config.setAckPollInterval(1)
.setDisableSSLCertVerification(true)
.setHttpKeepAlive(false)
.setHttpProxyHost("test.host")
.setHttpProxyPort(8080)
.setSocketSendBufferSize(2)
.setSocketTimeout(3)
.setMaxHttpConnectionPerChannel(4)
Expand All @@ -60,6 +62,8 @@ public void getterSetter() {
Assert.assertEquals(5, config.getTotalChannels());
Assert.assertEquals(6, config.getAckPollThreads());
Assert.assertEquals(7, config.getEventBatchTimeout());
Assert.assertEquals("test.host", config.getHttpProxyHost());
Assert.assertEquals(8080, config.getHttpProxyPort());
Assert.assertEquals("test", config.getTrustStorePath());
Assert.assertEquals("PKCS12", config.getTrustStoreType());
Assert.assertEquals("pass", config.getTrustStorePassword());
Expand Down
21 changes: 21 additions & 0 deletions src/test/java/com/splunk/hecclient/HttpClientBuilderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,27 @@ public void buildSecureDefault() {
.build();
Assert.assertNotNull(client);
}

@Test
public void buildHttpProxy() {
HttpClientBuilder builder = new HttpClientBuilder();
CloseableHttpClient client = builder.setMaxConnectionPoolSizePerDestination(1)
.setHttpProxyHost("rest.host")
.setHttpProxyPort(8080)
.build();
Assert.assertNotNull(client);
builder = new HttpClientBuilder();
client = builder.setMaxConnectionPoolSizePerDestination(1)
.setHttpProxyPort(8080)
.build();
Assert.assertNotNull(client);
builder = new HttpClientBuilder();
client = builder.setMaxConnectionPoolSizePerDestination(1)
.setHttpProxyHost("rest.host")
.build();
Assert.assertNotNull(client);
}

@Test
public void buildSecureCustomKeystore() {
HttpClientBuilder builder = new HttpClientBuilder();
Expand Down
22 changes: 21 additions & 1 deletion src/test/java/com/splunk/kafka/connect/ConfigProfile.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ public class ConfigProfile {
private String sourcetypes;
private String sources;
private boolean httpKeepAlive;
private String httpProxyHost;
private int httpProxyPort;
private boolean validateCertificates;
private boolean hasTrustStorePath;
private String trustStorePath;
Expand Down Expand Up @@ -75,6 +77,8 @@ public ConfigProfile buildProfileDefault() {
this.sourcetypes = "";
this.sources = "";
this.httpKeepAlive = true;
this.httpProxyHost = "proxy.host";
this.httpProxyPort = 8080;
this.validateCertificates = true;
this.hasTrustStorePath = true;
this.trustStorePath = "./src/test/resources/keystoretest.jks";
Expand Down Expand Up @@ -311,6 +315,22 @@ public void setHttpKeepAlive(boolean httpKeepAlive) {
this.httpKeepAlive = httpKeepAlive;
}

public int getHttpProxyPort() {
return httpProxyPort;
}

public String getHttpProxyHost() {
return httpProxyHost;
}

public void setHttpProxyHost(final String httpProxyHost) {
this.httpProxyHost = httpProxyHost;
}

public void setHttpProxyPort(final int httpProxyPort) {
this.httpProxyPort = httpProxyPort;
}

public boolean isValidateCertificates() {
return validateCertificates;
}
Expand Down Expand Up @@ -472,6 +492,6 @@ public void setHeaderHost(String headerHost) {
}

@Override public String toString() {
return "ConfigProfile{" + "topics='" + topics + '\'' + ", topics.regex='" + topicsRegex + '\'' + ", token='" + token + '\'' + ", uri='" + uri + '\'' + ", raw=" + raw + ", ack=" + ack + ", indexes='" + indexes + '\'' + ", sourcetypes='" + sourcetypes + '\'' + ", sources='" + sources + '\'' + ", httpKeepAlive=" + httpKeepAlive + ", validateCertificates=" + validateCertificates + ", hasTrustStorePath=" + hasTrustStorePath + ", trustStorePath='" + trustStorePath + '\'' + ", trustStoreType='" + trustStoreType + '\'' + ", trustStorePassword='" + trustStorePassword + '\'' + ", eventBatchTimeout=" + eventBatchTimeout + ", ackPollInterval=" + ackPollInterval + ", ackPollThreads=" + ackPollThreads + ", maxHttpConnPerChannel=" + maxHttpConnPerChannel + ", totalHecChannels=" + totalHecChannels + ", socketTimeout=" + socketTimeout + ", enrichements='" + enrichements + '\'' + ", enrichementMap=" + enrichementMap + ", trackData=" + trackData + ", maxBatchSize=" + maxBatchSize + ", numOfThreads=" + numOfThreads + '}';
return "ConfigProfile{" + "topics='" + topics + '\'' + ", topics.regex='" + topicsRegex + '\'' + ", token='" + token + '\'' + ", uri='" + uri + '\'' + ", raw=" + raw + ", ack=" + ack + ", indexes='" + indexes + '\'' + ", sourcetypes='" + sourcetypes + '\'' + ", sources='" + sources + '\'' + ", httpKeepAlive=" + httpKeepAlive + ", httpProxyHost=" + httpProxyHost + ", httpProxyPort=" + httpProxyPort + ", validateCertificates=" + validateCertificates + ", hasTrustStorePath=" + hasTrustStorePath + ", trustStorePath='" + trustStorePath + '\'' + ", " + "trustStoreType='" + trustStoreType + '\'' + ", trustStorePassword='" + trustStorePassword + '\'' + ", eventBatchTimeout=" + eventBatchTimeout + ", ackPollInterval=" + ackPollInterval + ", ackPollThreads=" + ackPollThreads + ", maxHttpConnPerChannel=" + maxHttpConnPerChannel + ", totalHecChannels=" + totalHecChannels + ", socketTimeout=" + socketTimeout + ", enrichements='" + enrichements + '\'' + ", enrichementMap=" + enrichementMap + ", trackData=" + trackData + ", maxBatchSize=" + maxBatchSize + ", numOfThreads=" + numOfThreads + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,17 @@ public void testSpecialCharLineBreaker() {
Assert.assertEquals("\t", connectorConfig.lineBreaker);
}

@Test
public void testHttpProxy() {
UnitUtil uu = new UnitUtil(0);
Map<String, String> taskConfig = uu.createTaskConfig();
SplunkSinkConnectorConfig connectorConfig = new SplunkSinkConnectorConfig(taskConfig);
HecConfig config = connectorConfig.getHecConfig();

Assert.assertEquals("proxy.host", config.getHttpProxyHost());
Assert.assertEquals(8080, config.getHttpProxyPort());
}

@Test
public void toStr() {
UnitUtil uu = new UnitUtil(0);
Expand Down
2 changes: 2 additions & 0 deletions src/test/java/com/splunk/kafka/connect/UnitUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public Map<String, String> createTaskConfig() {
config.put(SplunkSinkConnectorConfig.SOURCETYPE_CONF, configProfile.getSourcetypes());
config.put(SplunkSinkConnectorConfig.SOURCE_CONF, configProfile.getSources());
config.put(SplunkSinkConnectorConfig.HTTP_KEEPALIVE_CONF, String.valueOf(configProfile.isHttpKeepAlive()));
config.put(SplunkSinkConnectorConfig.HTTP_PROXY_HOST_CONF, configProfile.getHttpProxyHost());
config.put(SplunkSinkConnectorConfig.HTTP_PROXY_PORT_CONF, String.valueOf(configProfile.getHttpProxyPort()));
config.put(SplunkSinkConnectorConfig.SSL_VALIDATE_CERTIFICATES_CONF, String.valueOf(configProfile.isValidateCertificates()));

if(configProfile.getTrustStorePath() != null ) {
Expand Down
Loading

0 comments on commit b45a55d

Please sign in to comment.