diff --git a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/partitioned/KvinPartitioned.java b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/partitioned/KvinPartitioned.java index 944a6a62..fc248fe2 100644 --- a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/partitioned/KvinPartitioned.java +++ b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/partitioned/KvinPartitioned.java @@ -128,7 +128,7 @@ private void scheduleCyclicArchival() { public boolean addListener(KvinListener listener) { try { listeners.add(listener); - return true; + return hotStore.addListener(listener); } catch (Exception e) { throw new RuntimeException(e); } @@ -138,7 +138,7 @@ public boolean addListener(KvinListener listener) { public boolean removeListener(KvinListener listener) { try { listeners.remove(listener); - return true; + return hotStore.removeListener(listener); } catch (Exception e) { throw new RuntimeException(e); } @@ -164,6 +164,10 @@ public void createNewHotDataStore() throws IOException { FileUtils.deleteDirectory(this.currentStoreArchivePath); FileUtils.moveDirectory(this.currentStorePath, this.currentStoreArchivePath); hotStore = new KvinLevelDb(currentStorePath); + for (KvinListener listener : listeners) { + // register listeners on new hot store + hotStore.addListener(listener); + } hotStoreArchive = new KvinLevelDb(this.currentStoreArchivePath); } diff --git a/bundles/io.github.linkedfactory.core/src/main/scala/io/github/linkedfactory/core/kvin/leveldb/KvinLevelDb.scala b/bundles/io.github.linkedfactory.core/src/main/scala/io/github/linkedfactory/core/kvin/leveldb/KvinLevelDb.scala index abe4e8a5..d64840b1 100644 --- a/bundles/io.github.linkedfactory.core/src/main/scala/io/github/linkedfactory/core/kvin/leveldb/KvinLevelDb.scala +++ b/bundles/io.github.linkedfactory.core/src/main/scala/io/github/linkedfactory/core/kvin/leveldb/KvinLevelDb.scala @@ -593,6 +593,11 @@ class KvinLevelDb(path: File) extends KvinLevelDbBase with Kvin { } override def put(entries: java.lang.Iterable[KvinTuple]): Unit = { + var notifyTuples = Option.empty[mutable.ArrayBuffer[KvinTuple]] + if (! this.listeners.isEmpty && entries.isInstanceOf[IExtendedIterator[_]]) { + notifyTuples = Some(new mutable.ArrayBuffer[KvinTuple]()) + } + val idsBatch = ids.createWriteBatch() val batch = values.createWriteBatch() activeWrites.incrementAndGet() @@ -614,6 +619,8 @@ class KvinLevelDb(path: File) extends KvinLevelDbBase with Kvin { // remove timed-out entries ttl(entry.item) map (asyncRemoveByTtl(values, prefix, _)) } + // buffer tuples if entries are given via iterator + notifyTuples.foreach(_.addOne(entry)) } var writeIds: Future[_] = null if (idsBatch.size() > 0) { @@ -632,7 +639,7 @@ class KvinLevelDb(path: File) extends KvinLevelDbBase with Kvin { uriToIdCacheWrite.invalidateAll() } } - entries.asScala.foreach { entry => + notifyTuples.getOrElse(entries.asScala).foreach { entry => for (l <- listeners.asScala) l.valueAdded(entry.item, entry.property, entry.context, entry.time, entry.seqNr, entry.value) } }