Figure out future DAG operators #125
I have a strong preference for avoiding "zip" in favor of a logical "join" with several physical implementations, one of which might be zip. I've been giving some thought to the hyperparameter tuning problem along the way, and have lots of thoughts there 👍
@etrain @shivaram The key insight is that instead of complex operators taking nodes as parameters, they take functions that chain new nodes onto the current pipeline. Some examples of what this could look like for different operators:

**Join Operator:**

```scala
nodeAB then nodeBC thenJoin {
  _ then nodeCD,
  _ then nodeCE
}
```

**Concatenate Operator:**

```scala
nodeAB then nodeBC thenConcat {
  _ then nodeCD,
  x => (x thenEstimator nodeCE).fit(rddOfA) then nodeED,
  _ then otherNodeCD
}

nodeAB then nodeBC thenConcat {
  (1 to 4).map(_ then randNodeCD()):_*
}
```

**Grouped Operator:**

```scala
nodeAB then nodeBSeqC thenOnEach {
  (_ then nodeCD thenEstimator nodeDE).fit(rddOfA)
}
```

Risky point of trickiness: the chaining rules inside the grouped operator have to be subtly different from the usual.

**Switch Operator:**

```scala
nodeAB then nodeBC thenSwitch (c => if (c > 1) 0 else 1) {
  _ then zeroNodeCD,
  _ then oneNodeCD
}
```

Risky point of trickiness: if a branch contains an estimator, it will only be trained on the data that flows into that branch.
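For reference, a minimal sketch of the combinator signatures this style seems to imply. `Pipeline`, `then`, `thenConcat`, and `thenSwitch` are placeholder names for illustration, not the project's actual API:

```scala
// Sketch only: assumed trait and method names, not the real pipeline API.
trait Pipeline[A, B] {
  // Linear chaining, as in `nodeAB then nodeBC`.
  def then[C](next: Pipeline[B, C]): Pipeline[A, C]

  // Each branch is a function that extends *this* pipeline, so every branch
  // shares the prefix built so far; branch outputs are concatenated.
  def thenConcat[C](branches: (Pipeline[A, B] => Pipeline[A, C])*): Pipeline[A, Seq[C]]

  // The chooser routes each element to exactly one branch.
  def thenSwitch[C](chooser: B => Int)(branches: (Pipeline[A, B] => Pipeline[A, C])*): Pipeline[A, C]
}
```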
Thanks @tomerk - this is useful stuff. I don't fully grok the use case for the [...]. The [...]. Finally, let's not forget about Aggregation - it's possible to think of plain aggregation as a grouped operator with the null key (which could be how we implement it), but we should think about user-facing support for aggregation and what the semantics are at that level.
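A minimal sketch of the "aggregation as a grouped operator with the null key" idea, using plain Scala collections rather than the project's types:

```scala
// Sketch only: grouping every element under a single unit "null" key turns a grouped
// aggregation into a plain aggregation over the whole dataset (assumes non-empty input).
def aggregateViaGroup[A, B](agg: Seq[A] => B)(data: Seq[A]): B = {
  val grouped: Map[Unit, Seq[A]] = data.groupBy(_ => ())  // everything lands in one group
  agg(grouped(()))
}

// Usage: a plain sum expressed as a one-group grouped aggregation.
val total = aggregateViaGroup[Int, Int](_.sum)(Seq(1, 2, 3))  // 6
```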
After talking to @etrain and @shivaram it turns out I was mistaken about the interface of the BlockLinearEstimator. So, concatenate should actually have the interface: [...]. This could be generalized to reduce & aggregate: [...].
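The interfaces themselves did not survive the copy; as a hedged illustration only, a Concat node that extends Transformer and takes nodes of matching types as parameters might look something like this (the Transformer trait here is a simplified stand-in, and Breeze is assumed for the vector type):

```scala
import breeze.linalg.DenseVector

// Sketch only: simplified Transformer trait, not the project's actual one.
trait Transformer[A, B] extends Serializable {
  def apply(in: A): B
}

// Concatenate the output vectors of several branches that share the same input type.
case class Concat[A](branches: Seq[Transformer[A, DenseVector[Double]]])
    extends Transformer[A, DenseVector[Double]] {
  def apply(in: A): DenseVector[Double] =
    DenseVector.vertcat(branches.map(_.apply(in)): _*)
}
```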
Okay @shivaram and @etrain, as per our discussion I’ve gone through MnistRandomFFT, RandomPatchCifar, VOCSIFTFisher, and the Imagenet pipeline to figure out how to better linearize our existing pipelines.

#### Lessons learned

The main way to linearize these pipelines is to consider how to push manual fitting & joining logic into a node.
Note that if you guys prefer, it’s also possible to implement any of these via chaining instead of composition: [...]
Note on sampling: [...]

#### MnistRandomFFT

```scala
val featurizer = Concat {
Array.fill(conf.numFFTs) {
RandomSignNode(mnistImageSize, randomSignSource) then PaddedFFT then LinearRectifier(0.0)
}
}
val mnistClassifier = (featurizer thenLabelEstimator
new BlockLeastSquaresEstimator(conf.blockSize, 1, conf.lambda.getOrElse(0)))
.fit(train.data, labels) then MaxClassifier
// Do the evals
val trainEval = MulticlassClassifierEvaluator(mnistClassifier(train.data), train.labels, numClasses)
val testEval = MulticlassClassifierEvaluator(mnistClassifier(test.data), test.labels, numClasses)
```

The only change needed for MnistRandomFFT is replacing the zip-vectors FunctionNode with a Concat node that extends Transformer and takes nodes of matching types as parameters.

#### RandomPatchCifar

```scala
// Inline an Estimator
val convolverEstimator = Estimator { (in: RDD[Image]) =>
val patchExtractor = new Windower(conf.patchSteps, conf.patchSize)
.andThen(ImageVectorizer.apply)
.andThen(new Sampler(whitenerSize))
val baseFilters = patchExtractor(in)
val baseFilterMat = Stats.normalizeRows(MatrixUtils.rowsToMatrix(baseFilters), 10.0)
val whitener = new ZCAWhitenerEstimator().fitSingle(baseFilterMat)
//Normalize them.
val sampleFilters = MatrixUtils.sampleRows(baseFilterMat, conf.numFilters)
val unnormFilters = whitener(sampleFilters)
val unnormSq = pow(unnormFilters, 2.0)
val twoNorms = sqrt(sum(unnormSq(*, ::)))
val filters = (unnormFilters(::, *) / (twoNorms + 1e-10)) * whitener.whitener.t
new Convolver(filters, imageSize, imageSize, numChannels, Some(whitener), true)
}
// Build the pipeline
val featurizer = convolverEstimator.fit(trainImages)
.then(SymmetricRectifier(alpha=conf.alpha))
.then(new Pooler(conf.poolStride, conf.poolSize, identity, _.sum))
.then(ImageVectorizer)
.thenEstimator(new StandardScaler).fit(trainImages)
val predictionPipeline = (featurizer thenLabelEstimator
new BlockLeastSquaresEstimator(4096, 1, conf.lambda.getOrElse(0.0)))
.fit(trainImages, trainLabels) then MaxClassifier
// Do the model evals.
val trainEval = MulticlassClassifierEvaluator(
predictionPipeline(trainImages), LabelExtractor(trainData), numClasses)
val testEval = MulticlassClassifierEvaluator(predictionPipeline(testImages), LabelExtractor(testData), numClasses)
```

The main change to the RandomPatchCifar pipeline is moving the convolver fitting logic into an Estimator (note we may want this as a non-inlined estimator). It was too complicated internally to easily generalize to patterns I've seen elsewhere, but we could make it take a WhiteningEstimator as a parameter so it can use more than just the ZCAWhitener.

Note that I've removed the caching nodes here because they follow simple enough rules that a DAG planner could put them in (just cache before any estimator; a toy sketch of this rule appears at the end of this comment). Alternatively, with a DAG planner we could also have caching nodes that only cache in the fitting stage, not the final transformer, or we could specify caching the same way I specify sampling in VOC, via [...].

#### VOCSIFTFisher

```scala
// Part 1: Scale and convert images to grayscale.
val grayscaler = MultiLabeledImageExtractor then PixelScaler then GrayScaler
// Part 2: Extract Sifts & Compute dimensionality-reduced PCA features.
val siftExtractor = grayscaler then new SIFTExtractor(scaleStep = conf.scaleStep)
val pcaFeaturizer = conf.pcaFile match {
case Some(fname) => siftExtractor then
OnMatrix(PCATransformer(convert(csvread(new File(fname)), Float).t))
case None => (siftExtractor thenEstimator
OnMatrix(PCAEstimator(conf.descDim).sampling(conf.numPcaSamples))).fit(parsedRDD)
}
// Part 3: Compute Fisher Vectors and signed-square-root normalization.
val fisherFeaturizer = conf.gmmMeanFile match {
// Part 3a: If necessary to compute the Fisher Vectors, compute a GMM based on the dimensionality-reduced features, or load from disk.
case Some(f) => pcaFeaturizer then
FisherVector(
csvread(new File(conf.gmmMeanFile.get)),
csvread(new File(conf.gmmVarFile.get)),
csvread(new File(conf.gmmWtsFile.get)).toDenseVector)
case None => (pcaFeaturizer thenEstimator
FisherVectorEstimator(GaussianMixtureModelEstimator(conf.vocabSize).sampling(conf.numGmmSamples)))
.fit(parsedRDD)
} then FloatToDouble then
MatrixVectorizer then
NormalizeRows then
SignedHellingerMapper then
NormalizeRows
// Part 4: Fit a linear model to the data.
val model = (fisherFeaturizer thenLabelEstimator
new BlockLeastSquaresEstimator(4096, 1, conf.lambda))
.fit(parsedRDD, trainingLabels)
// Now featurize and apply the model to test data.
val testActuals = MultiLabelExtractor(testParsedRDD)
val predictions = model(testParsedRDD)
val map = MeanAveragePrecisionEvaluator(testActuals, predictions, VOCLoader.NUM_CLASSES)
```

Notice the OnMatrix & FisherVector wrapping I mentioned in the “lessons learned”, leaving it up to the DAG executor to manage the caching, and how switching between a full pipeline linearization in [...].

#### ImageNetSiftLcsFV

A mix of the strategies used above should much better linearize the pipeline: [...]
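Separately, a toy illustration of the "just cache before any estimator" planner rule mentioned above for RandomPatchCifar; the node representation here is entirely made up:

```scala
// Toy sketch: walk a linear chain of nodes and insert a cache marker before every
// estimator, which is the rule a simple DAG planner could apply automatically.
sealed trait PlanNode
case class TransformerNode(name: String) extends PlanNode
case class EstimatorNode(name: String) extends PlanNode
case object CacheNode extends PlanNode

def insertCaches(chain: Seq[PlanNode]): Seq[PlanNode] =
  chain.flatMap {
    case e: EstimatorNode => Seq(CacheNode, e)  // cache the data feeding the fit
    case other            => Seq(other)
  }

// insertCaches(Seq(TransformerNode("fft"), EstimatorNode("leastSquares")))
//   == Seq(TransformerNode("fft"), CacheNode, EstimatorNode("leastSquares"))
```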
cc @thisisdhaas @sjyk who have a use case for a [...]
Here is my use case / what I want to be able to do:

- my side of things: [...]
- your side of things: [...]

For each of these types of features, transformers exist; however, when they are all mixed it doesn't fit into a nice linear pipeline. I think it is fair to assume a one-to-many relationship between cols and features.
Can you enumerate the set of types you care about - are they basically [...]? If you think of the FeatureView as a TupleN[A,B,..,N], and each component [...]
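The rest of the sentence was lost, but as a hedged reading of the TupleN idea (all types and featurizers below are hypothetical): each component of the tuple-like row gets its own featurizer, and the per-component outputs are concatenated.

```scala
// Sketch only: a hypothetical two-column "FeatureView" with one featurizer per component.
case class FeatureView(text: String, price: Double)

val textFeatures: String => Seq[Double]  = s => Seq(s.length.toDouble)
val priceFeatures: Double => Seq[Double] = p => Seq(p, math.log(p + 1.0))

// Concatenate the per-component feature vectors into one.
def featurize(row: FeatureView): Seq[Double] =
  textFeatures(row.text) ++ priceFeatures(row.price)
```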
Spoke with @shivaram and @tomerk about this - it looks like we can support the pattern you're asking for with the Concat operator, assuming the input tables can be converted to RDD[case class]. @tomerk is going to check in this operator first and then we can see how well these things are supported.
FYI, here is the open Spark PR to convert DataFrames to typed RDDs: apache/spark#5713
Sounds good. We can assume that the user knows the schema of the data frame and thus can give a case class description of it.
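A minimal sketch of that assumption using the standard Spark API; the Record case class and column layout are hypothetical:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame

// Hypothetical schema that the user knows up front.
case class Record(title: String, description: String, price: Double)

// DataFrame.rdd yields an RDD[Row]; mapping by position gives the typed RDD.
def toTypedRDD(df: DataFrame): RDD[Record] =
  df.rdd.map(row => Record(row.getString(0), row.getString(1), row.getDouble(2)))
```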
It would be great if we could have a many-to-many relationship between inputs and outputs. For example, one common use case for us is to compute a similarity feature from two columns of the input (signature looks like [...]). This certainly complicates the interface @etrain described above, and I don't know enough about what you're planning with RDD[case class] + Concat to understand if it will work with that interface.
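The signature in the parenthetical was lost; as a hedged illustration of the two-columns-in, one-feature-out case, with a hypothetical row type and a simple Jaccard similarity standing in for the real measure:

```scala
// Sketch only: a similarity feature computed from two columns of the same row.
case class Record(title: String, description: String, price: Double)

val titleDescriptionSimilarity: Record => Double = { r =>
  val a = r.title.toLowerCase.split("\\s+").toSet
  val b = r.description.toLowerCase.split("\\s+").toSet
  val union = a.union(b)
  if (union.isEmpty) 0.0
  else a.intersect(b).size.toDouble / union.size  // Jaccard similarity of the token sets
}
```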
So the basic idea right now is that each operator passed to `_.concat` will take in the entire row (a single object), and that we provide some simple syntax for selecting appropriate fields. Say you've got [...]
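The example that followed here was lost in extraction; a hedged sketch of the idea as I read it, where every placeholder (the Record row type, the transformers, and EntityResolver's output type) is assumed rather than the planned syntax:

```scala
// Sketch only: each branch receives the entire row and selects the fields it needs;
// the branch outputs are concatenated into a tuple-like result.
case class Record(title: String, description: String, price: Double)

val tokenize: String => Seq[String] = _.toLowerCase.split("\\s+").toSeq
val entityResolver: String => String = _.trim  // stand-in for EntityResolver, so K = String here

def featurize(row: Record): (Seq[String], String, Double) =
  (tokenize(row.title), entityResolver(row.description), row.price)
```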
And the output type of this pipeline would be something like a Tuple[J,K,...] if we assume that the output of EntityResolver is of type K. It may not end up being this exact syntax, but that's the basic idea.
Ah, that concat operation looks very well aligned with our use case. Looking forward to seeing it.
There are some concepts we've talked about (and have in some cases touched on using FunctionNodes, #121) that we need to figure out how to cleanly integrate into our pipeline interfaces.
Among these are:
- `join(identity, transformer)`
- `Grouped(Transformer[A, B]): Transformer[Seq[A], Seq[B]]`
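A minimal sketch of the Grouped concept from the list above, assuming a bare-bones Transformer trait (not the project's real one):

```scala
// Sketch only: simplified Transformer trait for illustration.
trait Transformer[A, B] extends Serializable {
  def apply(in: A): B
}

// Grouped lifts a per-element transformer to operate on a whole group at once,
// matching the Transformer[Seq[A], Seq[B]] signature above.
case class Grouped[A, B](inner: Transformer[A, B]) extends Transformer[Seq[A], Seq[B]] {
  def apply(in: Seq[A]): Seq[B] = in.map(inner.apply)
}
```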