
Figure out future DAG operators #125

Open

tomerk opened this issue May 18, 2015 · 15 comments

@tomerk (Contributor) commented May 18, 2015

There are some concepts we've talked about (and have in some cases touched using FunctionNodes #121) that we need to figure out how to cleanly integrate into our pipelines interfaces.

Among these are:

  • Block Transformers and Estimators (operating on big feature spaces split across several RDDs)
  • Aggregators (sampling, windowing, caching, etc.)
  • Joining (both in the context of block nodes, and e.g. join(identity, transformer))
  • Operating on grouped data (e.g. Grouped(Transformer[A, B]): Transformer[Seq[A], Seq[B]])
  • Hyperparameter tuning & cross validation (making sure to allow optimizations like how linear mappers can train multiple lambdas at once)
@etrain (Contributor) commented May 18, 2015

I have a strong preference for avoiding "zip" in favor of a logical "join" with several physical implementations, one of which might be zip.

I've been giving some thought to the hyperparameter tuning problem along the way, and have lots of thoughts there 👍

@tomerk (Contributor, Author) commented May 25, 2015

@etrain @shivaram
So I think I’ve finally come up with a valuable insight for keeping the pipeline interface linearized even when doing complex DAG operations. (Important because the linearized interface gives us nice things like being more understandable and flexible, passing training data through the whole pipeline automatically, etc.)

The key insight is instead of complex operators taking nodes as parameters, they take functions that would chain new nodes onto the current pipeline.

Some examples of what this could look like for different operators:

Join Operator:

(A => B, A => C) => (A => (B, C))
Useful for obvious reasons

nodeAB then nodeBC thenJoin {
     _ then nodeCD, 
     _ then nodeCE
}
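A minimal sketch (hypothetical Pipeline trait, not the existing KeystoneML classes) of how thenJoin could take pipeline-extending functions rather than nodes:

trait Pipeline[A, B] {
  def apply(a: A): B

  // Each branch is a function that extends the current pipeline, so the
  // user-facing interface stays linearized even though the result is a DAG.
  // A physical planner could share the common prefix instead of evaluating
  // both extended pipelines independently, as this naive version does.
  def thenJoin[C, D](left: Pipeline[A, B] => Pipeline[A, C],
                     right: Pipeline[A, B] => Pipeline[A, D]): Pipeline[A, (C, D)] = {
    val (l, r) = (left(this), right(this))
    new Pipeline[A, (C, D)] { def apply(a: A): (C, D) = (l(a), r(a)) }
  }
}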

Concatenate Operator:

Seq[(A => B)] => (A => Seq[B])
Concatenate is to Join as Seq(a1, a2, a3) is to (a, b)
Block Transformers & Block Estimators would use a different physical concat operator than usual nodes would.

nodeAB then nodeBC thenConcat {
     _ then nodeCD, 
     x => (x thenEstimator nodeCE).fit(rddOfA) then nodeED,
     _ then otherNodeCD 
}
nodeAB then nodeBC thenConcat {
     (1 to 4).map(_ then randNodeCD()):_* 
}
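As a plain function combinator, the Concatenate signature above is just (a sketch, not the eventual node implementation):

def concatenate[A, B](branches: Seq[A => B]): A => Seq[B] =
  (a: A) => branches.map(f => f(a))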

Grouped Operator:

(A => B) => (Seq[A] => Seq[B])
Used for applying single-item transformers and estimators to grouped data, e.g. in TIMIT, training LDA on a per-frame basis even though the frames are grouped into entire utterances.

nodeAB then nodeBSeqC thenOnEach {
     (_ then nodeCD thenEstimator nodeDE).fit(rddOfA)
}

Risky point of trickiness: the chaining rules inside the grouped operator have to be subtly different from the usual ones. _ thenLabelEstimator nodeCDL would have to return LabelEstimator[A, D, Seq[L]], whereas usually nodeAC thenLabelEstimator nodeCDL would return LabelEstimator[A, D, L].
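A minimal sketch (hypothetical Transformer trait) of the Grouped wrapper for the transformer case; the estimator case with the subtly different chaining rules is what makes this tricky:

trait Transformer[A, B] { def apply(a: A): B }

// Lift a single-item transformer so it applies to every element of a group.
case class Grouped[A, B](inner: Transformer[A, B]) extends Transformer[Seq[A], Seq[B]] {
  def apply(group: Seq[A]): Seq[B] = group.map(inner.apply)
}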

Switch Operator:

(A => Int)(Array[A => B]) => (A => B)
Effectively the branching conditional of pipelines. A => Int specifies which branch to take, Array[A => B] are the potential branches.
A use case we’ve seen for this is training a separate event extractor for each event type in the event extraction pipeline.

nodeAB then nodeBC thenSwitch (c => if (c > 1) 0 else 1) {
     _ then zeroNodeCD, 
     _ then oneNodeCD
}

Risky point of trickiness: if a branch contains an estimator, it will only be trained on the data that flows into that branch.
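A minimal sketch (hypothetical Transformer trait) of the Switch operator at apply time; the training-time caveat above is about how the fitting data gets routed, which this sketch doesn't show:

trait Transformer[A, B] { def apply(a: A): B }

// The selector picks which branch handles each input element.
case class Switch[A, B](selector: A => Int, branches: Array[Transformer[A, B]])
    extends Transformer[A, B] {
  def apply(a: A): B = branches(selector(a))(a)
}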

@etrain (Contributor) commented May 26, 2015

Thanks @tomerk - this is useful stuff. I don't fully grok the use case for the Concatenate operator, so let's chat about it in person. We should also discuss semantics for identifying the key for grouped and join operators.

The Switch operator seems useful, but I'm not sure what its use would be at the current level of abstraction. If it's for hyperparameter tuning, I think that belongs in a separate discussion and perhaps at a higher level of abstraction altogether. If it's for supporting more complex pipelines then it would be good to have a concrete use case to be working from.

Finally, let's not forget about Aggregation - it's possible to think of plain aggregation as a grouped operator with the null key (which could be how we implement it), but we should think about user-facing support for aggregation and what the semantics are at that level.

@tomerk (Contributor, Author) commented May 26, 2015

After talking to @etrain and @shivaram it turns out I was mistaken about the interface of the BlockLinearEstimator. So, concatenate should actually have the interface:
Seq[(T => DenseVector)] => (T => DenseVector)

This could be generalized to reduce & aggregate:
(Seq[(A => B)], (B, B) => B) => (A => B)
(Seq[(A => B)], C, (C, B) => C, (C, C) => C) => (A => C)
But I haven't seen any use cases for these yet, even in more complex pipelines.
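For reference, the reduce and aggregate generalizations written as plain function combinators (just a sketch of the signatures above, with hypothetical names):

def reduced[A, B](branches: Seq[A => B], combine: (B, B) => B): A => B =
  (a: A) => branches.map(f => f(a)).reduce(combine)

// combOp would only matter for a partitioned / parallel implementation;
// this sequential sketch only uses the zero value and seqOp.
def aggregated[A, B, C](branches: Seq[A => B], zero: C,
                        seqOp: (C, B) => C, combOp: (C, C) => C): A => C =
  (a: A) => branches.map(f => f(a)).foldLeft(zero)(seqOp)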

@tomerk (Contributor, Author) commented May 27, 2015

Okay @shivaram and @etrain, as per our discussion I’ve gone through MnistRandomFFT, RandomPatchCifar, VOCSIFTFisher, and the Imagenet pipeline to figure out how to better linearize our existing pipelines.

Lessons learned

The main way to linearize these pipelines is to consider how to push manual fitting & joining logic into a node.
In some cases the logic is so complicated that it needs its own estimator (potentially inlined). Other patterns are generalizable enough that it may be worth making nodes that take other nodes as input, e.g.:

Concat(node1, node2)
OnMatrix(pcaNode)
OnSeq(ldaNode)
FisherVector(gmmNode)

Note that, if you guys prefer, it's also possible to implement any of these via chaining instead of composition:

node1.concat(node2)
pcaNode.onMatrix
ldaNode.onSeq
gmmNode.toFisherVector

Note on sampling:
Sampling doesn't quite fit into the above (because we need to maintain type information when sampling so we can fit sampling estimators into the above nodes, e.g. the current use case of ColumnSampler). We can either make the estimators we want to sample take an optional sample size as a parameter, or we can make sampling a method on all estimators, node.sampling(count), and extend it while maintaining type information. That lets us do e.g. FisherVector(gmmNode.sampling()).
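A rough sketch of the node.sampling(count) option (hypothetical Estimator trait, not the existing ColumnSampler): the estimator keeps its type parameters and simply fits on a down-sampled RDD.

import org.apache.spark.rdd.RDD

trait Estimator[A, B] { self =>
  def fit(data: RDD[A]): A => B

  // Same estimator type, but fit on roughly `count` sampled records.
  def sampling(count: Long): Estimator[A, B] = new Estimator[A, B] {
    def fit(data: RDD[A]): A => B = {
      val fraction = math.min(1.0, count.toDouble / data.count())
      self.fit(data.sample(withReplacement = false, fraction = fraction))
    }
  }
}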

MnistRandomFFT

val featurizer = Concat {
  Array.fill(conf.numFFTs) {
    RandomSignNode(mnistImageSize, randomSignSource) then PaddedFFT then LinearRectifier(0.0)
  }
}

val mnistClassifier = (featurizer thenLabelEstimator
    new BlockLeastSquaresEstimator(conf.blockSize, 1, conf.lambda.getOrElse(0)))
    .fit(train.data, labels) then MaxClassifier

// Do the evals
val trainEval = MulticlassClassifierEvaluator(mnistClassifier(train.data), train.labels, numClasses)
val testEval = MulticlassClassifierEvaluator(mnistClassifier(test.data), test.labels, numClasses)

The only change needed for MnistRandomFFT is replacing the zip-vectors FunctionNode with a Concat node that extends Transformer and takes nodes of matching types as parameters.
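A minimal sketch (hypothetical Transformer trait) of such a Concat node for the DenseVector case:

import breeze.linalg.DenseVector

trait Transformer[A, B] { def apply(a: A): B }

// Apply every branch to the same input and concatenate the resulting vectors.
case class Concat[A](branches: Seq[Transformer[A, DenseVector[Double]]])
    extends Transformer[A, DenseVector[Double]] {
  def apply(a: A): DenseVector[Double] = DenseVector.vertcat(branches.map(_(a)): _*)
}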

RandomPatchCifar

// Inline an Estimator
val convolverEstimator = Estimator { (in: RDD[Image]) =>
  val patchExtractor = new Windower(conf.patchSteps, conf.patchSize)
      .andThen(ImageVectorizer.apply)
      .andThen(new Sampler(whitenerSize))

  val baseFilters = patchExtractor(in)
  val baseFilterMat = Stats.normalizeRows(MatrixUtils.rowsToMatrix(baseFilters), 10.0)
  val whitener = new ZCAWhitenerEstimator().fitSingle(baseFilterMat)

  //Normalize them.
  val sampleFilters = MatrixUtils.sampleRows(baseFilterMat, conf.numFilters)
  val unnormFilters = whitener(sampleFilters)
  val unnormSq = pow(unnormFilters, 2.0)
  val twoNorms = sqrt(sum(unnormSq(*, ::)))

  val filters = (unnormFilters(::, *) / (twoNorms + 1e-10)) * whitener.whitener.t
  new Convolver(filters, imageSize, imageSize, numChannels, Some(whitener), true)
}

// Build the pipeline
val featurizer = convolverEstimator.fit(trainImages)
    .then(SymmetricRectifier(alpha=conf.alpha))
    .then(new Pooler(conf.poolStride, conf.poolSize, identity, _.sum))
    .then(ImageVectorizer)
    .thenEstimator(new StandardScaler).fit(trainImages)

val predictionPipeline = (featurizer thenLabelEstimator
    new BlockLeastSquaresEstimator(4096, 1, conf.lambda.getOrElse(0.0)))
    .fit(trainImages, trainLabels) then MaxClassifier

// Do the model evals.
val trainEval = MulticlassClassifierEvaluator(
  predictionPipeline(trainImages), LabelExtractor(trainData), numClasses)
val testEval = MulticlassClassifierEvaluator(predictionPipeline(testImages), LabelExtractor(testData), numClasses)

The main change to the RandomPatchCifar pipeline is moving the convolver fitting logic into an Estimator (note we may want this as a non-inlined estimator). It was too complicated internally to generalize easily to patterns I've seen elsewhere, but we could make it take a WhiteningEstimator as a parameter so it can use more than just the ZCAWhitener.

Note that I've removed the caching nodes here because they follow simple enough rules that a DAG planner could put them in (just cache before any estimator). Alternatively, with a DAG planner we could also have caching nodes that only cache in the fitting stage, not in the final transformer, or we could specify caching the same way I specify sampling in VOC via node.sampling().

VOCSIFTFisher

// Part 1: Scale and convert images to grayscale.
val grayscaler = MultiLabeledImageExtractor then PixelScaler then GrayScaler

// Part 2: Extract Sifts & Compute dimensionality-reduced PCA features.
val siftExtractor = grayscaler then new SIFTExtractor(scaleStep = conf.scaleStep)
val pcaFeaturizer = conf.pcaFile match {
  case Some(fname) => siftExtractor then
      OnMatrix(PCATransformer(convert(csvread(new File(fname)), Float).t))
  case None => (siftExtractor thenEstimator
      OnMatrix(PCAEstimator(conf.descDim).sampling(conf.numPcaSamples))).fit(parsedRDD)
}

// Part 3: Compute Fisher Vectors and signed-square-root normalization.
val fisherFeaturizer = conf.gmmMeanFile match {
  // Part 3a: If necessary to compute the Fisher Vectors, compute a GMM based on the dimensionality-reduced features, or load from disk.
  case Some(f) => pcaFeaturizer then
    FisherVector(
      csvread(new File(conf.gmmMeanFile.get)),
      csvread(new File(conf.gmmVarFile.get)),
      csvread(new File(conf.gmmWtsFile.get)).toDenseVector)
  case None => (pcaFeaturizer thenEstimator
      FisherVectorEstimator(GaussianMixtureModelEstimator(conf.vocabSize).sampling(conf.numGmmSamples)))
      .fit(parsedRDD)
} then FloatToDouble then
  MatrixVectorizer then
  NormalizeRows then
  SignedHellingerMapper then
  NormalizeRows

// Part 4: Fit a linear model to the data.
val model = (fisherFeaturizer thenLabelEstimator
    new BlockLeastSquaresEstimator(4096, 1, conf.lambda))
        .fit(parsedRDD, trainingLabels)

// Now featurize and apply the model to test data.
val testActuals = MultiLabelExtractor(testParsedRDD)
val predictions = model(testParsedRDD)

val map = MeanAveragePrecisionEvaluator(testActuals, predictions, VOCLoader.NUM_CLASSES)

Notice the OnMatrix & FisherVector wrapping I mentioned in the "lessons learned", leaving it up to the DAG executor to manage the caching, and how the conf.x match { } blocks now switch between full pipeline linearizations instead of just the specific node to be chained.

ImageNetSiftLcsFV

A mix of the strategies used above should linearize this pipeline much better:

  • Nodes that wrap other nodes (OnMatrix, FisherVectorEstimator, etc.)
  • using wrapper(estimator.sampling()) instead of ColumnSampler
  • having conf.x match { } specify the full linearization
  • (Optionally but not required) leaving it up to the DAG executor to decide when to cache / unpersist cached data when fitting
  • moving complicated fitting logic that doesn't fit into any of the above into its own estimator
  • using Concat to combine the Sift & LCS features

@shivaram (Contributor) commented:

cc @thisisdhaas @sjyk who have a use case for a split operator that goes from a SparkSQL query output to a transformer

@sjyk commented May 29, 2015

Here is my use case/what I want to be able to do.

-my side of things
The output of my loading/data transformation code is a table/dataframe with a schema. I want the user to be able to specify a "FeatureView" of this table, that is, for every column, specify in words the featurization semantics of the data. For example, suppose I have a table Review(comment, city, type, rating); the user should be able to say:
FeatureView(Review, List( (comment, "text"), (type, "categorical"), (rating, "numerical")))

-your side of things
Ideally there would be a pipeline architecture that takes a FeatureView as input generically, without having to rewrite the pipeline every time: either by automatically splitting the features into branches of the pipeline or by allowing transformations on an index of the list.

Transformers exist for each of these feature types; however, when they are all mixed together they don't fit into a nice linear pipeline. I think it is fair to assume a one-to-many relationship between columns and features.
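For concreteness, the specification above written down as plain data (a sketch, not a committed interface):

case class FeatureView(table: String, columns: List[(String, String)])

val reviewView = FeatureView("Review",
  List(("comment", "text"), ("type", "categorical"), ("rating", "numerical")))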

@etrain (Contributor) commented May 29, 2015

Can you enumerate the set of types you care about - are they basically String, Enum, Double? Are there more? I ask because I'd like to keep the option of enforcing type-safety later on in the pipeline if possible.

If you think of the FeatureView as a TupleN[A, B, .., N], and each component pipeline as (A => J, B => K, .., N => L), then we could pretty easily come up with something that takes a FeatureView and a TupleN of functions and gives back a pipeline that goes from TupleN[A, B, .., N] => TupleN[J, K, .., L], but I'm not entirely sure whether those are obvious semantics or what the DAG operator would be called.
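A tiny sketch of that idea for the two-column case (hypothetical combinator name), just to make the types concrete:

// Given per-column functions, build a row-level function over the tuple.
def onColumns2[A, B, J, K](fa: A => J, fb: B => K): ((A, B)) => (J, K) = {
  case (a, b) => (fa(a), fb(b))
}

// e.g. featurize a (comment, rating) row with stand-in text / numeric featurizers.
val featurizeRow = onColumns2[String, Double, Seq[String], Double](
  comment => comment.split("\\s+").toSeq,
  rating => rating / 5.0
)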

@etrain (Contributor) commented May 29, 2015

Spoke with @shivaram and @tomerk about this - it looks like we can support the pattern you're asking for with the Concat operator, assuming the input tables can be converted to RDD[case class].

@tomerk is going to check in this operator first and then we can see how well these things are supported.

@shivaram (Contributor) commented:

FYI, here is the open Spark PR to convert DataFrames to typed RDDs: apache/spark#5713

@sjyk commented May 29, 2015

Sounds good. We can assume that the user knows the schema of the data frame and thus can give a case class description of it.

@thisisdhaas (Member) commented:

It would be great if we could have a many-to-many relationship between inputs and outputs. For example, one common use case for us is to compute a similarity feature from two columns of the input (signature looks like (A, B) => J).

This certainly complicates the interface @etrain described above, and I don't know enough about what you're planning with RDD[case class] + Concat to understand if it will work with that interface.

@etrain (Contributor) commented May 29, 2015

So the basic idea right now is that each operator passed to _.concat will take in the entire row (a single object), and that we'll provide some simple syntax for selecting the appropriate fields. Say you've got case class ViewRecord(a: A, b: B); then the operator might look something like

preprocess thenConcat {
   CosineSimilarity(_.a, _.b),
   new EntityResolver(0.6)(_.a),
   ...
}

And the output type of this pipeline would be something like a Tuple[J,K,...] if we assume that the output of EntityResolver is of type K.

It may not end up being this exact syntax, but that's the basic idea.
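One way this could be written with explicit lambdas so it compiles today (a hedged sketch with hypothetical stand-in featurizers, not the eventual syntax):

case class ViewRecord(a: String, b: String)

// Stand-in featurizers, purely for illustration.
val similarity: (String, String) => Double = (x, y) => if (x == y) 1.0 else 0.0
val length: String => Int = _.length

// Each "operator" takes the whole row and selects the fields it needs.
val rowFeatures: ViewRecord => (Double, Int) =
  row => (similarity(row.a, row.b), length(row.a))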

@etrain (Contributor) commented May 29, 2015

Also, @sjyk - @shivaram mentioned that there may be a .toCaseClass (or something) method on DataFrames coming soon, but I can't find a reference to it, so this may not be something you need to solve.

@thisisdhaas (Member) commented:

Ah, that concat operation looks very well aligned with our use case. Looking forward to seeing it.
