Merge pull request #93 from klockla/listurl_github
Add method ListURLs to list all URLs known in the frontier with their next fetch date
jnioche authored Sep 18, 2024
2 parents 1d99fab + ea9090e commit 247b201
Showing 14 changed files with 24,068 additions and 19,989 deletions.
3,485 changes: 2,102 additions & 1,383 deletions API/src/main/java/crawlercommons/urlfrontier/URLFrontierGrpc.java

Large diffs are not rendered by default.

39,708 changes: 21,129 additions & 18,579 deletions API/src/main/java/crawlercommons/urlfrontier/Urlfrontier.java

Large diffs are not rendered by default.

18 changes: 18 additions & 0 deletions API/urlfrontier.proto
@@ -78,6 +78,12 @@ service URLFrontier {
	Used to check current status of an URL within the frontier
	**/
	rpc GetURLStatus(URLStatusRequest) returns (URLItem) {}

	/** List all URLs currently in the frontier
	This does not take into account URL scheduling.
	Used to check current status of all URLs within the frontier
	**/
	rpc ListURLs(ListUrlParams) returns (stream URLItem) {}
}

/**
@@ -306,3 +312,15 @@ message URLStatusRequest {
	string crawlID = 3;
}

message ListUrlParams {
	// position of the first result in the list; defaults to 0
	uint32 start = 1;
	// max number of values; defaults to 100
	uint32 size = 2;
	/** ID for the queue **/
	string key = 3;
	// crawl ID
	string crawlID = 4;
	// only for the current local instance
	bool local = 5;
}
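As a quick smoke test, the new RPC can be called directly with grpcurl. This is a sketch that assumes the frontier's default port (7071) and that server reflection is available; otherwise point grpcurl at the proto definition with its -proto flag:

    grpcurl -plaintext -d '{"crawlID": "DEFAULT", "size": 10}' \
        localhost:7071 urlfrontier.URLFrontier/ListURLs

Each streamed URLItem carries the URL info together with its refetchable-from date.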
@@ -15,6 +15,7 @@
        ListNodes.class,
        ListQueues.class,
        ListCrawls.class,
        ListURLs.class,
        GetStats.class,
        PutURLs.class,
        GetURLs.class,
162 changes: 162 additions & 0 deletions client/src/main/java/crawlercommons/urlfrontier/client/ListURLs.java
@@ -0,0 +1,162 @@
// SPDX-FileCopyrightText: 2020 Crawler-commons
// SPDX-License-Identifier: Apache-2.0

package crawlercommons.urlfrontier.client;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import com.google.protobuf.util.JsonFormat.Printer;
import crawlercommons.urlfrontier.URLFrontierGrpc;
import crawlercommons.urlfrontier.URLFrontierGrpc.URLFrontierBlockingStub;
import crawlercommons.urlfrontier.Urlfrontier.ListUrlParams.Builder;
import crawlercommons.urlfrontier.Urlfrontier.URLItem;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Iterator;
import picocli.CommandLine.Command;
import picocli.CommandLine.Option;
import picocli.CommandLine.ParentCommand;

@Command(
        name = "ListURLs",
        description = "Prints out all URLs in the Frontier",
        sortOptions = false)
public class ListURLs implements Runnable {

    @ParentCommand private Client parent;

    @Option(
            names = {"-n", "--number_urls"},
            defaultValue = "100",
            paramLabel = "NUM",
            description = "maximum number of URLs to return (default 100)")
    private int maxNumURLs;

    @Option(
            names = {"-s", "--start"},
            defaultValue = "0",
            paramLabel = "NUM",
            description = "starting position of URL to return (default 0)")
    private int start;

    @Option(
            names = {"-k", "--key"},
            required = false,
            paramLabel = "STRING",
            description = "key to use to target a specific queue")
    private String key;

    @Option(
            names = {"-o", "--output"},
            defaultValue = "",
            paramLabel = "STRING",
            description = "output file to dump all the URLs")
    private String output;

    @Option(
            names = {"-c", "--crawlID"},
            defaultValue = "DEFAULT",
            paramLabel = "STRING",
            description = "crawl to get the queues for")
    private String crawl;

    @Option(
            names = {"-l", "--local"},
            defaultValue = "false",
            paramLabel = "BOOLEAN",
            description =
                    "restricts the scope to this frontier instance instead of aggregating over the cluster")
    private Boolean local;

    @Option(
            names = {"-j", "--json"},
            defaultValue = "false",
            paramLabel = "BOOLEAN",
            description = "Outputs in JSON format")
    private Boolean json;

    @Option(
            names = {"-p", "--parsedate"},
            defaultValue = "false",
            description = {
                "Print the refetch date in local time zone",
                "By default, time is UTC seconds since the Unix epoch",
                "Ignored if JSON output is selected"
            })
    private boolean parse;

    // Use the system default time zone
    private ZoneId zoneId = ZoneId.systemDefault();

    @Override
    public void run() {

        Builder builder = crawlercommons.urlfrontier.Urlfrontier.ListUrlParams.newBuilder();
        builder.setLocal(local);
        if (key != null) {
            builder.setKey(key);
        }
        builder.setSize(maxNumURLs);
        builder.setStart(start);
        builder.setCrawlID(crawl);

        PrintStream outstream = null;
        if (output.length() > 0) {
            File f = new File(output);
            try {
                Files.deleteIfExists(f.toPath());
                outstream = new PrintStream(f, Charset.defaultCharset());
            } catch (IOException e) {
                e.printStackTrace(System.err);
                return;
            }
        } else {
            outstream = System.out;
        }

        Printer jprinter = JsonFormat.printer();

        ManagedChannel channel =
                ManagedChannelBuilder.forAddress(parent.hostname, parent.port)
                        .usePlaintext()
                        .build();
        URLFrontierBlockingStub blockingFrontier = URLFrontierGrpc.newBlockingStub(channel);

        Iterator<URLItem> it = blockingFrontier.listURLs(builder.build());
        while (it.hasNext()) {

            URLItem item = it.next();

            String fetchDate;
            if (parse) {
                Instant instant = Instant.ofEpochSecond(item.getKnown().getRefetchableFromDate());
                LocalDateTime localDate = instant.atZone(zoneId).toLocalDateTime();
                fetchDate = localDate.toString();
            } else {
                fetchDate = String.valueOf(item.getKnown().getRefetchableFromDate());
            }

            if (Boolean.TRUE.equals(json)) {
                try {
                    outstream.println(jprinter.print(item));
                } catch (InvalidProtocolBufferException e) {
                    e.printStackTrace(System.err);
                    break;
                }
            } else {
                outstream.println(item.getKnown().getInfo().getUrl() + ";" + fetchDate);
            }
        }

        outstream.close();
        channel.shutdownNow();
    }
}
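Once the client jar is built, the command is run with the options defined above. A usage sketch (the exact jar name depends on the build and version):

    # Dump up to 1000 URLs of the DEFAULT crawl to a file, with readable dates
    java -jar urlfrontier-client.jar ListURLs -n 1000 -c DEFAULT -p -o urls.txt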
@@ -13,6 +13,7 @@
import crawlercommons.urlfrontier.Urlfrontier.Boolean;
import crawlercommons.urlfrontier.Urlfrontier.Empty;
import crawlercommons.urlfrontier.Urlfrontier.GetParams;
import crawlercommons.urlfrontier.Urlfrontier.KnownURLItem;
import crawlercommons.urlfrontier.Urlfrontier.Local;
import crawlercommons.urlfrontier.Urlfrontier.LogLevelParams;
import crawlercommons.urlfrontier.Urlfrontier.QueueDelayParams;
@@ -872,4 +873,95 @@ public void close() throws IOException {
    public abstract void getURLStatus(
            crawlercommons.urlfrontier.Urlfrontier.URLStatusRequest request,
            io.grpc.stub.StreamObserver<URLItem> responseObserver);

    public void listURLs(
            crawlercommons.urlfrontier.Urlfrontier.ListUrlParams request,
            io.grpc.stub.StreamObserver<crawlercommons.urlfrontier.Urlfrontier.URLItem>
                    responseObserver) {

        long maxURLs = request.getSize();
        long start = request.getStart();
        String key = request.getKey();

        final String normalisedCrawlID = CrawlID.normaliseCrawlID(request.getCrawlID());

        // 100 by default
        if (maxURLs == 0) {
            maxURLs = 100;
        }

        LOG.info(
                "Received request to list URLs [size {}; start {}; crawlId {}, key {}]",
                maxURLs,
                start,
                normalisedCrawlID,
                key);

        long totalCount = -1;
        long sentCount = 0;

        synchronized (getQueues()) {
            Iterator<Entry<QueueWithinCrawl, QueueInterface>> qiterator =
                    getQueues().entrySet().iterator();

            while (qiterator.hasNext() && sentCount < maxURLs) {
                Entry<QueueWithinCrawl, QueueInterface> e = qiterator.next();

                // check that it is within the right crawlID
                if (!e.getKey().getCrawlid().equals(normalisedCrawlID)) {
                    continue;
                }

                // check that it is within the right key/queue
                if (key != null && !key.isEmpty() && !e.getKey().getQueue().equals(key)) {
                    continue;
                }

                Iterator<URLItem> urliter = urlIterator(e);

                while (urliter.hasNext()) {
                    totalCount++;
                    if (totalCount < start) {
                        urliter.next();
                    } else if (sentCount < maxURLs) {
                        responseObserver.onNext(urliter.next());
                        sentCount++;
                    } else {
                        break;
                    }
                }
            }
        }

        responseObserver.onCompleted();
    }
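    // Pagination note: totalCount starts at -1 and is incremented before the
    // comparison with start, so e.g. with start=2 and size=3 over URLs
    // u0..u9, u0 and u1 are skipped and u2, u3, u4 are streamed before
    // sentCount reaches maxURLs and the loop stops.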

    protected Iterator<URLItem> urlIterator(Entry<QueueWithinCrawl, QueueInterface> qentry) {
        return urlIterator(qentry, 0L, Long.MAX_VALUE);
    }

    protected abstract Iterator<URLItem> urlIterator(
            Entry<QueueWithinCrawl, QueueInterface> qentry, long start, long max);

    /**
     * Builds a URLItem for listURLs (used by fetchURLItems, avoids builder instantiation for
     * every URL)
     *
     * @param builder The URLItem builder
     * @param kbuilder The KnownURLItem builder
     * @param info URLInfo
     * @param refetch refetch date from Epoch in seconds
     * @return the populated URLItem
     */
    public static URLItem buildURLItem(
            URLItem.Builder builder, KnownURLItem.Builder kbuilder, URLInfo info, long refetch) {
        builder.clear();
        kbuilder.clear();

        kbuilder.setInfo(info);
        kbuilder.setRefetchableFromDate(refetch);
        builder.setKnown(kbuilder.build());

        return builder.build();
    }
}
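Backends satisfy the new abstract urlIterator contract by mapping their internal per-queue entries onto URLItem messages, typically through the buildURLItem helper above. A minimal sketch of what such an implementation could look like; MyQueue and its entries()/info()/refetchDate() accessors are hypothetical stand-ins, not classes from the codebase:

    // Hypothetical sketch; MyQueue, entries(), info() and refetchDate()
    // are assumed names, not real classes from the codebase.
    @Override
    protected Iterator<URLItem> urlIterator(
            Entry<QueueWithinCrawl, QueueInterface> qentry, long start, long max) {
        // Builders are shared across the stream; buildURLItem clears them
        // before populating each item, and consumption is sequential.
        URLItem.Builder builder = URLItem.newBuilder();
        KnownURLItem.Builder kbuilder = KnownURLItem.newBuilder();
        return ((MyQueue) qentry.getValue())
                .entries().stream()
                .skip(start)
                .limit(max)
                .map(u -> buildURLItem(builder, kbuilder, u.info(), u.refetchDate()))
                .iterator();
    }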
@@ -9,6 +9,7 @@
import crawlercommons.urlfrontier.Urlfrontier.AckMessage.Status;
import crawlercommons.urlfrontier.Urlfrontier.GetParams;
import crawlercommons.urlfrontier.Urlfrontier.KnownURLItem;
import crawlercommons.urlfrontier.Urlfrontier.ListUrlParams;
import crawlercommons.urlfrontier.Urlfrontier.URLInfo;
import crawlercommons.urlfrontier.Urlfrontier.URLItem;
import crawlercommons.urlfrontier.Urlfrontier.URLStatusRequest;
@@ -29,6 +30,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -848,4 +850,17 @@ protected Status putURLItem(URLItem value) {
    public void getURLStatus(URLStatusRequest request, StreamObserver<URLItem> responseObserver) {
        responseObserver.onError(io.grpc.Status.UNIMPLEMENTED.asException());
    }

    @Override
    // TODO Implementation of listURLs for Ignite
    public void listURLs(ListUrlParams request, StreamObserver<URLItem> responseObserver) {
        responseObserver.onError(io.grpc.Status.UNIMPLEMENTED.asException());
    }

    @Override
    // TODO Implementation of listURLs for Ignite
    protected Iterator<URLItem> urlIterator(
            java.util.Map.Entry<QueueWithinCrawl, QueueInterface> qentry, long start, long max) {
        throw new UnsupportedOperationException("Feature not implemented for Ignite backend");
    }
}
