Skip to content

Commit

Permalink
Added method ListURLs
Browse files Browse the repository at this point in the history
Signed-off-by: Laurent Klock <[email protected]>
  • Loading branch information
klockla committed Sep 4, 2024
1 parent 08f09c3 commit b314367
Show file tree
Hide file tree
Showing 14 changed files with 546 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,37 @@ crawlercommons.urlfrontier.Urlfrontier.URLItem> getGetURLStatusMethod() {
return getGetURLStatusMethod;
}

// Cached method descriptor for the ListURLs RPC (Pagination request -> server stream of URLItem).
// volatile + double-checked locking below ensures the descriptor is built at most once;
// the assignment order (local first, then the static field) is part of the generated pattern
// and must not be changed.
private static volatile io.grpc.MethodDescriptor<crawlercommons.urlfrontier.Urlfrontier.Pagination,
crawlercommons.urlfrontier.Urlfrontier.URLItem> getListURLsMethod;

@io.grpc.stub.annotations.RpcMethod(
fullMethodName = SERVICE_NAME + '/' + "ListURLs",
requestType = crawlercommons.urlfrontier.Urlfrontier.Pagination.class,
responseType = crawlercommons.urlfrontier.Urlfrontier.URLItem.class,
methodType = io.grpc.MethodDescriptor.MethodType.SERVER_STREAMING)
// Lazily builds and caches the ListURLs descriptor (standard gRPC-java generated accessor).
public static io.grpc.MethodDescriptor<crawlercommons.urlfrontier.Urlfrontier.Pagination,
crawlercommons.urlfrontier.Urlfrontier.URLItem> getListURLsMethod() {
io.grpc.MethodDescriptor<crawlercommons.urlfrontier.Urlfrontier.Pagination, crawlercommons.urlfrontier.Urlfrontier.URLItem> getListURLsMethod;
if ((getListURLsMethod = URLFrontierGrpc.getListURLsMethod) == null) {
synchronized (URLFrontierGrpc.class) {
if ((getListURLsMethod = URLFrontierGrpc.getListURLsMethod) == null) {
URLFrontierGrpc.getListURLsMethod = getListURLsMethod =
io.grpc.MethodDescriptor.<crawlercommons.urlfrontier.Urlfrontier.Pagination, crawlercommons.urlfrontier.Urlfrontier.URLItem>newBuilder()
.setType(io.grpc.MethodDescriptor.MethodType.SERVER_STREAMING)
.setFullMethodName(generateFullMethodName(SERVICE_NAME, "ListURLs"))
.setSampledToLocalTracing(true)
.setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
crawlercommons.urlfrontier.Urlfrontier.Pagination.getDefaultInstance()))
.setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
crawlercommons.urlfrontier.Urlfrontier.URLItem.getDefaultInstance()))
.setSchemaDescriptor(new URLFrontierMethodDescriptorSupplier("ListURLs"))
.build();
}
}
}
return getListURLsMethod;
}

/**
* Creates a new async stub that supports all call types for the service
*/
Expand Down Expand Up @@ -686,6 +717,18 @@ public void getURLStatus(crawlercommons.urlfrontier.Urlfrontier.URLStatusRequest
io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall(getGetURLStatusMethod(), responseObserver);
}

/**
 * <pre>
 * List all URLs currently in the frontier.
 * This does not take into account URL scheduling.
 * Used to check the current status of all URLs within the frontier.
 * </pre>
 * Default server implementation: reports UNIMPLEMENTED to the caller.
 * Concrete frontier services override this to stream URLItem responses.
 */
public void listURLs(crawlercommons.urlfrontier.Urlfrontier.Pagination request,
io.grpc.stub.StreamObserver<crawlercommons.urlfrontier.Urlfrontier.URLItem> responseObserver) {
io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall(getListURLsMethod(), responseObserver);
}

