Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev extended em #195

Merged
merged 4 commits into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions JeMPI_Apps/JeMPI_Configuration/build.sbt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
ThisBuild / version := "0.1.0-SNAPSHOT"

ThisBuild / scalaVersion := "3.3.1"
ThisBuild / scalaVersion := "3.4.0"

// https://mvnrepository.com/artifact/org.scala-lang.modules/scala-parser-combinators
ThisBuild / libraryDependencies += "org.scala-lang.modules" %% "scala-parser-combinators" % "2.3.0"
Expand All @@ -18,4 +18,3 @@ lazy val root = (project in file("."))
.settings(
name := "JeMPI_Configuration"
)

Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,54 @@ object ScalaCustomFields {

def generate(config: Config): Any = {

def fieldDefs(): String =
/** Renders one `private val COL_<FIELD> = <index>` line per demographic field.
  *
  * The constant name is the upper-cased snake-case form of the field name
  * prefixed with `COL_`; the value is the field's position in
  * `config.demographicFields`. Lines are joined with the platform line
  * separator.
  */
def colFieldDefs(): String =
  val newline = sys.props("line.separator")
  val declarations = config.demographicFields.zipWithIndex.map { case (field, position) =>
    val constantName = ("COL_" + Utils.camelCaseToSnakeCase(field.fieldName)).toUpperCase()
    s"""${" " * 2}private val ${constantName} = ${position}"""
  }
  declarations.mkString(newline)
end colFieldDefs

/** Renders the comma-separated list of `COL_<FIELD>` constants for every
  * demographic field whose `linkMetaData` is defined.
  *
  * One constant per line, joined with the platform line separator; there is
  * no comma after the last entry and the result is empty when no field
  * qualifies.
  */
def colLinkFieldDefs(): String =
  val linkedFields = config.demographicFields.filter(_.linkMetaData.isDefined)
  val entries = linkedFields.map { field =>
    val constantName = (" COL_" + Utils.camelCaseToSnakeCase(field.fieldName)).toUpperCase()
    s"""${" " * 2}${constantName}"""
  }
  // Putting the comma in the separator yields the same text as appending a
  // comma to every entry and then dropping the final one.
  entries.mkString("," + sys.props("line.separator"))
end colLinkFieldDefs

/** Renders the comma-separated list of `COL_<FIELD>` constants for every
  * demographic field whose `validateMetaData` is defined, one per line.
  */
def colValidateFieldDefs(): String =
  val rows = config.demographicFields.collect {
    case field if field.validateMetaData.isDefined =>
      val constantName = " COL_" + Utils.camelCaseToSnakeCase(field.fieldName)
      s"""${" " * 2}${constantName.toUpperCase()},"""
  }
  // Every row ends with a comma; strip the one that follows the last row.
  rows.mkString(sys.props("line.separator")).dropRight(1)
end colValidateFieldDefs

/** Renders the comma-separated list of `COL_<FIELD>` constants for every
  * demographic field whose `matchMetaData` is defined, one per line.
  */
def colMatchFieldDefs(): String =
  val indent = " " * 2
  val rows =
    for
      field <- config.demographicFields
      if field.matchMetaData.isDefined
    yield
      val constantName = (" COL_" + Utils.camelCaseToSnakeCase(field.fieldName)).toUpperCase()
      s"${indent}${constantName},"
  val joined = rows.mkString(sys.props("line.separator"))
  // Remove the trailing comma left after the final entry (no-op on empty output).
  joined.dropRight(1)
end colMatchFieldDefs

def fieldDefs(): String =
config.demographicFields
.map(f => {
val fieldName = Utils.snakeCaseToCamelCase(f.fieldName)
s"""${" " * 4}Field("${fieldName}", ${i}),"""
val colName = "COL_" + Utils.camelCaseToSnakeCase(f.fieldName)
s"""${" " * 4}Field("${fieldName}", ${colName.toUpperCase()}),"""
})
.mkString(sys.props("line.separator"))
.trim
Expand All @@ -35,10 +78,24 @@ object ScalaCustomFields {
|
|object CustomFields {
|
|${colFieldDefs()}
|
| val FIELDS: ArraySeq[Field] = ArraySeq(
| ${fieldDefs()}
| )
|
| val LINK_COLS: ArraySeq[Int] = ArraySeq(
|${colLinkFieldDefs()}
| )
|
| val VALIDATE_COLS: ArraySeq[Int] = ArraySeq(
|${colValidateFieldDefs()}
| )
|
| val MATCH_COLS: ArraySeq[Int] = ArraySeq(
|${colMatchFieldDefs()}
| )
|
|}
|""".stripMargin)
writer.flush()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@ object ScalaCustomInteractionEnvelop {

def generate(config: Config): Any = {

val muList =
for (t <- config.demographicFields.filter(f => f.linkMetaData.isDefined))
yield t
val muList = for (t <- config.demographicFields) yield t

def fieldDefs(): String =
muList.zipWithIndex
Expand Down Expand Up @@ -50,7 +48,6 @@ object ScalaCustomInteractionEnvelop {
writer.println(s"""
|import com.fasterxml.jackson.annotation.JsonIgnoreProperties
|
|
|@JsonIgnoreProperties(ignoreUnknown = true)
|case class ${custom_className}(
| contentType: String,
Expand Down Expand Up @@ -82,7 +79,6 @@ object ScalaCustomInteractionEnvelop {
writer.println(s"""
|import com.fasterxml.jackson.annotation.JsonIgnoreProperties
|
|
|@JsonIgnoreProperties(ignoreUnknown = true)
|case class ${custom_className}(
| contentType: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ object ScalaCustomMU {
def generate(config: Config): Any = {

def fieldDefs(): String =
config.demographicFields.zipWithIndex
.map((f, i) => {
config.demographicFields
.filter(f => f.linkMetaData.isDefined)
.map(f => {
val fieldName = Utils.snakeCaseToCamelCase(f.fieldName)
s"""${" " * 4}${fieldName}: Probability,"""
})
Expand All @@ -24,7 +25,9 @@ object ScalaCustomMU {
end fieldDefs

def probSeqDefs(): String =
config.demographicFields.zipWithIndex
config.demographicFields
.filter(f => f.linkMetaData.isDefined)
.zipWithIndex
.map((f, i) => {
s"""${" " * 12}Probability(muSeq.apply(${i}).m, muSeq.apply(${i}).u),"""
})
Expand Down
10 changes: 5 additions & 5 deletions JeMPI_Apps/JeMPI_EM_Scala/build.sbt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
ThisBuild / version := "0.1.0-SNAPSHOT"

ThisBuild / scalaVersion := "2.13.12"
ThisBuild / scalaVersion := "2.13.13"

lazy val root = (project in file("."))
.settings(
Expand All @@ -12,17 +12,17 @@ lazy val root = (project in file("."))
// https://mvnrepository.com/artifact/com.typesafe.scala-logging/scala-logging
"com.typesafe.scala-logging" %% "scala-logging" % "3.9.5",
// https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams
"org.apache.kafka" % "kafka-streams" % "3.6.1",
"org.apache.kafka" % "kafka-streams" % "3.7.0",
// https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
"org.apache.kafka" % "kafka-clients" % "3.6.1",
"org.apache.kafka" % "kafka-clients" % "3.7.0",
// https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams-scala
"org.apache.kafka" %% "kafka-streams-scala" % "3.6.1",
"org.apache.kafka" %% "kafka-streams-scala" % "3.7.0",
// https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-jsr310
"com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % "2.16.1",
"com.fasterxml.jackson.core" % "jackson-databind" % "2.16.1",
"com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.16.1",
// https://mvnrepository.com/artifact/ch.qos.logback/logback-classic
"ch.qos.logback" % "logback-classic" % "1.4.14"
"ch.qos.logback" % "logback-classic" % "1.5.3"
),
assembly / assemblyJarName := "em-scala-fatjar-1.0.jar",
assembly / assemblyMergeStrategy := {
Expand Down
2 changes: 1 addition & 1 deletion JeMPI_Apps/JeMPI_EM_Scala/project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version = 1.9.8
sbt.version = 1.9.9
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,40 @@ import scala.collection.immutable.ArraySeq

object CustomFields {

private val COL_GIVEN_NAME = 0
private val COL_FAMILY_NAME = 1
private val COL_GENDER = 2
private val COL_DOB = 3
private val COL_CITY = 4
private val COL_PHONE_NUMBER = 5
private val COL_NATIONAL_ID = 6

val FIELDS: ArraySeq[Field] = ArraySeq(
Field("givenName", 0),
Field("familyName", 1),
Field("gender", 2),
Field("dob", 3),
Field("city", 4),
Field("phoneNumber", 5),
Field("nationalId", 6)
Field("givenName", COL_GIVEN_NAME),
Field("familyName", COL_FAMILY_NAME),
Field("gender", COL_GENDER),
Field("dob", COL_DOB),
Field("city", COL_CITY),
Field("phoneNumber", COL_PHONE_NUMBER),
Field("nationalId", COL_NATIONAL_ID)
)

val LINK_COLS: ArraySeq[Int] = ArraySeq(
COL_GIVEN_NAME,
COL_FAMILY_NAME,
COL_GENDER,
COL_DOB,
COL_CITY,
COL_PHONE_NUMBER,
COL_NATIONAL_ID
)

val VALIDATE_COLS: ArraySeq[Int] = ArraySeq(

)

val MATCH_COLS: ArraySeq[Int] = ArraySeq(

)

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package org.jembi.jempi.em

import com.fasterxml.jackson.annotation.JsonIgnoreProperties


@JsonIgnoreProperties(ignoreUnknown = true)
case class CustomInteractionEnvelop(
contentType: String,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,21 @@
package org.jembi.jempi.em

import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.{ClassTagExtensions, DefaultScalaModule}
import com.fasterxml.jackson.module.scala.{
ClassTagExtensions,
DefaultScalaModule
}
import com.typesafe.scalalogging.LazyLogging
import org.apache.kafka.common.serialization.{Serde, Serdes}
import org.apache.kafka.streams.kstream.{Consumed, KStream}
import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder, StreamsConfig}
import org.jembi.jempi.em.kafka.Config.{CFG_KAFKA_APPLICATION_ID, CFG_KAFKA_BOOTSTRAP_SERVERS, CFG_KAFKA_CLIENT_ID, CFG_KAFKA_TOPIC_INTERACTION_EM}
import org.jembi.jempi.em.CustomFields.LINK_COLS
import org.jembi.jempi.em.kafka.Config.{
CFG_KAFKA_APPLICATION_ID,
CFG_KAFKA_BOOTSTRAP_SERVERS,
CFG_KAFKA_CLIENT_ID,
CFG_KAFKA_TOPIC_INTERACTION_EM
}
import org.jembi.jempi.em.kafka.Producer

import java.util.Properties
Expand Down Expand Up @@ -85,9 +94,10 @@ object EM_Scala extends LazyLogging {
)
val (mu, ms) = Profile.profile(EM_Task.run(interactions_))

CustomFields.FIELDS.zipWithIndex.foreach(x =>
Utils.printMU(x._1.name, mu(x._2))
)
for (i <- LINK_COLS.indices) {
Utils.printMU(CustomFields.FIELDS.apply(LINK_COLS.apply(i)).name, mu(i))
}

logger.info(s"$ms ms")
Producer.send(tag, mu);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.jembi.jempi.em

import com.typesafe.scalalogging.LazyLogging
import org.jembi.jempi.em.CustomFields.FIELDS
import org.jembi.jempi.em.CustomFields.{FIELDS, LINK_COLS}
import org.jembi.jempi.em.Utils._

import java.lang.Math.log
Expand All @@ -16,6 +16,7 @@ object EM_Task extends LazyLogging {

val (gamma, ms2) = Profile.profile(
Gamma.getGamma(
CustomFields.LINK_COLS,
Map[String, Long](),
interactions.head,
interactions.tail
Expand Down Expand Up @@ -142,9 +143,13 @@ object EM_Task extends LazyLogging {
.map(x => x.tallies)
.fold(Tallies())((x, y) => addTallies(x, y))
val newMU = computeMU(tallies)
FIELDS.zipWithIndex.foreach(x =>
printTalliesAndMU(x._1.name, tallies.colTally(x._2), newMU(x._2))
)
for (i <- LINK_COLS.indices) {
printTalliesAndMU(
FIELDS.apply(LINK_COLS.apply(i)).name,
tallies.colTally(i),
newMU(i)
)
}
if (LOCK_U) {
runEM(iterations + 1, mergeMU(newMU, currentMU), gamma)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ object Gamma {

@tailrec
def getGamma(
cols: ArraySeq[Int],
gamma: Map[String, Long],
left: ArraySeq[String],
right: ParVector[ArraySeq[String]]
Expand All @@ -32,14 +33,15 @@ object Gamma {
}

val gamma: ParVector[String] =
interactions.map(right => gammaKey(left, right))
interactions.map(right => gammaKey(cols, left, right))
gamma.aggregate(Map[String, Long]())(sequenceOp, combineOp)
}

if (right.isEmpty) {
gamma
} else {
getGamma(
cols,
gamma ++ innerLoop(left, right)
.map { case (k: String, v: Long) =>
k -> (v + gamma.getOrElse(k, 0L))
Expand All @@ -51,10 +53,13 @@ object Gamma {
}

private def gammaKey(
cols : ArraySeq[Int],
left: ArraySeq[String],
right: ArraySeq[String]
): String = {
val key: ArraySeq[Int] = (left zip right).map { case (l, r) =>
val left_ = cols.map(i => left.apply(i))
val right_ = cols.map(i => right.apply(i))
val key: ArraySeq[Int] = (left_ zip right_).map { case (l, r) =>
if (l.isEmpty || r.isEmpty) {
Utils.GAMMA_TAG_MISSING
} else if (l.equals(r)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.jembi.jempi.em

import CustomFields.FIELDS
import scala.collection.immutable.ArraySeq

case class Tallies(colTally: ArraySeq[Tally] = FIELDS.map(_ => Tally()))
case class Tallies(colTally: ArraySeq[Tally] = CustomFields.LINK_COLS.map(_ => Tally()))
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package org.jembi.jempi.em

import com.typesafe.scalalogging.LazyLogging
import Jaro.jaro
import org.jembi.jempi.em.CustomFields.FIELDS
import org.jembi.jempi.em.CustomFields.{FIELDS, LINK_COLS}

import scala.collection.immutable.ArraySeq

Expand Down Expand Up @@ -108,7 +108,7 @@ object Utils extends LazyLogging {

Tallies(
ArraySeq
.range(0, FIELDS.length) // x.colTally.length)
.range(0, x.colTally.length)
.map(idx => addTally(x.colTally(idx), y.colTally(idx)))
)
}
Expand Down