Skip to content

Commit

Permalink
Merge pull request #15244 from cdapio/CDAP-20710-artifact-localizer-p…
Browse files Browse the repository at this point in the history
…reload

CDAP-20710: Only preload artifacts with configured version count
  • Loading branch information
samdgupi authored Jul 22, 2023
2 parents a686e34 + 4c2c05a commit 996c948
Show file tree
Hide file tree
Showing 7 changed files with 220 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
package io.cdap.cdap.internal.app.worker.sidecar;

import com.google.inject.Inject;
import io.cdap.cdap.api.artifact.ApplicationClass;
import io.cdap.cdap.api.artifact.ArtifactInfo;
import io.cdap.cdap.api.artifact.ArtifactManager;
import io.cdap.cdap.api.artifact.ArtifactVersion;
import io.cdap.cdap.api.retry.RetryableException;
import io.cdap.cdap.common.ArtifactNotFoundException;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
Expand All @@ -37,7 +38,10 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -47,31 +51,33 @@
* class will run in the sidecar container that is defined by {@link
* ArtifactLocalizerTwillRunnable}.
*
* <p>
* Artifacts will be cached using the following file structure:
* {@code /DATA_DIRECTORY/artifacts/<namespace>/<artifact-name>/<artifact-version>/<last-modified-timestamp>.jar}
* </p>
*
* <p>
* Artifacts will be unpacked using the following file structure:
* {@code /DATA_DIRECTORY/unpacked/<namespace>/<artifact-name>/<artifact-version>/<last-modified-timestamp>/...}
* </p>
*
* The procedure for fetching an artifact is:
*
* <p>
* The procedure for fetching an artifact is:<br>
* 1. Check if there is a locally cached version of the artifact, if so fetch the lastModified
* timestamp from the filename. 2. Send a request to the
* timestamp from the filename.<br>
* 2. Send a request to the
* {@link io.cdap.cdap.gateway.handlers.ArtifactHttpHandlerInternal#getArtifactBytes}
* endpoint in appfabric and provide the lastModified timestamp (if available) 3. If a lastModified
* timestamp was not specified, or there is a newer version of the artifact: appfabric will stream
* endpoint in appfabric and provide the lastModified timestamp (if available) <br>
* 3. If a lastModified timestamp was not specified, or there is a newer version of the artifact: appfabric will stream
* the bytes for the newest version of the jar and pass the new lastModified timestamp in the
* response headers
*
* OR
*
* response headers <br>
* OR <br>
* If the provided lastModified timestamp matches the newest version of the artifact: appfabric will
* return NOT_MODIFIED
*
* 4. Return the local path to the newest version of the artifact jar.
*
* return NOT_MODIFIED <br>
* 4. Return the local path to the newest version of the artifact jar. <br>
* NOTE: There is no need to invalidate the cache at any point since we will always need to call
* appfabric to confirm that the cached version is the newest version available.
* </p>
*/
public class ArtifactLocalizer extends AbstractArtifactLocalizer {

Expand All @@ -81,6 +87,9 @@ public class ArtifactLocalizer extends AbstractArtifactLocalizer {
private final ArtifactManagerFactory artifactManagerFactory;
private final RemoteClient remoteClient;

/**
* Constructor for ArtifactLocalizer.
*/
@Inject
public ArtifactLocalizer(CConfiguration cConf, RemoteClientFactory remoteClientFactory,
ArtifactManagerFactory artifactManagerFactory) {
Expand Down Expand Up @@ -150,7 +159,7 @@ public File getAndUnpackArtifact(ArtifactId artifactId) throws Exception {

/**
* fetchArtifact attempts to connect to app fabric to download the given artifact. This method
* will throw {@linkRetryableException} in certain circumstances.
* will throw {@link RetryableException} in certain circumstances.
*
* @param artifactId The ArtifactId of the artifact to fetch
* @return The Local Location for this artifact
Expand All @@ -170,7 +179,7 @@ private Path getLocalPath(String dirName, ArtifactId artifactId) {

/**
* Returns a {@link File} representing the cache directory jars for the given artifact. The file
* path is: /DATA_DIRECTORY/artifacts/<namespace>/<artifact-name>/<artifact-version>/
* path is: {@literal /DATA_DIRECTORY/artifacts/<namespace>/<artifact-name>/<artifact-version>/}
*/
private File getArtifactDirLocation(ArtifactId artifactId) {
return getLocalPath("artifacts", artifactId).toFile();
Expand All @@ -186,23 +195,39 @@ private File getUnpackLocalPath(ArtifactId artifactId, long lastModifiedTimestam
.toFile();
}

public void preloadArtifacts(Set<String> artifactNames)
throws IOException, ArtifactNotFoundException {
/**
* Preloads the most recent versions of the selected artifacts into the local cache.
*
* @param artifactNames set of artifact names to be preloaded in cache
* @param versionCount maximum number of latest versions to be cached for each artifact name
*/
public void preloadArtifacts(Set<String> artifactNames, int versionCount)
throws IOException {
ArtifactManager artifactManager = artifactManagerFactory.create(
NamespaceId.SYSTEM,
RetryStrategies.fromConfiguration(cConf, Constants.Service.TASK_WORKER + "."));

for (ArtifactInfo info : artifactManager.listArtifacts()) {
if (artifactNames.contains(info.getName()) && info.getParents().isEmpty()) {
String className = info.getClasses().getApps().stream()
.findFirst()
.map(ApplicationClass::getClassName)
.orElse(null);

LOG.info("Preloading artifact {}:{}-{}", info.getScope(), info.getName(),
info.getVersion());
List<ArtifactInfo> allArtifacts = artifactManager.listArtifacts();
for (String artifactName : artifactNames) {
List<ArtifactInfo> artifactsToPreload = allArtifacts.stream()
.filter(artifactInfo -> artifactInfo.getName().equals(artifactName))
.sorted(Comparator.comparing((ArtifactInfo artifactInfo) ->
new ArtifactVersion(artifactInfo.getVersion())).reversed())
.limit(versionCount)
.collect(Collectors.toList());

if (artifactsToPreload.isEmpty()) {
LOG.warn("Found no artifact to preload for {}", artifactName);
}

for (ArtifactInfo artifactInfo : artifactsToPreload) {
LOG.info("Preloading artifact {}:{}-{}", artifactInfo.getScope(),
artifactInfo.getName(),
artifactInfo.getVersion());

ArtifactId artifactId = NamespaceId.SYSTEM.artifact(info.getName(), info.getVersion());
ArtifactId artifactId = NamespaceId.SYSTEM.artifact(artifactInfo.getName(),
artifactInfo.getVersion());
try {
fetchArtifact(artifactId);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ protected void startUp() throws Exception {
cacheCleanupInterval, TimeUnit.MINUTES);

artifactLocalizer.preloadArtifacts(
new HashSet<>(cConf.getTrimmedStringCollection(Constants.TaskWorker.PRELOAD_ARTIFACTS)));
new HashSet<>(cConf.getTrimmedStringCollection(Constants.ArtifactLocalizer.PRELOAD_LIST)),
cConf.getInt(Constants.ArtifactLocalizer.PRELOAD_VERSION_LIMIT));

LOG.debug("Starting ArtifactLocalizerService has completed");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ private CConfiguration createCConf() {
cConf.set(Constants.TaskWorker.ADDRESS, "localhost");
cConf.setInt(Constants.TaskWorker.PORT, 0);
cConf.setBoolean(Constants.Security.SSL.INTERNAL_ENABLED, false);
cConf.set(Constants.TaskWorker.PRELOAD_ARTIFACTS, "");
cConf.set(Constants.ArtifactLocalizer.PRELOAD_LIST, "");
cConf.setInt(Constants.ArtifactLocalizer.PRELOAD_VERSION_LIMIT, 1);
cConf.setInt(Constants.TaskWorker.CONTAINER_KILL_AFTER_REQUEST_COUNT, 1);
return cConf;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ private CConfiguration createCConf() {
cConf.set(Constants.TaskWorker.ADDRESS, "localhost");
cConf.setInt(Constants.TaskWorker.PORT, 0);
cConf.setBoolean(Constants.Security.SSL.INTERNAL_ENABLED, false);
cConf.set(Constants.TaskWorker.PRELOAD_ARTIFACTS, "");
cConf.set(Constants.ArtifactLocalizer.PRELOAD_LIST, "");
cConf.setInt(Constants.ArtifactLocalizer.PRELOAD_VERSION_LIMIT, 1);
cConf.setInt(Constants.TaskWorker.CONTAINER_KILL_AFTER_REQUEST_COUNT, 1);
return cConf;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.app.worker.sidecar;

import io.cdap.cdap.api.artifact.ArtifactInfo;
import io.cdap.cdap.api.artifact.ArtifactManager;
import io.cdap.cdap.common.ArtifactNotFoundException;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.internal.remote.RemoteClient;
import io.cdap.cdap.common.internal.remote.RemoteClientFactory;
import io.cdap.cdap.internal.app.services.http.AppFabricTestBase;
import io.cdap.common.http.HttpMethod;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import org.apache.twill.discovery.DiscoveryServiceClient;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;

/**
 * Unit test for {@link ArtifactLocalizer#preloadArtifacts}, run against a mocked
 * {@link ArtifactManager} and a mocked {@link RemoteClient}.
 */
public class ArtifactLocalizerTest extends AppFabricTestBase {

  @ClassRule
  public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();

  /** Relative download URL the localizer is expected to request: artifact name, then version. */
  private static final String DOWNLOAD_URL_FORMAT =
      "namespaces/default/artifacts/%s/versions/%s/download?scope=SYSTEM";

  /**
   * Builds an {@link ArtifactLocalizer} whose data directory is a fresh temporary folder, whose
   * remote client factory always hands out {@code mockClient}, and whose artifact manager factory
   * always hands out {@code mockManager}.
   */
  private ArtifactLocalizer createArtifactLocalizer(ArtifactManager mockManager,
      RemoteClient mockClient)
      throws IOException {
    CConfiguration conf = CConfiguration.create();
    String dataDir = TEMP_FOLDER.newFolder().getAbsolutePath();
    conf.set(Constants.CFG_LOCAL_DATA_DIR, dataDir);
    // NOTE(review): this lookup result is never used below; kept to preserve the original flow.
    DiscoveryServiceClient discoveryClient = getInjector().getInstance(
        DiscoveryServiceClient.class);

    RemoteClientFactory clientFactory = Mockito.mock(RemoteClientFactory.class);
    Mockito.when(clientFactory.createRemoteClient(Mockito.anyString(), Mockito.any(),
            Mockito.anyString()))
        .thenReturn(mockClient);
    return new ArtifactLocalizer(conf, clientFactory, (ns, retry) -> mockManager);
  }

  /** Creates an {@link ArtifactInfo} carrying only a name and a version. */
  private static ArtifactInfo artifact(String name, String version) {
    return new ArtifactInfo(name, version, null, null, null);
  }

  /** Verifies that exactly one download connection was opened for the given artifact version. */
  private static void verifyDownloaded(RemoteClient client, String name, String version)
      throws IOException {
    String expectedUrl = String.format(DOWNLOAD_URL_FORMAT, name, version);
    Mockito.verify(client).openConnection(HttpMethod.GET, expectedUrl);
  }

  /**
   * With three versions available per artifact and a version count of 2, only the two highest
   * versions of each artifact are expected to be requested. The expected picks (x.10 and x.9
   * rather than x.9 and x.8) imply the versions are ordered numerically, not lexically.
   */
  @Test
  public void testPreloadArtifacts() throws IOException, ArtifactNotFoundException {
    List<ArtifactInfo> available = Arrays.asList(
        artifact("cdap-data-pipeline", "6.8"),
        artifact("cdap-data-pipeline", "6.9"),
        artifact("cdap-data-pipeline", "6.10"),
        artifact("wrangler", "4.8"),
        artifact("wrangler", "4.9"),
        artifact("wrangler", "4.10"));

    ArtifactManager manager = Mockito.mock(ArtifactManager.class);
    Mockito.when(manager.listArtifacts()).thenReturn(available);
    RemoteClient client = Mockito.mock(RemoteClient.class);

    ArtifactLocalizer localizer = createArtifactLocalizer(manager, client);
    localizer.preloadArtifacts(new HashSet<>(Arrays.asList("cdap-data-pipeline", "wrangler")), 2);

    verifyDownloaded(client, "cdap-data-pipeline", "6.10");
    verifyDownloaded(client, "cdap-data-pipeline", "6.9");
    verifyDownloaded(client, "wrangler", "4.10");
    verifyDownloaded(client, "wrangler", "4.9");
    // No connection beyond the four verified above may have been opened.
    Mockito.verify(client, Mockito.times(4)).openConnection(Mockito.any(), Mockito.anyString());
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,6 @@ public static final class TaskWorker {
public static final String POOL_CHECK_INTERVAL = "task.worker.pool.check.interval";
public static final String POOL_ENABLE = "task.worker.pool.enable";
public static final String COMPRESSION_ENABLED = "task.worker.compression.enabled";
public static final String PRELOAD_ARTIFACTS = "task.worker.preload.artifacts";

/**
* Task worker container configurations.
Expand Down Expand Up @@ -585,6 +584,8 @@ public static final class ArtifactLocalizer {
public static final String PORT = "artifact.localizer.bind.port";
public static final String BOSS_THREADS = "artifact.localizer.boss.threads";
public static final String WORKER_THREADS = "artifact.localizer.worker.threads";
public static final String PRELOAD_LIST = "artifact.localizer.preload.list";
public static final String PRELOAD_VERSION_LIMIT = "artifact.localizer.preload.version.limit";
}

/**
Expand Down
Loading

0 comments on commit 996c948

Please sign in to comment.