diff --git a/service/src/main/java/crawlercommons/urlfrontier/service/rocksdb/RocksDBService.java b/service/src/main/java/crawlercommons/urlfrontier/service/rocksdb/RocksDBService.java index cce3ebb..768dc46 100644 --- a/service/src/main/java/crawlercommons/urlfrontier/service/rocksdb/RocksDBService.java +++ b/service/src/main/java/crawlercommons/urlfrontier/service/rocksdb/RocksDBService.java @@ -6,7 +6,6 @@ import com.google.protobuf.InvalidProtocolBufferException; import crawlercommons.urlfrontier.CrawlID; import crawlercommons.urlfrontier.Urlfrontier.AckMessage.Status; -import crawlercommons.urlfrontier.Urlfrontier.DiscoveredURLItem; import crawlercommons.urlfrontier.Urlfrontier.KnownURLItem; import crawlercommons.urlfrontier.Urlfrontier.Stats; import crawlercommons.urlfrontier.Urlfrontier.URLInfo; @@ -57,7 +56,7 @@ public class RocksDBService extends AbstractFrontierService { RocksDB.loadLibrary(); } - protected RocksDB rocksDB; + private RocksDB rocksDB; // a list which will hold the handles for the column families once the db is // opened @@ -791,10 +790,10 @@ public void getURLStatus(URLStatusRequest request, StreamObserver respo byte[] schedulingKey = null; boolean found = false; - + URLItem.Builder builder = URLItem.newBuilder(); KnownURLItem.Builder kb = KnownURLItem.newBuilder(); - + try { schedulingKey = rocksDB.get(columnFamilyHandleList.get(0), existenceKey); if (schedulingKey != null) { @@ -820,21 +819,23 @@ public void getURLStatus(URLStatusRequest request, StreamObserver respo URLInfo info = null; try { - info = URLInfo.parseFrom( + info = + URLInfo.parseFrom( rocksDB.get(columnFamilyHandleList.get(1), schedulingKey)); kb.setInfo(info); kb.setRefetchableFromDate(scheduled); builder.setKnown(kb.build()); } catch (InvalidProtocolBufferException e) { LOG.error(e.getMessage(), e); - responseObserver.onError(io.grpc.Status.fromThrowable(e).asRuntimeException()); + responseObserver.onError( + io.grpc.Status.fromThrowable(e).asRuntimeException()); } - + found = true; } } else { // Key is unknown - found = false; + found = false; } } catch (RocksDBException e) { @@ -844,10 +845,10 @@ public void getURLStatus(URLStatusRequest request, StreamObserver respo } if (found) { - responseObserver.onNext(builder.build()); + responseObserver.onNext(builder.build()); responseObserver.onCompleted(); } else { - responseObserver.onError(io.grpc.Status.NOT_FOUND.asRuntimeException()); + responseObserver.onError(io.grpc.Status.NOT_FOUND.asRuntimeException()); } } } diff --git a/service/src/test/java/crawlercommons/urlfrontier/service/RocksDBServiceTest.java b/service/src/test/java/crawlercommons/urlfrontier/service/RocksDBServiceTest.java index 76cf81d..b108296 100644 --- a/service/src/test/java/crawlercommons/urlfrontier/service/RocksDBServiceTest.java +++ b/service/src/test/java/crawlercommons/urlfrontier/service/RocksDBServiceTest.java @@ -3,27 +3,18 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; -import com.google.protobuf.InvalidProtocolBufferException; -import crawlercommons.urlfrontier.Urlfrontier.URLInfo; import crawlercommons.urlfrontier.Urlfrontier.URLItem; import crawlercommons.urlfrontier.Urlfrontier.URLStatusRequest; import crawlercommons.urlfrontier.service.rocksdb.RocksDBService; import io.grpc.stub.StreamObserver; import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.util.concurrent.atomic.AtomicInteger; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.rocksdb.RocksDBException; -import org.rocksdb.RocksIterator; import org.slf4j.LoggerFactory; -class RocksDBServiceTest extends RocksDBService { - - public RocksDBServiceTest() { - super("localhost", 7071); - } +class RocksDBServiceTest { private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(RocksDBServiceTest.class); @@ -37,43 +28,10 @@ void shutdown() throws IOException { @BeforeEach void setup() { - rocksDBService = this; + rocksDBService = new RocksDBService("localhost", 7071); ServiceTestUtil.initURLs(rocksDBService); } - @Test - void readAll() throws InvalidProtocolBufferException, RocksDBException { - final RocksIterator rocksIterator = rocksDB.newIterator(columnFamilyHandleList.get(0)); - - int count = 0; - int countScheduled = 0; - - for (rocksIterator.seekToFirst(); rocksIterator.isValid(); rocksIterator.next()) { - String currentKey = new String(rocksIterator.key(), StandardCharsets.UTF_8); - QueueWithinCrawl Qkey = QueueWithinCrawl.parseAndDeNormalise(currentKey); - LOG.info("Qkey crawlId={} queue={}", Qkey.getCrawlid(), Qkey.getQueue()); - - LOG.info("current key {}", currentKey); - byte[] schedulingKey = rocksIterator.value(); - LOG.info("scheduling key {}", new String(schedulingKey, StandardCharsets.UTF_8)); - byte[] scheduled = rocksDB.get(columnFamilyHandleList.get(1), schedulingKey); - if (scheduled != null) { - URLInfo info = URLInfo.parseFrom(scheduled); - LOG.info("current value {}", info); - countScheduled++; - } else { - LOG.info("no schedule for {}", currentKey); - } - - count++; - } - - rocksIterator.close(); - - assertEquals(3, count); - assertEquals(2, countScheduled); - } - @Test void testDiscovered() { String crawlId = "crawl_id"; @@ -115,7 +73,7 @@ public void onCompleted() { } }; - this.getURLStatus(request, statusObserver); + rocksDBService.getURLStatus(request, statusObserver); assertEquals(1, count.get()); assertEquals(1, discovered.get()); @@ -163,7 +121,7 @@ public void onCompleted() { } }; - this.getURLStatus(request, statusObserver); + rocksDBService.getURLStatus(request, statusObserver); assertEquals(1, count.get()); assertEquals(1, fetched.get()); @@ -180,7 +138,7 @@ void testNotFound() { final AtomicInteger count = new AtomicInteger(0); final AtomicInteger notfound = new AtomicInteger(0); - + StreamObserver statusObserver = new StreamObserver<>() { @@ -193,9 +151,9 @@ public void onNext(URLItem value) { @Override public void onError(Throwable t) { - assertEquals(io.grpc.Status.NOT_FOUND, io.grpc.Status.fromThrowable(t)); - LOG.error(t.getMessage()); - notfound.incrementAndGet(); + assertEquals(io.grpc.Status.NOT_FOUND, io.grpc.Status.fromThrowable(t)); + LOG.error(t.getMessage()); + notfound.incrementAndGet(); } @Override @@ -204,7 +162,7 @@ public void onCompleted() { } }; - this.getURLStatus(request, statusObserver); + rocksDBService.getURLStatus(request, statusObserver); assertEquals(0, count.get()); assertEquals(1, notfound.get()); @@ -244,7 +202,7 @@ public void onNext(URLItem value) { // Internally, MemoryFrontierService does not make a distinction // between discovered and known which have to be re-fetched if (value.hasKnown()) { - assertTrue(value.getKnown().getRefetchableFromDate() > 0); + assertTrue(value.getKnown().getRefetchableFromDate() > 0); fetched.incrementAndGet(); } count.incrementAndGet(); @@ -261,7 +219,7 @@ public void onCompleted() { } }; - this.getURLStatus(request, statusObserver); + rocksDBService.getURLStatus(request, statusObserver); assertEquals(1, count.get()); assertEquals(1, fetched.get());