Skip to content

Commit

Permalink
Transform pipeline aggr test (#1027)
Browse files Browse the repository at this point in the history
* tester code: pipeline aggr. transform job

Signed-off-by: n-dohrmann <[email protected]>

* made test case for pipeline aggregator in transform job

Signed-off-by: n-dohrmann <[email protected]>

* removed unnec. test lines

Signed-off-by: n-dohrmann <[email protected]>

* re-added method call on Transform obj

Signed-off-by: n-dohrmann <[email protected]>

---------

Signed-off-by: n-dohrmann <[email protected]>
  • Loading branch information
n-dohrmann committed Nov 1, 2023
1 parent 8673c16 commit 4eaf365
Showing 1 changed file with 38 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ import org.opensearch.script.ScriptType
import org.opensearch.search.aggregations.AggregationBuilders
import org.opensearch.search.aggregations.AggregatorFactories
import org.opensearch.search.aggregations.metrics.ScriptedMetricAggregationBuilder
import org.opensearch.search.aggregations.pipeline.BucketScriptPipelineAggregationBuilder
import java.lang.Integer.min
import java.time.Instant
import java.time.temporal.ChronoUnit
import kotlin.test.assertFailsWith

class TransformRunnerIT : TransformRestTestCase() {

Expand Down Expand Up @@ -688,6 +690,42 @@ class TransformRunnerIT : TransformRestTestCase() {
assertTrue("Expected failure message to be present", !metadata.failureReason.isNullOrBlank())
}

fun `test transform with invalid pipeline aggregation triggering search failure`() {
assertFailsWith(IllegalArgumentException::class, "Bucket-script aggregation must fail!") {
validateSourceIndex("transform-source-index")

val aggregatorFactories = AggregatorFactories.builder()
aggregatorFactories.addPipelineAggregator(
BucketScriptPipelineAggregationBuilder(
"test_pipeline_aggregation",
Script("1")
)
)

val transform = Transform(
id = "id_17",
schemaVersion = 1L,
enabled = true,
enabledAt = Instant.now(),
updatedAt = Instant.now(),
jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
description = "test transform",
metadataId = null,
sourceIndex = "transform-source-index",
targetIndex = "transform-target-index",
roles = emptyList(),
pageSize = 1,
groups = listOf(
Terms(sourceField = "store_and_fwd_flag", targetField = "flag"),
Histogram(sourceField = "passenger_count", targetField = "count", interval = 2.0),
DateHistogram(sourceField = "tpep_pickup_datetime", targetField = "date", fixedInterval = "1d")
),
aggregations = aggregatorFactories
).let { createTransform(it, it.id) }
updateTransformStartTime(transform)
}
}

fun `test transform with data stream`() {
// Create a data stream.
val dataStreamName = "transform-data-stream"
Expand Down

0 comments on commit 4eaf365

Please sign in to comment.