diff --git a/.github/workflows/coding-standards.yml b/.github/workflows/coding-standards.yml index 1ffe40198b..5d46f728ad 100644 --- a/.github/workflows/coding-standards.yml +++ b/.github/workflows/coding-standards.yml @@ -5,6 +5,7 @@ on: pull_request: branches: - "*.x" + - "feature/*" push: jobs: diff --git a/.github/workflows/continuous-integration.yml b/.github/workflows/continuous-integration.yml index 6fe4f72009..7544e111d9 100644 --- a/.github/workflows/continuous-integration.yml +++ b/.github/workflows/continuous-integration.yml @@ -4,6 +4,7 @@ on: pull_request: branches: - "*.x" + - "feature/*" push: env: @@ -21,6 +22,7 @@ jobs: - "8.2" - "8.3" mongodb-version: + - "7.0" - "6.0" - "5.0" - "4.4" @@ -33,24 +35,34 @@ jobs: symfony-version: - "stable" include: + # Test against lowest dependencies - dependencies: "lowest" php-version: "8.1" mongodb-version: "4.4" driver-version: "1.11.0" topology: "server" symfony-version: "stable" - - topology: "sharded_cluster" + # Test with highest dependencies + - topology: "server" + php-version: "8.2" + mongodb-version: "7.0" + driver-version: "stable" + dependencies: "highest" + symfony-version: "7" + # Test with a 4.4 replica set + - topology: "replica_set" php-version: "8.2" mongodb-version: "4.4" driver-version: "stable" dependencies: "highest" symfony-version: "stable" - - topology: "server" + # Test with a 4.4 sharded cluster + - topology: "sharded_cluster" php-version: "8.2" - mongodb-version: "6.0" + mongodb-version: "4.4" driver-version: "stable" dependencies: "highest" - symfony-version: "7" + symfony-version: "stable" steps: - name: "Checkout" diff --git a/.github/workflows/performance.yml b/.github/workflows/performance.yml index 5641a5ba6b..a964c8d9cb 100644 --- a/.github/workflows/performance.yml +++ b/.github/workflows/performance.yml @@ -4,6 +4,7 @@ on: pull_request: branches: - "*.x" + - "feature/*" push: jobs: diff --git a/.github/workflows/static-analysis.yml b/.github/workflows/static-analysis.yml index c0931e8835..decb455a35 100644 --- a/.github/workflows/static-analysis.yml +++ b/.github/workflows/static-analysis.yml @@ -5,6 +5,7 @@ on: pull_request: branches: - "*.x" + - "feature/*" push: jobs: diff --git a/docs/en/cookbook/validation-of-documents.rst b/docs/en/cookbook/validation-of-documents.rst index cc25ed603a..72f89736f2 100644 --- a/docs/en/cookbook/validation-of-documents.rst +++ b/docs/en/cookbook/validation-of-documents.rst @@ -86,7 +86,7 @@ Now validation is performed whenever you call ``DocumentManager#persist($order)`` or when you call ``DocumentManager#flush()`` and an order is about to be updated. Any Exception that happens in the lifecycle callbacks will be cached by -the DocumentManager and the current transaction is rolled back. +the DocumentManager. Of course you can do any type of primitive checks, not null, email-validation, string size, integer and date ranges in your diff --git a/docs/en/reference/architecture.rst b/docs/en/reference/architecture.rst index dc9df0f140..9ff13338ce 100644 --- a/docs/en/reference/architecture.rst +++ b/docs/en/reference/architecture.rst @@ -56,7 +56,7 @@ A document instance can be characterized as being NEW, MANAGED, DETACHED or REMO DocumentManager and a UnitOfWork. - A REMOVED document instance is an instance with a persistent identity, associated with a DocumentManager, that will be removed - from the database upon transaction commit. + from the database upon UnitOfWork commit. Persistent fields ~~~~~~~~~~~~~~~~~ @@ -103,7 +103,7 @@ persistent objects. Transactional write-behind ~~~~~~~~~~~~~~~~~~~~~~~~~~ -An ``DocumentManager`` and the underlying ``UnitOfWork`` employ a +The ``DocumentManager`` and the underlying ``UnitOfWork`` employ a strategy called "transactional write-behind" that delays the execution of query statements in order to execute them in the most efficient way and to execute them at the end of a transaction so diff --git a/docs/en/reference/events.rst b/docs/en/reference/events.rst index 1e12a2fe0a..679e1481e2 100644 --- a/docs/en/reference/events.rst +++ b/docs/en/reference/events.rst @@ -42,7 +42,7 @@ Now we can add some event listeners to the ``$evm``. Let's create a $evm->addEventListener([self::preFoo, self::postFoo], $this); } - public function preFoo(EventArgs $e): void + public function preFoo(EventArgs $e): void { $this->preFooInvoked = true; } @@ -345,6 +345,38 @@ follow this restrictions very carefully since operations in the wrong event may produce lots of different errors, such as inconsistent data and lost updates/persists/removes. +Handling Transactional Flushes +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +When a flush operation is executed in a transaction, all queries inside a lifecycle event listener also have to make use +of the session used during the flush operation. This session object is exposed through the ``LifecycleEventArgs`` +parameter passed to the listener. Passing the session to queries ensures that the query will become part of the +transaction and will see data that has not been committed yet. + +.. code-block:: php + + isInTransaction()) { + // Do something + } + + // Pass the session to any query you execute + $eventArgs->getDocumentManager()->createQueryBuilder(User::class) + // Query logic + ->getQuery(['session' => $eventArgs->session]) + ->execute(); + } + +.. note:: + + Event listeners are only called during the first transaction attempt. If the transaction is retried, event listeners + will not be invoked again. Make sure to run any persistence logic through the UnitOfWork instead of modifying data + directly through queries run in an event listener. + prePersist ~~~~~~~~~~ @@ -693,8 +725,8 @@ Define the ``EventTest`` class with a ``postCollectionLoad()`` method: } } -Load ClassMetadata Event ------------------------- +loadClassMetadata +~~~~~~~~~~~~~~~~~ When the mapping information for a document is read, it is populated in to a ``ClassMetadata`` instance. You can hook in to diff --git a/docs/en/reference/transactions-and-concurrency.rst b/docs/en/reference/transactions-and-concurrency.rst index 17fc0a5af4..dee35e9571 100644 --- a/docs/en/reference/transactions-and-concurrency.rst +++ b/docs/en/reference/transactions-and-concurrency.rst @@ -9,26 +9,78 @@ Transactions As per the `documentation `_, MongoDB write operations are "atomic on the level of a single document". -Even when updating multiple documents within a single write operation, -though the modification of each document is atomic, -the operation as a whole is not and other operations may interleave. +Even when updating multiple documents within a single write operation, though the modification of each document is +atomic, the operation as a whole is not and other operations may interleave. -As stated in the `FAQ `_, -"MongoDB does not support multi-document transactions" and neither does Doctrine MongoDB ODM. +Transaction support +~~~~~~~~~~~~~~~~~~~ + +MongoDB supports multi-document transactions on replica sets (starting in MongoDB 4.2) and sharded clusters (MongoDB +4.4). Standalone topologies do not support multi-document transactions. + +Transaction Support in Doctrine MongoDB ODM +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +.. note:: + Transaction support in MongoDB ODM was introduced in version 2.7. + +You can instruct the ODM to use transactions when writing changes to the databases by enabling the +``useTransactionalFlush`` setting in your configuration: + +.. code-block:: php + + $config = new Configuration(); + $config->setUseTransactionalFlush(true); + // Other configuration + + $dm = DocumentManager::create(null, $config); + +From then onwards, any call to ``DocumentManager::flush`` will start a transaction, apply the write operations, then +commit the transaction. + +To enable or disable transaction usage for a single flush operation, use the ``withTransaction`` write option when +calling ``DocumentManager::flush``: + +.. code-block:: php + + // To explicitly enable transaction for this write + $dm->flush(['withTransaction' => true]); + + // To disable transaction usage for a write, regardless of the ``useTransactionalFlush`` config: + $dm->flush(['withTransaction' => false]); + +.. note:: + + Please note that transactions are only used for write operations executed during the ``flush`` operation. For any + other operations, e.g. manually executed queries or aggregation pipelines, transactions will not be used and you + will have to rely on the MongoDB driver's transaction mechanism. + +Lifecycle Events and Transactions +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +When using transactional flushes, either through the configuration or explicitly, there are a couple of important things +to note regarding lifecycle events. Due to the way MongoDB transactions work, it is possible that ODM attempts write +operations multiple times. However, to preserve the expectation that lifecycle events are only triggered once per flush +operation, lifecycle events will not be dispatched when the transaction is retried. This maintains current functionality +when a lifecycle event modifies the unit of work, as this change is automatically carried over when the transaction is +retried. -Limitation -~~~~~~~~~~ -At the moment, Doctrine MongoDB ODM does not provide any native strategy to emulate multi-document transactions. +Lifecycle events now expose a ``MongoDB\Driver\Session`` object which needs to be used if it is set. Since MongoDB +transactions are not tied to the connection but only to a session, any command that should be part of the transaction +needs to be told about the session to be used. This does not only apply to write commands, but also to read commands +that need to see the transaction state. If a session is given in a lifecycle event, this session should always be used +regardless of whether a transaction is active or not. -Workaround -~~~~~~~~~~ -To work around this limitation, one can utilize `two phase commits `_. -Concurrency ------------ +Other Concurrency Controls +-------------------------- -Doctrine MongoDB ODM offers native support for pessimistic and optimistic locking strategies. -This allows for very fine-grained control over what kind of locking is required for documents in your application. +Multi-Document transactions provide certain guarantees regarding your database writes and prevent two simultaneous write +operations from interfering with each other. Depending on your use case, this is not enough, as the transactional +guarantee will only apply once you start writing to the database as part of the ``DocumentManager::flush()`` call. This +could still lead to data loss if you replace data that was written to the database by a different process in between you +reading the data and starting the transaction. To solve this problem, optimistic and pessimistic locking strategies can +be used, allowing for fine-grained control over what kind of locking is required for documents in your application. .. _transactions_and_concurrency_optimistic_locking: diff --git a/lib/Doctrine/ODM/MongoDB/Configuration.php b/lib/Doctrine/ODM/MongoDB/Configuration.php index 7524f6e65e..aef7fa05b7 100644 --- a/lib/Doctrine/ODM/MongoDB/Configuration.php +++ b/lib/Doctrine/ODM/MongoDB/Configuration.php @@ -117,6 +117,8 @@ class Configuration private int $autoGenerateProxyClasses = self::AUTOGENERATE_EVAL; + private bool $useTransactionalFlush = false; + public function __construct() { $this->proxyManagerConfiguration = new ProxyManagerConfiguration(); @@ -596,6 +598,16 @@ public function getProxyManagerConfiguration(): ProxyManagerConfiguration { return $this->proxyManagerConfiguration; } + + public function setUseTransactionalFlush(bool $useTransactionalFlush): void + { + $this->useTransactionalFlush = $useTransactionalFlush; + } + + public function isTransactionalFlushEnabled(): bool + { + return $this->useTransactionalFlush; + } } interface_exists(MappingDriver::class); diff --git a/lib/Doctrine/ODM/MongoDB/Event/LifecycleEventArgs.php b/lib/Doctrine/ODM/MongoDB/Event/LifecycleEventArgs.php index 2d2866ca84..6184c0a4fb 100644 --- a/lib/Doctrine/ODM/MongoDB/Event/LifecycleEventArgs.php +++ b/lib/Doctrine/ODM/MongoDB/Event/LifecycleEventArgs.php @@ -6,6 +6,8 @@ use Doctrine\ODM\MongoDB\DocumentManager; use Doctrine\Persistence\Event\LifecycleEventArgs as BaseLifecycleEventArgs; +use Doctrine\Persistence\ObjectManager; +use MongoDB\Driver\Session; /** * Lifecycle Events are triggered by the UnitOfWork during lifecycle transitions @@ -15,6 +17,14 @@ */ class LifecycleEventArgs extends BaseLifecycleEventArgs { + public function __construct( + object $object, + ObjectManager $objectManager, + public readonly ?Session $session = null, + ) { + parent::__construct($object, $objectManager); + } + public function getDocument(): object { return $this->getObject(); @@ -24,4 +34,9 @@ public function getDocumentManager(): DocumentManager { return $this->getObjectManager(); } + + public function isInTransaction(): bool + { + return $this->session?->isInTransaction() ?? false; + } } diff --git a/lib/Doctrine/ODM/MongoDB/Event/PreLoadEventArgs.php b/lib/Doctrine/ODM/MongoDB/Event/PreLoadEventArgs.php index 39db5cd0a9..a30f705f29 100644 --- a/lib/Doctrine/ODM/MongoDB/Event/PreLoadEventArgs.php +++ b/lib/Doctrine/ODM/MongoDB/Event/PreLoadEventArgs.php @@ -5,21 +5,21 @@ namespace Doctrine\ODM\MongoDB\Event; use Doctrine\ODM\MongoDB\DocumentManager; +use MongoDB\Driver\Session; /** * Class that holds event arguments for a preLoad event. */ final class PreLoadEventArgs extends LifecycleEventArgs { - /** @var array */ - private array $data; - /** @param array $data */ - public function __construct(object $document, DocumentManager $dm, array &$data) - { - parent::__construct($document, $dm); - - $this->data =& $data; + public function __construct( + object $document, + DocumentManager $dm, + private array &$data, + ?Session $session = null, + ) { + parent::__construct($document, $dm, $session); } /** diff --git a/lib/Doctrine/ODM/MongoDB/Event/PreUpdateEventArgs.php b/lib/Doctrine/ODM/MongoDB/Event/PreUpdateEventArgs.php index a2385cb757..f01dff1a56 100644 --- a/lib/Doctrine/ODM/MongoDB/Event/PreUpdateEventArgs.php +++ b/lib/Doctrine/ODM/MongoDB/Event/PreUpdateEventArgs.php @@ -7,6 +7,7 @@ use Doctrine\ODM\MongoDB\DocumentManager; use Doctrine\ODM\MongoDB\UnitOfWork; use InvalidArgumentException; +use MongoDB\Driver\Session; use function get_class; use function sprintf; @@ -18,26 +19,27 @@ */ final class PreUpdateEventArgs extends LifecycleEventArgs { - /** @psalm-var array */ - private array $documentChangeSet; - /** @psalm-param array $changeSet */ - public function __construct(object $document, DocumentManager $dm, array $changeSet) - { - parent::__construct($document, $dm); - - $this->documentChangeSet = $changeSet; + public function __construct( + object $document, + DocumentManager $dm, + private array $changeSet, + ?Session $session = null, + ) { + parent::__construct($document, $dm, $session); + + $this->changeSet = $changeSet; } /** @return array */ public function getDocumentChangeSet(): array { - return $this->documentChangeSet; + return $this->changeSet; } public function hasChangedField(string $field): bool { - return isset($this->documentChangeSet[$field]); + return isset($this->changeSet[$field]); } /** @@ -49,7 +51,7 @@ public function getOldValue(string $field) { $this->assertValidField($field); - return $this->documentChangeSet[$field][0]; + return $this->changeSet[$field][0]; } /** @@ -61,7 +63,7 @@ public function getNewValue(string $field) { $this->assertValidField($field); - return $this->documentChangeSet[$field][1]; + return $this->changeSet[$field][1]; } /** @@ -73,8 +75,8 @@ public function setNewValue(string $field, $value): void { $this->assertValidField($field); - $this->documentChangeSet[$field][1] = $value; - $this->getDocumentManager()->getUnitOfWork()->setDocumentChangeSet($this->getDocument(), $this->documentChangeSet); + $this->changeSet[$field][1] = $value; + $this->getDocumentManager()->getUnitOfWork()->setDocumentChangeSet($this->getDocument(), $this->changeSet); } /** @@ -84,7 +86,7 @@ public function setNewValue(string $field, $value): void */ private function assertValidField(string $field): void { - if (! isset($this->documentChangeSet[$field])) { + if (! isset($this->changeSet[$field])) { throw new InvalidArgumentException(sprintf( 'Field "%s" is not a valid field of the document "%s" in PreUpdateEventArgs.', $field, diff --git a/lib/Doctrine/ODM/MongoDB/MongoDBException.php b/lib/Doctrine/ODM/MongoDB/MongoDBException.php index 620bcc86d8..a9d14a140e 100644 --- a/lib/Doctrine/ODM/MongoDB/MongoDBException.php +++ b/lib/Doctrine/ODM/MongoDB/MongoDBException.php @@ -155,4 +155,9 @@ public static function cannotCreateRepository(string $className): self { return new self(sprintf('Cannot create repository for class "%s".', $className)); } + + public static function transactionalSessionMismatch(): self + { + return new self('The transactional operation cannot be executed because it was started in a different session.'); + } } diff --git a/lib/Doctrine/ODM/MongoDB/Persisters/DocumentPersister.php b/lib/Doctrine/ODM/MongoDB/Persisters/DocumentPersister.php index 373207d91f..2a3a767310 100644 --- a/lib/Doctrine/ODM/MongoDB/Persisters/DocumentPersister.php +++ b/lib/Doctrine/ODM/MongoDB/Persisters/DocumentPersister.php @@ -35,6 +35,7 @@ use MongoDB\Driver\CursorInterface; use MongoDB\Driver\Exception\Exception as DriverException; use MongoDB\Driver\Exception\WriteException; +use MongoDB\Driver\Session; use MongoDB\Driver\WriteConcern; use MongoDB\GridFS\Bucket; use stdClass; @@ -73,7 +74,14 @@ * * @template T of object * - * @psalm-import-type CommitOptions from UnitOfWork + * @psalm-type CommitOptions array{ + * fsync?: bool, + * safe?: int, + * session?: ?Session, + * w?: int, + * withTransaction?: bool, + * writeConcern?: WriteConcern + * } * @psalm-import-type Hints from UnitOfWork * @psalm-import-type FieldMapping from ClassMetadata * @psalm-import-type SortMeta from Sort @@ -1580,7 +1588,23 @@ private function getWriteOptions(array $options = []): array unset($writeOptions['w']); } - return $writeOptions; + return $this->isInTransaction($options) + ? $this->uow->stripTransactionOptions($writeOptions) + : $writeOptions; + } + + private function isInTransaction(array $options): bool + { + if (! isset($options['session'])) { + return false; + } + + $session = $options['session']; + if (! $session instanceof Session) { + return false; + } + + return $session->isInTransaction(); } /** diff --git a/lib/Doctrine/ODM/MongoDB/UnitOfWork.php b/lib/Doctrine/ODM/MongoDB/UnitOfWork.php index e29b883a00..b25ea58eb0 100644 --- a/lib/Doctrine/ODM/MongoDB/UnitOfWork.php +++ b/lib/Doctrine/ODM/MongoDB/UnitOfWork.php @@ -25,14 +25,21 @@ use Doctrine\Persistence\PropertyChangedListener; use InvalidArgumentException; use MongoDB\BSON\UTCDateTime; +use MongoDB\Driver\Exception\RuntimeException; +use MongoDB\Driver\Session; use MongoDB\Driver\WriteConcern; use ProxyManager\Proxy\GhostObjectInterface; use ReflectionProperty; +use Throwable; use UnexpectedValueException; +use function array_diff_key; use function array_filter; +use function array_intersect_key; use function array_key_exists; +use function array_merge; use function assert; +use function call_user_func; use function count; use function get_class; use function in_array; @@ -61,6 +68,7 @@ * fsync?: bool, * safe?: int, * w?: int, + * withTransaction?: bool, * writeConcern?: WriteConcern * } */ @@ -92,6 +100,12 @@ final class UnitOfWork implements PropertyChangedListener /** @internal */ public const DEPRECATED_WRITE_OPTIONS = ['fsync', 'safe', 'w']; + private const TRANSACTION_OPTIONS = [ + 'maxCommitTimeMS' => 1, + 'readConcern' => 1, + 'readPreference' => 1, + 'writeConcern' => 1, + ]; /** * The identity map holds references to all managed documents. @@ -162,42 +176,42 @@ final class UnitOfWork implements PropertyChangedListener * * @var array */ - private array $documentInsertions = []; + private array $scheduledDocumentInsertions = []; /** * A list of all pending document updates. * * @var array */ - private array $documentUpdates = []; + private array $scheduledDocumentUpdates = []; /** * A list of all pending document upserts. * * @var array */ - private array $documentUpserts = []; + private array $scheduledDocumentUpserts = []; /** * A list of all pending document deletions. * * @var array */ - private array $documentDeletions = []; + private array $scheduledDocumentDeletions = []; /** * All pending collection deletions. * * @psalm-var array> */ - private array $collectionDeletions = []; + private array $scheduledCollectionDeletions = []; /** * All pending collection updates. * * @psalm-var array> */ - private array $collectionUpdates = []; + private array $scheduledCollectionUpdates = []; /** * A list of documents related to collections scheduled for update or deletion @@ -418,12 +432,12 @@ public function commit(array $options = []): void $this->computeChangeSets(); if ( - ! ($this->documentInsertions || - $this->documentUpserts || - $this->documentDeletions || - $this->documentUpdates || - $this->collectionUpdates || - $this->collectionDeletions || + ! ($this->scheduledDocumentInsertions || + $this->scheduledDocumentUpserts || + $this->scheduledDocumentDeletions || + $this->scheduledDocumentUpdates || + $this->scheduledCollectionUpdates || + $this->scheduledCollectionDeletions || $this->orphanRemovals) ) { return; // Nothing to do. @@ -441,46 +455,42 @@ public function commit(array $options = []): void } } - // Raise onFlush $this->evm->dispatchEvent(Events::onFlush, new Event\OnFlushEventArgs($this->dm)); - foreach ($this->getClassesForCommitAction($this->documentUpserts) as $classAndDocuments) { - [$class, $documents] = $classAndDocuments; - $this->executeUpserts($class, $documents, $options); - } + if ($this->useTransaction($options)) { + $session = $this->dm->getClient()->startSession(); - foreach ($this->getClassesForCommitAction($this->documentInsertions) as $classAndDocuments) { - [$class, $documents] = $classAndDocuments; - $this->executeInserts($class, $documents, $options); - } + $this->lifecycleEventManager->enableTransactionalMode($session); - foreach ($this->getClassesForCommitAction($this->documentUpdates) as $classAndDocuments) { - [$class, $documents] = $classAndDocuments; - $this->executeUpdates($class, $documents, $options); - } - - foreach ($this->getClassesForCommitAction($this->documentDeletions, true) as $classAndDocuments) { - [$class, $documents] = $classAndDocuments; - $this->executeDeletions($class, $documents, $options); + $this->withTransaction( + $session, + function (Session $session) use ($options): void { + $this->doCommit(['session' => $session] + $this->stripTransactionOptions($options)); + }, + $this->getTransactionOptions($options), + ); + } else { + $this->doCommit($options); } // Raise postFlush $this->evm->dispatchEvent(Events::postFlush, new Event\PostFlushEventArgs($this->dm)); // Clear up - $this->documentInsertions = - $this->documentUpserts = - $this->documentUpdates = - $this->documentDeletions = - $this->documentChangeSets = - $this->collectionUpdates = - $this->collectionDeletions = - $this->visitedCollections = - $this->scheduledForSynchronization = - $this->orphanRemovals = - $this->hasScheduledCollections = []; + $this->scheduledDocumentInsertions = + $this->scheduledDocumentUpserts = + $this->scheduledDocumentUpdates = + $this->scheduledDocumentDeletions = + $this->documentChangeSets = + $this->scheduledCollectionUpdates = + $this->scheduledCollectionDeletions = + $this->visitedCollections = + $this->scheduledForSynchronization = + $this->orphanRemovals = + $this->hasScheduledCollections = []; } finally { $this->commitsInProgress--; + $this->lifecycleEventManager->clearTransactionalState(); } } @@ -537,7 +547,7 @@ private function getClassesForCommitAction(array $documents, bool $includeEmbedd */ private function computeScheduleInsertsChangeSets(): void { - foreach ($this->documentInsertions as $document) { + foreach ($this->scheduledDocumentInsertions as $document) { $class = $this->dm->getClassMetadata($document::class); if ($class->isEmbeddedDocument || $class->isView()) { continue; @@ -554,7 +564,7 @@ private function computeScheduleInsertsChangeSets(): void */ private function computeScheduleUpsertsChangeSets(): void { - foreach ($this->documentUpserts as $document) { + foreach ($this->scheduledDocumentUpserts as $document) { $class = $this->dm->getClassMetadata($document::class); if ($class->isEmbeddedDocument || $class->isView()) { continue; @@ -647,7 +657,7 @@ public function getDocumentActualData(object $document): array * entry is the new value of the property. Changesets are used by persisters * to INSERT/UPDATE the persistent document state. * - * {@link documentUpdates} + * {@link scheduledDocumentUpdates} * If the document is already fully MANAGED (has been fetched from the database before) * and any changes to its properties are detected, then a reference to the document is stored * there to mark it for an update. @@ -930,9 +940,9 @@ public function computeChangeSets(): void // Only MANAGED documents that are NOT SCHEDULED FOR INSERTION, UPSERT OR DELETION are processed here. $oid = spl_object_hash($document); if ( - isset($this->documentInsertions[$oid]) - || isset($this->documentUpserts[$oid]) - || isset($this->documentDeletions[$oid]) + isset($this->scheduledDocumentInsertions[$oid]) + || isset($this->scheduledDocumentUpserts[$oid]) + || isset($this->scheduledDocumentDeletions[$oid]) || ! isset($this->documentStates[$oid]) ) { continue; @@ -953,7 +963,7 @@ public function computeChangeSets(): void */ private function computeAssociationChanges(object $parentDocument, array $assoc, $value): void { - $isNewParentDocument = isset($this->documentInsertions[spl_object_hash($parentDocument)]); + $isNewParentDocument = isset($this->scheduledDocumentInsertions[spl_object_hash($parentDocument)]); $class = $this->dm->getClassMetadata($parentDocument::class); $topOrExistingDocument = ( ! $isNewParentDocument || ! $class->isEmbeddedDocument); @@ -1161,15 +1171,14 @@ private function executeInserts(ClassMetadata $class, array $documents, array $o { $persister = $this->getDocumentPersister($class->name); - foreach ($documents as $oid => $document) { + foreach ($documents as $document) { $persister->addInsert($document); - unset($this->documentInsertions[$oid]); } $persister->executeInserts($options); foreach ($documents as $document) { - $this->lifecycleEventManager->postPersist($class, $document); + $this->lifecycleEventManager->postPersist($class, $document, $options['session'] ?? null); } } @@ -1186,15 +1195,14 @@ private function executeUpserts(ClassMetadata $class, array $documents, array $o { $persister = $this->getDocumentPersister($class->name); - foreach ($documents as $oid => $document) { + foreach ($documents as $document) { $persister->addUpsert($document); - unset($this->documentUpserts[$oid]); } $persister->executeUpserts($options); foreach ($documents as $document) { - $this->lifecycleEventManager->postPersist($class, $document); + $this->lifecycleEventManager->postPersist($class, $document, $options['session'] ?? null); } } @@ -1217,15 +1225,13 @@ private function executeUpdates(ClassMetadata $class, array $documents, array $o $persister = $this->getDocumentPersister($className); foreach ($documents as $oid => $document) { - $this->lifecycleEventManager->preUpdate($class, $document); + $this->lifecycleEventManager->preUpdate($class, $document, $options['session'] ?? null); if (! empty($this->documentChangeSets[$oid]) || $this->hasScheduledCollections($document)) { $persister->update($document, $options); } - unset($this->documentUpdates[$oid]); - - $this->lifecycleEventManager->postUpdate($class, $document); + $this->lifecycleEventManager->postUpdate($class, $document, $options['session'] ?? null); } } @@ -1248,7 +1254,6 @@ private function executeDeletions(ClassMetadata $class, array $documents, array } unset( - $this->documentDeletions[$oid], $this->documentIdentifiers[$oid], $this->originalDocumentData[$oid], ); @@ -1268,11 +1273,7 @@ private function executeDeletions(ClassMetadata $class, array $documents, array $value->clearSnapshot(); } - // Document with this $oid after deletion treated as NEW, even if the $oid - // is obtained by a new document because the old one went out of scope. - $this->documentStates[$oid] = self::STATE_NEW; - - $this->lifecycleEventManager->postRemove($class, $document); + $this->lifecycleEventManager->postRemove($class, $document, $options['session'] ?? null); } } @@ -1294,19 +1295,19 @@ public function scheduleForInsert(ClassMetadata $class, object $document): void { $oid = spl_object_hash($document); - if (isset($this->documentUpdates[$oid])) { + if (isset($this->scheduledDocumentUpdates[$oid])) { throw new InvalidArgumentException('Dirty document can not be scheduled for insertion.'); } - if (isset($this->documentDeletions[$oid])) { + if (isset($this->scheduledDocumentDeletions[$oid])) { throw new InvalidArgumentException('Removed document can not be scheduled for insertion.'); } - if (isset($this->documentInsertions[$oid])) { + if (isset($this->scheduledDocumentInsertions[$oid])) { throw new InvalidArgumentException('Document can not be scheduled for insertion twice.'); } - $this->documentInsertions[$oid] = $document; + $this->scheduledDocumentInsertions[$oid] = $document; if (! isset($this->documentIdentifiers[$oid])) { return; @@ -1336,20 +1337,20 @@ public function scheduleForUpsert(ClassMetadata $class, object $document): void throw new InvalidArgumentException('Embedded document can not be scheduled for upsert.'); } - if (isset($this->documentUpdates[$oid])) { + if (isset($this->scheduledDocumentUpdates[$oid])) { throw new InvalidArgumentException('Dirty document can not be scheduled for upsert.'); } - if (isset($this->documentDeletions[$oid])) { + if (isset($this->scheduledDocumentDeletions[$oid])) { throw new InvalidArgumentException('Removed document can not be scheduled for upsert.'); } - if (isset($this->documentUpserts[$oid])) { + if (isset($this->scheduledDocumentUpserts[$oid])) { throw new InvalidArgumentException('Document can not be scheduled for upsert twice.'); } - $this->documentUpserts[$oid] = $document; - $this->documentIdentifiers[$oid] = $class->getIdentifierValue($document); + $this->scheduledDocumentUpserts[$oid] = $document; + $this->documentIdentifiers[$oid] = $class->getIdentifierValue($document); $this->addToIdentityMap($document); } @@ -1358,7 +1359,7 @@ public function scheduleForUpsert(ClassMetadata $class, object $document): void */ public function isScheduledForInsert(object $document): bool { - return isset($this->documentInsertions[spl_object_hash($document)]); + return isset($this->scheduledDocumentInsertions[spl_object_hash($document)]); } /** @@ -1366,7 +1367,7 @@ public function isScheduledForInsert(object $document): bool */ public function isScheduledForUpsert(object $document): bool { - return isset($this->documentUpserts[spl_object_hash($document)]); + return isset($this->scheduledDocumentUpserts[spl_object_hash($document)]); } /** @@ -1383,19 +1384,19 @@ public function scheduleForUpdate(object $document): void throw new InvalidArgumentException('Document has no identity.'); } - if (isset($this->documentDeletions[$oid])) { + if (isset($this->scheduledDocumentDeletions[$oid])) { throw new InvalidArgumentException('Document is removed.'); } if ( - isset($this->documentUpdates[$oid]) - || isset($this->documentInsertions[$oid]) - || isset($this->documentUpserts[$oid]) + isset($this->scheduledDocumentUpdates[$oid]) + || isset($this->scheduledDocumentInsertions[$oid]) + || isset($this->scheduledDocumentUpserts[$oid]) ) { return; } - $this->documentUpdates[$oid] = $document; + $this->scheduledDocumentUpdates[$oid] = $document; } /** @@ -1405,7 +1406,7 @@ public function scheduleForUpdate(object $document): void */ public function isScheduledForUpdate(object $document): bool { - return isset($this->documentUpdates[spl_object_hash($document)]); + return isset($this->scheduledDocumentUpdates[spl_object_hash($document)]); } /** @@ -1427,12 +1428,12 @@ public function scheduleForDelete(object $document, bool $isView = false): void { $oid = spl_object_hash($document); - if (isset($this->documentInsertions[$oid])) { + if (isset($this->scheduledDocumentInsertions[$oid])) { if ($this->isInIdentityMap($document)) { $this->removeFromIdentityMap($document); } - unset($this->documentInsertions[$oid]); + unset($this->scheduledDocumentInsertions[$oid]); return; // document has not been persisted yet, so nothing more to do. } @@ -1444,15 +1445,15 @@ public function scheduleForDelete(object $document, bool $isView = false): void $this->removeFromIdentityMap($document); $this->documentStates[$oid] = self::STATE_REMOVED; - if (isset($this->documentUpdates[$oid])) { - unset($this->documentUpdates[$oid]); + if (isset($this->scheduledDocumentUpdates[$oid])) { + unset($this->scheduledDocumentUpdates[$oid]); } - if (isset($this->documentUpserts[$oid])) { - unset($this->documentUpserts[$oid]); + if (isset($this->scheduledDocumentUpserts[$oid])) { + unset($this->scheduledDocumentUpserts[$oid]); } - if (isset($this->documentDeletions[$oid])) { + if (isset($this->scheduledDocumentDeletions[$oid])) { return; } @@ -1460,7 +1461,7 @@ public function scheduleForDelete(object $document, bool $isView = false): void return; } - $this->documentDeletions[$oid] = $document; + $this->scheduledDocumentDeletions[$oid] = $document; } /** @@ -1469,7 +1470,7 @@ public function scheduleForDelete(object $document, bool $isView = false): void */ public function isScheduledForDelete(object $document): bool { - return isset($this->documentDeletions[spl_object_hash($document)]); + return isset($this->scheduledDocumentDeletions[spl_object_hash($document)]); } /** @@ -1481,10 +1482,10 @@ public function isDocumentScheduled(object $document): bool { $oid = spl_object_hash($document); - return isset($this->documentInsertions[$oid]) || - isset($this->documentUpserts[$oid]) || - isset($this->documentUpdates[$oid]) || - isset($this->documentDeletions[$oid]); + return isset($this->scheduledDocumentInsertions[$oid]) || + isset($this->scheduledDocumentUpserts[$oid]) || + isset($this->scheduledDocumentUpdates[$oid]) || + isset($this->scheduledDocumentDeletions[$oid]); } /** @@ -1781,7 +1782,7 @@ private function doPersist(object $document, array &$visited): void case self::STATE_REMOVED: // Document becomes managed again - unset($this->documentDeletions[$oid]); + unset($this->scheduledDocumentDeletions[$oid]); $this->documentStates[$oid] = self::STATE_MANAGED; break; @@ -2094,14 +2095,14 @@ private function doDetach(object $document, array &$visited): void case self::STATE_MANAGED: $this->removeFromIdentityMap($document); unset( - $this->documentInsertions[$oid], - $this->documentUpdates[$oid], - $this->documentDeletions[$oid], + $this->scheduledDocumentInsertions[$oid], + $this->scheduledDocumentUpdates[$oid], + $this->scheduledDocumentDeletions[$oid], $this->documentIdentifiers[$oid], $this->documentStates[$oid], $this->originalDocumentData[$oid], $this->parentAssociations[$oid], - $this->documentUpserts[$oid], + $this->scheduledDocumentUpserts[$oid], $this->hasScheduledCollections[$oid], $this->embeddedDocumentsRegistry[$oid], ); @@ -2392,22 +2393,22 @@ public function unlock(object $document): void public function clear(?string $documentName = null): void { if ($documentName === null) { - $this->identityMap = - $this->documentIdentifiers = - $this->originalDocumentData = - $this->documentChangeSets = - $this->documentStates = - $this->scheduledForSynchronization = - $this->documentInsertions = - $this->documentUpserts = - $this->documentUpdates = - $this->documentDeletions = - $this->collectionUpdates = - $this->collectionDeletions = - $this->parentAssociations = - $this->embeddedDocumentsRegistry = - $this->orphanRemovals = - $this->hasScheduledCollections = []; + $this->identityMap = + $this->documentIdentifiers = + $this->originalDocumentData = + $this->documentChangeSets = + $this->documentStates = + $this->scheduledForSynchronization = + $this->scheduledDocumentInsertions = + $this->scheduledDocumentUpserts = + $this->scheduledDocumentUpdates = + $this->scheduledDocumentDeletions = + $this->scheduledCollectionUpdates = + $this->scheduledCollectionDeletions = + $this->parentAssociations = + $this->embeddedDocumentsRegistry = + $this->orphanRemovals = + $this->hasScheduledCollections = []; $event = new Event\OnClearEventArgs($this->dm); } else { @@ -2497,12 +2498,12 @@ private function fixPersistentCollectionOwnership(PersistentCollectionInterface public function scheduleCollectionDeletion(PersistentCollectionInterface $coll): void { $oid = spl_object_hash($coll); - unset($this->collectionUpdates[$oid]); - if (isset($this->collectionDeletions[$oid])) { + unset($this->scheduledCollectionUpdates[$oid]); + if (isset($this->scheduledCollectionDeletions[$oid])) { return; } - $this->collectionDeletions[$oid] = $coll; + $this->scheduledCollectionDeletions[$oid] = $coll; $this->scheduleCollectionOwner($coll); } @@ -2515,7 +2516,7 @@ public function scheduleCollectionDeletion(PersistentCollectionInterface $coll): */ public function isCollectionScheduledForDeletion(PersistentCollectionInterface $coll): bool { - return isset($this->collectionDeletions[spl_object_hash($coll)]); + return isset($this->scheduledCollectionDeletions[spl_object_hash($coll)]); } /** @@ -2532,12 +2533,12 @@ public function unscheduleCollectionDeletion(PersistentCollectionInterface $coll } $oid = spl_object_hash($coll); - if (! isset($this->collectionDeletions[$oid])) { + if (! isset($this->scheduledCollectionDeletions[$oid])) { return; } $topmostOwner = $this->getOwningDocument($coll->getOwner()); - unset($this->collectionDeletions[$oid]); + unset($this->scheduledCollectionDeletions[$oid]); unset($this->hasScheduledCollections[spl_object_hash($topmostOwner)][$oid]); } @@ -2559,11 +2560,11 @@ public function scheduleCollectionUpdate(PersistentCollectionInterface $coll): v } $oid = spl_object_hash($coll); - if (isset($this->collectionUpdates[$oid])) { + if (isset($this->scheduledCollectionUpdates[$oid])) { return; } - $this->collectionUpdates[$oid] = $coll; + $this->scheduledCollectionUpdates[$oid] = $coll; $this->scheduleCollectionOwner($coll); } @@ -2581,12 +2582,12 @@ public function unscheduleCollectionUpdate(PersistentCollectionInterface $coll): } $oid = spl_object_hash($coll); - if (! isset($this->collectionUpdates[$oid])) { + if (! isset($this->scheduledCollectionUpdates[$oid])) { return; } $topmostOwner = $this->getOwningDocument($coll->getOwner()); - unset($this->collectionUpdates[$oid]); + unset($this->scheduledCollectionUpdates[$oid]); unset($this->hasScheduledCollections[spl_object_hash($topmostOwner)][$oid]); } @@ -2599,7 +2600,7 @@ public function unscheduleCollectionUpdate(PersistentCollectionInterface $coll): */ public function isCollectionScheduledForUpdate(PersistentCollectionInterface $coll): bool { - return isset($this->collectionUpdates[spl_object_hash($coll)]); + return isset($this->scheduledCollectionUpdates[spl_object_hash($coll)]); } /** @@ -2939,7 +2940,7 @@ public function getDocumentIdentifier(object $document) */ public function hasPendingInsertions(): bool { - return ! empty($this->documentInsertions); + return ! empty($this->scheduledDocumentInsertions); } /** @@ -3034,7 +3035,7 @@ public function propertyChanged($sender, $propertyName, $oldValue, $newValue) */ public function getScheduledDocumentInsertions(): array { - return $this->documentInsertions; + return $this->scheduledDocumentInsertions; } /** @@ -3044,7 +3045,7 @@ public function getScheduledDocumentInsertions(): array */ public function getScheduledDocumentUpserts(): array { - return $this->documentUpserts; + return $this->scheduledDocumentUpserts; } /** @@ -3054,7 +3055,7 @@ public function getScheduledDocumentUpserts(): array */ public function getScheduledDocumentUpdates(): array { - return $this->documentUpdates; + return $this->scheduledDocumentUpdates; } /** @@ -3064,7 +3065,7 @@ public function getScheduledDocumentUpdates(): array */ public function getScheduledDocumentDeletions(): array { - return $this->documentDeletions; + return $this->scheduledDocumentDeletions; } /** @@ -3076,7 +3077,7 @@ public function getScheduledDocumentDeletions(): array */ public function getScheduledCollectionDeletions(): array { - return $this->collectionDeletions; + return $this->scheduledCollectionDeletions; } /** @@ -3088,7 +3089,7 @@ public function getScheduledCollectionDeletions(): array */ public function getScheduledCollectionUpdates(): array { - return $this->collectionUpdates; + return $this->scheduledCollectionUpdates; } /** @@ -3119,8 +3120,139 @@ public function isUninitializedObject(object $obj): bool }; } + /** @internal */ + public function stripTransactionOptions(array $options): array + { + return array_diff_key( + $options, + self::TRANSACTION_OPTIONS, + ); + } + private function objToStr(object $obj): string { return method_exists($obj, '__toString') ? (string) $obj : $obj::class . '@' . spl_object_hash($obj); } + + /** @psalm-param CommitOptions $options */ + private function doCommit(array $options): void + { + foreach ($this->getClassesForCommitAction($this->scheduledDocumentUpserts) as $classAndDocuments) { + [$class, $documents] = $classAndDocuments; + $this->executeUpserts($class, $documents, $options); + } + + foreach ($this->getClassesForCommitAction($this->scheduledDocumentInsertions) as $classAndDocuments) { + [$class, $documents] = $classAndDocuments; + $this->executeInserts($class, $documents, $options); + } + + foreach ($this->getClassesForCommitAction($this->scheduledDocumentUpdates) as $classAndDocuments) { + [$class, $documents] = $classAndDocuments; + $this->executeUpdates($class, $documents, $options); + } + + foreach ($this->getClassesForCommitAction($this->scheduledDocumentDeletions, true) as $classAndDocuments) { + [$class, $documents] = $classAndDocuments; + $this->executeDeletions($class, $documents, $options); + } + } + + /** @psalm-param CommitOptions $options */ + private function useTransaction(array $options): bool + { + if (isset($options['withTransaction'])) { + return $options['withTransaction']; + } + + return $this->dm->getConfiguration()->isTransactionalFlushEnabled(); + } + + /** @psalm-param CommitOptions $options */ + private function getTransactionOptions(array $options): array + { + return array_intersect_key( + array_merge( + $this->dm->getConfiguration()->getDefaultCommitOptions(), + $options, + ), + self::TRANSACTION_OPTIONS, + ); + } + + /** + * This following method was taken from the MongoDB Library and adapted to not use the default 120 seconds timeout. + * The code within this method is licensed under the Apache License. Copyright belongs to MongoDB, Inc. + * + * @see https://github.com/mongodb/mongo-php-library/blob/1.17.0/src/Operation/WithTransaction.php + * @see https://github.com/mongodb/specifications/blob/master/source/transactions-convenient-api/transactions-convenient-api.rst#pseudo-code + */ + private function withTransaction(Session $session, callable $callback, array $transactionOptions = []): void + { + $numAttempts = 0; + + while (true) { + $session->startTransaction($transactionOptions); + + try { + $numAttempts++; + call_user_func($callback, $session); + } catch (Throwable $e) { + if ($session->isInTransaction()) { + $session->abortTransaction(); + } + + if ( + $e instanceof RuntimeException && + $e->hasErrorLabel('TransientTransactionError') && + ! $this->shouldAbortWithTransaction($numAttempts) + ) { + continue; + } + + throw $e; + } + + if (! $session->isInTransaction()) { + // Assume callback intentionally ended the transaction + return; + } + + while (true) { + try { + $session->commitTransaction(); + } catch (RuntimeException $e) { + if ( + $e->getCode() !== 50 /* MaxTimeMSExpired */ && + $e->hasErrorLabel('UnknownTransactionCommitResult') && + ! $this->shouldAbortWithTransaction($numAttempts) + ) { + // Retry committing the transaction + continue; + } + + if ( + $e->hasErrorLabel('TransientTransactionError') && + ! $this->shouldAbortWithTransaction($numAttempts) + ) { + // Restart the transaction, invoking the callback again + continue 2; + } + + throw $e; + } + + // Commit was successful + break; + } + + // Transaction was successful + break; + } + } + + private function shouldAbortWithTransaction(int $numAttempts): bool + { + return $numAttempts >= 2; + } } diff --git a/lib/Doctrine/ODM/MongoDB/Utility/LifecycleEventManager.php b/lib/Doctrine/ODM/MongoDB/Utility/LifecycleEventManager.php index 46cbf3d17a..a44ff8f254 100644 --- a/lib/Doctrine/ODM/MongoDB/Utility/LifecycleEventManager.php +++ b/lib/Doctrine/ODM/MongoDB/Utility/LifecycleEventManager.php @@ -13,16 +13,40 @@ use Doctrine\ODM\MongoDB\Event\PreUpdateEventArgs; use Doctrine\ODM\MongoDB\Events; use Doctrine\ODM\MongoDB\Mapping\ClassMetadata; +use Doctrine\ODM\MongoDB\MongoDBException; use Doctrine\ODM\MongoDB\PersistentCollection\PersistentCollectionInterface; use Doctrine\ODM\MongoDB\UnitOfWork; +use MongoDB\Driver\Session; + +use function spl_object_hash; /** @internal */ final class LifecycleEventManager { + private bool $transactionalModeEnabled = false; + + private ?Session $session = null; + + /** @var array> */ + private array $transactionalEvents = []; + public function __construct(private DocumentManager $dm, private UnitOfWork $uow, private EventManager $evm) { } + public function clearTransactionalState(): void + { + $this->transactionalModeEnabled = false; + $this->session = null; + $this->transactionalEvents = []; + } + + public function enableTransactionalMode(Session $session): void + { + $this->transactionalModeEnabled = true; + $this->session = $session; + } + /** * @param mixed $id * @@ -55,11 +79,17 @@ public function postCollectionLoad(PersistentCollectionInterface $coll): void * * @template T of object */ - public function postPersist(ClassMetadata $class, object $document): void + public function postPersist(ClassMetadata $class, object $document, ?Session $session = null): void { - $class->invokeLifecycleCallbacks(Events::postPersist, $document, [new LifecycleEventArgs($document, $this->dm)]); - $this->dispatchEvent($class, Events::postPersist, new LifecycleEventArgs($document, $this->dm)); - $this->cascadePostPersist($class, $document); + if (! $this->shouldDispatchEvent($document, Events::postPersist, $session)) { + return; + } + + $eventArgs = new LifecycleEventArgs($document, $this->dm, $session); + + $class->invokeLifecycleCallbacks(Events::postPersist, $document, [$eventArgs]); + $this->dispatchEvent($class, Events::postPersist, $eventArgs); + $this->cascadePostPersist($class, $document, $session); } /** @@ -70,10 +100,16 @@ public function postPersist(ClassMetadata $class, object $document): void * * @template T of object */ - public function postRemove(ClassMetadata $class, object $document): void + public function postRemove(ClassMetadata $class, object $document, ?Session $session = null): void { - $class->invokeLifecycleCallbacks(Events::postRemove, $document, [new LifecycleEventArgs($document, $this->dm)]); - $this->dispatchEvent($class, Events::postRemove, new LifecycleEventArgs($document, $this->dm)); + if (! $this->shouldDispatchEvent($document, Events::postRemove, $session)) { + return; + } + + $eventArgs = new LifecycleEventArgs($document, $this->dm, $session); + + $class->invokeLifecycleCallbacks(Events::postRemove, $document, [$eventArgs]); + $this->dispatchEvent($class, Events::postRemove, $eventArgs); } /** @@ -85,11 +121,17 @@ public function postRemove(ClassMetadata $class, object $document): void * * @template T of object */ - public function postUpdate(ClassMetadata $class, object $document): void + public function postUpdate(ClassMetadata $class, object $document, ?Session $session = null): void { - $class->invokeLifecycleCallbacks(Events::postUpdate, $document, [new LifecycleEventArgs($document, $this->dm)]); - $this->dispatchEvent($class, Events::postUpdate, new LifecycleEventArgs($document, $this->dm)); - $this->cascadePostUpdate($class, $document); + if (! $this->shouldDispatchEvent($document, Events::postUpdate, $session)) { + return; + } + + $eventArgs = new LifecycleEventArgs($document, $this->dm, $session); + + $class->invokeLifecycleCallbacks(Events::postUpdate, $document, [$eventArgs]); + $this->dispatchEvent($class, Events::postUpdate, $eventArgs); + $this->cascadePostUpdate($class, $document, $session); } /** @@ -102,8 +144,14 @@ public function postUpdate(ClassMetadata $class, object $document): void */ public function prePersist(ClassMetadata $class, object $document): void { - $class->invokeLifecycleCallbacks(Events::prePersist, $document, [new LifecycleEventArgs($document, $this->dm)]); - $this->dispatchEvent($class, Events::prePersist, new LifecycleEventArgs($document, $this->dm)); + if (! $this->shouldDispatchEvent($document, Events::prePersist, null)) { + return; + } + + $eventArgs = new LifecycleEventArgs($document, $this->dm); + + $class->invokeLifecycleCallbacks(Events::prePersist, $document, [$eventArgs]); + $this->dispatchEvent($class, Events::prePersist, $eventArgs); } /** @@ -116,8 +164,14 @@ public function prePersist(ClassMetadata $class, object $document): void */ public function preRemove(ClassMetadata $class, object $document): void { - $class->invokeLifecycleCallbacks(Events::preRemove, $document, [new LifecycleEventArgs($document, $this->dm)]); - $this->dispatchEvent($class, Events::preRemove, new LifecycleEventArgs($document, $this->dm)); + if (! $this->shouldDispatchEvent($document, Events::preRemove, null)) { + return; + } + + $eventArgs = new LifecycleEventArgs($document, $this->dm); + + $class->invokeLifecycleCallbacks(Events::preRemove, $document, [$eventArgs]); + $this->dispatchEvent($class, Events::preRemove, $eventArgs); } /** @@ -128,15 +182,24 @@ public function preRemove(ClassMetadata $class, object $document): void * * @template T of object */ - public function preUpdate(ClassMetadata $class, object $document): void + public function preUpdate(ClassMetadata $class, object $document, ?Session $session = null): void { + if (! $this->shouldDispatchEvent($document, Events::preUpdate, $session)) { + return; + } + if (! empty($class->lifecycleCallbacks[Events::preUpdate])) { - $class->invokeLifecycleCallbacks(Events::preUpdate, $document, [new PreUpdateEventArgs($document, $this->dm, $this->uow->getDocumentChangeSet($document))]); + $eventArgs = new PreUpdateEventArgs($document, $this->dm, $this->uow->getDocumentChangeSet($document), $session); + $class->invokeLifecycleCallbacks(Events::preUpdate, $document, [$eventArgs]); $this->uow->recomputeSingleDocumentChangeSet($class, $document); } - $this->dispatchEvent($class, Events::preUpdate, new PreUpdateEventArgs($document, $this->dm, $this->uow->getDocumentChangeSet($document))); - $this->cascadePreUpdate($class, $document); + $this->dispatchEvent( + $class, + Events::preUpdate, + new PreUpdateEventArgs($document, $this->dm, $this->uow->getDocumentChangeSet($document), $session), + ); + $this->cascadePreUpdate($class, $document, $session); } /** @@ -147,7 +210,7 @@ public function preUpdate(ClassMetadata $class, object $document): void * * @template T of object */ - private function cascadePreUpdate(ClassMetadata $class, object $document): void + private function cascadePreUpdate(ClassMetadata $class, object $document, ?Session $session = null): void { foreach ($class->getEmbeddedFieldsMappings() as $mapping) { $value = $class->reflFields[$mapping['fieldName']]->getValue($document); @@ -162,7 +225,7 @@ private function cascadePreUpdate(ClassMetadata $class, object $document): void continue; } - $this->preUpdate($this->dm->getClassMetadata($entry::class), $entry); + $this->preUpdate($this->dm->getClassMetadata($entry::class), $entry, $session); } } } @@ -175,7 +238,7 @@ private function cascadePreUpdate(ClassMetadata $class, object $document): void * * @template T of object */ - private function cascadePostUpdate(ClassMetadata $class, object $document): void + private function cascadePostUpdate(ClassMetadata $class, object $document, ?Session $session = null): void { foreach ($class->getEmbeddedFieldsMappings() as $mapping) { $value = $class->reflFields[$mapping['fieldName']]->getValue($document); @@ -192,10 +255,17 @@ private function cascadePostUpdate(ClassMetadata $class, object $document): void $entryClass = $this->dm->getClassMetadata($entry::class); $event = $this->uow->isScheduledForInsert($entry) ? Events::postPersist : Events::postUpdate; - $entryClass->invokeLifecycleCallbacks($event, $entry, [new LifecycleEventArgs($entry, $this->dm)]); - $this->dispatchEvent($entryClass, $event, new LifecycleEventArgs($entry, $this->dm)); - $this->cascadePostUpdate($entryClass, $entry); + if (! $this->shouldDispatchEvent($entry, $event, $session)) { + continue; + } + + $eventArgs = new LifecycleEventArgs($entry, $this->dm, $session); + + $entryClass->invokeLifecycleCallbacks($event, $entry, [$eventArgs]); + $this->dispatchEvent($entryClass, $event, $eventArgs); + + $this->cascadePostUpdate($entryClass, $entry, $session); } } } @@ -208,7 +278,7 @@ private function cascadePostUpdate(ClassMetadata $class, object $document): void * * @template T of object */ - private function cascadePostPersist(ClassMetadata $class, object $document): void + private function cascadePostPersist(ClassMetadata $class, object $document, ?Session $session = null): void { foreach ($class->getEmbeddedFieldsMappings() as $mapping) { $value = $class->reflFields[$mapping['fieldName']]->getValue($document); @@ -218,7 +288,7 @@ private function cascadePostPersist(ClassMetadata $class, object $document): voi $values = $mapping['type'] === ClassMetadata::ONE ? [$value] : $value; foreach ($values as $embeddedDocument) { - $this->postPersist($this->dm->getClassMetadata($embeddedDocument::class), $embeddedDocument); + $this->postPersist($this->dm->getClassMetadata($embeddedDocument::class), $embeddedDocument, $session); } } } @@ -232,4 +302,23 @@ private function dispatchEvent(ClassMetadata $class, string $eventName, ?EventAr $this->evm->dispatchEvent($eventName, $eventArgs); } + + private function shouldDispatchEvent(object $document, string $eventName, ?Session $session): bool + { + if (! $this->transactionalModeEnabled) { + return true; + } + + if ($session !== $this->session) { + throw MongoDBException::transactionalSessionMismatch(); + } + + // Check whether the event has already been dispatched. + $hasDispatched = isset($this->transactionalEvents[spl_object_hash($document)][$eventName]); + + // Mark the event as dispatched - no problem doing this if it already was dispatched + $this->transactionalEvents[spl_object_hash($document)][$eventName] = true; + + return ! $hasDispatched; + } } diff --git a/phpstan-baseline.neon b/phpstan-baseline.neon index a8424a74f5..b24788a7ab 100644 --- a/phpstan-baseline.neon +++ b/phpstan-baseline.neon @@ -545,6 +545,11 @@ parameters: count: 1 path: lib/Doctrine/ODM/MongoDB/PersistentCollection.php + - + message: "#^Method Doctrine\\\\ODM\\\\MongoDB\\\\Persisters\\\\DocumentPersister\\:\\:isInTransaction\\(\\) has parameter \\$options with no value type specified in iterable type array\\.$#" + count: 1 + path: lib/Doctrine/ODM/MongoDB/Persisters/DocumentPersister.php + - message: "#^Method Doctrine\\\\ODM\\\\MongoDB\\\\Proxy\\\\Factory\\\\StaticProxyFactory\\:\\:createInitializer\\(\\) should return Closure\\(ProxyManager\\\\Proxy\\\\GhostObjectInterface\\&TDocument\\=, string\\=, array\\\\=, Closure\\|null\\=, array\\\\=\\)\\: bool but returns Closure\\(ProxyManager\\\\Proxy\\\\GhostObjectInterface, string, array, mixed, array\\)\\: true\\.$#" count: 1 @@ -600,6 +605,26 @@ parameters: count: 1 path: lib/Doctrine/ODM/MongoDB/Types/DateType.php + - + message: "#^Method Doctrine\\\\ODM\\\\MongoDB\\\\UnitOfWork\\:\\:getTransactionOptions\\(\\) return type has no value type specified in iterable type array\\.$#" + count: 1 + path: lib/Doctrine/ODM/MongoDB/UnitOfWork.php + + - + message: "#^Method Doctrine\\\\ODM\\\\MongoDB\\\\UnitOfWork\\:\\:stripTransactionOptions\\(\\) has parameter \\$options with no value type specified in iterable type array\\.$#" + count: 1 + path: lib/Doctrine/ODM/MongoDB/UnitOfWork.php + + - + message: "#^Method Doctrine\\\\ODM\\\\MongoDB\\\\UnitOfWork\\:\\:stripTransactionOptions\\(\\) return type has no value type specified in iterable type array\\.$#" + count: 1 + path: lib/Doctrine/ODM/MongoDB/UnitOfWork.php + + - + message: "#^Method Doctrine\\\\ODM\\\\MongoDB\\\\UnitOfWork\\:\\:withTransaction\\(\\) has parameter \\$transactionOptions with no value type specified in iterable type array\\.$#" + count: 1 + path: lib/Doctrine/ODM/MongoDB/UnitOfWork.php + - message: "#^Unable to resolve the template type T in call to method Doctrine\\\\ODM\\\\MongoDB\\\\DocumentManager\\:\\:getClassMetadata\\(\\)$#" count: 1 diff --git a/psalm-baseline.xml b/psalm-baseline.xml index b62935081a..bd7271b148 100644 --- a/psalm-baseline.xml +++ b/psalm-baseline.xml @@ -1,5 +1,5 @@ - + IteratorAggregate @@ -301,6 +301,9 @@ + + name])]]> + @@ -489,6 +492,9 @@ $options + $expectedWriteOptions + + empty($dbCommands[$databaseName]) + diff --git a/tests/Doctrine/ODM/MongoDB/Tests/BaseTestCase.php b/tests/Doctrine/ODM/MongoDB/Tests/BaseTestCase.php index 9941e7f284..66afcd1f07 100644 --- a/tests/Doctrine/ODM/MongoDB/Tests/BaseTestCase.php +++ b/tests/Doctrine/ODM/MongoDB/Tests/BaseTestCase.php @@ -11,15 +11,24 @@ use Doctrine\ODM\MongoDB\UnitOfWork; use Doctrine\Persistence\Mapping\Driver\MappingDriver; use MongoDB\Client; +use MongoDB\Driver\Manager; +use MongoDB\Driver\Server; use MongoDB\Model\DatabaseInfo; use PHPUnit\Framework\TestCase; use function array_key_exists; use function array_map; +use function count; +use function explode; use function getenv; +use function implode; use function in_array; use function iterator_to_array; +use function parse_url; use function preg_match; +use function strlen; +use function strpos; +use function substr_replace; use function version_compare; use const DOCTRINE_MONGODB_DATABASE; @@ -27,6 +36,8 @@ abstract class BaseTestCase extends TestCase { + protected static ?bool $supportsTransactions; + protected static bool $allowsTransactions = true; protected ?DocumentManager $dm; protected UnitOfWork $uow; @@ -80,6 +91,9 @@ protected static function getConfiguration(): Configuration $config->addFilter('testFilter', Filter::class); $config->addFilter('testFilter2', Filter::class); + // Enable transactions if supported + $config->setUseTransactionalFlush(static::$allowsTransactions && self::supportsTransactions()); + return $config; } @@ -108,7 +122,7 @@ protected static function createMetadataDriverImpl(): MappingDriver protected static function createTestDocumentManager(): DocumentManager { $config = static::getConfiguration(); - $client = new Client(getenv('DOCTRINE_MONGODB_SERVER') ?: DOCTRINE_MONGODB_SERVER); + $client = new Client(self::getUri()); return DocumentManager::create($client, $config); } @@ -120,6 +134,32 @@ protected function getServerVersion(): string return $result['version']; } + protected function getPrimaryServer(): Server + { + return $this->dm->getClient()->getManager()->selectServer(); + } + + protected function skipTestIfNoTransactionSupport(): void + { + if (! self::supportsTransactions()) { + $this->markTestSkipped('Test requires a topology that supports transactions'); + } + } + + protected function skipTestIfTransactionalFlushDisabled(): void + { + if (! $this->dm?->getConfiguration()->isTransactionalFlushEnabled()) { + $this->markTestSkipped('Test only applies when transactional flush is enabled'); + } + } + + protected function skipTestIfTransactionalFlushEnabled(): void + { + if ($this->dm?->getConfiguration()->isTransactionalFlushEnabled()) { + $this->markTestSkipped('Test is not compatible with transactional flush'); + } + } + /** @psalm-param class-string $className */ protected function skipTestIfNotSharded(string $className): void { @@ -162,4 +202,55 @@ protected function requireMongoDB42(string $message): void { $this->requireVersion($this->getServerVersion(), '4.2.0', '<', $message); } + + protected static function getUri(bool $useMultipleMongoses = true): string + { + $uri = getenv('DOCTRINE_MONGODB_SERVER') ?: DOCTRINE_MONGODB_SERVER; + + return $useMultipleMongoses ? $uri : self::removeMultipleHosts($uri); + } + + /** + * Removes any hosts beyond the first in a URI. This function should only be + * used with a sharded cluster URI, but that is not enforced. + */ + protected static function removeMultipleHosts(string $uri): string + { + $parts = parse_url($uri); + + self::assertIsArray($parts); + + $hosts = explode(',', $parts['host']); + + // Nothing to do if the URI already has a single mongos host + if (count($hosts) === 1) { + return $uri; + } + + // Re-append port to last host + if (isset($parts['port'])) { + $hosts[count($hosts) - 1] .= ':' . $parts['port']; + } + + $singleHost = $hosts[0]; + $multipleHosts = implode(',', $hosts); + + $pos = strpos($uri, $multipleHosts); + + self::assertNotFalse($pos); + + return substr_replace($uri, $singleHost, $pos, strlen($multipleHosts)); + } + + protected static function supportsTransactions(): bool + { + return self::$supportsTransactions ??= self::detectTransactionSupport(); + } + + private static function detectTransactionSupport(): bool + { + $manager = new Manager(self::getUri()); + + return $manager->selectServer()->getType() !== Server::TYPE_STANDALONE; + } } diff --git a/tests/Doctrine/ODM/MongoDB/Tests/ConfigurationTest.php b/tests/Doctrine/ODM/MongoDB/Tests/ConfigurationTest.php index faf41361fc..38797b822c 100644 --- a/tests/Doctrine/ODM/MongoDB/Tests/ConfigurationTest.php +++ b/tests/Doctrine/ODM/MongoDB/Tests/ConfigurationTest.php @@ -27,4 +27,17 @@ public function testDefaultPersistentCollectionGenerator(): void self::assertInstanceOf(PersistentCollectionGenerator::class, $generator); self::assertSame($generator, $c->getPersistentCollectionGenerator()); } + + public function testEnableTransactionalFlush(): void + { + $c = new Configuration(); + + self::assertFalse($c->isTransactionalFlushEnabled(), 'Transactional flush is disabled by default'); + + $c->setUseTransactionalFlush(true); + self::assertTrue($c->isTransactionalFlushEnabled(), 'Transactional flush is enabled after setTransactionalFlush(true)'); + + $c->setUseTransactionalFlush(false); + self::assertFalse($c->isTransactionalFlushEnabled(), 'Transactional flush is disabled after setTransactionalFlush(false)'); + } } diff --git a/tests/Doctrine/ODM/MongoDB/Tests/Events/TransactionalLifecycleEventsTest.php b/tests/Doctrine/ODM/MongoDB/Tests/Events/TransactionalLifecycleEventsTest.php new file mode 100644 index 0000000000..f5bddaf2a3 --- /dev/null +++ b/tests/Doctrine/ODM/MongoDB/Tests/Events/TransactionalLifecycleEventsTest.php @@ -0,0 +1,275 @@ +skipTestIfTransactionalFlushDisabled(); + } + + public function tearDown(): void + { + $this->dm->getClient()->selectDatabase('admin')->command([ + 'configureFailPoint' => 'failCommand', + 'mode' => 'off', + ]); + + parent::tearDown(); + } + + public function testPersistEvents(): void + { + $root = new RootEventDocument(); + $root->name = 'root'; + + $root->embedded = new EmbeddedEventDocument(); + $root->embedded->name = 'embedded'; + + $this->createFailPoint('insert'); + + $this->dm->persist($root); + $this->dm->flush(); + + $this->assertSame(1, $root->postPersist); + $this->assertSame(1, $root->embedded->postPersist); + } + + public function testUpdateEvents(): void + { + $root = new RootEventDocument(); + $root->name = 'root'; + + $root->embedded = new EmbeddedEventDocument(); + $root->embedded->name = 'embedded'; + + $this->dm->persist($root); + $this->dm->flush(); + + $this->createFailPoint('update'); + + $root->name = 'updated'; + $root->embedded->name = 'updated'; + + $this->dm->flush(); + + $this->assertSame(1, $root->preUpdate); + $this->assertSame(1, $root->postUpdate); + $this->assertSame(1, $root->embedded->preUpdate); + $this->assertSame(1, $root->embedded->postUpdate); + } + + public function testUpdateEventsRootOnly(): void + { + $root = new RootEventDocument(); + $root->name = 'root'; + + $root->embedded = new EmbeddedEventDocument(); + $root->embedded->name = 'embedded'; + + $this->dm->persist($root); + $this->dm->flush(); + + $this->createFailPoint('update'); + + $root->name = 'updated'; + + $this->dm->flush(); + + $this->assertSame(1, $root->preUpdate); + $this->assertSame(1, $root->postUpdate); + $this->assertSame(0, $root->embedded->preUpdate); + $this->assertSame(0, $root->embedded->postUpdate); + } + + public function testUpdateEventsEmbeddedOnly(): void + { + $root = new RootEventDocument(); + $root->name = 'root'; + + $root->embedded = new EmbeddedEventDocument(); + $root->embedded->name = 'embedded'; + + $this->dm->persist($root); + $this->dm->flush(); + + $this->createFailPoint('update'); + + $root->embedded->name = 'updated'; + + $this->dm->flush(); + + $this->assertSame(1, $root->preUpdate); + $this->assertSame(1, $root->postUpdate); + + $this->assertSame(1, $root->embedded->preUpdate); + $this->assertSame(1, $root->embedded->postUpdate); + } + + public function testUpdateEventsWithNewEmbeddedDocument(): void + { + $firstEmbedded = new EmbeddedEventDocument(); + $firstEmbedded->name = 'embedded'; + + $secondEmbedded = new EmbeddedEventDocument(); + $secondEmbedded->name = 'new'; + + $root = new RootEventDocument(); + $root->name = 'root'; + $root->embedded = $firstEmbedded; + + $this->dm->persist($root); + $this->dm->flush(); + + $this->createFailPoint('update'); + + $root->name = 'updated'; + $root->embedded = $secondEmbedded; + + $this->dm->flush(); + + $this->assertSame(1, $root->preUpdate); + $this->assertSame(1, $root->postUpdate); + + // First embedded document was removed but not updated + $this->assertSame(1, $firstEmbedded->postRemove); + $this->assertSame(0, $firstEmbedded->preUpdate); + $this->assertSame(0, $firstEmbedded->postUpdate); + + // Second embedded document was persisted but not updated + $this->assertSame(1, $secondEmbedded->postPersist); + $this->assertSame(0, $secondEmbedded->preUpdate); + $this->assertSame(0, $secondEmbedded->postUpdate); + } + + public function testRemoveEvents(): void + { + $root = new RootEventDocument(); + $root->name = 'root'; + + $root->embedded = new EmbeddedEventDocument(); + $root->embedded->name = 'embedded'; + + $this->dm->persist($root); + $this->dm->flush(); + + $this->createFailPoint('delete'); + + $this->dm->remove($root); + $this->dm->flush(); + + $this->assertSame(1, $root->postRemove); + $this->assertSame(1, $root->embedded->postRemove); + } + + /** Create a document manager with a single host to ensure failpoints target the correct server */ + protected static function createTestDocumentManager(): DocumentManager + { + $config = static::getConfiguration(); + $client = new Client(self::getUri(false), [], ['typeMap' => ['root' => 'array', 'document' => 'array']]); + + return DocumentManager::create($client, $config); + } + + private function createFailPoint(string $failCommand): void + { + $this->dm->getClient()->selectDatabase('admin')->command([ + 'configureFailPoint' => 'failCommand', + 'mode' => ['times' => 1], + 'data' => [ + 'errorCode' => 192, // FailPointEnabled + 'errorLabels' => ['TransientTransactionError'], + 'failCommands' => [$failCommand], + ], + ]); + } +} + +/** + * @ODM\MappedSuperclass + * @ODM\HasLifecycleCallbacks + */ +abstract class BaseEventDocument +{ + public function __construct() + { + } + + /** + * @ODM\Field(type="string") + * + * @var string|null + */ + public $name; + + public int $preUpdate = 0; + + public int $postPersist = 0; + + public int $postUpdate = 0; + + public int $postRemove = 0; + + /** @ODM\PreUpdate */ + public function preUpdate(Event\PreUpdateEventArgs $e): void + { + $this->assertTransactionState($e); + $this->preUpdate++; + } + + /** @ODM\PostPersist */ + public function postPersist(Event\LifecycleEventArgs $e): void + { + $this->assertTransactionState($e); + $this->postPersist++; + } + + /** @ODM\PostUpdate */ + public function postUpdate(Event\LifecycleEventArgs $e): void + { + $this->assertTransactionState($e); + $this->postUpdate++; + } + + /** @ODM\PostRemove */ + public function postRemove(Event\LifecycleEventArgs $e): void + { + $this->assertTransactionState($e); + $this->postRemove++; + } + + private function assertTransactionState(LifecycleEventArgs $e): void + { + Assert::assertTrue($e->isInTransaction()); + Assert::assertInstanceOf(Session::class, $e->session); + } +} + +/** @ODM\EmbeddedDocument */ +class EmbeddedEventDocument extends BaseEventDocument +{ +} + +/** @ODM\Document */ +class RootEventDocument extends BaseEventDocument +{ + /** @ODM\Id */ + public string $id; + + /** @ODM\EmbedOne(targetDocument=EmbeddedEventDocument::class) */ + public ?EmbeddedEventDocument $embedded; +} diff --git a/tests/Doctrine/ODM/MongoDB/Tests/Functional/AtomicSetTest.php b/tests/Doctrine/ODM/MongoDB/Tests/Functional/AtomicSetTest.php index 4047de34cf..dd5cdbc3ae 100644 --- a/tests/Doctrine/ODM/MongoDB/Tests/Functional/AtomicSetTest.php +++ b/tests/Doctrine/ODM/MongoDB/Tests/Functional/AtomicSetTest.php @@ -25,6 +25,9 @@ */ class AtomicSetTest extends BaseTestCase { + // This test counts executed commands and thus doesn't work with transactions + protected static bool $allowsTransactions = false; + private CommandLogger $logger; public function setUp(): void diff --git a/tests/Doctrine/ODM/MongoDB/Tests/Functional/CollectionPersisterTest.php b/tests/Doctrine/ODM/MongoDB/Tests/Functional/CollectionPersisterTest.php index c2ef520fdb..35b3c27454 100644 --- a/tests/Doctrine/ODM/MongoDB/Tests/Functional/CollectionPersisterTest.php +++ b/tests/Doctrine/ODM/MongoDB/Tests/Functional/CollectionPersisterTest.php @@ -15,6 +15,9 @@ class CollectionPersisterTest extends BaseTestCase { + // This test counts executed commands and thus doesn't work with transactions + protected static bool $allowsTransactions = false; + private CommandLogger $logger; public function setUp(): void diff --git a/tests/Doctrine/ODM/MongoDB/Tests/Functional/CommitImprovementTest.php b/tests/Doctrine/ODM/MongoDB/Tests/Functional/CommitImprovementTest.php index a16571c702..0ef9381496 100644 --- a/tests/Doctrine/ODM/MongoDB/Tests/Functional/CommitImprovementTest.php +++ b/tests/Doctrine/ODM/MongoDB/Tests/Functional/CommitImprovementTest.php @@ -18,6 +18,9 @@ class CommitImprovementTest extends BaseTestCase { + // This test counts executed commands and thus doesn't work with transactions + protected static bool $allowsTransactions = false; + private CommandLogger $logger; public function setUp(): void diff --git a/tests/Doctrine/ODM/MongoDB/Tests/Functional/DocumentPersisterTest.php b/tests/Doctrine/ODM/MongoDB/Tests/Functional/DocumentPersisterTest.php index 5e36e96909..aa8313c528 100644 --- a/tests/Doctrine/ODM/MongoDB/Tests/Functional/DocumentPersisterTest.php +++ b/tests/Doctrine/ODM/MongoDB/Tests/Functional/DocumentPersisterTest.php @@ -632,6 +632,8 @@ public static function dataProviderTestWriteConcern(): array #[DataProvider('dataProviderTestWriteConcern')] public function testExecuteInsertsRespectsWriteConcern(string $class, $writeConcern): void { + $this->skipTestIfTransactionalFlushEnabled(); + $documentPersister = $this->uow->getDocumentPersister($class); $collection = $this->createMock(Collection::class); @@ -648,6 +650,28 @@ public function testExecuteInsertsRespectsWriteConcern(string $class, $writeConc $this->dm->flush(); } + /** @psalm-param class-string $class */ + #[DataProvider('dataProviderTestWriteConcern')] + public function testExecuteInsertsOmitsWriteConcernInTransaction(string $class): void + { + $this->skipTestIfTransactionalFlushDisabled(); + + $documentPersister = $this->uow->getDocumentPersister($class); + + $collection = $this->createMock(Collection::class); + $collection->expects($this->once()) + ->method('insertMany') + ->with($this->isType('array'), $this->logicalNot($this->arrayHasKey('writeConcern'))); + + $reflectionProperty = new ReflectionProperty($documentPersister, 'collection'); + $reflectionProperty->setAccessible(true); + $reflectionProperty->setValue($documentPersister, $collection); + + $testDocument = new $class(); + $this->dm->persist($testDocument); + $this->dm->flush(); + } + /** * @param int|string $writeConcern * @psalm-param class-string $class @@ -655,6 +679,8 @@ public function testExecuteInsertsRespectsWriteConcern(string $class, $writeConc #[DataProvider('dataProviderTestWriteConcern')] public function testExecuteUpsertsRespectsWriteConcern(string $class, $writeConcern): void { + $this->skipTestIfTransactionalFlushEnabled(); + $documentPersister = $this->uow->getDocumentPersister($class); $collection = $this->createMock(Collection::class); @@ -672,6 +698,29 @@ public function testExecuteUpsertsRespectsWriteConcern(string $class, $writeConc $this->dm->flush(); } + /** @psalm-param class-string $class */ + #[DataProvider('dataProviderTestWriteConcern')] + public function testExecuteUpsertsDoesNotUseWriteConcernInTransaction(string $class): void + { + $this->skipTestIfTransactionalFlushDisabled(); + + $documentPersister = $this->uow->getDocumentPersister($class); + + $collection = $this->createMock(Collection::class); + $collection->expects($this->once()) + ->method('updateOne') + ->with($this->isType('array'), $this->logicalNot($this->arrayHasKey('writeConcern'))); + + $reflectionProperty = new ReflectionProperty($documentPersister, 'collection'); + $reflectionProperty->setAccessible(true); + $reflectionProperty->setValue($documentPersister, $collection); + + $testDocument = new $class(); + $testDocument->id = new ObjectId(); + $this->dm->persist($testDocument); + $this->dm->flush(); + } + /** * @param int|string $writeConcern * @psalm-param class-string $class @@ -679,6 +728,8 @@ public function testExecuteUpsertsRespectsWriteConcern(string $class, $writeConc #[DataProvider('dataProviderTestWriteConcern')] public function testRemoveRespectsWriteConcern(string $class, $writeConcern): void { + $this->skipTestIfTransactionalFlushEnabled(); + $documentPersister = $this->uow->getDocumentPersister($class); $collection = $this->createMock(Collection::class); @@ -698,8 +749,35 @@ public function testRemoveRespectsWriteConcern(string $class, $writeConcern): vo $this->dm->flush(); } + /** @psalm-param class-string $class */ + #[DataProvider('dataProviderTestWriteConcern')] + public function testRemoveDoesNotUseWriteConcernInTransaction(string $class): void + { + $this->skipTestIfTransactionalFlushDisabled(); + + $documentPersister = $this->uow->getDocumentPersister($class); + + $collection = $this->createMock(Collection::class); + $collection->expects($this->once()) + ->method('deleteOne') + ->with($this->isType('array'), $this->logicalNot($this->arrayHasKey('writeConcern'))); + + $reflectionProperty = new ReflectionProperty($documentPersister, 'collection'); + $reflectionProperty->setAccessible(true); + $reflectionProperty->setValue($documentPersister, $collection); + + $testDocument = new $class(); + $this->dm->persist($testDocument); + $this->dm->flush(); + + $this->dm->remove($testDocument); + $this->dm->flush(); + } + public function testDefaultWriteConcernIsRespected(): void { + $this->skipTestIfTransactionalFlushEnabled(); + $class = DocumentPersisterTestDocument::class; $documentPersister = $this->uow->getDocumentPersister($class); @@ -719,8 +797,33 @@ public function testDefaultWriteConcernIsRespected(): void $this->dm->flush(); } + public function testDefaultWriteConcernIsIgnoredInTransaction(): void + { + $this->skipTestIfTransactionalFlushDisabled(); + + $class = DocumentPersisterTestDocument::class; + $documentPersister = $this->uow->getDocumentPersister($class); + + $collection = $this->createMock(Collection::class); + $collection->expects($this->once()) + ->method('insertMany') + ->with($this->isType('array'), $this->logicalNot($this->arrayHasKey('writeConcern'))); + + $reflectionProperty = new ReflectionProperty($documentPersister, 'collection'); + $reflectionProperty->setAccessible(true); + $reflectionProperty->setValue($documentPersister, $collection); + + $this->dm->getConfiguration()->setDefaultCommitOptions(['writeConcern' => new WriteConcern(1)]); + + $testDocument = new $class(); + $this->dm->persist($testDocument); + $this->dm->flush(); + } + public function testDefaultWriteConcernIsRespectedBackwardCompatibility(): void { + $this->skipTestIfTransactionalFlushEnabled(); + $class = DocumentPersisterTestDocument::class; $documentPersister = $this->uow->getDocumentPersister($class); diff --git a/tests/Doctrine/ODM/MongoDB/Tests/Functional/ReferencePrimerTest.php b/tests/Doctrine/ODM/MongoDB/Tests/Functional/ReferencePrimerTest.php index 11539f282d..44c2c5c40e 100644 --- a/tests/Doctrine/ODM/MongoDB/Tests/Functional/ReferencePrimerTest.php +++ b/tests/Doctrine/ODM/MongoDB/Tests/Functional/ReferencePrimerTest.php @@ -389,7 +389,9 @@ public function testPrimeReferencesInvokesPrimer(): void $invokedArgs[] = func_get_args(); }; - $readPreference = new ReadPreference(ReadPreference::RP_SECONDARY_PREFERRED); + // Note: using a secondary read preference here can cause issues when using transactions + // Using a primaryPreferred works just as well to check if the hint is passed on to the primer + $readPreference = new ReadPreference(ReadPreference::RP_PRIMARY_PREFERRED); $this->dm->createQueryBuilder(User::class) ->field('account')->prime($primer) ->field('groups')->prime($primer) diff --git a/tests/Doctrine/ODM/MongoDB/Tests/Functional/ShardKeyTest.php b/tests/Doctrine/ODM/MongoDB/Tests/Functional/ShardKeyTest.php index e5aa6de7bc..c4eafc9f96 100644 --- a/tests/Doctrine/ODM/MongoDB/Tests/Functional/ShardKeyTest.php +++ b/tests/Doctrine/ODM/MongoDB/Tests/Functional/ShardKeyTest.php @@ -11,6 +11,7 @@ use MongoDB\BSON\ObjectId; use PHPUnit\Framework\Attributes\Group; +use function array_shift; use function assert; use function end; @@ -48,13 +49,15 @@ public function testUpdateAfterSave(): void $o = $this->dm->find($o::class, $o->id); assert($o instanceof ShardedOne); $o->title = 'test2'; + + $this->logger->clear(); $this->dm->flush(); - $queries = $this->logger->getAll(); - $lastQuery = end($queries); - self::assertSame('update', $lastQuery->getCommandName()); + $queries = $this->logger->getAll(); + $updateQuery = array_shift($queries); + self::assertSame('update', $updateQuery->getCommandName()); - $command = $lastQuery->getCommand(); + $command = $updateQuery->getCommand(); self::assertIsArray($command->updates); self::assertCount(1, $command->updates); self::assertEquals($o->key, $command->updates[0]->q->k); @@ -67,11 +70,11 @@ public function testUpsert(): void $this->dm->persist($o); $this->dm->flush(); - $queries = $this->logger->getAll(); - $lastQuery = end($queries); - self::assertSame('update', $lastQuery->getCommandName()); + $queries = $this->logger->getAll(); + $upsertQuery = array_shift($queries); + self::assertSame('update', $upsertQuery->getCommandName()); - $command = $lastQuery->getCommand(); + $command = $upsertQuery->getCommand(); self::assertIsArray($command->updates); self::assertCount(1, $command->updates); self::assertEquals($o->key, $command->updates[0]->q->k); @@ -83,14 +86,17 @@ public function testRemove(): void $o = new ShardedOne(); $this->dm->persist($o); $this->dm->flush(); + $this->dm->remove($o); + + $this->logger->clear(); $this->dm->flush(); - $queries = $this->logger->getAll(); - $lastQuery = end($queries); - self::assertSame('delete', $lastQuery->getCommandName()); + $queries = $this->logger->getAll(); + $removeQuery = array_shift($queries); + self::assertSame('delete', $removeQuery->getCommandName()); - $command = $lastQuery->getCommand(); + $command = $removeQuery->getCommand(); self::assertIsArray($command->deletes); self::assertCount(1, $command->deletes); self::assertEquals($o->key, $command->deletes[0]->q->k); diff --git a/tests/Doctrine/ODM/MongoDB/Tests/Functional/Ticket/GH1138Test.php b/tests/Doctrine/ODM/MongoDB/Tests/Functional/Ticket/GH1138Test.php index c06b34540c..9e80fcffd7 100644 --- a/tests/Doctrine/ODM/MongoDB/Tests/Functional/Ticket/GH1138Test.php +++ b/tests/Doctrine/ODM/MongoDB/Tests/Functional/Ticket/GH1138Test.php @@ -12,6 +12,9 @@ class GH1138Test extends BaseTestCase { + // This test counts executed commands and thus doesn't work with transactions + protected static bool $allowsTransactions = false; + private CommandLogger $logger; public function setUp(): void diff --git a/tests/Doctrine/ODM/MongoDB/Tests/UnitOfWorkCommitConsistencyTest.php b/tests/Doctrine/ODM/MongoDB/Tests/UnitOfWorkCommitConsistencyTest.php new file mode 100644 index 0000000000..71bc7c2a26 --- /dev/null +++ b/tests/Doctrine/ODM/MongoDB/Tests/UnitOfWorkCommitConsistencyTest.php @@ -0,0 +1,461 @@ +dm->getClient()->selectDatabase('admin')->command([ + 'configureFailPoint' => 'failCommand', + 'mode' => 'off', + ]); + + parent::tearDown(); + } + + public function testInsertErrorKeepsFailingInsertions(): void + { + $firstUser = new ForumUser(); + $firstUser->username = 'alcaeus'; + $this->uow->persist($firstUser); + + $secondUser = new ForumUser(); + $secondUser->username = 'jmikola'; + $this->uow->persist($secondUser); + + $friendUser = new FriendUser('GromNaN'); + $this->uow->persist($friendUser); + + // Add failpoint to let the first insert command fail. This affects the ForumUser documents + $this->createFailpoint('insert'); + + try { + $this->uow->commit(); + self::fail('Expected exception when committing'); + } catch (Throwable) { + } + + self::assertSame( + 0, + $this->dm->getDocumentCollection(ForumUser::class)->countDocuments(), + ); + + self::assertTrue($this->uow->isScheduledForInsert($firstUser)); + self::assertNotEquals([], $this->uow->getDocumentChangeSet($firstUser)); + + self::assertTrue($this->uow->isScheduledForInsert($secondUser)); + self::assertNotEquals([], $this->uow->getDocumentChangeSet($secondUser)); + + self::assertTrue($this->uow->isScheduledForInsert($friendUser)); + self::assertNotEquals([], $this->uow->getDocumentChangeSet($friendUser)); + } + + public function testInsertErrorKeepsFailingInsertionsForDocumentClass(): void + { + // Create a unique index on the collection to let the second document fail, as using a fail point would also + // affect the first document. + $collection = $this->dm->getDocumentCollection(ForumUser::class); + $collection->createIndex(['username' => 1], ['unique' => true]); + + $firstUser = new ForumUser(); + $firstUser->username = 'alcaeus'; + $this->uow->persist($firstUser); + + $secondUser = new ForumUser(); + $secondUser->username = 'alcaeus'; + $this->uow->persist($secondUser); + + $thirdUser = new ForumUser(); + $thirdUser->username = 'jmikola'; + $this->uow->persist($thirdUser); + + try { + $this->uow->commit(); + self::fail('Expected exception when committing'); + } catch (Throwable) { + } + + // One user inserted, the second insert failed, the last was skipped + self::assertSame( + 1, + $this->dm->getDocumentCollection(ForumUser::class)->countDocuments(), + ); + + // Wrong behaviour: user was saved and should no longer be scheduled for insertion + self::assertTrue($this->uow->isScheduledForInsert($firstUser)); + // Wrong behaviour: changeset should be empty + self::assertNotEquals([], $this->uow->getDocumentChangeSet($firstUser)); + + self::assertTrue($this->uow->isScheduledForInsert($secondUser)); + self::assertNotEquals([], $this->uow->getDocumentChangeSet($secondUser)); + + self::assertTrue($this->uow->isScheduledForInsert($thirdUser)); + self::assertNotEquals([], $this->uow->getDocumentChangeSet($thirdUser)); + } + + public function testInsertErrorWithEmbeddedDocumentKeepsInsertions(): void + { + // Create a unique index on the collection to let the second insert fail + $collection = $this->dm->getDocumentCollection(User::class); + $collection->createIndex(['username' => 1], ['unique' => true]); + + $firstAddress = new Address(); + $firstAddress->setCity('Olching'); + $firstUser = new User(); + $firstUser->setUsername('alcaeus'); + $firstUser->setAddress($firstAddress); + + $secondAddress = new Address(); + $secondAddress->setCity('Olching'); + $secondUser = new User(); + $secondUser->setUsername('alcaeus'); + $secondUser->setAddress($secondAddress); + + $this->uow->persist($firstUser); + $this->uow->persist($secondUser); + + try { + $this->uow->commit(); + self::fail('Expected exception when committing'); + } catch (Throwable) { + } + + // First document inserted, second failed due to index error + self::assertSame(1, $collection->countDocuments()); + + // Wrong behaviour: document should no longer be scheduled and changeset should be cleared + $this->assertTrue($this->uow->isScheduledForInsert($firstUser)); + $this->assertNotEquals([], $this->uow->getDocumentChangeSet($firstUser)); + + // Wrong behaviour: document should no longer be scheduled for insertion and changeset cleared + $this->assertTrue($this->uow->isScheduledForInsert($firstAddress)); + $this->assertNotEquals([], $this->uow->getDocumentChangeSet($firstAddress)); + + $this->assertTrue($this->uow->isScheduledForInsert($secondUser)); + $this->assertNotEquals([], $this->uow->getDocumentChangeSet($secondUser)); + $this->assertTrue($this->uow->isScheduledForInsert($secondAddress)); + $this->assertNotEquals([], $this->uow->getDocumentChangeSet($secondAddress)); + } + + public function testUpsertErrorDropsFailingUpserts(): void + { + $user = new ForumUser(); + $user->id = new ObjectId(); // Specifying an identifier makes this an upsert + $user->username = 'alcaeus'; + $this->uow->persist($user); + + $this->createFailpoint('update'); + + try { + $this->uow->commit(); + self::fail('Expected exception when committing'); + } catch (Throwable) { + } + + // No document was inserted + self::assertSame( + 0, + $this->dm->getDocumentCollection(ForumUser::class)->countDocuments(), + ); + + self::assertTrue($this->uow->isScheduledForUpsert($user)); + self::assertNotEquals([], $this->uow->getDocumentChangeSet($user)); + } + + public function testUpdateErrorKeepsFailingUpdate(): void + { + $user = new ForumUser(); + $user->username = 'alcaeus'; + $this->uow->persist($user); + $this->uow->commit(); + + $user->username = 'jmikola'; + + // Make sure update command fails once + $this->createFailpoint('update'); + + try { + $this->uow->commit(); + self::fail('Expected exception when committing'); + } catch (Throwable) { + } + + // The update is kept, user data is not changed + self::assertSame( + 1, + $this->dm->getDocumentCollection(ForumUser::class)->countDocuments(['username' => 'alcaeus']), + ); + + self::assertTrue($this->uow->isScheduledForUpdate($user)); + self::assertNotEquals([], $this->uow->getDocumentChangeSet($user)); + } + + public function testUpdateErrorWithNewEmbeddedDocumentKeepsFailingChangeset(): void + { + $user = new User(); + $user->setUsername('alcaeus'); + + $this->uow->persist($user); + $this->uow->commit(); + + $address = new Address(); + $address->setCity('Olching'); + $user->setAddress($address); + + $this->createFailpoint('update'); + + try { + $this->uow->commit(); + self::fail('Expected exception when committing'); + } catch (Throwable) { + } + + $this->assertTrue($this->uow->isScheduledForUpdate($user)); + $this->assertNotEquals([], $this->uow->getDocumentChangeSet($user)); + $this->assertTrue($this->uow->isScheduledForInsert($address)); + $this->assertNotEquals([], $this->uow->getDocumentChangeSet($address)); + } + + public function testUpdateWithNewEmbeddedDocumentClearsChangesets(): void + { + $user = new User(); + $user->setUsername('alcaeus'); + + $this->uow->persist($user); + $this->uow->commit(); + + $address = new Address(); + $address->setCity('Olching'); + $user->setAddress($address); + + $this->uow->commit(); + + $this->assertFalse($this->uow->isScheduledForUpdate($user)); + $this->assertEquals([], $this->uow->getDocumentChangeSet($user)); + $this->assertFalse($this->uow->isScheduledForInsert($address)); + $this->assertEquals([], $this->uow->getDocumentChangeSet($address)); + } + + public function testUpdateErrorWithEmbeddedDocumentKeepsFailingChangeset(): void + { + $address = new Address(); + $address->setCity('Olching'); + + $user = new User(); + $user->setUsername('alcaeus'); + $user->setAddress($address); + + $this->uow->persist($user); + $this->uow->commit(); + + $address->setCity('Munich'); + + $this->createFailpoint('update'); + + try { + $this->uow->commit(); + self::fail('Expected exception when committing'); + } catch (Throwable) { + } + + $this->assertTrue($this->uow->isScheduledForUpdate($user)); + $this->assertNotEquals([], $this->uow->getDocumentChangeSet($user)); + $this->assertTrue($this->uow->isScheduledForUpdate($address)); + $this->assertNotEquals([], $this->uow->getDocumentChangeSet($address)); + } + + public function testUpdateWithEmbeddedDocumentClearsChangesets(): void + { + $address = new Address(); + $address->setCity('Olching'); + + $user = new User(); + $user->setUsername('alcaeus'); + $user->setAddress($address); + + $this->uow->persist($user); + $this->uow->commit(); + + $address->setCity('Munich'); + + $this->uow->commit(); + + $this->assertFalse($this->uow->isScheduledForUpdate($user)); + $this->assertEquals([], $this->uow->getDocumentChangeSet($user)); + $this->assertFalse($this->uow->isScheduledForUpdate($address)); + $this->assertEquals([], $this->uow->getDocumentChangeSet($address)); + } + + public function testUpdateErrorWithRemovedEmbeddedDocumentKeepsFailingChangeset(): void + { + $address = new Address(); + $address->setCity('Olching'); + + $user = new User(); + $user->setUsername('alcaeus'); + $user->setAddress($address); + + $this->uow->persist($user); + $this->uow->commit(); + + $user->removeAddress(); + + $this->createFailpoint('update'); + + try { + $this->uow->commit(); + self::fail('Expected exception when committing'); + } catch (Throwable) { + } + + $this->assertTrue($this->uow->isScheduledForUpdate($user)); + $this->assertNotEquals([], $this->uow->getDocumentChangeSet($user)); + $this->assertTrue($this->uow->isScheduledForDelete($address)); + + // As $address is orphaned after changeset computation, it is removed from the identity map + $this->assertFalse($this->uow->isInIdentityMap($address)); + } + + public function testUpdateWithRemovedEmbeddedDocumentClearsChangesets(): void + { + $address = new Address(); + $address->setCity('Olching'); + + $user = new User(); + $user->setUsername('alcaeus'); + $user->setAddress($address); + + $this->uow->persist($user); + $this->uow->commit(); + + $user->removeAddress(); + + $this->uow->commit(); + + $this->assertFalse($this->uow->isScheduledForUpdate($user)); + $this->assertEquals([], $this->uow->getDocumentChangeSet($user)); + $this->assertFalse($this->uow->isScheduledForDelete($address)); + $this->assertFalse($this->uow->isInIdentityMap($address)); + } + + public function testDeleteErrorKeepsFailingDelete(): void + { + $user = new ForumUser(); + $user->username = 'alcaeus'; + $this->uow->persist($user); + $this->uow->commit(); + + $this->uow->remove($user); + + // Make sure delete command fails once + $this->createFailpoint('delete'); + + try { + $this->uow->commit(); + self::fail('Expected exception when committing'); + } catch (Throwable) { + } + + // The document still exists, the deletion is still scheduled + self::assertSame( + 1, + $this->dm->getDocumentCollection(ForumUser::class)->countDocuments(['username' => 'alcaeus']), + ); + + self::assertTrue($this->uow->isScheduledForDelete($user)); + } + + public function testDeleteErrorWithEmbeddedDocumentKeepsChangeset(): void + { + $address = new Address(); + $address->setCity('Olching'); + + $user = new User(); + $user->setUsername('alcaeus'); + $user->setAddress($address); + + $this->uow->persist($user); + $this->uow->commit(); + + $this->uow->remove($user); + + // Make sure delete command fails once + $this->createFailpoint('delete'); + + try { + $this->uow->commit(); + self::fail('Expected exception when committing'); + } catch (Throwable) { + } + + // The document still exists, the deletion is still scheduled + self::assertSame( + 1, + $this->dm->getDocumentCollection(User::class)->countDocuments(['username' => 'alcaeus']), + ); + + self::assertTrue($this->uow->isScheduledForDelete($user)); + self::assertTrue($this->uow->isScheduledForDelete($address)); + } + + public function testDeleteWithEmbeddedDocumentClearsChangeset(): void + { + $address = new Address(); + $address->setCity('Olching'); + + $user = new User(); + $user->setUsername('alcaeus'); + $user->setAddress($address); + + $this->uow->persist($user); + $this->uow->commit(); + + $this->uow->remove($user); + + $this->uow->commit(); + + self::assertSame( + 0, + $this->dm->getDocumentCollection(User::class)->countDocuments(['username' => 'alcaeus']), + ); + + self::assertFalse($this->uow->isScheduledForDelete($user)); + self::assertFalse($this->uow->isScheduledForDelete($address)); + } + + /** Create a document manager with a single host to ensure failpoints target the correct server */ + protected static function createTestDocumentManager(): DocumentManager + { + $config = static::getConfiguration(); + $client = new Client(self::getUri(false), [], ['typeMap' => ['root' => 'array', 'document' => 'array']]); + + return DocumentManager::create($client, $config); + } + + private function createFailpoint(string $commandName): void + { + $this->dm->getClient()->selectDatabase('admin')->command([ + 'configureFailPoint' => 'failCommand', + 'mode' => ['times' => 1], + 'data' => [ + 'errorCode' => 192, // FailPointEnabled + 'failCommands' => [$commandName], + ], + ]); + } +} diff --git a/tests/Doctrine/ODM/MongoDB/Tests/UnitOfWorkTest.php b/tests/Doctrine/ODM/MongoDB/Tests/UnitOfWorkTest.php index 61a529e06c..c3a22b308c 100644 --- a/tests/Doctrine/ODM/MongoDB/Tests/UnitOfWorkTest.php +++ b/tests/Doctrine/ODM/MongoDB/Tests/UnitOfWorkTest.php @@ -8,6 +8,7 @@ use DateTime; use Doctrine\Common\Collections\ArrayCollection; use Doctrine\Common\Collections\Collection; +use Doctrine\ODM\MongoDB\APM\CommandLogger; use Doctrine\ODM\MongoDB\Mapping\Annotations as ODM; use Doctrine\ODM\MongoDB\Mapping\ClassMetadata; use Doctrine\ODM\MongoDB\MongoDBException; @@ -24,11 +25,15 @@ use Documents\Functional\NotSaved; use Documents\User; use MongoDB\BSON\ObjectId; +use MongoDB\Collection as MongoDBCollection; +use MongoDB\Driver\WriteConcern; use PHPUnit\Framework\Attributes\DataProvider; use PHPUnit\Framework\Attributes\DoesNotPerformAssertions; use ProxyManager\Proxy\GhostObjectInterface; +use ReflectionProperty; use Throwable; +use function end; use function spl_object_hash; use function sprintf; @@ -548,6 +553,57 @@ public function testCommitsInProgressIsUpdatedOnException(): void $this->fail('This should never be reached, an exception should have been thrown.'); } + + public function testTransactionalCommitOmitsWriteConcernInOperation(): void + { + $this->skipTestIfNoTransactionSupport(); + + // Force transaction config to be enabled + $this->dm->getConfiguration()->setUseTransactionalFlush(true); + + $collection = $this->createMock(MongoDBCollection::class); + $collection->expects($this->once()) + ->method('insertMany') + ->with($this->isType('array'), $this->logicalNot($this->arrayHasKey('writeConcern'))); + + $documentPersister = $this->uow->getDocumentPersister(ForumUser::class); + + $reflectionProperty = new ReflectionProperty($documentPersister, 'collection'); + $reflectionProperty->setAccessible(true); + $reflectionProperty->setValue($documentPersister, $collection); + + $user = new ForumUser(); + $user->username = '12345'; + $this->uow->persist($user); + + $this->uow->commit(['writeConcern' => new WriteConcern(1)]); + } + + public function testTransactionalCommitUsesWriteConcernInCommitCommand(): void + { + $this->skipTestIfNoTransactionSupport(); + + // Force transaction config to be enabled + $this->dm->getConfiguration()->setUseTransactionalFlush(true); + + $user = new ForumUser(); + $user->username = '12345'; + $this->uow->persist($user); + + $logger = new CommandLogger(); + $logger->register(); + + $this->uow->commit(['writeConcern' => new WriteConcern('majority')]); + + $logger->unregister(); + + $commands = $logger->getAll(); + $commitCommand = end($commands); + + $this->assertSame('commitTransaction', $commitCommand->getCommandName()); + $this->assertObjectHasProperty('writeConcern', $commitCommand->getCommand()); + $this->assertEquals((object) ['w' => 'majority'], $commitCommand->getCommand()->writeConcern); + } } class ParentAssociationTest diff --git a/tests/Doctrine/ODM/MongoDB/Tests/UnitOfWorkTransactionalCommitConsistencyTest.php b/tests/Doctrine/ODM/MongoDB/Tests/UnitOfWorkTransactionalCommitConsistencyTest.php new file mode 100644 index 0000000000..6a3a3ae270 --- /dev/null +++ b/tests/Doctrine/ODM/MongoDB/Tests/UnitOfWorkTransactionalCommitConsistencyTest.php @@ -0,0 +1,605 @@ +skipTestIfNoTransactionSupport(); + } + + public function tearDown(): void + { + $this->dm->getClient()->selectDatabase('admin')->command([ + 'configureFailPoint' => 'failCommand', + 'mode' => 'off', + ]); + + parent::tearDown(); + } + + public function testFatalInsertError(): void + { + $firstUser = new ForumUser(); + $firstUser->username = 'alcaeus'; + $this->uow->persist($firstUser); + + $secondUser = new ForumUser(); + $secondUser->username = 'jmikola'; + $this->uow->persist($secondUser); + + $friendUser = new FriendUser('GromNaN'); + $this->uow->persist($friendUser); + + $this->createFatalFailPoint('insert'); + + try { + $this->uow->commit(); + self::fail('Expected exception when committing'); + } catch (Throwable $e) { + self::assertInstanceOf(BulkWriteException::class, $e); + self::assertSame(192, $e->getCode()); + } + + self::assertSame( + 0, + $this->dm->getDocumentCollection(ForumUser::class)->countDocuments(), + ); + + self::assertSame( + 0, + $this->dm->getDocumentCollection(FriendUser::class)->countDocuments(), + ); + + self::assertTrue($this->uow->isScheduledForInsert($firstUser)); + self::assertNotEquals([], $this->uow->getDocumentChangeSet($firstUser)); + + self::assertTrue($this->uow->isScheduledForInsert($secondUser)); + self::assertNotEquals([], $this->uow->getDocumentChangeSet($secondUser)); + + self::assertTrue($this->uow->isScheduledForInsert($friendUser)); + self::assertNotEquals([], $this->uow->getDocumentChangeSet($friendUser)); + } + + public function testTransientInsertError(): void + { + $firstUser = new ForumUser(); + $firstUser->username = 'alcaeus'; + $this->uow->persist($firstUser); + + $secondUser = new ForumUser(); + $secondUser->username = 'jmikola'; + $this->uow->persist($secondUser); + + $friendUser = new FriendUser('GromNaN'); + $this->uow->persist($friendUser); + + // Add a failpoint that triggers a transient error. The transaction will be retried and succeeds + $this->createTransientFailPoint('insert'); + + $this->uow->commit(); + + self::assertSame( + 2, + $this->dm->getDocumentCollection(ForumUser::class)->countDocuments(), + ); + + self::assertSame( + 1, + $this->dm->getDocumentCollection(FriendUser::class)->countDocuments(), + ); + + self::assertFalse($this->uow->isScheduledForInsert($firstUser)); + self::assertEquals([], $this->uow->getDocumentChangeSet($firstUser)); + + self::assertFalse($this->uow->isScheduledForInsert($secondUser)); + self::assertEquals([], $this->uow->getDocumentChangeSet($secondUser)); + + self::assertFalse($this->uow->isScheduledForInsert($friendUser)); + self::assertEquals([], $this->uow->getDocumentChangeSet($friendUser)); + } + + public function testMultipleTransientErrors(): void + { + $firstUser = new ForumUser(); + $firstUser->username = 'alcaeus'; + $this->uow->persist($firstUser); + + $secondUser = new ForumUser(); + $secondUser->username = 'jmikola'; + $this->uow->persist($secondUser); + + $friendUser = new FriendUser('GromNaN'); + $this->uow->persist($friendUser); + + // Add a failpoint that triggers multiple transient errors. The transaction is expected to fail + $this->createTransientFailPoint('insert', 2); + + try { + $this->uow->commit(); + self::fail('Expected exception when committing'); + } catch (Throwable $e) { + self::assertInstanceOf(BulkWriteException::class, $e); + self::assertSame(192, $e->getCode()); + } + + self::assertSame( + 0, + $this->dm->getDocumentCollection(ForumUser::class)->countDocuments(), + ); + + self::assertSame( + 0, + $this->dm->getDocumentCollection(FriendUser::class)->countDocuments(), + ); + + self::assertTrue($this->uow->isScheduledForInsert($firstUser)); + self::assertNotEquals([], $this->uow->getDocumentChangeSet($firstUser)); + + self::assertTrue($this->uow->isScheduledForInsert($secondUser)); + self::assertNotEquals([], $this->uow->getDocumentChangeSet($secondUser)); + + self::assertTrue($this->uow->isScheduledForInsert($friendUser)); + self::assertNotEquals([], $this->uow->getDocumentChangeSet($friendUser)); + } + + public function testDuplicateKeyError(): void + { + // Create a unique index on the collection to let the second insert fail + $collection = $this->dm->getDocumentCollection(ForumUser::class); + $collection->createIndex(['username' => 1], ['unique' => true]); + + $firstUser = new ForumUser(); + $firstUser->username = 'alcaeus'; + $this->uow->persist($firstUser); + + $secondUser = new ForumUser(); + $secondUser->username = 'alcaeus'; + $this->uow->persist($secondUser); + + $thirdUser = new ForumUser(); + $thirdUser->username = 'jmikola'; + $this->uow->persist($thirdUser); + + try { + $this->uow->commit(); + self::fail('Expected exception when committing'); + } catch (Throwable $e) { + self::assertInstanceOf(BulkWriteException::class, $e); + self::assertSame(11000, $e->getCode()); // Duplicate key + } + + // No users inserted + self::assertSame( + 0, + $this->dm->getDocumentCollection(ForumUser::class)->countDocuments(), + ); + + self::assertTrue($this->uow->isScheduledForInsert($firstUser)); + self::assertNotEquals([], $this->uow->getDocumentChangeSet($firstUser)); + + self::assertTrue($this->uow->isScheduledForInsert($secondUser)); + self::assertNotEquals([], $this->uow->getDocumentChangeSet($secondUser)); + + self::assertTrue($this->uow->isScheduledForInsert($thirdUser)); + self::assertNotEquals([], $this->uow->getDocumentChangeSet($thirdUser)); + } + + public function testFatalInsertErrorWithEmbeddedDocument(): void + { + // Create a unique index on the collection to let the second insert fail + $collection = $this->dm->getDocumentCollection(User::class); + $collection->createIndex(['username' => 1], ['unique' => true]); + + $firstAddress = new Address(); + $firstAddress->setCity('Olching'); + $firstUser = new User(); + $firstUser->setUsername('alcaeus'); + $firstUser->setAddress($firstAddress); + + $secondAddress = new Address(); + $secondAddress->setCity('Olching'); + $secondUser = new User(); + $secondUser->setUsername('alcaeus'); + $secondUser->setAddress($secondAddress); + + $this->uow->persist($firstUser); + $this->uow->persist($secondUser); + + try { + $this->uow->commit(); + self::fail('Expected exception when committing'); + } catch (Throwable $e) { + self::assertInstanceOf(BulkWriteException::class, $e); + self::assertSame(11000, $e->getCode()); + } + + self::assertSame(0, $collection->countDocuments()); + + $this->assertTrue($this->uow->isScheduledForInsert($firstUser)); + $this->assertNotEquals([], $this->uow->getDocumentChangeSet($firstUser)); + $this->assertTrue($this->uow->isScheduledForInsert($firstAddress)); + $this->assertNotEquals([], $this->uow->getDocumentChangeSet($firstAddress)); + + $this->assertTrue($this->uow->isScheduledForInsert($secondUser)); + $this->assertNotEquals([], $this->uow->getDocumentChangeSet($secondUser)); + $this->assertTrue($this->uow->isScheduledForInsert($secondAddress)); + $this->assertNotEquals([], $this->uow->getDocumentChangeSet($secondAddress)); + } + + public function testFatalUpsertError(): void + { + $user = new ForumUser(); + $user->id = new ObjectId(); // Specifying an identifier makes this an upsert + $user->username = 'alcaeus'; + $this->uow->persist($user); + + $this->createFatalFailPoint('update'); + + try { + $this->uow->commit(); + self::fail('Expected exception when committing'); + } catch (Throwable $e) { + self::assertInstanceOf(BulkWriteException::class, $e); + self::assertSame(192, $e->getCode()); + } + + // No document was inserted + self::assertSame( + 0, + $this->dm->getDocumentCollection(ForumUser::class)->countDocuments(), + ); + + self::assertTrue($this->uow->isScheduledForUpsert($user)); + self::assertNotEquals([], $this->uow->getDocumentChangeSet($user)); + } + + public function testTransientUpsertError(): void + { + $user = new ForumUser(); + $user->id = new ObjectId(); // Specifying an identifier makes this an upsert + $user->username = 'alcaeus'; + $this->uow->persist($user); + + $this->createTransientFailPoint('update'); + + $this->uow->commit(); + + self::assertSame( + 1, + $this->dm->getDocumentCollection(ForumUser::class)->countDocuments(), + ); + + self::assertFalse($this->uow->isScheduledForUpsert($user)); + self::assertEquals([], $this->uow->getDocumentChangeSet($user)); + } + + public function testFatalUpdateError(): void + { + $user = new ForumUser(); + $user->username = 'alcaeus'; + $this->uow->persist($user); + $this->uow->commit(); + + $user->username = 'jmikola'; + + $this->createFatalFailPoint('update'); + + try { + $this->uow->commit(); + self::fail('Expected exception when committing'); + } catch (Throwable $e) { + self::assertInstanceOf(BulkWriteException::class, $e); + self::assertSame(192, $e->getCode()); + } + + self::assertSame( + 1, + $this->dm->getDocumentCollection(ForumUser::class)->countDocuments(['username' => 'alcaeus']), + ); + + self::assertTrue($this->uow->isScheduledForUpdate($user)); + self::assertNotEquals([], $this->uow->getDocumentChangeSet($user)); + } + + public function testTransientUpdateError(): void + { + $user = new ForumUser(); + $user->username = 'alcaeus'; + $this->uow->persist($user); + $this->uow->commit(); + + $user->username = 'jmikola'; + + $this->createTransientFailPoint('update'); + + $this->uow->commit(); + + self::assertSame( + 1, + $this->dm->getDocumentCollection(ForumUser::class)->countDocuments(['username' => 'jmikola']), + ); + + self::assertFalse($this->uow->isScheduledForUpdate($user)); + self::assertEquals([], $this->uow->getDocumentChangeSet($user)); + } + + public function testFatalUpdateErrorWithNewEmbeddedDocument(): void + { + $user = new User(); + $user->setUsername('alcaeus'); + + $this->uow->persist($user); + $this->uow->commit(); + + $address = new Address(); + $address->setCity('Olching'); + $user->setAddress($address); + + $this->createFatalFailPoint('update'); + + try { + $this->uow->commit(); + self::fail('Expected exception when committing'); + } catch (Throwable $e) { + self::assertInstanceOf(BulkWriteException::class, $e); + self::assertSame(192, $e->getCode()); + } + + $this->assertTrue($this->uow->isScheduledForUpdate($user)); + $this->assertNotEquals([], $this->uow->getDocumentChangeSet($user)); + $this->assertTrue($this->uow->isScheduledForInsert($address)); + $this->assertNotEquals([], $this->uow->getDocumentChangeSet($address)); + } + + public function testTransientUpdateErrorWithNewEmbeddedDocument(): void + { + $user = new User(); + $user->setUsername('alcaeus'); + + $this->uow->persist($user); + $this->uow->commit(); + + $address = new Address(); + $address->setCity('Olching'); + $user->setAddress($address); + + $this->createTransientFailPoint('update'); + + $this->uow->commit(); + + $this->assertFalse($this->uow->isScheduledForUpdate($user)); + $this->assertEquals([], $this->uow->getDocumentChangeSet($user)); + $this->assertFalse($this->uow->isScheduledForInsert($address)); + $this->assertEquals([], $this->uow->getDocumentChangeSet($address)); + } + + public function testFatalUpdateErrorOfEmbeddedDocument(): void + { + $address = new Address(); + $address->setCity('Olching'); + + $user = new User(); + $user->setUsername('alcaeus'); + $user->setAddress($address); + + $this->uow->persist($user); + $this->uow->commit(); + + $address->setCity('Munich'); + + $this->createFatalFailPoint('update'); + + try { + $this->uow->commit(); + self::fail('Expected exception when committing'); + } catch (Throwable $e) { + self::assertInstanceOf(BulkWriteException::class, $e); + self::assertSame(192, $e->getCode()); + } + + $this->assertTrue($this->uow->isScheduledForUpdate($user)); + $this->assertNotEquals([], $this->uow->getDocumentChangeSet($user)); + $this->assertTrue($this->uow->isScheduledForUpdate($address)); + $this->assertNotEquals([], $this->uow->getDocumentChangeSet($address)); + } + + public function testTransientUpdateErrorOfEmbeddedDocument(): void + { + $address = new Address(); + $address->setCity('Olching'); + + $user = new User(); + $user->setUsername('alcaeus'); + $user->setAddress($address); + + $this->uow->persist($user); + $this->uow->commit(); + + $address->setCity('Munich'); + + $this->createTransientFailPoint('update'); + + $this->uow->commit(); + + $this->assertFalse($this->uow->isScheduledForUpdate($user)); + $this->assertEquals([], $this->uow->getDocumentChangeSet($user)); + $this->assertFalse($this->uow->isScheduledForUpdate($address)); + $this->assertEquals([], $this->uow->getDocumentChangeSet($address)); + } + + public function testFatalUpdateErrorWithRemovedEmbeddedDocument(): void + { + $address = new Address(); + $address->setCity('Olching'); + + $user = new User(); + $user->setUsername('alcaeus'); + $user->setAddress($address); + + $this->uow->persist($user); + $this->uow->commit(); + + $user->removeAddress(); + + $this->createFatalFailPoint('update'); + + try { + $this->uow->commit(); + self::fail('Expected exception when committing'); + } catch (Throwable $e) { + self::assertInstanceOf(BulkWriteException::class, $e); + self::assertSame(192, $e->getCode()); + } + + $this->assertTrue($this->uow->isScheduledForUpdate($user)); + $this->assertNotEquals([], $this->uow->getDocumentChangeSet($user)); + $this->assertTrue($this->uow->isScheduledForDelete($address)); + + // As $address is orphaned after changeset computation, it is removed from the identity map + $this->assertFalse($this->uow->isInIdentityMap($address)); + } + + public function testTransientUpdateErrorWithRemovedEmbeddedDocument(): void + { + $address = new Address(); + $address->setCity('Olching'); + + $user = new User(); + $user->setUsername('alcaeus'); + $user->setAddress($address); + + $this->uow->persist($user); + $this->uow->commit(); + + $user->removeAddress(); + + $this->createTransientFailPoint('update'); + + $this->uow->commit(); + + $this->assertFalse($this->uow->isScheduledForUpdate($user)); + $this->assertEquals([], $this->uow->getDocumentChangeSet($user)); + $this->assertFalse($this->uow->isScheduledForDelete($address)); + $this->assertFalse($this->uow->isInIdentityMap($address)); + } + + public function testFatalDeleteErrorWithEmbeddedDocument(): void + { + $address = new Address(); + $address->setCity('Olching'); + + $user = new User(); + $user->setUsername('alcaeus'); + $user->setAddress($address); + + $this->uow->persist($user); + $this->uow->commit(); + + $this->uow->remove($user); + + $this->createFatalFailPoint('delete'); + + try { + $this->uow->commit(); + self::fail('Expected exception when committing'); + } catch (Throwable $e) { + self::assertInstanceOf(BulkWriteException::class, $e); + self::assertSame(192, $e->getCode()); + } + + // The document still exists, the deletion is still scheduled + self::assertSame( + 1, + $this->dm->getDocumentCollection(User::class)->countDocuments(['username' => 'alcaeus']), + ); + + self::assertTrue($this->uow->isScheduledForDelete($user)); + self::assertTrue($this->uow->isScheduledForDelete($address)); + } + + public function testTransientDeleteErrorWithEmbeddedDocument(): void + { + $address = new Address(); + $address->setCity('Olching'); + + $user = new User(); + $user->setUsername('alcaeus'); + $user->setAddress($address); + + $this->uow->persist($user); + $this->uow->commit(); + + $this->uow->remove($user); + + $this->createTransientFailPoint('delete'); + + $this->uow->commit(); + + self::assertSame( + 0, + $this->dm->getDocumentCollection(User::class)->countDocuments(['username' => 'alcaeus']), + ); + + self::assertFalse($this->uow->isScheduledForDelete($address)); + self::assertFalse($this->uow->isScheduledForDelete($user)); + } + + /** Create a document manager with a single host to ensure failpoints target the correct server */ + protected static function createTestDocumentManager(): DocumentManager + { + $config = static::getConfiguration(); + $client = new Client(self::getUri(false), [], ['typeMap' => ['root' => 'array', 'document' => 'array']]); + + return DocumentManager::create($client, $config); + } + + protected static function getConfiguration(): Configuration + { + $configuration = parent::getConfiguration(); + $configuration->setUseTransactionalFlush(true); + + return $configuration; + } + + private function createTransientFailPoint(string $failCommand, int $times = 1): void + { + $this->dm->getClient()->selectDatabase('admin')->command([ + 'configureFailPoint' => 'failCommand', + 'mode' => ['times' => $times], + 'data' => [ + 'errorCode' => 192, // FailPointEnabled + 'errorLabels' => ['TransientTransactionError'], + 'failCommands' => [$failCommand], + ], + ]); + } + + private function createFatalFailPoint(string $failCommand): void + { + $this->dm->getClient()->selectDatabase('admin')->command([ + 'configureFailPoint' => 'failCommand', + 'mode' => ['times' => 1], + 'data' => [ + 'errorCode' => 192, // FailPointEnabled + 'failCommands' => [$failCommand], + ], + ]); + } +}