From 6ef1cea3cfafcafbf20fe8ea30d27f75712b8e48 Mon Sep 17 00:00:00 2001 From: 123joshuawu Date: Thu, 3 Dec 2020 21:27:58 -0500 Subject: [PATCH 1/7] add postgres-data volume --- docker-compose.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/docker-compose.yml b/docker-compose.yml index 4dabd91c..154022ee 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -90,3 +90,4 @@ services: volumes: neo4j-data: + postgres-data: From 35bb38a990b55cc52d7824302ae3bd9c584c0801 Mon Sep 17 00:00:00 2001 From: 123joshuawu Date: Tue, 15 Dec 2020 01:15:14 -0500 Subject: [PATCH 2/7] add in and out degree to node_label --- .../lib/kg/stores/postgres/AbstractPostgresKgStore.scala | 6 ++++-- .../lib/kg/stores/postgres/PostgresKgCommandStore.scala | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/AbstractPostgresKgStore.scala b/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/AbstractPostgresKgStore.scala index be5c5815..7f6b7b68 100644 --- a/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/AbstractPostgresKgStore.scala +++ b/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/AbstractPostgresKgStore.scala @@ -151,7 +151,7 @@ abstract class AbstractPostgresKgStore(protected val databaseConfigProvider: Pos def * = (id, inDegree, outDegree, pageRank, pos, wordNetSenseNumber) <> (NodeRow.tupled, NodeRow.unapply) } - protected final case class NodeLabelRow(label: String, pageRank: Option[Float]) { + protected final case class NodeLabelRow(inDegree: Option[Short], label: String, outDegree: Option[Short], pageRank: Option[Float]) { def toKgNodeLabel(nodes: List[KgNode], sourceIds: List[String]) = KgNodeLabel( nodeLabel = label, nodes = nodes, @@ -160,10 +160,12 @@ abstract class AbstractPostgresKgStore(protected val databaseConfigProvider: Pos ) } protected final class NodeLabelTable(tag: Tag) extends Table[NodeLabelRow](tag, "node_label") { + def inDegree = column[Option[Short]]("in_degree") def label = column[String]("label", O.PrimaryKey) + def outDegree = column[Option[Short]]("out_degree") def pageRank = column[Option[Float]]("page_rank") - def * = (label, pageRank) <> (NodeLabelRow.tupled, NodeLabelRow.unapply) + def * = (inDegree, label, outDegree, pageRank) <> (NodeLabelRow.tupled, NodeLabelRow.unapply) } protected final case class NodeLabelEdgeRow(objectNodeLabelLabel: String, subjectNodeLabelLabel: String) diff --git a/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/PostgresKgCommandStore.scala b/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/PostgresKgCommandStore.scala index 4133ac8b..23621925 100644 --- a/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/PostgresKgCommandStore.scala +++ b/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/PostgresKgCommandStore.scala @@ -61,7 +61,7 @@ class PostgresKgCommandStore @Inject()(configProvider: PostgresStoreConfigProvid val stream = kgNodes.toStream List( nodes.insertOrUpdateAll(stream.map(_.toRow)), - nodeLabels.insertOrUpdateAll(stream.flatMap(_.labels.map(NodeLabelRow(_, None)))), + nodeLabels.insertOrUpdateAll(stream.flatMap(_.labels.map(NodeLabelRow(None, _, None, None)))), nodeNodeLabels.insertOrUpdateAll(stream.flatMap(node => node.labels.map(label => NodeNodeLabelRow(node.id, label)))), nodeLabelSources.insertOrUpdateAll(stream.flatMap(node => node.labels.flatMap(label => node.sourceIds.map(NodeLabelSourceRow(label, _))))), nodeSources.insertOrUpdateAll(stream.flatMap(node => node.sourceIds.map(NodeSourceRow(node.id, _)))) From 786b247abc7295099053f981eb73694a402cc11c Mon Sep 17 00:00:00 2001 From: 123joshuawu Date: Tue, 15 Dec 2020 01:29:28 -0500 Subject: [PATCH 3/7] write node and node label degrees --- .../postgres/PostgresKgCommandStore.scala | 46 ++++++++++++++++++- 1 file changed, 45 insertions(+), 1 deletion(-) diff --git a/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/PostgresKgCommandStore.scala b/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/PostgresKgCommandStore.scala index 23621925..5d999467 100644 --- a/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/PostgresKgCommandStore.scala +++ b/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/PostgresKgCommandStore.scala @@ -79,7 +79,9 @@ class PostgresKgCommandStore @Inject()(configProvider: PostgresStoreConfigProvid override final def close(): Unit = { runSyncTransaction(DBIO.seq( writeNodeLabelEdgesAction, - writeNodeLabelEdgeSourcesAction + writeNodeLabelEdgeSourcesAction, + writeNodeDegreesAction, + writeNodeLabelDegreesAction )) } @@ -107,6 +109,48 @@ class PostgresKgCommandStore @Inject()(configProvider: PostgresStoreConfigProvid override final def putSources(sources: Iterator[KgSource]): Unit = runSyncTransaction(DBIO.sequence(batchedSourceInserts(sources))) + private def writeNodeDegreesAction = { + sqlu""" + UPDATE node + SET + in_degree = in_edge.degree, + out_degree = out_edge.degree + FROM node n + JOIN ( + SELECT n.id AS id, count(e.id) AS degree FROM node n + LEFT JOIN edge e ON e.subject_node_id = n.id + GROUP BY n.id + ) as out_edge on out_edge.id = n.id + JOIN ( + SELECT n.id AS id, count(e.id) AS degree FROM node n + LEFT JOIN edge e ON e.object_node_id = n.id + GROUP BY n.id + ) as in_edge ON in_edge.id = n.id + WHERE node.id = n.id; + """ + } + + private def writeNodeLabelDegreesAction = { + sqlu""" + UPDATE node_label + SET + in_degree = in_edge.degree, + out_degree = out_edge.degree + FROM node_label nl + JOIN ( + SELECT subject_node_label_label as label, count(*) AS degree + FROM node_label_edge e + GROUP BY e.subject_node_label_label + ) as out_edge on out_edge.label = nl.label + JOIN ( + SELECT object_node_label_label as label, count(*) AS degree + FROM node_label_edge e + GROUP BY e.object_node_label_label + ) as in_edge on in_edge.label = nl.label + WHERE node_label.label = nl.label; + """ + } + private def writeNodeLabelEdgesAction = { val nodeLabelEdgePairsAction = (for { (subjectNodeLabel, node) <- nodeLabels.withNodes() From 36f2190a87a211d81ed24b1c967bb98649e363b7 Mon Sep 17 00:00:00 2001 From: 123joshuawu Date: Tue, 15 Dec 2020 12:30:34 -0500 Subject: [PATCH 4/7] add pagerank for node and node labels --- .../postgres/AbstractPostgresKgStore.scala | 2 +- .../postgres/PostgresKgCommandStore.scala | 116 +++++++++++++++++- .../postgres/PostgresKgQueryStore.scala | 25 +++- 3 files changed, 134 insertions(+), 9 deletions(-) diff --git a/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/AbstractPostgresKgStore.scala b/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/AbstractPostgresKgStore.scala index 7f6b7b68..45024675 100644 --- a/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/AbstractPostgresKgStore.scala +++ b/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/AbstractPostgresKgStore.scala @@ -84,7 +84,7 @@ abstract class AbstractPostgresKgStore(protected val databaseConfigProvider: Pos labels = labels, `object` = objectNodeId, predicate = predicate, - sentences = sentences.split(SentencesDelimChar).toList, + sentences = sentences.split(SentencesDelimChar).filter(!_.trim.isEmpty).toList, sourceIds = sourceIds, subject = subjectNodeId ) diff --git a/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/PostgresKgCommandStore.scala b/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/PostgresKgCommandStore.scala index 5d999467..05127032 100644 --- a/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/PostgresKgCommandStore.scala +++ b/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/PostgresKgCommandStore.scala @@ -80,9 +80,10 @@ class PostgresKgCommandStore @Inject()(configProvider: PostgresStoreConfigProvid runSyncTransaction(DBIO.seq( writeNodeLabelEdgesAction, writeNodeLabelEdgeSourcesAction, - writeNodeDegreesAction, - writeNodeLabelDegreesAction )) + + writeNodePageRank() + writeNodeLabelPageRank() } override final def putData(data: KgData) = @@ -109,6 +110,117 @@ class PostgresKgCommandStore @Inject()(configProvider: PostgresStoreConfigProvid override final def putSources(sources: Iterator[KgSource]): Unit = runSyncTransaction(DBIO.sequence(batchedSourceInserts(sources))) + private def writeNodePageRank(convergenceThreshold: Double = 0.0000001, dampingFactor: Double = 0.85, maxIterations: Int = 20): Unit = { + val numNodesAction = nodes.size.result +// Initialize node page ranks to 1/n where n is the number of nodes + val initializeNodePageRanksAction = numNodesAction.flatMap { + case (numNodes) => nodes.map(_.pageRank).update(Some((1.0 / numNodes).toFloat)) + } +// Initialize node in/out degrees and page ranks + val numNodes = runSyncTransaction(DBIO.sequence(List(initializeNodePageRanksAction, writeNodeDegreesAction)))(0) + + if (numNodes == 0) { + return + } + + for (_ <- 1 to maxIterations) { +// intellij shows writeNodePageRankAction returning a List[Double] but for some +// reason sbt compiles it as returning List[Any] so forced to convert to Double +// Update node page ranks and retrieve page rank delta value + val delta = runSyncTransaction(writeNodePageRankAction(dampingFactor.toFloat))(1).asInstanceOf[Number].doubleValue + +// If page rank has converged, exit + if (delta < convergenceThreshold) { + return + } + } + } + + private def writeNodeLabelPageRank(convergenceThreshold: Double = 0.0000001, dampingFactor: Double = 0.85, maxIterations: Int = 20): Unit = { + val numNodeLabelsAction = nodeLabels.size.result + val initializeNodeLabelPageRanksAction = numNodeLabelsAction.flatMap { + case (numNodes) => nodes.map(_.pageRank).update(Some((1.0 / numNodes).toFloat)) + } + val numNodes = runSyncTransaction(DBIO.sequence(List(initializeNodeLabelPageRanksAction, writeNodeLabelDegreesAction)))(0) + + if (numNodes == 0) { + return + } + + for (_ <- 1 to maxIterations) { + val delta = runSyncTransaction(writeNodeLabelPageRankAction(dampingFactor.toFloat))(1).asInstanceOf[Number].doubleValue + + if (delta < convergenceThreshold) { + return + } + } + } + + private def writeNodePageRankAction(dampingFactor: Float) = { + val temporaryTableName = "temp_node_page_rank" + DBIO.sequence(List( + // Calculates new page rank for each node and saves in a temporary table + sqlu""" + SELECT n.id as node_id, ${dampingFactor} * sum(n_neighbor.page_rank / n.out_degree) + (1 - ${dampingFactor}) as page_rank + INTO #$temporaryTableName + FROM node n + JOIN edge e + ON e.object_node_id = n.id + JOIN node n_neighbor + ON n_neighbor.id = e.subject_node_id + GROUP BY n.id + """, + // Check if page rank has converged by calculating the difference + // between the new page ranks and the current page ranks + sql""" + SELECT sqrt(sum((new.page_rank - cur.page_rank)^2)) + FROM #$temporaryTableName new + JOIN node cur + ON cur.id = new.node_id + """.as[Double].head, + // Update the nodes with the new page ranks + sqlu""" + UPDATE node + SET + page_rank = new.page_rank + FROM #$temporaryTableName new + WHERE node.id = new.node_id + """, + // Drop the table + sqlu"DROP TABLE #$temporaryTableName" + )) + } + + private def writeNodeLabelPageRankAction(dampingFactor: Float) = { + val temporaryTableName = "temp_node_label_page_rank" + DBIO.sequence(List( + sqlu""" + SELECT nl.label as label, ${dampingFactor} * sum(nl_neighbor.page_rank / nl.out_degree) + (1 - ${dampingFactor}) as page_rank + INTO #$temporaryTableName + FROM node_label nl + JOIN node_label_edge e + ON e.object_node_label_label = nl.label + JOIN node_label nl_neighbor + ON nl_neighbor.label = e.subject_node_label_label + GROUP BY nl.label + """, + sql""" + SELECT sqrt(sum((new.page_rank - cur.page_rank)^2)) + FROM #$temporaryTableName new + JOIN node_label cur + ON cur.label = new.label + """.as[Double].head, + sqlu""" + UPDATE node_label + SET + page_rank = new.page_rank + FROM #$temporaryTableName new + WHERE node.label = new.label + """, + sqlu"DROP TABLE #$temporaryTableName" + )) + } + private def writeNodeDegreesAction = { sqlu""" UPDATE node diff --git a/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/PostgresKgQueryStore.scala b/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/PostgresKgQueryStore.scala index 40314fc4..cd0ea7d9 100644 --- a/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/PostgresKgQueryStore.scala +++ b/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/PostgresKgQueryStore.scala @@ -18,11 +18,11 @@ final class PostgresKgQueryStore @Inject()(configProvider: PostgresStoreConfigPr private implicit val getKgEdge = GetResult(r => KgEdge( id = r.rs.getString("id"), - labels = r.rs.getArray("labels").asInstanceOf[Array[Any]].toList.map(_.toString), + labels = r.rs.getArray("labels").getArray.asInstanceOf[Array[Any]].toList.map(_.toString), `object` = r.rs.getString("object"), predicate = r.rs.getString("predicate"), - sentences = (r.rs.getString("sentences")).split(SentencesDelimChar).toList, - sourceIds = r.rs.getArray("sourceIds").asInstanceOf[Array[Any]].toList.map(_.toString), + sentences = (r.rs.getString("sentences")).split(SentencesDelimChar).filter(!_.trim.isEmpty).toList, + sourceIds = r.rs.getArray("sourceIds").getArray.asInstanceOf[Array[Any]].toList.map(_.toString), subject = r.rs.getString("subject") )) @@ -89,7 +89,17 @@ final class PostgresKgQueryStore @Inject()(configProvider: PostgresStoreConfigPr val relatedNodeLabels = toKgNodeLabels(runSyncTransaction(relatedNodeLabelWithNodeSourceAction)).toList - // TODO replace inner id order by with pageRank + // Returns the top edges of a given node grouped by predicate + // based on the object node page ranks + // Here are the steps used in the below query: + // 1. Get all edges with with given node as subject (e_outer) + // 2. For each edge in e_outer + // a. find edges with the same predicate + // b. order by object node page rank + // c. return top NodeContextTopEdgesLimit edges + // (e_top) + // 3. Add source and label information + // 4. Eliminate duplicate edges val topEdgesQuery = sql""" SELECT @@ -102,9 +112,12 @@ final class PostgresKgQueryStore @Inject()(configProvider: PostgresStoreConfigPr e_top.subject_node_id AS subject FROM edge e_outer JOIN LATERAL ( - SELECT * FROM edge e_inner + SELECT e_inner.* + FROM edge e_inner + JOIN node e_inner_obj_node + ON e_inner.object_node_id = e_inner_obj_node.id WHERE e_inner.subject_node_id = ${id} AND e_inner.predicate = e_outer.predicate - ORDER BY e_inner.id + ORDER BY e_inner_obj_node.page_rank DESC, e_inner.id LIMIT #$NodeContextTopEdgesLimit ) e_top ON e_outer.subject_node_id = ${id} JOIN edge_x_source es ON es.edge_id = e_top.id From 2613d171a792a7ad7921196441c8084bbf5540ba Mon Sep 17 00:00:00 2001 From: 123joshuawu Date: Tue, 15 Dec 2020 13:08:13 -0500 Subject: [PATCH 5/7] fix node -> node_label typo --- .../lib/kg/stores/postgres/PostgresKgCommandStore.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/PostgresKgCommandStore.scala b/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/PostgresKgCommandStore.scala index 05127032..92e82cef 100644 --- a/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/PostgresKgCommandStore.scala +++ b/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/PostgresKgCommandStore.scala @@ -139,7 +139,7 @@ class PostgresKgCommandStore @Inject()(configProvider: PostgresStoreConfigProvid private def writeNodeLabelPageRank(convergenceThreshold: Double = 0.0000001, dampingFactor: Double = 0.85, maxIterations: Int = 20): Unit = { val numNodeLabelsAction = nodeLabels.size.result val initializeNodeLabelPageRanksAction = numNodeLabelsAction.flatMap { - case (numNodes) => nodes.map(_.pageRank).update(Some((1.0 / numNodes).toFloat)) + case (numNodeLabels) => nodeLabels.map(_.pageRank).update(Some((1.0 / numNodeLabels).toFloat)) } val numNodes = runSyncTransaction(DBIO.sequence(List(initializeNodeLabelPageRanksAction, writeNodeLabelDegreesAction)))(0) @@ -215,7 +215,7 @@ class PostgresKgCommandStore @Inject()(configProvider: PostgresStoreConfigProvid SET page_rank = new.page_rank FROM #$temporaryTableName new - WHERE node.label = new.label + WHERE node_label.label = new.label """, sqlu"DROP TABLE #$temporaryTableName" )) From 73894988697ec8b017e983d995c15eee20035b9d Mon Sep 17 00:00:00 2001 From: 123joshuawu Date: Thu, 17 Dec 2020 14:35:34 -0500 Subject: [PATCH 6/7] return type as List[Any] --- .../lib/kg/stores/postgres/PostgresKgCommandStore.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/PostgresKgCommandStore.scala b/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/PostgresKgCommandStore.scala index 92e82cef..b57b025c 100644 --- a/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/PostgresKgCommandStore.scala +++ b/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/PostgresKgCommandStore.scala @@ -156,7 +156,7 @@ class PostgresKgCommandStore @Inject()(configProvider: PostgresStoreConfigProvid } } - private def writeNodePageRankAction(dampingFactor: Float) = { + private def writeNodePageRankAction(dampingFactor: Float): DBIOAction[List[Any], NoStream, Effect] = { val temporaryTableName = "temp_node_page_rank" DBIO.sequence(List( // Calculates new page rank for each node and saves in a temporary table @@ -191,7 +191,7 @@ class PostgresKgCommandStore @Inject()(configProvider: PostgresStoreConfigProvid )) } - private def writeNodeLabelPageRankAction(dampingFactor: Float) = { + private def writeNodeLabelPageRankAction(dampingFactor: Float): DBIOAction[List[Any], NoStream, Effect] = { val temporaryTableName = "temp_node_label_page_rank" DBIO.sequence(List( sqlu""" From f801de5374e943d7ad86a18cf8f170628f7b1ba9 Mon Sep 17 00:00:00 2001 From: 123joshuawu Date: Thu, 17 Dec 2020 14:52:51 -0500 Subject: [PATCH 7/7] use temporary tables --- .../postgres/PostgresKgCommandStore.scala | 25 +++++++++++++------ 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/PostgresKgCommandStore.scala b/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/PostgresKgCommandStore.scala index b57b025c..c118715f 100644 --- a/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/PostgresKgCommandStore.scala +++ b/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/PostgresKgCommandStore.scala @@ -127,7 +127,7 @@ class PostgresKgCommandStore @Inject()(configProvider: PostgresStoreConfigProvid // intellij shows writeNodePageRankAction returning a List[Double] but for some // reason sbt compiles it as returning List[Any] so forced to convert to Double // Update node page ranks and retrieve page rank delta value - val delta = runSyncTransaction(writeNodePageRankAction(dampingFactor.toFloat))(1).asInstanceOf[Number].doubleValue + val delta = runSyncTransaction(writeNodePageRankAction(dampingFactor.toFloat))(2).asInstanceOf[Number].doubleValue // If page rank has converged, exit if (delta < convergenceThreshold) { @@ -148,7 +148,7 @@ class PostgresKgCommandStore @Inject()(configProvider: PostgresStoreConfigProvid } for (_ <- 1 to maxIterations) { - val delta = runSyncTransaction(writeNodeLabelPageRankAction(dampingFactor.toFloat))(1).asInstanceOf[Number].doubleValue + val delta = runSyncTransaction(writeNodeLabelPageRankAction(dampingFactor.toFloat))(2).asInstanceOf[Number].doubleValue if (delta < convergenceThreshold) { return @@ -159,10 +159,17 @@ class PostgresKgCommandStore @Inject()(configProvider: PostgresStoreConfigProvid private def writeNodePageRankAction(dampingFactor: Float): DBIOAction[List[Any], NoStream, Effect] = { val temporaryTableName = "temp_node_page_rank" DBIO.sequence(List( + // Create temporary table + sqlu""" + CREATE TEMP TABLE #$temporaryTableName ( + node_id VARCHAR, + page_rank REAL + ) ON COMMIT DROP + """, // Calculates new page rank for each node and saves in a temporary table sqlu""" + INSERT INTO #$temporaryTableName (node_id, page_rank) SELECT n.id as node_id, ${dampingFactor} * sum(n_neighbor.page_rank / n.out_degree) + (1 - ${dampingFactor}) as page_rank - INTO #$temporaryTableName FROM node n JOIN edge e ON e.object_node_id = n.id @@ -185,9 +192,7 @@ class PostgresKgCommandStore @Inject()(configProvider: PostgresStoreConfigProvid page_rank = new.page_rank FROM #$temporaryTableName new WHERE node.id = new.node_id - """, - // Drop the table - sqlu"DROP TABLE #$temporaryTableName" + """ )) } @@ -195,8 +200,14 @@ class PostgresKgCommandStore @Inject()(configProvider: PostgresStoreConfigProvid val temporaryTableName = "temp_node_label_page_rank" DBIO.sequence(List( sqlu""" + CREATE TEMP TABLE #$temporaryTableName ( + label VARCHAR, + page_rank REAL + ) ON COMMIT DROP + """, + sqlu""" + INSERT INTO #$temporaryTableName (label, page_rank) SELECT nl.label as label, ${dampingFactor} * sum(nl_neighbor.page_rank / nl.out_degree) + (1 - ${dampingFactor}) as page_rank - INTO #$temporaryTableName FROM node_label nl JOIN node_label_edge e ON e.object_node_label_label = nl.label