@java.lang.Override public final io.grpc.ServerServiceDefinition bindService() {
return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
.addMethod(
Expand Down Expand Up @@ -793,6 +836,13 @@ public void getURLStatus(crawlercommons.urlfrontier.Urlfrontier.URLStatusRequest
crawlercommons.urlfrontier.Urlfrontier.URLStatusRequest,
crawlercommons.urlfrontier.Urlfrontier.URLItem>(
this, METHODID_GET_URLSTATUS)))
.addMethod(
getListURLsMethod(),
io.grpc.stub.ServerCalls.asyncServerStreamingCall(
new MethodHandlers<
crawlercommons.urlfrontier.Urlfrontier.Pagination,
crawlercommons.urlfrontier.Urlfrontier.URLItem>(
this, METHODID_LIST_URLS)))
.build();
}
}
Expand Down Expand Up @@ -983,6 +1033,19 @@ public void getURLStatus(crawlercommons.urlfrontier.Urlfrontier.URLStatusRequest
io.grpc.stub.ClientCalls.asyncUnaryCall(
getChannel().newCall(getGetURLStatusMethod(), getCallOptions()), request, responseObserver);
}

/**
 * <pre>
 * List all URLs currently in the frontier.
 * This does not take into account URL scheduling.
 * Used to check the current status of all URLs within the frontier.
 * </pre>
 * Async client call: each URLItem streamed by the server is delivered to
 * {@code responseObserver}.
 */
public void listURLs(crawlercommons.urlfrontier.Urlfrontier.Pagination request,
io.grpc.stub.StreamObserver<crawlercommons.urlfrontier.Urlfrontier.URLItem> responseObserver) {
io.grpc.stub.ClientCalls.asyncServerStreamingCall(
getChannel().newCall(getListURLsMethod(), getCallOptions()), request, responseObserver);
}
}

/**
Expand Down Expand Up @@ -1147,6 +1210,19 @@ public crawlercommons.urlfrontier.Urlfrontier.URLItem getURLStatus(crawlercommon
return io.grpc.stub.ClientCalls.blockingUnaryCall(
getChannel(), getGetURLStatusMethod(), getCallOptions(), request);
}

/**
 * <pre>
 * List all URLs currently in the frontier.
 * This does not take into account URL scheduling.
 * Used to check the current status of all URLs within the frontier.
 * </pre>
 * Blocking client call: returns an iterator over the server's URLItem stream;
 * iteration blocks until the next item (or the end of the stream) arrives.
 */
public java.util.Iterator<crawlercommons.urlfrontier.Urlfrontier.URLItem> listURLs(
crawlercommons.urlfrontier.Urlfrontier.Pagination request) {
return io.grpc.stub.ClientCalls.blockingServerStreamingCall(
getChannel(), getListURLsMethod(), getCallOptions(), request);
}
}

/**
Expand Down Expand Up @@ -1329,7 +1405,8 @@ public com.google.common.util.concurrent.ListenableFuture<crawlercommons.urlfron
private static final int METHODID_SET_LOG_LEVEL = 11;
private static final int METHODID_SET_CRAWL_LIMIT = 12;
private static final int METHODID_GET_URLSTATUS = 13;
private static final int METHODID_PUT_URLS = 14;
private static final int METHODID_LIST_URLS = 14;
private static final int METHODID_PUT_URLS = 15;

private static final class MethodHandlers<Req, Resp> implements
io.grpc.stub.ServerCalls.UnaryMethod<Req, Resp>,
Expand Down Expand Up @@ -1404,6 +1481,10 @@ public void invoke(Req request, io.grpc.stub.StreamObserver<Resp> responseObserv
serviceImpl.getURLStatus((crawlercommons.urlfrontier.Urlfrontier.URLStatusRequest) request,
(io.grpc.stub.StreamObserver<crawlercommons.urlfrontier.Urlfrontier.URLItem>) responseObserver);
break;
case METHODID_LIST_URLS:
serviceImpl.listURLs((crawlercommons.urlfrontier.Urlfrontier.Pagination) request,
(io.grpc.stub.StreamObserver<crawlercommons.urlfrontier.Urlfrontier.URLItem>) responseObserver);
break;
default:
throw new AssertionError();
}
Expand Down Expand Up @@ -1483,6 +1564,7 @@ public static io.grpc.ServiceDescriptor getServiceDescriptor() {
.addMethod(getSetLogLevelMethod())
.addMethod(getSetCrawlLimitMethod())
.addMethod(getGetURLStatusMethod())
.addMethod(getListURLsMethod())
.build();
}
}
Expand Down
11 changes: 6 additions & 5 deletions API/src/main/java/crawlercommons/urlfrontier/Urlfrontier.java

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion API/urlfrontier.proto
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ service URLFrontier {
Used to check the current status of a URL within the frontier
**/
rpc GetURLStatus(URLStatusRequest) returns (URLItem) {}

