From b31436776ccc588033321f76d28781b5bf287d4d Mon Sep 17 00:00:00 2001 From: Laurent Klock Date: Thu, 8 Aug 2024 16:04:27 +0200 Subject: [PATCH] Added method ListURLs Signed-off-by: Laurent Klock --- .../urlfrontier/URLFrontierGrpc.java | 84 ++++++++++- .../urlfrontier/Urlfrontier.java | 11 +- API/urlfrontier.proto | 8 +- .../urlfrontier/client/Client.java | 1 + .../urlfrontier/client/ListURLs.java | 130 ++++++++++++++++++ .../service/AbstractFrontierService.java | 5 + .../service/ignite/IgniteService.java | 7 + .../service/memory/MemoryFrontierService.java | 96 +++++++++++++ .../urlfrontier/service/memory/URLQueue.java | 8 ++ .../service/rocksdb/RocksDBService.java | 106 ++++++++++++++ .../rocksdb/ShardedRocksDBService.java | 7 + .../service/MemoryFrontierServiceTest.java | 53 ++++++- .../service/RocksDBServiceTest.java | 41 ++++++ .../urlfrontier/service/ServiceTestUtil.java | 3 + 14 files changed, 546 insertions(+), 14 deletions(-) create mode 100644 client/src/main/java/crawlercommons/urlfrontier/client/ListURLs.java diff --git a/API/src/main/java/crawlercommons/urlfrontier/URLFrontierGrpc.java b/API/src/main/java/crawlercommons/urlfrontier/URLFrontierGrpc.java index f5d2694..73a7b3c 100644 --- a/API/src/main/java/crawlercommons/urlfrontier/URLFrontierGrpc.java +++ b/API/src/main/java/crawlercommons/urlfrontier/URLFrontierGrpc.java @@ -480,6 +480,37 @@ crawlercommons.urlfrontier.Urlfrontier.URLItem> getGetURLStatusMethod() { return getGetURLStatusMethod; } + private static volatile io.grpc.MethodDescriptor getListURLsMethod; + + @io.grpc.stub.annotations.RpcMethod( + fullMethodName = SERVICE_NAME + '/' + "ListURLs", + requestType = crawlercommons.urlfrontier.Urlfrontier.Pagination.class, + responseType = crawlercommons.urlfrontier.Urlfrontier.URLItem.class, + methodType = io.grpc.MethodDescriptor.MethodType.SERVER_STREAMING) + public static io.grpc.MethodDescriptor getListURLsMethod() { + io.grpc.MethodDescriptor getListURLsMethod; + if ((getListURLsMethod = URLFrontierGrpc.getListURLsMethod) == null) { + synchronized (URLFrontierGrpc.class) { + if ((getListURLsMethod = URLFrontierGrpc.getListURLsMethod) == null) { + URLFrontierGrpc.getListURLsMethod = getListURLsMethod = + io.grpc.MethodDescriptor.newBuilder() + .setType(io.grpc.MethodDescriptor.MethodType.SERVER_STREAMING) + .setFullMethodName(generateFullMethodName(SERVICE_NAME, "ListURLs")) + .setSampledToLocalTracing(true) + .setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller( + crawlercommons.urlfrontier.Urlfrontier.Pagination.getDefaultInstance())) + .setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller( + crawlercommons.urlfrontier.Urlfrontier.URLItem.getDefaultInstance())) + .setSchemaDescriptor(new URLFrontierMethodDescriptorSupplier("ListURLs")) + .build(); + } + } + } + return getListURLsMethod; + } + /** * Creates a new async stub that supports all call types for the service */ @@ -686,6 +717,18 @@ public void getURLStatus(crawlercommons.urlfrontier.Urlfrontier.URLStatusRequest io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall(getGetURLStatusMethod(), responseObserver); } + /** + *
+     ** List all URLs currently in the frontier 
+     *This does not take into account URL scheduling.
+     *Used to check the current status of all URLs within the frontier
+     * 
+ */ + public void listURLs(crawlercommons.urlfrontier.Urlfrontier.Pagination request, + io.grpc.stub.StreamObserver responseObserver) { + io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall(getListURLsMethod(), responseObserver); + } + @java.lang.Override public final io.grpc.ServerServiceDefinition bindService() { return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor()) .addMethod( @@ -793,6 +836,13 @@ public void getURLStatus(crawlercommons.urlfrontier.Urlfrontier.URLStatusRequest crawlercommons.urlfrontier.Urlfrontier.URLStatusRequest, crawlercommons.urlfrontier.Urlfrontier.URLItem>( this, METHODID_GET_URLSTATUS))) + .addMethod( + getListURLsMethod(), + io.grpc.stub.ServerCalls.asyncServerStreamingCall( + new MethodHandlers< + crawlercommons.urlfrontier.Urlfrontier.Pagination, + crawlercommons.urlfrontier.Urlfrontier.URLItem>( + this, METHODID_LIST_URLS))) .build(); } } @@ -983,6 +1033,19 @@ public void getURLStatus(crawlercommons.urlfrontier.Urlfrontier.URLStatusRequest io.grpc.stub.ClientCalls.asyncUnaryCall( getChannel().newCall(getGetURLStatusMethod(), getCallOptions()), request, responseObserver); } + + /** + *
+     ** List all URLs currently in the frontier 
+     *This does not take into account URL scheduling.
+     *Used to check the current status of all URLs within the frontier
+     * 
+ */ + public void listURLs(crawlercommons.urlfrontier.Urlfrontier.Pagination request, + io.grpc.stub.StreamObserver responseObserver) { + io.grpc.stub.ClientCalls.asyncServerStreamingCall( + getChannel().newCall(getListURLsMethod(), getCallOptions()), request, responseObserver); + } } /** @@ -1147,6 +1210,19 @@ public crawlercommons.urlfrontier.Urlfrontier.URLItem getURLStatus(crawlercommon return io.grpc.stub.ClientCalls.blockingUnaryCall( getChannel(), getGetURLStatusMethod(), getCallOptions(), request); } + + /** + *
+     ** List all URLs currently in the frontier 
+     *This does not take into account URL scheduling.
+     *Used to check the current status of all URLs within the frontier
+     * 
+ */ + public java.util.Iterator listURLs( + crawlercommons.urlfrontier.Urlfrontier.Pagination request) { + return io.grpc.stub.ClientCalls.blockingServerStreamingCall( + getChannel(), getListURLsMethod(), getCallOptions(), request); + } } /** @@ -1329,7 +1405,8 @@ public com.google.common.util.concurrent.ListenableFuture implements io.grpc.stub.ServerCalls.UnaryMethod, @@ -1404,6 +1481,10 @@ public void invoke(Req request, io.grpc.stub.StreamObserver responseObserv serviceImpl.getURLStatus((crawlercommons.urlfrontier.Urlfrontier.URLStatusRequest) request, (io.grpc.stub.StreamObserver) responseObserver); break; + case METHODID_LIST_URLS: + serviceImpl.listURLs((crawlercommons.urlfrontier.Urlfrontier.Pagination) request, + (io.grpc.stub.StreamObserver) responseObserver); + break; default: throw new AssertionError(); } @@ -1483,6 +1564,7 @@ public static io.grpc.ServiceDescriptor getServiceDescriptor() { .addMethod(getSetLogLevelMethod()) .addMethod(getSetCrawlLimitMethod()) .addMethod(getGetURLStatusMethod()) + .addMethod(getListURLsMethod()) .build(); } } diff --git a/API/src/main/java/crawlercommons/urlfrontier/Urlfrontier.java b/API/src/main/java/crawlercommons/urlfrontier/Urlfrontier.java index 5511e7f..abd2625 100644 --- a/API/src/main/java/crawlercommons/urlfrontier/Urlfrontier.java +++ b/API/src/main/java/crawlercommons/urlfrontier/Urlfrontier.java @@ -9746,7 +9746,7 @@ public interface GetParamsOrBuilder extends } /** *
-   ** Parameter message for GetURLs *
+   ** Parameter message for GetURLs and ListURLs *
    * 
* * Protobuf type {@code urlfrontier.GetParams} @@ -10231,7 +10231,7 @@ protected Builder newBuilderForType( } /** *
-     ** Parameter message for GetURLs *
+     ** Parameter message for GetURLs and ListURLs *
      * 
* * Protobuf type {@code urlfrontier.GetParams} @@ -18896,7 +18896,7 @@ public crawlercommons.urlfrontier.Urlfrontier.URLStatusRequest getDefaultInstanc "\005ERROR\020\004\"?\n\020CrawlLimitParams\022\013\n\003key\030\001 \001(" + "\t\022\r\n\005limit\030\002 \001(\r\022\017\n\007crawlID\030\003 \001(\t\"=\n\020URL" + "StatusRequest\022\013\n\003url\030\001 \001(\t\022\013\n\003key\030\002 \001(\t\022" + - "\017\n\007crawlID\030\003 \001(\t2\342\007\n\013URLFrontier\022:\n\tList" + + "\017\n\007crawlID\030\003 \001(\t2\241\010\n\013URLFrontier\022:\n\tList" + "Nodes\022\022.urlfrontier.Empty\032\027.urlfrontier." + "StringList\"\000\022;\n\nListCrawls\022\022.urlfrontier" + ".Local\032\027.urlfrontier.StringList\"\000\022C\n\013Del" + @@ -18921,8 +18921,9 @@ public crawlercommons.urlfrontier.Urlfrontier.URLStatusRequest getDefaultInstanc "rawlLimit\022\035.urlfrontier.CrawlLimitParams" + "\032\022.urlfrontier.Empty\"\000\022E\n\014GetURLStatus\022\035" + ".urlfrontier.URLStatusRequest\032\024.urlfront" + - "ier.URLItem\"\000B\034\n\032crawlercommons.urlfront" + - "ierb\006proto3" + "ier.URLItem\"\000\022=\n\010ListURLs\022\027.urlfrontier." + + "Pagination\032\024.urlfrontier.URLItem\"\0000\001B\034\n\032" + + "crawlercommons.urlfrontierb\006proto3" }; descriptor = com.google.protobuf.Descriptors.FileDescriptor .internalBuildGeneratedFileFrom(descriptorData, diff --git a/API/urlfrontier.proto b/API/urlfrontier.proto index 9835248..382b895 100644 --- a/API/urlfrontier.proto +++ b/API/urlfrontier.proto @@ -78,6 +78,12 @@ service URLFrontier { Used to check current status of an URL within the frontier **/ rpc GetURLStatus(URLStatusRequest) returns (URLItem) {} + + /** List all URLs currently in the frontier + This does not take into account URL scheduling. 
+ Used to check current status of all URLs within the frontier + **/ + rpc ListURLs(Pagination) returns (stream URLItem) {} } /** @@ -186,7 +192,7 @@ message BlockQueueParams { bool local = 4; } -/** Parameter message for GetURLs **/ +/** Parameter message for GetURLs and ListURLs **/ message GetParams { // maximum number of URLs per queue, the default value of 0 means no limit uint32 max_urls_per_queue = 1; diff --git a/client/src/main/java/crawlercommons/urlfrontier/client/Client.java b/client/src/main/java/crawlercommons/urlfrontier/client/Client.java index 2f765cd..886771b 100644 --- a/client/src/main/java/crawlercommons/urlfrontier/client/Client.java +++ b/client/src/main/java/crawlercommons/urlfrontier/client/Client.java @@ -15,6 +15,7 @@ ListNodes.class, ListQueues.class, ListCrawls.class, + ListURLs.class, GetStats.class, PutURLs.class, GetURLs.class, diff --git a/client/src/main/java/crawlercommons/urlfrontier/client/ListURLs.java b/client/src/main/java/crawlercommons/urlfrontier/client/ListURLs.java new file mode 100644 index 0000000..9b5180a --- /dev/null +++ b/client/src/main/java/crawlercommons/urlfrontier/client/ListURLs.java @@ -0,0 +1,130 @@ +// SPDX-FileCopyrightText: 2020 Crawler-commons +// SPDX-License-Identifier: Apache-2.0 + +package crawlercommons.urlfrontier.client; + +import crawlercommons.urlfrontier.URLFrontierGrpc; +import crawlercommons.urlfrontier.URLFrontierGrpc.URLFrontierBlockingStub; +import crawlercommons.urlfrontier.Urlfrontier.Pagination.Builder; +import crawlercommons.urlfrontier.Urlfrontier.URLItem; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import java.io.File; +import java.io.IOException; +import java.io.PrintStream; +import java.nio.charset.Charset; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.Iterator; +import picocli.CommandLine.Command; +import picocli.CommandLine.Option; +import picocli.CommandLine.ParentCommand; + +@Command( + name = "ListURLs", + description = "Prints out all URLs in the Frontier", + sortOptions = false) +public class ListURLs implements Runnable { + + @ParentCommand private Client parent; + + @Option( + names = {"-n", "--number_urls"}, + defaultValue = "100", + paramLabel = "NUM", + description = "maximum number of URLs to return (default 100)") + private int maxNumURLs; + + @Option( + names = {"-s", "--start"}, + defaultValue = "0", + paramLabel = "NUM", + description = "starting position of URL to return (default 0)") + private int start; + + @Option( + names = {"-o", "--output"}, + defaultValue = "", + paramLabel = "STRING", + description = "output file to dump all the URLs") + private String output; + + @Option( + names = {"-c", "--crawlID"}, + defaultValue = "DEFAULT", + paramLabel = "STRING", + description = "crawl to get the queues for") + private String crawl; + + @Option( + names = {"-l", "--local"}, + defaultValue = "false", + paramLabel = "BOOLEAN", + description = + "restricts the scope to this frontier instance instead of aggregating over the cluster") + private Boolean local; + + @Option( + names = {"-p", "--parsedate"}, + defaultValue = "false", + description = { + "Print the refetch date in local time zone", + "By default, time is in UTC seconds since the Unix epoch" + }) + private boolean parse; + + // Use the system default time zone + private ZoneId zoneId = ZoneId.systemDefault(); + + @Override + public void run() { + + Builder builder = crawlercommons.urlfrontier.Urlfrontier.Pagination.newBuilder(); + 
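+        // populate the Pagination request from the command-line options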
builder.setLocal(local); + builder.setSize(maxNumURLs); + builder.setStart(start); + builder.setIncludeInactive(true); + builder.setCrawlID(crawl); + + PrintStream outstream = null; + if (output.length() > 0) { + File f = new File(output); + f.delete(); + try { + outstream = new PrintStream(f, Charset.defaultCharset()); + } catch (IOException e) { + e.printStackTrace(System.err); + return; + } + } else { + outstream = System.out; + } + + ManagedChannel channel = + ManagedChannelBuilder.forAddress(parent.hostname, parent.port) + .usePlaintext() + .build(); + URLFrontierBlockingStub blockingFrontier = URLFrontierGrpc.newBlockingStub(channel); + + Iterator it = blockingFrontier.listURLs(builder.build()); + while (it.hasNext()) { + + URLItem item = it.next(); + + String fetchDate; + if (parse) { + Instant instant = Instant.ofEpochSecond(item.getKnown().getRefetchableFromDate()); + LocalDateTime localDate = instant.atZone(zoneId).toLocalDateTime(); + fetchDate = localDate.toString(); + } else { + fetchDate = String.valueOf(item.getKnown().getRefetchableFromDate()); + } + + outstream.println(item.getKnown().getInfo().getUrl() + ";" + fetchDate); + } + + outstream.close(); + channel.shutdownNow(); + } +} diff --git a/service/src/main/java/crawlercommons/urlfrontier/service/AbstractFrontierService.java b/service/src/main/java/crawlercommons/urlfrontier/service/AbstractFrontierService.java index e474688..743421a 100644 --- a/service/src/main/java/crawlercommons/urlfrontier/service/AbstractFrontierService.java +++ b/service/src/main/java/crawlercommons/urlfrontier/service/AbstractFrontierService.java @@ -866,4 +866,9 @@ public void close() throws IOException { public abstract void getURLStatus( crawlercommons.urlfrontier.Urlfrontier.URLStatusRequest request, io.grpc.stub.StreamObserver responseObserver); + + public abstract void listURLs( + crawlercommons.urlfrontier.Urlfrontier.Pagination request, + io.grpc.stub.StreamObserver + responseObserver); } diff --git a/service/src/main/java/crawlercommons/urlfrontier/service/ignite/IgniteService.java b/service/src/main/java/crawlercommons/urlfrontier/service/ignite/IgniteService.java index 5cf651d..a862568 100644 --- a/service/src/main/java/crawlercommons/urlfrontier/service/ignite/IgniteService.java +++ b/service/src/main/java/crawlercommons/urlfrontier/service/ignite/IgniteService.java @@ -9,6 +9,7 @@ import crawlercommons.urlfrontier.Urlfrontier.AckMessage.Status; import crawlercommons.urlfrontier.Urlfrontier.GetParams; import crawlercommons.urlfrontier.Urlfrontier.KnownURLItem; +import crawlercommons.urlfrontier.Urlfrontier.Pagination; import crawlercommons.urlfrontier.Urlfrontier.URLInfo; import crawlercommons.urlfrontier.Urlfrontier.URLItem; import crawlercommons.urlfrontier.Urlfrontier.URLStatusRequest; @@ -848,4 +849,10 @@ protected Status putURLItem(URLItem value) { public void getURLStatus(URLStatusRequest request, StreamObserver responseObserver) { responseObserver.onError(io.grpc.Status.UNIMPLEMENTED.asException()); } + + @Override + // TODO Implementation of listURLs for Ignite + public void listURLs(Pagination request, StreamObserver responseObserver) { + responseObserver.onError(io.grpc.Status.UNIMPLEMENTED.asException()); + } } diff --git a/service/src/main/java/crawlercommons/urlfrontier/service/memory/MemoryFrontierService.java b/service/src/main/java/crawlercommons/urlfrontier/service/memory/MemoryFrontierService.java index 75a85ea..c3f96be 100644 --- 
a/service/src/main/java/crawlercommons/urlfrontier/service/memory/MemoryFrontierService.java +++ b/service/src/main/java/crawlercommons/urlfrontier/service/memory/MemoryFrontierService.java @@ -4,9 +4,11 @@ package crawlercommons.urlfrontier.service.memory; import com.google.protobuf.InvalidProtocolBufferException; +import crawlercommons.urlfrontier.CrawlID; import crawlercommons.urlfrontier.Urlfrontier.AckMessage; import crawlercommons.urlfrontier.Urlfrontier.AckMessage.Status; import crawlercommons.urlfrontier.Urlfrontier.KnownURLItem; +import crawlercommons.urlfrontier.Urlfrontier.Pagination; import crawlercommons.urlfrontier.Urlfrontier.URLInfo; import crawlercommons.urlfrontier.Urlfrontier.URLItem; import crawlercommons.urlfrontier.Urlfrontier.URLStatusRequest; @@ -18,6 +20,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import java.util.Map.Entry; import java.util.PriorityQueue; import org.slf4j.LoggerFactory; @@ -224,4 +227,97 @@ public void getURLStatus(URLStatusRequest request, StreamObserver respo responseObserver.onError(io.grpc.Status.NOT_FOUND.asRuntimeException()); } } + + @Override + public void listURLs(Pagination request, StreamObserver responseObserver) { + + long maxURLs = request.getSize(); + long start = request.getStart(); + + boolean include_inactive = request.getIncludeInactive(); + + final String normalisedCrawlID = CrawlID.normaliseCrawlID(request.getCrawlID()); + + // 100 by default + if (maxURLs == 0) { + maxURLs = 100; + } + + LOG.info( + "Received request to list URLs [size {}; start {}; inactive {}]", + maxURLs, + start, + include_inactive); + + int pos = -1; + int sent = 0; + + synchronized (getQueues()) { + Iterator> iterator = + getQueues().entrySet().iterator(); + + while (iterator.hasNext()) { + Entry e = iterator.next(); + URLQueue queue = (URLQueue) getQueues().get(e.getKey()); + + // check that it is within the right crawlID + if (!e.getKey().getCrawlid().equals(normalisedCrawlID)) { + continue; + } + + URLItem.Builder builder = URLItem.newBuilder(); + KnownURLItem.Builder knownBuilder = KnownURLItem.newBuilder(); + + // First iterate over completed items + for (String curcomplete : queue.getCompleted()) { + builder.clear(); + knownBuilder.clear(); + + pos++; + URLInfo info = + URLInfo.newBuilder() + .setCrawlID(e.getKey().getCrawlid()) + .setKey(e.getKey().getQueue()) + .setUrl(curcomplete) + .build(); + + knownBuilder.setInfo(info); + knownBuilder.setRefetchableFromDate(0); + builder.setKnown(knownBuilder.build()); + + if (pos >= start) { + responseObserver.onNext(builder.build()); + sent++; + } + } + + // Iterate over scheduled items + Iterator iter = queue.iterator(); + + while (iter.hasNext() && sent <= maxURLs) { + InternalURL item = iter.next(); + pos++; + + builder.clear(); + knownBuilder.clear(); + + try { + knownBuilder.setInfo(item.toURLInfo(e.getKey())); + } catch (InvalidProtocolBufferException e1) { + LOG.error(e1.getMessage(), e1); + } + knownBuilder.setRefetchableFromDate(item.nextFetchDate); + + builder.setKnown(knownBuilder.build()); + + if (pos >= start) { + responseObserver.onNext(builder.build()); + sent++; + } + } + } + } + + responseObserver.onCompleted(); + } } diff --git a/service/src/main/java/crawlercommons/urlfrontier/service/memory/URLQueue.java b/service/src/main/java/crawlercommons/urlfrontier/service/memory/URLQueue.java index 1eef781..098fb41 100644 --- a/service/src/main/java/crawlercommons/urlfrontier/service/memory/URLQueue.java +++ 
b/service/src/main/java/crawlercommons/urlfrontier/service/memory/URLQueue.java @@ -4,10 +4,12 @@ package crawlercommons.urlfrontier.service.memory; import crawlercommons.urlfrontier.service.QueueInterface; +import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import java.util.Optional; import java.util.PriorityQueue; +import java.util.Set; public class URLQueue extends PriorityQueue implements QueueInterface { @@ -93,6 +95,7 @@ public int getDelay() { public int countActive() { return this.size(); } + public boolean isCompleted(String url) { return completed.contains(url); } @@ -114,4 +117,9 @@ public Boolean isLimitReached() { return getCountCompleted() >= limit.get(); } + + /** @return The unmodifiable set of completed URLs */ + public Set getCompleted() { + return Collections.unmodifiableSet(completed); + } } diff --git a/service/src/main/java/crawlercommons/urlfrontier/service/rocksdb/RocksDBService.java b/service/src/main/java/crawlercommons/urlfrontier/service/rocksdb/RocksDBService.java index 0d7b179..5e5977e 100644 --- a/service/src/main/java/crawlercommons/urlfrontier/service/rocksdb/RocksDBService.java +++ b/service/src/main/java/crawlercommons/urlfrontier/service/rocksdb/RocksDBService.java @@ -7,6 +7,7 @@ import crawlercommons.urlfrontier.CrawlID; import crawlercommons.urlfrontier.Urlfrontier.AckMessage.Status; import crawlercommons.urlfrontier.Urlfrontier.KnownURLItem; +import crawlercommons.urlfrontier.Urlfrontier.Pagination; import crawlercommons.urlfrontier.Urlfrontier.Stats; import crawlercommons.urlfrontier.Urlfrontier.URLInfo; import crawlercommons.urlfrontier.Urlfrontier.URLItem; @@ -862,4 +863,109 @@ public void getURLStatus(URLStatusRequest request, StreamObserver respo responseObserver.onError(io.grpc.Status.NOT_FOUND.asRuntimeException()); } } + + @Override + public void listURLs(Pagination request, StreamObserver responseObserver) { + long maxURLs = request.getSize(); + long start = request.getStart(); + + boolean include_inactive = request.getIncludeInactive(); + + final String normalisedCrawlID = CrawlID.normaliseCrawlID(request.getCrawlID()); + + // 100 by default + if (maxURLs == 0) { + maxURLs = 100; + } + + LOG.info( + "Received request to list URLs [size {}; start {}; inactive {}]", + maxURLs, + start, + include_inactive); + + int pos = -1; + int sent = 0; + + try (RocksIterator rocksIterator = rocksDB.newIterator(columnFamilyHandleList.get(0))) { + URLItem.Builder builder = URLItem.newBuilder(); + KnownURLItem.Builder knownBuilder = KnownURLItem.newBuilder(); + + for (rocksIterator.seekToFirst(); + rocksIterator.isValid() && sent <= maxURLs; + rocksIterator.next()) { + String existenceKey = new String(rocksIterator.key(), StandardCharsets.UTF_8); + QueueWithinCrawl Qkey = QueueWithinCrawl.parseAndDeNormalise(existenceKey); + LOG.debug("Qkey crawlId={} queue={}", Qkey.getCrawlid(), Qkey.getQueue()); + + // check that it is within the right crawlID + if (!Qkey.getCrawlid().equals(normalisedCrawlID)) { + continue; + } + + final String schedulingKey = + new String(rocksIterator.value(), StandardCharsets.UTF_8); + + LOG.debug("current key {}, schedulingKey={}", existenceKey, schedulingKey); + pos++; + + builder.clear(); + knownBuilder.clear(); + + byte[] scheduled = null; + try { + scheduled = rocksDB.get(columnFamilyHandleList.get(1), rocksIterator.value()); + } catch (RocksDBException e) { + LOG.error(e.getMessage(), e); + } + + if (!StringUtil.isNullOrEmpty(schedulingKey)) { + URLInfo info = null; + try { + info = 
URLInfo.parseFrom(scheduled); + + knownBuilder.setInfo(info); + + final int pos1 = schedulingKey.indexOf('_'); + final int pos2 = schedulingKey.indexOf('_', pos1 + 1); + final int pos3 = schedulingKey.indexOf('_', pos2 + 1); + + long scheduleDate = Long.parseLong(schedulingKey.substring(pos2 + 1, pos3)); + knownBuilder.setRefetchableFromDate(scheduleDate); + + builder.setKnown(knownBuilder.build()); + if (pos >= start) { + responseObserver.onNext(builder.build()); + sent++; + } + } catch (InvalidProtocolBufferException | NumberFormatException e) { + LOG.error(e.getMessage(), e); + } + } else { + LOG.debug("no schedule for {}", existenceKey); + + final int pos1 = existenceKey.indexOf('_'); + final int pos2 = existenceKey.indexOf('_', pos1 + 1); + + URLInfo info = + URLInfo.newBuilder() + .setCrawlID(Qkey.getCrawlid()) + .setKey(Qkey.getQueue()) + .setUrl(existenceKey.substring(pos2 + 1)) + .build(); + + LOG.debug("current value {}", info); + knownBuilder.setRefetchableFromDate(0).setInfo(info).build(); + builder.setKnown(knownBuilder.build()); + + if (pos >= start) { + responseObserver.onNext(builder.build()); + sent++; + } + } + } + } + + responseObserver.onCompleted(); + } } diff --git a/service/src/main/java/crawlercommons/urlfrontier/service/rocksdb/ShardedRocksDBService.java b/service/src/main/java/crawlercommons/urlfrontier/service/rocksdb/ShardedRocksDBService.java index 73bb65a..e3c0070 100644 --- a/service/src/main/java/crawlercommons/urlfrontier/service/rocksdb/ShardedRocksDBService.java +++ b/service/src/main/java/crawlercommons/urlfrontier/service/rocksdb/ShardedRocksDBService.java @@ -4,6 +4,7 @@ package crawlercommons.urlfrontier.service.rocksdb; import crawlercommons.urlfrontier.Urlfrontier.AckMessage.Status; +import crawlercommons.urlfrontier.Urlfrontier.Pagination; import crawlercommons.urlfrontier.Urlfrontier.URLInfo; import crawlercommons.urlfrontier.Urlfrontier.URLItem; import crawlercommons.urlfrontier.Urlfrontier.URLStatusRequest; @@ -89,4 +90,10 @@ public Map getQueues() { public void getURLStatus(URLStatusRequest request, StreamObserver responseObserver) { responseObserver.onError(io.grpc.Status.UNIMPLEMENTED.asException()); } + + @Override + // TODO Implementation of listURLs for ShardedRocksDB + public void listURLs(Pagination request, StreamObserver responseObserver) { + responseObserver.onError(io.grpc.Status.UNIMPLEMENTED.asException()); + } } diff --git a/service/src/test/java/crawlercommons/urlfrontier/service/MemoryFrontierServiceTest.java b/service/src/test/java/crawlercommons/urlfrontier/service/MemoryFrontierServiceTest.java index e644a80..4767100 100644 --- a/service/src/test/java/crawlercommons/urlfrontier/service/MemoryFrontierServiceTest.java +++ b/service/src/test/java/crawlercommons/urlfrontier/service/MemoryFrontierServiceTest.java @@ -1,18 +1,20 @@ +// SPDX-FileCopyrightText: 2020 Crawler-commons +// SPDX-License-Identifier: Apache-2.0 + package crawlercommons.urlfrontier.service; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; -import java.util.concurrent.atomic.AtomicInteger; - -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; -import org.slf4j.LoggerFactory; - +import crawlercommons.urlfrontier.Urlfrontier.Pagination; import crawlercommons.urlfrontier.Urlfrontier.URLItem; import crawlercommons.urlfrontier.Urlfrontier.URLStatusRequest; import crawlercommons.urlfrontier.service.memory.MemoryFrontierService; import io.grpc.stub.StreamObserver; +import 
java.util.concurrent.atomic.AtomicInteger; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.LoggerFactory; class MemoryFrontierServiceTest { @@ -139,7 +141,7 @@ public void onNext(URLItem value) { @Override public void onError(Throwable t) { - assertEquals(io.grpc.Status.NOT_FOUND, io.grpc.Status.fromThrowable(t)); + assertEquals(io.grpc.Status.NOT_FOUND, io.grpc.Status.fromThrowable(t)); LOG.error(t.getMessage()); } @@ -210,4 +212,41 @@ public void onCompleted() { assertEquals(1, count.get()); assertEquals(1, fetched.get()); } + + @Test + void testListURLs() { + + Pagination pagination = Pagination.newBuilder().setCrawlID("crawl_id").build(); + + final AtomicInteger fetched = new AtomicInteger(0); + final AtomicInteger count = new AtomicInteger(0); + + StreamObserver statusObserver = + new StreamObserver<>() { + + @Override + public void onNext(URLItem value) { + // receives confirmation that the value has been received + logURLItem(value); + + if (value.hasKnown()) { + fetched.incrementAndGet(); + } + count.incrementAndGet(); + } + + @Override + public void onError(Throwable t) { + t.printStackTrace(); + } + + @Override + public void onCompleted() { + LOG.info("completed testListURLs"); + } + }; + + memoryFrontierService.listURLs(pagination, statusObserver); + assertEquals(3, count.get()); + } } diff --git a/service/src/test/java/crawlercommons/urlfrontier/service/RocksDBServiceTest.java b/service/src/test/java/crawlercommons/urlfrontier/service/RocksDBServiceTest.java index e1e064f..18e56bd 100644 --- a/service/src/test/java/crawlercommons/urlfrontier/service/RocksDBServiceTest.java +++ b/service/src/test/java/crawlercommons/urlfrontier/service/RocksDBServiceTest.java @@ -1,8 +1,12 @@ +// SPDX-FileCopyrightText: 2020 Crawler-commons +// SPDX-License-Identifier: Apache-2.0 + package crawlercommons.urlfrontier.service; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; +import crawlercommons.urlfrontier.Urlfrontier.Pagination; import crawlercommons.urlfrontier.Urlfrontier.URLItem; import crawlercommons.urlfrontier.Urlfrontier.URLStatusRequest; import crawlercommons.urlfrontier.service.rocksdb.RocksDBService; @@ -239,4 +243,41 @@ public void onCompleted() { assertEquals(1, count.get()); assertEquals(1, fetched.get()); } + + @Test + void testListURLs() { + + Pagination pagination = Pagination.newBuilder().setCrawlID("crawl_id").build(); + + final AtomicInteger fetched = new AtomicInteger(0); + final AtomicInteger count = new AtomicInteger(0); + + StreamObserver statusObserver = + new StreamObserver<>() { + + @Override + public void onNext(URLItem value) { + // receives confirmation that the value has been received + logURLItem(value); + + if (value.hasKnown()) { + fetched.incrementAndGet(); + } + count.incrementAndGet(); + } + + @Override + public void onError(Throwable t) { + t.printStackTrace(); + } + + @Override + public void onCompleted() { + LOG.info("completed testListURLs"); + } + }; + + rocksDBService.listURLs(pagination, statusObserver); + assertEquals(3, count.get()); + } } diff --git a/service/src/test/java/crawlercommons/urlfrontier/service/ServiceTestUtil.java b/service/src/test/java/crawlercommons/urlfrontier/service/ServiceTestUtil.java index 56e747d..13c13a5 100644 --- a/service/src/test/java/crawlercommons/urlfrontier/service/ServiceTestUtil.java +++ b/service/src/test/java/crawlercommons/urlfrontier/service/ServiceTestUtil.java @@ -1,3 +1,6 @@ +// 
SPDX-FileCopyrightText: 2020 Crawler-commons +// SPDX-License-Identifier: Apache-2.0 + package crawlercommons.urlfrontier.service; import crawlercommons.urlfrontier.Urlfrontier.AckMessage;
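
Example usage (editor's sketch, not part of the patch): the new ListURLs RPC
is server-streaming, so the blocking stub added above returns an Iterator over
URLItem messages. A minimal standalone client might look as follows; the host,
port and crawl ID are illustrative assumptions, and the Pagination setters
mirror those used by the ListURLs client command in this patch.

    import crawlercommons.urlfrontier.URLFrontierGrpc;
    import crawlercommons.urlfrontier.URLFrontierGrpc.URLFrontierBlockingStub;
    import crawlercommons.urlfrontier.Urlfrontier.Pagination;
    import crawlercommons.urlfrontier.Urlfrontier.URLItem;
    import io.grpc.ManagedChannel;
    import io.grpc.ManagedChannelBuilder;
    import java.util.Iterator;

    public class ListURLsExample {
        public static void main(String[] args) {
            // illustrative endpoint; 7071 is the frontier's usual default port
            ManagedChannel channel =
                    ManagedChannelBuilder.forAddress("localhost", 7071)
                            .usePlaintext()
                            .build();
            URLFrontierBlockingStub stub = URLFrontierGrpc.newBlockingStub(channel);

            // request the first 50 URLs of the DEFAULT crawl, including
            // URLs from inactive queues
            Pagination request =
                    Pagination.newBuilder()
                            .setStart(0)
                            .setSize(50)
                            .setIncludeInactive(true)
                            .setCrawlID("DEFAULT")
                            .build();

            // the stream is consumed lazily as the iterator advances
            Iterator<URLItem> urls = stub.listURLs(request);
            while (urls.hasNext()) {
                URLItem item = urls.next();
                System.out.println(
                        item.getKnown().getInfo().getUrl()
                                + ";"
                                + item.getKnown().getRefetchableFromDate());
            }

            channel.shutdownNow();
        }
    }

The same call is exposed on the command line through the new ListURLs
subcommand, e.g. to dump up to 200 URLs of the DEFAULT crawl to a file
(jar name illustrative):

    java -jar urlfrontier-client.jar ListURLs -n 200 -c DEFAULT -o urls.txt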