- final Set<QueueWithinCrawl> toDelete = new HashSet<>();
-
- synchronized (getQueues()) {
-
- // find the queues belonging to this crawlID
- QueueWithinCrawl[] array = getQueues().keySet().toArray(new QueueWithinCrawl[0]);
- Arrays.sort(array);
-
- for (QueueWithinCrawl prefixed_queue : array) {
- boolean samePrefix = prefixed_queue.getCrawlid().equals(normalisedCrawlID);
- if (samePrefix) {
- toDelete.add(prefixed_queue);
- }
- }
-
- ignite.destroyCache(URLCacheNamePrefix + normalisedCrawlID);
-
- for (QueueWithinCrawl quid : toDelete) {
- if (queuesBeingDeleted.contains(quid)) {
- continue;
- } else {
- queuesBeingDeleted.put(quid, quid);
- }
-
- QueueInterface q = getQueues().remove(quid);
- total += q.countActive();
- total += q.getCountCompleted();
-
- // remove at the global level
- globalQueueCache.remove(quid.toString());
-
- queuesBeingDeleted.remove(quid);
- }
- }
- responseObserver.onNext(
- crawlercommons.urlfrontier.Urlfrontier.Long.newBuilder().setValue(total).build());
- responseObserver.onCompleted();
- }
-
- /** Delete the queue based on the key in parameter */
- @Override
- public int deleteLocalQueue(final QueueWithinCrawl qc) {
-
- int sizeQueue = 0;
-
- // if the queue is unknown or already being deleted
- if (!getQueues().containsKey(qc) || queuesBeingDeleted.contains(qc)) {
- return sizeQueue;
- }
-
- queuesBeingDeleted.put(qc, qc);
-
- IgniteCache<Key, Payload> URLCache = createOrGetCacheForCrawlID(qc.getCrawlid());
-
- // slow version - scan the whole cache
- IgniteBiPredicate<Key, Payload> filter = (key, p) -> key.crawlQueueID.equals(qc.toString());
-
- try (QueryCursor<Entry<Key, Payload>> cur =
- URLCache.query(new ScanQuery(filter).setLocal(true))) {
- for (Entry<Key, Payload> entry : cur) {
- URLCache.remove(entry.getKey());
- }
- }
-
- QueueInterface q = getQueues().remove(qc);
- sizeQueue += q.countActive();
- sizeQueue += q.getCountCompleted();
-
- queuesBeingDeleted.remove(qc);
-
- // remove at the global level
- globalQueueCache.remove(qc.toString());
-
- return sizeQueue;
- }
-
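Note: deleteLocalQueue above removes one queue by scanning the crawl's cache with an IgniteBiPredicate filter executed locally. The following is a minimal standalone sketch of that pattern, not the service code itself; it assumes a single embedded Ignite node, a hypothetical cache name "urls_demo", and that it lives in the same package as the Key and Payload classes shown at the end of this diff.

```java
package crawlercommons.urlfrontier.service.ignite;

import javax.cache.Cache.Entry;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.lang.IgniteBiPredicate;

public class ScanQueryDeleteSketch {
    public static void main(String[] args) {
        // start a single embedded node with default settings
        try (Ignite ignite = Ignition.start()) {
            IgniteCache<Key, Payload> cache = ignite.getOrCreateCache("urls_demo");

            // keep only the entries belonging to one queue within one crawl
            final String queueID = "crawl1_example.com";
            IgniteBiPredicate<Key, Payload> filter =
                    (key, payload) -> key.crawlQueueID.equals(queueID);

            // setLocal(true) restricts the scan to the partitions held by this node,
            // which is what deleteLocalQueue relies on above
            try (QueryCursor<Entry<Key, Payload>> cursor =
                    cache.query(new ScanQuery<>(filter).setLocal(true))) {
                for (Entry<Key, Payload> entry : cursor) {
                    cache.remove(entry.getKey());
                }
            }
        }
    }
}
```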
- @Override
- protected long deleteLocalCrawl(String normalisedCrawlID) {
- final Set<QueueWithinCrawl> toDelete = new HashSet<>();
-
- long total = 0;
-
- synchronized (getQueues()) {
- // find the queues belonging to this crawlID
- QueueWithinCrawl[] array = getQueues().keySet().toArray(new QueueWithinCrawl[0]);
- Arrays.sort(array);
-
- for (QueueWithinCrawl prefixed_queue : array) {
- boolean samePrefix = prefixed_queue.getCrawlid().equals(normalisedCrawlID);
- if (samePrefix) {
- toDelete.add(prefixed_queue);
- }
- }
-
- ignite.destroyCache(URLCacheNamePrefix + normalisedCrawlID);
-
- for (QueueWithinCrawl quid : toDelete) {
- if (queuesBeingDeleted.contains(quid)) {
- continue;
- } else {
- queuesBeingDeleted.put(quid, quid);
- }
-
- QueueInterface q = getQueues().remove(quid);
- total += q.countActive();
- total += q.getCountCompleted();
-
- // remove at the global level
- globalQueueCache.remove(quid.toString());
-
- queuesBeingDeleted.remove(quid);
- }
- }
- return total;
- }
-
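Note: deleteLocalCrawl, by contrast, drops the whole per-crawl cache in one call instead of scanning entries. A small sketch of that difference, using a hypothetical cache name in place of URLCacheNamePrefix + crawlID:

```java
import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;

public class DestroyCacheSketch {
    public static void main(String[] args) {
        try (Ignite ignite = Ignition.start()) {
            // hypothetical name; the service builds it as URLCacheNamePrefix + crawlID
            String cacheName = "urls_crawl1";
            ignite.getOrCreateCache(cacheName);

            // dropping the cache removes every queue of the crawl at once,
            // so no per-entry scan is needed here
            ignite.destroyCache(cacheName);
        }
    }
}
```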
- @Override
- public void getURLs(GetParams request, StreamObserver<URLInfo> responseObserver) {
- try {
- searcherManager.maybeRefresh();
- } catch (IOException e) {
- LOG.error("Exception when calling maybeRefresh in getURLs", e);
- }
- super.getURLs(request, responseObserver);
- }
-
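Note: the type of searcherManager is not visible in this diff; assuming it is a recent Lucene SearcherManager, the maybeRefresh() call above is one step of the usual refresh/acquire/release cycle, sketched below with an in-memory index purely for illustration.

```java
import java.io.IOException;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.store.ByteBuffersDirectory;

public class MaybeRefreshSketch {
    public static void main(String[] args) throws IOException {
        IndexWriter writer =
                new IndexWriter(new ByteBuffersDirectory(), new IndexWriterConfig(new StandardAnalyzer()));
        try (SearcherManager manager = new SearcherManager(writer, null)) {
            // make pending index changes visible to readers, as getURLs does before delegating
            manager.maybeRefresh();

            IndexSearcher searcher = manager.acquire();
            try {
                // run lookups against the refreshed searcher here
            } finally {
                manager.release(searcher);
            }
        }
        writer.close();
    }
}
```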
- @Override
- protected Status putURLItem(URLItem value) {
-
- long nextFetchDate;
- boolean discovered = true;
- URLInfo info;
-
- putURLs_urls_count.inc();
-
- if (value.hasDiscovered()) {
- putURLs_discovered_count.labels("true").inc();
- info = value.getDiscovered().getInfo();
- nextFetchDate = Instant.now().getEpochSecond();
- } else {
- putURLs_discovered_count.labels("false").inc();
- KnownURLItem known = value.getKnown();
- info = known.getInfo();
- nextFetchDate = known.getRefetchableFromDate();
- discovered = Boolean.FALSE;
- }
-
- String Qkey = info.getKey();
- String url = info.getUrl();
- String crawlID = CrawlID.normaliseCrawlID(info.getCrawlID());
-
- // has a queue key been defined? if not use the hostname
- if (Qkey.equals("")) {
- LOG.debug("key missing for {}", url);
- Qkey = provideMissingKey(url);
- if (Qkey == null) {
- LOG.error("Malformed URL {}", url);
- return AckMessage.Status.SKIPPED;
- }
- // make a new info object ready to return
- info = URLInfo.newBuilder(info).setKey(Qkey).setCrawlID(crawlID).build();
- }
-
- // check that the key is not too long
- if (Qkey.length() > 255) {
- LOG.error("Key too long: {}", Qkey);
- return AckMessage.Status.SKIPPED;
- }
-
- QueueWithinCrawl qk = QueueWithinCrawl.get(Qkey, crawlID);
-
- // ignore this url if the queue is being deleted
- if (queuesBeingDeleted.containsKey(qk)) {
- LOG.info("Not adding {} as its queue {} is being deleted", url, Qkey);
- return AckMessage.Status.SKIPPED;
- }
-
- IgniteCache<Key, Payload> _cache = createOrGetCacheForCrawlID(crawlID);
-
- final String existenceKeyString = (qk.toString() + "_" + url).intern();
-
- synchronized (existenceKeyString) {
- final Key key = new Key(qk.toString(), url);
-
- // is this URL already known?
- boolean known = _cache.containsKey(key);
-
- // already known? ignore if discovered
- if (known && discovered) {
- putURLs_alreadyknown_count.inc();
- return AckMessage.Status.SKIPPED;
- }
-
- // get the priority queue if it is a local one, or create a dummy one,
- // but do not register it in the queues unless we are in a non-distributed
- // environment
- QueueMetadata queueMD = null;
-
- if (clusterMode) {
- queueMD = (QueueMetadata) getQueues().getOrDefault(qk, new QueueMetadata());
- } else {
- queueMD = (QueueMetadata) getQueues().computeIfAbsent(qk, s -> new QueueMetadata());
- }
-
- // but make sure it exists globally anyway
- globalQueueCache.putIfAbsent(qk.toString(), qk.toString());
-
- Payload newpayload = new Payload(nextFetchDate, info.toByteArray());
-
- _cache.put(key, newpayload);
-
- // already known - remove the previous occurrence from the queue metadata
- if (known) {
- // remove from queue metadata
- queueMD.removeFromProcessed(url);
- queueMD.decrementActive();
- }
-
- // add the new item
- // unless it is an update and its nextFetchDate is 0 == NEVER
- if (!discovered && nextFetchDate == 0) {
- queueMD.incrementCompleted();
- putURLs_completed_count.inc();
- } else {
- // it is either brand new or already known
- queueMD.incrementActive();
- }
- }
-
- return AckMessage.Status.OK;
- }
-
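Note: the synchronized block in putURLItem relies on String.intern() so that all threads handling the same queue/URL pair lock the same monitor while they check and update the cache. A stripped-down sketch of that idiom, where checkAndPut is a hypothetical stand-in for the cache logic above:

```java
public class InternLockSketch {

    // interning yields one canonical String instance per logical key,
    // so every thread that builds the same key synchronizes on the same monitor
    void checkAndPut(String queueID, String url) {
        final String lock = (queueID + "_" + url).intern();
        synchronized (lock) {
            // the containsKey / put sequence against the URL cache is atomic
            // per URL inside this block
        }
    }
}
```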
- @Override
- // TODO Implementation of getURLStatus for Ignite
- public void getURLStatus(URLStatusRequest request, StreamObserver<URLItem> responseObserver) {
- responseObserver.onError(io.grpc.Status.UNIMPLEMENTED.asException());
- }
-
- @Override
- // TODO Implementation of listURLs for Ignite
- public void listURLs(ListUrlParams request, StreamObserver<URLItem> responseObserver) {
- responseObserver.onError(io.grpc.Status.UNIMPLEMENTED.asException());
- }
-
- @Override
- // TODO Implementation of urlIterator (used by listURLs) for Ignite
- protected Iterator<URLItem> urlIterator(
- java.util.Map.Entry<QueueWithinCrawl, QueueInterface> qentry, long start, long max) {
- throw new UnsupportedOperationException("Feature not implemented for Ignite backend");
- }
-}
diff --git a/service/src/main/java/crawlercommons/urlfrontier/service/ignite/Key.java b/service/src/main/java/crawlercommons/urlfrontier/service/ignite/Key.java
deleted file mode 100644
index 2951c96..0000000
--- a/service/src/main/java/crawlercommons/urlfrontier/service/ignite/Key.java
+++ /dev/null
@@ -1,24 +0,0 @@
-// SPDX-FileCopyrightText: 2020 Crawler-commons
-// SPDX-License-Identifier: Apache-2.0
-
-package crawlercommons.urlfrontier.service.ignite;
-
-import java.io.Serializable;
-import org.apache.ignite.cache.affinity.AffinityKeyMapped;
-
-class Key implements Serializable {
-
- @AffinityKeyMapped String crawlQueueID;
-
- String URL;
-
- Key(String crawlQueueID, String uRL) {
- super();
- this.crawlQueueID = crawlQueueID;
- URL = uRL;
- }
-
- public String toString() {
- return this.crawlQueueID + "_" + this.URL;
- }
-}
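Note: Key marks crawlQueueID with @AffinityKeyMapped, so only the queue identifier (not the URL) drives partition placement and all URLs of a queue land on the same node, which is what makes the local scans above complete. A minimal sketch of that effect, assuming a running node, a hypothetical cache name, and the same package as Key:

```java
package crawlercommons.urlfrontier.service.ignite;

import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;
import org.apache.ignite.cluster.ClusterNode;

public class AffinitySketch {
    public static void main(String[] args) {
        try (Ignite ignite = Ignition.start()) {
            String cacheName = "urls_demo"; // hypothetical
            ignite.getOrCreateCache(cacheName);

            Key a = new Key("crawl1_example.com", "https://example.com/a");
            Key b = new Key("crawl1_example.com", "https://example.com/b");

            // both keys map to the same primary node because only crawlQueueID
            // (the @AffinityKeyMapped field) participates in affinity
            ClusterNode nodeA = ignite.affinity(cacheName).mapKeyToNode(a);
            ClusterNode nodeB = ignite.affinity(cacheName).mapKeyToNode(b);
            System.out.println(nodeA.id().equals(nodeB.id())); // expected: true
        }
    }
}
```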
diff --git a/service/src/main/java/crawlercommons/urlfrontier/service/ignite/Payload.java b/service/src/main/java/crawlercommons/urlfrontier/service/ignite/Payload.java
deleted file mode 100644
index 8956a34..0000000
--- a/service/src/main/java/crawlercommons/urlfrontier/service/ignite/Payload.java
+++ /dev/null
@@ -1,16 +0,0 @@
-// SPDX-FileCopyrightText: 2020 Crawler-commons
-// SPDX-License-Identifier: Apache-2.0
-
-package crawlercommons.urlfrontier.service.ignite;
-
-import java.io.Serializable;
-
-class Payload implements Serializable {
- long nextFetchDate;
- byte[] payload;
-
- public Payload(long nextFetchDate, byte[] bs) {
- this.nextFetchDate = nextFetchDate;
- this.payload = bs;
- }
-}
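Note: Payload carries the next fetch time plus the serialised URLInfo protobuf, so reading an entry back means parsing that byte array. A small round-trip sketch, assuming the crawler-commons URLInfo message and the same package as Payload:

```java
package crawlercommons.urlfrontier.service.ignite;

import com.google.protobuf.InvalidProtocolBufferException;
import crawlercommons.urlfrontier.Urlfrontier.URLInfo;
import java.time.Instant;

public class PayloadRoundTripSketch {
    public static void main(String[] args) throws InvalidProtocolBufferException {
        URLInfo info =
                URLInfo.newBuilder()
                        .setUrl("https://example.com/")
                        .setKey("example.com")
                        .setCrawlID("crawl1")
                        .build();

        // what putURLItem stores: next fetch time in epoch seconds + serialised URLInfo
        Payload stored = new Payload(Instant.now().getEpochSecond(), info.toByteArray());

        // what a reader does when it pulls the entry back out of the cache
        URLInfo parsed = URLInfo.parseFrom(stored.payload);
        System.out.println(parsed.getUrl() + " due at " + stored.nextFetchDate);
    }
}
```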