diff --git a/.circleci/config.yml b/.circleci/config.yml index 9ee36dcc..d4c3d17b 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -74,12 +74,12 @@ jobs: - run: name: Build command: | - docker-compose build kg-app neo4j + docker-compose build kg-app neo4j postgres - run: name: Run background: true command: | - docker-compose up --abort-on-container-exit kg-app neo4j + docker-compose up --abort-on-container-exit kg-app neo4j postgres - run: name: Wait for the server to start command: | @@ -103,6 +103,15 @@ jobs: environment: NEO4J_AUTH: neo4j/nC1aB4mji623s2Zs NEO4JLABS_PLUGINS: '["apoc","graph-data-science"]' + - image: library/postgres:12.4 + auth: + username: $DOCKER_ID + password: $DOCKER_PASSWORD + name: mcs-postgres + environment: + POSTGRES_USER: "mcs" + POSTGRES_PASSWORD: "7EAdu7jJvZNxxrNZ" + POSTGRES_DB: "kg" steps: - attach_workspace: at: /tmp/workspace diff --git a/app/kg/conf/application.conf b/app/kg/conf/application.conf index d9b16f82..ad0f5c02 100644 --- a/app/kg/conf/application.conf +++ b/app/kg/conf/application.conf @@ -10,3 +10,10 @@ play.filters.disabled += "play.filters.csrf.CSRFFilter" play.filters.disabled += "play.filters.hosts.AllowedHostsFilter" play.http.context = ${?BASE_HREF} play.modules.enabled += "stores.KgStoresModule" +postgres.profile = "slick.jdbc.PostgresProfile$" +postgres.db.connectionPool = "HikariCP" +postgres.db.driver = "org.postgresql.Driver" +postgres.db.password = "7EAdu7jJvZNxxrNZ" +postgres.db.url = "jdbc:postgresql://mcs-postgres:5432/kg" +postgres.db.user = "mcs" + diff --git a/build.sbt b/build.sbt index 41e6296f..72d7a535 100644 --- a/build.sbt +++ b/build.sbt @@ -46,11 +46,15 @@ lazy val kgLib = .settings( libraryDependencies ++= Seq( // Implement search in the MemStore (and thus the TestStore) + "com.github.tminglei" %% "slick-pg" % "0.19.3", "com.outr" %% "lucene4s" % "1.9.1", + "com.typesafe.slick" %% "slick" % "3.3.3", + "com.typesafe.slick" %% "slick-hikaricp" % "3.3.3", "io.github.tetherless-world" %% "twxplore-base" % twxploreVersion, "io.github.tetherless-world" %% "twxplore-test" % twxploreVersion % Test, "me.tongfei" % "progressbar" % "0.8.1", - "org.neo4j.driver" % "neo4j-java-driver" % "4.0.1" + "org.neo4j.driver" % "neo4j-java-driver" % "4.0.1", + "org.postgresql" % "postgresql" % "42.2.18" ), maintainer := maintainerValue, name := "mcs-kg-lib" diff --git a/docker-compose.yml b/docker-compose.yml index 3b8c6a6b..4dabd91c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -72,5 +72,21 @@ services: restart: unless-stopped volumes: - neo4j-data:/data + + postgres: + container_name: mcs-postgres + environment: + POSTGRES_USER: "mcs" + POSTGRES_PASSWORD: "7EAdu7jJvZNxxrNZ" + POSTGRES_DB: "kg" + image: library/postgres:12.4 + networks: + - mcs + ports: + - 127.0.0.1:5432:5432 + restart: unless-stopped + volumes: + - postgres-data:/var/lib/postgresql/data + volumes: neo4j-data: diff --git a/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/KgStoresModule.scala b/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/KgStoresModule.scala index 58255cf9..f0cc5c6e 100644 --- a/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/KgStoresModule.scala +++ b/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/KgStoresModule.scala @@ -4,6 +4,7 @@ import com.google.inject.AbstractModule import io.github.tetherlessworld.mcsapps.lib.kg.stores.empty.EmptyKgStore import io.github.tetherlessworld.mcsapps.lib.kg.stores.mem.MemKgStore import io.github.tetherlessworld.mcsapps.lib.kg.stores.neo4j.{Neo4jKgCommandStore, Neo4jKgQueryStore} +import io.github.tetherlessworld.mcsapps.lib.kg.stores.postgres.{PostgresKgCommandStore, PostgresKgQueryStore} import io.github.tetherlessworld.mcsapps.lib.kg.stores.test.TestKgStore import org.slf4j.LoggerFactory import play.api.{Configuration, Environment} @@ -22,6 +23,10 @@ final class KgStoresModule(environment: Environment, configuration: Configuratio bind(classOf[KgCommandStore]).to(classOf[MemKgStore]) bind(classOf[KgQueryStore]).to(classOf[MemKgStore]) } + case "postgres" => { + bind(classOf[KgCommandStore]).to(classOf[PostgresKgCommandStore]) + bind(classOf[KgQueryStore]).to(classOf[PostgresKgQueryStore]) + } case "test" => { logger.info("using test stores") bind(classOf[KgCommandStore]).to(classOf[TestKgStore]) diff --git a/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/SlickDatabaseConfigProvider.scala b/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/SlickDatabaseConfigProvider.scala new file mode 100644 index 00000000..4e84cc96 --- /dev/null +++ b/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/SlickDatabaseConfigProvider.scala @@ -0,0 +1,27 @@ +package io.github.tetherlessworld.mcsapps.lib.kg.stores + +import com.typesafe.config.Config +import slick.basic.{BasicProfile, DatabaseConfig} + +import scala.reflect.ClassTag + +trait HasDatabaseConfig[T <: BasicProfile] { + protected val databaseConfig: DatabaseConfig[T] + + protected lazy val profile: T = databaseConfig.profile + protected lazy val db: T#Backend#Database = databaseConfig.db +} + +trait HasDatabaseConfigProvider[T <: BasicProfile] extends HasDatabaseConfig[T] { + protected val databaseConfigProvider: SlickDatabaseConfigProvider[T] + + protected val databaseConfig = databaseConfigProvider.databaseConfig +} + +class SlickDatabaseConfigProvider[T <: BasicProfile : ClassTag](val databaseConfig: DatabaseConfig[T]) { + def this(path: String) = + this(DatabaseConfig.forConfig[T](path)) + + def this(path: String, config: Config) = + this(DatabaseConfig.forConfig[T](path, config)) +} diff --git a/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/AbstractPostgresKgStore.scala b/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/AbstractPostgresKgStore.scala new file mode 100644 index 00000000..be5c5815 --- /dev/null +++ b/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/AbstractPostgresKgStore.scala @@ -0,0 +1,246 @@ +package io.github.tetherlessworld.mcsapps.lib.kg.stores.postgres + +import io.github.tetherlessworld.mcsapps.lib.kg.models.edge.KgEdge +import io.github.tetherlessworld.mcsapps.lib.kg.models.node.{KgNode, KgNodeLabel} +import io.github.tetherlessworld.mcsapps.lib.kg.models.source.KgSource +import io.github.tetherlessworld.mcsapps.lib.kg.stores.HasDatabaseConfigProvider + +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, Future} + + +abstract class AbstractPostgresKgStore(protected val databaseConfigProvider: PostgresStoreConfigProvider) extends HasDatabaseConfigProvider[ExtendedPostgresProfile] { + import profile.api._ + + protected val SentencesDelimChar = '|' + protected val SentencesDelimString: String = SentencesDelimChar.toString + + protected val NodeContextTopEdgesLimit = 10 + protected val NodeLabelContextTopEdgesLimit = 10 + + protected lazy val edges = TableQuery[EdgeTable] + protected lazy val edgeLabels = TableQuery[EdgeLabelTable] + protected lazy val edgeSources = TableQuery[EdgeSourceTable] + protected object nodes extends TableQuery(new NodeTable(_)) { + def getById(id: String) = + nodes.filter(_.id === id).result.headOption + + def withLabelSource(nodeQuery: Query[NodeTable, NodeRow, Seq] = nodes) = + for { + node <- nodeQuery + nodeNodeLabel <- nodeNodeLabels if nodeNodeLabel.nodeId === node.id + nodeLabel <- nodeNodeLabel.nodeLabel + nodeSource <- nodeSources if nodeSource.nodeId === node.id + source <- nodeSource.source + } yield (node, nodeLabel, source) + + } + protected object nodeLabels extends TableQuery(new NodeLabelTable(_)) { + def withNodes(nodeLabelQuery: Query[NodeLabelTable, NodeLabelRow, Seq] = nodeLabels) = for { + ((nodeLabel, _), node) <- nodeLabelQuery + .join(nodeNodeLabels).on(_.label === _.nodeLabelLabel) + .join(nodes).on(_._2.nodeId === _.id) + } yield (nodeLabel, node) + + def withSourceNode(nodeLabelQuery: Query[NodeLabelTable, NodeLabelRow, Seq] = nodeLabels) = + for { + nodeLabel <- nodeLabelQuery + nodeNodeLabel <- nodeNodeLabels if nodeNodeLabel.nodeLabelLabel === nodeLabel.label + nodeLabelSource <- nodeLabelSources if nodeLabelSource.nodeLabelLabel === nodeLabel.label + node <- nodeNodeLabel.node + source <- nodeLabelSource.source + nodeSource <- nodeSources if nodeSource.nodeId === node.id + nodeNodeLabel <- nodeNodeLabel.nodeLabel + nodeNodeSource <- nodeSource.source + } yield (nodeLabel, source, node, nodeNodeSource, nodeNodeLabel) + } + protected lazy val nodeLabelEdges = TableQuery[NodeLabelEdgeTable] + protected lazy val nodeLabelEdgeSources = TableQuery[NodeLabelEdgeSourceTable] + protected lazy val nodeLabelSources = TableQuery[NodeLabelSourceTable] + protected lazy val nodeNodeLabels = TableQuery[NodeNodeLabelTable] + protected lazy val nodeSources = TableQuery[NodeSourceTable] + protected lazy val sources = TableQuery[SourceTable] + + protected lazy val tables = + List(edges, edgeLabels, edgeSources, nodes, nodeLabels, nodeLabelEdges, nodeLabelEdgeSources, nodeLabelSources, nodeSources, nodeNodeLabels, sources) + + protected lazy val tablesDdlObject = tables.map(_.schema).reduce((left, right) => left ++ right) + + protected final def runTransaction[R](a: DBIOAction[R, NoStream, Effect.All]): Future[R] = { + db.run(a.transactionally) + } + + protected final def runSyncTransaction[R](a: DBIOAction[R, NoStream, Effect.All], duration: Duration = Duration.Inf): R = { + Await.result(runTransaction(a), duration) + } + + protected final def runSync[R](a: DBIOAction[R, NoStream, Effect.All], duration: Duration = Duration.Inf): R = { + Await.result(db.run(a), duration) + } + + protected final case class EdgeRow(id: String, objectNodeId: String, predicate: String, sentences: String, subjectNodeId: String) { + def toKgEdge(labels: List[String], sourceIds: List[String]) = KgEdge( + id = id, + labels = labels, + `object` = objectNodeId, + predicate = predicate, + sentences = sentences.split(SentencesDelimChar).toList, + sourceIds = sourceIds, + subject = subjectNodeId + ) + } + protected final class EdgeTable(tag: Tag) extends Table[EdgeRow](tag, "edge") { + def id = column[String]("id", O.PrimaryKey) + def objectNodeId = column[String]("object_node_id") + def predicate = column[String]("predicate") + def sentences = column[String]("sentences") + def subjectNodeId = column[String]("subject_node_id") + + def * = (id, objectNodeId, predicate, sentences, subjectNodeId) <> (EdgeRow.tupled, EdgeRow.unapply) + + def objectNode = foreignKey("object_node_fk", objectNodeId, nodes)(_.id) + def subjectNode = foreignKey("subject_node_fk", subjectNodeId, nodes)(_.id) + + def unique_constraint = index("_edge_unique_idx", (objectNodeId, subjectNodeId, predicate), unique = true) + } + + protected final case class EdgeLabelRow(edgeId: String, label: String) + protected final class EdgeLabelTable(tag: Tag) extends Table[EdgeLabelRow](tag, "edge_label") { + def edgeId = column[String]("edge_id") + def label = column[String]("label") + + def * = (edgeId, label) <> (EdgeLabelRow.tupled, EdgeLabelRow.unapply) + + def edge = foreignKey("edge_fk", edgeId, edges)(_.id) + + def pk = primaryKey("edge_label_pk", (edgeId, label)) + } + + protected final case class EdgeSourceRow(edgeId: String, sourceId: String) + protected final class EdgeSourceTable(tag: Tag) extends Table[EdgeSourceRow](tag, "edge_x_source") { + def edgeId = column[String]("edge_id") + def sourceId = column[String]("source_id") + + def * = (edgeId, sourceId) <> (EdgeSourceRow.tupled, EdgeSourceRow.unapply) + + def edge = foreignKey("edge_fk", edgeId, edges)(_.id) + + def pk = primaryKey("edge_source_pk", (edgeId, sourceId)) + } + + protected final case class NodeRow(id: String, inDegree: Option[Short], outDegree: Option[Short], pageRank: Option[Float], pos: Option[Char], wordNetSenseNumber: Option[Short]) { + def toKgNode(labels: List[String], sourceIds: List[String]) = KgNode( + id = id, + inDegree = inDegree.map(_.toInt), + labels = labels, + outDegree = outDegree.map(_.toInt), + pageRank = pageRank.map(_.toDouble), + pos = pos, + sourceIds = sourceIds, + wordNetSenseNumber = wordNetSenseNumber.map(_.toInt) + ) + } + protected final class NodeTable(tag: Tag) extends Table[NodeRow](tag, "node") { + def id = column[String]("id", O.PrimaryKey) + def inDegree = column[Option[Short]]("in_degree") + def outDegree = column[Option[Short]]("out_degree") + def pageRank = column[Option[Float]]("page_rank") + def pos = column[Option[Char]]("pos", O.Length(1)) + def wordNetSenseNumber = column[Option[Short]]("word_net_sense_number") + + def * = (id, inDegree, outDegree, pageRank, pos, wordNetSenseNumber) <> (NodeRow.tupled, NodeRow.unapply) + } + + protected final case class NodeLabelRow(label: String, pageRank: Option[Float]) { + def toKgNodeLabel(nodes: List[KgNode], sourceIds: List[String]) = KgNodeLabel( + nodeLabel = label, + nodes = nodes, + pageRank = pageRank.map(_.toDouble), + sourceIds = sourceIds + ) + } + protected final class NodeLabelTable(tag: Tag) extends Table[NodeLabelRow](tag, "node_label") { + def label = column[String]("label", O.PrimaryKey) + def pageRank = column[Option[Float]]("page_rank") + + def * = (label, pageRank) <> (NodeLabelRow.tupled, NodeLabelRow.unapply) + } + + protected final case class NodeLabelEdgeRow(objectNodeLabelLabel: String, subjectNodeLabelLabel: String) + protected final class NodeLabelEdgeTable(tag: Tag) extends Table[NodeLabelEdgeRow](tag, "node_label_edge") { + def objectNodeLabelLabel = column[String]("object_node_label_label") + def subjectNodeLabelLabel = column[String]("subject_node_label_label") + + def * = (objectNodeLabelLabel, subjectNodeLabelLabel) <> (NodeLabelEdgeRow.tupled, NodeLabelEdgeRow.unapply) + + def objectNodeLabel = foreignKey("object_node_label_fk", objectNodeLabelLabel, nodeLabels)(_.label) + def subjectNodeLabel = foreignKey("subject_node_label_fk", subjectNodeLabelLabel, nodeLabels)(_.label) + + def pk = primaryKey("node_label_edge_pk", (objectNodeLabelLabel, subjectNodeLabelLabel)) + + def unique_constraint = index("node_label_edge_unique_idx", (objectNodeLabelLabel, subjectNodeLabelLabel), unique = true) + } + + protected final case class NodeLabelEdgeSourceRow(nodeLabelEdgeObjectNodeLabelLabel: String, nodeLabelEdgeSubjectNodeLabelLabel: String, sourceId: String) + protected final class NodeLabelEdgeSourceTable(tag: Tag) extends Table[NodeLabelEdgeSourceRow](tag, "node_label_edge_x_source") { + def nodeLabelEdgeObjectNodeLabelLabel = column[String]("node_node_label_edge_object_node_label_label") + def nodeLabelEdgeSubjectNodeLabelLabel = column[String]("subject_node_label_edge_subject_node_label_label") + def sourceId = column[String]("source_id") + + def * = (nodeLabelEdgeObjectNodeLabelLabel, nodeLabelEdgeSubjectNodeLabelLabel, sourceId) <> (NodeLabelEdgeSourceRow.tupled, NodeLabelEdgeSourceRow.unapply) + + def nodeLabelEdge = foreignKey("node_label_edge_fk", (nodeLabelEdgeObjectNodeLabelLabel, nodeLabelEdgeSubjectNodeLabelLabel), nodeLabelEdges)(nodeLabelEdgeTable => (nodeLabelEdgeTable.objectNodeLabelLabel, nodeLabelEdgeTable.subjectNodeLabelLabel)) + def source = foreignKey("source_fk", sourceId, sources)(_.id) + + def pk = primaryKey("node_label_edge_source_pk", (nodeLabelEdgeObjectNodeLabelLabel, nodeLabelEdgeSubjectNodeLabelLabel, sourceId)) + } + + protected final case class NodeLabelSourceRow(nodeLabelLabel: String, sourceId: String) + protected final class NodeLabelSourceTable(tag: Tag) extends Table[NodeLabelSourceRow](tag, "node_label_x_source") { + def nodeLabelLabel = column[String]("node_label_label") + def sourceId = column[String]("source_id") + + def * = (nodeLabelLabel, sourceId) <> (NodeLabelSourceRow.tupled, NodeLabelSourceRow.unapply) + + def nodeLabel = foreignKey("node_label_fk", nodeLabelLabel, nodeLabels)(_.label) + def source = foreignKey("source_fk", sourceId, sources)(_.id) + + def pk = primaryKey("node_label_source_pk", (nodeLabelLabel, sourceId)) + } + + protected final case class NodeNodeLabelRow(nodeId: String, nodeLabelLabel: String) + protected final class NodeNodeLabelTable(tag: Tag) extends Table[NodeNodeLabelRow](tag, "node_x_node_label") { + def nodeId = column[String]("node_id") + def nodeLabelLabel = column[String]("label") + + def * = (nodeId, nodeLabelLabel) <> (NodeNodeLabelRow.tupled, NodeNodeLabelRow.unapply) + + def node = foreignKey("node_fk", nodeId, nodes)(_.id) + def nodeLabel = foreignKey("node_label_fk", nodeLabelLabel, nodeLabels)(_.label) + + def pk = primaryKey("node_label_pk", (nodeId, nodeLabelLabel)) + } + + protected final case class NodeSourceRow(nodeId: String, sourceId: String) + protected final class NodeSourceTable(tag: Tag) extends Table[NodeSourceRow](tag, "node_x_source") { + def nodeId = column[String]("node_id") + def sourceId = column[String]("source_id") + + def * = (nodeId, sourceId) <> (NodeSourceRow.tupled, NodeSourceRow.unapply) + + def node = foreignKey("node_fk", nodeId, nodes)(_.id) + def source = foreignKey("source_fk", sourceId, sources)(_.id) + + def pk = primaryKey("node_source_pk", (nodeId, sourceId)) + } + + protected final case class SourceRow(id: String, label: String) { + def toKgSource = KgSource(id = id, label = label) + } + protected final class SourceTable(tag: Tag) extends Table[SourceRow](tag, "source") { + def id = column[String]("id", O.PrimaryKey) + def label = column[String]("label") + + def * = (id, label) <> (SourceRow.tupled, SourceRow.unapply) + } +} diff --git a/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/ExtendedPostgresProfile.scala b/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/ExtendedPostgresProfile.scala new file mode 100644 index 00000000..ca52f4f7 --- /dev/null +++ b/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/ExtendedPostgresProfile.scala @@ -0,0 +1,18 @@ +package io.github.tetherlessworld.mcsapps.lib.kg.stores.postgres + +import slick.basic.Capability +import slick.jdbc.{JdbcCapabilities} +import com.github.tminglei.slickpg._ + +trait ExtendedPostgresProfile extends ExPostgresProfile with PgArraySupport { + override protected def computeCapabilities: Set[Capability] = + super.computeCapabilities + JdbcCapabilities.insertOrUpdate + + override val api = ExtendedAPI + + object ExtendedAPI extends API with ArrayImplicits { + implicit val strListTypeMapper = new SimpleArrayJdbcType[String]("text").to(_.toList) + } +} + +object ExtendedPostgresProfile extends ExtendedPostgresProfile \ No newline at end of file diff --git a/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/PostgresKgCommandStore.scala b/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/PostgresKgCommandStore.scala new file mode 100644 index 00000000..4133ac8b --- /dev/null +++ b/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/PostgresKgCommandStore.scala @@ -0,0 +1,178 @@ +package io.github.tetherlessworld.mcsapps.lib.kg.stores.postgres + +import com.google.inject.{Inject, Singleton} +import io.github.tetherlessworld.mcsapps.lib.kg.data.KgData +import io.github.tetherlessworld.mcsapps.lib.kg.formats.kgtk.KgtkEdgeWithNodes +import io.github.tetherlessworld.mcsapps.lib.kg.models.edge.KgEdge +import io.github.tetherlessworld.mcsapps.lib.kg.models.node.{KgNode, KgNodeLabel} +import io.github.tetherlessworld.mcsapps.lib.kg.models.path.KgPath +import io.github.tetherlessworld.mcsapps.lib.kg.models.source.KgSource +import io.github.tetherlessworld.mcsapps.lib.kg.stores.{KgCommandStore, KgCommandStoreTransaction} +import org.slf4j.LoggerFactory + +import scala.concurrent.ExecutionContext + +@Singleton +class PostgresKgCommandStore @Inject()(configProvider: PostgresStoreConfigProvider)(implicit executionContext: ExecutionContext) extends AbstractPostgresKgStore(configProvider) with KgCommandStore { + import profile.api._ + + private var bootstrapped: Boolean = false + private val logger = LoggerFactory.getLogger(getClass) + + private class PostgresKgCommandStoreTransaction extends KgCommandStoreTransaction { + private implicit class KgEdgeWrapper(edge: KgEdge) { + def toRow: EdgeRow = EdgeRow( + id = edge.id, + objectNodeId = edge.`object`, + predicate = edge.predicate, + sentences = edge.sentences.mkString(SentencesDelimString), + subjectNodeId = edge.subject + ) + } + + private implicit class KgNodeWrapper(node: KgNode) { + def toRow: NodeRow = NodeRow( + id = node.id, + inDegree = node.inDegree.map(_.toShort), + outDegree = node.outDegree.map(_.toShort), + pageRank = node.pageRank.map(_.toFloat), + pos = node.pos, + wordNetSenseNumber = node.wordNetSenseNumber.map(_.toShort) + ) + } + + private implicit class KgSourceWrapper(source: KgSource) { + def toRow: SourceRow = SourceRow( + id = source.id, + label = source.label + ) + } + + private def batchedEdgeInserts(kgEdges: Iterator[KgEdge]) = { + val stream = kgEdges.toStream + List( + edges.insertOrUpdateAll(stream.map(_.toRow)), + edgeLabels.insertOrUpdateAll(stream.flatMap(edge => edge.labels.map(label => EdgeLabelRow(edge.id, label)))), + edgeSources.insertOrUpdateAll(stream.flatMap(edge => edge.sourceIds.map(sourceId => EdgeSourceRow(edge.id, sourceId)))) + ) + } + + private def batchedNodeInserts(kgNodes: Iterator[KgNode]) = { + val stream = kgNodes.toStream + List( + nodes.insertOrUpdateAll(stream.map(_.toRow)), + nodeLabels.insertOrUpdateAll(stream.flatMap(_.labels.map(NodeLabelRow(_, None)))), + nodeNodeLabels.insertOrUpdateAll(stream.flatMap(node => node.labels.map(label => NodeNodeLabelRow(node.id, label)))), + nodeLabelSources.insertOrUpdateAll(stream.flatMap(node => node.labels.flatMap(label => node.sourceIds.map(NodeLabelSourceRow(label, _))))), + nodeSources.insertOrUpdateAll(stream.flatMap(node => node.sourceIds.map(NodeSourceRow(node.id, _)))) + ) + } + + private def batchedSourceInserts(kgSources: Iterator[KgSource]) = + List(sources.insertOrUpdateAll(kgSources.map(_.toRow).toIterable)) + + override final def clear(): Unit = { + val tableNames = tables.map(_.baseTableRow.tableName).mkString(",") + runSyncTransaction(sqlu"TRUNCATE #$tableNames;") + } + + override final def close(): Unit = { + runSyncTransaction(DBIO.seq( + writeNodeLabelEdgesAction, + writeNodeLabelEdgeSourcesAction + )) + } + + override final def putData(data: KgData) = + runSyncTransaction( + DBIO.sequence( + batchedSourceInserts(data.sources.iterator) ++ + batchedNodeInserts(data.nodesUnranked.iterator) ++ + batchedEdgeInserts(data.edges.iterator) + ) + ) + + override final def putEdges(kgEdges: Iterator[KgEdge]) = + runSyncTransaction(DBIO.sequence(batchedEdgeInserts(kgEdges))) + + override final def putKgtkEdgesWithNodes(edgesWithNodes: Iterator[KgtkEdgeWithNodes]): Unit = + runSyncTransaction(DBIO.sequence( + batchedNodeInserts(edgesWithNodes.flatMap(_.nodes)) ++ + batchedEdgeInserts(edgesWithNodes.map(_.edge)) + )) + + override final def putNodes(nodes: Iterator[KgNode]): Unit = + runSyncTransaction(DBIO.sequence(batchedNodeInserts(nodes))) + + override final def putSources(sources: Iterator[KgSource]): Unit = + runSyncTransaction(DBIO.sequence(batchedSourceInserts(sources))) + + private def writeNodeLabelEdgesAction = { + val nodeLabelEdgePairsAction = (for { + (subjectNodeLabel, node) <- nodeLabels.withNodes() + edge <- edges if edge.subjectNodeId === node.id + objectNode <- edge.objectNode + objectNodeNodeLabel <- nodeNodeLabels if objectNodeNodeLabel.nodeId === objectNode.id && objectNodeNodeLabel.nodeLabelLabel =!= subjectNodeLabel.label + objectNodeLabel <- objectNodeNodeLabel.nodeLabel + } yield (subjectNodeLabel.label, objectNodeLabel.label)).result + + for { + nodeLabelEdgePairs <- nodeLabelEdgePairsAction + _ <- nodeLabelEdges ++= nodeLabelEdgePairs.distinct.map { case (objectNodeLabelLabel, subjectNodeLabelLabel) => + NodeLabelEdgeRow(objectNodeLabelLabel, subjectNodeLabelLabel) + } + } yield () + } + + private def writeNodeLabelEdgeSourcesAction = { + val objectNodeLabelEdgeSourcesQuery = for { + nodeLabelEdge <- nodeLabelEdges + objectNodeLabel <- nodeLabelEdge.objectNodeLabel + objectNodeLabelSourceSource <- nodeLabelSources if objectNodeLabelSourceSource.nodeLabelLabel === objectNodeLabel.label + objectNodeLabelSource <- objectNodeLabelSourceSource.source + } yield (nodeLabelEdge.objectNodeLabelLabel, nodeLabelEdge.subjectNodeLabelLabel, objectNodeLabelSource.id) + + val subjectNodeLabelEdgeSourcesQuery = for { + nodeLabelEdge <- nodeLabelEdges + subjectNodeLabel <- nodeLabelEdge.subjectNodeLabel + subjectNodeLabelSourceSource <- nodeLabelSources if subjectNodeLabelSourceSource.nodeLabelLabel === subjectNodeLabel.label + subjectNodeLabelSource <- subjectNodeLabelSourceSource.source + } yield (nodeLabelEdge.objectNodeLabelLabel, nodeLabelEdge.subjectNodeLabelLabel, subjectNodeLabelSource.id) + + val nodeLabelEdgeSourcesAction = (objectNodeLabelEdgeSourcesQuery ++ subjectNodeLabelEdgeSourcesQuery).result + + for { + nodeLabelEdgeSourcesResult <- nodeLabelEdgeSourcesAction + _ <- nodeLabelEdgeSources.insertOrUpdateAll(nodeLabelEdgeSourcesResult.map { + case (edgeObjectLabel, edgeSubjectLabel, sourceId) => NodeLabelEdgeSourceRow(edgeObjectLabel, edgeSubjectLabel, sourceId) + }) + } yield () + } + } + + bootstrapStore() + + private def bootstrapStore(): Unit = { + this.synchronized { + if (bootstrapped) { + logger.info("Postgres store already bootstrapped, skipping...") + return + } + + val tableCount = runSyncTransaction(sql"SELECT COUNT(table_name) FROM information_schema.tables WHERE table_schema='public'".as[Int].head) + + if (tableCount != 0) { + logger.info("Postgres database tables already created, skipping bootstrap...") + return + } + + runSyncTransaction(tablesDdlObject.create) + + bootstrapped = true + logger.info("Postgres store bootstrapped") + } + } + + override final def beginTransaction: KgCommandStoreTransaction = + new PostgresKgCommandStoreTransaction +} diff --git a/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/PostgresKgQueryStore.scala b/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/PostgresKgQueryStore.scala new file mode 100644 index 00000000..40314fc4 --- /dev/null +++ b/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/PostgresKgQueryStore.scala @@ -0,0 +1,153 @@ +package io.github.tetherlessworld.mcsapps.lib.kg.stores.postgres + +import com.google.inject.Inject +import io.github.tetherlessworld.mcsapps.lib.kg.models.edge.KgEdge +import io.github.tetherlessworld.mcsapps.lib.kg.models.node.{KgNode, KgNodeContext, KgNodeLabel, KgNodeLabelContext} +import io.github.tetherlessworld.mcsapps.lib.kg.models.path.KgPath +import io.github.tetherlessworld.mcsapps.lib.kg.models.search.{KgSearchFacets, KgSearchQuery, KgSearchResult, KgSearchSort} +import io.github.tetherlessworld.mcsapps.lib.kg.models.source.KgSource +import io.github.tetherlessworld.mcsapps.lib.kg.stores.KgQueryStore +import javax.inject.Singleton +import slick.jdbc.GetResult + +import scala.concurrent.ExecutionContext + +@Singleton +final class PostgresKgQueryStore @Inject()(configProvider: PostgresStoreConfigProvider)(implicit executionContext: ExecutionContext) extends AbstractPostgresKgStore(configProvider) with KgQueryStore { + import profile.api._ + + private implicit val getKgEdge = GetResult(r => KgEdge( + id = r.rs.getString("id"), + labels = r.rs.getArray("labels").asInstanceOf[Array[Any]].toList.map(_.toString), + `object` = r.rs.getString("object"), + predicate = r.rs.getString("predicate"), + sentences = (r.rs.getString("sentences")).split(SentencesDelimChar).toList, + sourceIds = r.rs.getArray("sourceIds").asInstanceOf[Array[Any]].toList.map(_.toString), + subject = r.rs.getString("subject") + )) + + private def toKgNodeLabels(rows: Seq[(NodeLabelRow, String, NodeRow, String, String)]) = + rows + .groupBy(_._1.label) + .values + .map { rows => + rows.head._1.toKgNodeLabel( + sourceIds = rows.map(_._2).distinct.toList, + nodes = rows.groupBy(_._3.id).values.map { + nodeRows => + nodeRows.head._3.toKgNode( + labels = nodeRows.map(_._4).distinct.toList, + sourceIds = nodeRows.map(_._5).distinct.toList + ) + }.toList + ) + } + + override def getNode(id: String): Option[KgNode] = { + val nodeQuery = nodes.filter(_.id === id) + + val nodeAction = nodes.withLabelSource(nodeQuery).map { + case (node, nodeLabel, source) => (node, nodeLabel.label, source.id) + }.result + + runSyncTransaction(nodeAction) + .groupBy(_._1.id) + .values + .map { rows => + rows.head._1.toKgNode( + labels = rows.map(_._2).distinct.toList, + sourceIds = rows.map(_._3).distinct.toList + ) + } + .headOption + } + + override def getNodeContext(id: String): Option[KgNodeContext] = + runSyncTransaction(nodes.getById(id)).map { _ => + val nodeLabelQuery = for { + nodeNodeLabel <- nodeNodeLabels if nodeNodeLabel.nodeId === id + nodeLabel <- nodeNodeLabel.nodeLabel + } yield (nodeLabel) + + val relatedObjectNodeLabelQuery = (for { + nodeLabel <- nodeLabelQuery + relatedObjectNodeLabelEdge <- nodeLabelEdges if relatedObjectNodeLabelEdge.subjectNodeLabelLabel === nodeLabel.label + relatedObjectNodeLabel <- relatedObjectNodeLabelEdge.objectNodeLabel + } yield (relatedObjectNodeLabel)) + + val relatedSubjectNodeLabelQuery = (for { + nodeLabel <- nodeLabelQuery + relatedSubjectNodeLabelEdge <- nodeLabelEdges if relatedSubjectNodeLabelEdge.objectNodeLabelLabel === nodeLabel.label + relatedSubjectNodeLabel <- relatedSubjectNodeLabelEdge.subjectNodeLabel + } yield (relatedSubjectNodeLabel)) + + val relatedNodeLabelQuery = (relatedObjectNodeLabelQuery ++ relatedSubjectNodeLabelQuery) + val relatedNodeLabelWithNodeSourceAction = nodeLabels.withSourceNode(relatedNodeLabelQuery).map { + case (nodeLabel, source, nodeLabelNode, nodeLabelNodeSource, nodeLabelNodeLabel) => + (nodeLabel, source.id, nodeLabelNode, nodeLabelNodeLabel.label, nodeLabelNodeSource.id) + }.result + + val relatedNodeLabels = toKgNodeLabels(runSyncTransaction(relatedNodeLabelWithNodeSourceAction)).toList + + // TODO replace inner id order by with pageRank + val topEdgesQuery = + sql""" + SELECT + e_top.id AS id, + array_agg(DISTINCT el.label) AS labels, + e_top.object_node_id AS object, + e_outer.predicate AS predicate, + e_top.sentences AS sentences, + array_agg(DISTINCT s.id) AS sourceIds, + e_top.subject_node_id AS subject + FROM edge e_outer + JOIN LATERAL ( + SELECT * FROM edge e_inner + WHERE e_inner.subject_node_id = ${id} AND e_inner.predicate = e_outer.predicate + ORDER BY e_inner.id + LIMIT #$NodeContextTopEdgesLimit + ) e_top ON e_outer.subject_node_id = ${id} + JOIN edge_x_source es ON es.edge_id = e_top.id + JOIN source s ON s.id = es.source_id + JOIN edge_label el ON el.edge_id = e_top.id + GROUP BY e_outer.predicate, e_top.id, e_top.object_node_id, e_top.sentences, e_top.subject_node_id + ORDER BY e_outer.predicate + """.as[KgEdge] + + val topEdges = runSyncTransaction(topEdgesQuery).toList + + KgNodeContext( + topEdges = topEdges, + relatedNodeLabels = relatedNodeLabels + ) + } + + override def getNodeLabel(label: String): Option[KgNodeLabel] = { + val nodeLabelQuery = nodeLabels.filter(_.label === label) + + val nodeLabelAction = nodeLabels.withSourceNode(nodeLabelQuery).map { + case (nodeLabel, source, nodeLabelNode, nodeLabelNodeSource, nodeLabelNodeLabel) => + (nodeLabel, source.id, nodeLabelNode, nodeLabelNodeLabel.label, nodeLabelNodeSource.id) + }.result + + toKgNodeLabels(runSyncTransaction(nodeLabelAction)).headOption + } + + override def getNodeLabelContext(label: String): Option[KgNodeLabelContext] = None + + override def getSourcesById: Map[String, KgSource] = { + runSyncTransaction(sources.result).map(source => (source.id, source.toKgSource)).toMap + } + + override def getTotalEdgesCount: Int = runSyncTransaction(edges.size.result) + + override def getTotalNodesCount: Int = runSyncTransaction(nodes.size.result) + + override def isEmpty: Boolean = getTotalNodesCount == 0 + + override def search(limit: Int, offset: Int, query: KgSearchQuery, sorts: Option[List[KgSearchSort]]): List[KgSearchResult] = List() + + override def searchCount(query: KgSearchQuery): Int = 0 + + override def searchFacets(query: KgSearchQuery): KgSearchFacets = KgSearchFacets(List(), List()) +} diff --git a/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/PostgresStoreConfigProvider.scala b/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/PostgresStoreConfigProvider.scala new file mode 100644 index 00000000..c4885c7f --- /dev/null +++ b/lib/scala/kg/src/main/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/PostgresStoreConfigProvider.scala @@ -0,0 +1,11 @@ +package io.github.tetherlessworld.mcsapps.lib.kg.stores.postgres + +import com.typesafe.config.Config +import io.github.tetherlessworld.mcsapps.lib.kg.stores.SlickDatabaseConfigProvider + +trait PostgresStoreConfigProvider extends SlickDatabaseConfigProvider[ExtendedPostgresProfile] + +object PostgresStoreConfigProvider { + def apply() = new SlickDatabaseConfigProvider[ExtendedPostgresProfile]("postgres") with PostgresStoreConfigProvider + def apply(path: String, config: Config) = new SlickDatabaseConfigProvider[ExtendedPostgresProfile](path, config) with PostgresStoreConfigProvider +} diff --git a/lib/scala/kg/src/test/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/PostgresKgStoreSpec.scala b/lib/scala/kg/src/test/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/PostgresKgStoreSpec.scala new file mode 100644 index 00000000..7de79ffe --- /dev/null +++ b/lib/scala/kg/src/test/scala/io/github/tetherlessworld/mcsapps/lib/kg/stores/postgres/PostgresKgStoreSpec.scala @@ -0,0 +1,67 @@ +package io.github.tetherlessworld.mcsapps.lib.kg.stores.postgres + +import java.net.InetAddress + +import com.typesafe.config.ConfigFactory +import io.github.tetherlessworld.mcsapps.lib.kg.data.TestKgData +import io.github.tetherlessworld.mcsapps.lib.kg.stores.{KgCommandStore, KgCommandStoreBehaviors, KgQueryStore, KgQueryStoreBehaviors, KgStoreFactory, StoreTestMode} +import org.scalatest.{BeforeAndAfterAll, WordSpec} + +import scala.collection.JavaConverters.mapAsJavaMap + +class PostgresKgStoreSpec extends WordSpec with BeforeAndAfterAll with KgCommandStoreBehaviors with KgQueryStoreBehaviors { + import scala.concurrent.ExecutionContext.Implicits.global + + val testConfig = ConfigFactory.parseMap(mapAsJavaMap(Map( +// "postgres.profile" -> "slick.jdbc.PostgresProfile$", + "postgres.profile" -> "io.github.tetherlessworld.mcsapps.lib.kg.stores.postgres.ExtendedPostgresProfile$", + "postgres.db.connectionPool" -> "HikariCP", + "postgres.db.driver" -> "org.postgresql.Driver", + "postgres.db.password" -> "7EAdu7jJvZNxxrNZ", + "postgres.db.url" -> "jdbc:postgresql://mcs-postgres:5432/kg", + "postgres.db.user" -> "mcs" + ))) + + val configProvider = PostgresStoreConfigProvider("postgres", testConfig) + val command = new PostgresKgCommandStore(configProvider) + val query = new PostgresKgQueryStore(configProvider) + val postgresHostAddress = InetAddress.getByName("mcs-postgres").getHostAddress + val inTestingEnvironment = System.getenv("CI") != null || postgresHostAddress != "128.113.12.49" + + override def afterAll(): Unit = { + configProvider.databaseConfig.db.close() + } + + override def beforeAll(): Unit = { + if (!inTestingEnvironment) { + return + } + resetSut() + } + + private def resetSut(): Unit = { + if (!query.isEmpty) { + command.withTransaction { _.clear() } + } + command.withTransaction { _.putData(TestKgData) } + } + + private object PostgresKgStoreFactory extends KgStoreFactory { + override def apply(testMode: StoreTestMode)(f: (KgCommandStore, KgQueryStore) => Unit): Unit = { + try { + f(command, query) + } finally { + if (testMode == StoreTestMode.ReadWrite) { + resetSut() + } + } + } + } + + if (inTestingEnvironment) { + "The postgres store" can { + behave like commandStore(PostgresKgStoreFactory) + behave like queryStore(PostgresKgStoreFactory) + } + } +}