diff --git a/waku/waku_archive/driver/builder.nim b/waku/waku_archive/driver/builder.nim index 1825477b5f..ddc447685f 100644 --- a/waku/waku_archive/driver/builder.nim +++ b/waku/waku_archive/driver/builder.nim @@ -102,6 +102,8 @@ proc new*( ## Hence, this should be run after the migration is completed. asyncSpawn driver.startPartitionFactory(onFatalErrorAction) + driver.startAnalyzeTableLoop() + info "waiting for a partition to be created" for i in 0 ..< 100: if driver.containsAnyPartition(): diff --git a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim index e76806ae2a..c15941b213 100644 --- a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim +++ b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim @@ -25,6 +25,8 @@ type PostgresDriver* = ref object of ArchiveDriver partitionMngr: PartitionManager futLoopPartitionFactory: Future[void] + futLoopAnalyzeTable: Future[void] + const InsertRowStmtName = "InsertRow" const InsertRowStmtDefinition = """INSERT INTO messages (id, messageHash, pubsubTopic, contentTopic, payload, @@ -997,6 +999,10 @@ method close*(s: PostgresDriver): Future[ArchiveDriverResult[void]] {.async.} = ## Cancel the partition factory loop s.futLoopPartitionFactory.cancelSoon() + ## Cancel analyze table loop + if not s.futLoopAnalyzeTable.isNil(): + s.futLoopAnalyzeTable.cancelSoon() + ## Close the database connection let writeCloseRes = await s.writeConnPool.close() let readCloseRes = await s.readConnPool.close() @@ -1030,6 +1036,7 @@ proc sleep*( return ok() +const EXPECTED_LOCK_ERROR* = "another waku instance is currently executing a migration" proc acquireDatabaseLock*( s: PostgresDriver, lockId: int = 841886 ): Future[ArchiveDriverResult[void]] {.async.} = @@ -1052,7 +1059,7 @@ proc acquireDatabaseLock*( return err("error acquiring a lock: " & error) if locked == "f": - return err("another waku instance is currently executing a migration") + return err(EXPECTED_LOCK_ERROR) return ok() @@ -1508,3 +1515,36 @@ method deleteMessagesOlderThanTimestamp*( return err("error in deleteMessagesOlderThanTimestamp: " & $error) return ok() + +############################################ +## TODO: start splitting code better + +const AnalyzeQuery = "ANALYZE messages" +const AnalyzeTableLockId = 111111 ## An arbitrary and different lock id +const RunAnalyzeInterval = timer.days(1) + +proc analyzeTableLoop(self: PostgresDriver) {.async.} = + ## The database stats should be calculated regularly so that the planner + ## picks up the proper indexes and we have better query performance. + while true: + debug "analyzeTableLoop lock db" + (await self.acquireDatabaseLock(AnalyzeTableLockId)).isOkOr: + if error != EXPECTED_LOCK_ERROR: + error "failed to acquire lock in analyzeTableLoop", error = error + await sleepAsync(RunAnalyzeInterval) + continue + + debug "analyzeTableLoop start analysis" + (await self.performWriteQuery(AnalyzeQuery)).isOkOr: + error "failed to run ANALYZE messages", error = error + + debug "analyzeTableLoop unlock db" + (await self.releaseDatabaseLock(AnalyzeTableLockId)).isOkOr: + error "failed to release lock analyzeTableLoop", error = error + + debug "analyzeTableLoop analysis completed" + + await sleepAsync(RunAnalyzeInterval) + +proc startAnalyzeTableLoop*(self: PostgresDriver) = + self.futLoopAnalyzeTable = self.analyzeTableLoop