Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SDKS-7445] Add InputStream provider for localhost mode #429

Merged
merged 15 commits into from
Sep 7, 2023
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -2,55 +2,44 @@

import com.google.gson.stream.JsonReader;
import io.split.client.dtos.SplitChange;
import io.split.client.utils.InputStreamProvider;
import io.split.client.utils.Json;
import io.split.client.utils.LocalhostSanitizer;
import io.split.engine.common.FetchOptions;
import io.split.engine.experiments.SplitChangeFetcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.UnsupportedEncodingException;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

public class JsonLocalhostSplitChangeFetcher implements SplitChangeFetcher {

private static final Logger _log = LoggerFactory.getLogger(JsonLocalhostSplitChangeFetcher.class);
private final File _file;
private final InputStreamProvider _inputStreamProvider;
private byte [] lastHash;

public JsonLocalhostSplitChangeFetcher(String filePath) {
_file = new File(filePath);
public JsonLocalhostSplitChangeFetcher(InputStreamProvider inputStreamProvider) {
_inputStreamProvider = inputStreamProvider;
lastHash = new byte[0];
}

@Override
public SplitChange fetch(long since, FetchOptions options) {

try {
JsonReader jsonReader = new JsonReader(new FileReader(_file));
JsonReader jsonReader = new JsonReader(new BufferedReader(new InputStreamReader(_inputStreamProvider.get(), StandardCharsets.UTF_8)));
SplitChange splitChange = Json.fromJson(jsonReader, SplitChange.class);
return processSplitChange(splitChange, since);
} catch (FileNotFoundException f){
_log.warn(String.format("There was no file named %s found. " +
"We created a split client that returns default treatments for all feature flags for all of your users. " +
"If you wish to return a specific treatment for a feature flag, enter the name of that feature flag name and " +
"treatment name separated by whitespace in %s; one pair per line. Empty lines or lines starting with '#' are " +
"considered comments",
_file.getPath(), _file.getPath()), f);
throw new IllegalStateException("Problem fetching splitChanges: " + f.getMessage(), f);
} catch (Exception e) {
_log.warn(String.format("Problem to fetch split change using the file %s",
_file.getPath()), e);
throw new IllegalStateException("Problem fetching splitChanges: " + e.getMessage(), e);
}
}

private SplitChange processSplitChange(SplitChange splitChange, long changeNumber) throws NoSuchAlgorithmException, UnsupportedEncodingException {
private SplitChange processSplitChange(SplitChange splitChange, long changeNumber) throws NoSuchAlgorithmException {
SplitChange splitChangeToProcess = LocalhostSanitizer.sanitization(splitChange);
// if the till is less than storage CN and different from the default till ignore the change
if (splitChangeToProcess.till < changeNumber && splitChangeToProcess.till != -1) {
Expand Down
27 changes: 26 additions & 1 deletion client/src/main/java/io/split/client/SplitClientConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@

import io.split.client.impressions.ImpressionListener;
import io.split.client.impressions.ImpressionsManager;
import io.split.client.utils.FileTypeEnum;
import io.split.integrations.IntegrationsConfig;
import io.split.storages.enums.OperationMode;
import io.split.storages.enums.StorageMode;
import org.apache.hc.core5.http.HttpHost;
import pluggable.CustomStorageWrapper;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.ThreadFactory;

Expand Down Expand Up @@ -49,6 +51,8 @@ public class SplitClientConfig {
private final int _maxStringLength;
private final boolean _destroyOnShutDown;
private final String _splitFile;
private final FileTypeEnum _fileType;
private final InputStream _inputStream;
private final String _segmentDirectory;
private final IntegrationsConfig _integrationsConfig;
private final boolean _streamingEnabled;
Expand Down Expand Up @@ -81,7 +85,6 @@ public class SplitClientConfig {
public static String splitSdkVersion;
private final long _lastSeenCacheSize;


public static Builder builder() {
return new Builder();
}
Expand Down Expand Up @@ -111,6 +114,8 @@ private SplitClientConfig(String endpoint,
int maxStringLength,
boolean destroyOnShutDown,
String splitFile,
FileTypeEnum fileType,
InputStream inputStream,
String segmentDirectory,
IntegrationsConfig integrationsConfig,
boolean streamingEnabled,
Expand Down Expand Up @@ -159,6 +164,8 @@ private SplitClientConfig(String endpoint,
_maxStringLength = maxStringLength;
_destroyOnShutDown = destroyOnShutDown;
_splitFile = splitFile;
_fileType = fileType;
_inputStream = inputStream;
_segmentDirectory = segmentDirectory;
_integrationsConfig = integrationsConfig;
_streamingEnabled = streamingEnabled;
Expand Down Expand Up @@ -301,6 +308,14 @@ public String splitFile() {
return _splitFile;
}

public FileTypeEnum fileType() {
return _fileType;
}

public InputStream inputStream(){
return _inputStream;
}

public String segmentDirectory() {
return _segmentDirectory;
}
Expand Down Expand Up @@ -394,6 +409,8 @@ public static final class Builder {
private int _maxStringLength = 250;
private boolean _destroyOnShutDown = true;
private String _splitFile = null;
private FileTypeEnum _fileType = null;
private InputStream _inputStream = null;
private String _segmentDirectory = null;
private IntegrationsConfig _integrationsConfig = null;
private boolean _streamingEnabled = true;
Expand Down Expand Up @@ -748,6 +765,12 @@ public Builder splitFile(String splitFile) {
return this;
}

public Builder splitFile(InputStream inputStream, FileTypeEnum fileType) {
_fileType = fileType;
_inputStream = inputStream;
return this;
}

/**
* Set the location of the directory where are the segment json files for localhost mode.
* This setting is optional.
Expand Down Expand Up @@ -1005,6 +1028,8 @@ public SplitClientConfig build() {
_maxStringLength,
_destroyOnShutDown,
_splitFile,
_fileType,
_inputStream,
_segmentDirectory,
_integrationsConfig,
_streamingEnabled,
Expand Down
68 changes: 58 additions & 10 deletions client/src/main/java/io/split/client/SplitFactoryImpl.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.split.client;

import com.google.common.io.Files;
import io.split.client.dtos.Metadata;
import io.split.client.events.EventsSender;
import io.split.client.events.EventsStorage;
Expand Down Expand Up @@ -30,6 +31,10 @@
import io.split.client.interceptors.GzipDecoderResponseInterceptor;
import io.split.client.interceptors.GzipEncoderRequestInterceptor;
import io.split.client.interceptors.SdkMetadataInterceptorFilter;
import io.split.client.utils.FileInputStreamProvider;
import io.split.client.utils.FileTypeEnum;
import io.split.client.utils.InputStreamProvider;
import io.split.client.utils.InputStreamProviderImp;
import io.split.client.utils.SDKMetadata;
import io.split.engine.SDKReadinessGates;
import io.split.engine.common.ConsumerSyncManager;
Expand Down Expand Up @@ -99,6 +104,7 @@
import pluggable.CustomStorageWrapper;

import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
Expand All @@ -111,6 +117,8 @@

public class SplitFactoryImpl implements SplitFactory {
private static final Logger _log = LoggerFactory.getLogger(SplitFactory.class);
private static final String LEGACY_LOG_MESSAGE = "The sdk initialize in localhost mode using Legacy file. The splitFile or " +
"inputStream doesn't add it to the config.";
private final static long SSE_CONNECT_TIMEOUT = 30000;
private final static long SSE_SOCKET_TIMEOUT = 70000;

Expand Down Expand Up @@ -366,16 +374,7 @@ protected SplitFactoryImpl(SplitClientConfig config) {
config.getThreadFactory());

// SplitFetcher
SplitChangeFetcher splitChangeFetcher;
String splitFile = config.splitFile();
if (splitFile != null && splitFile.toLowerCase().endsWith(".json")){
splitChangeFetcher = new JsonLocalhostSplitChangeFetcher(config.splitFile());
} else if (splitFile != null && !splitFile.isEmpty() && (splitFile.endsWith(".yaml") || splitFile.endsWith(".yml"))) {
splitChangeFetcher = new YamlLocalhostSplitChangeFetcher(splitFile);
} else {
splitChangeFetcher = new LegacyLocalhostSplitChangeFetcher(config.splitFile());
}

SplitChangeFetcher splitChangeFetcher = createSplitChangeFetcher(config);
SplitParser splitParser = new SplitParser();

_splitFetcher = new SplitFetcherImp(splitChangeFetcher, splitParser, splitCache, _telemetryStorageProducer);
Expand Down Expand Up @@ -643,4 +642,53 @@ private UniqueKeysTracker createUniqueKeysTracker(SplitClientConfig config){
}
return null;
}

private SplitChangeFetcher createSplitChangeFetcher(SplitClientConfig splitClientConfig) {
String splitFile = splitClientConfig.splitFile();
InputStream inputStream = splitClientConfig.inputStream();
FileTypeEnum fileType = splitClientConfig.fileType();
InputStreamProvider inputStreamProvider;
if (splitFile != null || !isInputStreamConfigValid(inputStream, fileType)) {
if (splitFile == null) {
_log.warn("The InputStream config is invalid");
}
fileType = getFileTypeFromFileName(splitFile);
inputStreamProvider = new FileInputStreamProvider(splitFile);
} else {
inputStreamProvider = new InputStreamProviderImp(inputStream);
}
try {
switch (fileType){
case JSON:
return new JsonLocalhostSplitChangeFetcher(inputStreamProvider);
case YAML:
return new YamlLocalhostSplitChangeFetcher(inputStreamProvider);
default:
_log.warn(LEGACY_LOG_MESSAGE);
return new LegacyLocalhostSplitChangeFetcher(splitFile);
}
} catch (Exception e) {
_log.warn(String.format("There was no file named %s found. " +
"We created a split client that returns default treatments for all feature flags for all of your users. " +
"If you wish to return a specific treatment for a feature flag, enter the name of that feature flag name and " +
"treatment name separated by whitespace in %s; one pair per line. Empty lines or lines starting with '#' are " +
"considered comments",
splitFile, splitFile), e);
}
_log.warn(LEGACY_LOG_MESSAGE);
return new LegacyLocalhostSplitChangeFetcher(splitFile);
}

private Boolean isInputStreamConfigValid(InputStream inputStream, FileTypeEnum fileType) {
return inputStream != null && fileType != null;
}

private FileTypeEnum getFileTypeFromFileName(String fileName) {
try {
return FileTypeEnum.valueOf(Files.getFileExtension(fileName).toUpperCase());
} catch (Exception e) {
return FileTypeEnum.LEGACY;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,15 @@
import io.split.client.dtos.Split;
import io.split.client.dtos.SplitChange;
import io.split.client.dtos.Status;
import io.split.client.utils.InputStreamProvider;
import io.split.client.utils.LocalhostConstants;
import io.split.engine.common.FetchOptions;
import io.split.engine.experiments.SplitChangeFetcher;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand All @@ -23,21 +22,20 @@

import static io.split.client.utils.LocalhostSanitizer.createCondition;


public class YamlLocalhostSplitChangeFetcher implements SplitChangeFetcher {

private static final Logger _log = LoggerFactory.getLogger(YamlLocalhostSplitChangeFetcher.class);
private final File _splitFile;
private final InputStreamProvider _inputStreamProvider;

public YamlLocalhostSplitChangeFetcher(String filePath) {
_splitFile = new File(filePath);
public YamlLocalhostSplitChangeFetcher(InputStreamProvider inputStreamProvider) {
_inputStreamProvider = inputStreamProvider;
}

@Override
public SplitChange fetch(long since, FetchOptions options) {
try {
Yaml yaml = new Yaml();
List<Map<String, Map<String, Object>>> yamlSplits = yaml.load(new FileReader(_splitFile));
List<Map<String, Map<String, Object>>> yamlSplits = yaml.load(_inputStreamProvider.get());
SplitChange splitChange = new SplitChange();
splitChange.splits = new ArrayList<>();
for(Map<String, Map<String, Object>> aSplit : yamlSplits) {
Expand Down Expand Up @@ -76,17 +74,8 @@ public SplitChange fetch(long since, FetchOptions options) {
splitChange.till = since;
splitChange.since = since;
return splitChange;
} catch (FileNotFoundException f) {
_log.warn(String.format("There was no file named %s found. We created a split client that returns default treatments " +
"for all feature flags for all of your users. If you wish to return a specific treatment for a feature flag, " +
"enter the name of that feature flag name and treatment name separated by whitespace in %s; one pair per line. " +
"Empty lines or lines starting with '#' are considered comments",
_splitFile.getPath(), _splitFile.getPath()), f);
throw new IllegalStateException("Problem fetching splitChanges: " + f.getMessage(), f);
} catch (Exception e) {
_log.warn(String.format("Problem to fetch split change using the file %s",
_splitFile.getPath()), e);
throw new IllegalStateException("Problem fetching splitChanges: " + e.getMessage(), e);
throw new IllegalStateException("Problem fetching splitChanges using a yaml file: " + e.getMessage(), e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package io.split.client.exceptions;

public class InputStreamProviderException extends Exception {

public InputStreamProviderException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.split.client.utils;

import io.split.client.exceptions.InputStreamProviderException;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.InputStream;

public class FileInputStreamProvider implements InputStreamProvider {

private final String _fileName;

public FileInputStreamProvider(String fileName) {
_fileName = fileName;
}

@Override
public InputStream get() throws InputStreamProviderException {
try {
return new FileInputStream(_fileName);
} catch (FileNotFoundException f) {
throw new InputStreamProviderException(String.format("Problem fetching splitChanges using file named %s: %s",
_fileName, f.getMessage()));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.split.client.utils;

public enum FileTypeEnum {
LEGACY,
YAML,
JSON
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package io.split.client.utils;

import io.split.client.exceptions.InputStreamProviderException;

import java.io.InputStream;

public interface InputStreamProvider {

InputStream get() throws InputStreamProviderException;
}
Loading