From b22296c9594df75a9835f7ad3499d04b24122a9f Mon Sep 17 00:00:00 2001
From: Alex Liubymov <41168636+aliubymov-playq@users.noreply.github.com>
Date: Thu, 6 Aug 2020 18:01:09 +0300
Subject: [PATCH] Feature: Add ScanDeleteBatch request. (#137)

---
 .../scala/d4s/DynamoInterpreterTest.scala     | 46 +++++++----
 .../main/scala/d4s/DynamoInterpreter.scala    | 19 +++--
 .../scala/d4s/models/query/DynamoQuery.scala  | 10 +++
 .../d4s/models/query/DynamoRequest.scala      | 25 ++----
 .../query/requests/QueryDeleteBatch.scala     | 12 +--
 .../query/requests/ScanDeleteBatch.scala      | 80 +++++++++++++++++++
 .../d4s/models/table/TableReference.scala     |  3 +
 7 files changed, 150 insertions(+), 45 deletions(-)
 create mode 100644 d4s/src/main/scala/d4s/models/query/requests/ScanDeleteBatch.scala

diff --git a/d4s-test/src/test/scala/d4s/DynamoInterpreterTest.scala b/d4s-test/src/test/scala/d4s/DynamoInterpreterTest.scala
index e13e80f..daf7e01 100644
--- a/d4s-test/src/test/scala/d4s/DynamoInterpreterTest.scala
+++ b/d4s-test/src/test/scala/d4s/DynamoInterpreterTest.scala
@@ -343,7 +343,7 @@ final class DynamoInterpreterTest extends DynamoTestBase[Ctx] with DynamoRnd {
       val put = testTable.table.putItem.withPrefix(prefix)
       val expectedSize = 100
       for {
-        _ <- IO.foreachParN(3)(1 to expectedSize)(i => connector.runUnrecorded(put.withItem(payload.copy(field2 = i)).retryWithPrefix(testTable.ddl)))
+        _ <- IO.foreachParN(3)((1 to expectedSize).toList)(i => connector.runUnrecorded(put.withItem(payload.copy(field2 = i)).retryWithPrefix(testTable.ddl)))
 
         get = testTable.table.query
           .withKey(testTable.mainKey.bind("paging_test"))
@@ -366,7 +366,7 @@ final class DynamoInterpreterTest extends DynamoTestBase[Ctx] with DynamoRnd {
       val put = testTable.table.putItem.withPrefix(prefix)
       val expectedSize = 100
       for {
-        _ <- IO.foreachParN(3)(1 to expectedSize)(i => connector.runUnrecorded(put.withItem(payload.copy(field2 = i)).retryWithPrefix(testTable.ddl)))
+        _ <- IO.foreachParN(3)((1 to expectedSize).toList)(i => connector.runUnrecorded(put.withItem(payload.copy(field2 = i)).retryWithPrefix(testTable.ddl)))
 
         ref <- Ref.make(Set.empty[InterpreterTestPayload])
         get = testTable.table.query
@@ -402,7 +402,7 @@ final class DynamoInterpreterTest extends DynamoTestBase[Ctx] with DynamoRnd {
       val put = testTable.table.putItem.withPrefix(prefix)
       val expectedSize = 20
       for {
-        _ <- IO.foreachParN(3)(1 to expectedSize) {
+        _ <- IO.foreachParN(3)((1 to expectedSize).toList) {
           i => connector.runUnrecorded(put.withItem(payload.copy(field2 = i)).retryWithPrefix(testTable.ddl))
         }
 
@@ -557,7 +557,7 @@ final class DynamoInterpreterTest extends DynamoTestBase[Ctx] with DynamoRnd {
      } yield ()
    }
 
-    "perform queryDeleteBatch request" in scopeIO {
+    "perform streamed DeleteBatch request" in scopeIO {
      ctx =>
        import ctx._
        val payload = InterpreterTestPayload("batch_test", 333, "f3", RandomPayload("f2"))
@@ -584,39 +584,51 @@ final class DynamoInterpreterTest extends DynamoTestBase[Ctx] with DynamoRnd {
        _ <- assertIO(read1.size == expectedSize)
        _ <- assertIO(read1.toSet == items.map(_.item).toSet)
 
-        delete =
+        deleteQuery =
          testTable.table
            .queryDeleteBatch(testTable.mainKey.bind("batch_test"))
            .withPrefix(prefix)
-            .exec
            .retryWithPrefix(testTable.ddl)
-        _ <- connector.runUnrecorded(delete)
+        _ <- connector.runUnrecorded(deleteQuery)
 
        read2 <- connector.runUnrecorded(get)
        _ <- assertIO(read2.isEmpty)
+
+        _ <- connector.runUnrecorded(put)
+
+        read3 <- connector.runUnrecorded(get)
+        _ <- assertIO(read3.size == expectedSize)
+        _ <- assertIO(read3.toSet == items.map(_.item).toSet)
+
+        deleteScan =
+          testTable.table.scanDeleteBatch
+            .withPrefix(prefix)
+            .retryWithPrefix(testTable.ddl)
+        _ <- connector.runUnrecorded(deleteScan)
+
+        read4 <- connector.runUnrecorded(get)
+        _ <- assertIO(read4.isEmpty)
      } yield ()
    }
 
-    "perform queryDeleteBatch request on non-existent table with retryWithPrefix" in scopeIO {
+    "perform streamed DeleteBatch request on non-existent table with retryWithPrefix" in scopeIO {
      ctx =>
        import ctx._
-        val prefix = UUID.randomUUID()
+        val prefix1 = UUID.randomUUID()
+        val prefix2 = UUID.randomUUID()
        for {
          _ <- connector.runUnrecorded(
            testTable.table
              .queryDeleteBatch(testTable.mainKey.bind("batch_test"))
-              .withPrefix(prefix)
+              .withPrefix(prefix1)
              .retryWithPrefix(testTable.ddl)
          )
-          read2 <- connector.runUnrecorded(
-            testTable.table.query
-              .withPrefix(prefix)
-              .withKey(testTable.mainKey.bind("batch_test"))
-              .decodeItems[InterpreterTestPayload]
-              .execPagedFlatten()
+          _ <- connector.runUnrecorded(
+            testTable.table.scanDeleteBatch
+              .withPrefix(prefix2)
+              .retryWithPrefix(testTable.ddl)
          )
-          _ <- assertIO(read2.isEmpty)
        } yield ()
    }
diff --git a/d4s/src/main/scala/d4s/DynamoInterpreter.scala b/d4s/src/main/scala/d4s/DynamoInterpreter.scala
index 6c2a468..8b0398d 100644
--- a/d4s/src/main/scala/d4s/DynamoInterpreter.scala
+++ b/d4s/src/main/scala/d4s/DynamoInterpreter.scala
@@ -4,7 +4,7 @@ import d4s.config.{DynamoBatchConfig, DynamoConfig}
 import d4s.models.DynamoException
 import d4s.models.DynamoException.InterpreterException
 import d4s.models.ExecutionStrategy.StrategyInput
-import d4s.models.query.DynamoRequest.{DynamoWriteBatchRequest, WithBatch}
+import d4s.models.query.DynamoRequest.{DynamoWriteBatchRequest, PageableRequest, WithBatch, WithProjectionExpression, WithTableReference}
 import d4s.models.query._
 import d4s.models.query.requests._
 import izumi.functional.bio.catz._
@@ -14,6 +14,7 @@ import software.amazon.awssdk.services.dynamodb.DynamoDbClient
 import software.amazon.awssdk.services.dynamodb.model._
 
 import scala.concurrent.duration._
+import scala.language.reflectiveCalls
 
 trait DynamoInterpreter[F[_, _]] {
   def run[DR <: DynamoRequest, Dec](
@@ -73,7 +74,8 @@ object DynamoInterpreter {
      case q: DeleteItemBatch => runWriteBatch(q).logWrapError("DeleteItemBatch", q.table.fullName, tapError)
      case q: PutItemBatch => runWriteBatch(q).logWrapError("WriteItemBatch", q.table.fullName, tapError)
      case q: GetItemBatch => runGetBatch(q).logWrapError("GetItemBatch", q.table.fullName, tapError)
-      case q: QueryDeleteBatch => runQueryDeleteBatch(q).logWrapError("QueryDeleteBatch", q.table.fullName, tapError)
+      case q: QueryDeleteBatch => runStreamDeleteBatch(q.wrapped.toQuery, q.maxParallelDeletes).logWrapError("QueryDeleteBatch", q.table.fullName, tapError)
+      case q: ScanDeleteBatch => runStreamDeleteBatch(q.wrapped.toQuery, q.maxParallelDeletes).logWrapError("ScanDeleteBatch", q.table.fullName, tapError)
 
      case q: UpdateContinuousBackups =>
        updateContinuousBackups(q.toAmz).logWrapError("UpdateContinuousBackups", q.table.fullName, tapError)
@@ -81,18 +83,23 @@ object DynamoInterpreter {
    }
  }
 
-  private[this] def runQueryDeleteBatch(rq: QueryDeleteBatch): F[Throwable, List[BatchWriteItemResponse]] = {
+  private[this] def runStreamDeleteBatch[DR <: DynamoRequest with WithProjectionExpression[DR] with WithTableReference[DR]](
+    dynamoQuery: DynamoQuery[DR, _],
+    parallelism: Option[Int],
+  )(implicit ev0: DR#Rsp => { def items(): java.util.List[java.util.Map[String, AttributeValue]] },
+    ev1: PageableRequest[DR],
+  ): F[Throwable, List[BatchWriteItemResponse]] = {
    import scala.jdk.CollectionConverters._
 
-    val exec = rq.toRegularQuery.toQuery
-      .withProjectionExpression(rq.table.key.keyFields.toList: _*)
+    val exec = dynamoQuery
+      .withProjectionExpression(dynamoQuery.table.key.keyFields.toList: _*)
      .decode(_.items().asScala.map(_.asScala.toMap).toList)
      .execStreamedFlatten
 
    exec
      .executionStrategy(StrategyInput(exec.dynamoQuery, F, this))
      .chunkN(batchConfig.writeBatchSize)
-      .parEvalMap(rq.maxParallelDeletes.getOrElse(Int.MaxValue))(itemsChunk => runWriteBatch(DeleteItemBatch(rq.table, itemsChunk.toList)))
+      .parEvalMap(parallelism.getOrElse(Int.MaxValue))(itemsChunk => runWriteBatch(DeleteItemBatch(dynamoQuery.table, itemsChunk.toList)))
      .flatMap(fs2.Stream.emits)
      .compile.toList
  }
diff --git a/d4s/src/main/scala/d4s/models/query/DynamoQuery.scala b/d4s/src/main/scala/d4s/models/query/DynamoQuery.scala
index f24801c..e151625 100644
--- a/d4s/src/main/scala/d4s/models/query/DynamoQuery.scala
+++ b/d4s/src/main/scala/d4s/models/query/DynamoQuery.scala
@@ -287,6 +287,16 @@ object DynamoQuery {
    def consistent: DynamoQuery[DR, Dec] = withConsistent(true)
  }
 
+  implicit final class TweakWithParallelism[DR <: DynamoRequest with WithParallelism[DR], Dec](
+    dynamoQuery: DynamoQuery[DR, Dec]
+  ) extends WithParallelism[DynamoQuery[DR, Dec]] {
+    @inline override def maxParallelDeletes: Option[Int] = dynamoQuery.request.maxParallelDeletes
+
+    @inline override def withParallelism(parallelism: Int): DynamoQuery[DR, Dec] = {
+      dynamoQuery.modify(_.withParallelism(parallelism))
+    }
+  }
+
  implicit final class TweakReturnValue[DR <: DynamoRequest with WithReturnValue[DR], Dec](
    dynamoQuery: DynamoQuery[DR, Dec]
  ) extends WithReturnValue[DynamoQuery[DR, Dec]] {
diff --git a/d4s/src/main/scala/d4s/models/query/DynamoRequest.scala b/d4s/src/main/scala/d4s/models/query/DynamoRequest.scala
index 552f76c..2192f47 100644
--- a/d4s/src/main/scala/d4s/models/query/DynamoRequest.scala
+++ b/d4s/src/main/scala/d4s/models/query/DynamoRequest.scala
@@ -147,21 +147,12 @@
    final def withItems[I1: D4SEncoder, I2: D4SEncoder](v1: I1, v2: I2): A = withItemAttributeValues(D4SEncoder[I1].encodeObject(v1) ++ D4SEncoder[I2].encodeObject(v2))
  }
 
-  type HasTableReference[A] = A => WithTableReference[A]
-  type HasIndex[A] = A => WithIndex[A]
-  type HasCondition[A] = A => WithCondition[A]
-  type HasFilterExpression[A] = A => WithFilterExpression[A]
-  type HasProjectionExpression[A] = A => WithProjectionExpression[A]
-  type HasUpdateExpression[A] = A => WithUpdateExpression[A]
-  type HasAttributeValues[A] = A => WithAttributeValues[A]
-  type HasAttributeNames[A] = A => WithAttributeNames[A]
-  type HasLimit[A] = A => WithLimit[A]
-  type HasSelect[A] = A => WithSelect[A]
-  type HasReturnValue[A] = A => WithReturnValue[A]
-  type HasStartKey[A] = A => WithStartKey[A]
-  type HasConsistent[A] = A => WithConsistent[A]
-  type HasBatch[A, BatchType[_]] = A => WithBatch[A, BatchType]
-  type HasScanIndexForward[A] = A => WithScanIndexForward[A]
-  type HasKey[A] = A => WithKey[A]
-  type HasItem[A] = A => WithItem[A]
+  trait WithParallelism[A] {
+    def maxParallelDeletes: Option[Int]
+    def withParallelism(parallelism: Int): A
+  }
+
+  trait WithWrappedRequest[Wrapped] {
+    def wrapped: Wrapped
+  }
 }
diff --git a/d4s/src/main/scala/d4s/models/query/requests/QueryDeleteBatch.scala b/d4s/src/main/scala/d4s/models/query/requests/QueryDeleteBatch.scala
index 180701a..b3ba243 100644
--- a/d4s/src/main/scala/d4s/models/query/requests/QueryDeleteBatch.scala
+++ b/d4s/src/main/scala/d4s/models/query/requests/QueryDeleteBatch.scala
@@ -5,7 +5,7 @@ import java.util
 import d4s.models.conditions.Condition
 import d4s.models.conditions.Condition.ZeroCondition
 import d4s.models.query.DynamoRequest
-import d4s.models.query.DynamoRequest.{WithAttributeNames, WithAttributeValues, WithCondition, WithConsistent, WithFilterExpression, WithIndex, WithKey, WithLimit, WithProjectionExpression, WithScanIndexForward, WithSelect, WithStartKey, WithTableReference}
+import d4s.models.query.DynamoRequest.{WithAttributeNames, WithAttributeValues, WithCondition, WithConsistent, WithFilterExpression, WithIndex, WithKey, WithLimit, WithParallelism, WithProjectionExpression, WithScanIndexForward, WithSelect, WithStartKey, WithTableReference, WithWrappedRequest}
 import d4s.models.table.TableReference
 import d4s.models.table.index.TableIndex
 import software.amazon.awssdk.services.dynamodb.model.{AttributeValue, BatchWriteItemResponse, QueryRequest, Select}
@@ -38,12 +38,14 @@ final case class QueryDeleteBatch(
  with WithScanIndexForward[QueryDeleteBatch]
  with WithCondition[QueryDeleteBatch]
  with WithTableReference[QueryDeleteBatch]
-  with WithKey[QueryDeleteBatch] {
+  with WithKey[QueryDeleteBatch]
+  with WithParallelism[QueryDeleteBatch]
+  with WithWrappedRequest[Query] {
 
  override type Rq = QueryRequest
  override type Rsp = List[BatchWriteItemResponse]
 
-  def withParallelism(parallelism: Int): QueryDeleteBatch = copy(maxParallelDeletes = Some(parallelism))
+  override def withParallelism(parallelism: Int): QueryDeleteBatch = copy(maxParallelDeletes = Some(parallelism))
 
  override def withCondition(t: Condition): QueryDeleteBatch = copy(condition = condition && t)
 
@@ -72,9 +74,9 @@ final case class QueryDeleteBatch(
  override def withKey(f: Map[String, AttributeValue] => Map[String, AttributeValue]): QueryDeleteBatch =
    copy(keyConditionAttributeValues = f(keyConditionAttributeValues))
 
-  override def toAmz: QueryRequest = this.toRegularQuery.toAmz
+  override def toAmz: QueryRequest = wrapped.toAmz
 
-  def toRegularQuery: Query = {
+  override def wrapped: Query = {
    Query(
      table = table,
      index = index,
diff --git a/d4s/src/main/scala/d4s/models/query/requests/ScanDeleteBatch.scala b/d4s/src/main/scala/d4s/models/query/requests/ScanDeleteBatch.scala
new file mode 100644
index 0000000..4d6bd0d
--- /dev/null
+++ b/d4s/src/main/scala/d4s/models/query/requests/ScanDeleteBatch.scala
@@ -0,0 +1,80 @@
+package d4s.models.query.requests
+
+import java.util
+
+import d4s.models.conditions.Condition
+import d4s.models.conditions.Condition.ZeroCondition
+import d4s.models.query.DynamoRequest
+import d4s.models.query.DynamoRequest._
+import d4s.models.table.TableReference
+import d4s.models.table.index.TableIndex
+import software.amazon.awssdk.services.dynamodb.model.{AttributeValue, BatchWriteItemResponse, ScanRequest, Select}
+
+final case class ScanDeleteBatch(
+  table: TableReference,
+  maxParallelDeletes: Option[Int] = None,
+  index: Option[String] = None,
+  filterExpression: Condition = ZeroCondition,
+  attributeValues: Map[String, AttributeValue] = Map.empty,
+  attributeNames: Map[String, String] = Map.empty,
+  projectionExpression: Option[String] = None,
+  limit: Option[Int] = None,
+  select: Option[Select] = None,
+  startKey: Option[java.util.Map[String, AttributeValue]] = None,
+  consistent: Boolean = false,
+) extends DynamoRequest
+  with WithFilterExpression[ScanDeleteBatch]
+  with WithAttributeValues[ScanDeleteBatch]
+  with WithAttributeNames[ScanDeleteBatch]
+  with WithProjectionExpression[ScanDeleteBatch]
+  with WithSelect[ScanDeleteBatch]
+  with WithStartKey[ScanDeleteBatch]
+  with WithLimit[ScanDeleteBatch]
+  with WithTableReference[ScanDeleteBatch]
+  with WithIndex[ScanDeleteBatch]
+  with WithConsistent[ScanDeleteBatch]
+  with WithParallelism[ScanDeleteBatch]
+  with WithWrappedRequest[Scan] {
+
+  override type Rq = ScanRequest
+  override type Rsp = List[BatchWriteItemResponse]
+
+  override def withParallelism(parallelism: Int): ScanDeleteBatch = copy(maxParallelDeletes = Some(parallelism))
+
+  override def withAttributeNames(an: Map[String, String] => Map[String, String]): ScanDeleteBatch = copy(attributeNames = an(attributeNames))
+
+  override def withConsistent(consistentRead: Boolean): ScanDeleteBatch = copy(consistent = consistentRead)
+
+  override def withSelect(newSelect: Select): ScanDeleteBatch = copy(select = Some(newSelect))
+
+  override def withIndex(index: TableIndex[_, _]): ScanDeleteBatch = copy(index = Some(index.name))
+
+  override def withStartKeyMap(startKey: util.Map[String, AttributeValue]): ScanDeleteBatch = copy(startKey = Some(startKey))
+
+  override def withTableReference(t: TableReference => TableReference): ScanDeleteBatch = copy(table = t(table))
+
+  override def withProjectionExpression(f: Option[String] => Option[String]): ScanDeleteBatch = copy(projectionExpression = f(projectionExpression))
+
+  override def withFilterExpression(t: Condition): ScanDeleteBatch = copy(filterExpression = filterExpression && t)
+
+  override def withAttributeValues(f: Map[String, AttributeValue] => Map[String, AttributeValue]): ScanDeleteBatch = copy(attributeValues = f(attributeValues))
+
+  override def withLimit(l: Int): ScanDeleteBatch = copy(limit = Some(l))
+
+  override def toAmz: ScanRequest = wrapped.toAmz
+
+  override def wrapped: Scan = {
+    Scan(
+      table = table,
+      index = index,
+      filterExpression = filterExpression,
+      attributeValues = attributeValues,
+      attributeNames = attributeNames,
+      projectionExpression = projectionExpression,
+      limit = limit,
+      select = select,
+      startKey = startKey,
+      consistent = consistent,
+    )
+  }
+}
diff --git a/d4s/src/main/scala/d4s/models/table/TableReference.scala b/d4s/src/main/scala/d4s/models/table/TableReference.scala
index a34c12a..1990f2e 100644
--- a/d4s/src/main/scala/d4s/models/table/TableReference.scala
+++ b/d4s/src/main/scala/d4s/models/table/TableReference.scala
@@ -128,5 +128,8 @@ object TableReference {
      QueryDeleteBatch(table).withIndex(index).withKeyField(index.key.hashKey)(hashKey).toQuery
    def queryDeleteBatch[H, R](index: TableIndex[H, R], hashKey: H, rangeKey: R): DynamoQuery[QueryDeleteBatch, List[BatchWriteItemResponse]] =
      QueryDeleteBatch(table).withIndex(index).withKey(index.key.bind(hashKey, rangeKey)).toQuery
+
+    def scanDeleteBatch: DynamoQuery[ScanDeleteBatch, List[BatchWriteItemResponse]] = ScanDeleteBatch(table).toQuery
+    def scanDeleteBatch(maxParallelDeletes: Int): DynamoQuery[ScanDeleteBatch, List[BatchWriteItemResponse]] = ScanDeleteBatch(table, Some(maxParallelDeletes)).toQuery
  }
 }
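
Usage note (illustrative sketch, not part of the patch): how the new ScanDeleteBatch request can be invoked, mirroring the interpreter test above. `testTable` and `connector` are the test fixtures from DynamoInterpreterTest, and the parallelism value 4 is an arbitrary example.

  // Scan the whole (possibly prefixed) table and delete every item it returns,
  // running at most 4 of the underlying BatchWriteItem delete batches in parallel.
  val wipeTable =
    testTable.table
      .scanDeleteBatch(maxParallelDeletes = 4)
      .retryWithPrefix(testTable.ddl) // provision the table on first use, as the non-existent-table test does

  // Interpreting the query yields the raw List[BatchWriteItemResponse] from DynamoDB.
  val responses = connector.runUnrecorded(wipeTable)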