diff --git a/README.md b/README.md
index 5317d5ad..93822702 100644
--- a/README.md
+++ b/README.md
@@ -1,7 +1,7 @@
 # DS-JedAI
 
 DS-JedAI (Distributed Spatial JedAI) is a system for Holistic Geospatial Interlinking for big geospatial data.
-In Holistic Geospatial Interlinking we aim to discover all the topological relations between two geospatial datasets,
+In Holistic Geospatial Interlinking, we aim to discover all the topological relations between the geometries of two geospatial datasets,
 using the [DE-9IM](https://en.wikipedia.org/wiki/DE-9IM) topological model. DS-JedAI offers a novel batch algorithm for
 geospatial interlinking and several algorithms for progressive Geospatial Interlinking. All the algorithms have been
 parallelized based on the MapReduce Framework.
@@ -12,7 +12,7 @@ These algorithms take as input a budget (*BU*) that indicates the total number o
 and a weighting scheme *W* that quantifies the probability that two geometries relate. Furthermore, DS-JedAI allows
 temporal filtering in order to detect pairs that coincide not only spatially but also temporally.
 
-DS-JedAI in implemented on top of Apache Spark and can run in any distributed or standalone environment that
+DS-JedAI is implemented on top of **Apache Spark** and can run in any distributed or standalone environment that
 supports the execution of Apache Spark jobs. Currently, it supports most of the RDF formats (i.e., N-Triples, Turtle,
 RDF/JSON and RDF/XML), as well as CSV, TSV, GeoJSON and ESRI shapefiles.
@@ -43,10 +43,11 @@ to relate. The idea is, that not all the verifications will be performed, but us
 Hence, the progressive algorithms prioritize the *BU* most prominent verifications.
 The implemented algorithms are the following:
 
-- **Progressive Giant**: Implements GIA.nt but prioritizes the BU most promising geometry pairs.
+- **Progressive GIA.nt**: Implements GIA.nt but prioritizes the BU most promising geometry pairs.
+- **Dynamic Progressive GIA.nt**: Extends Progressive GIA.nt by boosting the weights of the pairs that are associated with qualifying pairs, so the Priority Queue
+  that stores the geometry pairs changes dynamically during the verifications.
 - **Geometry Top-k**: For each geometry, finds and verifies its top-k pairs.
 - **Geometry Reciprocal Top-k**: Verifies only the pairs *(s, t)* such that *s* belongs to the top-k of *t* and *t* belongs to the top-k of *s*.
-- **Geometry-Centric**: The prioritization of *(s, t)* is based on the mean weight of all the pairs that *t* is part of.
 - **RandomScheduling**: Implements random prioritization.
 
 Currently, the supported weighting schemes are:
@@ -54,8 +55,11 @@ Currently, the supported weighting schemes are:
 - Co-occurrence Frequencies (CF)
 - Jaccard Similarity (JS)
 - Pearson's chi square test (PEARSON_x2)
-- MBR INTERSECTION
-- GEOMETRY POINTS
+- Minimum Bounding Rectangle Overlap (MBRO)
+- Inverse Sum of Points (ISP)
+
+The algorithms also support composite schemes, which combine two weighting schemes so that the secondary weighting
+scheme resolves the ties of the main one.
 
 The progressive algorithms, the weighting schemes and the budget *BU* are specified in the configuration file. Consult
 the configuration template in `config/configurationTemplate.yaml` to see how you can specify them. To execute, run:
@@ -66,8 +70,14 @@ Some additional options are the following:
 - **-p N**: specify the number of partitions
 - **-gt type**: specify the grid type for the spatial partitioning. Accepted values are KDBTREE and QUADTREE.
-- **ws WS**: specify weighting scheme - allowed values: *CF, JS, MBR_INTERSECTION, PEARSON_X2, POINTS*.
-- **progressiveAlgorithm PA**: specify progressive algorithm - allowed values: *PROGRESSIVE_GIANT, TOPK, RECIPROCAL_TOPK, GEOMETRY_CENTRIC, RANDOM*
+- **mws WS**: specify the main weighting scheme - allowed values: *CF, JS, MBRO, PEARSON_X2, ISP*.
+- **sws WS**: specify the secondary weighting scheme (optional) - allowed values: *CF, JS, MBRO, PEARSON_X2, ISP*; MBRO is preferred.
+- **progressiveAlgorithm PA**: specify the progressive algorithm - allowed values: *PROGRESSIVE_GIANT, DYNAMIC_PROGRESSIVE_GIANT, TOPK, RECIPROCAL_TOPK, RANDOM*
 - **budget** BU: the input budget.
 
-The command line options will overwrite the corresponding options of the configuration file.
\ No newline at end of file
+The command-line options override the corresponding options of the configuration file.
+
+---
+## Publication
+
+*Progressive, Holistic Geospatial Interlinking. George Papadakis, Georgios Mandilaras, Nikos Mamoulis, Manolis Koubarakis. In Proceedings of The Web Conference 2021*
\ No newline at end of file
diff --git a/config/LINEARWATER-AREAWATER.yaml b/config/LINEARWATER-AREAWATER.yaml
new file mode 100644
index 00000000..66a9104b
--- /dev/null
+++ b/config/LINEARWATER-AREAWATER.yaml
@@ -0,0 +1,17 @@
+
+source:
+  path: "/home/gmandi/Documents/Extreme-Earth/Datasets/SPATIAL-HADOOP/LINEARWATER_100K.tsv"
+  realIdField: "id"
+  geometryField: "WKT"
+
+target:
+  path: "/home/gmandi/Documents/Extreme-Earth/Datasets/SPATIAL-HADOOP/AREAWATER_100K.tsv"
+  realIdField: "id"
+  geometryField: "WKT"
+
+relation: "DE9IM"
+
+configurations:
+  thetaGranularity: "avg"
+  secondaryWS: "MBRO"
+  mainWS: "JS"
\ No newline at end of file
diff --git a/config/configurationTemplate.yaml b/config/configurationTemplate.yaml
index c77f6536..c56dadf5 100644
--- a/config/configurationTemplate.yaml
+++ b/config/configurationTemplate.yaml
@@ -20,6 +20,7 @@ configurations:
   partitions: "number of partitions"
   thetaGranularity: "avg" # define the extent of dynamic tiling based on the geometries of source - Experiments have shown that "avg" is the best option
   gridType: "spatial partitioner grid type algorithm" # allowed values: KDBTREE, QUADTREE
-  weightingScheme: "WS" # specify weighting scheme - allowed values: CF, JS, MBR_INTERSECTION, PEARSON_X2, POINTS
+  mainWS: "WS" # specify the main weighting scheme - allowed values: CF, JS, MBRO, PEARSON_X2, ISP
+  secondaryWS: "WS" # optional secondary weighting scheme, used to resolve the ties of the main one - allowed values: CF, JS, MBRO, PEARSON_X2, ISP
   progressiveAlgorithm : "PA" # specify progressive algorithm - allowed values: PROGRESSIVE_GIANT, TOPK, RECIPROCAL_TOPK, GEOMETRY_CENTRIC, RANDOM
   budget: "BU" # the budget of progressive algorithms
\ No newline at end of file
diff --git a/src/main/scala/dataModel/ComparisonPQ.scala b/src/main/scala/dataModel/ComparisonPQ.scala
deleted file mode 100644
index 835b6c61..00000000
--- a/src/main/scala/dataModel/ComparisonPQ.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-package dataModel
-
-import org.spark_project.guava.collect.MinMaxPriorityQueue
-import scala.collection.JavaConverters._
-
-/**
- * a wrapper of guava min-max PQ.
- *
- * @param maxSize max size of PQ
- * @tparam T the type of input items
- */
-case class ComparisonPQ[T](maxSize: Int){
-
-  var minW: Float = 0f
-  val ordering: Ordering[(Float, T)] = Ordering.by[(Float, T), Float](_._1).reverse
-  lazy val pq: MinMaxPriorityQueue[(Float, T)] = MinMaxPriorityQueue.orderedBy(ordering).maximumSize(maxSize+1).create()
-
-  /**
-   * if w is smaller than minW then omit it.
- * Otherwise, insert it into PQ and if PQ exceed max size, - * remove item with the smallest weight and update minW - * - * @param w the weight of the item - * @param item item to insert - */ - def enqueue(w: Float, item: T): Unit ={ - if (minW < w) { - pq.add((w, item)) - if (pq.size > maxSize) - minW = pq.pollLast()._1 - } - } - - def enqueueAll(items: Iterator[(T, Float)]): Unit = items.foreach{ case(item, w) => enqueue(w, item)} - - def take(n: Option[Int]): Iterator[(Float, T)] = - n match { - case Some(n) => Iterator.continually{ pq.pollFirst() }.take(n) - case None => Iterator.continually{ pq.pollFirst() }.takeWhile(_ => !pq.isEmpty) - } - - def take(n: Int): Iterator[(Float, T)] = take(Option(n)) - - def dequeueAll: Iterator[(Float, T)] = take(None) - - def clear(): Unit = { - pq.clear() - minW = 0f - } - - def isEmpty: Boolean = pq.isEmpty - - def size(): Int = pq.size() - - def dequeueHead(): (Float, T) = pq.pollFirst() - - def dequeue(): (Float, T) = pq.pollLast() - - def iterator(): Iterator[(Float, T)] = pq.iterator().asScala -} - - - diff --git a/src/main/scala/experiments/WellBalancedExp.scala b/src/main/scala/experiments/BalancingExp.scala similarity index 54% rename from src/main/scala/experiments/WellBalancedExp.scala rename to src/main/scala/experiments/BalancingExp.scala index eec5bca0..7d4dd457 100644 --- a/src/main/scala/experiments/WellBalancedExp.scala +++ b/src/main/scala/experiments/BalancingExp.scala @@ -2,24 +2,23 @@ package experiments import java.util.Calendar -import geospatialInterlinking.IndexBasedMatching -import geospatialInterlinking.IndexBasedMatching -import geospatialInterlinking.progressive.ProgressiveAlgorithmsFactory +import model.Entity +import interlinkers.{GIAnt, IndexedJoinInterlinking} import org.apache.log4j.{Level, LogManager, Logger} +import org.apache.spark.rdd.RDD import org.apache.spark.serializer.KryoSerializer import org.apache.spark.sql.SparkSession import org.apache.spark.storage.StorageLevel import org.apache.spark.{SparkConf, SparkContext, TaskContext} import org.datasyslab.geospark.serde.GeoSparkKryoRegistrator -import utils.Constants.ProgressiveAlgorithm.ProgressiveAlgorithm -import utils.Constants.{GridType, ProgressiveAlgorithm, Relation, WeightingScheme} -import utils.Constants.WeightingScheme.WeightingScheme -import utils.{ConfigurationParser, SpatialReader, Utils} +import utils.Constants.{GridType, Relation} +import utils.readers.Reader +import utils.{ConfigurationParser, Utils} -object WellBalancedExp { +object BalancingExp { - implicit class TuppleAdd(t: (Int, Int, Int, Int, Int, Int, Int, Int, Int, Int, Int)) { + implicit class TupleAdd(t: (Int, Int, Int, Int, Int, Int, Int, Int, Int, Int, Int)) { def +(p: (Int, Int, Int, Int, Int, Int, Int, Int, Int, Int, Int)) : (Int, Int, Int, Int, Int, Int, Int, Int, Int, Int, Int) = (p._1 + t._1, p._2 + t._2, p._3 +t._3, p._4+t._4, p._5+t._5, p._6+t._6, p._7+t._7, p._8+t._8, p._9+t._9, p._10+t._10, p._11+t._11) @@ -46,20 +45,8 @@ object WellBalancedExp { case Nil => map case ("-c" | "-conf") :: value :: tail => nextOption(map ++ Map("conf" -> value), tail) - case ("-f" | "-fraction") :: value :: tail => - nextOption(map ++ Map("fraction" -> value), tail) - case ("-s" | "-stats") :: tail => - nextOption(map ++ Map("stats" -> "true"), tail) - case "-auc" :: tail => - nextOption(map ++ Map("auc" -> "true"), tail) case ("-p" | "-partitions") :: value :: tail => nextOption(map ++ Map("partitions" -> value), tail) - case ("-b" | "-budget") :: value :: tail => - nextOption(map ++ 
Map("budget" -> value), tail) - case "-ws" :: value :: tail => - nextOption(map ++ Map("ws" -> value), tail) - case "-ma" :: value :: tail => - nextOption(map ++ Map("ma" -> value), tail) case "-gt" :: value :: tail => nextOption(map ++ Map("gt" -> value), tail) case _ :: tail => @@ -80,41 +67,67 @@ object WellBalancedExp { val confPath = options("conf") val conf = ConfigurationParser.parse(confPath) val partitions: Int = if (options.contains("partitions")) options("partitions").toInt else conf.getPartitions - val budget: Int = if (options.contains("budget")) options("budget").toInt else conf.getBudget - val ws: WeightingScheme = if (options.contains("ws")) WeightingScheme.withName(options("ws")) else conf.getWeightingScheme - val ma: ProgressiveAlgorithm = if (options.contains("ma")) ProgressiveAlgorithm.withName(options("ma")) else conf.getProgressiveAlgorithm val gridType: GridType.GridType = if (options.contains("gt")) GridType.withName(options("gt").toString) else conf.getGridType val relation = conf.getRelation + val startTime = Calendar.getInstance().getTimeInMillis - log.info("DS-JEDAI: Input Budget: " + budget) - log.info("DS-JEDAI: Weighting Scheme: " + ws.toString) - val startTime = Calendar.getInstance().getTimeInMillis - val reader = SpatialReader(conf.source, partitions, gridType) - val sourceRDD = reader.load() + // reading source dataset + val reader = Reader(partitions, gridType) + val sourceRDD: RDD[(Int, Entity)] = reader.loadSource(conf.source) sourceRDD.persist(StorageLevel.MEMORY_AND_DISK) - Utils(sourceRDD.map(_._2.mbr), conf.getTheta, reader.partitionsZones) + val sourcePartitions: RDD[(Int, Iterator[Entity])] = sourceRDD.mapPartitions(si => Iterator((TaskContext.getPartitionId(), si.map(_._2)))) + - val targetRDD = reader.load(conf.target) + // reading target dataset + val targetRDD: RDD[(Int, Entity)] = reader.load(conf.target) match { + case Left(e) => + log.error("Paritioner is not initialized, call first the `loadSource`.") + e.printStackTrace() + System.exit(1) + null + case Right(rdd) => rdd + } val partitioner = reader.partitioner - val partitionEntitiesAVG = sourceRDD.mapPartitions(si => Iterator(si.toArray.length)).sum()/sourceRDD.getNumPartitions - val balancedSource = sourceRDD.mapPartitions(si => Iterator(si.toArray)).filter(_.length < partitionEntitiesAVG*3).flatMap(_.toIterator) - val overloadedSource = sourceRDD.mapPartitions(si => Iterator(si.toArray)).filter(_.length >= partitionEntitiesAVG*3).flatMap(_.toIterator) - val overloadedPartitionIds = overloadedSource.map(_ => TaskContext.getPartitionId()).collect().toSet - val balancedTarget = targetRDD.mapPartitions(ti => Iterator((TaskContext.getPartitionId(), ti))).filter{ case (pid, _) => !overloadedPartitionIds.contains(pid) }.flatMap(_._2) - val overloadedTarget = targetRDD.mapPartitions(ti => Iterator((TaskContext.getPartitionId(), ti))).filter{ case (pid, _) => overloadedPartitionIds.contains(pid) }.flatMap(_._2) - log.info("DS-JEDAI: Overloaded partitions: " + overloadedPartitionIds.size) + val entitiesPerPartitions: Seq[(Int, Int)] = sourcePartitions.map{ case (pid, si) => (pid, si.size)}.collect() + + // find outlier partitions + val mean = entitiesPerPartitions.map(_._2).sum/sourceRDD.getNumPartitions + val variance = entitiesPerPartitions.map(_._2.toDouble).map(x => math.pow(x - mean, 2)).sum / entitiesPerPartitions.length + val std = Math.sqrt(variance) + val zScore: (Int, Int) => (Int, Double) = (p: Int, x: Int) => (p, (x - mean).toDouble/std) - val matchingStartTime = 
Calendar.getInstance().getTimeInMillis + val outliers = entitiesPerPartitions.map{case (p, x) => zScore(p, x)}.filter(_._2 > 2.5) + val outlierPartitions = outliers.map(_._1).toSet + log.info("DS-JEDAI: Overloaded partitions: " + outlierPartitions.size) - val pm = ProgressiveAlgorithmsFactory.get(ma, sourceRDD, targetRDD, partitioner, budget, ws) - val ibm = IndexBasedMatching(overloadedSource.map(_._2), overloadedTarget.map(_._2), Utils.getTheta) + val goodSourceRDD = sourceRDD.filter(s => !outlierPartitions.contains(s._1)) + val badSourceRDD = sourceRDD.filter(s => outlierPartitions.contains(s._1)) + + val goodTargetRDD = targetRDD.filter(t => !outlierPartitions.contains(t._1)) + val badTargetRDD = targetRDD.filter(t => outlierPartitions.contains(t._1)) + + val giant = GIAnt(goodSourceRDD, goodTargetRDD, partitioner) + val iji = IndexedJoinInterlinking(badSourceRDD, badTargetRDD, Utils.getTheta) if (relation.equals(Relation.DE9IM)) { - val (totalContains, totalCoveredBy, totalCovers, totalCrosses, totalEquals, totalIntersects, - totalOverlaps, totalTouches, totalWithin, intersectingPairs, interlinkedGeometries) = pm.countAllRelations + ibm.countAllRelations + val giantStartTime = Calendar.getInstance().getTimeInMillis + val giantResults = giant.countAllRelations + val giantEndTime = Calendar.getInstance().getTimeInMillis + log.info("DS-JEDAI: GIA.nt Time: " + (giantEndTime - giantStartTime) / 1000.0) + log.info("DS-JEDAI: GIA.nt Interlinked Geometries: " + giantResults._11) + log.info("-----------------------------------------------------------\n") + + val indexedJoinStartTime = Calendar.getInstance().getTimeInMillis + val indexedJoinResults = iji.countAllRelations + val indexedJoinEndTime = Calendar.getInstance().getTimeInMillis + log.info("DS-JEDAI: INDEXED-JOIN Time: " + (indexedJoinEndTime - indexedJoinStartTime) / 1000.0) + log.info("DS-JEDAI:INDEXED-JOIN Interlinked Geometries: " + indexedJoinResults._11) + log.info("-----------------------------------------------------------\n") + val (totalContains, totalCoveredBy, totalCovers, totalCrosses, totalEquals, totalIntersects, + totalOverlaps, totalTouches, totalWithin, intersectingPairs, interlinkedGeometries) = giantResults + indexedJoinResults val totalRelations = totalContains + totalCoveredBy + totalCovers + totalCrosses + totalEquals + totalIntersects + totalOverlaps + totalTouches + totalWithin log.info("DS-JEDAI: Total Intersecting Pairs: " + intersectingPairs) @@ -132,11 +145,9 @@ object WellBalancedExp { log.info("DS-JEDAI: Total Relations Discovered: " + totalRelations) } else{ - val totalMatches = pm.countRelation(relation) + ibm.countRelation(relation) + val totalMatches = giant.countRelation(relation) + iji.countRelation(relation) log.info("DS-JEDAI: " + relation.toString +": " + totalMatches) } - val matchingEndTime = Calendar.getInstance().getTimeInMillis - log.info("DS-JEDAI: Interlinking Time: " + (matchingEndTime - matchingStartTime) / 1000.0) val endTime = Calendar.getInstance() log.info("DS-JEDAI: Total Execution Time: " + (endTime.getTimeInMillis - startTime) / 1000.0) diff --git a/src/main/scala/experiments/EvaluationExp.scala b/src/main/scala/experiments/EvaluationExp.scala index 55390310..d61ee49d 100644 --- a/src/main/scala/experiments/EvaluationExp.scala +++ b/src/main/scala/experiments/EvaluationExp.scala @@ -1,9 +1,9 @@ package experiments -import dataModel.Entity -import geospatialInterlinking.GIAnt -import geospatialInterlinking.progressive.ProgressiveAlgorithmsFactory +import model.Entity +import 
interlinkers.GIAnt +import interlinkers.progressive.ProgressiveAlgorithmsFactory import org.apache.log4j.{Level, LogManager, Logger} import org.apache.spark.rdd.RDD import org.apache.spark.{Partitioner, SparkConf, SparkContext} @@ -15,7 +15,8 @@ import utils.Constants.ProgressiveAlgorithm.ProgressiveAlgorithm import utils.Constants.Relation.Relation import utils.Constants.WeightingScheme.WeightingScheme import utils.Constants.{GridType, ProgressiveAlgorithm, Relation, WeightingScheme} -import utils.{ConfigurationParser, SpatialReader, Utils} +import utils.readers.Reader +import utils.{ConfigurationParser, Utils} object EvaluationExp { @@ -24,7 +25,7 @@ object EvaluationExp { log.setLevel(Level.INFO) var budget: Int = 10000 - var takeBudget: Seq[Int] = Seq(5000000, 10000000) + var takeBudget: Seq[Int] = Seq(500000, 1000000) var relation: Relation = Relation.DE9IM def main(args: Array[String]): Unit = { @@ -52,6 +53,12 @@ object EvaluationExp { nextOption(map ++ Map("budget" -> value), tail) case "-gt" :: value :: tail => nextOption(map ++ Map("gt" -> value), tail) + case "-tv" :: value :: tail => + nextOption(map ++ Map("tv" -> value), tail) + case "-qp" :: value :: tail => + nextOption(map ++ Map("qp" -> value), tail) + case "-pa" :: value :: tail => + nextOption(map ++ Map("pa" -> value), tail) case _ :: tail => log.warn("DS-JEDAI: Unrecognized argument") nextOption(map, tail) @@ -77,33 +84,61 @@ object EvaluationExp { log.info("DS-JEDAI: Input Budget: " + budget) - val reader = SpatialReader(conf.source, partitions, gridType) - val sourceRDD = reader.load() + val reader = Reader(partitions, gridType) + val sourceRDD: RDD[(Int, Entity)] = reader.loadSource(conf.source) sourceRDD.persist(StorageLevel.MEMORY_AND_DISK) - Utils(sourceRDD.map(_._2.mbr), conf.getTheta, reader.partitionsZones) - log.info(s"DS-JEDAI: Source was loaded into ${sourceRDD.getNumPartitions} partitions") - val targetRDD = reader.load(conf.target) + val targetRDD: RDD[(Int, Entity)] = reader.load(conf.target) match { + case Left(e) => + log.error("Paritioner is not initialized, call first the `loadSource`.") + e.printStackTrace() + System.exit(1) + null + case Right(rdd) => rdd + } val partitioner = reader.partitioner - val (_, _, _, _, _, _, _, _, _, totalVerifications, totalRelatedPairs) = GIAnt(sourceRDD, targetRDD, partitioner).countAllRelations + Utils(sourceRDD.map(_._2.mbr), conf.getTheta, reader.partitionsZones) + log.info(s"DS-JEDAI: Source was loaded into ${sourceRDD.getNumPartitions} partitions") + + val (totalVerifications, totalRelatedPairs) = + if (options.contains("tv") && options.contains("qp")) + (options("tv").toInt, options("qp").toInt) + else { + val g = GIAnt(sourceRDD, targetRDD, partitioner).countAllRelations + (g._10, g._11) + } log.info("DS-JEDAI: Total Verifications: " + totalVerifications) - log.info("DS-JEDAI: Total Interlinked Geometries: " + totalRelatedPairs) + log.info("DS-JEDAI: Total Qualifying Pairs: " + totalRelatedPairs) log.info("\n") - printResults(sourceRDD, targetRDD, partitioner, totalRelatedPairs, ProgressiveAlgorithm.RANDOM, WeightingScheme.CF) - val algorithms = Seq(ProgressiveAlgorithm.PROGRESSIVE_GIANT, ProgressiveAlgorithm.TOPK, ProgressiveAlgorithm.RECIPROCAL_TOPK, ProgressiveAlgorithm.GEOMETRY_CENTRIC) - val weightingSchemes = Seq(WeightingScheme.MBR_INTERSECTION, WeightingScheme.POINTS) + //printResults(sourceRDD, targetRDD, partitioner, totalRelatedPairs, ProgressiveAlgorithm.RANDOM, (WeightingScheme.CF, None)) + + val algorithms: Seq[ProgressiveAlgorithm] = + if 
(options.contains("pa")) + options("pa").split(",").filter(ProgressiveAlgorithm.exists).map(ProgressiveAlgorithm.withName).toSeq + else + Seq(ProgressiveAlgorithm.DYNAMIC_PROGRESSIVE_GIANT, ProgressiveAlgorithm.PROGRESSIVE_GIANT, ProgressiveAlgorithm.TOPK, ProgressiveAlgorithm.RECIPROCAL_TOPK) + + val weightingSchemes = Seq((WeightingScheme.JS, Option(WeightingScheme.MBRO)), + (WeightingScheme.CF, None), + (WeightingScheme.JS, None), + (WeightingScheme.PEARSON_X2,None), + (WeightingScheme.MBRO, None), + (WeightingScheme.ISP, None), + (WeightingScheme.JS, Option(WeightingScheme.MBRO)), + (WeightingScheme.PEARSON_X2, Option(WeightingScheme.MBRO))) + for (a <- algorithms ; ws <- weightingSchemes) printResults(sourceRDD, targetRDD, partitioner, totalRelatedPairs, a, ws) } def printResults(source:RDD[(Int, Entity)], target:RDD[(Int, Entity)], partitioner: Partitioner, totalRelations: Int, - ma: ProgressiveAlgorithm, ws: WeightingScheme, n: Int = 10): Unit = { + ma: ProgressiveAlgorithm, ws: (WeightingScheme, Option[WeightingScheme]), n: Int = 10): Unit = { - val pma = ProgressiveAlgorithmsFactory.get(ma, source, target, partitioner, budget, ws) + val pma = ProgressiveAlgorithmsFactory.get(ma, source, target, partitioner, budget, ws._1, ws._2) val results = pma.evaluate(relation, n, totalRelations, takeBudget) results.zip(takeBudget).foreach { case ((pgr, qp, verifications, (verificationSteps, qualifiedPairsSteps)), b) => diff --git a/src/main/scala/experiments/GiantExp.scala b/src/main/scala/experiments/GiantExp.scala index 72103bc1..8b6e1409 100644 --- a/src/main/scala/experiments/GiantExp.scala +++ b/src/main/scala/experiments/GiantExp.scala @@ -3,15 +3,18 @@ package experiments import java.util.Calendar -import geospatialInterlinking.GIAnt +import interlinkers.GIAnt +import model.Entity import org.apache.log4j.{Level, LogManager, Logger} +import org.apache.spark.rdd.RDD import org.apache.spark.serializer.KryoSerializer import org.apache.spark.sql.SparkSession import org.apache.spark.storage.StorageLevel import org.apache.spark.{SparkConf, SparkContext} import org.datasyslab.geospark.serde.GeoSparkKryoRegistrator import utils.Constants.{GridType, Relation} -import utils.{ConfigurationParser, SpatialReader, Utils} +import utils.readers.Reader +import utils.{ConfigurationParser, Utils} object GiantExp { @@ -40,6 +43,8 @@ object GiantExp { nextOption(map ++ Map("partitions" -> value), tail) case "-gt" :: value :: tail => nextOption(map ++ Map("gt" -> value), tail) + case "-s" :: tail => + nextOption(map ++ Map("stats" -> "true"), tail) case _ :: tail => log.warn("DS-JEDAI: Unrecognized argument") nextOption(map, tail) @@ -60,17 +65,36 @@ object GiantExp { val partitions: Int = if (options.contains("partitions")) options("partitions").toInt else conf.getPartitions val gridType: GridType.GridType = if (options.contains("gt")) GridType.withName(options("gt").toString) else conf.getGridType val relation = conf.getRelation + val printCount = options.getOrElse("stats", "false").toBoolean val startTime = Calendar.getInstance().getTimeInMillis - val reader = SpatialReader(conf.source, partitions, gridType) - val sourceRDD = reader.load() + // reading source dataset + val reader = Reader(partitions, gridType, printCount) + val sourceRDD: RDD[(Int, Entity)] = reader.loadSource(conf.source) sourceRDD.persist(StorageLevel.MEMORY_AND_DISK) + val sourceCount = reader.counter + + // reading target dataset + val targetRDD: RDD[(Int, Entity)] = reader.load(conf.target) match { + case Left(e) => + 
log.error("Paritioner is not initialized, call first the `loadSource`.") + e.printStackTrace() + System.exit(1) + null + case Right(rdd) => rdd + } + val targetCount = reader.counter + val partitioner = reader.partitioner + Utils(sourceRDD.map(_._2.mbr), conf.getTheta, reader.partitionsZones) log.info(s"DS-JEDAI: Source was loaded into ${sourceRDD.getNumPartitions} partitions") - val targetRDD = reader.load(conf.target) - val partitioner = reader.partitioner + if(printCount){ + log.info(s"DS-JEDAI: Source geometries: $sourceCount") + log.info(s"DS-JEDAI: Target geometries: $targetCount") + log.info(s"DS-JEDAI: Cartesian: ${sourceCount*targetCount}") + } val matchingStartTime = Calendar.getInstance().getTimeInMillis val giant = GIAnt(sourceRDD, targetRDD, partitioner) @@ -92,7 +116,7 @@ object GiantExp { log.info("DS-JEDAI: OVERLAPS: " + totalOverlaps) log.info("DS-JEDAI: TOUCHES: " + totalTouches) log.info("DS-JEDAI: WITHIN: " + totalWithin) - log.info("DS-JEDAI: Total Relations Discovered: " + totalRelations) + log.info("DS-JEDAI: Total Discovered Relations: " + totalRelations) } else{ val totalMatches = giant.countRelation(relation) diff --git a/src/main/scala/experiments/ProgressiveExp.scala b/src/main/scala/experiments/ProgressiveExp.scala index 554f9ddb..b84af188 100644 --- a/src/main/scala/experiments/ProgressiveExp.scala +++ b/src/main/scala/experiments/ProgressiveExp.scala @@ -2,8 +2,10 @@ package experiments import java.util.Calendar -import geospatialInterlinking.progressive.ProgressiveAlgorithmsFactory +import interlinkers.progressive.ProgressiveAlgorithmsFactory +import model.Entity import org.apache.log4j.{Level, LogManager, Logger} +import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.serializer.KryoSerializer import org.apache.spark.sql.SparkSession @@ -12,7 +14,8 @@ import org.datasyslab.geospark.serde.GeoSparkKryoRegistrator import utils.Constants.ProgressiveAlgorithm.ProgressiveAlgorithm import utils.Constants.{GridType, ProgressiveAlgorithm, Relation, WeightingScheme} import utils.Constants.WeightingScheme.WeightingScheme -import utils.{ConfigurationParser, SpatialReader, Utils} +import utils.readers.Reader +import utils.{ConfigurationParser, Utils} object ProgressiveExp { @@ -39,12 +42,16 @@ object ProgressiveExp { nextOption(map ++ Map("conf" -> value), tail) case ("-b" | "-budget") :: value :: tail => nextOption(map ++ Map("budget" -> value), tail) - case "-ws" :: value :: tail => - nextOption(map ++ Map("ws" -> value), tail) + case "-mws" :: value :: tail => + nextOption(map ++ Map("mws" -> value), tail) + case "-sws" :: value :: tail => + nextOption(map ++ Map("sws" -> value), tail) case "-pa" :: value :: tail => nextOption(map ++ Map("pa" -> value), tail) case "-gt" :: value :: tail => nextOption(map ++ Map("gt" -> value), tail) + case ("-p" | "-partitions") :: value :: tail => + nextOption(map ++ Map("partitions" -> value), tail) case _ :: tail => log.warn("DS-JEDAI: Unrecognized argument") nextOption(map, tail) @@ -64,28 +71,38 @@ object ProgressiveExp { val conf = ConfigurationParser.parse(confPath) val partitions: Int = if (options.contains("partitions")) options("partitions").toInt else conf.getPartitions val budget: Int = if (options.contains("budget")) options("budget").toInt else conf.getBudget - val ws: WeightingScheme = if (options.contains("ws")) WeightingScheme.withName(options("ws")) else conf.getWeightingScheme + val mainWS: WeightingScheme = if (options.contains("mws")) 
WeightingScheme.withName(options("mws")) else conf.getMainWS + val secondaryWS: Option[WeightingScheme] = if (options.contains("sws")) Option(WeightingScheme.withName(options("sws"))) else conf.getSecondaryWS val pa: ProgressiveAlgorithm = if (options.contains("pa")) ProgressiveAlgorithm.withName(options("pa")) else conf.getProgressiveAlgorithm val gridType: GridType.GridType = if (options.contains("gt")) GridType.withName(options("gt").toString) else conf.getGridType val relation = conf.getRelation log.info("DS-JEDAI: Input Budget: " + budget) - log.info("DS-JEDAI: Weighting Scheme: " + ws.toString) + log.info("DS-JEDAI: Main Weighting Scheme: " + mainWS.toString) + if (secondaryWS.isDefined) log.info("DS-JEDAI: Secondary Weighting Scheme: " + secondaryWS.get.toString) log.info("DS-JEDAI: Progressive Algorithm: " + pa.toString) val startTime = Calendar.getInstance().getTimeInMillis - val reader = SpatialReader(conf.source, partitions, gridType) - val sourceRDD = reader.load() + val reader = Reader(partitions, gridType) + val sourceRDD: RDD[(Int, Entity)] = reader.loadSource(conf.source) sourceRDD.persist(StorageLevel.MEMORY_AND_DISK) - Utils(sourceRDD.map(_._2.mbr), conf.getTheta, reader.partitionsZones) - log.info(s"DS-JEDAI: Source was loaded into ${sourceRDD.getNumPartitions} partitions") - val targetRDD = reader.load(conf.target) + val targetRDD: RDD[(Int, Entity)] = reader.load(conf.target) match { + case Left(e) => + log.error("Paritioner is not initialized, call first the `loadSource`.") + e.printStackTrace() + System.exit(1) + null + case Right(rdd) => rdd + } val partitioner = reader.partitioner + Utils(sourceRDD.map(_._2.mbr), conf.getTheta, reader.partitionsZones) + log.info(s"DS-JEDAI: Source was loaded into ${sourceRDD.getNumPartitions} partitions") + val matchingStartTime = Calendar.getInstance().getTimeInMillis - val method = ProgressiveAlgorithmsFactory.get(pa, sourceRDD, targetRDD, partitioner, budget, ws) + val method = ProgressiveAlgorithmsFactory.get(pa, sourceRDD, targetRDD, partitioner, budget, mainWS, secondaryWS) if (relation.equals(Relation.DE9IM)) { val (totalContains, totalCoveredBy, totalCovers, totalCrosses, totalEquals, totalIntersects, totalOverlaps, totalTouches, totalWithin, verifications, qp) = method.countAllRelations diff --git a/src/main/scala/geospatialInterlinking/IndexBasedMatching.scala b/src/main/scala/geospatialInterlinking/IndexBasedMatching.scala deleted file mode 100644 index d54f2cda..00000000 --- a/src/main/scala/geospatialInterlinking/IndexBasedMatching.scala +++ /dev/null @@ -1,70 +0,0 @@ -package geospatialInterlinking - -import dataModel.{Entity, IM} -import org.apache.spark.TaskContext -import org.apache.spark.rdd.RDD -import utils.Constants.Relation -import utils.Constants.Relation.Relation -import utils.Constants.WeightingScheme.WeightingScheme - -import scala.collection.mutable.ListBuffer - -case class IndexBasedMatching(source:RDD[Entity], target:RDD[Entity], thetaXY: (Double, Double)) extends GeospatialInterlinkingT { - - val joinedRDD: RDD[(Int, (Iterable[Entity], Iterable[Entity]))] = null - val ws: WeightingScheme = null - - val filteringFunction: ((Entity, Int), (Entity, Int), (Int, Int), Relation) => Boolean = - (e1: (Entity, Int), e2: (Entity, Int), c: (Int, Int), r: Relation) => e1._2 == e2._2 && e1._1.filter(e2._1, r, c, thetaXY) - - /** - * First index the Source and then use the index to find the comparisons with target's entities. 
- * Filter the redundant comparisons using testMBR and RF - * - * @param relation the examining relation - * @return an RDD containing the matching pairs - */ - def relate(relation: Relation): RDD[(String, String)] = { - - val indexedSource = source - .map(se => (se.index(thetaXY), (se, TaskContext.getPartitionId()))) - .flatMap{case (indices, (se, pid)) => indices.map(i => (i, ListBuffer((se, pid))))} - .reduceByKey(_ ++ _) - val partitioner = indexedSource.partitioner.get - - val indexedTarget = target - .map(se => (se.index(thetaXY), (se, TaskContext.getPartitionId()))) - .flatMap{case (indices, (se, pid)) => indices.map(i => (i, ListBuffer((se, pid))))} - .reduceByKey(partitioner, _ ++ _) - - indexedSource.leftOuterJoin(indexedTarget, partitioner) - .filter(_._2._2.isDefined) - .flatMap { case (c: (Int, Int), (s: ListBuffer[(Entity, Int)], optT: Option[ListBuffer[(Entity, Int)]])) => - for (e1 <- s; e2 <- optT.get; if filteringFunction(e1, e2, c, relation)) - yield (e1._1.originalID, e2._1.originalID) - } - } - - - - def getDE9IM: RDD[IM] = { - val indexedSource = source - .map(se => (se.index(thetaXY), (se, TaskContext.getPartitionId()))) - .flatMap{case (indices, (se, pid)) => indices.map(i => (i, ListBuffer((se, pid))))} - .reduceByKey(_ ++ _) - val partitioner = indexedSource.partitioner.get - - val indexedTarget = target - .map(se => (se.index(thetaXY), (se, TaskContext.getPartitionId()))) - .flatMap{case (indices, (se, pid)) => indices.map(i => (i, ListBuffer((se, pid))))} - .reduceByKey(partitioner, _ ++ _) - - indexedSource.leftOuterJoin(indexedTarget, partitioner) - .filter(_._2._2.isDefined) - .flatMap { case (c: (Int, Int), (s: ListBuffer[(Entity, Int)], optT: Option[ListBuffer[(Entity, Int)]])) => - for (e1 <- s; e2 <- optT.get; if filteringFunction(e1, e2, c, Relation.DE9IM)) yield IM(e1._1, e2._1) - } - } - - -} diff --git a/src/main/scala/geospatialInterlinking/progressive/GeometryCentric.scala b/src/main/scala/geospatialInterlinking/progressive/GeometryCentric.scala deleted file mode 100644 index 7f7b5d1f..00000000 --- a/src/main/scala/geospatialInterlinking/progressive/GeometryCentric.scala +++ /dev/null @@ -1,68 +0,0 @@ -package geospatialInterlinking.progressive - -import dataModel.{ComparisonPQ, Entity, MBR} -import org.apache.spark.Partitioner -import org.apache.spark.rdd.RDD -import utils.Constants.Relation.Relation -import utils.Constants.WeightingScheme.WeightingScheme -import utils.Utils - - -case class GeometryCentric(joinedRDD: RDD[(Int, (Iterable[Entity], Iterable[Entity]))], - thetaXY: (Double, Double), ws: WeightingScheme, budget: Int, sourceCount: Long) - extends ProgressiveGeospatialInterlinkingT { - - - /** - * For each target entity we keep only the top K comparisons, according to a weighting scheme. - * Then we assign the top K comparisons a common weight, which is their avg - * Based on this weight we prioritize their execution. 
- * - * @return an RDD of Intersection Matrices - */ - def prioritize(source: Array[Entity], target: Array[Entity], partition: MBR, relation: Relation): ComparisonPQ[(Int, Int)] = { - val sourceIndex = index(source) - val filterIndices = (b: (Int, Int)) => sourceIndex.contains(b) - val k = (math.ceil(budget / target.length).toInt + 1) * 2 // +1 to avoid k=0 - val targetPQ: ComparisonPQ[Int] = ComparisonPQ[Int](k) - val partitionPQ: ComparisonPQ[(Int, Int)] = ComparisonPQ[(Int, Int)](budget) - - target - .indices - .foreach { j => - var wSum = 0f - val e2 = target(j) - e2.index(thetaXY, filterIndices) - .foreach { block => - sourceIndex.get(block) - .filter(i => source(i).filter(e2, relation, block, thetaXY, Some(partition))) - .foreach { i => - val e1 = source(i) - val w = getWeight(e1, e2) - wSum += w - targetPQ.enqueue(w, i) - } - } - if (! targetPQ.isEmpty) { - val pqSize = targetPQ.size() - val topK = targetPQ.dequeueAll.map(_._2) - val weight = wSum / pqSize - partitionPQ.enqueueAll(topK.map(i => ((i, j), weight))) - targetPQ.clear() - } - } - partitionPQ - } -} - - -object GeometryCentric{ - - def apply(source:RDD[(Int, Entity)], target:RDD[(Int, Entity)], ws: WeightingScheme, budget: Int, partitioner: Partitioner) - : GeometryCentric ={ - val thetaXY = Utils.getTheta - val sourceCount = Utils.getSourceCount - val joinedRDD = source.cogroup(target, partitioner) - GeometryCentric(joinedRDD, thetaXY, ws, budget, sourceCount) - } -} \ No newline at end of file diff --git a/src/main/scala/geospatialInterlinking/progressive/ProgressiveAlgorithmsFactory.scala b/src/main/scala/geospatialInterlinking/progressive/ProgressiveAlgorithmsFactory.scala deleted file mode 100644 index ffe9033b..00000000 --- a/src/main/scala/geospatialInterlinking/progressive/ProgressiveAlgorithmsFactory.scala +++ /dev/null @@ -1,30 +0,0 @@ -package geospatialInterlinking.progressive - -import dataModel.Entity -import org.apache.log4j.{LogManager, Logger} -import org.apache.spark.Partitioner -import org.apache.spark.rdd.RDD -import utils.Constants.ProgressiveAlgorithm.ProgressiveAlgorithm -import utils.Constants.WeightingScheme.WeightingScheme -import utils.Constants.{ProgressiveAlgorithm, WeightingScheme} - -object ProgressiveAlgorithmsFactory { - - - def get(matchingAlgorithm: ProgressiveAlgorithm, source: RDD[(Int, Entity)], target: RDD[(Int, Entity)], - partitioner: Partitioner, budget: Int = 0, ws: WeightingScheme = WeightingScheme.JS): ProgressiveGeospatialInterlinkingT ={ - - matchingAlgorithm match { - case ProgressiveAlgorithm.RANDOM => - RandomScheduling(source, target, ws, budget, partitioner) - case ProgressiveAlgorithm.GEOMETRY_CENTRIC => - GeometryCentric(source, target, ws, budget, partitioner) - case ProgressiveAlgorithm.TOPK => - TopKPairs(source, target, ws, budget, partitioner) - case ProgressiveAlgorithm.RECIPROCAL_TOPK => - ReciprocalTopK(source, target, ws, budget, partitioner) - case ProgressiveAlgorithm.PROGRESSIVE_GIANT | _ => - ProgressiveGIAnt(source, target, ws, budget, partitioner) - } - } -} diff --git a/src/main/scala/geospatialInterlinking/progressive/ProgressiveGeospatialInterlinkingT.scala b/src/main/scala/geospatialInterlinking/progressive/ProgressiveGeospatialInterlinkingT.scala deleted file mode 100644 index a9e2420a..00000000 --- a/src/main/scala/geospatialInterlinking/progressive/ProgressiveGeospatialInterlinkingT.scala +++ /dev/null @@ -1,140 +0,0 @@ -package geospatialInterlinking.progressive - -import dataModel.{ComparisonPQ, Entity, IM, MBR} -import 
geospatialInterlinking.GeospatialInterlinkingT -import org.apache.spark.rdd.RDD -import org.apache.spark.storage.StorageLevel -import utils.Constants.Relation.Relation -import utils.Constants.WeightingScheme.WeightingScheme -import utils.Constants.Relation -import utils.Utils - -import scala.collection.mutable -import scala.collection.mutable.ListBuffer - -trait ProgressiveGeospatialInterlinkingT extends GeospatialInterlinkingT{ - val budget: Int - val ws: WeightingScheme - - def prioritize(source: Array[Entity], target: Array[Entity], partition: MBR, relation: Relation): ComparisonPQ[(Int, Int)] - - - def getWeight(e1: Entity, e2: Entity): Float = Utils.getWeight(e1, e2, ws) - - - /** - * Get the DE-9IM of the top most related entities based - * on the input budget and the Weighting Scheme - * @return an RDD of IM - */ - def getDE9IM: RDD[IM] ={ - joinedRDD.filter(j => j._2._1.nonEmpty && j._2._2.nonEmpty) - .flatMap{ p => - val pid = p._1 - val partition = partitionsZones(pid) - val source = p._2._1.toArray - val target = p._2._2.toArray - - val pq = prioritize(source, target, partition, Relation.DE9IM) - if (!pq.isEmpty) - pq.dequeueAll.map{ case (_, (i, j)) => - val e1 = source(i) - val e2 = target(j) - IM(e1, e2) - }.takeWhile(_ => !pq.isEmpty) - else Iterator() - } - } - - - /** - * Examine the Relation of the top most related entities based - * on the input budget and the Weighting Scheme - * @param relation the relation to examine - * @return an RDD of pair of IDs - */ - def relate(relation: Relation): RDD[(String, String)] = { - joinedRDD.filter(j => j._2._1.nonEmpty && j._2._2.nonEmpty) - .flatMap{ p => - val pid = p._1 - val partition = partitionsZones(pid) - val source = p._2._1.toArray - val target = p._2._2.toArray - - val pq = prioritize(source, target, partition, relation) - if (!pq.isEmpty) - pq.dequeueAll.map{ case (_, (i, j)) => - val e1 = source(i) - val e2 = target(j) - (e1.relate(e2, relation), (e1.originalID, e2.originalID)) - }.filter(_._1).map(_._2) - else Iterator() - } - } - - - /** - * Compute PGR - first weight and perform the comparisons in each partition, - * then collect them in descending order and compute the progressive True Positives. 
- * - * @param relation the examined relation - * @return (PGR, total interlinked Geometries (TP), total comparisons) - */ - def evaluate(relation: Relation, n: Int = 10, totalQualifiedPairs: Double, takeBudget: Seq[Int]): Seq[(Double, Long, Long, (List[Int], List[Int]))] ={ - // computes weighted the weighted comparisons - val matches: RDD[(Float, Boolean)] = joinedRDD - .filter(p => p._2._1.nonEmpty && p._2._2.nonEmpty) - .flatMap { p => - val pid = p._1 - val partition = partitionsZones(pid) - val source = p._2._1.toArray - val target = p._2._2.toArray - - val pq = prioritize(source, target, partition, relation) - if (!pq.isEmpty) - pq.dequeueAll.map{ case (w, (i, j)) => - val e1 = source(i) - val e2 = target(j) - relation match { - case Relation.DE9IM => (w, IM(e1, e2).relate) - case _ => (w, e1.relate(e2, relation)) - } - }.takeWhile(_ => !pq.isEmpty) - else Iterator() - }.persist(StorageLevel.MEMORY_AND_DISK) - - var results = mutable.ListBuffer[(Double, Long, Long, (List[Int], List[Int]))]() - for(b <- takeBudget){ - // compute AUC prioritizing the comparisons based on their weight - val sorted = matches.takeOrdered(b)(Ordering.by[(Float, Boolean), Float](_._1).reverse) - val verifications = sorted.length - val step = math.ceil(verifications/n) - - var progressiveQP: Double = 0 - var qp = 0 - val verificationSteps = ListBuffer[Int]() - val qualifiedPairsSteps = ListBuffer[Int]() - - sorted - .map(_._2) - .zipWithIndex - .foreach{ - case (r, i) => - if (r) qp += 1 - progressiveQP += qp - if (i % step == 0){ - qualifiedPairsSteps += qp - verificationSteps += i - } - } - qualifiedPairsSteps += qp - verificationSteps += verifications - val qualifiedPairsWithinBudget = if (totalQualifiedPairs < verifications) totalQualifiedPairs else verifications - val pgr = (progressiveQP/qualifiedPairsWithinBudget)/verifications.toDouble - results += ((pgr, qp, verifications, (verificationSteps.toList, qualifiedPairsSteps.toList))) - } - matches.unpersist() - results - } - -} diff --git a/src/main/scala/geospatialInterlinking/progressive/RandomScheduling.scala b/src/main/scala/geospatialInterlinking/progressive/RandomScheduling.scala deleted file mode 100644 index dd94569b..00000000 --- a/src/main/scala/geospatialInterlinking/progressive/RandomScheduling.scala +++ /dev/null @@ -1,57 +0,0 @@ -package geospatialInterlinking.progressive - -import dataModel.{ComparisonPQ, Entity, MBR} -import org.apache.spark.Partitioner -import org.apache.spark.rdd.RDD -import utils.Constants.Relation.Relation -import utils.Constants.WeightingScheme.WeightingScheme -import utils.Utils - -case class RandomScheduling(joinedRDD: RDD[(Int, (Iterable[Entity], Iterable[Entity]))], - thetaXY: (Double, Double), ws: WeightingScheme, budget: Int, sourceCount: Long) extends ProgressiveGeospatialInterlinkingT { - - - /** - * First index source and then for each entity of target, find its comparisons using source's index. 
- * - * @param partition the MBR: of the partition - * @param source source - * @param target target - * @return a PQ with the top comparisons - */ - def prioritize(source: Array[Entity], target: Array[Entity], partition: MBR, relation: Relation): ComparisonPQ[(Int, Int)] ={ - val sourceIndex = index(source) - val filterIndices = (b: (Int, Int)) => sourceIndex.contains(b) - val pq: ComparisonPQ[(Int, Int)] = ComparisonPQ[(Int, Int)](budget) - - // weight and put the comparisons in a PQ - target - .indices - .foreach {j => - val e2 = target(j) - e2.index(thetaXY, filterIndices) - .foreach { block => - sourceIndex.get(block) - .filter(i => source(i).filter(e2, relation, block, thetaXY, Some(partition))) - .foreach { i => pq.enqueue(1f, (i,j)) } - } - } - pq - } -} - - - -/** - * auxiliary constructor - */ -object RandomScheduling { - - def apply(source:RDD[(Int, Entity)], target:RDD[(Int, Entity)], ws: WeightingScheme, budget: Int, partitioner: Partitioner): RandomScheduling ={ - val thetaXY = Utils.getTheta - val sourceCount = Utils.getSourceCount - val joinedRDD = source.cogroup(target, partitioner) - RandomScheduling(joinedRDD, thetaXY, ws, budget, sourceCount) - } - -} diff --git a/src/main/scala/geospatialInterlinking/progressive/TopKPairs.scala b/src/main/scala/geospatialInterlinking/progressive/TopKPairs.scala deleted file mode 100644 index f6278af2..00000000 --- a/src/main/scala/geospatialInterlinking/progressive/TopKPairs.scala +++ /dev/null @@ -1,108 +0,0 @@ -package geospatialInterlinking.progressive - -import dataModel.{ComparisonPQ, Entity, MBR} -import org.apache.spark.Partitioner -import org.apache.spark.rdd.RDD -import utils.Constants.Relation.Relation -import utils.Constants.WeightingScheme.WeightingScheme -import utils.Utils - -import scala.collection.mutable - -case class TopKPairs(joinedRDD: RDD[(Int, (Iterable[Entity], Iterable[Entity]))], - thetaXY: (Double, Double), ws: WeightingScheme, budget: Int, sourceCount: Long) extends ProgressiveGeospatialInterlinkingT { - - - /** - * First we find the top-k comparisons of each geometry in source and target, - * then we merge them in a common PQ and for each duplicate comparison - * maintain the max weight. 
- * - * @param source partition of source dataset - * @param target partition of target dataset - * @param partition partition MBR - * @param relation examining relation - * @return prioritized comparisons in a PQ - */ - def prioritize(source: Array[Entity], target: Array[Entity], partition: MBR, relation: Relation): ComparisonPQ[(Int, Int)] = { - val sourceIndex = index(source) - val filterIndices = (b: (Int, Int)) => sourceIndex.contains(b) - - // the budget is divided based on the number of entities - val k = (math.ceil(budget / (source.length + target.length)).toInt + 1) * 2 // +1 to avoid k=0 - val sourcePQ: Array[ComparisonPQ[Int]] = new Array(source.length) - val targetPQ: ComparisonPQ[Int] = ComparisonPQ[Int](k) - val partitionPQ: ComparisonPQ[(Int, Int)] = ComparisonPQ[(Int, Int)](budget) - - target.indices - .foreach{ j => - val e2 = target(j) - e2.index(thetaXY, filterIndices) - .foreach{ block => - sourceIndex.get(block) - .filter(i => source(i).filter(e2, relation, block, thetaXY, Some(partition))) - .foreach { i => - val e1 = source(i) - val w = getWeight(e1, e2) - - // set top-K PQ for the examining target entity - targetPQ.enqueue(w, i) - - // update source entities' top-K - if (sourcePQ(i) == null) - sourcePQ(i) = ComparisonPQ[Int](k) - sourcePQ(i).enqueue(w, j) - } - } - - // add target's pairs in partition's PQ - if (!targetPQ.isEmpty) { - val w = Double.MaxValue - while (targetPQ.size > 0 && w > partitionPQ.minW) { - val (w, i) = targetPQ.dequeueHead() - partitionPQ.enqueue(w, (i, j)) - } - } - targetPQ.clear() - } - - // putting target comparisons in a HasMap. Source entities will also be added in the HashMap - // to update wights and avoid duplicate comparisons - val partitionPairs: mutable.HashMap[(Int, Int), Float] = mutable.HashMap() - partitionPQ.iterator().foreach{ case(w:Float, pair:(Int, Int)) => partitionPairs += (pair -> w) } - - // adding source entities' top-K in hashMap - sourcePQ - .zipWithIndex - .filter(_._1 != null) - .foreach { case (pq, i) => - val w = Double.MaxValue - while (pq.size > 0 && w > partitionPQ.minW) { - val (w, j) = pq.dequeueHead() - if (partitionPQ.minW < w) { - partitionPairs.get(i, j) match { - case Some(weight) if weight < w => partitionPairs.update((i, j), w) //if exist with smaller weight -> update - case None => partitionPairs += ((i, j) -> w) - case _ => - } - } - } - pq.clear() - } - - // keep partition's top comparisons - partitionPQ.clear() - partitionPQ.enqueueAll(partitionPairs.toIterator) - partitionPQ - } -} - -object TopKPairs{ - - def apply(source:RDD[(Int, Entity)], target:RDD[(Int, Entity)], ws: WeightingScheme, budget: Int, partitioner: Partitioner): TopKPairs ={ - val thetaXY = Utils.getTheta - val sourceCount = Utils.getSourceCount - val joinedRDD = source.cogroup(target, partitioner) - TopKPairs(joinedRDD, thetaXY, ws, budget, sourceCount) - } -} diff --git a/src/main/scala/geospatialInterlinking/GIAnt.scala b/src/main/scala/interlinkers/GIAnt.scala similarity index 95% rename from src/main/scala/geospatialInterlinking/GIAnt.scala rename to src/main/scala/interlinkers/GIAnt.scala index f7f426f9..cf665686 100644 --- a/src/main/scala/geospatialInterlinking/GIAnt.scala +++ b/src/main/scala/interlinkers/GIAnt.scala @@ -1,6 +1,6 @@ -package geospatialInterlinking +package interlinkers -import dataModel.{Entity, IM} +import model.{Entity, IM} import org.apache.spark.Partitioner import org.apache.spark.rdd.RDD import utils.Constants.Relation @@ -8,7 +8,7 @@ import utils.Constants.Relation.Relation import utils.Utils 
-case class GIAnt(joinedRDD: RDD[(Int, (Iterable[Entity], Iterable[Entity]))], thetaXY: (Double, Double)) extends GeospatialInterlinkingT { +case class GIAnt(joinedRDD: RDD[(Int, (Iterable[Entity], Iterable[Entity]))], thetaXY: (Double, Double)) extends InterlinkerT { /** * First index the Source and then use the index to find the comparisons with target's entities. diff --git a/src/main/scala/interlinkers/IndexedJoinInterlinking.scala b/src/main/scala/interlinkers/IndexedJoinInterlinking.scala new file mode 100644 index 00000000..3012b951 --- /dev/null +++ b/src/main/scala/interlinkers/IndexedJoinInterlinking.scala @@ -0,0 +1,65 @@ +package interlinkers + +import model.{Entity, IM} +import org.apache.spark.HashPartitioner +import org.apache.spark.rdd.RDD +import utils.Constants.Relation +import utils.Constants.Relation.Relation +import utils.Constants.WeightingScheme.WeightingScheme + + + + +case class IndexedJoinInterlinking(source:RDD[(Int, Entity)], target:RDD[(Int, Entity)], thetaXY: (Double, Double)) extends InterlinkerT { + + val joinedRDD: RDD[(Int, (Iterable[Entity], Iterable[Entity]))] = null + val ws: WeightingScheme = null + val partitioner = new HashPartitioner(source.getNumPartitions) + + val filteringFunction: ((Int, Entity), (Int, Entity), (Int, Int), Relation) => Boolean = + (e1: (Int, Entity), e2: (Int, Entity), c: (Int, Int), r: Relation) => + e1._1 == e2._1 && e1._2.filter(e2._2, r, c, thetaXY, Some(partitionsZones(e1._1))) + + + def indexedJoin(): RDD[((Int, Int), (Iterable[(Int, Entity)], Iterable[(Int, Entity)]))] = { + val indexedSource: RDD[((Int, Int), Iterable[(Int, Entity)])] = source + .map(se => (se._2.index(thetaXY), se)) + .flatMap{ case (indices, (pid, se)) => indices.map(i => (i, (pid, se)))} + .groupByKey(partitioner) + + val indexedTarget: RDD[((Int, Int), Iterable[(Int, Entity)])] = target + .map(se => (se._2.index(thetaXY), se)) + .flatMap{ case (indices, (pid, se)) => indices.map(i => (i, (pid, se)))} + .groupByKey(partitioner) + + indexedSource.leftOuterJoin(indexedTarget, partitioner) + .filter(_._2._2.isDefined) + .map(p => (p._1, (p._2._1, p._2._2.get))) + } + + /** + * First index Source and then use index to find the comparisons with the entities of Target. 
+ * Filter the redundant comparisons using the spatial Filters + * + * @param relation the examining relation + * @return an RDD containing the matching pairs + */ + def relate(relation: Relation): RDD[(String, String)] = + indexedJoin() + .flatMap { case (c: (Int, Int), ( source: Iterable[(Int, Entity)], target: Iterable[(Int, Entity)])) => + for (e1 <- source; e2 <- target; if filteringFunction(e1, e2, c, relation) && e1._2.relate(e2._2, relation)) + yield (e1._2.originalID, e2._2.originalID) + } + + + def getDE9IM: RDD[IM] = { + val indexedSeq = indexedJoin() + indexedSeq + .flatMap { case (c: (Int, Int), (source: Iterable[(Int, Entity)], target: Iterable[(Int, Entity)])) => + for (e1 <- source; e2 <- target; if filteringFunction(e1, e2, c, Relation.DE9IM)) + yield IM(e1._2, e2._2) + } + } + + +} diff --git a/src/main/scala/geospatialInterlinking/GeospatialInterlinkingT.scala b/src/main/scala/interlinkers/InterlinkerT.scala similarity index 96% rename from src/main/scala/geospatialInterlinking/GeospatialInterlinkingT.scala rename to src/main/scala/interlinkers/InterlinkerT.scala index e39783a4..6db51c22 100644 --- a/src/main/scala/geospatialInterlinking/GeospatialInterlinkingT.scala +++ b/src/main/scala/interlinkers/InterlinkerT.scala @@ -1,12 +1,12 @@ -package geospatialInterlinking +package interlinkers -import dataModel.{Entity, IM, MBR, SpatialIndex} +import model.{Entity, IM, MBR, SpatialIndex} import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import utils.Constants.Relation.Relation import utils.Utils -trait GeospatialInterlinkingT { +trait InterlinkerT { val orderByWeight: Ordering[(Double, (Entity, Entity))] = Ordering.by[(Double, (Entity, Entity)), Double](_._1).reverse @@ -15,7 +15,6 @@ trait GeospatialInterlinkingT { val partitionsZones: Array[MBR] = SparkContext.getOrCreate().broadcast(Utils.getZones).value - /** * index a list of spatial entities * @@ -65,7 +64,6 @@ trait GeospatialInterlinkingT { } (totalContains, totalCoveredBy, totalCovers, totalCrosses, totalEquals, totalIntersects, totalOverlaps, totalTouches, totalWithin, verifications, qualifiedPairs) - } def countAllRelations: (Int, Int, Int, Int, Int, Int, Int, Int, Int, Int, Int) = diff --git a/src/main/scala/interlinkers/progressive/DynamicProgressiveGIAnt.scala b/src/main/scala/interlinkers/progressive/DynamicProgressiveGIAnt.scala new file mode 100644 index 00000000..81471e27 --- /dev/null +++ b/src/main/scala/interlinkers/progressive/DynamicProgressiveGIAnt.scala @@ -0,0 +1,213 @@ +package interlinkers.progressive + +import model.{ComparisonPQ, DynamicComparisonPQ, Entity, IM, MBR, WeightedPair} +import org.apache.spark.Partitioner +import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel +import utils.Constants.Relation +import utils.Constants.Relation.Relation +import utils.Constants.WeightingScheme.WeightingScheme +import utils.Utils + +import scala.collection.mutable +import scala.collection.mutable.ListBuffer + +case class DynamicProgressiveGIAnt(joinedRDD: RDD[(Int, (Iterable[Entity], Iterable[Entity]))], thetaXY: (Double, Double), + mainWS: WeightingScheme, secondaryWS: Option[WeightingScheme], budget: Int, sourceEntities: Int) + extends ProgressiveInterlinkerT { + + + /** + * First index source and then for each entity of target, find its comparisons using source's index. + * Weight the comparisons according to the input weighting scheme and sort them using a PQ. 
+ * + * @param partition the MBR: of the partition + * @param source source + * @param target target + * @return a PQ with the top comparisons + */ + def prioritize(source: Array[Entity], target: Array[Entity], partition: MBR, relation: Relation): ComparisonPQ ={ + val localBudget = (math.ceil(budget*source.length.toDouble/sourceEntities.toDouble)*2).toLong + val sourceIndex = index(source) + val filterIndices = (b: (Int, Int)) => sourceIndex.contains(b) + val pq: DynamicComparisonPQ = DynamicComparisonPQ(localBudget) + var counter = 0 + // weight and put the comparisons in a PQ + target + .indices + .foreach {j => + val e2 = target(j) + e2.index(thetaXY, filterIndices) + .foreach { block => + sourceIndex.get(block) + .filter(i => source(i).filter(e2, relation, block, thetaXY, Some(partition))) + .foreach { i => + val e1 = source(i) + val w = getMainWeight(e1, e2) + val secW = getSecondaryWeight(e1, e2) + val wp = WeightedPair(counter, i, j, w, secW) + pq.enqueue(wp) + counter += 1 + } + } + } + pq + } + + /** + * Get the DE-9IM of the top most related entities based + * on the input budget and the Weighting Scheme + * @return an RDD of IM + */ + override def getDE9IM: RDD[IM] ={ + joinedRDD.filter(j => j._2._1.nonEmpty && j._2._2.nonEmpty) + .flatMap{ p => + val pid = p._1 + val partition = partitionsZones(pid) + val source = p._2._1.toArray + val target = p._2._2.toArray + + val pq: DynamicComparisonPQ = prioritize(source, target, partition, Relation.DE9IM).asInstanceOf[DynamicComparisonPQ] + val sourceCandidates: Map[Int, List[WeightedPair]] = pq.iterator().map(wp => (wp.entityId1, wp)).toList.groupBy(_._1).mapValues(_.map(_._2)) + val targetCandidates: Map[Int, List[WeightedPair]] = pq.iterator().map(wp => (wp.entityId2, wp)).toList.groupBy(_._1).mapValues(_.map(_._2)) + + if (!pq.isEmpty) + Iterator.continually{ + val wp = pq.dequeueHead() + val e1 = source(wp.entityId1) + val e2 = target(wp.entityId2) + val im = IM(e1, e2) + val isRelated = im.relate + if (isRelated){ + sourceCandidates.getOrElse(wp.entityId1, List()).foreach(wp => pq.dynamicUpdate(wp)) + targetCandidates.getOrElse(wp.entityId2, List()).foreach(wp => pq.dynamicUpdate(wp)) + } + im + }.takeWhile(_ => !pq.isEmpty) + else Iterator() + } + } + + + /** + * Examine the Relation of the top most related entities based + * on the input budget and the Weighting Scheme + * @param relation the relation to examine + * @return an RDD of pair of IDs + */ + override def relate(relation: Relation): RDD[(String, String)] = { + joinedRDD.filter(j => j._2._1.nonEmpty && j._2._2.nonEmpty) + .flatMap{ p => + val pid = p._1 + val partition = partitionsZones(pid) + val source = p._2._1.toArray + val target = p._2._2.toArray + + val pq: DynamicComparisonPQ = prioritize(source, target, partition, relation).asInstanceOf[DynamicComparisonPQ] + val sourceCandidates: Map[Int, List[WeightedPair]] = pq.iterator().map(wp => (wp.entityId1, wp)).toList.groupBy(_._1).mapValues(_.map(_._2)) + val targetCandidates: Map[Int, List[WeightedPair]] = pq.iterator().map(wp => (wp.entityId2, wp)).toList.groupBy(_._1).mapValues(_.map(_._2)) + if (!pq.isEmpty) + Iterator.continually{ + val wp = pq.dequeueHead() + val e1 = source(wp.entityId1) + val e2 = target(wp.entityId2) + val isRelated = e1.relate(e2, relation) + if (isRelated){ + sourceCandidates.getOrElse(wp.entityId1, List()).foreach(wp => pq.dynamicUpdate(wp)) + targetCandidates.getOrElse(wp.entityId2, List()).foreach(wp => pq.dynamicUpdate(wp)) + } + (isRelated, (e1.originalID, e2.originalID)) + 
}.filter(_._1).map(_._2) + else Iterator() + } + } + + + /** + * Compute PGR - first weight and perform the comparisons in each partition, + * then collect them in descending order and compute the progressive True Positives. + * + * @param relation the examined relation + * @return (PGR, total interlinked Geometries (TP), total comparisons) + */ + override def evaluate(relation: Relation, n: Int = 10, totalQualifiedPairs: Double, takeBudget: Seq[Int]): Seq[(Double, Long, Long, (List[Int], List[Int]))] ={ + // computes weighted the weighted comparisons + val matches: RDD[(WeightedPair, Boolean)] = joinedRDD + .filter(p => p._2._1.nonEmpty && p._2._2.nonEmpty) + .flatMap { p => + val pid = p._1 + val partition = partitionsZones(pid) + val source = p._2._1.toArray + val target = p._2._2.toArray + + val pq: DynamicComparisonPQ = prioritize(source, target, partition, relation).asInstanceOf[DynamicComparisonPQ] + val sourceCandidates: Map[Int, List[WeightedPair]] = pq.iterator().map(wp => (wp.entityId1, wp)).toList.groupBy(_._1).mapValues(_.map(_._2)) + val targetCandidates: Map[Int, List[WeightedPair]] = pq.iterator().map(wp => (wp.entityId2, wp)).toList.groupBy(_._1).mapValues(_.map(_._2)) + if (!pq.isEmpty) + Iterator.continually{ + val wp = pq.dequeueHead() + val e1 = source(wp.entityId1) + val e2 = target(wp.entityId2) + val isRelated = relation match { + case Relation.DE9IM => IM(e1, e2).relate + case _ => e1.relate(e2, relation) + } + if (isRelated){ + sourceCandidates.getOrElse(wp.entityId1, List()).foreach(wp => pq.dynamicUpdate(wp)) + targetCandidates.getOrElse(wp.entityId2, List()).foreach(wp => pq.dynamicUpdate(wp)) + } + (wp, isRelated) + }.takeWhile(_ => !pq.isEmpty) + else Iterator() + }.persist(StorageLevel.MEMORY_AND_DISK) + + var results = mutable.ListBuffer[(Double, Long, Long, (List[Int], List[Int]))]() + for(b <- takeBudget){ + // compute AUC prioritizing the comparisons based on their weight + val sorted = matches.takeOrdered(b) + val verifications = sorted.length + val step = math.ceil(verifications/n) + + var progressiveQP: Double = 0 + var qp = 0 + val verificationSteps = ListBuffer[Int]() + val qualifiedPairsSteps = ListBuffer[Int]() + + sorted + .map(_._2) + .zipWithIndex + .foreach{ + case (r, i) => + if (r) qp += 1 + progressiveQP += qp + if (i % step == 0){ + qualifiedPairsSteps += qp + verificationSteps += i + } + } + qualifiedPairsSteps += qp + verificationSteps += verifications + val qualifiedPairsWithinBudget = if (totalQualifiedPairs < verifications) totalQualifiedPairs else verifications + val pgr = (progressiveQP/qualifiedPairsWithinBudget)/verifications.toDouble + results += ((pgr, qp, verifications, (verificationSteps.toList, qualifiedPairsSteps.toList))) + } + matches.unpersist() + results + } +} + + +/** + * auxiliary constructor + */ +object DynamicProgressiveGIAnt { + + def apply(source:RDD[(Int, Entity)], target:RDD[(Int, Entity)], ws: WeightingScheme, sws: Option[WeightingScheme] = None, + budget: Int, partitioner: Partitioner): DynamicProgressiveGIAnt ={ + val thetaXY = Utils.getTheta + val joinedRDD = source.cogroup(target, partitioner) + val sourceEntities = Utils.sourceCount + DynamicProgressiveGIAnt(joinedRDD, thetaXY, ws, sws, budget, sourceEntities.toInt) + } + +} \ No newline at end of file diff --git a/src/main/scala/interlinkers/progressive/GeometryCentric.scala b/src/main/scala/interlinkers/progressive/GeometryCentric.scala new file mode 100644 index 00000000..b0f5cd3a --- /dev/null +++ 
b/src/main/scala/interlinkers/progressive/GeometryCentric.scala @@ -0,0 +1,68 @@ +//package geospatialInterlinking.progressive +// +//import dataModel.{ComparisonPQ, Entity, MBR, WeightedPairsPQ} +//import org.apache.spark.Partitioner +//import org.apache.spark.rdd.RDD +//import utils.Constants.Relation.Relation +//import utils.Constants.WeightingScheme.WeightingScheme +//import utils.Utils +// +// +//case class GeometryCentric(joinedRDD: RDD[(Int, (Iterable[Entity], Iterable[Entity]))], +// thetaXY: (Double, Double), ws: WeightingScheme, budget: Int, sourceCount: Long) +// extends ProgressiveGeospatialInterlinkingT { +// +// +// /** +// * For each target entity we keep only the top K comparisons, according to a weighting scheme. +// * Then we assign the top K comparisons a common weight, which is their avg +// * Based on this weight we prioritize their execution. +// * +// * @return an RDD of Intersection Matrices +// */ +// def prioritize(source: Array[Entity], target: Array[Entity], partition: MBR, relation: Relation): WeightedPairsPQ = { +// val sourceIndex = index(source) +// val filterIndices = (b: (Int, Int)) => sourceIndex.contains(b) +// val k = (math.ceil(budget / target.length).toInt + 1) * 2 // +1 to avoid k=0 +// val targetPQ: ComparisonPQ[Int] = ComparisonPQ[Int](k) +// val partitionPQ: ComparisonPQ[(Int, Int)] = ComparisonPQ[(Int, Int)](budget) +// +// target +// .indices +// .foreach { j => +// var wSum = 0f +// val e2 = target(j) +// e2.index(thetaXY, filterIndices) +// .foreach { block => +// sourceIndex.get(block) +// .filter(i => source(i).filter(e2, relation, block, thetaXY, Some(partition))) +// .foreach { i => +// val e1 = source(i) +// val w = getWeight(e1, e2) +// wSum += w +// targetPQ.enqueue(w, i) +// } +// } +// if (! targetPQ.isEmpty) { +// val pqSize = targetPQ.size() +// val topK = targetPQ.dequeueAll.map(_._2) +// val weight = wSum / pqSize +// partitionPQ.enqueueAll(topK.map(i => ((i, j), weight))) +// targetPQ.clear() +// } +// } +// partitionPQ +// } +//} +// +// +//object GeometryCentric{ +// +// def apply(source:RDD[(Int, Entity)], target:RDD[(Int, Entity)], ws: WeightingScheme, budget: Int, partitioner: Partitioner) +// : GeometryCentric ={ +// val thetaXY = Utils.getTheta +// val sourceCount = Utils.getSourceCount +// val joinedRDD = source.cogroup(target, partitioner) +// GeometryCentric(joinedRDD, thetaXY, ws, budget, sourceCount) +// } +//} \ No newline at end of file diff --git a/src/main/scala/interlinkers/progressive/ProgressiveAlgorithmsFactory.scala b/src/main/scala/interlinkers/progressive/ProgressiveAlgorithmsFactory.scala new file mode 100644 index 00000000..8e4d9441 --- /dev/null +++ b/src/main/scala/interlinkers/progressive/ProgressiveAlgorithmsFactory.scala @@ -0,0 +1,32 @@ +package interlinkers.progressive + +import model.Entity +import org.apache.spark.Partitioner +import org.apache.spark.rdd.RDD +import utils.Constants.ProgressiveAlgorithm.ProgressiveAlgorithm +import utils.Constants.WeightingScheme.WeightingScheme +import utils.Constants.ProgressiveAlgorithm + +object ProgressiveAlgorithmsFactory { + + + def get(matchingAlgorithm: ProgressiveAlgorithm, source: RDD[(Int, Entity)], target: RDD[(Int, Entity)], + partitioner: Partitioner, budget: Int = 0, mainWS: WeightingScheme, secondaryWS: Option[WeightingScheme]): + ProgressiveInterlinkerT ={ + + matchingAlgorithm match { + case ProgressiveAlgorithm.RANDOM => + RandomScheduling(source, target, mainWS, secondaryWS, budget, partitioner) +// case ProgressiveAlgorithm.GEOMETRY_CENTRIC 
=> +// GeometryCentric(source, target, ws, budget, partitioner) + case ProgressiveAlgorithm.TOPK => + TopKPairs(source, target, mainWS, secondaryWS, budget, partitioner) + case ProgressiveAlgorithm.RECIPROCAL_TOPK => + ReciprocalTopK(source, target, mainWS, secondaryWS, budget, partitioner) + case ProgressiveAlgorithm.DYNAMIC_PROGRESSIVE_GIANT => + DynamicProgressiveGIAnt(source, target, mainWS, secondaryWS, budget, partitioner) + case ProgressiveAlgorithm.PROGRESSIVE_GIANT | _ => + ProgressiveGIAnt(source, target, mainWS, secondaryWS, budget, partitioner) + } + } +} diff --git a/src/main/scala/geospatialInterlinking/progressive/ProgressiveGIAnt.scala b/src/main/scala/interlinkers/progressive/ProgressiveGIAnt.scala similarity index 58% rename from src/main/scala/geospatialInterlinking/progressive/ProgressiveGIAnt.scala rename to src/main/scala/interlinkers/progressive/ProgressiveGIAnt.scala index 962aedea..9ffae1fa 100644 --- a/src/main/scala/geospatialInterlinking/progressive/ProgressiveGIAnt.scala +++ b/src/main/scala/interlinkers/progressive/ProgressiveGIAnt.scala @@ -1,6 +1,6 @@ -package geospatialInterlinking.progressive +package interlinkers.progressive -import dataModel.{ComparisonPQ, Entity, MBR} +import model.{Entity, MBR, WeightedPair, StaticComparisonPQ} import org.apache.spark.Partitioner import org.apache.spark.rdd.RDD import utils.Constants.Relation.Relation @@ -8,8 +8,9 @@ import utils.Constants.WeightingScheme.WeightingScheme import utils.Utils -case class ProgressiveGIAnt(joinedRDD: RDD[(Int, (Iterable[Entity], Iterable[Entity]))], - thetaXY: (Double, Double), ws: WeightingScheme, budget: Int, sourceCount: Long) extends ProgressiveGeospatialInterlinkingT { +case class ProgressiveGIAnt(joinedRDD: RDD[(Int, (Iterable[Entity], Iterable[Entity]))], thetaXY: (Double, Double), + mainWS: WeightingScheme, secondaryWS: Option[WeightingScheme], budget: Int, sourceEntities: Int) + extends ProgressiveInterlinkerT { /** @@ -21,11 +22,12 @@ case class ProgressiveGIAnt(joinedRDD: RDD[(Int, (Iterable[Entity], Iterable[Ent * @param target target * @return a PQ with the top comparisons */ - def prioritize(source: Array[Entity], target: Array[Entity], partition: MBR, relation: Relation): ComparisonPQ[(Int, Int)] ={ + def prioritize(source: Array[Entity], target: Array[Entity], partition: MBR, relation: Relation): StaticComparisonPQ ={ + val localBudget = (math.ceil(budget*source.length.toDouble/sourceEntities.toDouble)*2).toLong val sourceIndex = index(source) val filterIndices = (b: (Int, Int)) => sourceIndex.contains(b) - val pq: ComparisonPQ[(Int, Int)] = ComparisonPQ[(Int, Int)](budget) - + val pq: StaticComparisonPQ = StaticComparisonPQ(localBudget) + var counter = 0 // weight and put the comparisons in a PQ target .indices @@ -37,8 +39,11 @@ case class ProgressiveGIAnt(joinedRDD: RDD[(Int, (Iterable[Entity], Iterable[Ent .filter(i => source(i).filter(e2, relation, block, thetaXY, Some(partition))) .foreach { i => val e1 = source(i) - val w = getWeight(e1, e2) - pq.enqueue(w, (i,j)) + val w = getMainWeight(e1, e2) + val secW = getSecondaryWeight(e1, e2) + val wp = WeightedPair(counter, i, j, w, secW) + pq.enqueue(wp) + counter += 1 } } } @@ -53,11 +58,12 @@ case class ProgressiveGIAnt(joinedRDD: RDD[(Int, (Iterable[Entity], Iterable[Ent */ object ProgressiveGIAnt { - def apply(source:RDD[(Int, Entity)], target:RDD[(Int, Entity)], ws: WeightingScheme, budget: Int, partitioner: Partitioner): ProgressiveGIAnt ={ + def apply(source:RDD[(Int, Entity)], target:RDD[(Int, Entity)], ws: 
WeightingScheme, sws: Option[WeightingScheme] = None, + budget: Int, partitioner: Partitioner): ProgressiveGIAnt ={ val thetaXY = Utils.getTheta - val sourceCount = Utils.getSourceCount val joinedRDD = source.cogroup(target, partitioner) - ProgressiveGIAnt(joinedRDD, thetaXY, ws, budget, sourceCount) + val sourceEntities = Utils.sourceCount + ProgressiveGIAnt(joinedRDD, thetaXY, ws, sws, budget, sourceEntities.toInt) } } diff --git a/src/main/scala/interlinkers/progressive/ProgressiveInterlinkerT.scala b/src/main/scala/interlinkers/progressive/ProgressiveInterlinkerT.scala new file mode 100644 index 00000000..332893af --- /dev/null +++ b/src/main/scala/interlinkers/progressive/ProgressiveInterlinkerT.scala @@ -0,0 +1,191 @@ +package interlinkers.progressive + +import model.{ComparisonPQ, Entity, IM, MBR, WeightedPair} +import interlinkers.InterlinkerT +import org.apache.commons.math3.stat.inference.ChiSquareTest +import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel +import utils.Constants.Relation.Relation +import utils.Constants.WeightingScheme.WeightingScheme +import utils.Constants.{Relation, WeightingScheme} + +import scala.collection.mutable +import scala.collection.mutable.ListBuffer +import scala.math.{ceil, floor, max, min} + +trait ProgressiveInterlinkerT extends InterlinkerT{ + val budget: Int + val mainWS: WeightingScheme + val secondaryWS: Option[WeightingScheme] + + lazy val totalBlocks: Double = { + val globalMinX: Double = partitionsZones.map(p => p.minX / thetaXY._1).min + val globalMaxX: Double = partitionsZones.map(p => p.maxX / thetaXY._1).max + val globalMinY: Double = partitionsZones.map(p => p.minY / thetaXY._2).min + val globalMaxY: Double = partitionsZones.map(p => p.maxY / thetaXY._2).max + + (globalMaxX - globalMinX + 1) * (globalMaxY - globalMinY + 1) + } + + def prioritize(source: Array[Entity], target: Array[Entity], partition: MBR, relation: Relation): ComparisonPQ + + def getMainWeight(e1: Entity, e2: Entity): Float = getWeight(e1, e2, mainWS) + + def getSecondaryWeight(e1: Entity, e2: Entity): Float = + secondaryWS match { + case Some(ws) => getWeight(e1, e2, ws) + case None => 0f + } + + /** + * Weight a comparison + * + * @param e1 Spatial entity + * @param e2 Spatial entity + * @return weight + */ + def getWeight(e1: Entity, e2: Entity, ws: WeightingScheme): Float = { + val e1Blocks = (ceil(e1.mbr.maxX/thetaXY._1).toInt - floor(e1.mbr.minX/thetaXY._1).toInt + 1) * (ceil(e1.mbr.maxY/thetaXY._2).toInt - floor(e1.mbr.minY/thetaXY._2).toInt + 1) + val e2Blocks = (ceil(e2.mbr.maxX/thetaXY._1).toInt - floor(e2.mbr.minX/thetaXY._1).toInt + 1) * (ceil(e2.mbr.maxY/thetaXY._2).toInt - floor(e2.mbr.minY/thetaXY._2).toInt + 1) + lazy val cb = (min(ceil(e1.mbr.maxX/thetaXY._1), ceil(e2.mbr.maxX/thetaXY._1)).toInt - max(floor(e1.mbr.minX/thetaXY._1), floor(e2.mbr.minX/thetaXY._1)).toInt + 1) * + (min(ceil(e1.mbr.maxY/thetaXY._2), ceil(e2.mbr.maxY/thetaXY._2)).toInt - max(floor(e1.mbr.minY/thetaXY._2), floor(e2.mbr.minY/thetaXY._2)).toInt + 1) + + ws match { + case WeightingScheme.MBRO => + val intersectionArea = e1.mbr.getIntersectingMBR(e2.mbr).getArea + val w = intersectionArea / (e1.mbr.getArea + e2.mbr.getArea - intersectionArea) + if (!w.isNaN) w else 0f + + case WeightingScheme.ISP => + 1f / (e1.geometry.getNumPoints + e2.geometry.getNumPoints); + + case WeightingScheme.JS => + cb / (e1Blocks + e2Blocks - cb) + + case WeightingScheme.PEARSON_X2 => + val v1: Array[Long] = Array[Long](cb, (e2Blocks - cb).toLong) + val v2: Array[Long] = 
Array[Long]((e1Blocks - cb).toLong, (totalBlocks - (v1(0) + v1(1) + (e1Blocks - cb))).toLong) + val chiTest = new ChiSquareTest() + chiTest.chiSquare(Array(v1, v2)).toFloat + + case WeightingScheme.CF | _ => + cb.toFloat + } + } + + /** + * Get the DE-9IM of the top most related entities based + * on the input budget and the Weighting Scheme + * @return an RDD of IM + */ + def getDE9IM: RDD[IM] ={ + joinedRDD.filter(j => j._2._1.nonEmpty && j._2._2.nonEmpty) + .flatMap{ p => + val pid = p._1 + val partition = partitionsZones(pid) + val source = p._2._1.toArray + val target = p._2._2.toArray + + val pq = prioritize(source, target, partition, Relation.DE9IM) + if (!pq.isEmpty) + pq.dequeueAll.map{ wp => + val e1 = source(wp.entityId1) + val e2 = target(wp.entityId2) + IM(e1, e2) + }.takeWhile(_ => !pq.isEmpty) + else Iterator() + } + } + + + /** + * Examine the Relation of the top most related entities based + * on the input budget and the Weighting Scheme + * @param relation the relation to examine + * @return an RDD of pair of IDs + */ + def relate(relation: Relation): RDD[(String, String)] = { + joinedRDD.filter(j => j._2._1.nonEmpty && j._2._2.nonEmpty) + .flatMap{ p => + val pid = p._1 + val partition = partitionsZones(pid) + val source = p._2._1.toArray + val target = p._2._2.toArray + + val pq = prioritize(source, target, partition, relation) + if (!pq.isEmpty) + pq.dequeueAll.map{ wp => + val e1 = source(wp.entityId1) + val e2 = target(wp.entityId2) + (e1.relate(e2, relation), (e1.originalID, e2.originalID)) + }.filter(_._1).map(_._2) + else Iterator() + } + } + + + /** + * Compute PGR - first weight and perform the comparisons in each partition, + * then collect them in descending order and compute the progressive True Positives. + * + * @param relation the examined relation + * @return (PGR, total interlinked Geometries (TP), total comparisons) + */ + def evaluate(relation: Relation, n: Int = 10, totalQualifiedPairs: Double, takeBudget: Seq[Int]): Seq[(Double, Long, Long, (List[Int], List[Int]))] ={ + // computes weighted the weighted comparisons + val matches: RDD[(WeightedPair, Boolean)] = joinedRDD + .filter(p => p._2._1.nonEmpty && p._2._2.nonEmpty) + .flatMap { p => + val pid = p._1 + val partition = partitionsZones(pid) + val source = p._2._1.toArray + val target = p._2._2.toArray + + val pq = prioritize(source, target, partition, relation) + if (!pq.isEmpty) + pq.dequeueAll.map{ wp => + val e1 = source(wp.entityId1) + val e2 = target(wp.entityId2) + relation match { + case Relation.DE9IM => (wp, IM(e1, e2).relate) + case _ => (wp, e1.relate(e2, relation)) + } + }.takeWhile(_ => !pq.isEmpty) + else Iterator() + }.persist(StorageLevel.MEMORY_AND_DISK) + + var results = mutable.ListBuffer[(Double, Long, Long, (List[Int], List[Int]))]() + for(b <- takeBudget){ + // compute AUC prioritizing the comparisons based on their weight + val sorted = matches.takeOrdered(b) + val verifications = sorted.length + val step = math.ceil(verifications/n) + + var progressiveQP: Double = 0 + var qp = 0 + val verificationSteps = ListBuffer[Int]() + val qualifiedPairsSteps = ListBuffer[Int]() + + sorted + .map(_._2) + .zipWithIndex + .foreach{ + case (r, i) => + if (r) qp += 1 + progressiveQP += qp + if (i % step == 0){ + qualifiedPairsSteps += qp + verificationSteps += i + } + } + qualifiedPairsSteps += qp + verificationSteps += verifications + val qualifiedPairsWithinBudget = if (totalQualifiedPairs < verifications) totalQualifiedPairs else verifications + val pgr = 
(progressiveQP/qualifiedPairsWithinBudget)/verifications.toDouble + results += ((pgr, qp, verifications, (verificationSteps.toList, qualifiedPairsSteps.toList))) + } + matches.unpersist() + results + } + +} diff --git a/src/main/scala/interlinkers/progressive/RandomScheduling.scala b/src/main/scala/interlinkers/progressive/RandomScheduling.scala new file mode 100644 index 00000000..9aa843ce --- /dev/null +++ b/src/main/scala/interlinkers/progressive/RandomScheduling.scala @@ -0,0 +1,67 @@ +package interlinkers.progressive + +import model.{Entity, MBR, WeightedPair, StaticComparisonPQ} +import org.apache.spark.Partitioner +import org.apache.spark.rdd.RDD +import utils.Constants.Relation.Relation +import utils.Constants.WeightingScheme.WeightingScheme +import utils.Utils + +case class RandomScheduling(joinedRDD: RDD[(Int, (Iterable[Entity], Iterable[Entity]))], thetaXY: (Double, Double), + mainWS: WeightingScheme, secondaryWS: Option[WeightingScheme], budget: Int, sourceEntities: Int) + extends ProgressiveInterlinkerT { + + + /** + * First index source and then for each entity of target, find its comparisons using source's index. + * + * @param partition the MBR: of the partition + * @param source source + * @param target target + * @return a PQ with the top comparisons + */ + def prioritize(source: Array[Entity], target: Array[Entity], partition: MBR, relation: Relation): StaticComparisonPQ = { + val localBudget = (math.ceil(budget*source.length.toDouble/sourceEntities.toDouble)*2).toInt + + val sourceIndex = index(source) + val filterIndices = (b: (Int, Int)) => sourceIndex.contains(b) + val pq: StaticComparisonPQ = StaticComparisonPQ(localBudget) + val rnd = new scala.util.Random + var counter = 0 + // weight and put the comparisons in a PQ + target + .indices + .foreach { j => + val e2 = target(j) + e2.index(thetaXY, filterIndices) + .foreach { block => + sourceIndex.get(block) + .filter(i => source(i).filter(e2, relation, block, thetaXY, Some(partition))) + .foreach { i => + val w = rnd.nextFloat() + val secW = rnd.nextFloat() + val wp = WeightedPair(counter, i, j, w, secW) + pq.enqueue(wp) + counter += 1 + } + } + } + pq + } +} + + +/** + * auxiliary constructor + */ +object RandomScheduling { + + def apply(source:RDD[(Int, Entity)], target:RDD[(Int, Entity)], ws: WeightingScheme, sws: Option[WeightingScheme] = None, + budget: Int, partitioner: Partitioner): RandomScheduling ={ + val thetaXY = Utils.getTheta + val joinedRDD = source.cogroup(target, partitioner) + val sourceEntities = Utils.sourceCount + RandomScheduling(joinedRDD, thetaXY, ws, sws, budget, sourceEntities.toInt) + } + +} diff --git a/src/main/scala/geospatialInterlinking/progressive/ReciprocalTopK.scala b/src/main/scala/interlinkers/progressive/ReciprocalTopK.scala similarity index 50% rename from src/main/scala/geospatialInterlinking/progressive/ReciprocalTopK.scala rename to src/main/scala/interlinkers/progressive/ReciprocalTopK.scala index 81612156..f6e1a973 100644 --- a/src/main/scala/geospatialInterlinking/progressive/ReciprocalTopK.scala +++ b/src/main/scala/interlinkers/progressive/ReciprocalTopK.scala @@ -1,6 +1,6 @@ -package geospatialInterlinking.progressive +package interlinkers.progressive -import dataModel.{ComparisonPQ, Entity, MBR} +import model.{Entity, MBR, WeightedPair, StaticComparisonPQ} import org.apache.spark.Partitioner import org.apache.spark.rdd.RDD import utils.Constants.Relation.Relation @@ -9,8 +9,9 @@ import utils.Utils -case class ReciprocalTopK(joinedRDD: RDD[(Int, (Iterable[Entity], 
Iterable[Entity]))], - thetaXY: (Double, Double), ws: WeightingScheme, budget: Int, sourceCount: Long) extends ProgressiveGeospatialInterlinkingT { +case class ReciprocalTopK(joinedRDD: RDD[(Int, (Iterable[Entity], Iterable[Entity]))], thetaXY: (Double, Double), + mainWS: WeightingScheme, secondaryWS: Option[WeightingScheme], budget: Int, sourceEntities: Int) + extends ProgressiveInterlinkerT { /** * Find the top-K comparisons of target and source and keep only the comparison (i, j) that belongs to both @@ -22,16 +23,18 @@ case class ReciprocalTopK(joinedRDD: RDD[(Int, (Iterable[Entity], Iterable[Entit * @param relation examining relation * @return prioritized comparisons as a PQ */ - def prioritize(source: Array[Entity], target: Array[Entity], partition: MBR, relation: Relation): ComparisonPQ[(Int, Int)] = { + def prioritize(source: Array[Entity], target: Array[Entity], partition: MBR, relation: Relation): StaticComparisonPQ = { + val localBudget = (math.ceil(budget*source.length.toDouble/sourceEntities.toDouble)*2).toLong val sourceIndex = index(source) val filterIndices = (b: (Int, Int)) => sourceIndex.contains(b) - val sourceK = (math.ceil(budget / source.length).toInt + 1) * 2 // +1 to avoid k=0 - val targetK = (math.ceil(budget / target.length).toInt + 1) * 2 // +1 to avoid k=0 + val sourceK = (math.ceil(localBudget / source.length).toInt + 1) * 2 // +1 to avoid k=0 + val targetK = (math.ceil(localBudget / target.length).toInt + 1) * 2 // +1 to avoid k=0 - val sourcePQ: Array[ComparisonPQ[Int]] = new Array(source.length) - val targetPQ: ComparisonPQ[Int] = ComparisonPQ[Int](targetK) - val partitionPQ: ComparisonPQ[(Int, Int)] = ComparisonPQ[(Int, Int)](budget) + val sourcePQ: Array[StaticComparisonPQ] = new Array(source.length) + val targetPQ: StaticComparisonPQ = StaticComparisonPQ(targetK) + val partitionPQ: StaticComparisonPQ = StaticComparisonPQ(localBudget) + var counter = 0 val targetSet: Array[Set[Int]] = new Array(target.length) target.indices @@ -43,33 +46,32 @@ case class ReciprocalTopK(joinedRDD: RDD[(Int, (Iterable[Entity], Iterable[Entit .filter(i => source(i).filter(e2, relation, block, thetaXY, Some(partition))) .foreach { i => val e1 = source(i) - val w = getWeight(e1, e2) + val w = getMainWeight(e1, e2) + val secW = getSecondaryWeight(e1, e2) + val wp = WeightedPair(counter, i, j, w, secW) + counter += 1 // set top-K PQ for the examining target entity - targetPQ.enqueue(w, i) + targetPQ.enqueue(wp) // update source entities' top-K if (sourcePQ(i) == null) - sourcePQ(i) = ComparisonPQ[Int](sourceK) - sourcePQ(i).enqueue(w, j) + sourcePQ(i) = StaticComparisonPQ(sourceK) + sourcePQ(i).enqueue(wp) } } // add comparisons into corresponding HashSet - targetSet(j) = targetPQ.iterator().map(_._2).toSet + targetSet(j) = targetPQ.iterator().map(_.entityId1).toSet targetPQ.clear() } // add comparison into PQ only if is contained by both top-K PQs sourcePQ - .zipWithIndex - .filter(_._1 != null) - .foreach { case (pq, i) => - val w = Double.MaxValue - while (pq.size > 0 && w > partitionPQ.minW) { - val (w, j) = pq.dequeueHead() - if (targetSet(j).contains(i)) - partitionPQ.enqueue(w, (i, j)) - } + .filter(_ != null) + .foreach { pq => + pq.iterator() + .filter(wp => targetSet(wp.entityId2).contains(wp.entityId1)) + .foreach(wp => partitionPQ.enqueue(wp)) } partitionPQ } @@ -77,10 +79,11 @@ case class ReciprocalTopK(joinedRDD: RDD[(Int, (Iterable[Entity], Iterable[Entit object ReciprocalTopK{ - def apply(source:RDD[(Int, Entity)], target:RDD[(Int, Entity)], ws: WeightingScheme, 
budget: Int, partitioner: Partitioner): ReciprocalTopK ={ + def apply(source:RDD[(Int, Entity)], target:RDD[(Int, Entity)], ws: WeightingScheme, sws: Option[WeightingScheme] = None, + budget: Int, partitioner: Partitioner): ReciprocalTopK ={ val thetaXY = Utils.getTheta - val sourceCount = Utils.getSourceCount val joinedRDD = source.cogroup(target, partitioner) - ReciprocalTopK(joinedRDD, thetaXY, ws, budget, sourceCount) + val sourceEntities = Utils.sourceCount + ReciprocalTopK(joinedRDD, thetaXY, ws, sws, budget, sourceEntities.toInt) } } diff --git a/src/main/scala/interlinkers/progressive/TopKPairs.scala b/src/main/scala/interlinkers/progressive/TopKPairs.scala new file mode 100644 index 00000000..458f307b --- /dev/null +++ b/src/main/scala/interlinkers/progressive/TopKPairs.scala @@ -0,0 +1,97 @@ +package interlinkers.progressive + +import model.{Entity, MBR, WeightedPair, StaticComparisonPQ} +import org.apache.spark.Partitioner +import org.apache.spark.rdd.RDD +import utils.Constants.Relation.Relation +import utils.Constants.WeightingScheme.WeightingScheme +import utils.Utils + +case class TopKPairs(joinedRDD: RDD[(Int, (Iterable[Entity], Iterable[Entity]))], thetaXY: (Double, Double), + mainWS: WeightingScheme, secondaryWS: Option[WeightingScheme], budget: Int, sourceEntities: Int) + extends ProgressiveInterlinkerT { + + /** + * First we find the top-k comparisons of each geometry in source and target, + * then we merge them in a common PQ and for each duplicate comparison + * maintain the max weight. + * + * @param source partition of source dataset + * @param target partition of target dataset + * @param partition partition MBR + * @param relation examining relation + * @return prioritized comparisons in a PQ + */ + def prioritize(source: Array[Entity], target: Array[Entity], partition: MBR, relation: Relation): StaticComparisonPQ = { + val localBudget = (math.ceil(budget*source.length.toDouble/sourceEntities.toDouble)*2).toLong + val sourceIndex = index(source) + val filterIndices = (b: (Int, Int)) => sourceIndex.contains(b) + + // the budget is divided based on the number of entities + val k = (math.ceil(localBudget / (source.length + target.length)).toInt + 1) * 2 // +1 to avoid k=0 + val sourcePQ: Array[StaticComparisonPQ] = new Array(source.length) + val targetPQ: StaticComparisonPQ = StaticComparisonPQ(k) + val partitionPQ: StaticComparisonPQ = StaticComparisonPQ(localBudget) + var counter = 0 + + target.indices + .foreach{ j => + val e2 = target(j) + e2.index(thetaXY, filterIndices) + .foreach{ block => + sourceIndex.get(block) + .filter(i => source(i).filter(e2, relation, block, thetaXY, Some(partition))) + .foreach { i => + val e1 = source(i) + val w = getMainWeight(e1, e2) + val secW = getSecondaryWeight(e1, e2) + val wp = WeightedPair(counter, i, j, w, secW) + counter += 1 + + // set top-K PQ for the examining target entity + targetPQ.enqueue(wp) + + // update source entities' top-K + if (sourcePQ(i) == null) + sourcePQ(i) = StaticComparisonPQ(k) + sourcePQ(i).enqueue(wp) + } + } + + // add target's pairs in partition's PQ + if (!targetPQ.isEmpty) { + while (targetPQ.size > 0) { + val wp = targetPQ.dequeueHead() + partitionPQ.enqueue(wp) + } + } + targetPQ.clear() + } + + // putting target comparisons in a HasMap. 
Source entities will also be added in the HashMap + // to update wights and avoid duplicate comparisons + val existingPairs = partitionPQ.iterator().toSet + // adding source entities' top-K in hashMap + sourcePQ + .filter(_ != null) + .foreach { pq => + pq.dequeueAll.foreach(wp => partitionPQ.enqueue(wp)) + pq.clear() + } + // keep partition's top comparisons + partitionPQ.clear() + partitionPQ.enqueueAll(existingPairs.iterator) + partitionPQ + } +} + +object TopKPairs{ + + def apply(source:RDD[(Int, Entity)], target:RDD[(Int, Entity)], ws: WeightingScheme, sws: Option[WeightingScheme] = None, + budget: Int, partitioner: Partitioner): TopKPairs ={ + val thetaXY = Utils.getTheta + val joinedRDD = source.cogroup(target, partitioner) + val sourceEntities = Utils.sourceCount + TopKPairs(joinedRDD, thetaXY, ws, sws, budget, sourceEntities.toInt) + } +} diff --git a/src/main/scala/model/ComparisonPQ.scala b/src/main/scala/model/ComparisonPQ.scala new file mode 100644 index 00000000..742a2e2e --- /dev/null +++ b/src/main/scala/model/ComparisonPQ.scala @@ -0,0 +1,75 @@ +package model + +import java.util +import org.spark_project.guava.collect.MinMaxPriorityQueue +import scala.collection.JavaConverters._ + + +sealed trait ComparisonPQ { + val pq: util.AbstractCollection[WeightedPair] + val maxSize: Long + + def enqueue(wp: WeightedPair): Unit ={ + pq.add(wp) + if (pq.size > maxSize) + dequeueLast() + } + + def enqueueAll(items: Iterator[WeightedPair]): Unit = items.foreach(wp => enqueue(wp)) + + def take(n: Option[Int]): Iterator[WeightedPair] = + n match { + case Some(n) => Iterator.continually{ dequeueHead() }.take(n) + case None => Iterator.continually{ dequeueHead() }.takeWhile(_ => !pq.isEmpty) + } + + def take(n: Int): Iterator[WeightedPair] = take(Option(n)) + + def dequeueAll: Iterator[WeightedPair] = take(None) + + def clear(): Unit = pq.clear() + + def isEmpty: Boolean = pq.isEmpty + + def size(): Int = pq.size() + + def dequeueHead(): WeightedPair + + def dequeueLast(): WeightedPair + + def iterator(): Iterator[WeightedPair] = pq.iterator().asScala + +} + + +case class StaticComparisonPQ(maxSize: Long) extends ComparisonPQ{ + + val pq: MinMaxPriorityQueue[WeightedPair] = MinMaxPriorityQueue.maximumSize(maxSize.toInt+1).create() + + def dequeueHead(): WeightedPair = pq.pollFirst() + + def dequeueLast(): WeightedPair = pq.pollLast() + +} + +case class DynamicComparisonPQ(maxSize: Long) extends ComparisonPQ{ + + val pq: util.TreeSet[WeightedPair] = new util.TreeSet[WeightedPair]() + + def dequeueHead(): WeightedPair = pq.pollFirst() + + def dequeueLast(): WeightedPair = pq.pollLast() + + def dynamicUpdate(wp: WeightedPair): Unit ={ + val exists = pq.remove(wp) + if (exists){ + wp.incrementRelatedMatches() + enqueue(wp) + } + } +} + + + + + diff --git a/src/main/scala/dataModel/Entity.scala b/src/main/scala/model/Entity.scala similarity index 99% rename from src/main/scala/dataModel/Entity.scala rename to src/main/scala/model/Entity.scala index 8ac56235..fe22ba9c 100644 --- a/src/main/scala/dataModel/Entity.scala +++ b/src/main/scala/model/Entity.scala @@ -1,4 +1,4 @@ -package dataModel +package model import com.vividsolutions.jts.geom.{Geometry, IntersectionMatrix} import com.vividsolutions.jts.io.WKTReader diff --git a/src/main/scala/dataModel/IM.scala b/src/main/scala/model/IM.scala similarity index 97% rename from src/main/scala/dataModel/IM.scala rename to src/main/scala/model/IM.scala index f3d252a0..73f667da 100644 --- a/src/main/scala/dataModel/IM.scala +++ 
b/src/main/scala/model/IM.scala @@ -1,4 +1,4 @@ -package dataModel +package model case class IM(idPair: (String, String), isContains: Boolean, isCoveredBy: Boolean, isCovers: Boolean, isCrosses: Boolean, isEquals: Boolean, isIntersects: Boolean, isOverlaps: Boolean, isTouches: Boolean, isWithin: Boolean){ diff --git a/src/main/scala/dataModel/MBR.scala b/src/main/scala/model/MBR.scala similarity index 95% rename from src/main/scala/dataModel/MBR.scala rename to src/main/scala/model/MBR.scala index bbde614d..589a9392 100644 --- a/src/main/scala/dataModel/MBR.scala +++ b/src/main/scala/model/MBR.scala @@ -1,4 +1,4 @@ -package dataModel +package model import com.vividsolutions.jts.geom.{Coordinate, Envelope, Geometry, GeometryFactory} import utils.Constants.Relation @@ -30,7 +30,7 @@ case class MBR(maxX:Double, minX:Double, maxY:Double, minY:Double){ * @param thetaXY blocks' granularity * @return true if the reference point is in the block */ - private[dataModel] + private[model] def referencePointFiltering(mbr:MBR, b:(Int, Int), thetaXY: (Double, Double)): Boolean ={ val (thetaX, thetaY) = thetaXY @@ -53,7 +53,7 @@ case class MBR(maxX:Double, minX:Double, maxY:Double, minY:Double){ * @param partition the examining partition * @return true if the reference point is in the block and in partition */ - private[dataModel] + private[model] def referencePointFiltering(mbr:MBR, b:(Int, Int), thetaXY: (Double, Double), partition: MBR): Boolean ={ val (thetaX, thetaY) = thetaXY @@ -75,7 +75,7 @@ case class MBR(maxX:Double, minX:Double, maxY:Double, minY:Double){ * @param relations requested relations * @return whether the relation is true */ - private[dataModel] + private[model] def testMBR(mbr:MBR, relations: Seq[Relation]): Boolean = relations.map { case Relation.CONTAINS | Relation.COVERS => @@ -98,7 +98,7 @@ case class MBR(maxX:Double, minX:Double, maxY:Double, minY:Double){ * @param mbr given mbr * @return whether it's true */ - private[dataModel] + private[model] def equals(mbr:MBR): Boolean = minX == mbr.minX && maxX == mbr.maxX && minY == mbr.minY && maxY == mbr.maxY @@ -107,14 +107,14 @@ case class MBR(maxX:Double, minX:Double, maxY:Double, minY:Double){ * @param mbr given mbr * @return whether it's true */ - private[dataModel] + private[model] def contains(mbr:MBR): Boolean = minX <= mbr.minX && maxX >= mbr.maxX && minY <= mbr.minY && maxY >= mbr.maxY - private[dataModel] + private[model] def contains(minX: Double, maxX: Double, minY: Double, maxY: Double): Boolean = minX <= minX && maxX >= maxX && minY <= minY && maxY >= maxY - private[dataModel] + private[model] def contains(c: (Double, Double)): Boolean = minX <= c._1 && maxX >= c._1 && minY <= c._2 && maxY >= c._2 @@ -123,7 +123,7 @@ case class MBR(maxX:Double, minX:Double, maxY:Double, minY:Double){ * @param mbr given mbr * @return whether it's true */ - private[dataModel] + private[model] def within(mbr: MBR):Boolean = mbr.contains(this) @@ -132,7 +132,7 @@ case class MBR(maxX:Double, minX:Double, maxY:Double, minY:Double){ * @param mbr given mbr * @return whether it's true */ - private[dataModel] + private[model] def touches(mbr: MBR): Boolean = maxX == mbr.maxX || minX == mbr.minX || maxY == mbr.maxY || minY == mbr.minY @@ -141,7 +141,7 @@ case class MBR(maxX:Double, minX:Double, maxY:Double, minY:Double){ * @param mbr given mbr * @return whether it's true */ - private[dataModel] + private[model] def intersects(mbr:MBR): Boolean = ! 
disjoint(mbr) @@ -150,7 +150,7 @@ case class MBR(maxX:Double, minX:Double, maxY:Double, minY:Double){ * @param mbr given mbr * @return whether it's true */ - private[dataModel] + private[model] def disjoint(mbr:MBR): Boolean = minX > mbr.maxX || maxX < mbr.minX || minY > mbr.maxY || maxY < mbr.minY diff --git a/src/main/scala/dataModel/SpatialIndex.scala b/src/main/scala/model/SpatialIndex.scala similarity index 98% rename from src/main/scala/dataModel/SpatialIndex.scala rename to src/main/scala/model/SpatialIndex.scala index b5d88612..cebf0e8c 100644 --- a/src/main/scala/dataModel/SpatialIndex.scala +++ b/src/main/scala/model/SpatialIndex.scala @@ -1,4 +1,4 @@ -package dataModel +package model import scala.collection.{Set, mutable} import scala.collection.mutable.ListBuffer diff --git a/src/main/scala/dataModel/SpatioTemporalEntity.scala b/src/main/scala/model/SpatioTemporalEntity.scala similarity index 98% rename from src/main/scala/dataModel/SpatioTemporalEntity.scala rename to src/main/scala/model/SpatioTemporalEntity.scala index 5b7c3252..7f37760e 100644 --- a/src/main/scala/dataModel/SpatioTemporalEntity.scala +++ b/src/main/scala/model/SpatioTemporalEntity.scala @@ -1,4 +1,4 @@ -package dataModel +package model import com.vividsolutions.jts.geom.Geometry import com.vividsolutions.jts.io.WKTReader diff --git a/src/main/scala/model/WeightedPair.scala b/src/main/scala/model/WeightedPair.scala new file mode 100644 index 00000000..2eeedd85 --- /dev/null +++ b/src/main/scala/model/WeightedPair.scala @@ -0,0 +1,48 @@ +package model + +case class WeightedPair(counter: Int, entityId1: Int, entityId2: Int, mainWeight: Float, secondaryWeight: Float) extends Serializable with Comparable[WeightedPair]{ + + var relatedMatches: Int = 0 + + /** + * Note: ID-based comparison leads to a violation of the Comparable contract, + * as it may lead to cases where A > B, B > C and C > A. This is because the ids + * indicate the index of each entity in its partition's array, so once pairs are + * collected across partitions the ordering may be violated. + * + * compareTo sorts elements in descending order + * + * @param o a weighted pair + * @return 1 if o is greater, 0 if they are equal, -1 if o is smaller. + */ + override def compareTo(o: WeightedPair): Int = { + + if (entityId1 == o.entityId1 && entityId2 == o.entityId2) return 0 + + val test1 = o.getMainWeight - getMainWeight + if (0 < test1) return 1 + + if (test1 < 0) return -1 + + val test2 = o.getSecondaryWeight - getSecondaryWeight + if (0 < test2) return 1 + + if (test2 < 0) return -1 + + o.counter - counter + } + + /** + * Returns the weight between two geometries. Higher weights correspond to + * a stronger likelihood that the entities are related.
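+   * A small, purely illustrative example of the boost applied by relatedMatches (the values are
+   * made up; the formula is the one implemented below):
+   * {{{
+   *   val wp = WeightedPair(counter = 0, entityId1 = 3, entityId2 = 7, mainWeight = 0.4f, secondaryWeight = 0.1f)
+   *   wp.getMainWeight   // 0.4f, since relatedMatches is still 0
+   *   wp.incrementRelatedMatches()
+   *   wp.getMainWeight   // 0.8f = 0.4f * (1 + 1); DynamicComparisonPQ.dynamicUpdate triggers such increments
+   * }}}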
+ * + * @return + */ + def getMainWeight: Float = mainWeight * (1 + relatedMatches) + + def getSecondaryWeight: Float = secondaryWeight * (1 + relatedMatches) + + def incrementRelatedMatches(): Unit = relatedMatches += 1 + + override def toString: String = s"E1 : $entityId1 E2 : $entityId2 main weight : $getMainWeight secondary weight : $getSecondaryWeight" +} diff --git a/src/main/scala/utils/ConfigurationParser.scala b/src/main/scala/utils/ConfigurationParser.scala index 1f937a2d..05c452ff 100644 --- a/src/main/scala/utils/ConfigurationParser.scala +++ b/src/main/scala/utils/ConfigurationParser.scala @@ -14,12 +14,9 @@ import utils.Constants.FileTypes.FileTypes import utils.Constants._ - /** * @author George Mandilaras < gmandi@di.uoa.gr > (National and Kapodistrian University of Athens) */ - - case class DatasetConfigurations(path: String, geometryField: String, realIdField: Option[String] = None, dateField: Option[String] = None, datePattern: Option[String] = None){ @@ -86,7 +83,12 @@ case class Configuration(source: DatasetConfigurations, target:DatasetConfigurat def getTheta: ThetaOption = ThetaOption.withName(configurations.getOrElse(YamlConfiguration.CONF_THETA_GRANULARITY, "avg")) - def getWeightingScheme: WeightingScheme = WeightingScheme.withName(configurations.getOrElse(YamlConfiguration.CONF_WEIGHTING_SCHM, "JS")) + def getMainWS: WeightingScheme = WeightingScheme.withName(configurations.getOrElse(YamlConfiguration.CONF_MAIN_WS, "JS")) + + def getSecondaryWS: Option[WeightingScheme] = configurations.get(YamlConfiguration.CONF_SECONDARY_WS) match { + case Some(ws) => Option(WeightingScheme.withName(ws)) + case None => None + } def getGridType: GridType = GridType.withName(configurations.getOrElse(YamlConfiguration.CONF_GRIDTYPE, "QUADTREE")) @@ -155,9 +157,9 @@ object ConfigurationParser { log.error(s"DS-JEDAI: Prioritization Algorithm \'$value\' is not supported") false } - case YamlConfiguration.CONF_WEIGHTING_SCHM => + case YamlConfiguration.CONF_MAIN_WS | YamlConfiguration.CONF_SECONDARY_WS=> if (! 
WeightingScheme.exists(value)) { - log.error(s"DS-JEDAI: Weighting algorithm \'$value\' is not supported") + log.error(s"DS-JEDAI: Weighting Scheme \'$value\' is not supported") false } case YamlConfiguration.CONF_GRIDTYPE=> diff --git a/src/main/scala/utils/Constants.scala b/src/main/scala/utils/Constants.scala index 26c1e586..f65a2b4c 100644 --- a/src/main/scala/utils/Constants.scala +++ b/src/main/scala/utils/Constants.scala @@ -69,12 +69,19 @@ object Constants { */ object WeightingScheme extends Enumeration { type WeightingScheme = Value - + // co-occurrence frequency val CF: Constants.WeightingScheme.Value = Value("CF") + // jaccard similarity val JS: Constants.WeightingScheme.Value = Value("JS") + + // Pearson's chi squared test val PEARSON_X2: Constants.WeightingScheme.Value = Value("PEARSON_X2") - val MBR_INTERSECTION: Constants.WeightingScheme.Value = Value("MBR_INTERSECTION") - val POINTS: Constants.WeightingScheme.Value = Value("POINTS") + + // minimum bounding rectangle overlap + val MBRO: Constants.WeightingScheme.Value = Value("MBRO") + + // inverse sum of points + val ISP: Constants.WeightingScheme.Value = Value("ISP") def exists(s: String): Boolean = values.exists(_.toString == s) } @@ -87,7 +94,8 @@ object Constants { val CONF_PARTITIONS = "partitions" val CONF_THETA_GRANULARITY = "thetaGranularity" val CONF_PROGRESSIVE_ALG = "progressiveAlgorithm" - val CONF_WEIGHTING_SCHM = "weightingScheme" + val CONF_MAIN_WS = "mainWS" + val CONF_SECONDARY_WS = "secondaryWS" val CONF_BUDGET = "budget" val CONF_GRIDTYPE = "gridType" } @@ -107,6 +115,7 @@ object Constants { object ProgressiveAlgorithm extends Enumeration { type ProgressiveAlgorithm = Value val PROGRESSIVE_GIANT: Constants.ProgressiveAlgorithm.Value = Value("PROGRESSIVE_GIANT") + val DYNAMIC_PROGRESSIVE_GIANT: Constants.ProgressiveAlgorithm.Value = Value("DYNAMIC_PROGRESSIVE_GIANT") val GEOMETRY_CENTRIC: Constants.ProgressiveAlgorithm.Value = Value("GEOMETRY_CENTRIC") val TOPK: Constants.ProgressiveAlgorithm.Value = Value("TOPK") val RECIPROCAL_TOPK: Constants.ProgressiveAlgorithm.Value = Value("RECIPROCAL_TOPK") diff --git a/src/main/scala/utils/SpaceStatsCounter.scala b/src/main/scala/utils/SpaceStatsCounter.scala deleted file mode 100644 index a2be58c3..00000000 --- a/src/main/scala/utils/SpaceStatsCounter.scala +++ /dev/null @@ -1,106 +0,0 @@ -package utils - -import dataModel.{Entity, IM, MBR, SpatialIndex} -import org.apache.log4j.{Level, LogManager} -import org.apache.spark.rdd.RDD -import org.apache.spark.{Partitioner, TaskContext} -import utils.Constants.Relation - -import scala.collection.mutable.ListBuffer - - -case class SpaceStatsCounter(joinedRDD: RDD[(Int, (Iterable[Entity], Iterable[Entity]))], thetaXY: (Double, Double)){ - - val partitionsZones: Array[MBR] = Utils.getZones - - def printSpaceInfo(): Unit ={ - val log = LogManager.getRootLogger - log.setLevel(Level.INFO) - - val source: RDD[Entity] = joinedRDD.flatMap(_._2._1.map(se => (se.originalID, se))).distinct().map(_._2).setName("Source").cache() - val target: RDD[Entity] = joinedRDD.flatMap(_._2._2.map(se => (se.originalID, se))).distinct().map(_._2).setName("target").cache() - - val sourceTiles: RDD[(Int, Int)] = source.flatMap(se => se.index(thetaXY)).setName("SourceTiles").cache() - val targetTiles: RDD[(Int, Int)] = target.flatMap(se => se.index(thetaXY)).setName("TargetTiles").cache() - - val ssePerTile: RDD[((Int, Int), Int)] = sourceTiles.map((_,1)).reduceByKey(_ + _) - val tsePerTile: RDD[((Int, Int), Int)] = 
targetTiles.map((_,1)).reduceByKey(_ + _) - - val commonTiles = ssePerTile.join(tsePerTile).setName("CommonTiles").cache() - val tiles = commonTiles.map{ case(_, (n1, n2)) => n2}.sum() - log.info("Tiles: " + tiles) - - val pairTiles = commonTiles.map{ case(c, (n1, n2)) => n1*n2}.sum() - log.info("Pairs Tiles: " + pairTiles) - - sourceTiles.unpersist() - targetTiles.unpersist() - commonTiles.unpersist() - - val tilesSE = source.flatMap(se => se.index(thetaXY).map(c => (c, ListBuffer(se.originalID)))).reduceByKey(_ ++ _) - val tilesTE = target.flatMap(se => se.index(thetaXY).map(c => (c, ListBuffer(se.originalID)))).reduceByKey(_ ++ _) - val joinedTiles = tilesSE.rightOuterJoin(tilesTE).filter(_._2._1.isDefined).setName("Joined").cache() - val uniquePairs = joinedTiles.flatMap{case(_, (sse, tse)) => tse.map(se => (se, sse.get))}.reduceByKey(_ ++ _).map(_._2.distinct.size).sum - log.info("Unique Tiles: " + uniquePairs) - - joinedTiles.unpersist() - source.unpersist() - target.unpersist() - - val comparisonsRDD: RDD[(Entity, Entity)] = joinedRDD - .filter(p => p._2._1.nonEmpty && p._2._2.nonEmpty) - .flatMap { p => - val source: Array[Entity] = p._2._1.toArray - val target: Iterator[Entity] = p._2._2.toIterator - val sourceIndex = index(source) - val filteringFunction = (b: (Int, Int)) => sourceIndex.contains(b) - val pid = p._1 - val partition = partitionsZones(pid) - - target.flatMap { targetSE => - targetSE - .index(thetaXY, filteringFunction) - .flatMap(c => sourceIndex.get(c).map(j => (c, source(j)))) - .filter{case(c, se) => se.referencePointFiltering(targetSE, c, thetaXY, Some(partition))} - .map {case(_, se) => (se, targetSE)} - } - }.setName("ComparisonsRDD") - - val intersectingTiles = comparisonsRDD.filter{ case (sSE, tSE) => sSE.testMBR(tSE, Relation.INTERSECTS, Relation.TOUCHES)} - val truePairs = comparisonsRDD.filter{ case (sSE, tSE) => sSE.testMBR(tSE, Relation.INTERSECTS, Relation.TOUCHES)}.filter{case (sSE, tSE) => IM(sSE, tSE).relate} - - log.info("Intersecting Pairs: " + intersectingTiles.count()) - log.info("True Pairs: " + truePairs.count()) - log.info("") - } - - /** - * index a list of spatial entities - * - * @param entities list of spatial entities - * @return a SpatialIndex - */ - def index(entities: Array[Entity]): SpatialIndex = { - val spatialIndex = new SpatialIndex() - entities.zipWithIndex.foreach { case (se, index) => - val indices: Seq[(Int, Int)] = se.index(thetaXY) - indices.foreach(i => spatialIndex.insert(i, index)) - } - spatialIndex - } - - -} -object SpaceStatsCounter{ - - def apply(source:RDD[Entity], target:RDD[Entity], partitioner: Partitioner): SpaceStatsCounter ={ - val thetaXY = Utils.getTheta - val sourcePartitions = source.map(se => (TaskContext.getPartitionId(), se)) - val targetPartitions = target.map(se => (TaskContext.getPartitionId(), se)) - - val joinedRDD = sourcePartitions.cogroup(targetPartitions, partitioner) - - SpaceStatsCounter(joinedRDD, thetaXY) - } -} - diff --git a/src/main/scala/utils/SparqlExecutor.scala b/src/main/scala/utils/SparqlExecutor.scala index 3771ec91..819f6afb 100644 --- a/src/main/scala/utils/SparqlExecutor.scala +++ b/src/main/scala/utils/SparqlExecutor.scala @@ -1,38 +1,42 @@ package utils import net.sansa_stack.query.spark.sparqlify.{QueryExecutionSpark, SparqlifyUtils3} -import org.apache.jena.graph.Triple -import org.apache.jena.query.QueryFactory +import net.sansa_stack.rdf.common.partition.core.{RdfPartition, RdfPartitioner, RdfPartitionerDefault} +import 
net.sansa_stack.rdf.spark.partition.core.RdfPartitionUtilsSpark +import org.apache.jena.query.{ARQ, QueryFactory} import org.apache.jena.sparql.core.Var -import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.functions._ +import scala.reflect.ClassTag +import org.apache.jena.graph.Triple +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.Row + import scala.collection.JavaConversions._ import scala.collection.JavaConversions.asScalaSet -import net.sansa_stack.rdf.spark.partition.core.RdfPartitionUtilsSpark - import scala.collection.mutable + object SparqlExecutor { //Renames columns to their expected names - def renameDfCols(mappings: Array[(String, mutable.Set[Var])], df:DataFrame) : DataFrame = { - mappings.foldLeft(df) { (memoDf:DataFrame, colMapping:(String, mutable.Set[Var])) => concatColumns(colMapping, memoDf)} + def renameDfCols(mappings: Array[(String, mutable.Set[Var])], df: DataFrame): DataFrame = { + mappings.foldLeft(df) { (memoDf: DataFrame, colMapping: (String, mutable.Set[Var])) => concatColumns(colMapping, memoDf) } } //Merges associated columns - def concatColumns(mapping:(String, mutable.Set[Var]), df: DataFrame) : DataFrame = { + def concatColumns(mapping: (String, mutable.Set[Var]), df: DataFrame): DataFrame = { val colArray = for {colName <- mapping._2} yield col(colName.getName) val colNameArray = for {colName <- mapping._2} yield colName.getName val colSeq = colArray.toSeq val colNameSeq = colNameArray.toSeq - df.withColumn(mapping._1, concat(colSeq:_*)) - .drop(colNameSeq:_*) + df.withColumn(mapping._1, concat(colSeq: _*)) + .drop(colNameSeq: _*) } - def query(spark: SparkSession, triples: RDD[Triple], sparqlQuery:String): DataFrame = { + def query(spark: SparkSession, triples: RDD[Triple], sparqlQuery: String): DataFrame = { val partitions = RdfPartitionUtilsSpark.partitionGraph(triples) val rewriter = SparqlifyUtils3.createSparqlSqlRewriter(spark, partitions) @@ -52,4 +56,4 @@ object SparqlExecutor { renamedDf } -} +} \ No newline at end of file diff --git a/src/main/scala/utils/SpatialReader.scala b/src/main/scala/utils/SpatialReader.scala deleted file mode 100644 index 44039163..00000000 --- a/src/main/scala/utils/SpatialReader.scala +++ /dev/null @@ -1,313 +0,0 @@ -package utils - -import dataModel.{Entity, MBR, SpatialEntity, SpatioTemporalEntity} -import com.vividsolutions.jts.geom.Geometry -import org.apache.jena.query.ARQ -import net.sansa_stack.rdf.spark.io._ -import org.apache.jena.riot.Lang -import org.apache.spark.rdd.RDD -import org.apache.spark.serializer.KryoSerializer -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.functions.col -import org.apache.spark.sql.types.StringType -import org.apache.spark.sql.functions.udf -import org.apache.spark.{HashPartitioner, SparkConf, SparkContext} -import org.datasyslab.geospark.enums.GridType -import org.datasyslab.geospark.formatMapper.GeoJsonReader -import org.datasyslab.geospark.formatMapper.shapefileParser.ShapefileReader -import org.datasyslab.geospark.serde.GeoSparkKryoRegistrator -import org.datasyslab.geospark.spatialPartitioning.SpatialPartitioner -import org.datasyslab.geospark.spatialRDD.SpatialRDD -import org.datasyslab.geosparksql.utils.{Adapter, GeoSparkSQLRegistrator} -import org.joda.time.DateTime -import org.joda.time.format.DateTimeFormat -import utils.Constants.FileTypes - -import scala.collection.JavaConverters._ -import scala.collection.mutable - -/** - * Spatial Reader loads input dataset into 
RDD[Entity] - * - * It's initialized based on the source dataset, which sets the partitioner, - * and the next datasets will be loaded using the same partitioner - * @param sourceDc source Dataset configuration - * @param partitions num of partitions - * @param gt grid algorithm of spatial partitioner - */ -case class SpatialReader(sourceDc: DatasetConfigurations, partitions: Int, gt: Constants.GridType.GridType = Constants.GridType.QUADTREE) { - - lazy val gridType: GridType = gt match { - case Constants.GridType.KDBTREE => GridType.KDBTREE - case _ => GridType.QUADTREE - } - - // spatial RDD of source - lazy val spatialRDD: SpatialRDD[Geometry] = loadSource(sourceDc) - - // spatial partitioner defined by the source spatial RDD - lazy val spatialPartitioner: SpatialPartitioner = { - spatialRDD.analyze() - if (partitions > 0) spatialRDD.spatialPartitioning(gridType, partitions) else spatialRDD.spatialPartitioning(gridType) - spatialRDD.getPartitioner - } - - // the final partitioner - because the transformation of SRDD into RDD does not preserve partitioning - // we partitioning using HashPartitioning with the spatial indexes as keys - lazy val partitioner = new HashPartitioner(spatialPartitioner.numPartitions) - - lazy val partitionsZones: Array[MBR] = - spatialPartitioner.getGrids.asScala.map(e => MBR(e.getMaxX, e.getMinX, e.getMaxY, e.getMinY)).toArray - - /** - * Employ the appropriate reader based the FileType - * @param dc dataset configuration - * @return a spatial RDD - */ - def loadSource(dc: DatasetConfigurations): SpatialRDD[Geometry] ={ - val extension = dc.getExtension - extension match { - case FileTypes.CSV => - loadCSV(dc.path, dc.realIdField.getOrElse("id"), dc.geometryField, dc.dateField, header = true ) - case FileTypes.TSV => - loadTSV(dc.path, dc.realIdField.getOrElse("id"), dc.geometryField, dc.dateField, header = true ) - case FileTypes.SHP => - loadSHP(dc.path, dc.realIdField.getOrElse("id"), dc.dateField) - case FileTypes.NTRIPLES => - loadRDF(dc.path, dc.geometryField, dc.dateField, Lang.NTRIPLES) - case FileTypes.TURTLE => - loadRDF(dc.path, dc.geometryField, dc.dateField, Lang.TURTLE) - case FileTypes.RDFXML => - loadRDF(dc.path, dc.geometryField, dc.dateField, Lang.RDFXML) - case FileTypes.RDFJSON => - loadRDF(dc.path, dc.geometryField, dc.dateField, Lang.RDFJSON) - case _ => - null - } - } - - def loadCSV(filepath: String, realIdField: String, geometryField: String, dateField: Option[String], header: Boolean):SpatialRDD[Geometry] = - loadDelimitedFile(filepath, realIdField, geometryField, dateField, ",", header) - - def loadTSV(filepath: String, realIdField: String, geometryField: String, dateField: Option[String], header: Boolean): SpatialRDD[Geometry] = - loadDelimitedFile(filepath, realIdField, geometryField, dateField, "\t", header) - - /** - * Loads a delimited file - * @param filepath path to the delimited text file - * @param realIdField instances' unique id - * @param geometryField geometry field - * @param dateField date field if exists - * @param delimiter delimiter - * @param header if first row contains the headers - * @return a spatial RDD - */ - def loadDelimitedFile(filepath: String, realIdField: String, geometryField: String, dateField: Option[String], delimiter: String, header: Boolean): SpatialRDD[Geometry] ={ - val conf = new SparkConf() - conf.set("spark.serializer", classOf[KryoSerializer].getName) - conf.set("spark.kryo.registrator", classOf[GeoSparkKryoRegistrator].getName) - val sc = SparkContext.getOrCreate(conf) - val spark = 
SparkSession.getActiveSession.get - - GeoSparkSQLRegistrator.registerAll(spark) - - var inputDF = spark.read.format("csv") - .option("delimiter", delimiter) - .option("quote", "\"") - .option("header", header) - .load(filepath) - .filter(col(realIdField).isNotNull) - .filter(col(geometryField).isNotNull) - .filter(! col(geometryField).contains("EMPTY")) - - var query = s"SELECT ST_GeomFromWKT(GEOMETRIES.$geometryField) AS WKT, GEOMETRIES.$realIdField AS REAL_ID FROM GEOMETRIES".stripMargin - - if (dateField.isDefined) { - inputDF = inputDF.filter(col(dateField.get).isNotNull) - query = s"SELECT ST_GeomFromWKT(GEOMETRIES.$geometryField) AS WKT, GEOMETRIES.$realIdField AS REAL_ID, GEOMETRIES.${dateField.get} AS DATE FROM GEOMETRIES".stripMargin - } - - inputDF.createOrReplaceTempView("GEOMETRIES") - - val spatialDF = spark.sql(query) - val srdd = new SpatialRDD[Geometry] - srdd.rawSpatialRDD = Adapter.toRdd(spatialDF) - srdd - } - - /** - * Loads an ESRI Shapefile - * @param filepath path to the SHP file - * @param realIdField instances' unique id - * @param dateField date field if exists - * @return a spatial RDD - */ - def loadSHP(filepath: String, realIdField: String, dateField: Option[String]): SpatialRDD[Geometry] ={ - val conf = new SparkConf() - conf.set("spark.serializer", classOf[KryoSerializer].getName) - conf.set("spark.kryo.registrator", classOf[GeoSparkKryoRegistrator].getName) - val sc = SparkContext.getOrCreate(conf) - - val parentFolder = filepath.substring(0, filepath.lastIndexOf("/")) - val srdd = ShapefileReader.readToGeometryRDD(sc, parentFolder) - adjustUserData(srdd, realIdField, dateField) - } - - - /** - * Loads a GeoJSON file - * @param filepath path to the SHP file - * @param realIdField instances' unique id - * @param dateField date field if exists - * @return a spatial RDD - */ - def loadGeoJSON(filepath: String, realIdField: String, dateField: Option[String]): SpatialRDD[Geometry] ={ - val conf = new SparkConf() - conf.set("spark.serializer", classOf[KryoSerializer].getName) - conf.set("spark.kryo.registrator", classOf[GeoSparkKryoRegistrator].getName) - val sc = SparkContext.getOrCreate(conf) - - val srdd = GeoJsonReader.readToGeometryRDD(sc, filepath) - adjustUserData(srdd, realIdField, dateField) - } - - /** - * Adjust users' data. - * Discard all properties except the id and the date if it's requested. - * @param srdd the input rdd - * @param realIdField the field of id - * @param dateField the field of data if it's given - * @return geometries with only the necessary user data - */ - def adjustUserData(srdd: SpatialRDD[Geometry], realIdField: String, dateField: Option[String]): SpatialRDD[Geometry]={ - val idIndex = srdd.fieldNames.indexOf(realIdField) - val rddWithUserData: RDD[Geometry] = dateField match { - case Some(dateField) => - val dateIndex = srdd.fieldNames.indexOf(dateField) - srdd.rawSpatialRDD.rdd.map { g => - val userData = g.getUserData.toString.split("\t") - val id = userData(idIndex) - val date = userData(dateIndex) - g.setUserData(id + '\t' + date) - g - } - case _ => - srdd.rawSpatialRDD.rdd.map{ g => - val userData = g.getUserData.toString.split("\t") - val id = userData(idIndex) - g.setUserData(id) - g - } - } - srdd.setRawSpatialRDD(rddWithUserData) - - // filter records with valid geometries and ids - srdd.setRawSpatialRDD(srdd.rawSpatialRDD.rdd.filter(g => ! 
(g.isEmpty || g == null || g.getUserData.toString == ""))) - srdd - } - - - /** - * Loads RDF dataset into Spatial RDD - First loads the dataset into - * RDD[Triples] and then using a SPARQL Select query, extract the necessary - * fields. - * - * @param filepath path to the RDF file - * @param geometryPredicate the predicate of the geometry - * @param datePredicate date predicate if exists - * @param lang the RDF format (i.e. NTRIPLES, TURTLE, etc.) - * @return a spatial RDD - */ - def loadRDF(filepath: String, geometryPredicate: String, datePredicate: Option[String], lang: Lang) : SpatialRDD[Geometry] ={ - val conf = new SparkConf() - conf.set("spark.serializer", classOf[KryoSerializer].getName) - conf.set("spark.kryo.registrator", classOf[GeoSparkKryoRegistrator].getName) - val sc = SparkContext.getOrCreate(conf) - val spark = SparkSession.getActiveSession.get - GeoSparkSQLRegistrator.registerAll(spark) - ARQ.init() - - val asWKT = "http://www.opengis.net/ont/geosparql#asWKT" - val allowedPredicates: mutable.Set[String] = mutable.Set(asWKT) - var sparqlQuery = s"SELECT ?Subject ?WKT WHERE { ?Subject $geometryPredicate ?g. ?g <$asWKT> ?WKT.}" - var query = "SELECT ST_GeomFromWKT(GEOMETRIES.WKT), GEOMETRIES.Subject FROM GEOMETRIES".stripMargin - - val cleanGeomPredicate: String = - if (geometryPredicate.head == '<' && geometryPredicate.last == '>') - geometryPredicate.substring(1, geometryPredicate.length-1) - else geometryPredicate - - allowedPredicates.add(cleanGeomPredicate) - - if(datePredicate.isDefined){ - val datePredicateValue = datePredicate.get - val cleanDatePredicate: String = if (datePredicateValue.head == '<' && datePredicateValue.last == '>') - datePredicateValue.substring(1, datePredicateValue.length-1) - else datePredicateValue - allowedPredicates.add(cleanDatePredicate) - sparqlQuery = s"SELECT ?Subject ?WKT ?Date WHERE { ?Subject ${datePredicate.get} ?Date. ?Subject $geometryPredicate ?g. ?g <$asWKT> ?WKT.}" - query = "SELECT ST_GeomFromWKT(GEOMETRIES.WKT), GEOMETRIES.Subject, GEOMETRIES.Date FROM GEOMETRIES".stripMargin - } - - val triplesRDD = spark.rdf(lang)(filepath).filter(t => allowedPredicates.contains(t.getPredicate.getURI)) - var df = SparqlExecutor.query(spark, triplesRDD, sparqlQuery) - - val cleanWKT = udf( (wkt: String) => wkt.replaceAll("<\\S+>\\s?", ""), StringType) - df = df.withColumn("WKT", cleanWKT(df.col("WKT"))) - .filter(col("WKT").isNotNull) - .filter(col("WKT").isNotNull) - .filter(! col("WKT").contains("EMPTY")) - - df.createOrReplaceTempView("GEOMETRIES") - - val spatialDF = spark.sql(query) - val srdd = new SpatialRDD[Geometry] - srdd.rawSpatialRDD = Adapter.toRdd(spatialDF) - srdd - } - - /** - * Loads a dataset into Spatial Partitioned RDD. The partitioner - * is defined by the first dataset (i.e. 
the source dataset) - * - * @param dc dataset configuration - * @return a spatial partitioned rdd - */ - def load(dc: DatasetConfigurations = sourceDc): RDD[(Int, Entity)] = { - val srdd = if (dc == sourceDc) spatialRDD else loadSource(dc) - val sp = SparkContext.getOrCreate().broadcast(spatialPartitioner) - - val withTemporal = dc.dateField.isDefined - - // remove empty, invalid geometries and geometry collections - val filteredGeometriesRDD = srdd.rawSpatialRDD.rdd - .map{ geom => - val userdata = geom.getUserData.asInstanceOf[String].split("\t") - (geom, userdata) - } - .filter{case (g, _) => !g.isEmpty && g.isValid && g.getGeometryType != "GeometryCollection"} - - // create Spatial or SpatioTemporal entities - val entitiesRDD: RDD[Entity] = - if(!withTemporal) - filteredGeometriesRDD.map{ case (geom, userdata) => SpatialEntity(userdata(0), geom)} - else - filteredGeometriesRDD.mapPartitions{ geomIterator => - val pattern = dc.datePattern.get - val formatter = DateTimeFormat.forPattern(pattern) - geomIterator.map{ - case (geom, userdata) => - val realID = userdata(0) - val dateStr = userdata(1) - val date: DateTime = formatter.parseDateTime(dateStr) - val dateStr_ = date.toString(Constants.defaultDatePattern) - SpatioTemporalEntity(realID, geom, dateStr_) - } - } - // redistribute based on spatial partitioner - entitiesRDD - .flatMap(se => sp.value.placeObject(se.geometry).asScala.map(i => (i._1.toInt, se))) - .partitionBy(partitioner) - } - -} diff --git a/src/main/scala/utils/Utils.scala b/src/main/scala/utils/Utils.scala index e70e7204..d83eb012 100644 --- a/src/main/scala/utils/Utils.scala +++ b/src/main/scala/utils/Utils.scala @@ -1,43 +1,41 @@ package utils -import dataModel.{Entity, MBR} +import model.{Entity, MBR} import com.vividsolutions.jts.geom.Geometry -import org.apache.commons.math3.stat.inference.ChiSquareTest import org.apache.log4j.{LogManager, Logger} import org.apache.spark.TaskContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import org.apache.spark.sql.{Encoder, Encoders, Row, SparkSession} -import utils.Constants.{ThetaOption, WeightingScheme} +import utils.Constants.ThetaOption import utils.Constants.ThetaOption.ThetaOption -import utils.Constants.WeightingScheme.WeightingScheme import scala.collection.mutable -import scala.math.{ceil, floor, max, min} import scala.reflect.ClassTag /** * @author George Mandilaras < gmandi@di.uoa.gr > (National and Kapodistrian University of Athens) */ -object Utils { +object Utils extends Serializable { val spark: SparkSession = SparkSession.builder().getOrCreate() var thetaOption: ThetaOption = _ var source: RDD[MBR] = spark.sparkContext.emptyRDD - var partitionsZones: Array[MBR] = _ + var partitionsZones: Array[MBR] = Array() lazy val sourceCount: Long = source.count() lazy val thetaXY: (Double, Double) = initTheta() + def apply(sourceRDD: RDD[MBR], thetaOpt: ThetaOption = Constants.ThetaOption.AVG, pz: Array[MBR]=Array()): Unit ={ source = sourceRDD - source.cache() thetaOption = thetaOpt partitionsZones = pz } + def getTheta: (Double, Double) = thetaXY def getSourceCount: Long = sourceCount @@ -46,13 +44,12 @@ object Utils { implicit def singleInt[A](implicit c: ClassTag[Int]): Encoder[Int] = Encoders.scalaInt implicit def tuple[String, Int](implicit e1: Encoder[String], e2: Encoder[Int]): Encoder[(String,Int)] = Encoders.tuple[String,Int](e1, e2) + lazy val globalMinX: Double = 
partitionsZones.map(p => p.minX / thetaXY._1).min lazy val globalMaxX: Double = partitionsZones.map(p => p.maxX / thetaXY._1).max lazy val globalMinY: Double = partitionsZones.map(p => p.minY / thetaXY._2).min lazy val globalMaxY: Double = partitionsZones.map(p => p.maxY / thetaXY._2).max - lazy val totalBlocks: Double = (globalMaxX - globalMinX + 1) * (globalMaxY - globalMinY + 1) - /** * initialize theta based on theta granularity @@ -83,46 +80,9 @@ object Utils { case _ => (1d, 1d) } - source.unpersist() (tx, ty) } - /** - * Weight a comparison - * TODO: ensure that float does not produce issues - * - * @param e1 Spatial entity - * @param e2 Spatial entity - * @return weight - */ - def getWeight(e1: Entity, e2: Entity, ws: WeightingScheme): Float = { - val e1Blocks = (ceil(e1.mbr.maxX/thetaXY._1).toInt - floor(e1.mbr.minX/thetaXY._1).toInt + 1) * (ceil(e1.mbr.maxY/thetaXY._2).toInt - floor(e1.mbr.minY/thetaXY._2).toInt + 1) - val e2Blocks = (ceil(e2.mbr.maxX/thetaXY._1).toInt - floor(e2.mbr.minX/thetaXY._1).toInt + 1) * (ceil(e2.mbr.maxY/thetaXY._2).toInt - floor(e2.mbr.minY/thetaXY._2).toInt + 1) - val cb = (min(ceil(e1.mbr.maxX/thetaXY._1), ceil(e2.mbr.maxX/thetaXY._1)).toInt - max(floor(e1.mbr.minX/thetaXY._1), floor(e2.mbr.minX/thetaXY._1)).toInt + 1) * - (min(ceil(e1.mbr.maxY/thetaXY._2), ceil(e2.mbr.maxY/thetaXY._2)).toInt - max(floor(e1.mbr.minY/thetaXY._2), floor(e2.mbr.minY/thetaXY._2)).toInt + 1) - - ws match { - case WeightingScheme.MBR_INTERSECTION => - val intersectionArea = e1.mbr.getIntersectingMBR(e2.mbr).getArea - intersectionArea / (e1.mbr.getArea + e2.mbr.getArea - intersectionArea) - - case WeightingScheme.POINTS => - 1f / (e1.geometry.getNumPoints + e2.geometry.getNumPoints); - - case WeightingScheme.JS => - cb / (e1Blocks + e2Blocks - cb) - - case WeightingScheme.PEARSON_X2 => - val v1: Array[Long] = Array[Long](cb, (e2Blocks - cb).toLong) - val v2: Array[Long] = Array[Long]((e1Blocks - cb).toLong, (totalBlocks - (v1(0) + v1(1) + (e1Blocks - cb))).toLong) - val chiTest = new ChiSquareTest() - chiTest.chiSquare(Array(v1, v2)).toFloat - - case WeightingScheme.CF | _ => - cb.toFloat - } - } - def getZones: Array[MBR] ={ val (thetaX, thetaY) = thetaXY @@ -166,14 +126,13 @@ object Utils { log.info("Unique blocks: " + pSet.size) } - def export(rdd: RDD[Entity], path:String): Unit ={ + def export(rdd: RDD[(String, String)], path:String): Unit ={ val schema = StructType( - StructField("id", IntegerType, nullable = true) :: - StructField("wkt", StringType, nullable = true) :: Nil + StructField("id1", StringType, nullable = true) :: + StructField("id2", StringType, nullable = true) :: Nil ) - val rowRDD: RDD[Row] = rdd.map(s => new GenericRowWithSchema(Array(TaskContext.getPartitionId(), s.geometry.toText), schema)) + val rowRDD: RDD[Row] = rdd.map(s => new GenericRowWithSchema(Array(s._1, s._2), schema)) val df = spark.createDataFrame(rowRDD, schema) df.write.option("header", "true").csv(path) } - } \ No newline at end of file diff --git a/src/main/scala/utils/readers/CSVReader.scala b/src/main/scala/utils/readers/CSVReader.scala new file mode 100644 index 00000000..b12b4922 --- /dev/null +++ b/src/main/scala/utils/readers/CSVReader.scala @@ -0,0 +1,68 @@ +package utils.readers + +import com.vividsolutions.jts.geom.Geometry +import org.apache.spark.serializer.KryoSerializer +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.functions.col +import org.datasyslab.geospark.serde.GeoSparkKryoRegistrator 
+import org.datasyslab.geospark.spatialRDD.SpatialRDD +import org.datasyslab.geosparksql.utils.{Adapter, GeoSparkSQLRegistrator} +import utils.Constants.FileTypes +import utils.DatasetConfigurations + +object CSVReader { + + def extract(dc: DatasetConfigurations): SpatialRDD[Geometry] = { + val extension = dc.getExtension + extension match { + case FileTypes.CSV => + loadDelimitedFile(dc.path, dc.realIdField.getOrElse("id"), dc.geometryField, dc.dateField, ",", header = true) + case FileTypes.TSV => + loadDelimitedFile(dc.path, dc.realIdField.getOrElse("id"), dc.geometryField, dc.dateField, "\t", header = true) + } + } + + /** + * Loads a delimited file + * @param filepath path to the delimited text file + * @param realIdField instances' unique id + * @param geometryField geometry field + * @param dateField date field if exists + * @param delimiter delimiter + * @param header if first row contains the headers + * @return a spatial RDD + */ + def loadDelimitedFile(filepath: String, realIdField: String, geometryField: String, dateField: Option[String], delimiter: String, header: Boolean): SpatialRDD[Geometry] ={ + val conf = new SparkConf() + conf.set("spark.serializer", classOf[KryoSerializer].getName) + conf.set("spark.kryo.registrator", classOf[GeoSparkKryoRegistrator].getName) + val sc = SparkContext.getOrCreate(conf) + val spark = SparkSession.getActiveSession.get + + GeoSparkSQLRegistrator.registerAll(spark) + + var inputDF = spark.read.format("csv") + .option("delimiter", delimiter) + .option("quote", "\"") + .option("header", header) + .load(filepath) + .filter(col(realIdField).isNotNull) + .filter(col(geometryField).isNotNull) + .filter(! col(geometryField).contains("EMPTY")) + + var query = s"SELECT ST_GeomFromWKT(GEOMETRIES.$geometryField) AS WKT, GEOMETRIES.$realIdField AS REAL_ID FROM GEOMETRIES".stripMargin + + if (dateField.isDefined) { + inputDF = inputDF.filter(col(dateField.get).isNotNull) + query = s"SELECT ST_GeomFromWKT(GEOMETRIES.$geometryField) AS WKT, GEOMETRIES.$realIdField AS REAL_ID, GEOMETRIES.${dateField.get} AS DATE FROM GEOMETRIES".stripMargin + } + + inputDF.createOrReplaceTempView("GEOMETRIES") + + val spatialDF = spark.sql(query) + val srdd = new SpatialRDD[Geometry] + srdd.rawSpatialRDD = Adapter.toRdd(spatialDF) + srdd + } +} diff --git a/src/main/scala/utils/readers/GeospatialReader.scala b/src/main/scala/utils/readers/GeospatialReader.scala new file mode 100644 index 00000000..096f3dce --- /dev/null +++ b/src/main/scala/utils/readers/GeospatialReader.scala @@ -0,0 +1,97 @@ +package utils.readers + +import com.vividsolutions.jts.geom.Geometry +import org.apache.spark.rdd.RDD +import org.apache.spark.serializer.KryoSerializer +import org.apache.spark.{SparkConf, SparkContext} +import org.datasyslab.geospark.formatMapper.GeoJsonReader +import org.datasyslab.geospark.formatMapper.shapefileParser.ShapefileReader +import org.datasyslab.geospark.serde.GeoSparkKryoRegistrator +import org.datasyslab.geospark.spatialRDD.SpatialRDD +import utils.Constants.FileTypes +import utils.DatasetConfigurations + +object GeospatialReader { + + def extract(dc: DatasetConfigurations): SpatialRDD[Geometry] = { + val extension = dc.getExtension + extension match { + case FileTypes.GEOJSON => + loadGeoJSON(dc.path, dc.realIdField.getOrElse("id"), dc.dateField) + case FileTypes.SHP => + loadSHP(dc.path, dc.realIdField.getOrElse("id"), dc.dateField) + } + } + + /** + * Loads an ESRI Shapefile + * @param filepath path to the SHP file + * @param realIdField instances' unique id + * 
@param dateField date field if exists + * @return a spatial RDD + */ + def loadSHP(filepath: String, realIdField: String, dateField: Option[String]): SpatialRDD[Geometry] ={ + val conf = new SparkConf() + conf.set("spark.serializer", classOf[KryoSerializer].getName) + conf.set("spark.kryo.registrator", classOf[GeoSparkKryoRegistrator].getName) + val sc = SparkContext.getOrCreate(conf) + + val parentFolder = filepath.substring(0, filepath.lastIndexOf("/")) + val srdd = ShapefileReader.readToGeometryRDD(sc, parentFolder) + adjustUserData(srdd, realIdField, dateField) + } + + + /** + * Loads a GeoJSON file + * @param filepath path to the GeoJSON file + * @param realIdField instances' unique id + * @param dateField date field if exists + * @return a spatial RDD + */ + def loadGeoJSON(filepath: String, realIdField: String, dateField: Option[String]): SpatialRDD[Geometry] ={ + val conf = new SparkConf() + conf.set("spark.serializer", classOf[KryoSerializer].getName) + conf.set("spark.kryo.registrator", classOf[GeoSparkKryoRegistrator].getName) + val sc = SparkContext.getOrCreate(conf) + + val srdd = GeoJsonReader.readToGeometryRDD(sc, filepath) + adjustUserData(srdd, realIdField, dateField) + } + + /** + * Adjust users' data. + * Discard all properties except the id and, if requested, the date. + * @param srdd the input rdd + * @param realIdField the field of the id + * @param dateField the field of the date, if it's given + * @return geometries with only the necessary user data + */ + def adjustUserData(srdd: SpatialRDD[Geometry], realIdField: String, dateField: Option[String]): SpatialRDD[Geometry]={ + val idIndex = srdd.fieldNames.indexOf(realIdField) + val rddWithUserData: RDD[Geometry] = dateField match { + case Some(dateField) => + val dateIndex = srdd.fieldNames.indexOf(dateField) + srdd.rawSpatialRDD.rdd.map { g => + val userData = g.getUserData.toString.split("\t") + val id = userData(idIndex) + val date = userData(dateIndex) + g.setUserData(id + '\t' + date) + g + } + case _ => + srdd.rawSpatialRDD.rdd.map{ g => + val userData = g.getUserData.toString.split("\t") + val id = userData(idIndex) + g.setUserData(id) + g + } + } + srdd.setRawSpatialRDD(rddWithUserData) + + // filter records with valid geometries and ids (null check must come first) + srdd.setRawSpatialRDD(srdd.rawSpatialRDD.rdd.filter(g => ! 
(g == null || g.isEmpty || g.getUserData.toString == ""))) + srdd + } + +} diff --git a/src/main/scala/utils/readers/RDFGraphReader.scala b/src/main/scala/utils/readers/RDFGraphReader.scala new file mode 100644 index 00000000..09e8fe6e --- /dev/null +++ b/src/main/scala/utils/readers/RDFGraphReader.scala @@ -0,0 +1,112 @@ +package utils.readers + +import com.vividsolutions.jts.geom.Geometry +import org.apache.jena.query.ARQ +import org.apache.jena.riot.Lang +import org.apache.spark.serializer.KryoSerializer +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.functions.{col, udf} +import org.apache.spark.sql.types.{StringType, StructField, StructType} +import org.datasyslab.geospark.serde.GeoSparkKryoRegistrator +import org.datasyslab.geospark.spatialRDD.SpatialRDD +import org.datasyslab.geosparksql.utils.{Adapter, GeoSparkSQLRegistrator} +import utils.{DatasetConfigurations, SparqlExecutor} +import net.sansa_stack.rdf.spark.io._ +import org.apache.spark.rdd.RDD +import utils.Constants.FileTypes + +import scala.collection.mutable + +object RDFGraphReader { + + def extract(dc: DatasetConfigurations): SpatialRDD[Geometry] = { + val extension = dc.getExtension + val lang: Lang = extension match { + case FileTypes.NTRIPLES => Lang.NTRIPLES + case FileTypes.TURTLE => Lang.TURTLE + case FileTypes.RDFXML => Lang.RDFXML + case FileTypes.RDFJSON => Lang.RDFJSON + case _ => Lang.NTRIPLES + } +// loadRDF(dc.path, dc.geometryField, dc.dateField, lang) + loadRdfAsTextual(dc.path, dc.geometryField) + } + + def loadRdfAsTextual(filepath: String, geometryPredicate: String): SpatialRDD[Geometry] = { + val conf = new SparkConf() + conf.set("spark.serializer", classOf[KryoSerializer].getName) + conf.set("spark.kryo.registrator", classOf[GeoSparkKryoRegistrator].getName) + val sc = SparkContext.getOrCreate(conf) + val spark = SparkSession.getActiveSession.get + GeoSparkSQLRegistrator.registerAll(spark) + + val cleanWKT = (wkt: String) => wkt.replaceAll("<\\S+>\\s?", "").replaceAll("\"", "") + val rowRDD: RDD[Row] = spark.read.textFile(filepath) + .rdd.map(s => s.split(" ", 3)) + .filter(s => s(1) == geometryPredicate) + .map(s => (s(0), cleanWKT(s(2)))) + .filter(s => s._1 != null && s._2 != null && !s._2.isEmpty) + .filter(s => !s._2.contains("EMPTY")) + .map(s => Row(s._1, s._2)) + + val schema = new StructType() + .add(StructField("Subject", StringType, nullable = true)) + .add(StructField("WKT", StringType, nullable = true)) + + val df = spark.createDataFrame(rowRDD, schema) + df.createOrReplaceTempView("GEOMETRIES") + val query = "SELECT ST_GeomFromWKT(GEOMETRIES.WKT), GEOMETRIES.Subject FROM GEOMETRIES".stripMargin + + val spatialDF = spark.sql(query) + val srdd = new SpatialRDD[Geometry] + srdd.rawSpatialRDD = Adapter.toRdd(spatialDF) + srdd + } + + def loadRDF(filepath: String, geometryPredicate: String, datePredicate: Option[String], lang: Lang): SpatialRDD[Geometry] = { + val conf = new SparkConf() + conf.set("spark.serializer", classOf[KryoSerializer].getName) + conf.set("spark.kryo.registrator", classOf[GeoSparkKryoRegistrator].getName) + val sc = SparkContext.getOrCreate(conf) + val spark = SparkSession.getActiveSession.get + GeoSparkSQLRegistrator.registerAll(spark) + + val allowedPredicates: mutable.Set[String] = mutable.Set() + var sparqlQuery = s"SELECT ?Subject ?WKT WHERE { ?Subject $geometryPredicate ?WKT.}" + var query = "SELECT ST_GeomFromWKT(GEOMETRIES.WKT), GEOMETRIES.Subject FROM GEOMETRIES".stripMargin + + val 
cleanGeomPredicate: String = + if (geometryPredicate.head == '<' && geometryPredicate.last == '>') + geometryPredicate.substring(1, geometryPredicate.length - 1) + else geometryPredicate + + allowedPredicates.add(cleanGeomPredicate) + + if (datePredicate.isDefined) { + val datePredicateValue = datePredicate.get + val cleanDatePredicate: String = if (datePredicateValue.head == '<' && datePredicateValue.last == '>') + datePredicateValue.substring(1, datePredicateValue.length - 1) + else datePredicateValue + allowedPredicates.add(cleanDatePredicate) + sparqlQuery = s"SELECT ?Subject ?WKT ?Date WHERE { ?Subject ${datePredicate.get} ?Date. ?Subject $geometryPredicate ?WKT.}" + query = "SELECT ST_GeomFromWKT(GEOMETRIES.WKT), GEOMETRIES.Subject, GEOMETRIES.Date FROM GEOMETRIES".stripMargin + } + + ARQ.init() + val triplesRDD = spark.rdf(lang)(filepath).filter(t => allowedPredicates.contains(t.getPredicate.getURI)) + var df = SparqlExecutor.query(spark, triplesRDD, sparqlQuery) + + val cleanWKT = udf((wkt: String) => wkt.replaceAll("<\\S+>\\s?", ""), StringType) + df = df.withColumn("WKT", cleanWKT(df.col("WKT"))) + .filter(col("WKT").isNotNull) + .filter(!col("WKT").contains("EMPTY")) + + df.createOrReplaceTempView("GEOMETRIES") + + val spatialDF = spark.sql(query) + val srdd = new SpatialRDD[Geometry] + srdd.rawSpatialRDD = Adapter.toRdd(spatialDF) + srdd + } +} diff --git a/src/main/scala/utils/readers/Reader.scala b/src/main/scala/utils/readers/Reader.scala new file mode 100644 index 00000000..388fee24 --- /dev/null +++ b/src/main/scala/utils/readers/Reader.scala @@ -0,0 +1,122 @@ +package utils.readers + +import com.vividsolutions.jts.geom.Geometry +import model.{Entity, MBR, SpatialEntity, SpatioTemporalEntity} +import org.apache.spark.HashPartitioner +import org.apache.spark.rdd.RDD +import org.datasyslab.geospark.enums.GridType +import org.datasyslab.geospark.spatialPartitioning.SpatialPartitioner +import org.datasyslab.geospark.spatialRDD.SpatialRDD +import org.joda.time.DateTime +import org.joda.time.format.DateTimeFormat +import utils.Constants.FileTypes +import utils.{Constants, DatasetConfigurations} + +import scala.collection.JavaConverters._ + +case class Reader(partitions: Int, gt: Constants.GridType.GridType, printStats: Boolean = false) { + + var counter: Long = 0 + + /** + * The transformation of an SRDD into RDD does not preserve partitioning. 
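+ * (Once the raw SpatialRDD is mapped to Entity objects, no Spark Partitioner is attached to it, so matching grid cells of source and target would not be co-located.)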
+ * Hence, we use a spatial partitioner to spatially index the geometries and then + * partition them with a HashPartitioner, using the spatial indexes as the partition keys. + */ + var spatialPartitioner: SpatialPartitioner = _ + var partitioner: HashPartitioner = _ + lazy val partitionsZones: Array[MBR] = spatialPartitioner.getGrids.asScala.map(e => MBR(e.getMaxX, e.getMinX, e.getMaxY, e.getMinY)).toArray + + val gridType: GridType = gt match { + case Constants.GridType.KDBTREE => GridType.KDBTREE + case _ => GridType.QUADTREE + } + + /** + * Extract the geometries from the input configurations + * As a side-effect, it also counts the geometries, if requested + * @param dc dataset configuration + * @return the geometries as a SpatialRDD + */ + def extract(dc: DatasetConfigurations) : SpatialRDD[Geometry] = { + val extension = dc.getExtension + val rdd = extension match { + case FileTypes.CSV | FileTypes.TSV => CSVReader.extract(dc) + case FileTypes.SHP | FileTypes.GEOJSON => GeospatialReader.extract(dc) + case FileTypes.NTRIPLES | FileTypes.TURTLE | FileTypes.RDFXML | FileTypes.RDFJSON => RDFGraphReader.extract(dc) + } + if (printStats) counter = rdd.rawSpatialRDD.count() + rdd + } + + /** + * Load the source dataset, i.e. the dataset that initializes the partitioners + * @param dc dataset configuration + * @return an RDD of pairs of partition index and entities + */ + def loadSource(dc: DatasetConfigurations): RDD[(Int, Entity)] ={ + val sourceRDD = extract(dc) + sourceRDD.analyze() + if (partitions > 0) + sourceRDD.spatialPartitioning(gridType, partitions) + else + sourceRDD.spatialPartitioning(gridType) + spatialPartitioner = sourceRDD.getPartitioner + partitioner = new HashPartitioner(spatialPartitioner.numPartitions) + distribute(sourceRDD, dc) + } + + /** + * Load the input dataset. If loadSource has not been called first, this will result in + * a NullPointerException. + * @param dc dataset configuration + * @return an RDD of pairs of partition index and entities + */ + def load(dc: DatasetConfigurations): Either[java.lang.Throwable, RDD[(Int, Entity)]] ={ + val rdd = extract(dc) + try { + Right(distribute(rdd, dc)) + } catch { + case ex: Throwable => Left(ex) + } + } + + /** + * Distributes a SpatialRDD into a spatially partitioned RDD of entities. The partitioner + * is defined by the first dataset (i.e. 
the source dataset) + * @param dc dataset configuration + * @return a spatial partitioned rdd + */ + def distribute(srdd: SpatialRDD[Geometry], dc: DatasetConfigurations): RDD[(Int, Entity)] = { + val withTemporal = dc.dateField.isDefined + // remove empty, invalid geometries and geometry collections + val filteredGeometriesRDD = srdd.rawSpatialRDD.rdd + .map{ geom => + val userdata = geom.getUserData.asInstanceOf[String].split("\t") + (geom, userdata) + } + .filter{case (g, _) => !g.isEmpty && g.isValid && g.getGeometryType != "GeometryCollection"} + + // create Spatial or SpatioTemporal entities + val entitiesRDD: RDD[Entity] = + if(!withTemporal) + filteredGeometriesRDD.map{ case (geom, userdata) => SpatialEntity(userdata(0), geom)} + else + filteredGeometriesRDD.mapPartitions{ geomIterator => + val pattern = dc.datePattern.get + val formatter = DateTimeFormat.forPattern(pattern) + geomIterator.map{ + case (geom, userdata) => + val realID = userdata(0) + val dateStr = userdata(1) + val date: DateTime = formatter.parseDateTime(dateStr) + val dateStr_ = date.toString(Constants.defaultDatePattern) + SpatioTemporalEntity(realID, geom, dateStr_) + } + } + // redistribute based on spatial index + entitiesRDD + .flatMap(se => spatialPartitioner.placeObject(se.geometry).asScala.map(i => (i._1.toInt, se))) + .partitionBy(partitioner) + } +} \ No newline at end of file
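
A minimal usage sketch of the new reader stack, assuming a `sourceDc`/`targetDc` pair parsed from the YAML configuration and an illustrative partition count: `loadSource` must run first, since it initializes the spatial partitioner that every subsequent `load` reuses.

```scala
import model.Entity
import org.apache.spark.rdd.RDD
import utils.readers.Reader
import utils.{Constants, DatasetConfigurations}

// sourceDc and targetDc are assumed to come from the parsed YAML configuration
def loadBothInputs(sourceDc: DatasetConfigurations, targetDc: DatasetConfigurations): (RDD[(Int, Entity)], RDD[(Int, Entity)]) = {

  // illustrative values: 256 partitions over a KDB-tree grid
  val reader = Reader(partitions = 256, gt = Constants.GridType.KDBTREE)

  // analyzes the source SpatialRDD, builds the spatial partitioner and the
  // HashPartitioner, and returns the source entities keyed by grid-cell index
  val source: RDD[(Int, Entity)] = reader.loadSource(sourceDc)

  // reuses the partitioner initialized by loadSource; read failures surface as Left
  val target: RDD[(Int, Entity)] = reader.load(targetDc) match {
    case Right(rdd) => rdd
    case Left(ex)   => throw ex
  }

  (source, target)
}
```

Because both RDDs end up keyed by the same `HashPartitioner` on the grid-cell index, downstream joins and cogroups on that key avoid an additional shuffle.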