Skip to content

Commit

Permalink
RUM-6307 Make sure ConsentAwareFileOrchestrator is thread safe
Browse files Browse the repository at this point in the history
  • Loading branch information
mariusc83 committed Oct 8, 2024
1 parent 7ebe827 commit 71ddb60
Show file tree
Hide file tree
Showing 4 changed files with 289 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import com.datadog.android.core.internal.persistence.AbstractStorage
import com.datadog.android.core.internal.persistence.ConsentAwareStorage
import com.datadog.android.core.internal.persistence.NoOpStorage
import com.datadog.android.core.internal.persistence.Storage
import com.datadog.android.core.internal.persistence.ThreadSafeConsentAwareStorage
import com.datadog.android.core.internal.persistence.datastore.DataStoreFileHandler
import com.datadog.android.core.internal.persistence.datastore.DataStoreFileHelper
import com.datadog.android.core.internal.persistence.datastore.DatastoreFileReader
Expand Down Expand Up @@ -174,6 +175,7 @@ internal class SdkFeature(
val contextProvider = coreFeature.contextProvider
if (contextProvider is NoOpContextProvider) return
val context = contextProvider.context
Thread.sleep(200)
storage.writeCurrentBatch(context, forceNewBatch) { callback(context, it) }
}

Expand Down Expand Up @@ -336,7 +338,7 @@ internal class SdkFeature(
)
this.fileOrchestrator = fileOrchestrator