/** List all URLs currently in the frontier
This does not take into account URL scheduling.
Used to check current status of all URLs within the frontier
**/
rpc ListURLs(Pagination) returns (stream URLItem) {}
}

/**
Expand Down Expand Up @@ -186,7 +192,7 @@ message BlockQueueParams {
bool local = 4;
}

/** Parameter message for GetURLs **/
/** Parameter message for GetURLs (ListURLs takes a Pagination message instead) **/
message GetParams {
// maximum number of URLs per queue, the default value of 0 means no limit
uint32 max_urls_per_queue = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
ListNodes.class,
ListQueues.class,
ListCrawls.class,
ListURLs.class,
GetStats.class,
PutURLs.class,
GetURLs.class,
Expand Down
130 changes: 130 additions & 0 deletions client/src/main/java/crawlercommons/urlfrontier/client/ListURLs.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// SPDX-FileCopyrightText: 2020 Crawler-commons
// SPDX-License-Identifier: Apache-2.0

package crawlercommons.urlfrontier.client;

import crawlercommons.urlfrontier.URLFrontierGrpc;
import crawlercommons.urlfrontier.URLFrontierGrpc.URLFrontierBlockingStub;
import crawlercommons.urlfrontier.Urlfrontier.Pagination.Builder;
import crawlercommons.urlfrontier.Urlfrontier.URLItem;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.charset.Charset;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Iterator;
import picocli.CommandLine.Command;
import picocli.CommandLine.Option;
import picocli.CommandLine.ParentCommand;

@Command(
        name = "ListURLs",
        description = "Prints out all URLs in the Frontier",
        sortOptions = false)
public class ListURLs implements Runnable {

    @ParentCommand private Client parent;

    @Option(
            names = {"-n", "--number_urls"},
            defaultValue = "100",
            paramLabel = "NUM",
            description = "maximum number of URLs to return (default 100)")
    private int maxNumURLs;

    @Option(
            names = {"-s", "--start"},
            defaultValue = "0",
            paramLabel = "NUM",
            description = "starting position of URL to return (default 0)")
    private int start;

    @Option(
            names = {"-o", "--output"},
            defaultValue = "",
            paramLabel = "STRING",
            description = "output file to dump all the URLs")
    private String output;

    @Option(
            names = {"-c", "--crawlID"},
            defaultValue = "DEFAULT",
            paramLabel = "STRING",
            description = "crawl to get the URLs for")
    private String crawl;

    // primitive boolean: picocli always injects the default, so no need for the boxed type
    @Option(
            names = {"-l", "--local"},
            defaultValue = "false",
            paramLabel = "BOOLEAN",
            description =
                    "restricts the scope to this frontier instance instead of aggregating over the cluster")
    private boolean local;

    @Option(
            names = {"-p", "--parsedate"},
            defaultValue = "false",
            description = {
                "Print the refetch date in local time zone",
                "By default, time is in UTC seconds since the Unix epoch"
            })
    private boolean parse;

    // Use the system default time zone when -p is set
    private ZoneId zoneId = ZoneId.systemDefault();

    /**
     * Connects to the frontier, streams all URLs matching the pagination options and
     * prints each as {@code url;refetchDate}, either to stdout or to the file given
     * with {@code -o}.
     */
    @Override
    public void run() {

        // Build the pagination request from the command-line options
        Builder builder = crawlercommons.urlfrontier.Urlfrontier.Pagination.newBuilder();
        builder.setLocal(local);
        builder.setSize(maxNumURLs);
        builder.setStart(start);
        builder.setIncludeInactive(true);
        builder.setCrawlID(crawl);

        final boolean toFile = output.length() > 0;
        PrintStream outstream;
        if (toFile) {
            try {
                // PrintStream truncates an existing file, so no prior delete is needed
                outstream = new PrintStream(new File(output), Charset.defaultCharset());
            } catch (IOException e) {
                e.printStackTrace(System.err);
                return;
            }
        } else {
            outstream = System.out;
        }

        ManagedChannel channel =
                ManagedChannelBuilder.forAddress(parent.hostname, parent.port)
                        .usePlaintext()
                        .build();
        try {
            URLFrontierBlockingStub blockingFrontier = URLFrontierGrpc.newBlockingStub(channel);

            Iterator<URLItem> it = blockingFrontier.listURLs(builder.build());
            while (it.hasNext()) {
                URLItem item = it.next();
                outstream.println(
                        item.getKnown().getInfo().getUrl() + ";" + formatFetchDate(item));
            }
        } finally {
            // Release resources even if the RPC fails mid-stream.
            // Only close file streams: closing System.out would silence the whole JVM.
            if (toFile) {
                outstream.close();
            }
            channel.shutdownNow();
        }
    }

    /**
     * Formats the item's refetch date: epoch seconds by default, or the local
     * date-time when {@code -p/--parsedate} was given.
     */
    private String formatFetchDate(URLItem item) {
        long epochSeconds = item.getKnown().getRefetchableFromDate();
        if (parse) {
            Instant instant = Instant.ofEpochSecond(epochSeconds);
            LocalDateTime localDate = instant.atZone(zoneId).toLocalDateTime();
            return localDate.toString();
        }
        return String.valueOf(epochSeconds);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -866,4 +866,9 @@ public void close() throws IOException {
/**
 * Checks the current status of a single URL within the frontier; the result is
 * delivered to {@code responseObserver} as a {@link URLItem}.
 */
public abstract void getURLStatus(
crawlercommons.urlfrontier.Urlfrontier.URLStatusRequest request,
io.grpc.stub.StreamObserver<URLItem> responseObserver);

/**
 * Lists all URLs currently in the frontier (ignoring URL scheduling), streaming
 * each one to {@code responseObserver} according to the supplied pagination.
 */
public abstract void listURLs(
crawlercommons.urlfrontier.Urlfrontier.Pagination request,
io.grpc.stub.StreamObserver<crawlercommons.urlfrontier.Urlfrontier.URLItem>
responseObserver);
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import crawlercommons.urlfrontier.Urlfrontier.AckMessage.Status;
import crawlercommons.urlfrontier.Urlfrontier.GetParams;
import crawlercommons.urlfrontier.Urlfrontier.KnownURLItem;
import crawlercommons.urlfrontier.Urlfrontier.Pagination;
import crawlercommons.urlfrontier.Urlfrontier.URLInfo;
import crawlercommons.urlfrontier.Urlfrontier.URLItem;
import crawlercommons.urlfrontier.Urlfrontier.URLStatusRequest;
Expand Down Expand Up @@ -848,4 +849,10 @@ protected Status putURLItem(URLItem value) {
// Not supported by this backend yet: callers receive gRPC UNIMPLEMENTED.
public void getURLStatus(URLStatusRequest request, StreamObserver<URLItem> responseObserver) {
responseObserver.onError(io.grpc.Status.UNIMPLEMENTED.asException());
}

@Override
// TODO Implementation of listURLs for Ignite
// Until then, callers receive gRPC UNIMPLEMENTED.
public void listURLs(Pagination request, StreamObserver<URLItem> responseObserver) {
responseObserver.onError(io.grpc.Status.UNIMPLEMENTED.asException());
}
}
Loading

0 comments on commit b314367

Please sign in to comment.