Skip to content
This repository has been archived by the owner on Sep 27, 2021. It is now read-only.

Commit

Permalink
Feature: Add ScanDeleteBatch request. (#137)
Browse files Browse the repository at this point in the history
  • Loading branch information
aliubymov-playq authored Aug 6, 2020
1 parent acda2f1 commit b22296c
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 45 deletions.
46 changes: 29 additions & 17 deletions d4s-test/src/test/scala/d4s/DynamoInterpreterTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ final class DynamoInterpreterTest extends DynamoTestBase[Ctx] with DynamoRnd {
val put = testTable.table.putItem.withPrefix(prefix)
val expectedSize = 100
for {
_ <- IO.foreachParN(3)(1 to expectedSize)(i => connector.runUnrecorded(put.withItem(payload.copy(field2 = i)).retryWithPrefix(testTable.ddl)))
_ <- IO.foreachParN(3)((1 to expectedSize).toList)(i => connector.runUnrecorded(put.withItem(payload.copy(field2 = i)).retryWithPrefix(testTable.ddl)))
get =
testTable.table.query
.withKey(testTable.mainKey.bind("paging_test"))
Expand All @@ -366,7 +366,7 @@ final class DynamoInterpreterTest extends DynamoTestBase[Ctx] with DynamoRnd {
val put = testTable.table.putItem.withPrefix(prefix)
val expectedSize = 100
for {
_ <- IO.foreachParN(3)(1 to expectedSize)(i => connector.runUnrecorded(put.withItem(payload.copy(field2 = i)).retryWithPrefix(testTable.ddl)))
_ <- IO.foreachParN(3)((1 to expectedSize).toList)(i => connector.runUnrecorded(put.withItem(payload.copy(field2 = i)).retryWithPrefix(testTable.ddl)))
ref <- Ref.make(Set.empty[InterpreterTestPayload])
get =
testTable.table.query
Expand Down Expand Up @@ -402,7 +402,7 @@ final class DynamoInterpreterTest extends DynamoTestBase[Ctx] with DynamoRnd {
val put = testTable.table.putItem.withPrefix(prefix)
val expectedSize = 20
for {
_ <- IO.foreachParN(3)(1 to expectedSize) {
_ <- IO.foreachParN(3)((1 to expectedSize).toList) {
i =>
connector.runUnrecorded(put.withItem(payload.copy(field2 = i)).retryWithPrefix(testTable.ddl))
}
Expand Down Expand Up @@ -557,7 +557,7 @@ final class DynamoInterpreterTest extends DynamoTestBase[Ctx] with DynamoRnd {
} yield ()
}

"perform queryDeleteBatch request" in scopeIO {
"perform streamed DeleteBatch request" in scopeIO {
ctx =>
import ctx._
val payload = InterpreterTestPayload("batch_test", 333, "f3", RandomPayload("f2"))
Expand All @@ -584,39 +584,51 @@ final class DynamoInterpreterTest extends DynamoTestBase[Ctx] with DynamoRnd {
_ <- assertIO(read1.size == expectedSize)
_ <- assertIO(read1.toSet == items.map(_.item).toSet)

delete =
deleteQuery =
testTable.table
.queryDeleteBatch(testTable.mainKey.bind("batch_test"))
.withPrefix(prefix)
.exec
.retryWithPrefix(testTable.ddl)
_ <- connector.runUnrecorded(delete)
_ <- connector.runUnrecorded(deleteQuery)

read2 <- connector.runUnrecorded(get)
_ <- assertIO(read2.isEmpty)

_ <- connector.runUnrecorded(put)

read3 <- connector.runUnrecorded(get)
_ <- assertIO(read3.size == expectedSize)
_ <- assertIO(read3.toSet == items.map(_.item).toSet)

deleteScan =
testTable.table.scanDeleteBatch
.withPrefix(prefix)
.retryWithPrefix(testTable.ddl)
_ <- connector.runUnrecorded(deleteScan)

read4 <- connector.runUnrecorded(get)
_ <- assertIO(read4.isEmpty)
} yield ()
}

"perform queryDeleteBatch request on non-existent table with retryWithPrefix" in scopeIO {
"perform streamed DeleteBatch request on non-existent table with retryWithPrefix" in scopeIO {
ctx =>
import ctx._
val prefix = UUID.randomUUID()
val prefix1 = UUID.randomUUID()
val prefix2 = UUID.randomUUID()

for {
_ <- connector.runUnrecorded(
testTable.table
.queryDeleteBatch(testTable.mainKey.bind("batch_test"))
.withPrefix(prefix)
.withPrefix(prefix1)
.retryWithPrefix(testTable.ddl)
)
read2 <- connector.runUnrecorded(
testTable.table.query
.withPrefix(prefix)
.withKey(testTable.mainKey.bind("batch_test"))
.decodeItems[InterpreterTestPayload]
.execPagedFlatten()
_ <- connector.runUnrecorded(
testTable.table.scanDeleteBatch
.withPrefix(prefix2)
.retryWithPrefix(testTable.ddl)
)
_ <- assertIO(read2.isEmpty)
} yield ()
}

Expand Down
19 changes: 13 additions & 6 deletions d4s/src/main/scala/d4s/DynamoInterpreter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import d4s.config.{DynamoBatchConfig, DynamoConfig}
import d4s.models.DynamoException
import d4s.models.DynamoException.InterpreterException
import d4s.models.ExecutionStrategy.StrategyInput
import d4s.models.query.DynamoRequest.{DynamoWriteBatchRequest, WithBatch}
import d4s.models.query.DynamoRequest.{DynamoWriteBatchRequest, PageableRequest, WithBatch, WithProjectionExpression, WithTableReference}
import d4s.models.query._
import d4s.models.query.requests._
import izumi.functional.bio.catz._
Expand All @@ -14,6 +14,7 @@ import software.amazon.awssdk.services.dynamodb.DynamoDbClient
import software.amazon.awssdk.services.dynamodb.model._

import scala.concurrent.duration._
import scala.language.reflectiveCalls

trait DynamoInterpreter[F[_, _]] {
def run[DR <: DynamoRequest, Dec](
Expand Down Expand Up @@ -73,26 +74,32 @@ object DynamoInterpreter {
case q: DeleteItemBatch => runWriteBatch(q).logWrapError("DeleteItemBatch", q.table.fullName, tapError)
case q: PutItemBatch => runWriteBatch(q).logWrapError("WriteItemBatch", q.table.fullName, tapError)
case q: GetItemBatch => runGetBatch(q).logWrapError("GetItemBatch", q.table.fullName, tapError)
case q: QueryDeleteBatch => runQueryDeleteBatch(q).logWrapError("QueryDeleteBatch", q.table.fullName, tapError)
case q: QueryDeleteBatch => runStreamDeleteBatch(q.wrapped.toQuery, q.maxParallelDeletes).logWrapError("QueryDeleteBatch", q.table.fullName, tapError)
case q: ScanDeleteBatch => runStreamDeleteBatch(q.wrapped.toQuery, q.maxParallelDeletes).logWrapError("ScanDeleteBatch", q.table.fullName, tapError)

case q: UpdateContinuousBackups => updateContinuousBackups(q.toAmz).logWrapError("UpdateContinuousBackups", q.table.fullName, tapError)

case r: RawRequest[_, _] => r.interpret(r.toAmz)(F, client).logWrapError("RawRequest")
}
}

private[this] def runQueryDeleteBatch(rq: QueryDeleteBatch): F[Throwable, List[BatchWriteItemResponse]] = {
private[this] def runStreamDeleteBatch[DR <: DynamoRequest with WithProjectionExpression[DR] with WithTableReference[DR]](
dynamoQuery: DynamoQuery[DR, _],
parallelism: Option[Int],
)(implicit ev0: DR#Rsp => { def items(): java.util.List[java.util.Map[String, AttributeValue]] },
ev1: PageableRequest[DR],
): F[Throwable, List[BatchWriteItemResponse]] = {
import scala.jdk.CollectionConverters._

val exec = rq.toRegularQuery.toQuery
.withProjectionExpression(rq.table.key.keyFields.toList: _*)
val exec = dynamoQuery
.withProjectionExpression(dynamoQuery.table.key.keyFields.toList: _*)
.decode(_.items().asScala.map(_.asScala.toMap).toList)
.execStreamedFlatten

exec
.executionStrategy(StrategyInput(exec.dynamoQuery, F, this))
.chunkN(batchConfig.writeBatchSize)
.parEvalMap(rq.maxParallelDeletes.getOrElse(Int.MaxValue))(itemsChunk => runWriteBatch(DeleteItemBatch(rq.table, itemsChunk.toList)))
.parEvalMap(parallelism.getOrElse(Int.MaxValue))(itemsChunk => runWriteBatch(DeleteItemBatch(dynamoQuery.table, itemsChunk.toList)))
.flatMap(fs2.Stream.emits)
.compile.toList
}
Expand Down
10 changes: 10 additions & 0 deletions d4s/src/main/scala/d4s/models/query/DynamoQuery.scala
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,16 @@ object DynamoQuery {
def consistent: DynamoQuery[DR, Dec] = withConsistent(true)
}

implicit final class TweakWithParallelism[DR <: DynamoRequest with WithParallelism[DR], Dec](
dynamoQuery: DynamoQuery[DR, Dec]
) extends WithParallelism[DynamoQuery[DR, Dec]] {
@inline override def maxParallelDeletes: Option[Int] = dynamoQuery.request.maxParallelDeletes

@inline override def withParallelism(parallelism: Int): DynamoQuery[DR, Dec] = {
dynamoQuery.modify(_.withParallelism(parallelism))
}
}

implicit final class TweakReturnValue[DR <: DynamoRequest with WithReturnValue[DR], Dec](
dynamoQuery: DynamoQuery[DR, Dec]
) extends WithReturnValue[DynamoQuery[DR, Dec]] {
Expand Down
25 changes: 8 additions & 17 deletions d4s/src/main/scala/d4s/models/query/DynamoRequest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -147,21 +147,12 @@ object DynamoRequest {
final def withItems[I1: D4SEncoder, I2: D4SEncoder](v1: I1, v2: I2): A = withItemAttributeValues(D4SEncoder[I1].encodeObject(v1) ++ D4SEncoder[I2].encodeObject(v2))
}

type HasTableReference[A] = A => WithTableReference[A]
type HasIndex[A] = A => WithIndex[A]
type HasCondition[A] = A => WithCondition[A]
type HasFilterExpression[A] = A => WithFilterExpression[A]
type HasProjectionExpression[A] = A => WithProjectionExpression[A]
type HasUpdateExpression[A] = A => WithUpdateExpression[A]
type HasAttributeValues[A] = A => WithAttributeValues[A]
type HasAttributeNames[A] = A => WithAttributeNames[A]
type HasLimit[A] = A => WithLimit[A]
type HasSelect[A] = A => WithSelect[A]
type HasReturnValue[A] = A => WithReturnValue[A]
type HasStartKey[A] = A => WithStartKey[A]
type HasConsistent[A] = A => WithConsistent[A]
type HasBatch[A, BatchType[_]] = A => WithBatch[A, BatchType]
type HasScanIndexForward[A] = A => WithScanIndexForward[A]
type HasKey[A] = A => WithKey[A]
type HasItem[A] = A => WithItem[A]
trait WithParallelism[A] {
def maxParallelDeletes: Option[Int]
def withParallelism(parallelism: Int): A
}

trait WithWrappedRequest[Wrapped] {
def wrapped: Wrapped
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import java.util
import d4s.models.conditions.Condition
import d4s.models.conditions.Condition.ZeroCondition
import d4s.models.query.DynamoRequest
import d4s.models.query.DynamoRequest.{WithAttributeNames, WithAttributeValues, WithCondition, WithConsistent, WithFilterExpression, WithIndex, WithKey, WithLimit, WithProjectionExpression, WithScanIndexForward, WithSelect, WithStartKey, WithTableReference}
import d4s.models.query.DynamoRequest.{WithAttributeNames, WithAttributeValues, WithCondition, WithConsistent, WithFilterExpression, WithIndex, WithKey, WithLimit, WithParallelism, WithProjectionExpression, WithScanIndexForward, WithSelect, WithStartKey, WithTableReference, WithWrappedRequest}
import d4s.models.table.TableReference
import d4s.models.table.index.TableIndex
import software.amazon.awssdk.services.dynamodb.model.{AttributeValue, BatchWriteItemResponse, QueryRequest, Select}
Expand Down Expand Up @@ -38,12 +38,14 @@ final case class QueryDeleteBatch(
with WithScanIndexForward[QueryDeleteBatch]
with WithCondition[QueryDeleteBatch]
with WithTableReference[QueryDeleteBatch]
with WithKey[QueryDeleteBatch] {
with WithKey[QueryDeleteBatch]
with WithParallelism[QueryDeleteBatch]
with WithWrappedRequest[Query] {

override type Rq = QueryRequest
override type Rsp = List[BatchWriteItemResponse]

def withParallelism(parallelism: Int): QueryDeleteBatch = copy(maxParallelDeletes = Some(parallelism))
override def withParallelism(parallelism: Int): QueryDeleteBatch = copy(maxParallelDeletes = Some(parallelism))

override def withCondition(t: Condition): QueryDeleteBatch = copy(condition = condition && t)

Expand Down Expand Up @@ -72,9 +74,9 @@ final case class QueryDeleteBatch(
override def withKey(f: Map[String, AttributeValue] => Map[String, AttributeValue]): QueryDeleteBatch =
copy(keyConditionAttributeValues = f(keyConditionAttributeValues))

override def toAmz: QueryRequest = this.toRegularQuery.toAmz
override def toAmz: QueryRequest = wrapped.toAmz

def toRegularQuery: Query = {
override def wrapped: Query = {
Query(
table = table,
index = index,
Expand Down
80 changes: 80 additions & 0 deletions d4s/src/main/scala/d4s/models/query/requests/ScanDeleteBatch.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package d4s.models.query.requests

import java.util

import d4s.models.conditions.Condition
import d4s.models.conditions.Condition.ZeroCondition
import d4s.models.query.DynamoRequest
import d4s.models.query.DynamoRequest._
import d4s.models.table.TableReference
import d4s.models.table.index.TableIndex
import software.amazon.awssdk.services.dynamodb.model.{AttributeValue, BatchWriteItemResponse, ScanRequest, Select}

final case class ScanDeleteBatch(
table: TableReference,
maxParallelDeletes: Option[Int] = None,
index: Option[String] = None,
filterExpression: Condition = ZeroCondition,
attributeValues: Map[String, AttributeValue] = Map.empty,
attributeNames: Map[String, String] = Map.empty,
projectionExpression: Option[String] = None,
limit: Option[Int] = None,
select: Option[Select] = None,
startKey: Option[java.util.Map[String, AttributeValue]] = None,
consistent: Boolean = false,
) extends DynamoRequest
with WithFilterExpression[ScanDeleteBatch]
with WithAttributeValues[ScanDeleteBatch]
with WithAttributeNames[ScanDeleteBatch]
with WithProjectionExpression[ScanDeleteBatch]
with WithSelect[ScanDeleteBatch]
with WithStartKey[ScanDeleteBatch]
with WithLimit[ScanDeleteBatch]
with WithTableReference[ScanDeleteBatch]
with WithIndex[ScanDeleteBatch]
with WithConsistent[ScanDeleteBatch]
with WithParallelism[ScanDeleteBatch]
with WithWrappedRequest[Scan] {

override type Rq = ScanRequest
override type Rsp = List[BatchWriteItemResponse]

override def withParallelism(parallelism: Int): ScanDeleteBatch = copy(maxParallelDeletes = Some(parallelism))

override def withAttributeNames(an: Map[String, String] => Map[String, String]): ScanDeleteBatch = copy(attributeNames = an(attributeNames))

override def withConsistent(consistentRead: Boolean): ScanDeleteBatch = copy(consistent = consistentRead)

override def withSelect(newSelect: Select): ScanDeleteBatch = copy(select = Some(newSelect))

override def withIndex(index: TableIndex[_, _]): ScanDeleteBatch = copy(index = Some(index.name))

override def withStartKeyMap(startKey: util.Map[String, AttributeValue]): ScanDeleteBatch = copy(startKey = Some(startKey))

override def withTableReference(t: TableReference => TableReference): ScanDeleteBatch = copy(table = t(table))

override def withProjectionExpression(f: Option[String] => Option[String]): ScanDeleteBatch = copy(projectionExpression = f(projectionExpression))

override def withFilterExpression(t: Condition): ScanDeleteBatch = copy(filterExpression = filterExpression && t)

override def withAttributeValues(f: Map[String, AttributeValue] => Map[String, AttributeValue]): ScanDeleteBatch = copy(attributeValues = f(attributeValues))

override def withLimit(l: Int): ScanDeleteBatch = copy(limit = Some(l))

override def toAmz: ScanRequest = wrapped.toAmz

override def wrapped: Scan = {
Scan(
table = table,
index = index,
filterExpression = filterExpression,
attributeValues = attributeValues,
attributeNames = attributeNames,
projectionExpression = projectionExpression,
limit = limit,
select = select,
startKey = startKey,
consistent = consistent,
)
}
}
3 changes: 3 additions & 0 deletions d4s/src/main/scala/d4s/models/table/TableReference.scala
Original file line number Diff line number Diff line change
Expand Up @@ -128,5 +128,8 @@ object TableReference {
QueryDeleteBatch(table).withIndex(index).withKeyField(index.key.hashKey)(hashKey).toQuery
def queryDeleteBatch[H, R](index: TableIndex[H, R], hashKey: H, rangeKey: R): DynamoQuery[QueryDeleteBatch, List[BatchWriteItemResponse]] =
QueryDeleteBatch(table).withIndex(index).withKey(index.key.bind(hashKey, rangeKey)).toQuery

def scanDeleteBatch: DynamoQuery[ScanDeleteBatch, List[BatchWriteItemResponse]] = ScanDeleteBatch(table).toQuery
def scanDeleteBatch(maxParallelDeletes: Int): DynamoQuery[ScanDeleteBatch, List[BatchWriteItemResponse]] = ScanDeleteBatch(table, Some(maxParallelDeletes)).toQuery
}
}

0 comments on commit b22296c

Please sign in to comment.