return ConsentAwareStorage(
return ThreadSafeConsentAwareStorage(
executorService = coreFeature.persistenceExecutorService,
grantedOrchestrator = fileOrchestrator.grantedOrchestrator,
pendingOrchestrator = fileOrchestrator.pendingOrchestrator,
Expand All @@ -351,8 +353,27 @@ internal class SdkFeature(
fileMover = FileMover(internalLogger),
internalLogger = internalLogger,
filePersistenceConfig = filePersistenceConfig,
metricsDispatcher = metricsDispatcher
metricsDispatcher = metricsDispatcher,
coreFeature.trackingConsentProvider,
featureName
)
// return ConsentAwareStorage(
// executorService = coreFeature.persistenceExecutorService,
// grantedOrchestrator = fileOrchestrator.grantedOrchestrator,
// pendingOrchestrator = fileOrchestrator.pendingOrchestrator,
// batchEventsReaderWriter = BatchFileReaderWriter.create(
// internalLogger = internalLogger,
// encryption = coreFeature.localDataEncryption
// ),
// batchMetadataReaderWriter = FileReaderWriter.create(
// internalLogger = internalLogger,
// encryption = coreFeature.localDataEncryption
// ),
// fileMover = FileMover(internalLogger),
// internalLogger = internalLogger,
// filePersistenceConfig = filePersistenceConfig,
// metricsDispatcher = metricsDispatcher
// )
}

private fun createUploader(requestFactory: RequestFactory): DataUploader {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
/*
* Unless explicitly stated otherwise all files in this repository are licensed under the Apache License Version 2.0.
* This product includes software developed at Datadog (https://www.datadoghq.com/).
* Copyright 2016-Present Datadog, Inc.
*/

package com.datadog.android.core.internal.persistence

import androidx.annotation.AnyThread
import androidx.annotation.WorkerThread
import com.datadog.android.api.InternalLogger
import com.datadog.android.api.context.DatadogContext
import com.datadog.android.api.storage.EventBatchWriter
import com.datadog.android.core.internal.metrics.MetricsDispatcher
import com.datadog.android.core.internal.metrics.RemovalReason
import com.datadog.android.core.internal.persistence.file.FileMover
import com.datadog.android.core.internal.persistence.file.FileOrchestrator
import com.datadog.android.core.internal.persistence.file.FilePersistenceConfig
import com.datadog.android.core.internal.persistence.file.FileReaderWriter
import com.datadog.android.core.internal.persistence.file.batch.BatchFileReaderWriter
import com.datadog.android.core.internal.persistence.file.existsSafe
import com.datadog.android.core.internal.privacy.ConsentProvider
import com.datadog.android.core.internal.privacy.TrackingConsentProvider
import com.datadog.android.core.internal.utils.submitSafe
import com.datadog.android.core.metrics.MethodCallSamplingRate
import com.datadog.android.core.metrics.TelemetryMetricType
import com.datadog.android.privacy.TrackingConsent
import com.datadog.android.privacy.TrackingConsentProviderCallback
import java.io.File
import java.util.Locale
import java.util.concurrent.ExecutorService

internal class ThreadSafeConsentAwareStorage(
private val executorService: ExecutorService,
internal val grantedOrchestrator: FileOrchestrator,
internal val pendingOrchestrator: FileOrchestrator,
private val batchEventsReaderWriter: BatchFileReaderWriter,
private val batchMetadataReaderWriter: FileReaderWriter,
private val fileMover: FileMover,
private val internalLogger: InternalLogger,
internal val filePersistenceConfig: FilePersistenceConfig,
private val metricsDispatcher: MetricsDispatcher,
private val trackingConsentProvider: ConsentProvider,
private val featureName:String
) : Storage {

/**
* Keeps track of files currently being read.
*/
private val lockedBatches: MutableSet<Batch> = mutableSetOf()

private val writeLock = Any()

/** @inheritdoc */
@WorkerThread
override fun writeCurrentBatch(
datadogContext: DatadogContext,
forceNewBatch: Boolean,
callback: (EventBatchWriter) -> Unit
) {


val metric = internalLogger.startPerformanceMeasure(
callerClass = ThreadSafeConsentAwareStorage::class.java.name,
metric = TelemetryMetricType.MethodCalled,
samplingRate = MethodCallSamplingRate.RARE.rate,
operationName = "writeCurrentBatch[$featureName]"
)

executorService.submitSafe("Data write", internalLogger) {
synchronized(writeLock) {
val orchestrator = resolveOrchestrator()
val batchFile = orchestrator?.getWritableFile(forceNewBatch)
val metadataFile = if (batchFile != null) {
orchestrator.getMetadataFile(batchFile)
} else {
null
}
val writer = if (orchestrator == null || batchFile == null) {
NoOpEventBatchWriter()
} else {
FileEventBatchWriter(
batchFile = batchFile,
metadataFile = metadataFile,
eventsWriter = batchEventsReaderWriter,
metadataReaderWriter = batchMetadataReaderWriter,
filePersistenceConfig = filePersistenceConfig,
internalLogger = internalLogger
)
}
callback.invoke(writer)
metric?.stopAndSend(writer !is NoOpEventBatchWriter)
}
}
}

private fun resolveOrchestrator(): FileOrchestrator? {
return when (trackingConsentProvider.getConsent()) {
TrackingConsent.GRANTED -> grantedOrchestrator
TrackingConsent.PENDING -> pendingOrchestrator
TrackingConsent.NOT_GRANTED -> null
}
}

/** @inheritdoc */
@WorkerThread
override fun readNextBatch(): BatchData? {
val (batchFile, metaFile) = synchronized(lockedBatches) {
val batchFile = grantedOrchestrator
.getReadableFile(lockedBatches.map { it.file }.toSet()) ?: return null

val metaFile = grantedOrchestrator.getMetadataFile(batchFile)
lockedBatches.add(Batch(batchFile, metaFile))
batchFile to metaFile
}

val batchId = BatchId.fromFile(batchFile)
val batchMetadata = if (metaFile == null || !metaFile.existsSafe(internalLogger)) {
null
} else {
batchMetadataReaderWriter.readData(metaFile)
}
val batchData = batchEventsReaderWriter.readData(batchFile)

return BatchData(id = batchId, data = batchData, metadata = batchMetadata)
}

/** @inheritdoc */
@WorkerThread
override fun confirmBatchRead(
batchId: BatchId,
removalReason: RemovalReason,
deleteBatch: Boolean
) {
val batch = synchronized(lockedBatches) {
lockedBatches.firstOrNull { batchId.matchesFile(it.file) }
} ?: return

if (deleteBatch) {
deleteBatch(batch, removalReason)
}
synchronized(lockedBatches) {
lockedBatches.remove(batch)
}
}

/** @inheritdoc */
@AnyThread
override fun dropAll() {
executorService.submitSafe("ConsentAwareStorage.dropAll", internalLogger) {
synchronized(lockedBatches) {
lockedBatches.forEach {
deleteBatch(it, RemovalReason.Flushed)
}
lockedBatches.clear()
}
arrayOf(pendingOrchestrator, grantedOrchestrator).forEach { orchestrator ->
orchestrator.getAllFiles().forEach {
val metaFile = orchestrator.getMetadataFile(it)
deleteBatch(it, metaFile, RemovalReason.Flushed)
}
}
}
}

@WorkerThread
private fun deleteBatch(batch: Batch, reason: RemovalReason) {
deleteBatch(batch.file, batch.metaFile, reason)
}

@WorkerThread
private fun deleteBatch(batchFile: File, metaFile: File?, reason: RemovalReason) {
deleteBatchFile(batchFile, reason)
if (metaFile?.existsSafe(internalLogger) == true) {
deleteBatchMetadataFile(metaFile)
}
}

@WorkerThread
private fun deleteBatchFile(batchFile: File, reason: RemovalReason) {
val result = fileMover.delete(batchFile)
if (result) {
metricsDispatcher.sendBatchDeletedMetric(batchFile, reason)
} else {
internalLogger.log(
InternalLogger.Level.WARN,
InternalLogger.Target.MAINTAINER,
{ WARNING_DELETE_FAILED.format(Locale.US, batchFile.path) }
)
}
}

@WorkerThread
private fun deleteBatchMetadataFile(metadataFile: File) {
val result = fileMover.delete(metadataFile)
if (!result) {
internalLogger.log(
InternalLogger.Level.WARN,
InternalLogger.Target.MAINTAINER,
{ WARNING_DELETE_FAILED.format(Locale.US, metadataFile.path) }
)
}
}


private data class Batch(val file: File, val metaFile: File?)

companion object {
internal const val WARNING_DELETE_FAILED = "Unable to delete file: %s"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ internal open class ConsentAwareFileOrchestrator(
internal val internalLogger: InternalLogger
) : FileOrchestrator, TrackingConsentProviderCallback {

@Volatile
private lateinit var delegateOrchestrator: FileOrchestrator

init {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import org.junit.Before
import org.junit.Rule
import org.junit.Test
import org.junit.runner.RunWith
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

/**
* Instrumentation tests for the feature scope.
Expand Down Expand Up @@ -156,6 +158,58 @@ class FeatureScopeTest : MockServerTest() {
}.doWait(LONG_WAIT_MS)
}

@Test
fun mustReceiveTheEvents_whenFeatureWrite_trackingConsentPendingToGranted_switched_asynchronously() {
// Given
fakeBatchData = forge.aList(size = forge.anInt(min = 10, max = 20)) {
val fakeEvent: JsonObject = forge.getForgery()
val eventMetadata = forge.anAlphabeticalString()
RawBatchEvent(
fakeEvent.toString().toByteArray(),
eventMetadata.toByteArray()
)
}
trackingConsent = TrackingConsent.PENDING
testedInternalSdkCore = Datadog.initialize(
context = ApplicationProvider.getApplicationContext(),
configuration = fakeConfiguration,
trackingConsent = trackingConsent
) as InternalSdkCore
testedInternalSdkCore.registerFeature(stubFeature)
val featureScope = testedInternalSdkCore.getFeature(fakeFeatureName)
checkNotNull(featureScope)
val countDownLatch = CountDownLatch(2)

// When
Thread {
Thread.sleep(200)
Datadog.setTrackingConsent(TrackingConsent.GRANTED)
countDownLatch.countDown()
}.start()
Thread {
fakeBatchData.forEach { rawBatchEvent ->
featureScope.withWriteContext { _, eventBatchWriter ->
eventBatchWriter.write(
rawBatchEvent,
fakeBatchMetadata,
eventType
)
}
}
countDownLatch.countDown()
}.start()

// Then
countDownLatch.await(TimeUnit.SECONDS.toMillis(10), TimeUnit.MILLISECONDS)
ConditionWatcher {
MockWebServerAssert.assertThat(getMockServerWrapper())
.withConfiguration(fakeConfiguration)
.withTrackingConsent(TrackingConsent.GRANTED)
.receivedData(fakeBatchData, fakeBatchMetadata)
true
}.doWait(LONG_WAIT_MS)
}

@Test
fun mustReceiveTheEvents_whenFeatureWrite_trackingConsentGranted_metadataIsNull() {
// Given
Expand Down

0 comments on commit 71ddb60

Please sign in to comment.