Skip to content

Commit

Permalink
Keep rocksDB private in RocksDBService
Browse files: browse the repository at this point in the history
Signed-off-by: Laurent Klock <[email protected]>
  • Loading branch information
klockla committed Aug 28, 2024
1 parent 4c5867f commit 35ea444
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import com.google.protobuf.InvalidProtocolBufferException;
import crawlercommons.urlfrontier.CrawlID;
import crawlercommons.urlfrontier.Urlfrontier.AckMessage.Status;
import crawlercommons.urlfrontier.Urlfrontier.DiscoveredURLItem;
import crawlercommons.urlfrontier.Urlfrontier.KnownURLItem;
import crawlercommons.urlfrontier.Urlfrontier.Stats;
import crawlercommons.urlfrontier.Urlfrontier.URLInfo;
Expand Down Expand Up @@ -57,7 +56,7 @@ public class RocksDBService extends AbstractFrontierService {
RocksDB.loadLibrary();
}

protected RocksDB rocksDB;
private RocksDB rocksDB;

// a list which will hold the handles for the column families once the db is
// opened
Expand Down Expand Up @@ -791,10 +790,10 @@ public void getURLStatus(URLStatusRequest request, StreamObserver<URLItem> respo
byte[] schedulingKey = null;

boolean found = false;

URLItem.Builder builder = URLItem.newBuilder();
KnownURLItem.Builder kb = KnownURLItem.newBuilder();

try {
schedulingKey = rocksDB.get(columnFamilyHandleList.get(0), existenceKey);
if (schedulingKey != null) {
Expand All @@ -820,21 +819,23 @@ public void getURLStatus(URLStatusRequest request, StreamObserver<URLItem> respo

URLInfo info = null;
try {
info = URLInfo.parseFrom(
info =
URLInfo.parseFrom(
rocksDB.get(columnFamilyHandleList.get(1), schedulingKey));
kb.setInfo(info);
kb.setRefetchableFromDate(scheduled);
builder.setKnown(kb.build());
} catch (InvalidProtocolBufferException e) {
LOG.error(e.getMessage(), e);
responseObserver.onError(io.grpc.Status.fromThrowable(e).asRuntimeException());
responseObserver.onError(
io.grpc.Status.fromThrowable(e).asRuntimeException());
}

found = true;
}
} else {
// Key is unknown
found = false;
found = false;
}

} catch (RocksDBException e) {
Expand All @@ -844,10 +845,10 @@ public void getURLStatus(URLStatusRequest request, StreamObserver<URLItem> respo
}

if (found) {
responseObserver.onNext(builder.build());
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
} else {
responseObserver.onError(io.grpc.Status.NOT_FOUND.asRuntimeException());
responseObserver.onError(io.grpc.Status.NOT_FOUND.asRuntimeException());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,18 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.google.protobuf.InvalidProtocolBufferException;
import crawlercommons.urlfrontier.Urlfrontier.URLInfo;
import crawlercommons.urlfrontier.Urlfrontier.URLItem;
import crawlercommons.urlfrontier.Urlfrontier.URLStatusRequest;
import crawlercommons.urlfrontier.service.rocksdb.RocksDBService;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.slf4j.LoggerFactory;

class RocksDBServiceTest extends RocksDBService {

// Passes "localhost" and 7071 to the RocksDBService super constructor
// (presumably host and port -- TODO confirm against RocksDBService).
public RocksDBServiceTest() {
super("localhost", 7071);
}
class RocksDBServiceTest {

private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(RocksDBServiceTest.class);

Expand All @@ -37,43 +28,10 @@ void shutdown() throws IOException {
@BeforeEach
void setup() {

rocksDBService = this;
rocksDBService = new RocksDBService("localhost", 7071);
ServiceTestUtil.initURLs(rocksDBService);
}

/**
 * Iterates over every entry of the first column family and, for each one, uses the entry's
 * value as a lookup key into the second column family.
 *
 * <p>Entries whose lookup returns a value are parsed as {@code URLInfo} and counted as
 * scheduled. The test expects 3 entries in total, 2 of which have a scheduling entry.
 *
 * @throws InvalidProtocolBufferException if a stored value cannot be parsed as {@code URLInfo}
 * @throws RocksDBException if the lookup in the second column family fails
 */
@Test
void readAll() throws InvalidProtocolBufferException, RocksDBException {
    int count = 0;
    int countScheduled = 0;

    // try-with-resources releases the native iterator even when a lookup or a parse throws;
    // the previous manual close() after the loop was skipped on any exception.
    try (RocksIterator rocksIterator = rocksDB.newIterator(columnFamilyHandleList.get(0))) {
        for (rocksIterator.seekToFirst(); rocksIterator.isValid(); rocksIterator.next()) {
            String currentKey = new String(rocksIterator.key(), StandardCharsets.UTF_8);
            QueueWithinCrawl queueKey = QueueWithinCrawl.parseAndDeNormalise(currentKey);
            LOG.info("Qkey crawlId={} queue={}", queueKey.getCrawlid(), queueKey.getQueue());

            LOG.info("current key {}", currentKey);
            byte[] schedulingKey = rocksIterator.value();
            LOG.info("scheduling key {}", new String(schedulingKey, StandardCharsets.UTF_8));

            // A null result means there is no entry under this scheduling key.
            byte[] scheduled = rocksDB.get(columnFamilyHandleList.get(1), schedulingKey);
            if (scheduled != null) {
                URLInfo info = URLInfo.parseFrom(scheduled);
                LOG.info("current value {}", info);
                countScheduled++;
            } else {
                LOG.info("no schedule for {}", currentKey);
            }

            count++;
        }
    }

    assertEquals(3, count);
    assertEquals(2, countScheduled);
}

@Test
void testDiscovered() {
String crawlId = "crawl_id";
Expand Down Expand Up @@ -115,7 +73,7 @@ public void onCompleted() {
}
};

this.getURLStatus(request, statusObserver);
rocksDBService.getURLStatus(request, statusObserver);

assertEquals(1, count.get());
assertEquals(1, discovered.get());
Expand Down Expand Up @@ -163,7 +121,7 @@ public void onCompleted() {
}
};

this.getURLStatus(request, statusObserver);
rocksDBService.getURLStatus(request, statusObserver);

assertEquals(1, count.get());
assertEquals(1, fetched.get());
Expand All @@ -180,7 +138,7 @@ void testNotFound() {

final AtomicInteger count = new AtomicInteger(0);
final AtomicInteger notfound = new AtomicInteger(0);

StreamObserver<URLItem> statusObserver =
new StreamObserver<>() {

Expand All @@ -193,9 +151,9 @@ public void onNext(URLItem value) {

@Override
public void onError(Throwable t) {
assertEquals(io.grpc.Status.NOT_FOUND, io.grpc.Status.fromThrowable(t));
LOG.error(t.getMessage());
notfound.incrementAndGet();
assertEquals(io.grpc.Status.NOT_FOUND, io.grpc.Status.fromThrowable(t));
LOG.error(t.getMessage());
notfound.incrementAndGet();
}

@Override
Expand All @@ -204,7 +162,7 @@ public void onCompleted() {
}
};

this.getURLStatus(request, statusObserver);
rocksDBService.getURLStatus(request, statusObserver);

assertEquals(0, count.get());
assertEquals(1, notfound.get());
Expand Down Expand Up @@ -244,7 +202,7 @@ public void onNext(URLItem value) {
// Internally, MemoryFrontierService does not make a distinction
// between discovered and known which have to be re-fetched
if (value.hasKnown()) {
assertTrue(value.getKnown().getRefetchableFromDate() > 0);
assertTrue(value.getKnown().getRefetchableFromDate() > 0);
fetched.incrementAndGet();
}
count.incrementAndGet();
Expand All @@ -261,7 +219,7 @@ public void onCompleted() {
}
};

this.getURLStatus(request, statusObserver);
rocksDBService.getURLStatus(request, statusObserver);

assertEquals(1, count.get());
assertEquals(1, fetched.get());
Expand Down

0 comments on commit 35ea444

Please sign in to comment.