Skip to content

Commit

Permalink
Merge pull request #263 from tetherless-world/#258
Browse files Browse the repository at this point in the history
PostgreSQL store implementation
  • Loading branch information
gordom6 committed Nov 27, 2020
2 parents 57dc22a + fef1963 commit 4f06961
Show file tree
Hide file tree
Showing 12 changed files with 744 additions and 3 deletions.
13 changes: 11 additions & 2 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,12 @@ jobs:
- run:
name: Build
command: |
docker-compose build kg-app neo4j
docker-compose build kg-app neo4j postgres
- run:
name: Run
background: true
command: |
docker-compose up --abort-on-container-exit kg-app neo4j
docker-compose up --abort-on-container-exit kg-app neo4j postgres
- run:
name: Wait for the server to start
command: |
Expand All @@ -103,6 +103,15 @@ jobs:
environment:
NEO4J_AUTH: neo4j/nC1aB4mji623s2Zs
NEO4JLABS_PLUGINS: '["apoc","graph-data-science"]'
# PostgreSQL container for this job. It is named mcs-postgres, which is the host
# used by the JDBC URL in app/kg/conf/application.conf.
- image: library/postgres:12.4
auth:
username: $DOCKER_ID
password: $DOCKER_PASSWORD
name: mcs-postgres
environment:
# NOTE(review): these credentials are committed in plain text and repeated in
# docker-compose.yml and application.conf; keep the three in sync.
POSTGRES_USER: "mcs"
POSTGRES_PASSWORD: "7EAdu7jJvZNxxrNZ"
POSTGRES_DB: "kg"
steps:
- attach_workspace:
at: /tmp/workspace
Expand Down
7 changes: 7 additions & 0 deletions app/kg/conf/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,10 @@ play.filters.disabled += "play.filters.csrf.CSRFFilter"
play.filters.disabled += "play.filters.hosts.AllowedHostsFilter"
play.http.context = ${?BASE_HREF}
play.modules.enabled += "stores.KgStoresModule"
# Slick settings for the PostgreSQL store. The literal values are the
# local-development defaults that match the postgres service in docker-compose.yml.
# Each deployment-specific value is followed by an optional substitution (the same
# form play.http.context uses above), so an environment variable, when set,
# overrides the default without editing this file.
# NOTE(review): the Scala store is typed against ExtendedPostgresProfile; confirm
# that the profile configured here conforms to the profile requested when the
# DatabaseConfig is loaded.
postgres.profile = "slick.jdbc.PostgresProfile$"
postgres.db.connectionPool = "HikariCP"
postgres.db.driver = "org.postgresql.Driver"
postgres.db.password = "7EAdu7jJvZNxxrNZ"
postgres.db.password = ${?POSTGRES_PASSWORD}
postgres.db.url = "jdbc:postgresql://mcs-postgres:5432/kg"
postgres.db.url = ${?POSTGRES_URL}
postgres.db.user = "mcs"
postgres.db.user = ${?POSTGRES_USER}

