From 90bbfcc422b45914850438cf37df4a62344a9ffc Mon Sep 17 00:00:00 2001 From: Peng Cheng Date: Sun, 21 Dec 2014 18:09:11 -0500 Subject: [PATCH] log after every count in PageRowRDD.explore --- .../spookystuff/sparkbinding/PageRowRDD.scala | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/tribbloid/spookystuff/sparkbinding/PageRowRDD.scala b/core/src/main/scala/org/tribbloid/spookystuff/sparkbinding/PageRowRDD.scala index fb86d5d57..2770d3d02 100644 --- a/core/src/main/scala/org/tribbloid/spookystuff/sparkbinding/PageRowRDD.scala +++ b/core/src/main/scala/org/tribbloid/spookystuff/sparkbinding/PageRowRDD.scala @@ -5,6 +5,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.{Partition, TaskContext} +import org.slf4j.LoggerFactory import org.tribbloid.spookystuff.actions._ import org.tribbloid.spookystuff.dsl.{Inner, JoinType, _} import org.tribbloid.spookystuff.entity._ @@ -610,7 +611,8 @@ case class PageRowRDD( .values if (depth % 20 == 0) { totalPages.persist().checkpoint() - totalPages.count() + val size = totalPages.count() + LoggerFactory.getLogger(this.getClass).info(s"explored $size pages in total") } val joined = joinedBeforeFlatten.flattenPages(flattenPagesPattern, flattenPagesIndexKey) @@ -626,9 +628,10 @@ case class PageRowRDD( newRows.checkpoint() } - val newRowsCount = newRows.count() + val newRowsSize = newRows.count() + LoggerFactory.getLogger(this.getClass).info(s"fetched $newRowsSize new row(s)") - if (newRowsCount == 0){ + if (newRowsSize == 0){ return total .select(select: _*) .clearTemp @@ -641,7 +644,8 @@ case class PageRowRDD( ).coalesce(numPartitions) if (depth % 20 == 0) { total.persist().checkpoint() - total.count() + val size = total.count() + LoggerFactory.getLogger(this.getClass).info(s"fetched $size rows in total") } }