Skip to content

Commit

Permalink
Merge pull request #273 from tetherless-world/#258
Browse files Browse the repository at this point in the history
Add page rank for nodes and node labels
  • Loading branch information
123joshuawu authored Dec 18, 2020
2 parents fc73013 + f801de5 commit 740a319
Show file tree
Hide file tree
Showing 4 changed files with 194 additions and 11 deletions.
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,4 @@ services:

volumes:
neo4j-data:
postgres-data:
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ abstract class AbstractPostgresKgStore(protected val databaseConfigProvider: Pos
labels = labels,
`object` = objectNodeId,
predicate = predicate,
sentences = sentences.split(SentencesDelimChar).toList,
sentences = sentences.split(SentencesDelimChar).filter(!_.trim.isEmpty).toList,
sourceIds = sourceIds,
subject = subjectNodeId
)
Expand Down Expand Up @@ -151,7 +151,7 @@ abstract class AbstractPostgresKgStore(protected val databaseConfigProvider: Pos
def * = (id, inDegree, outDegree, pageRank, pos, wordNetSenseNumber) <> (NodeRow.tupled, NodeRow.unapply)
}

protected final case class NodeLabelRow(label: String, pageRank: Option[Float]) {
protected final case class NodeLabelRow(inDegree: Option[Short], label: String, outDegree: Option[Short], pageRank: Option[Float]) {
def toKgNodeLabel(nodes: List[KgNode], sourceIds: List[String]) = KgNodeLabel(
nodeLabel = label,
nodes = nodes,
Expand All @@ -160,10 +160,12 @@ abstract class AbstractPostgresKgStore(protected val databaseConfigProvider: Pos
)
}
protected final class NodeLabelTable(tag: Tag) extends Table[NodeLabelRow](tag, "node_label") {
def inDegree = column[Option[Short]]("in_degree")
def label = column[String]("label", O.PrimaryKey)
def outDegree = column[Option[Short]]("out_degree")
def pageRank = column[Option[Float]]("page_rank")

def * = (label, pageRank) <> (NodeLabelRow.tupled, NodeLabelRow.unapply)
def * = (inDegree, label, outDegree, pageRank) <> (NodeLabelRow.tupled, NodeLabelRow.unapply)
}

protected final case class NodeLabelEdgeRow(objectNodeLabelLabel: String, subjectNodeLabelLabel: String)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class PostgresKgCommandStore @Inject()(configProvider: PostgresStoreConfigProvid
val stream = kgNodes.toStream
List(
nodes.insertOrUpdateAll(stream.map(_.toRow)),
nodeLabels.insertOrUpdateAll(stream.flatMap(_.labels.map(NodeLabelRow(_, None)))),
nodeLabels.insertOrUpdateAll(stream.flatMap(_.labels.map(NodeLabelRow(None, _, None, None)))),
nodeNodeLabels.insertOrUpdateAll(stream.flatMap(node => node.labels.map(label => NodeNodeLabelRow(node.id, label)))),
nodeLabelSources.insertOrUpdateAll(stream.flatMap(node => node.labels.flatMap(label => node.sourceIds.map(NodeLabelSourceRow(label, _))))),
nodeSources.insertOrUpdateAll(stream.flatMap(node => node.sourceIds.map(NodeSourceRow(node.id, _))))
Expand All @@ -79,8 +79,11 @@ class PostgresKgCommandStore @Inject()(configProvider: PostgresStoreConfigProvid
override final def close(): Unit = {
runSyncTransaction(DBIO.seq(
writeNodeLabelEdgesAction,
writeNodeLabelEdgeSourcesAction
writeNodeLabelEdgeSourcesAction,
))

writeNodePageRank()
writeNodeLabelPageRank()
}

override final def putData(data: KgData) =
Expand All @@ -107,6 +110,170 @@ class PostgresKgCommandStore @Inject()(configProvider: PostgresStoreConfigProvid
/**
 * Stores the given sources.
 *
 * The iterator is handed to `batchedSourceInserts`, and the resulting actions are
 * combined with `DBIO.sequence` and executed through `runSyncTransaction`.
 * NOTE(review): presumably this runs synchronously inside one transaction, as the
 * helper's name suggests -- confirm against `runSyncTransaction`.
 *
 * @param sources the sources to insert
 */
override final def putSources(sources: Iterator[KgSource]): Unit =
runSyncTransaction(DBIO.sequence(batchedSourceInserts(sources)))

/**
 * Computes page ranks for all nodes and stores them in the node table.
 *
 * Every node's page rank is first seeded with 1/n (n = number of nodes) and the
 * node in/out degrees are refreshed. Page-rank iterations then run until the
 * delta reported by an iteration drops below `convergenceThreshold` or
 * `maxIterations` iterations have been executed.
 *
 * @param convergenceThreshold iteration stops once an iteration's delta is below this value
 * @param dampingFactor damping factor handed to each page-rank iteration
 * @param maxIterations upper bound on the number of iterations
 */
private def writeNodePageRank(convergenceThreshold: Double = 0.0000001, dampingFactor: Double = 0.85, maxIterations: Int = 20): Unit = {
  // Seed each node's page rank with 1/n where n is the number of nodes
  val initializeNodePageRanksAction = nodes.size.result.flatMap { numNodes =>
    nodes.map(_.pageRank).update(Some((1.0 / numNodes).toFloat))
  }
  // Seed the page ranks and refresh the in/out degrees in one call; element 0 is
  // the result of the seeding update, which is used as the node count
  val numNodes = runSyncTransaction(DBIO.sequence(List(initializeNodePageRanksAction, writeNodeDegreesAction)))(0)

  // Runs one iteration and recurses while the page ranks have not converged.
  // A plain conditional is used instead of `return` inside a `for` body, which
  // would be a non-local return thrown from the closure.
  def iterate(remainingIterations: Int): Unit =
    if (remainingIterations > 0) {
      // Element 2 of the iteration results is the delta query. The list is typed
      // as List[Any], so go through Number to obtain a Double.
      val delta = runSyncTransaction(writeNodePageRankAction(dampingFactor.toFloat))(2).asInstanceOf[Number].doubleValue
      // Written as a negated `<` so a NaN delta keeps iterating, as before
      if (!(delta < convergenceThreshold)) {
        iterate(remainingIterations - 1)
      }
    }

  // Nothing to rank when the store has no nodes
  if (numNodes != 0) {
    iterate(maxIterations)
  }
}

/**
 * Computes page ranks for all node labels and stores them in the node_label table.
 *
 * Every label's page rank is first seeded with 1/n (n = number of node labels) and
 * the label in/out degrees are refreshed. Page-rank iterations then run until the
 * delta reported by an iteration drops below `convergenceThreshold` or
 * `maxIterations` iterations have been executed.
 *
 * @param convergenceThreshold iteration stops once an iteration's delta is below this value
 * @param dampingFactor damping factor handed to each page-rank iteration
 * @param maxIterations upper bound on the number of iterations
 */
private def writeNodeLabelPageRank(convergenceThreshold: Double = 0.0000001, dampingFactor: Double = 0.85, maxIterations: Int = 20): Unit = {
  // Seed each label's page rank with 1/n where n is the number of node labels
  val initializeNodeLabelPageRanksAction = nodeLabels.size.result.flatMap { numNodeLabels =>
    nodeLabels.map(_.pageRank).update(Some((1.0 / numNodeLabels).toFloat))
  }
  // Seed the page ranks and refresh the label degrees in one call; element 0 is
  // the result of the seeding update, which is used as the label count
  val numNodeLabels = runSyncTransaction(DBIO.sequence(List(initializeNodeLabelPageRanksAction, writeNodeLabelDegreesAction)))(0)

  // Runs one iteration and recurses while the page ranks have not converged.
  // A plain conditional is used instead of `return` inside a `for` body, which
  // would be a non-local return thrown from the closure.
  def iterate(remainingIterations: Int): Unit =
    if (remainingIterations > 0) {
      // Element 2 of the iteration results is the delta query. The list is typed
      // as List[Any], so go through Number to obtain a Double.
      val delta = runSyncTransaction(writeNodeLabelPageRankAction(dampingFactor.toFloat))(2).asInstanceOf[Number].doubleValue
      // Written as a negated `<` so a NaN delta keeps iterating, as before
      if (!(delta < convergenceThreshold)) {
        iterate(remainingIterations - 1)
      }
    }

  // Nothing to rank when the store has no node labels
  if (numNodeLabels != 0) {
    iterate(maxIterations)
  }
}

/**
 * Builds the actions for a single page-rank iteration over the node table.
 *
 * The results of the returned action are, in order: the create-table result, the
 * insert result, the convergence delta (element 2, read by the caller) and the
 * update result.
 *
 * @param dampingFactor damping factor applied to the rank received from neighbours
 * @return the sequenced actions for one iteration
 */
private def writeNodePageRankAction(dampingFactor: Float): DBIOAction[List[Any], NoStream, Effect] = {
  val temporaryTableName = "temp_node_page_rank"
  DBIO.sequence(List(
    // Create the temporary table that holds the new page ranks
    sqlu"""
      CREATE TEMP TABLE #$temporaryTableName (
        node_id VARCHAR,
        page_rank REAL
      ) ON COMMIT DROP
    """,
    // Calculate the new page rank of every node that has incoming edges.
    // Each source node (n_neighbor) hands out its rank split over ITS OWN outgoing
    // edges, so the divisor is the neighbour's out degree, not the target's.
    // Dividing by the target's out degree fails with a division by zero for any
    // node that has incoming but no outgoing edges.
    // NULLIF/COALESCE keep an unset or zero degree from raising an error or
    // writing a NULL page rank.
    sqlu"""
      INSERT INTO #$temporaryTableName (node_id, page_rank)
      SELECT n.id as node_id, ${dampingFactor} * COALESCE(sum(n_neighbor.page_rank / NULLIF(n_neighbor.out_degree, 0)), 0) + (1 - ${dampingFactor}) as page_rank
      FROM node n
      JOIN edge e
      ON e.object_node_id = n.id
      JOIN node n_neighbor
      ON n_neighbor.id = e.subject_node_id
      GROUP BY n.id
    """,
    // Check convergence: Euclidean distance between the new and the current page ranks
    sql"""
      SELECT sqrt(sum((new.page_rank - cur.page_rank)^2))
      FROM #$temporaryTableName new
      JOIN node cur
      ON cur.id = new.node_id
    """.as[Double].head,
    // Update the nodes with the new page ranks
    sqlu"""
      UPDATE node
      SET
        page_rank = new.page_rank
      FROM #$temporaryTableName new
      WHERE node.id = new.node_id
    """
  ))
}

/**
 * Builds the actions for a single page-rank iteration over the node_label table.
 *
 * The results of the returned action are, in order: the create-table result, the
 * insert result, the convergence delta (element 2, read by the caller), the update
 * result and the drop-table result.
 *
 * @param dampingFactor damping factor applied to the rank received from neighbours
 * @return the sequenced actions for one iteration
 */
private def writeNodeLabelPageRankAction(dampingFactor: Float): DBIOAction[List[Any], NoStream, Effect] = {
  val temporaryTableName = "temp_node_label_page_rank"
  DBIO.sequence(List(
    // Create the temporary table that holds the new page ranks
    sqlu"""
      CREATE TEMP TABLE #$temporaryTableName (
        label VARCHAR,
        page_rank REAL
      ) ON COMMIT DROP
    """,
    // Calculate the new page rank of every label that has incoming label edges.
    // Each source label (nl_neighbor) hands out its rank split over ITS OWN outgoing
    // edges, so the divisor is the neighbour's out degree, not the target's.
    // NULLIF/COALESCE keep an unset or zero degree from raising a division error
    // or writing a NULL page rank back to node_label.
    sqlu"""
      INSERT INTO #$temporaryTableName (label, page_rank)
      SELECT nl.label as label, ${dampingFactor} * COALESCE(sum(nl_neighbor.page_rank / NULLIF(nl_neighbor.out_degree, 0)), 0) + (1 - ${dampingFactor}) as page_rank
      FROM node_label nl
      JOIN node_label_edge e
      ON e.object_node_label_label = nl.label
      JOIN node_label nl_neighbor
      ON nl_neighbor.label = e.subject_node_label_label
      GROUP BY nl.label
    """,
    // Check convergence: Euclidean distance between the new and the current page ranks
    sql"""
      SELECT sqrt(sum((new.page_rank - cur.page_rank)^2))
      FROM #$temporaryTableName new
      JOIN node_label cur
      ON cur.label = new.label
    """.as[Double].head,
    // Update the node labels with the new page ranks
    sqlu"""
      UPDATE node_label
      SET
        page_rank = new.page_rank
      FROM #$temporaryTableName new
      WHERE node_label.label = new.label
    """,
    // Drop the temporary table explicitly so the next iteration can recreate it
    sqlu"DROP TABLE #$temporaryTableName"
  ))
}

/**
 * Builds the update that recomputes `in_degree` and `out_degree` for the node table.
 *
 * The out degree of a node is the number of edges that have it as subject, the in
 * degree the number of edges that have it as object. Both counting subqueries
 * LEFT JOIN from node to edge, so a node without matching edges gets a count of 0
 * rather than being left out of the update.
 */
private def writeNodeDegreesAction = {
sqlu"""
UPDATE node
SET
in_degree = in_edge.degree,
out_degree = out_edge.degree
FROM node n
JOIN (
SELECT n.id AS id, count(e.id) AS degree FROM node n
LEFT JOIN edge e ON e.subject_node_id = n.id
GROUP BY n.id
) as out_edge on out_edge.id = n.id
JOIN (
SELECT n.id AS id, count(e.id) AS degree FROM node n
LEFT JOIN edge e ON e.object_node_id = n.id
GROUP BY n.id
) as in_edge ON in_edge.id = n.id
WHERE node.id = n.id;
"""
}

/**
 * Builds the update that recomputes `in_degree` and `out_degree` for the node_label table.
 *
 * The out degree of a label is the number of node_label_edge rows that have it as
 * subject label, the in degree the number of rows that have it as object label.
 *
 * NOTE(review): both counting subqueries are built from node_label_edge only and are
 * combined with inner JOINs, so a label that is missing from either subquery (no
 * outgoing or no incoming label edges) produces no row and is not updated by this
 * statement. This differs from the node degree update, which LEFT JOINs and writes 0.
 * Confirm whether leaving those degrees untouched is intended.
 */
private def writeNodeLabelDegreesAction = {
sqlu"""
UPDATE node_label
SET
in_degree = in_edge.degree,
out_degree = out_edge.degree
FROM node_label nl
JOIN (
SELECT subject_node_label_label as label, count(*) AS degree
FROM node_label_edge e
GROUP BY e.subject_node_label_label
) as out_edge on out_edge.label = nl.label
JOIN (
SELECT object_node_label_label as label, count(*) AS degree
FROM node_label_edge e
GROUP BY e.object_node_label_label
) as in_edge on in_edge.label = nl.label
WHERE node_label.label = nl.label;
"""
}

private def writeNodeLabelEdgesAction = {
val nodeLabelEdgePairsAction = (for {
(subjectNodeLabel, node) <- nodeLabels.withNodes()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ final class PostgresKgQueryStore @Inject()(configProvider: PostgresStoreConfigPr

private implicit val getKgEdge = GetResult(r => KgEdge(
id = r.rs.getString("id"),
labels = r.rs.getArray("labels").asInstanceOf[Array[Any]].toList.map(_.toString),
labels = r.rs.getArray("labels").getArray.asInstanceOf[Array[Any]].toList.map(_.toString),
`object` = r.rs.getString("object"),
predicate = r.rs.getString("predicate"),
sentences = (r.rs.getString("sentences")).split(SentencesDelimChar).toList,
sourceIds = r.rs.getArray("sourceIds").asInstanceOf[Array[Any]].toList.map(_.toString),
sentences = (r.rs.getString("sentences")).split(SentencesDelimChar).filter(!_.trim.isEmpty).toList,
sourceIds = r.rs.getArray("sourceIds").getArray.asInstanceOf[Array[Any]].toList.map(_.toString),
subject = r.rs.getString("subject")
))

Expand Down Expand Up @@ -89,7 +89,17 @@ final class PostgresKgQueryStore @Inject()(configProvider: PostgresStoreConfigPr

val relatedNodeLabels = toKgNodeLabels(runSyncTransaction(relatedNodeLabelWithNodeSourceAction)).toList

// TODO replace inner id order by with pageRank
// Returns the top edges of a given node grouped by predicate
// based on the object node page ranks
// Here are the steps used in the below query:
// 1. Get all edges with the given node as subject (e_outer)
// 2. For each edge in e_outer
// a. find edges with the same predicate
// b. order by object node page rank
// c. return top NodeContextTopEdgesLimit edges
// (e_top)
// 3. Add source and label information
// 4. Eliminate duplicate edges
val topEdgesQuery =
sql"""
SELECT
Expand All @@ -102,9 +112,12 @@ final class PostgresKgQueryStore @Inject()(configProvider: PostgresStoreConfigPr
e_top.subject_node_id AS subject
FROM edge e_outer
JOIN LATERAL (
SELECT * FROM edge e_inner
SELECT e_inner.*
FROM edge e_inner
JOIN node e_inner_obj_node
ON e_inner.object_node_id = e_inner_obj_node.id
WHERE e_inner.subject_node_id = ${id} AND e_inner.predicate = e_outer.predicate
ORDER BY e_inner.id
ORDER BY e_inner_obj_node.page_rank DESC, e_inner.id
LIMIT #$NodeContextTopEdgesLimit
) e_top ON e_outer.subject_node_id = ${id}
JOIN edge_x_source es ON es.edge_id = e_top.id
Expand Down

0 comments on commit 740a319

Please sign in to comment.