Skip to content

Commit

Permalink
[opt](task-assignment) use consistent hash as default task assigner a…
Browse files Browse the repository at this point in the history
…nd cache the consistent hash ring (#28522)

1. Use consistent hash algo as the default assigner for file query scan node
    A consistent assignment can better utilize the page cache of BE node.

2. Cache the consistent hash ring
    Init a consistent hash ring is time-consuming because there a thousands of virtual node need to be added.
    So cache it for better performance
  • Loading branch information
morningman committed Dec 22, 2023
1 parent f0a9d34 commit 9ecbd30
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
import org.apache.doris.thrift.TScanRangeLocations;

import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
Expand All @@ -46,7 +49,9 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

public class FederationBackendPolicy {
Expand All @@ -59,6 +64,53 @@ public class FederationBackendPolicy {
private int nextBe = 0;
private boolean initialized = false;

// Create a ConsistentHash ring may be a time-consuming operation, so we cache it.
private static LoadingCache<HashCacheKey, ConsistentHash<TScanRangeLocations, Backend>> consistentHashCache;

static {
consistentHashCache = CacheBuilder.newBuilder().maximumSize(5)
.build(new CacheLoader<HashCacheKey, ConsistentHash<TScanRangeLocations, Backend>>() {
@Override
public ConsistentHash<TScanRangeLocations, Backend> load(HashCacheKey key) {
return new ConsistentHash<>(Hashing.murmur3_128(), new ScanRangeHash(),
new BackendHash(), key.bes, Config.virtual_node_number);
}
});
}

private static class HashCacheKey {
// sorted backend ids as key
private List<Long> beIds;
// backends is not part of key, just an attachment
private List<Backend> bes;

HashCacheKey(List<Backend> backends) {
this.bes = backends;
this.beIds = backends.stream().map(b -> b.getId()).sorted().collect(Collectors.toList());
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof HashCacheKey)) {
return false;
}
return Objects.equals(beIds, ((HashCacheKey) obj).beIds);
}

@Override
public int hashCode() {
return Objects.hash(beIds);
}

@Override
public String toString() {
return "HashCache{" + "beIds=" + beIds + '}';
}
}

public void init() throws UserException {
if (!initialized) {
init(Collections.emptyList());
Expand Down Expand Up @@ -96,8 +148,11 @@ public void init(BeSelectionPolicy policy) throws UserException {
throw new UserException("No available backends");
}
backendMap.putAll(backends.stream().collect(Collectors.groupingBy(Backend::getHost)));
consistentHash = new ConsistentHash<>(Hashing.murmur3_128(), new ScanRangeHash(),
new BackendHash(), backends, Config.virtual_node_number);
try {
consistentHash = consistentHashCache.get(new HashCacheKey(backends));
} catch (ExecutionException e) {
throw new UserException("failed to get consistent hash", e);
}
}

public Backend getNextBe() {
Expand All @@ -111,7 +166,7 @@ public Backend getNextConsistentBe(TScanRangeLocations scanRangeLocations) {
}

// Try to find a local BE, if not exists, use `getNextBe` instead
public Backend getNextLocalBe(List<String> hosts) {
public Backend getNextLocalBe(List<String> hosts, TScanRangeLocations scanRangeLocations) {
List<Backend> candidateBackends = Lists.newArrayListWithCapacity(hosts.size());
for (String host : hosts) {
List<Backend> backends = backendMap.get(host);
Expand All @@ -121,7 +176,7 @@ public Backend getNextLocalBe(List<String> hosts) {
}

return CollectionUtils.isEmpty(candidateBackends)
? getNextBe()
? getNextConsistentBe(scanRangeLocations)
: candidateBackends.get(random.nextInt(candidateBackends.size()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,6 @@ public void createScanRangeLocations() throws UserException {
params.setProperties(locationProperties);
}

boolean enableSqlCache = ConnectContext.get().getSessionVariable().enableFileCache;
boolean enableShortCircuitRead = HdfsResource.enableShortCircuitRead(locationProperties);
List<String> pathPartitionKeys = getPathPartitionKeys();
for (Split split : inputSplits) {
Expand Down Expand Up @@ -346,14 +345,12 @@ public void createScanRangeLocations() throws UserException {
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
TScanRangeLocation location = new TScanRangeLocation();
Backend selectedBackend;
if (enableSqlCache) {
// Use consistent hash to assign the same scan range into the same backend among different queries
selectedBackend = backendPolicy.getNextConsistentBe(curLocations);
} else if (enableShortCircuitRead) {
if (enableShortCircuitRead) {
// Try to find a local BE if enable hdfs short circuit read
selectedBackend = backendPolicy.getNextLocalBe(Arrays.asList(fileSplit.getHosts()));
selectedBackend = backendPolicy.getNextLocalBe(Arrays.asList(fileSplit.getHosts()), curLocations);
} else {
selectedBackend = backendPolicy.getNextBe();
// Use consistent hash to assign the same scan range into the same backend among different queries
selectedBackend = backendPolicy.getNextConsistentBe(curLocations);
}
setLocationPropertiesIfNecessary(selectedBackend, locationType, locationProperties);
location.setBackendId(selectedBackend.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
import org.apache.doris.planner.external.FederationBackendPolicy;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TExternalScanRange;
import org.apache.doris.thrift.TFileRangeDesc;
import org.apache.doris.thrift.TFileScanRange;
import org.apache.doris.thrift.TScanRange;
import org.apache.doris.thrift.TScanRangeLocations;

import com.google.common.base.Stopwatch;
import mockit.Mock;
Expand Down Expand Up @@ -93,12 +98,50 @@ public void testGetNextLocalBe() throws UserException {
int invokeTimes = 1000000;
Assertions.assertEquals(policy.numBackends(), backendNum);
List<String> localHosts = Arrays.asList("192.168.1.0", "192.168.1.1", "192.168.1.2");
TScanRangeLocations scanRangeLocations = getScanRangeLocations("path1", 0, 100);
Stopwatch sw = Stopwatch.createStarted();
for (int i = 0; i < invokeTimes; i++) {
Assertions.assertTrue(localHosts.contains(policy.getNextLocalBe(localHosts).getHost()));
Assertions.assertTrue(localHosts.contains(policy.getNextLocalBe(localHosts, scanRangeLocations).getHost()));
}
sw.stop();
System.out.println("Invoke getNextLocalBe() " + invokeTimes
+ " times cost [" + sw.elapsed(TimeUnit.MILLISECONDS) + "] ms");
}

@Test
public void testConsistentHash() throws UserException {
FederationBackendPolicy policy = new FederationBackendPolicy();
policy.init();
int backendNum = 200;
Assertions.assertEquals(policy.numBackends(), backendNum);

TScanRangeLocations scanRangeLocations = getScanRangeLocations("path1", 0, 100);
Assertions.assertEquals(39, policy.getNextConsistentBe(scanRangeLocations).getId());

scanRangeLocations = getScanRangeLocations("path2", 0, 100);
Assertions.assertEquals(78, policy.getNextConsistentBe(scanRangeLocations).getId());
}

private TScanRangeLocations getScanRangeLocations(String path, long startOffset, long size) {
// Generate on file scan range
TFileScanRange fileScanRange = new TFileScanRange();
// Scan range
TExternalScanRange externalScanRange = new TExternalScanRange();
externalScanRange.setFileScanRange(fileScanRange);
TScanRange scanRange = new TScanRange();
scanRange.setExtScanRange(externalScanRange);
scanRange.getExtScanRange().getFileScanRange().addToRanges(createRangeDesc(path, startOffset, size));
// Locations
TScanRangeLocations locations = new TScanRangeLocations();
locations.setScanRange(scanRange);
return locations;
}

private TFileRangeDesc createRangeDesc(String path, long startOffset, long size) {
TFileRangeDesc rangeDesc = new TFileRangeDesc();
rangeDesc.setPath(path);
rangeDesc.setStartOffset(startOffset);
rangeDesc.setSize(size);
return rangeDesc;
}
}

0 comments on commit 9ecbd30

Please sign in to comment.