diff --git a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt index 5c6fd2441..b5d23b6bc 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt @@ -148,6 +148,7 @@ import org.opensearch.indexmanagement.spi.IndexManagementExtension import org.opensearch.indexmanagement.spi.indexstatemanagement.IndexMetadataService import org.opensearch.indexmanagement.spi.indexstatemanagement.StatusChecker import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData +import org.opensearch.indexmanagement.transform.TargetIndexMappingService import org.opensearch.indexmanagement.transform.TransformRunner import org.opensearch.indexmanagement.transform.action.delete.DeleteTransformsAction import org.opensearch.indexmanagement.transform.action.delete.TransportDeleteTransformsAction @@ -467,6 +468,8 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin indexNameExpressionResolver, ) + TargetIndexMappingService.initialize(client) + return listOf( managedIndexRunner, rollupRunner, diff --git a/src/main/kotlin/org/opensearch/indexmanagement/common/model/dimension/DateHistogram.kt b/src/main/kotlin/org/opensearch/indexmanagement/common/model/dimension/DateHistogram.kt index 2c9dda200..73de49726 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/common/model/dimension/DateHistogram.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/common/model/dimension/DateHistogram.kt @@ -28,7 +28,8 @@ data class DateHistogram( override val targetField: String = sourceField, val fixedInterval: String? = null, val calendarInterval: String? = null, - val timezone: ZoneId = ZoneId.of(UTC) + val timezone: ZoneId = ZoneId.of(UTC), + val format: String? = null ) : Dimension(Type.DATE_HISTOGRAM, sourceField, targetField) { init { @@ -54,6 +55,7 @@ data class DateHistogram( return builder.field(DIMENSION_SOURCE_FIELD_FIELD, sourceField) .field(DIMENSION_TARGET_FIELD_FIELD, targetField) .field(DATE_HISTOGRAM_TIMEZONE_FIELD, timezone.id) + .field(FORMAT, format) .endObject() .endObject() } @@ -82,6 +84,9 @@ data class DateHistogram( fixedInterval?.let { this.fixedInterval(DateHistogramInterval(it)) } + format?.let { + this.format(it) + } } } @@ -128,6 +133,7 @@ data class DateHistogram( const val FIXED_INTERVAL_FIELD = "fixed_interval" const val CALENDAR_INTERVAL_FIELD = "calendar_interval" const val DATE_HISTOGRAM_TIMEZONE_FIELD = "timezone" + const val FORMAT = "format" @Suppress("ComplexMethod", "LongMethod") @JvmStatic @@ -138,6 +144,7 @@ data class DateHistogram( var fixedInterval: String? = null var calendarInterval: String? = null var timezone = ZoneId.of(UTC) + var format: String? 
= null ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp) while (xcp.nextToken() != Token.END_OBJECT) { @@ -150,6 +157,7 @@ data class DateHistogram( DATE_HISTOGRAM_TIMEZONE_FIELD -> timezone = ZoneId.of(xcp.text()) DIMENSION_SOURCE_FIELD_FIELD -> sourceField = xcp.text() DIMENSION_TARGET_FIELD_FIELD -> targetField = xcp.text() + FORMAT -> format = xcp.textOrNull() else -> throw IllegalArgumentException("Invalid field [$fieldName] found in date histogram") } } @@ -159,7 +167,8 @@ data class DateHistogram( targetField = requireNotNull(targetField) { "Target field must not be null" }, fixedInterval = fixedInterval, calendarInterval = calendarInterval, - timezone = timezone + timezone = timezone, + format = format ) } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TargetIndexMappingService.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TargetIndexMappingService.kt new file mode 100644 index 000000000..0a5954a94 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TargetIndexMappingService.kt @@ -0,0 +1,250 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.transform + +import org.apache.logging.log4j.LogManager +import org.opensearch.action.admin.indices.mapping.get.GetMappingsRequest +import org.opensearch.action.admin.indices.mapping.get.GetMappingsResponse +import org.opensearch.client.Client +import org.opensearch.common.bytes.BytesReference +import org.opensearch.common.xcontent.LoggingDeprecationHandler +import org.opensearch.common.xcontent.XContentFactory +import org.opensearch.common.xcontent.XContentHelper +import org.opensearch.common.xcontent.XContentType +import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.core.xcontent.XContentParser +import org.opensearch.index.IndexNotFoundException +import org.opensearch.indexmanagement.IndexManagementIndices +import org.opensearch.indexmanagement.common.model.dimension.DateHistogram +import org.opensearch.indexmanagement.opensearchapi.string +import org.opensearch.indexmanagement.opensearchapi.suspendUntil +import org.opensearch.indexmanagement.transform.exceptions.TransformIndexException +import org.opensearch.indexmanagement.transform.model.Transform +import org.opensearch.indexmanagement.transform.util.DEFAULT_DATE_FORMAT +import org.opensearch.indexmanagement.util.IndexUtils +import org.opensearch.search.aggregations.AggregationBuilder +import org.opensearch.search.aggregations.support.ValuesSourceAggregationBuilder +import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets + +/** + * Service designed for creating a dynamic target index mapping based on the date field types of the source index. + * Creates target index date properties based on the date properties of the source index + * (i.e.
if a term grouping is applied on a date field of the source index, the corresponding target index field will also be of date type) + */ +object TargetIndexMappingService { + private val logger = LogManager.getLogger(javaClass) + private lateinit var client: Client + + private const val TYPE = "type" + private const val PROPERTIES = "properties" + private val DATE_FIELD_TYPES = setOf("date", "date_nanos") + + fun initialize(client: Client) { + this.client = client + } + + /** + * + * Checks if the source index contains date fields and returns a target index mapping for those date fields, using the default date format + * Example: + * input map: [tpep_pickup_datetime, [type: date]] + * target index mapping: "tpep_pickup_datetime": { + * "type": "date", + * "format": "strict_date_optional_time||epoch_millis" + * } + * @return map of the date properties + * + */ + suspend fun getTargetMappingsForDates(transform: Transform): Map<String, Any> { + val sourceIndex = transform.sourceIndex + try { + val result: GetMappingsResponse = client.admin().indices().suspendUntil { + getMappings(GetMappingsRequest().indices(sourceIndex), it) + } ?: error("GetMappingsResponse for [${transform.sourceIndex}] was null") + + val sourceIndexMapping = result.mappings[sourceIndex]?.sourceAsMap + + val targetIndexDateFieldMappings = mutableMapOf<String, Any>() + if (!sourceIndexMapping.isNullOrEmpty()) { + mapDateTermAggregation(transform, sourceIndexMapping, targetIndexDateFieldMappings) + mapDateAggregation(transform.aggregations.aggregatorFactories, sourceIndexMapping, targetIndexDateFieldMappings, null) + } + return targetIndexDateFieldMappings + } catch (ex: IndexNotFoundException) { + logger.error("Index $sourceIndex doesn't exist") + return emptyMap() + } + } + + private fun mapDateTermAggregation( + transform: Transform, + sourceIndexMapping: MutableMap<String, Any>, + dateFieldMappings: MutableMap<String, Any>, + ) { + transform.groups.forEach { dimension -> + if (!isFieldInMappings(dimension.sourceField, sourceIndexMapping)) { + throw TransformIndexException("Missing field ${dimension.sourceField} in source index") + } + val sourceFieldType = IndexUtils.getFieldFromMappings(dimension.sourceField, sourceIndexMapping) + // Consider only date fields as relevant for building the target index mapping + // Excluding date histogram since the user can define a format in it + if (dimension !is DateHistogram && isSourceFieldDate(sourceFieldType)) { + // Taking the source field settings (type, format etc.) + val dateTypeTargetMapping = mapOf("type" to sourceFieldType!![TYPE], "format" to DEFAULT_DATE_FORMAT) + dateFieldMappings[dimension.targetField] = dateTypeTargetMapping + } + } + }
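+ + // Illustrative sketch (our comment, not part of the change): for a transform grouped by + // Terms(sourceField = "tpep_pickup_datetime", targetField = "tpep_pickup_datetime") where the source field + // is mapped as a date, mapDateTermAggregation effectively records + // dateFieldMappings["tpep_pickup_datetime"] = mapOf("type" to "date", "format" to DEFAULT_DATE_FORMAT)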
+ + private fun isSourceFieldDate(sourceFieldType: Map<*, *>?) = + sourceFieldType?.get(TYPE) != null && DATE_FIELD_TYPES.contains(sourceFieldType[TYPE]) + + /** + * Loads the transform target index mappings from JSON and adds the date properties mapping + * + * @param dateFieldMappings target index date fields mappings + */ + fun createTargetIndexMapping(dateFieldMappings: Map<String, Any>): String { + val builder = XContentFactory.jsonBuilder() + val dynamicMappings = IndexManagementIndices.transformTargetMappings + val byteBuffer = ByteBuffer.wrap(dynamicMappings.toByteArray(StandardCharsets.UTF_8)) + val bytesReference = BytesReference.fromByteBuffer(byteBuffer) + + val xcp = XContentHelper.createParser( + NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, + bytesReference, + XContentType.JSON + ) + loop@while (!xcp.isClosed) { + val token = xcp.currentToken() + val fieldName = xcp.currentName() + + when (token) { + XContentParser.Token.VALUE_NUMBER -> builder.field(fieldName, xcp.intValue()) + XContentParser.Token.VALUE_STRING -> builder.field(fieldName, xcp.text()) + XContentParser.Token.START_OBJECT -> { + if (fieldName != null) { + builder.startObject(fieldName) + } else { + builder.startObject() + } + } + XContentParser.Token.END_OBJECT -> builder.endObject() + XContentParser.Token.START_ARRAY -> builder.startArray(fieldName) + XContentParser.Token.END_ARRAY -> { + builder.endArray() + // Add target index date fields mappings only if the date field mappings are present + if (dateFieldMappings.isNotEmpty()) { + builder.startObject(PROPERTIES) + mapCompositeAggregation(dateFieldMappings, builder) + builder.endObject() + } + } + else -> { + xcp.nextToken() + continue@loop + } + } + xcp.nextToken() + } + return builder.string() + } + + @Suppress("UNCHECKED_CAST") + private fun mapCompositeAggregation( + compositeAggregation: Map<String, Any>, + builder: XContentBuilder, + ) { + val iterator = compositeAggregation.entries.iterator() + while (iterator.hasNext()) { + val it = iterator.next() + if (it.value is Map<*, *>) { + builder.startObject(it.key) + // Start object until reaching the "leaf"; leaf is the last key value pair, where value is not a map + mapCompositeAggregation(it.value as Map<String, Any>, builder) + builder.endObject() + } else { + if (DATE_FIELD_TYPES.contains(it.value)) { + builder.field(it.key, it.value.toString()) + } + } + } + } + + /** + * Creates the properties section in the target index mappings based on the given date fields + * Parses the target index mapping as a string, instead of using XContentBuilder + */ + @Suppress("UNUSED_PARAMETER") + private fun createTargetIndexMappingsAsString( + dateFieldMappings: Map<String, Any>, + dynamicMappings: String, + ): String { + val compositeAgg = mapCompositeAggregationToString(dateFieldMappings) + return dynamicMappings.trimIndent().dropLast(1) + ", \n \"properties\" : \n { \n $compositeAgg \n } \n }" + } + + @Suppress("UNCHECKED_CAST") + private fun mapCompositeAggregationToString( + compositeAggregation: Map<String, Any>, + ): String { + return buildString { + var isFirst = true + val iterator = compositeAggregation.entries.iterator() + while (iterator.hasNext()) { + val it = iterator.next() + if (!isFirst) { + append(",") + } + isFirst = false + if (it.value is Map<*, *>) { + append("\"${it.key}\" : {") + append(mapCompositeAggregationToString(it.value as Map<String, Any>)) + append("\n }") + } else { + append("\n") + append("\"${it.key}\" : \"${it.value}\"") + } + } + } + }
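+ + // Example (our comment, mirroring TargetIndexMappingServiceTests): given + // mapOf("tpep_pickup_datetime" to mapOf("type" to "date")), createTargetIndexMapping produces + // {"_meta":{"schema_version":1},"dynamic_templates":[{"strings":{"match_mapping_type":"string","mapping":{"type":"keyword"}}}],"properties":{"tpep_pickup_datetime":{"type":"date"}}}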
+ + private fun mapDateAggregation( + aggBuilders: Collection<AggregationBuilder>, + sourceIndexMapping: Map<String, Any>, + targetIndexMapping: MutableMap<String, Any>, + parentPath: String?, + ) { + val iterator = aggBuilders.iterator() + while (iterator.hasNext()) { + + val aggBuilder = iterator.next() + val targetIdxFieldName = aggBuilder.name + val fullPath = parentPath?.plus(".")?.plus(targetIdxFieldName) ?: targetIdxFieldName + // In the case of a date field used in aggregation - MIN, MAX or COUNT + if (aggBuilder is ValuesSourceAggregationBuilder<*>) { + val sourceIdxFieldName = aggBuilder.field() + + val sourceFieldType = IndexUtils.getFieldFromMappings(sourceIdxFieldName, sourceIndexMapping) + // Consider only aggregations on date field type + if (!sourceFieldType.isNullOrEmpty() && sourceFieldType[TYPE] == "date") { + val dateTypeTargetMapping = mapOf("type" to "date", "format" to DEFAULT_DATE_FORMAT) + // The full path also covers the case where sub-aggregations exist + targetIndexMapping[fullPath] = dateTypeTargetMapping + } + } + if (aggBuilder.subAggregations.isNullOrEmpty()) { + continue + } + // Do the same for all sub-aggregations + mapDateAggregation(aggBuilder.subAggregations, sourceIndexMapping, targetIndexMapping, fullPath) + } + } + private fun isFieldInMappings(fieldName: String, mappings: Map<*, *>) = IndexUtils.getFieldFromMappings(fieldName, mappings) != null +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt index 568be5fde..8f5805f0f 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt @@ -19,11 +19,11 @@ import org.opensearch.action.index.IndexRequest import org.opensearch.client.Client import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.Settings -import org.opensearch.indexmanagement.IndexManagementIndices import org.opensearch.indexmanagement.opensearchapi.retry import org.opensearch.indexmanagement.opensearchapi.suspendUntil import org.opensearch.indexmanagement.transform.exceptions.TransformIndexException import org.opensearch.indexmanagement.transform.settings.TransformSettings +import org.opensearch.indexmanagement.transform.util.TransformContext import org.opensearch.rest.RestStatus import org.opensearch.transport.RemoteTransportException @@ -36,7 +36,8 @@ class TransformIndexer( private val logger = LogManager.getLogger(javaClass) - @Volatile private var backoffPolicy = BackoffPolicy.constantBackoff( + @Volatile + private var backoffPolicy = BackoffPolicy.constantBackoff( TransformSettings.TRANSFORM_JOB_INDEX_BACKOFF_MILLIS.get(settings), TransformSettings.TRANSFORM_JOB_INDEX_BACKOFF_COUNT.get(settings) ) @@ -51,21 +52,21 @@ } } - private suspend fun createTargetIndex(index: String) { - if (!clusterService.state().routingTable.hasIndex(index)) { - val request = CreateIndexRequest(index) - .mapping(IndexManagementIndices.transformTargetMappings) + private suspend fun createTargetIndex(targetIndex: String, targetFieldMappings: Map<String, Any>) { + if (!clusterService.state().routingTable.hasIndex(targetIndex)) { + val transformTargetIndexMapping = TargetIndexMappingService.createTargetIndexMapping(targetFieldMappings) + val request = CreateIndexRequest(targetIndex).mapping(transformTargetIndexMapping) // TODO: Read in the actual mappings from the source index and use that val response: CreateIndexResponse = client.admin().indices().suspendUntil { create(request, it) } if (!response.isAcknowledged) { - logger.error("Failed to create the target index $index") + logger.error("Failed to create the target index
$targetIndex") throw TransformIndexException("Failed to create the target index") } } } @Suppress("ThrowsCount", "RethrowCaughtException") - suspend fun index(docsToIndex: List>): Long { + suspend fun index(transformTargetIndex: String, docsToIndex: List>, transformContext: TransformContext): Long { var updatableDocsToIndex = docsToIndex var indexTimeInMillis = 0L val nonRetryableFailures = mutableListOf() @@ -73,7 +74,8 @@ class TransformIndexer( if (updatableDocsToIndex.isNotEmpty()) { val targetIndex = updatableDocsToIndex.first().index() logger.debug("Attempting to index ${updatableDocsToIndex.size} documents to $targetIndex") - createTargetIndex(targetIndex) + + createTargetIndex(transformTargetIndex, transformContext.getTargetIndexDateFieldMappings()) backoffPolicy.retry(logger, listOf(RestStatus.TOO_MANY_REQUESTS)) { val bulkRequest = BulkRequest().add(updatableDocsToIndex) val bulkResponse: BulkResponse = client.suspendUntil { bulk(bulkRequest, it) } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt index abf94a161..0cf658a29 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt @@ -72,6 +72,7 @@ object TransformRunner : this.transformMetadataService = TransformMetadataService(client, xContentRegistry) this.transformIndexer = TransformIndexer(settings, clusterService, client) this.transformValidator = TransformValidator(indexNameExpressionResolver, clusterService, client, settings, jvmService) + this.threadPool = threadPool return this } @@ -108,7 +109,10 @@ object TransformRunner : val transformProcessedBucketLog = TransformProcessedBucketLog() var bucketsToTransform = BucketsToTransform(HashSet(), metadata) - val transformContext = TransformContext(TransformLockManager(transform, context)) + val transformContext = TransformContext( + TransformLockManager(transform, context) + ) + // Acquires the lock if there is no running job execution for the given transform; Lock is acquired per transform val transformLockManager = transformContext.transformLockManager transformLockManager.acquireLockForScheduledJob() @@ -129,6 +133,10 @@ object TransformRunner : currentMetadata = validatedMetadata return } + // If date was used in term query generate target date field mapping and store it in transform context + val targetIndexDateFieldMappings = TargetIndexMappingService.getTargetMappingsForDates(transform) + transformContext.setTargetDateFieldMappings(targetIndexDateFieldMappings) + if (transform.continuous) { // If we have not populated the list of shards to search, do so now if (bucketsToTransform.shardsToSearch == null) { @@ -271,7 +279,7 @@ object TransformRunner : ) } val indexTimeInMillis = withTransformSecurityContext(transform) { - transformIndexer.index(transformSearchResult.docsToIndex) + transformIndexer.index(transform.targetIndex, transformSearchResult.docsToIndex, transformContext) } val afterKey = transformSearchResult.afterKey val stats = transformSearchResult.stats @@ -298,7 +306,7 @@ object TransformRunner : transformSearchService.executeCompositeSearch(transform, null, modifiedBuckets, transformContext) } val indexTimeInMillis = withTransformSecurityContext(transform) { - transformIndexer.index(transformSearchResult.docsToIndex) + transformIndexer.index(transform.targetIndex, transformSearchResult.docsToIndex, transformContext) } val stats = 
transformSearchResult.stats val updatedStats = stats.copy( diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt index 0330aa952..6afa93637 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt @@ -206,7 +206,12 @@ // If the request was successful, update page size transformContext.lastSuccessfulPageSize = pageSize transformContext.renewLockForLongSearch(Instant.now().epochSecond - searchStart) - return convertResponse(transform, searchResponse, modifiedBuckets = modifiedBuckets) + return convertResponse( + transform, + searchResponse, + modifiedBuckets = modifiedBuckets, + targetIndexDateFieldMappings = transformContext.getTargetIndexDateFieldMappings() + ) } catch (e: TransformSearchServiceException) { throw e } catch (e: RemoteTransportException) { @@ -333,7 +338,8 @@ transform: Transform, searchResponse: SearchResponse, waterMarkDocuments: Boolean = true, - modifiedBuckets: MutableSet<Map<String, Any>>? = null + modifiedBuckets: MutableSet<Map<String, Any>>? = null, + targetIndexDateFieldMappings: Map<String, Any>, ): TransformSearchResult { val aggs = searchResponse.aggregations.get(transform.id) as CompositeAggregation val buckets = if (modifiedBuckets != null) aggs.buckets.filter { modifiedBuckets.contains(it.key) } else aggs.buckets @@ -348,8 +354,12 @@ val hashedId = hashToFixedSize(id) val document = transform.convertToDoc(aggregatedBucket.docCount, waterMarkDocuments) - aggregatedBucket.key.entries.forEach { bucket -> document[bucket.key] = bucket.value } - aggregatedBucket.aggregations.forEach { aggregation -> document[aggregation.name] = getAggregationValue(aggregation) } + aggregatedBucket.key.entries.forEach { bucket -> + document[bucket.key] = bucket.value + } + aggregatedBucket.aggregations.forEach { aggregation -> + document[aggregation.name] = getAggregationValue(aggregation, targetIndexDateFieldMappings) + } val indexRequest = IndexRequest(transform.targetIndex) .id(hashedId) @@ -370,11 +380,20 @@ return BucketSearchResult(modifiedBuckets, aggs.afterKey(), searchResponse.took.millis) } - private fun getAggregationValue(aggregation: Aggregation): Any { + private fun getAggregationValue(aggregation: Aggregation, targetIndexDateFieldMappings: Map<String, Any>): Any { return when (aggregation) { is InternalSum, is InternalMin, is InternalMax, is InternalAvg, is InternalValueCount -> { val agg = aggregation as NumericMetricsAggregation.SingleValue - agg.value() + /** + * When a date field is used in a transform aggregation (min, max, avg), the aggregated value comes back as a double in exponential notation, + * which is not allowed since the target index mapping for the date field is strict_date_optional_time||epoch_millis. + * That's why the exponential value is converted to a long: agg.value().toLong() + */ + if (aggregation is InternalValueCount || aggregation is InternalSum || !targetIndexDateFieldMappings.containsKey(agg.name)) { + agg.value() + } else { + agg.value().toLong() + } }
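+ // Illustrative note (ours; numbers hypothetical): a max() over an epoch-millis date field yields + // e.g. 1.546300800123E12 from agg.value(); toLong() turns it into 1546300800123, which the + // strict_date_optional_time||epoch_millis target mapping accepts.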
is Percentiles -> { val percentiles = mutableMapOf<String, Double>() diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/action/preview/TransportPreviewTransformAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/action/preview/TransportPreviewTransformAction.kt index ccd4e0518..aef1859bd 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/action/preview/TransportPreviewTransformAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/action/preview/TransportPreviewTransformAction.kt @@ -5,6 +5,9 @@ package org.opensearch.indexmanagement.transform.action.preview +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch import org.apache.logging.log4j.LogManager import org.opensearch.ExceptionsHelper import org.opensearch.OpenSearchStatusException @@ -21,10 +24,16 @@ import org.opensearch.client.Client import org.opensearch.cluster.metadata.IndexNameExpressionResolver import org.opensearch.cluster.service.ClusterService import org.opensearch.common.inject.Inject +import org.opensearch.common.settings.Settings import org.opensearch.commons.ConfigConstants +import org.opensearch.indexmanagement.opensearchapi.IndexManagementSecurityContext +import org.opensearch.indexmanagement.opensearchapi.suspendUntil +import org.opensearch.indexmanagement.opensearchapi.withClosableContext +import org.opensearch.indexmanagement.transform.TargetIndexMappingService import org.opensearch.indexmanagement.transform.TransformSearchService import org.opensearch.indexmanagement.transform.TransformValidator import org.opensearch.indexmanagement.transform.model.Transform +import org.opensearch.indexmanagement.util.SecurityUtils import org.opensearch.rest.RestStatus import org.opensearch.tasks.Task import org.opensearch.transport.TransportService @@ -32,6 +41,7 @@ class TransportPreviewTransformAction @Inject constructor( transportService: TransportService, actionFilters: ActionFilters, + val settings: Settings, private val client: Client, private val clusterService: ClusterService, private val indexNameExpressionResolver: IndexNameExpressionResolver @@ -69,7 +79,15 @@ return } val searchRequest = TransformSearchService.getSearchServiceRequest(transform = transform, pageSize = 10) - executeSearch(searchRequest, transform, listener) + val user = SecurityUtils.buildUser(client.threadPool().threadContext) + + CoroutineScope(Dispatchers.IO).launch { + withClosableContext( + IndexManagementSecurityContext("PreviewTransformHandler", settings, client.threadPool().threadContext, user) + ) { + executeSearch(searchRequest, transform, listener) + } + } } override fun onFailure(e: Exception) { @@ -87,31 +105,31 @@ return issues } + suspend fun executeSearch(searchRequest: SearchRequest, transform: Transform, listener: ActionListener<PreviewTransformResponse>) { + val response = try { + val searchResponse: SearchResponse = client.suspendUntil { search(searchRequest, it) } + searchResponse + } catch (e: Exception) { + listener.onFailure(e) + return + } - fun executeSearch(searchRequest: SearchRequest, transform: Transform, listener: ActionListener<PreviewTransformResponse>) { - client.search( - searchRequest, - object : ActionListener<SearchResponse> { - override fun onResponse(response: SearchResponse) { - try { - val transformSearchResult = TransformSearchService.convertResponse( - transform = transform, searchResponse = response, waterMarkDocuments = false - ) - val formattedResult = transformSearchResult.docsToIndex.map { - it.sourceAsMap() - } - listener.onResponse(PreviewTransformResponse(formattedResult, RestStatus.OK)) - } catch (e: Exception) { - listener.onFailure( - OpenSearchStatusException( - "Failed to parse the
transformed results", RestStatus.INTERNAL_SERVER_ERROR, ExceptionsHelper.unwrapCause(e) - ) - ) - } - } - - override fun onFailure(e: Exception) = listener.onFailure(e) + try { + val targetIndexDateFieldMappings = TargetIndexMappingService.getTargetMappingsForDates(transform) + val transformSearchResult = TransformSearchService.convertResponse( + transform = transform, searchResponse = response, waterMarkDocuments = false, + targetIndexDateFieldMappings = targetIndexDateFieldMappings + ) + val formattedResult = transformSearchResult.docsToIndex.map { + it.sourceAsMap() } - ) + listener.onResponse(PreviewTransformResponse(formattedResult, RestStatus.OK)) + } catch (e: Exception) { + listener.onFailure( + OpenSearchStatusException( + "Failed to parse the transformed results", RestStatus.INTERNAL_SERVER_ERROR, ExceptionsHelper.unwrapCause(e) + ) + ) + } } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/util/TransformContext.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/util/TransformContext.kt index 8c674724e..3c7627232 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/util/TransformContext.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/util/TransformContext.kt @@ -10,8 +10,10 @@ package org.opensearch.indexmanagement.transform.util */ class TransformContext( val transformLockManager: TransformLockManager, - var lastSuccessfulPageSize: Int? = null + var lastSuccessfulPageSize: Int? = null, ) { + private lateinit var targetDateFieldMappings: Map + fun getMaxRequestTimeoutInSeconds(): Long? { // Lock timeout must be greater than LOCK_BUFFER var maxRequestTimeout = transformLockManager.lockExpirationInSeconds()?.minus(LOCK_BUFFER_SECONDS) @@ -22,6 +24,12 @@ class TransformContext( return maxRequestTimeout } + fun getTargetIndexDateFieldMappings() = targetDateFieldMappings + + fun setTargetDateFieldMappings(dateFieldMappings: Map) { + this.targetDateFieldMappings = dateFieldMappings + } + suspend fun renewLockForLongSearch(timeSpentOnSearch: Long) { transformLockManager.renewLockForLongSearch(timeSpentOnSearch) } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/util/TransformUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/util/TransformUtils.kt new file mode 100644 index 000000000..17f7577b9 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/util/TransformUtils.kt @@ -0,0 +1,18 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.transform.util + +import org.opensearch.common.time.DateFormatter +import java.time.ZoneId + +const val DEFAULT_DATE_FORMAT = "strict_date_optional_time||epoch_millis" +private const val DATE_FORMAT = "uuuu-MM-dd'T'HH:mm:ss.SSSZZ" +private val timezone: ZoneId = ZoneId.of("UTC") +private val dateFormatter = DateFormatter.forPattern(DATE_FORMAT).withZone(timezone) + +fun formatMillis( + dateTimeInMillis: Long, +): String = dateFormatter.formatMillis(dateTimeInMillis) diff --git a/src/main/resources/mappings/opendistro-ism-config.json b/src/main/resources/mappings/opendistro-ism-config.json index 92084c3ad..e82a1937a 100644 --- a/src/main/resources/mappings/opendistro-ism-config.json +++ b/src/main/resources/mappings/opendistro-ism-config.json @@ -1,6 +1,6 @@ { "_meta" : { - "schema_version": 17 + "schema_version": 18 }, "dynamic": "strict", "properties": { @@ -1197,6 +1197,9 @@ }, "timezone": { "type": "keyword" + }, + "format": { + "type": "keyword" 
diff --git a/src/main/resources/mappings/opendistro-ism-config.json b/src/main/resources/mappings/opendistro-ism-config.json index 92084c3ad..e82a1937a 100644 --- a/src/main/resources/mappings/opendistro-ism-config.json +++ b/src/main/resources/mappings/opendistro-ism-config.json @@ -1,6 +1,6 @@ { "_meta" : { - "schema_version": 17 + "schema_version": 18 }, "dynamic": "strict", "properties": { @@ -1197,6 +1197,9 @@ }, "timezone": { "type": "keyword" + }, + "format": { + "type": "keyword" } } }, diff --git a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt index 0924b3539..dadd99ec3 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt @@ -37,7 +37,7 @@ import kotlin.collections.HashSet abstract class IndexManagementRestTestCase : ODFERestTestCase() { - val configSchemaVersion = 17 + val configSchemaVersion = 18 val historySchemaVersion = 5 // Having issues with tests leaking into other tests and mappings being incorrect and they are not caught by any pending task wait check as diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/TargetIndexMappingServiceTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/TargetIndexMappingServiceTests.kt new file mode 100644 index 000000000..46416b4e1 --- /dev/null +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/TargetIndexMappingServiceTests.kt @@ -0,0 +1,27 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.transform + +import org.junit.Assert +import org.opensearch.test.OpenSearchTestCase + +class TargetIndexMappingServiceTests : OpenSearchTestCase() { + + fun `test create target index mapping fields mapped correctly`() { + val expectedResult = """{"_meta":{"schema_version":1},"dynamic_templates":[{"strings":{"match_mapping_type":"string","mapping":{"type":"keyword"}}}],"properties":{"tpep_pickup_datetime":{"type":"date"}}}""" + val dateFieldMap = mapOf("tpep_pickup_datetime" to mapOf("type" to "date")) + val result = TargetIndexMappingService.createTargetIndexMapping(dateFieldMap) + Assert.assertNotNull(result) + assertEquals("Target index mapping with date fields not correct", expectedResult.trimIndent(), result.trimIndent()) + } + + fun `test create target index mapping empty map`() { + val expectedResult = """{"_meta":{"schema_version":1},"dynamic_templates":[{"strings":{"match_mapping_type":"string","mapping":{"type":"keyword"}}}]}""" + val result = TargetIndexMappingService.createTargetIndexMapping(emptyMap()) + Assert.assertNotNull(result) + assertEquals("Target index mapping with date fields not correct", expectedResult.trimIndent(), result.trimIndent()) + } +} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt index ac02a6917..f41e2e935 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt @@ -10,6 +10,7 @@ import org.apache.hc.core5.http.io.entity.StringEntity import org.opensearch.client.Request import org.opensearch.client.RequestOptions import org.opensearch.common.settings.Settings +import org.opensearch.common.xcontent.XContentType import org.opensearch.index.query.TermQueryBuilder import org.opensearch.indexmanagement.common.model.dimension.DateHistogram import org.opensearch.indexmanagement.common.model.dimension.Histogram @@ -284,6 +285,303 @@ class TransformRunnerIT : TransformRestTestCase() { } } + @Suppress("UNCHECKED_CAST") + fun `test transform term aggregation on date field generate target mapping same as source mapping for date field`() { + val sourceIdxTestName = "source_idx_test_14" + val targetIdxTestName =
"target_idx_test_14" + + val pickupDateTime = "tpep_pickup_datetime" + + val fareAmount = "fare_amount" + + validateSourceIndex(sourceIdxTestName) + + val transform = Transform( + id = "id_14", + schemaVersion = 1L, + enabled = true, + enabledAt = Instant.now(), + updatedAt = Instant.now(), + jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), + description = "test transform doc values must be the same", + metadataId = null, + sourceIndex = sourceIdxTestName, + targetIndex = targetIdxTestName, + roles = emptyList(), + pageSize = 1, + groups = listOf( + Terms(sourceField = pickupDateTime, targetField = pickupDateTime) + ), + aggregations = AggregatorFactories.builder().addAggregator(AggregationBuilders.avg(fareAmount).field(fareAmount)) + ).let { createTransform(it, it.id) } + + updateTransformStartTime(transform) + + waitFor { + assertTrue("Target transform index was not created", indexExists(transform.targetIndex)) + } + + waitFor(Instant.ofEpochSecond(60)) { + val transformJob = getTransform(transformId = transform.id) + assertNotNull("Transform job doesn't have metadata set", transformJob.metadataId) + val transformMetadata = getTransformMetadata(transformJob.metadataId!!) + assertEquals("Transform is not finished", TransformMetadata.Status.FINISHED, transformMetadata.status) + } + + val sourceIndexMapping = client().makeRequest("GET", "/$sourceIdxTestName/_mapping") + val sourceIndexParserMap = createParser(XContentType.JSON.xContent(), sourceIndexMapping.entity.content).map() as Map> + val targetIndexMapping = client().makeRequest("GET", "/$targetIdxTestName/_mapping") + val targetIndexParserMap = createParser(XContentType.JSON.xContent(), targetIndexMapping.entity.content).map() as Map> + + val sourcePickupDate = (((sourceIndexParserMap[sourceIdxTestName]?.get("mappings") as Map)["properties"] as Map)["tpep_pickup_datetime"] as Map)["type"] + val targetPickupDate = (((targetIndexParserMap[targetIdxTestName]?.get("mappings") as Map)["properties"] as Map)["tpep_pickup_datetime"] as Map)["type"] + + assertEquals(sourcePickupDate, targetPickupDate) + + val pickupDateTimeTerm = "pickupDateTerm14" + + val request = """ + { + "size": 0, + "aggs": { + "$pickupDateTimeTerm": { + "terms": { + "field": "$pickupDateTime", "order": { "_key": "asc" } + }, + "aggs": { + "avgFareAmount": { "avg": { "field": "$fareAmount" } } } + } + } + } + """ + + var rawRes = client().makeRequest(RestRequest.Method.POST.name, "/$sourceIdxTestName/_search", emptyMap(), StringEntity(request, ContentType.APPLICATION_JSON)) + assertTrue(rawRes.restStatus() == RestStatus.OK) + + var transformRes = client().makeRequest(RestRequest.Method.POST.name, "/$targetIdxTestName/_search", emptyMap(), StringEntity(request, ContentType.APPLICATION_JSON)) + assertTrue(transformRes.restStatus() == RestStatus.OK) + + val rawAggBuckets = (rawRes.asMap()["aggregations"] as Map>>>>)[pickupDateTimeTerm]!!["buckets"]!! + val transformAggBuckets = (transformRes.asMap()["aggregations"] as Map>>>>)[pickupDateTimeTerm]!!["buckets"]!! 
+ + assertEquals("Different bucket sizes", rawAggBuckets.size, transformAggBuckets.size) + + // Verify the values of keys and metrics in all buckets + for (i in rawAggBuckets.indices) { + assertEquals("Term pickup date bucket keys are not the same", rawAggBuckets[i]["key"], transformAggBuckets[i]["key"]) + assertEquals("Avg fare amounts are not the same", rawAggBuckets[i]["avgFareAmount"], transformAggBuckets[i]["avgFareAmount"]) + } + } + + @Suppress("UNCHECKED_CAST") + fun `test transform max aggregation on date field verify search request term aggregation on store_and_fwd_flag field`() { + val sourceIdxTestName = "source_idx_test_15" + val targetIdxTestName = "target_idx_test_15" + + val storeAndForward = "store_and_fwd_flag" + val pickupDateTime = "tpep_pickup_datetime" + val fareAmount = "fare_amount" + + validateSourceIndex(sourceIdxTestName) + + val avgFareAmountAgg = AggregationBuilders.avg(fareAmount).field(fareAmount) + val maxDateAggBuilder = AggregationBuilders.max(pickupDateTime).field(pickupDateTime) + + val transform = Transform( + id = "id_15", + schemaVersion = 1L, + enabled = true, + enabledAt = Instant.now(), + updatedAt = Instant.now(), + jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), + description = "test transform doc values must be the same", + metadataId = null, + sourceIndex = sourceIdxTestName, + targetIndex = targetIdxTestName, + roles = emptyList(), + pageSize = 1, + groups = listOf( + Terms(sourceField = storeAndForward, targetField = storeAndForward) + ), + aggregations = AggregatorFactories.builder().addAggregator(avgFareAmountAgg).addAggregator(maxDateAggBuilder) + ).let { createTransform(it, it.id) } + updateTransformStartTime(transform) + + waitFor { + assertTrue("Target transform index was not created", indexExists(transform.targetIndex)) + } + + waitFor(timeout = Instant.ofEpochSecond(30)) { + val transformJob = getTransform(transformId = transform.id) + assertNotNull("Transform job doesn't have metadata set", transformJob.metadataId) + val transformMetadata = getTransformMetadata(transformJob.metadataId!!) 
+ assertEquals("Transform is not finished", TransformMetadata.Status.FINISHED, transformMetadata.status) + } + + val sourceIndexMapping = client().makeRequest("GET", "/$sourceIdxTestName/_mapping") + val sourceIndexParserMap = createParser(XContentType.JSON.xContent(), sourceIndexMapping.entity.content).map() as Map> + val targetIndexMapping = client().makeRequest("GET", "/$targetIdxTestName/_mapping") + val targetIndexParserMap = createParser(XContentType.JSON.xContent(), targetIndexMapping.entity.content).map() as Map> + + val sourcePickupDate = (((sourceIndexParserMap[sourceIdxTestName]?.get("mappings") as Map)["properties"] as Map)[pickupDateTime] as Map)["type"] + val targetPickupDate = (((targetIndexParserMap[targetIdxTestName]?.get("mappings") as Map)["properties"] as Map)[pickupDateTime] as Map)["type"] + + assertEquals("date", targetPickupDate) + assertEquals(sourcePickupDate, targetPickupDate) + + waitFor(Instant.ofEpochSecond(30)) { + val storeAndForwardTerm = "storeAndForwardTerm" + val request = """ + { + "size": 0, + "aggs": { + "$storeAndForwardTerm": { + "terms": { + "field": "$storeAndForward", "order": { "_key": "asc" } + }, + "aggs": { + "$fareAmount": { "avg": { "field": "$fareAmount" } }, + "$pickupDateTime": {"max": {"field": "$pickupDateTime"}} + } + } + } + } + """ + + var rawRes = client().makeRequest(RestRequest.Method.POST.name, "/$sourceIdxTestName/_search", emptyMap(), StringEntity(request, ContentType.APPLICATION_JSON)) + assertTrue(rawRes.restStatus() == RestStatus.OK) + + var transformRes = client().makeRequest(RestRequest.Method.POST.name, "/$targetIdxTestName/_search", emptyMap(), StringEntity(request, ContentType.APPLICATION_JSON)) + assertTrue(transformRes.restStatus() == RestStatus.OK) + + val rawAggBuckets = (rawRes.asMap()["aggregations"] as Map>>>>)[storeAndForwardTerm]!!["buckets"]!! + val transformAggBuckets = (transformRes.asMap()["aggregations"] as Map>>>>)[storeAndForwardTerm]!!["buckets"]!! 
+ + assertEquals("Different bucket sizes", rawAggBuckets.size, transformAggBuckets.size) + + for (i in rawAggBuckets.indices) { + assertEquals("Avg Fare amounts are not the same", rawAggBuckets[i]["fareAmount"], transformAggBuckets[i]["fareAmount"]) + assertEquals("Max pickup date times are not the same", rawAggBuckets[i][pickupDateTime]!!["value"], transformAggBuckets[i][pickupDateTime]!!["value"]) + } + } + } + + @Suppress("UNCHECKED_CAST") + fun `test transform term on date field and aggregation on date field`() { + val sourceIdxTestName = "source_idx_test_16" + val targetIdxTestName = "target_idx_test_16" + + val pickupDateTime = "tpep_pickup_datetime" + val pickupDateTimeTerm = pickupDateTime.plus("_term") + val fareAmount = "fare_amount" + + validateSourceIndex(sourceIdxTestName) + + val avgFareAmountAgg = AggregationBuilders.avg(fareAmount).field(fareAmount) + val countDateAggBuilder = AggregationBuilders.count(pickupDateTime).field(pickupDateTime) + + val transform = Transform( + id = "id_16", + schemaVersion = 1L, + enabled = true, + enabledAt = Instant.now(), + updatedAt = Instant.now(), + jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), + description = "test transform doc values must be the same", + metadataId = null, + sourceIndex = sourceIdxTestName, + targetIndex = targetIdxTestName, + roles = emptyList(), + pageSize = 1, + groups = listOf( + Terms(sourceField = pickupDateTime, targetField = pickupDateTimeTerm) + ), + aggregations = AggregatorFactories.builder().addAggregator(avgFareAmountAgg).addAggregator(countDateAggBuilder) + ).let { createTransform(it, it.id) } + updateTransformStartTime(transform) + + waitFor { + assertTrue("Target transform index was not created", indexExists(transform.targetIndex)) + } + + waitFor(Instant.ofEpochSecond(60)) { + val transformJob = getTransform(transformId = transform.id) + assertNotNull("Transform job doesn't have metadata set", transformJob.metadataId) + val transformMetadata = getTransformMetadata(transformJob.metadataId!!) 
+ assertEquals("Transform is not finished", TransformMetadata.Status.FINISHED, transformMetadata.status) + } + + val sourceIndexMapping = client().makeRequest("GET", "/$sourceIdxTestName/_mapping") + val sourceIndexParserMap = createParser(XContentType.JSON.xContent(), sourceIndexMapping.entity.content).map() as Map> + val targetIndexMapping = client().makeRequest("GET", "/$targetIdxTestName/_mapping") + val targetIndexParserMap = createParser(XContentType.JSON.xContent(), targetIndexMapping.entity.content).map() as Map> + + val sourceProperties = ((sourceIndexParserMap[sourceIdxTestName]?.get("mappings") as Map)["properties"] as Map) + val targetProperties = ((targetIndexParserMap[targetIdxTestName]?.get("mappings") as Map)["properties"] as Map) + + val sourcePickupDate = (sourceProperties [pickupDateTime] as Map)["type"] + val targetPickupDateTerm = (targetProperties [pickupDateTimeTerm] as Map)["type"] + + assertEquals("date", targetPickupDateTerm) + assertEquals(sourcePickupDate, targetPickupDateTerm) + + val targetPickupDate = (targetProperties [pickupDateTime] as Map)["type"] + + assertEquals("date", targetPickupDate) + assertEquals(sourcePickupDate, targetPickupDate) + + val sourceRequest = """ + { + "size": 0, + "aggs": { + "$pickupDateTimeTerm": { + "terms": { + "field": "$pickupDateTime", "order": { "_key": "asc" } + }, + "aggs": { + "$fareAmount": { "avg": { "field": "$fareAmount" } }, + "$pickupDateTime": {"value_count": {"field": "$pickupDateTime"}} + } + } + } + } + """ + + val targetRequest = """ + { + "size": 0, + "aggs": { + "$pickupDateTimeTerm": { + "terms": { + "field": "$pickupDateTimeTerm", "order": { "_key": "asc" } + }, + "aggs": { + "$fareAmount": { "avg": { "field": "$fareAmount" } }, + "$pickupDateTime": {"value_count": {"field": "$pickupDateTime"}} + } + } + } + } + """ + + var rawRes = client().makeRequest(RestRequest.Method.POST.name, "/$sourceIdxTestName/_search", emptyMap(), StringEntity(sourceRequest, ContentType.APPLICATION_JSON)) + assertTrue(rawRes.restStatus() == RestStatus.OK) + + var transformRes = client().makeRequest(RestRequest.Method.POST.name, "/$targetIdxTestName/_search", emptyMap(), StringEntity(targetRequest, ContentType.APPLICATION_JSON)) + assertTrue(transformRes.restStatus() == RestStatus.OK) + + val rawAggBuckets = (rawRes.asMap()["aggregations"] as Map>>>>)[pickupDateTimeTerm]!!["buckets"]!! + val transformAggBuckets = (transformRes.asMap()["aggregations"] as Map>>>>)[pickupDateTimeTerm]!!["buckets"]!! 
+ + assertEquals("Different bucket sizes", rawAggBuckets.size, transformAggBuckets.size) + + for (i in rawAggBuckets.indices) { + assertEquals("Term pickup date bucket keys are not the same", rawAggBuckets[i]["key"], transformAggBuckets[i]["key"]) + assertEquals("Avg fare amounts are not the same", rawAggBuckets[i]["fareAmount"], transformAggBuckets[i]["fareAmount"]) + assertEquals("Count pickup dates are not the same", rawAggBuckets[i][pickupDateTime]!!["value"], transformAggBuckets[i][pickupDateTime]!!["value"]) + } + } + fun `test transform with failure during indexing`() { validateSourceIndex("transform-source-index") diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/resthandler/RestPreviewTransformActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/resthandler/RestPreviewTransformActionIT.kt index 30fa45c1f..d3239edaa 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/transform/resthandler/RestPreviewTransformActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/resthandler/RestPreviewTransformActionIT.kt @@ -8,15 +8,22 @@ package org.opensearch.indexmanagement.transform.resthandler import org.junit.AfterClass import org.junit.Before import org.opensearch.client.ResponseException +import org.opensearch.common.time.DateFormatter import org.opensearch.index.IndexNotFoundException import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.TRANSFORM_BASE_URI import org.opensearch.indexmanagement.common.model.dimension.Terms import org.opensearch.indexmanagement.makeRequest import org.opensearch.indexmanagement.transform.TransformRestTestCase +import org.opensearch.indexmanagement.transform.model.Transform import org.opensearch.indexmanagement.transform.randomTransform +import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule import org.opensearch.rest.RestStatus import org.opensearch.search.aggregations.AggregationBuilders import org.opensearch.search.aggregations.AggregatorFactories +import java.time.Instant +import java.time.ZoneId +import java.time.format.DateTimeParseException +import java.time.temporal.ChronoUnit @Suppress("UNCHECKED_CAST") class RestPreviewTransformActionIT : TransformRestTestCase() { @@ -68,6 +75,46 @@ class RestPreviewTransformActionIT : TransformRestTestCase() { assertEquals("Transformed docs have unexpected schema", expectedKeys, transformedDocs.first().keys) } + fun `test preview with term aggregation on date field`() { + val targetIdxTestName = "target_idx_test_14" + val pickupDateTime = "tpep_pickup_datetime" + val fareAmount = "fare_amount" + + val transform = Transform( + id = "id_14", + schemaVersion = 1L, + enabled = true, + enabledAt = Instant.now(), + updatedAt = Instant.now(), + jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), + description = "test transform doc values must be the same", + metadataId = null, + sourceIndex = sourceIndex, + targetIndex = targetIdxTestName, + roles = emptyList(), + pageSize = 1, + groups = listOf( + Terms(sourceField = pickupDateTime, targetField = pickupDateTime) + ), + aggregations = AggregatorFactories.builder().addAggregator(AggregationBuilders.avg(fareAmount).field(fareAmount)) + ).let { createTransform(it, it.id) } + + val response = client().makeRequest( + "POST", + "$TRANSFORM_BASE_URI/_preview", + emptyMap(), + transform.toHttpEntity() + ) + val expectedKeys = setOf("fare_amount", "tpep_pickup_datetime", "transform._doc_count", "_doc_count") + assertEquals("Preview transform failed", RestStatus.OK, 
response.restStatus()) + val transformedDocs = response.asMap()["documents"] as List<Map<String, Any>> + assertEquals("Transformed docs have unexpected schema", expectedKeys, transformedDocs.first().keys) + val dateFormatter = DateFormatter.forPattern("uuuu-MM-dd'T'HH:mm:ss.SSSZZ").withZone(ZoneId.of("UTC")) + for (doc in transformedDocs) { + assertTrue(isValid(doc["tpep_pickup_datetime"].toString().toLong(), dateFormatter)) + } + } + fun `test mismatched columns`() { val factories = AggregatorFactories.builder() .addAggregator(AggregationBuilders.sum("revenue").field("total_amountdzdfd")) @@ -102,4 +149,13 @@ assertEquals("Unexpected failure code", RestStatus.NOT_FOUND, e.response.restStatus()) } } + + private fun isValid(date: Long, dateFormatter: DateFormatter): Boolean { + try { + dateFormatter.formatMillis(date) + } catch (e: DateTimeParseException) { + return false + } + return true + } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/util/TransformContextTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/util/TransformContextTests.kt index 9a7b050ff..988033123 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/transform/util/TransformContextTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/util/TransformContextTests.kt @@ -19,6 +19,7 @@ class TransformContextTests : OpenSearchTestCase() { fun setup() { transformLockManager = Mockito.mock(TransformLockManager::class.java) transformContext = TransformContext(transformLockManager) + transformContext.setTargetDateFieldMappings(emptyMap()) } fun `test getMaxRequestTimeoutInSeconds`() { diff --git a/src/test/resources/mappings/cached-opendistro-ism-config.json b/src/test/resources/mappings/cached-opendistro-ism-config.json index 92084c3ad..e82a1937a 100644 --- a/src/test/resources/mappings/cached-opendistro-ism-config.json +++ b/src/test/resources/mappings/cached-opendistro-ism-config.json @@ -1,6 +1,6 @@ { "_meta" : { - "schema_version": 17 + "schema_version": 18 }, "dynamic": "strict", "properties": { @@ -1197,6 +1197,9 @@ }, "timezone": { "type": "keyword" + }, + "format": { + "type": "keyword" } } },
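Illustrative usage, not part of the diff: with the new optional field, a transform date_histogram group can request formatted bucket keys, e.g. { "date_histogram": { "source_field": "tpep_pickup_datetime", "fixed_interval": "1d", "timezone": "UTC", "format": "yyyy-MM-dd" } }; when "format" is omitted, serialization and bucketing behave exactly as before.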