6 changes: 5 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,15 @@ lazy val kgLib =
.settings(
libraryDependencies ++= Seq(
"com.github.tminglei" %% "slick-pg" % "0.19.3",
// Implement search in the MemStore (and thus the TestStore)
"com.outr" %% "lucene4s" % "1.9.1",
"com.typesafe.slick" %% "slick" % "3.3.3",
"com.typesafe.slick" %% "slick-hikaricp" % "3.3.3",
"io.github.tetherless-world" %% "twxplore-base" % twxploreVersion,
"io.github.tetherless-world" %% "twxplore-test" % twxploreVersion % Test,
"me.tongfei" % "progressbar" % "0.8.1",
"org.neo4j.driver" % "neo4j-java-driver" % "4.0.1"
"org.neo4j.driver" % "neo4j-java-driver" % "4.0.1",
"org.postgresql" % "postgresql" % "42.2.18"
),
maintainer := maintainerValue,
name := "mcs-kg-lib"
Expand Down
16 changes: 16 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,5 +72,21 @@ services:
restart: unless-stopped
volumes:
- neo4j-data:/data

# PostgreSQL service backing the "postgres" knowledge graph store.
postgres:
container_name: mcs-postgres
environment:
# NOTE(review): development credentials committed in plain text; the same values
# are repeated in app/kg/conf/application.conf and .circleci/config.yml.
POSTGRES_USER: "mcs"
POSTGRES_PASSWORD: "7EAdu7jJvZNxxrNZ"
POSTGRES_DB: "kg"
image: library/postgres:12.4
networks:
- mcs
ports:
# Published on the host's loopback interface only.
- 127.0.0.1:5432:5432
restart: unless-stopped
volumes:
# Named volume holding the database files.
- postgres-data:/var/lib/postgresql/data

# Named volumes referenced by the services above. Every named volume a service
# mounts has to be declared here.
volumes:
  neo4j-data:
  postgres-data:
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import com.google.inject.AbstractModule
import io.github.tetherlessworld.mcsapps.lib.kg.stores.empty.EmptyKgStore
import io.github.tetherlessworld.mcsapps.lib.kg.stores.mem.MemKgStore
import io.github.tetherlessworld.mcsapps.lib.kg.stores.neo4j.{Neo4jKgCommandStore, Neo4jKgQueryStore}
import io.github.tetherlessworld.mcsapps.lib.kg.stores.postgres.{PostgresKgCommandStore, PostgresKgQueryStore}
import io.github.tetherlessworld.mcsapps.lib.kg.stores.test.TestKgStore
import org.slf4j.LoggerFactory
import play.api.{Configuration, Environment}
Expand All @@ -22,6 +23,10 @@ final class KgStoresModule(environment: Environment, configuration: Configuratio
bind(classOf[KgCommandStore]).to(classOf[MemKgStore])
bind(classOf[KgQueryStore]).to(classOf[MemKgStore])
}
case "postgres" => {
  // Log the selection, as the "test" case does, so the active store is visible at startup.
  logger.info("using postgres stores")
  bind(classOf[KgCommandStore]).to(classOf[PostgresKgCommandStore])
  bind(classOf[KgQueryStore]).to(classOf[PostgresKgQueryStore])
}
case "test" => {
logger.info("using test stores")
bind(classOf[KgCommandStore]).to(classOf[TestKgStore])
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.github.tetherlessworld.mcsapps.lib.kg.stores

import com.typesafe.config.Config
import slick.basic.{BasicProfile, DatabaseConfig}

import scala.reflect.ClassTag

/**
 * Mixin for components that work against a Slick `DatabaseConfig`.
 *
 * Implementors supply `databaseConfig`; the profile and the database handle are both
 * derived from it on first use.
 *
 * @tparam T the Slick profile type the configuration is bound to
 */
trait HasDatabaseConfig[T <: BasicProfile] {
  /** The Slick database configuration that `profile` and `db` are derived from. */
  protected val databaseConfig: DatabaseConfig[T]

  /** Database handle taken from `databaseConfig`. */
  protected lazy val db: T#Backend#Database = databaseConfig.db

  /** Slick profile taken from `databaseConfig`. */
  protected lazy val profile: T = databaseConfig.profile
}

/**
 * Variant of `HasDatabaseConfig` that obtains its configuration from a
 * `SlickDatabaseConfigProvider`.
 *
 * @tparam T the Slick profile type the configuration is bound to
 */
trait HasDatabaseConfigProvider[T <: BasicProfile] extends HasDatabaseConfig[T] {
  /** Provider the database configuration is read from. */
  protected val databaseConfigProvider: SlickDatabaseConfigProvider[T]

  // Lazy so that the provider is only dereferenced on first use. A strict val here is
  // initialized while this trait's initializer runs, which is before the body of an
  // implementing class; an implementor that assigns databaseConfigProvider in its body
  // (rather than as a constructor parameter) would then dereference a null provider.
  protected lazy val databaseConfig: DatabaseConfig[T] = databaseConfigProvider.databaseConfig
}

/**
 * Holder for a Slick `DatabaseConfig`, constructible either from an existing configuration
 * or from a configuration path.
 *
 * @param databaseConfig the wrapped database configuration
 * @tparam T the Slick profile type the configuration is bound to
 */
class SlickDatabaseConfigProvider[T <: BasicProfile : ClassTag](val databaseConfig: DatabaseConfig[T]) {
  /** Loads the configuration found at `path` via `DatabaseConfig.forConfig`. */
  def this(path: String) = this(DatabaseConfig.forConfig[T](path))

  /** Loads the configuration found at `path` within the supplied `config`. */
  def this(path: String, config: Config) = this(DatabaseConfig.forConfig[T](path, config))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
package io.github.tetherlessworld.mcsapps.lib.kg.stores.postgres

import io.github.tetherlessworld.mcsapps.lib.kg.models.edge.KgEdge
import io.github.tetherlessworld.mcsapps.lib.kg.models.node.{KgNode, KgNodeLabel}
import io.github.tetherlessworld.mcsapps.lib.kg.models.source.KgSource
import io.github.tetherlessworld.mcsapps.lib.kg.stores.HasDatabaseConfigProvider

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}


/**
 * Shared base for the PostgreSQL-backed knowledge graph stores.
 *
 * Declares the row types and Slick table mappings of the relational schema, the table
 * queries over them, and helpers for running database actions.
 *
 * @param databaseConfigProvider supplies the Slick database configuration from which
 *                               `profile` and `db` are resolved
 */
abstract class AbstractPostgresKgStore(protected val databaseConfigProvider: PostgresStoreConfigProvider)
  extends HasDatabaseConfigProvider[ExtendedPostgresProfile] {
  import profile.api._

  // An edge's sentences live in a single text column and are split on this delimiter
  // when a row is converted back to a KgEdge (see EdgeRow.toKgEdge).
  protected val SentencesDelimChar = '|'
  protected val SentencesDelimString: String = SentencesDelimChar.toString

  // Not used in this class; presumably consumed by subclasses when querying node and
  // node label contexts -- TODO confirm.
  protected val NodeContextTopEdgesLimit = 10
  protected val NodeLabelContextTopEdgesLimit = 10

  protected lazy val edges = TableQuery[EdgeTable]
  protected lazy val edgeLabels = TableQuery[EdgeLabelTable]
  protected lazy val edgeSources = TableQuery[EdgeSourceTable]

  /** Table query over `node`, with helpers for common node lookups. */
  protected object nodes extends TableQuery(new NodeTable(_)) {
    /** Action yielding the node row with the given id, if there is one. */
    def getById(id: String) =
      nodes.filter(_.id === id).result.headOption

    /**
     * Joins the given nodes with their labels and sources.
     *
     * Yields one (node, node label, source) row per combination; because the joins are
     * inner joins, a node without any label or without any source yields no rows.
     */
    def withLabelSource(nodeQuery: Query[NodeTable, NodeRow, Seq] = nodes) =
      for {
        node <- nodeQuery
        nodeNodeLabel <- nodeNodeLabels if nodeNodeLabel.nodeId === node.id
        nodeLabel <- nodeNodeLabel.nodeLabel
        nodeSource <- nodeSources if nodeSource.nodeId === node.id
        source <- nodeSource.source
      } yield (node, nodeLabel, source)
  }

  /** Table query over `node_label`, with helpers for common node label lookups. */
  protected object nodeLabels extends TableQuery(new NodeLabelTable(_)) {
    /** Joins the given node labels with the nodes that carry them: (node label, node). */
    def withNodes(nodeLabelQuery: Query[NodeLabelTable, NodeLabelRow, Seq] = nodeLabels) = for {
      ((nodeLabel, _), node) <- nodeLabelQuery
        .join(nodeNodeLabels).on(_.label === _.nodeLabelLabel)
        .join(nodes).on(_._2.nodeId === _.id)
    } yield (nodeLabel, node)

    /**
     * Joins the given node labels with their own sources and with the nodes that carry
     * them, plus each such node's sources and labels.
     *
     * Yields (node label, node label's source, node, node's source, node's label).
     */
    def withSourceNode(nodeLabelQuery: Query[NodeLabelTable, NodeLabelRow, Seq] = nodeLabels) =
      for {
        nodeLabel <- nodeLabelQuery
        nodeNodeLabel <- nodeNodeLabels if nodeNodeLabel.nodeLabelLabel === nodeLabel.label
        nodeLabelSource <- nodeLabelSources if nodeLabelSource.nodeLabelLabel === nodeLabel.label
        node <- nodeNodeLabel.node
        source <- nodeLabelSource.source
        nodeSource <- nodeSources if nodeSource.nodeId === node.id
        // Bound to fresh names: the label row was previously bound to `nodeNodeLabel`,
        // shadowing the join row of the same name declared above.
        labelOfNode <- nodeNodeLabel.nodeLabel
        sourceOfNode <- nodeSource.source
      } yield (nodeLabel, source, node, sourceOfNode, labelOfNode)
  }

  protected lazy val nodeLabelEdges = TableQuery[NodeLabelEdgeTable]
  protected lazy val nodeLabelEdgeSources = TableQuery[NodeLabelEdgeSourceTable]
  protected lazy val nodeLabelSources = TableQuery[NodeLabelSourceTable]
  protected lazy val nodeNodeLabels = TableQuery[NodeNodeLabelTable]
  protected lazy val nodeSources = TableQuery[NodeSourceTable]
  protected lazy val sources = TableQuery[SourceTable]

  // Every table of the schema, combined below into a single DDL object.
  protected lazy val tables =
    List(edges, edgeLabels, edgeSources, nodes, nodeLabels, nodeLabelEdges, nodeLabelEdgeSources, nodeLabelSources, nodeSources, nodeNodeLabels, sources)

  protected lazy val tablesDdlObject = tables.map(_.schema).reduce((left, right) => left ++ right)

  /** Runs `a` inside a single database transaction. */
  protected final def runTransaction[R](a: DBIOAction[R, NoStream, Effect.All]): Future[R] = {
    db.run(a.transactionally)
  }

  /** Runs `a` inside a transaction and blocks for up to `duration` (default: indefinitely). */
  protected final def runSyncTransaction[R](a: DBIOAction[R, NoStream, Effect.All], duration: Duration = Duration.Inf): R = {
    Await.result(runTransaction(a), duration)
  }

  /** Runs `a` without wrapping it in a transaction and blocks for up to `duration`. */
  protected final def runSync[R](a: DBIOAction[R, NoStream, Effect.All], duration: Duration = Duration.Inf): R = {
    Await.result(db.run(a), duration)
  }

  /** Row of the `edge` table. */
  protected final case class EdgeRow(id: String, objectNodeId: String, predicate: String, sentences: String, subjectNodeId: String) {
    /** Builds a KgEdge from this row plus the labels and source ids stored in other tables. */
    def toKgEdge(labels: List[String], sourceIds: List[String]) = KgEdge(
      id = id,
      labels = labels,
      `object` = objectNodeId,
      predicate = predicate,
      // Splitting an empty string yields one empty element, which would surface as a
      // single blank sentence; an empty column means the edge has no sentences.
      sentences = if (sentences.isEmpty) List.empty[String] else sentences.split(SentencesDelimChar).toList,
      sourceIds = sourceIds,
      subject = subjectNodeId
    )
  }

  /** Mapping of the `edge` table; unique on (object, subject, predicate). */
  protected final class EdgeTable(tag: Tag) extends Table[EdgeRow](tag, "edge") {
    def id = column[String]("id", O.PrimaryKey)
    def objectNodeId = column[String]("object_node_id")
    def predicate = column[String]("predicate")
    def sentences = column[String]("sentences")
    def subjectNodeId = column[String]("subject_node_id")

    def * = (id, objectNodeId, predicate, sentences, subjectNodeId) <> (EdgeRow.tupled, EdgeRow.unapply)

    def objectNode = foreignKey("object_node_fk", objectNodeId, nodes)(_.id)
    def subjectNode = foreignKey("subject_node_fk", subjectNodeId, nodes)(_.id)

    def unique_constraint = index("_edge_unique_idx", (objectNodeId, subjectNodeId, predicate), unique = true)
  }

  /** Row of the `edge_label` table: one label of one edge. */
  protected final case class EdgeLabelRow(edgeId: String, label: String)

  /** Mapping of the `edge_label` table. */
  protected final class EdgeLabelTable(tag: Tag) extends Table[EdgeLabelRow](tag, "edge_label") {
    def edgeId = column[String]("edge_id")
    def label = column[String]("label")

    def * = (edgeId, label) <> (EdgeLabelRow.tupled, EdgeLabelRow.unapply)

    def edge = foreignKey("edge_fk", edgeId, edges)(_.id)

    def pk = primaryKey("edge_label_pk", (edgeId, label))
  }

  /** Row of the `edge_x_source` table: associates an edge with a source id. */
  protected final case class EdgeSourceRow(edgeId: String, sourceId: String)

  /** Mapping of the `edge_x_source` table. */
  protected final class EdgeSourceTable(tag: Tag) extends Table[EdgeSourceRow](tag, "edge_x_source") {
    def edgeId = column[String]("edge_id")
    // NOTE(review): unlike the other *_x_source tables, no foreign key to `sources` is
    // declared for this column -- confirm whether that is intentional.
    def sourceId = column[String]("source_id")

    def * = (edgeId, sourceId) <> (EdgeSourceRow.tupled, EdgeSourceRow.unapply)

    def edge = foreignKey("edge_fk", edgeId, edges)(_.id)

    def pk = primaryKey("edge_source_pk", (edgeId, sourceId))
  }

  /** Row of the `node` table. */
  protected final case class NodeRow(id: String, inDegree: Option[Short], outDegree: Option[Short], pageRank: Option[Float], pos: Option[Char], wordNetSenseNumber: Option[Short]) {
    /** Builds a KgNode from this row, widening the compact column types to the model's types. */
    def toKgNode(labels: List[String], sourceIds: List[String]) = KgNode(
      id = id,
      inDegree = inDegree.map(_.toInt),
      labels = labels,
      outDegree = outDegree.map(_.toInt),
      pageRank = pageRank.map(_.toDouble),
      pos = pos,
      sourceIds = sourceIds,
      wordNetSenseNumber = wordNetSenseNumber.map(_.toInt)
    )
  }

  /** Mapping of the `node` table. */
  protected final class NodeTable(tag: Tag) extends Table[NodeRow](tag, "node") {
    def id = column[String]("id", O.PrimaryKey)
    def inDegree = column[Option[Short]]("in_degree")
    def outDegree = column[Option[Short]]("out_degree")
    def pageRank = column[Option[Float]]("page_rank")
    def pos = column[Option[Char]]("pos", O.Length(1))
    def wordNetSenseNumber = column[Option[Short]]("word_net_sense_number")

    def * = (id, inDegree, outDegree, pageRank, pos, wordNetSenseNumber) <> (NodeRow.tupled, NodeRow.unapply)
  }

  /** Row of the `node_label` table. */
  protected final case class NodeLabelRow(label: String, pageRank: Option[Float]) {
    /** Builds a KgNodeLabel from this row plus the nodes and source ids stored in other tables. */
    def toKgNodeLabel(nodes: List[KgNode], sourceIds: List[String]) = KgNodeLabel(
      nodeLabel = label,
      nodes = nodes,
      pageRank = pageRank.map(_.toDouble),
      sourceIds = sourceIds
    )
  }

  /** Mapping of the `node_label` table; the label text itself is the primary key. */
  protected final class NodeLabelTable(tag: Tag) extends Table[NodeLabelRow](tag, "node_label") {
    def label = column[String]("label", O.PrimaryKey)
    def pageRank = column[Option[Float]]("page_rank")

    def * = (label, pageRank) <> (NodeLabelRow.tupled, NodeLabelRow.unapply)
  }

  /** Row of the `node_label_edge` table: a directed pair of node labels. */
  protected final case class NodeLabelEdgeRow(objectNodeLabelLabel: String, subjectNodeLabelLabel: String)

  /** Mapping of the `node_label_edge` table. */
  protected final class NodeLabelEdgeTable(tag: Tag) extends Table[NodeLabelEdgeRow](tag, "node_label_edge") {
    def objectNodeLabelLabel = column[String]("object_node_label_label")
    def subjectNodeLabelLabel = column[String]("subject_node_label_label")

    def * = (objectNodeLabelLabel, subjectNodeLabelLabel) <> (NodeLabelEdgeRow.tupled, NodeLabelEdgeRow.unapply)

    def objectNodeLabel = foreignKey("object_node_label_fk", objectNodeLabelLabel, nodeLabels)(_.label)
    def subjectNodeLabel = foreignKey("subject_node_label_fk", subjectNodeLabelLabel, nodeLabels)(_.label)

    def pk = primaryKey("node_label_edge_pk", (objectNodeLabelLabel, subjectNodeLabelLabel))

    // NOTE(review): this unique index covers exactly the primary key columns, so it
    // appears redundant; left in place to keep the generated schema unchanged.
    def unique_constraint = index("node_label_edge_unique_idx", (objectNodeLabelLabel, subjectNodeLabelLabel), unique = true)
  }

  /** Row of the `node_label_edge_x_source` table: associates a node label edge with a source. */
  protected final case class NodeLabelEdgeSourceRow(nodeLabelEdgeObjectNodeLabelLabel: String, nodeLabelEdgeSubjectNodeLabelLabel: String, sourceId: String)

  /** Mapping of the `node_label_edge_x_source` table. */
  protected final class NodeLabelEdgeSourceTable(tag: Tag) extends Table[NodeLabelEdgeSourceRow](tag, "node_label_edge_x_source") {
    // NOTE(review): the two physical column names below do not follow the field names
    // (possible typos); kept as-is because renaming them would change the schema.
    def nodeLabelEdgeObjectNodeLabelLabel = column[String]("node_node_label_edge_object_node_label_label")
    def nodeLabelEdgeSubjectNodeLabelLabel = column[String]("subject_node_label_edge_subject_node_label_label")
    def sourceId = column[String]("source_id")

    def * = (nodeLabelEdgeObjectNodeLabelLabel, nodeLabelEdgeSubjectNodeLabelLabel, sourceId) <> (NodeLabelEdgeSourceRow.tupled, NodeLabelEdgeSourceRow.unapply)

    def nodeLabelEdge = foreignKey("node_label_edge_fk", (nodeLabelEdgeObjectNodeLabelLabel, nodeLabelEdgeSubjectNodeLabelLabel), nodeLabelEdges)(nodeLabelEdgeTable => (nodeLabelEdgeTable.objectNodeLabelLabel, nodeLabelEdgeTable.subjectNodeLabelLabel))
    def source = foreignKey("source_fk", sourceId, sources)(_.id)

    def pk = primaryKey("node_label_edge_source_pk", (nodeLabelEdgeObjectNodeLabelLabel, nodeLabelEdgeSubjectNodeLabelLabel, sourceId))
  }

  /** Row of the `node_label_x_source` table: associates a node label with a source. */
  protected final case class NodeLabelSourceRow(nodeLabelLabel: String, sourceId: String)

  /** Mapping of the `node_label_x_source` table. */
  protected final class NodeLabelSourceTable(tag: Tag) extends Table[NodeLabelSourceRow](tag, "node_label_x_source") {
    def nodeLabelLabel = column[String]("node_label_label")
    def sourceId = column[String]("source_id")

    def * = (nodeLabelLabel, sourceId) <> (NodeLabelSourceRow.tupled, NodeLabelSourceRow.unapply)

    def nodeLabel = foreignKey("node_label_fk", nodeLabelLabel, nodeLabels)(_.label)
    def source = foreignKey("source_fk", sourceId, sources)(_.id)

    def pk = primaryKey("node_label_source_pk", (nodeLabelLabel, sourceId))
  }

  /** Row of the `node_x_node_label` table: associates a node with one of its labels. */
  protected final case class NodeNodeLabelRow(nodeId: String, nodeLabelLabel: String)

  /** Mapping of the `node_x_node_label` table. */
  protected final class NodeNodeLabelTable(tag: Tag) extends Table[NodeNodeLabelRow](tag, "node_x_node_label") {
    def nodeId = column[String]("node_id")
    def nodeLabelLabel = column[String]("label")

    def * = (nodeId, nodeLabelLabel) <> (NodeNodeLabelRow.tupled, NodeNodeLabelRow.unapply)

    def node = foreignKey("node_fk", nodeId, nodes)(_.id)
    def nodeLabel = foreignKey("node_label_fk", nodeLabelLabel, nodeLabels)(_.label)

    def pk = primaryKey("node_label_pk", (nodeId, nodeLabelLabel))
  }

  /** Row of the `node_x_source` table: associates a node with a source. */
  protected final case class NodeSourceRow(nodeId: String, sourceId: String)

  /** Mapping of the `node_x_source` table. */
  protected final class NodeSourceTable(tag: Tag) extends Table[NodeSourceRow](tag, "node_x_source") {
    def nodeId = column[String]("node_id")
    def sourceId = column[String]("source_id")

    def * = (nodeId, sourceId) <> (NodeSourceRow.tupled, NodeSourceRow.unapply)

    def node = foreignKey("node_fk", nodeId, nodes)(_.id)
    def source = foreignKey("source_fk", sourceId, sources)(_.id)

    def pk = primaryKey("node_source_pk", (nodeId, sourceId))
  }

  /** Row of the `source` table. */
  protected final case class SourceRow(id: String, label: String) {
    def toKgSource = KgSource(id = id, label = label)
  }

  /** Mapping of the `source` table. */
  protected final class SourceTable(tag: Tag) extends Table[SourceRow](tag, "source") {
    def id = column[String]("id", O.PrimaryKey)
    def label = column[String]("label")

    def * = (id, label) <> (SourceRow.tupled, SourceRow.unapply)
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.github.tetherlessworld.mcsapps.lib.kg.stores.postgres

import slick.basic.Capability
import slick.jdbc.{JdbcCapabilities}
import com.github.tminglei.slickpg._

/**
 * Slick profile for PostgreSQL built on slick-pg's `ExPostgresProfile` with array support,
 * additionally advertising the `insertOrUpdate` capability.
 */
trait ExtendedPostgresProfile extends ExPostgresProfile with PgArraySupport {
  // Add insertOrUpdate to whatever the parent profile already reports.
  override protected def computeCapabilities: Set[Capability] =
    super.computeCapabilities + JdbcCapabilities.insertOrUpdate

  override val api = ExtendedAPI

  /** API object to import from; adds the array implicits and a `List[String]` column mapping. */
  object ExtendedAPI extends API with ArrayImplicits {
    // Maps a "text" array column to a List[String].
    implicit val strListTypeMapper = new SimpleArrayJdbcType[String]("text").to(values => values.toList)
  }
}

/** Singleton instance of the profile. */
object ExtendedPostgresProfile extends ExtendedPostgresProfile
Loading

0 comments on commit 4f06961

Please sign in to comment.