Skip to content

Commit

Permalink
Merge pull request #423 from splitio/development
Browse files Browse the repository at this point in the history
[SDKS-7121] Release iff
  • Loading branch information
nmayorsplit authored Jul 18, 2023
2 parents a78a6d3 + 2e62624 commit 28a9a70
Show file tree
Hide file tree
Showing 69 changed files with 1,118 additions and 391 deletions.
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
* @splitio/sdk
5 changes: 5 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
4.8.0 (Jul 18, 2023)
- Improved streaming architecture implementation to apply feature flag updates from the notification received which is now enhanced, improving efficiency and reliability of the whole update system.
- Updated `com.google.guava` dependence to 32.0.1 for fixing a vulnerability.
- Updated SegmentFetcher for better readability.

4.7.2 (May 16, 2023)
- Updated default treatment to be control for yaml and json localhost.
- Updated terminology on the SDKs codebase to be more aligned with current standard without causing a breaking change. The core change is the term split for feature flag on things like logs and javadoc comments.
Expand Down
4 changes: 2 additions & 2 deletions client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>io.split.client</groupId>
<artifactId>java-client-parent</artifactId>
<version>4.7.2</version>
<version>4.8.0</version>
</parent>
<artifactId>java-client</artifactId>
<packaging>jar</packaging>
Expand Down Expand Up @@ -155,7 +155,7 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>30.0-jre</version>
<version>32.0.1-jre</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
Expand Down
19 changes: 7 additions & 12 deletions client/src/main/java/io/split/client/SplitFactoryImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;

