Skip to content

Commit

Permalink
log after every count in PageRowRDD.explore
Browse files Browse the repository at this point in the history
  • Loading branch information
Peng Cheng committed Dec 21, 2014
1 parent 3374277 commit 90bbfcc
Showing 1 changed file with 8 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.{Partition, TaskContext}
import org.slf4j.LoggerFactory
import org.tribbloid.spookystuff.actions._
import org.tribbloid.spookystuff.dsl.{Inner, JoinType, _}
import org.tribbloid.spookystuff.entity._
Expand Down Expand Up @@ -610,7 +611,8 @@ case class PageRowRDD(
.values
if (depth % 20 == 0) {
totalPages.persist().checkpoint()
totalPages.count()
val size = totalPages.count()
LoggerFactory.getLogger(this.getClass).info(s"explored $size pages in total")
}

val joined = joinedBeforeFlatten.flattenPages(flattenPagesPattern, flattenPagesIndexKey)
Expand All @@ -626,9 +628,10 @@ case class PageRowRDD(
newRows.checkpoint()
}

val newRowsCount = newRows.count()
val newRowsSize = newRows.count()
LoggerFactory.getLogger(this.getClass).info(s"fetched $newRowsSize new row(s)")

if (newRowsCount == 0){
if (newRowsSize == 0){
return total
.select(select: _*)
.clearTemp
Expand All @@ -641,7 +644,8 @@ case class PageRowRDD(
).coalesce(numPartitions)
if (depth % 20 == 0) {
total.persist().checkpoint()
total.count()
val size = total.count()
LoggerFactory.getLogger(this.getClass).info(s"fetched $size rows in total")
}
}

Expand Down

0 comments on commit 90bbfcc

Please sign in to comment.