Merge pull request #2 from GiorgosMandi/new-methods
merging new methods
GiorgosMandi authored Mar 24, 2021
2 parents 668c705 + 52e0e12 commit ef33ece
Showing 42 changed files with 1,580 additions and 1,184 deletions.
28 changes: 19 additions & 9 deletions README.md
@@ -1,7 +1,7 @@
# DS-JedAI

DS-JedAI (Distributed Spatial JedAI) is a system for Holistic Geospatial Interlinking for big geospatial data.
In Holistic Geospatial Interlinking we aim to discover all the topological relations between two geospatial datasets,
In Holistic Geospatial Interlinking, we aim to discover all the topological relations between the geometries of two geospatial datasets,
using the [DE-9IM](https://en.wikipedia.org/wiki/DE-9IM) topological model. DS-JedAI offers a novel batch algorithm for
geospatial interlinking and several algorithms for progressive Geospatial Interlinking. All the algorithms have been
parallelized based on the MapReduce Framework.
@@ -12,7 +12,7 @@ These algorithms take as input a budget (*BU*) that indicates the total number of verifications,
and a weighting scheme *W* that quantifies the probability that two geometries relate. Furthermore, DS-JedAI allows
temporal filtering in order to detect pairs that coincide not only spatially but also temporally.

DS-JedAI is implemented on top of Apache Spark and can run in any distributed or standalone environment that
DS-JedAI is implemented on top of **Apache Spark** and can run in any distributed or standalone environment that
supports the execution of Apache Spark jobs. Currently, it supports most of the RDF formats (i.e., N-Triples, Turtle,
RDF/JSON and RDF/XML), as well as CSV, TSV, GeoJSON and ESRI shapefiles.

@@ -43,19 +43,23 @@ to relate. The idea is that not all the verifications will be performed, but us
Hence, the progressive algorithms prioritize the *BU* most prominent verifications.
The implemented algorithms are the following (a minimal sketch of the shared prioritization loop follows the list):

- **Progressive Giant**: Implements GIA.nt but prioritizes the BU most promising geometry pairs.
- **Progressive GIA.nt**: Implements GIA.nt but prioritizes the BU most promising geometry pairs.
- **Dynamic Progressive GIA.nt**: Extends Progressive GIA.nt by boosting the weights of pairs that are associated with qualifying pairs, so the priority queue
that stores the geometry pairs changes dynamically during the verifications.
- **Geometry Top-k**: For each geometry, finds and verifies its top-k weighted pairs.
- **Geometry Reciprocal Top-k**: Verifies only the pairs *(s, t)* such that *s* belongs to the top-k of *t* and *t* belongs to the top-k of *s*.
- **Geometry-Centric**: The prioritization of *(s, t)* is based on the mean weight of all the pairs that *t* is part of.
- **RandomScheduling**: Implements random prioritization.
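
To make the shared mechanics concrete, here is a minimal, hypothetical Scala sketch of the budget-based prioritization loop these algorithms have in common; the names `Pair`, `weight` and `verify` are illustrative and are not part of DS-JedAI's API:

```scala
import scala.collection.mutable

// Illustrative stand-in for a candidate geometry pair (not DS-JedAI's model).
case class Pair(source: String, target: String)

// Weight every candidate pair, keep the pairs in a max-priority queue,
// and spend the verification budget BU only on the most promising ones.
def progressiveVerify(
    candidates: Seq[Pair],
    weight: Pair => Double,   // the configured weighting scheme W
    verify: Pair => Boolean,  // the expensive DE-9IM verification
    budget: Int               // BU: total number of verifications allowed
): Seq[Pair] = {
  val pq = mutable.PriorityQueue.empty[(Double, Pair)](Ordering.by[(Double, Pair), Double](_._1))
  candidates.foreach(p => pq.enqueue((weight(p), p)))
  Iterator
    .fill(math.min(budget, pq.size))(pq.dequeue()._2) // in decreasing weight
    .filter(verify)                                   // keep the qualifying pairs
    .toSeq
}
```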

Currently, the supported weighting schemes are:

- Co-occurrence Frequencies (CF)
- Jaccard Similarity (JS)
- Pearson's chi-squared test (PEARSON_X2)
- MBR INTERSECTION
- GEOMETRY POINTS
- Minimum Bounding Rectangle Overlap (MBRO)
- Inverse Sum of Points (ISP)

The algorithms also support composite schemes that combine two weighting schemes, where the secondary
scheme resolves the ties of the main one.
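
As an illustration of how such a composite comparison could work (the names are hypothetical and the weights are assumed to be plain `Double`s):

```scala
// Hypothetical composite weighting: rank by the main scheme and consult the
// secondary scheme only when the main weights are equal (e.g., JS with MBRO).
final case class WeightedPair(mainWeight: Double, secondaryWeight: Double)

// Tuple ordering compares mainWeight first and secondaryWeight on ties;
// `.reverse` makes higher weights come first.
val compositeOrdering: Ordering[WeightedPair] =
  Ordering.by[WeightedPair, (Double, Double)](p => (p.mainWeight, p.secondaryWeight)).reverse

// e.g. pairs.sorted(compositeOrdering) puts the most promising pairs first.
```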

The progressive algorithms, the weighting schemes and the budget *BU* are specified in the configuration file. Consult the
configuration template in `config/configurationTemplate.yaml` to see how you can specify them. To execute, run:
@@ -66,8 +70,14 @@ Some additional options are the following:

- **-p N**: specify the number of partitions
- **-gt type**: specify the grid type for the spatial partitioning. Accepted values are KDBTREE and QUADTREE.
- **ws WS**: specify weighting scheme - allowed values: *CF, JS, MBR_INTERSECTION, PEARSON_X2, POINTS*.
- **progressiveAlgorithm PA**: specify progressive algorithm - allowed values: *PROGRESSIVE_GIANT, TOPK, RECIPROCAL_TOPK, GEOMETRY_CENTRIC, RANDOM*
- **mws WS**: specify the main weighting scheme - allowed values: *CF, JS, MBRO, PEARSON_X2, ISP*.
- **sws WS**: specify the secondary weighting scheme (optional) - allowed values: *CF, JS, MBRO, PEARSON_X2, ISP*; MBRO is preferred.
- **progressiveAlgorithm PA**: specify progressive algorithm - allowed values: *PROGRESSIVE_GIANT, DYNAMIC_PROGRESSIVE_GIANT, TOPK, RECIPROCAL_TOPK, RANDOM*
- **budget BU**: the input budget.

The command line options will overwrite the corresponding options of the configuration file.

---
## Publication

*Progressive, Holistic Geospatial Interlinking. George Papadakis, Georgios Mandilaras, Nikos Mamoulis, Manolis Koubarakis. In Proceedings of The Web Conference 2021*
17 changes: 17 additions & 0 deletions config/LINEARWATER-AREAWATER.yaml
@@ -0,0 +1,17 @@

source:
path: "/home/gmandi/Documents/Extreme-Earth/Datasets/SPATIAL-HADOOP/LINEARWATER_100K.tsv"
realIdField: "id"
geometryField: "WKT"

target:
path: "/home/gmandi/Documents/Extreme-Earth/Datasets/SPATIAL-HADOOP/AREAWATER_100K.tsv"
realIdField: "id"
geometryField: "WKT"

relation: "DE9IM"

configurations:
thetaGranularity: "avg"
secondaryWS: "MBRO"
mainWS: "JS"
3 changes: 2 additions & 1 deletion config/configurationTemplate.yaml
@@ -20,6 +20,7 @@ configurations:
partitions: "number of partitions"
thetaGranularity: "avg" # defines the extent of dynamic tiling based on the geometries of the source - experiments have shown that "avg" is the best option
gridType: "spatial partitioner grid type algorithm" # allowed values: KDBTREE, QUADTREE
weightingScheme: "WS" # specify weighting scheme - allowed values: CF, JS, MBR_INTERSECTION, PEARSON_X2, POINTS
mainWS: "WS" # specify weighting scheme - allowed values: CF, JS, MBRO, PEARSON_X2, ISP
secondaryWS: "WS"
progressiveAlgorithm : "PA" # specify progressive algorithm - allowed values: PROGRESSIVE_GIANT, TOPK, RECIPROCAL_TOPK, GEOMETRY_CENTRIC, RANDOM
budget: "BU" # the budget of progressive algorithms
63 changes: 0 additions & 63 deletions src/main/scala/dataModel/ComparisonPQ.scala

This file was deleted.

@@ -2,24 +2,23 @@ package experiments

import java.util.Calendar

import geospatialInterlinking.IndexBasedMatching
import geospatialInterlinking.progressive.ProgressiveAlgorithmsFactory
import model.Entity
import interlinkers.{GIAnt, IndexedJoinInterlinking}
import org.apache.log4j.{Level, LogManager, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext, TaskContext}
import org.datasyslab.geospark.serde.GeoSparkKryoRegistrator
import utils.Constants.ProgressiveAlgorithm.ProgressiveAlgorithm
import utils.Constants.{GridType, ProgressiveAlgorithm, Relation, WeightingScheme}
import utils.Constants.WeightingScheme.WeightingScheme
import utils.{ConfigurationParser, SpatialReader, Utils}
import utils.Constants.{GridType, Relation}
import utils.readers.Reader
import utils.{ConfigurationParser, Utils}


object WellBalancedExp {
object BalancingExp {

implicit class TuppleAdd(t: (Int, Int, Int, Int, Int, Int, Int, Int, Int, Int, Int)) {
implicit class TupleAdd(t: (Int, Int, Int, Int, Int, Int, Int, Int, Int, Int, Int)) {
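    // enables element-wise `+` on these 11-field relation-count tuples; used
    // below to combine the GIA.nt and the indexed-join results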
def +(p: (Int, Int, Int, Int, Int, Int, Int, Int, Int, Int, Int))
: (Int, Int, Int, Int, Int, Int, Int, Int, Int, Int, Int) =
(p._1 + t._1, p._2 + t._2, p._3 +t._3, p._4+t._4, p._5+t._5, p._6+t._6, p._7+t._7, p._8+t._8, p._9+t._9, p._10+t._10, p._11+t._11)
@@ -46,20 +45,8 @@ object WellBalancedExp {
case Nil => map
case ("-c" | "-conf") :: value :: tail =>
nextOption(map ++ Map("conf" -> value), tail)
case ("-f" | "-fraction") :: value :: tail =>
nextOption(map ++ Map("fraction" -> value), tail)
case ("-s" | "-stats") :: tail =>
nextOption(map ++ Map("stats" -> "true"), tail)
case "-auc" :: tail =>
nextOption(map ++ Map("auc" -> "true"), tail)
case ("-p" | "-partitions") :: value :: tail =>
nextOption(map ++ Map("partitions" -> value), tail)
case ("-b" | "-budget") :: value :: tail =>
nextOption(map ++ Map("budget" -> value), tail)
case "-ws" :: value :: tail =>
nextOption(map ++ Map("ws" -> value), tail)
case "-ma" :: value :: tail =>
nextOption(map ++ Map("ma" -> value), tail)
case "-gt" :: value :: tail =>
nextOption(map ++ Map("gt" -> value), tail)
case _ :: tail =>
@@ -80,41 +67,67 @@ object WellBalancedExp {
val confPath = options("conf")
val conf = ConfigurationParser.parse(confPath)
val partitions: Int = if (options.contains("partitions")) options("partitions").toInt else conf.getPartitions
val budget: Int = if (options.contains("budget")) options("budget").toInt else conf.getBudget
val ws: WeightingScheme = if (options.contains("ws")) WeightingScheme.withName(options("ws")) else conf.getWeightingScheme
val ma: ProgressiveAlgorithm = if (options.contains("ma")) ProgressiveAlgorithm.withName(options("ma")) else conf.getProgressiveAlgorithm
val gridType: GridType.GridType = if (options.contains("gt")) GridType.withName(options("gt").toString) else conf.getGridType
val relation = conf.getRelation
val startTime = Calendar.getInstance().getTimeInMillis

log.info("DS-JEDAI: Input Budget: " + budget)
log.info("DS-JEDAI: Weighting Scheme: " + ws.toString)

val startTime = Calendar.getInstance().getTimeInMillis
val reader = SpatialReader(conf.source, partitions, gridType)
val sourceRDD = reader.load()
// reading source dataset
val reader = Reader(partitions, gridType)
val sourceRDD: RDD[(Int, Entity)] = reader.loadSource(conf.source)
sourceRDD.persist(StorageLevel.MEMORY_AND_DISK)
Utils(sourceRDD.map(_._2.mbr), conf.getTheta, reader.partitionsZones)
val sourcePartitions: RDD[(Int, Iterator[Entity])] = sourceRDD.mapPartitions(si => Iterator((TaskContext.getPartitionId(), si.map(_._2))))


val targetRDD = reader.load(conf.target)
// reading target dataset
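// `load` returns an Either: Left(error) when the partitioner has not yet
// been initialized by `loadSource`, Right(rdd) on success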
val targetRDD: RDD[(Int, Entity)] = reader.load(conf.target) match {
case Left(e) =>
log.error("Paritioner is not initialized, call first the `loadSource`.")
e.printStackTrace()
System.exit(1)
null
case Right(rdd) => rdd
}
val partitioner = reader.partitioner

val partitionEntitiesAVG = sourceRDD.mapPartitions(si => Iterator(si.toArray.length)).sum()/sourceRDD.getNumPartitions
val balancedSource = sourceRDD.mapPartitions(si => Iterator(si.toArray)).filter(_.length < partitionEntitiesAVG*3).flatMap(_.toIterator)
val overloadedSource = sourceRDD.mapPartitions(si => Iterator(si.toArray)).filter(_.length >= partitionEntitiesAVG*3).flatMap(_.toIterator)
val overloadedPartitionIds = overloadedSource.map(_ => TaskContext.getPartitionId()).collect().toSet
val balancedTarget = targetRDD.mapPartitions(ti => Iterator((TaskContext.getPartitionId(), ti))).filter{ case (pid, _) => !overloadedPartitionIds.contains(pid) }.flatMap(_._2)
val overloadedTarget = targetRDD.mapPartitions(ti => Iterator((TaskContext.getPartitionId(), ti))).filter{ case (pid, _) => overloadedPartitionIds.contains(pid) }.flatMap(_._2)
log.info("DS-JEDAI: Overloaded partitions: " + overloadedPartitionIds.size)
val entitiesPerPartitions: Seq[(Int, Int)] = sourcePartitions.map{ case (pid, si) => (pid, si.size)}.collect()

// find outlier partitions
val mean = entitiesPerPartitions.map(_._2).sum/sourceRDD.getNumPartitions
val variance = entitiesPerPartitions.map(_._2.toDouble).map(x => math.pow(x - mean, 2)).sum / entitiesPerPartitions.length
val std = Math.sqrt(variance)
val zScore: (Int, Int) => (Int, Double) = (p: Int, x: Int) => (p, (x - mean).toDouble/std)

val matchingStartTime = Calendar.getInstance().getTimeInMillis
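// a partition counts as overloaded when its entity count lies more than
// 2.5 standard deviations above the mean (z-score > 2.5)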
val outliers = entitiesPerPartitions.map{case (p, x) => zScore(p, x)}.filter(_._2 > 2.5)
val outlierPartitions = outliers.map(_._1).toSet
log.info("DS-JEDAI: Overloaded partitions: " + outlierPartitions.size)

val pm = ProgressiveAlgorithmsFactory.get(ma, sourceRDD, targetRDD, partitioner, budget, ws)
val ibm = IndexBasedMatching(overloadedSource.map(_._2), overloadedTarget.map(_._2), Utils.getTheta)
val goodSourceRDD = sourceRDD.filter(s => !outlierPartitions.contains(s._1))
val badSourceRDD = sourceRDD.filter(s => outlierPartitions.contains(s._1))

val goodTargetRDD = targetRDD.filter(t => !outlierPartitions.contains(t._1))
val badTargetRDD = targetRDD.filter(t => outlierPartitions.contains(t._1))

val giant = GIAnt(goodSourceRDD, goodTargetRDD, partitioner)
val iji = IndexedJoinInterlinking(badSourceRDD, badTargetRDD, Utils.getTheta)
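// GIA.nt handles the well-balanced partitions, while the outlier
// partitions fall back to IndexedJoinInterlinking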

if (relation.equals(Relation.DE9IM)) {
val (totalContains, totalCoveredBy, totalCovers, totalCrosses, totalEquals, totalIntersects,
totalOverlaps, totalTouches, totalWithin, intersectingPairs, interlinkedGeometries) = pm.countAllRelations + ibm.countAllRelations
val giantStartTime = Calendar.getInstance().getTimeInMillis
val giantResults = giant.countAllRelations
val giantEndTime = Calendar.getInstance().getTimeInMillis
log.info("DS-JEDAI: GIA.nt Time: " + (giantEndTime - giantStartTime) / 1000.0)
log.info("DS-JEDAI: GIA.nt Interlinked Geometries: " + giantResults._11)
log.info("-----------------------------------------------------------\n")

val indexedJoinStartTime = Calendar.getInstance().getTimeInMillis
val indexedJoinResults = iji.countAllRelations
val indexedJoinEndTime = Calendar.getInstance().getTimeInMillis
log.info("DS-JEDAI: INDEXED-JOIN Time: " + (indexedJoinEndTime - indexedJoinStartTime) / 1000.0)
log.info("DS-JEDAI:INDEXED-JOIN Interlinked Geometries: " + indexedJoinResults._11)
log.info("-----------------------------------------------------------\n")

val (totalContains, totalCoveredBy, totalCovers, totalCrosses, totalEquals, totalIntersects,
totalOverlaps, totalTouches, totalWithin, intersectingPairs, interlinkedGeometries) = giantResults + indexedJoinResults
val totalRelations = totalContains + totalCoveredBy + totalCovers + totalCrosses + totalEquals +
totalIntersects + totalOverlaps + totalTouches + totalWithin
log.info("DS-JEDAI: Total Intersecting Pairs: " + intersectingPairs)
Expand All @@ -132,11 +145,9 @@ object WellBalancedExp {
log.info("DS-JEDAI: Total Relations Discovered: " + totalRelations)
}
else{
val totalMatches = pm.countRelation(relation) + ibm.countRelation(relation)
val totalMatches = giant.countRelation(relation) + iji.countRelation(relation)
log.info("DS-JEDAI: " + relation.toString +": " + totalMatches)
}
val matchingEndTime = Calendar.getInstance().getTimeInMillis
log.info("DS-JEDAI: Interlinking Time: " + (matchingEndTime - matchingStartTime) / 1000.0)

val endTime = Calendar.getInstance()
log.info("DS-JEDAI: Total Execution Time: " + (endTime.getTimeInMillis - startTime) / 1000.0)