Expand All @@ -115,8 +114,6 @@ public class SplitFactoryImpl implements SplitFactory {
private final static long SSE_CONNECT_TIMEOUT = 30000;
private final static long SSE_SOCKET_TIMEOUT = 70000;

private static Random RANDOM = new Random();

private final SDKReadinessGates _gates;
private final ImpressionsManager _impressionsManager;
private final Evaluator _evaluator;
Expand Down Expand Up @@ -152,7 +149,6 @@ public class SplitFactoryImpl implements SplitFactory {
private final URI _eventsRootTarget;
private final UniqueKeysTracker _uniqueKeysTracker;


//Constructor for standalone mode
public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyntaxException {
_userStorageWrapper = null;
Expand Down Expand Up @@ -188,13 +184,14 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn
ImpressionsStorage impressionsStorage = new InMemoryImpressionsStorage(config.impressionsQueueSize());
_splitCache = splitCache;
_segmentCache = segmentCache;
_telemetrySynchronizer = new TelemetryInMemorySubmitter(_httpclient, URI.create(config.telemetryURL()), telemetryStorage, splitCache, segmentCache, telemetryStorage, _startTime);
_telemetrySynchronizer = new TelemetryInMemorySubmitter(_httpclient, URI.create(config.telemetryURL()), telemetryStorage, splitCache, _segmentCache, telemetryStorage, _startTime);

// Segments
_segmentSynchronizationTaskImp = buildSegments(config, segmentCache, splitCache);

SplitParser splitParser = new SplitParser();
// SplitFetcher
_splitFetcher = buildSplitFetcher(splitCache, splitCache);
_splitFetcher = buildSplitFetcher(splitCache, splitParser);

// SplitSynchronizationTask
_splitSynchronizationTask = new SplitSynchronizationTask(_splitFetcher,
Expand Down Expand Up @@ -241,7 +238,7 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn
SplitAPI splitAPI = SplitAPI.build(_httpclient, buildSSEdHttpClient(apiToken, config, _sdkMetadata));

_syncManager = SyncManagerImp.build(splitTasks, _splitFetcher, splitCache, splitAPI,
segmentCache, _gates, _telemetryStorageProducer, _telemetrySynchronizer, config);
segmentCache, _gates, _telemetryStorageProducer, _telemetrySynchronizer, config, splitParser);
_syncManager.start();

// DestroyOnShutDown
Expand All @@ -255,7 +252,6 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn
}
}


//Constructor for consumer mode
protected SplitFactoryImpl(String apiToken, SplitClientConfig config, CustomStorageWrapper customStorageWrapper) throws URISyntaxException {
//Variables that are not used in Consumer mode.
Expand Down Expand Up @@ -378,7 +374,7 @@ protected SplitFactoryImpl(SplitClientConfig config) {

SplitParser splitParser = new SplitParser();

_splitFetcher = new SplitFetcherImp(splitChangeFetcher, splitParser, _splitCache, splitCache, _telemetryStorageProducer);
_splitFetcher = new SplitFetcherImp(splitChangeFetcher, splitParser, splitCache, _telemetryStorageProducer);

// SplitSynchronizationTask
_splitSynchronizationTask = new SplitSynchronizationTask(_splitFetcher, splitCache, config.featuresRefreshRate(), config.getThreadFactory());
Expand Down Expand Up @@ -559,11 +555,10 @@ private SegmentSynchronizationTaskImp buildSegments(SplitClientConfig config, Se
config.getThreadFactory());
}

private SplitFetcher buildSplitFetcher(SplitCacheConsumer splitCacheConsumer, SplitCacheProducer splitCacheProducer) throws URISyntaxException {
private SplitFetcher buildSplitFetcher(SplitCacheProducer splitCacheProducer, SplitParser splitParser) throws URISyntaxException {
SplitChangeFetcher splitChangeFetcher = HttpSplitChangeFetcher.create(_httpclient, _rootTarget, _telemetryStorageProducer);
SplitParser splitParser = new SplitParser();

return new SplitFetcherImp(splitChangeFetcher, splitParser, splitCacheConsumer, splitCacheProducer, _telemetryStorageProducer);
return new SplitFetcherImp(splitChangeFetcher, splitParser, splitCacheProducer, _telemetryStorageProducer);
}

private ImpressionsManagerImpl buildImpressionsManager(SplitClientConfig config, ImpressionsStorageConsumer impressionsStorageConsumer, ImpressionsStorageProducer impressionsStorageProducer) throws URISyntaxException {
Expand Down
2 changes: 1 addition & 1 deletion client/src/main/java/io/split/client/jmx/JmxMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,4 +130,4 @@ private String getContextPath() {
return null;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,4 @@ public String fetchDefinition(String featureName) {
public boolean isKeyInSegment(String key, String segmentName) {
return segmentCacheConsumer.isInSegment(segmentName, key);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,4 @@ public interface SplitJmxMonitorMBean {
*/
boolean isKeyInSegment(String key, String segmentName);

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package io.split.client.utils;

import io.split.client.dtos.Split;
import io.split.client.dtos.Status;
import io.split.engine.experiments.ParsedSplit;
import io.split.engine.experiments.SplitParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class FeatureFlagProcessor {
private static final Logger _log = LoggerFactory.getLogger(FeatureFlagProcessor.class);

public static FeatureFlagsToUpdate processFeatureFlagChanges(SplitParser splitParser, List<Split> splits) {
List<ParsedSplit> toAdd = new ArrayList<>();
List<String> toRemove = new ArrayList<>();
Set<String> segments = new HashSet<>();
for (Split split : splits) {
if (split.status != Status.ACTIVE) {
// archive.
toRemove.add(split.name);
continue;
}
ParsedSplit parsedSplit = splitParser.parse(split);
if (parsedSplit == null) {
_log.debug(String.format("We could not parse the feature flag definition for: %s", split.name));
continue;
}
segments.addAll(parsedSplit.getSegmentsNames());
toAdd.add(parsedSplit);
}
return new FeatureFlagsToUpdate(toAdd, toRemove, segments);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.split.client.utils;

import io.split.engine.experiments.ParsedSplit;

import java.util.List;
import java.util.Set;

public class FeatureFlagsToUpdate {
List<ParsedSplit> toAdd;
List<String> toRemove;
Set<String> segments;

public FeatureFlagsToUpdate(List<ParsedSplit> toAdd, List<String> toRemove, Set<String> segments) {
this.toAdd = toAdd;
this.toRemove = toRemove;
this.segments = segments;
}

public List<ParsedSplit> getToAdd() {
return toAdd;
}

public List<String> getToRemove() {
return toRemove;
}

public Set<String> getSegments() {
return segments;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.split.client.impressions.ImpressionsManager;
import io.split.client.impressions.UniqueKeysTracker;
import io.split.engine.sse.dtos.SplitKillNotification;
import io.split.telemetry.synchronizer.TelemetrySyncTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -39,7 +40,7 @@ public void refreshSplits(Long targetChangeNumber) {
}

@Override
public void localKillSplit(String featureFlagName, String defaultTreatment, long newChangeNumber) {
public void localKillSplit(SplitKillNotification splitKillNotification) {
//No-Op
}

Expand Down Expand Up @@ -80,4 +81,9 @@ public void stopPeriodicDataRecording() {
_telemetrySyncTask.stopScheduledTask();
_log.info("Successful shutdown of telemetry sync task");
}
}

@Override
public void forceRefreshSegment(String segmentName) {
//No-Op
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.split.engine.experiments.SplitSynchronizationTask;
import io.split.engine.segments.SegmentFetcher;
import io.split.engine.segments.SegmentSynchronizationTask;
import io.split.engine.sse.dtos.SplitKillNotification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -66,7 +67,7 @@ public void refreshSplits(Long targetChangeNumber) {
}

@Override
public void localKillSplit(String featureFlagName, String defaultTreatment, long newChangeNumber) {
public void localKillSplit(SplitKillNotification splitKillNotification) {
//No-Op
}

Expand All @@ -85,4 +86,9 @@ public void startPeriodicDataRecording() {
public void stopPeriodicDataRecording() {
//No-Op
}

@Override
public void forceRefreshSegment(String segmentName) {
//No-Op
}
}
26 changes: 15 additions & 11 deletions client/src/main/java/io/split/engine/common/PushManagerImp.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.split.engine.common;

import com.google.common.annotations.VisibleForTesting;
import io.split.engine.experiments.SplitParser;
import io.split.engine.sse.AuthApiClient;
import io.split.engine.sse.AuthApiClientImp;
import io.split.engine.sse.EventSourceClient;
Expand All @@ -11,10 +12,11 @@
import io.split.engine.sse.dtos.AuthenticationResponse;
import io.split.engine.sse.dtos.SegmentQueueDto;
import io.split.engine.sse.workers.SegmentsWorkerImp;
import io.split.engine.sse.workers.SplitsWorker;
import io.split.engine.sse.workers.SplitsWorkerImp;
import io.split.engine.sse.workers.FeatureFlagsWorker;
import io.split.engine.sse.workers.FeatureFlagWorkerImp;
import io.split.engine.sse.workers.Worker;

import io.split.storages.SplitCacheProducer;
import io.split.telemetry.domain.StreamingEvent;
import io.split.telemetry.domain.enums.StreamEventsEnum;
import io.split.telemetry.storage.TelemetryRuntimeProducer;
Expand All @@ -36,7 +38,7 @@ public class PushManagerImp implements PushManager {

private final AuthApiClient _authApiClient;
private final EventSourceClient _eventSourceClient;
private final SplitsWorker _splitsWorker;
private final FeatureFlagsWorker _featureFlagsWorker;
private final Worker<SegmentQueueDto> _segmentWorker;
private final PushStatusTracker _pushStatusTracker;

Expand All @@ -48,15 +50,15 @@ public class PushManagerImp implements PushManager {
@VisibleForTesting
/* package private */ PushManagerImp(AuthApiClient authApiClient,
EventSourceClient eventSourceClient,
SplitsWorker splitsWorker,
FeatureFlagsWorker featureFlagsWorker,
Worker<SegmentQueueDto> segmentWorker,
PushStatusTracker pushStatusTracker,
TelemetryRuntimeProducer telemetryRuntimeProducer,
ThreadFactory threadFactory) {

_authApiClient = checkNotNull(authApiClient);
_eventSourceClient = checkNotNull(eventSourceClient);
_splitsWorker = splitsWorker;
_featureFlagsWorker = featureFlagsWorker;
_segmentWorker = segmentWorker;
_pushStatusTracker = pushStatusTracker;
_expirationTime = new AtomicLong();
Expand All @@ -70,13 +72,15 @@ public static PushManagerImp build(Synchronizer synchronizer,
SplitAPI splitAPI,
LinkedBlockingQueue<PushManager.Status> statusMessages,
TelemetryRuntimeProducer telemetryRuntimeProducer,
ThreadFactory threadFactory) {
SplitsWorker splitsWorker = new SplitsWorkerImp(synchronizer);
ThreadFactory threadFactory,
SplitParser splitParser,
SplitCacheProducer splitCacheProducer) {
FeatureFlagsWorker featureFlagsWorker = new FeatureFlagWorkerImp(synchronizer, splitParser, splitCacheProducer, telemetryRuntimeProducer);
Worker<SegmentQueueDto> segmentWorker = new SegmentsWorkerImp(synchronizer);
PushStatusTracker pushStatusTracker = new PushStatusTrackerImp(statusMessages, telemetryRuntimeProducer);
return new PushManagerImp(new AuthApiClientImp(authUrl, splitAPI.getHttpClient(), telemetryRuntimeProducer),
EventSourceClientImp.build(streamingUrl, splitsWorker, segmentWorker, pushStatusTracker, splitAPI.getSseHttpClient(), telemetryRuntimeProducer, threadFactory),
splitsWorker,
EventSourceClientImp.build(streamingUrl, featureFlagsWorker, segmentWorker, pushStatusTracker, splitAPI.getSseHttpClient(), telemetryRuntimeProducer, threadFactory),
featureFlagsWorker,
segmentWorker,
pushStatusTracker,
telemetryRuntimeProducer,
Expand Down Expand Up @@ -134,13 +138,13 @@ private boolean startSse(String token, String channels) {

@Override
public synchronized void startWorkers() {
_splitsWorker.start();
_featureFlagsWorker.start();
_segmentWorker.start();
}

@Override
public synchronized void stopWorkers() {
_splitsWorker.stop();
_featureFlagsWorker.stop();
_segmentWorker.stop();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.split.client.SplitClientConfig;
import io.split.engine.SDKReadinessGates;
import io.split.engine.experiments.SplitFetcher;
import io.split.engine.experiments.SplitParser;
import io.split.engine.experiments.SplitSynchronizationTask;
import io.split.engine.segments.SegmentSynchronizationTask;
import io.split.storages.SegmentCacheProducer;
Expand Down Expand Up @@ -85,7 +86,8 @@ public static SyncManagerImp build(SplitTasks splitTasks,
SDKReadinessGates gates,
TelemetryRuntimeProducer telemetryRuntimeProducer,
TelemetrySynchronizer telemetrySynchronizer,
SplitClientConfig config) {
SplitClientConfig config,
SplitParser splitParser) {
LinkedBlockingQueue<PushManager.Status> pushMessages = new LinkedBlockingQueue<>();
Synchronizer synchronizer = new SynchronizerImp(splitTasks,
splitFetcher,
Expand All @@ -102,7 +104,9 @@ public static SyncManagerImp build(SplitTasks splitTasks,
splitAPI,
pushMessages,
telemetryRuntimeProducer,
config.getThreadFactory());
config.getThreadFactory(),
splitParser,
splitCacheProducer);

return new SyncManagerImp(splitTasks,
config.streamingEnabled(),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package io.split.engine.common;

import io.split.engine.sse.dtos.SplitKillNotification;

public interface Synchronizer {
boolean syncAll();
void startPeriodicFetching();
void stopPeriodicFetching();
void refreshSplits(Long targetChangeNumber);
void localKillSplit(String featureFlagName, String defaultTreatment, long newChangeNumber);
void localKillSplit(SplitKillNotification splitKillNotification);
void refreshSegment(String segmentName, Long targetChangeNumber);
void startPeriodicDataRecording();
void stopPeriodicDataRecording();
void forceRefreshSegment(String segmentName);
}
Loading

0 comments on commit 28a9a70

Please sign in to comment.