Transactions

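    Transactions incur no additional cost. You pay only for the read and write I/Os consumed as part of a transaction.

    To use transactions, you must meet the following requirements:

    • You must use the Amazon DocumentDB 4.0 engine.

    • You must use a driver compatible with MongoDB 4.0 or later.

    Best Practices

    The following best practices help you get the most out of transactions with Amazon DocumentDB.

    • Always commit or abort a transaction once its work is complete. Leaving a transaction in an incomplete state ties up database resources and can cause write conflicts.

    • Keep each transaction to the smallest number of commands needed. If a transaction contains statements that can be split into several smaller transactions, split it to reduce the likelihood of a timeout. Always aim for short transactions rather than long-running reads.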
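The first practice above, always ending a transaction with a commit or an abort, can be sketched in Python as a small wrapper. This is a minimal sketch rather than an official pattern: `run_in_transaction` and the `work` callback are hypothetical names, and the only driver calls assumed are PyMongo's `ClientSession.start_transaction`, `commit_transaction`, and `abort_transaction`.

```python
def run_in_transaction(session, work):
    """Run work(session) inside a transaction that is always closed.

    `session` is assumed to be a PyMongo ClientSession (or any object with
    the same three transaction methods). `work` is a hypothetical callback
    that issues the reads and writes and passes `session` to each call.
    """
    session.start_transaction()
    try:
        result = work(session)
    except Exception:
        # Any failure aborts, so the transaction is never left open.
        session.abort_transaction()
        raise
    session.commit_transaction()
    return result
```

Keeping `work` small also serves the second practice: the fewer commands it issues, the less likely the transaction is to hit a timeout.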

    Limitations

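    • Amazon DocumentDB does not support cursors within a transaction.

    • Amazon DocumentDB cannot create new collections in a transaction and cannot query or update non-existing collections.

    • Document-level write locks are subject to a 1-minute timeout, which users cannot configure.

    • Retryable writes, retryable commit, and retryable abort are not supported.

    • Each Amazon DocumentDB instance has an upper bound on the number of concurrent transactions open on the instance at the same time. For the limits, see Instance Limits.

    • For a given transaction, the transaction log size must be less than 32 MB.

    • Amazon DocumentDB supports count() within a transaction, but not all drivers support this capability. An alternative is the countDocuments() API, which translates the count query into an aggregation query on the client side.

    • Transactions have a one-minute execution limit, and sessions have a 30-minute timeout. If a transaction times out, it is aborted, and any subsequent command issued within the session for that transaction returns an error.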
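Because retryable writes are not supported, drivers that enable them by default generally need them switched off in the connection string. The sketch below shows one way to do that with the standard `retryWrites=false` URI option. The helper name `disable_retry_writes`, the host, and the credentials are placeholders introduced for illustration, not part of the original text.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit


def disable_retry_writes(uri):
    """Return `uri` with retryWrites=false set in its query string."""
    parts = urlsplit(uri)
    # Drop any existing retryWrites setting and keep the other options in order.
    options = [(key, value)
               for key, value in parse_qsl(parts.query, keep_blank_values=True)
               if key.lower() != "retrywrites"]
    options.append(("retryWrites", "false"))
    return urlunsplit(parts._replace(query=urlencode(options)))


# Placeholder host and credentials; substitute your own cluster endpoint.
uri = disable_retry_writes(
    "mongodb://user:pass@example-host:27017/?tls=true&retryWrites=true")
```

The resulting string can be passed to the driver's client constructor in place of the original URI.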

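    With support for transactions in Amazon DocumentDB 4.0, additional CloudWatch metrics were added to help you monitor your transactions.

    New CloudWatch Metrics

    • DatabaseTransactions: The number of open transactions, sampled over a one-minute period.

    • DatabaseTransactionsAborted: The number of aborted transactions, sampled over a one-minute period.

    • DatabaseTransactionsMax: The maximum number of open transactions in a one-minute period.

    • TransactionsAborted: The number of transactions aborted on an instance in a one-minute period.

    • TransactionsCommitted: The number of transactions committed on an instance in a one-minute period.

    • TransactionsOpen: The number of transactions open on an instance, sampled at one-minute intervals.

    • TransactionsOpenMax: The maximum number of transactions open on an instance in a one-minute period.

    • TransactionsStarted: The number of transactions started on an instance in a one-minute period.

    Note

    For more CloudWatch metrics for Amazon DocumentDB, see the Amazon DocumentDB monitoring documentation.

    Additionally, new fields were added to both currentOp (lsid, transactionThreadId, and a new state for "idle transaction") and serverStatus (transactions: currentActive, currentInactive, currentOpen, totalAborted, totalCommitted, and totalStarted).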
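As one illustration of how the `serverStatus` counters might be used, the sketch below derives an abort ratio from the transaction fields listed above. The helper name `abort_ratio`, the nesting under a `transactions` key, and the sample numbers are assumptions made for the example. In practice the document would come from the `serverStatus` command, and its exact shape should be checked against a real cluster.

```python
def abort_ratio(server_status):
    """Fraction of started transactions that were aborted (0.0 if none started)."""
    txn = server_status.get("transactions", {})
    started = txn.get("totalStarted", 0)
    aborted = txn.get("totalAborted", 0)
    return aborted / started if started else 0.0


# Hypothetical sample document; only the field names come from the text above.
sample = {
    "transactions": {
        "currentActive": 1,
        "currentInactive": 0,
        "currentOpen": 1,
        "totalAborted": 5,
        "totalCommitted": 94,
        "totalStarted": 100,
    }
}
print(abort_ratio(sample))
```

A rising ratio over time may point to write conflicts or timeouts, which ties back to the advice to keep transactions short.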

    Transaction Isolation Level

    When starting a transaction, you can specify both the readConcern and the writeConcern, as shown in the following example:

    mySession.startTransaction({readConcern: {level: 'snapshot'}, writeConcern: {w: 'majority'}});

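    For readConcern, Amazon DocumentDB supports snapshot isolation by default. If a readConcern of local, available, or majority is specified, Amazon DocumentDB upgrades the readConcern level to snapshot. Amazon DocumentDB does not support the linearizable readConcern, and specifying that read concern results in an error.

    For writeConcern, Amazon DocumentDB supports majority by default, and a write quorum is achieved when four copies of the data are persisted across three AZs. If a lower writeConcern is specified, Amazon DocumentDB upgrades the writeConcern to majority. In addition, all Amazon DocumentDB writes are journaled, and journaling cannot be disabled.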
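The upgrade rules described above can be modeled as a small lookup. This is only an illustrative model of the documented behavior, not DocumentDB code or a driver API, and both function names are invented for the example.

```python
def effective_read_concern(level="snapshot"):
    """Model of the read concern level a transaction actually runs with."""
    if level == "linearizable":
        # The text says this level is rejected with an error.
        raise ValueError("linearizable readConcern is not supported")
    if level in ("local", "available", "majority", "snapshot"):
        # Every supported level is upgraded to (or already is) snapshot.
        return "snapshot"
    raise ValueError(f"unknown readConcern level: {level!r}")


def effective_write_concern(w="majority"):
    """Model of the write concern a transaction actually runs with."""
    # Any lower write concern is upgraded to majority.
    return "majority"
```

Under this model, requesting `local` reads and `w: 1` writes behaves the same as requesting `snapshot` and `majority` explicitly.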

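    Use Cases

    In this section, we walk through two use cases for transactions: multi-statement and multi-collection.

    Amazon DocumentDB transactions are multi-statement, which means you can write a transaction that spans multiple statements with an explicit commit or rollback. You can group insert, update, and findAndModify operations into a single atomic operation.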

    // *** Transfer $500 from Alice to Bob inside a transaction: Success Scenario ***
    // Set up bank accounts for Alice and Bob. Each has $1000 in their account.
    var databaseName = "bank";
    var collectionName = "account";
    var amountToTransfer = 500;
    var session = db.getMongo().startSession({causalConsistency: false});
    var bankDB = session.getDatabase(databaseName);
    var accountColl = bankDB[collectionName];
    accountColl.drop();
    accountColl.insert({name: "Alice", balance: 1000});
    accountColl.insert({name: "Bob", balance: 1000});
    session.startTransaction();
    // deduct $500 from Alice's account
    var aliceBalance = accountColl.find({"name": "Alice"}).next().balance;
    var newAliceBalance = aliceBalance - amountToTransfer;
    accountColl.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}});
    var findAliceBalance = accountColl.find({"name": "Alice"}).next().balance;
    // add $500 to Bob's account
    var bobBalance = accountColl.find({"name": "Bob"}).next().balance;
    var newBobBalance = bobBalance + amountToTransfer;
    accountColl.update({"name": "Bob"},{"$set": {"balance": newBobBalance}});
    var findBobBalance = accountColl.find({"name": "Bob"}).next().balance;
    session.commitTransaction();
    accountColl.find();

    // *** Transfer $500 from Alice to Bob inside a transaction: Failure Scenario ***
    // Set up bank accounts for Alice and Bob. Each has $1000 in their account.
    var databaseName = "bank";
    var collectionName = "account";
    var amountToTransfer = 500;
    var session = db.getMongo().startSession({causalConsistency: false});
    var bankDB = session.getDatabase(databaseName);
    var accountColl = bankDB[collectionName];
    accountColl.drop();
    accountColl.insert({name: "Alice", balance: 1000});
    accountColl.insert({name: "Bob", balance: 1000});
    session.startTransaction();
    // deduct $500 from Alice's account
    var aliceBalance = accountColl.find({"name": "Alice"}).next().balance;
    var newAliceBalance = aliceBalance - amountToTransfer;
    accountColl.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}});
    var findAliceBalance = accountColl.find({"name": "Alice"}).next().balance;
    // abort instead of committing, which discards the update above
    session.abortTransaction();

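    Multi-Collection Transactions

    Our transactions are also multi-collection, which means they can be used to perform multiple operations within a single transaction and across multiple collections. This provides a consistent view of the data and maintains data integrity. When you commit the commands as a single transaction, execution is all-or-nothing: the commands either all succeed or all fail.

    The following is an example of a multi-collection transaction that uses the same scenario and data as the multi-statement transaction example.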

    // *** Transfer $500 from Alice to Bob inside a transaction: Success Scenario ***
    // Set up bank accounts for Alice and Bob. Each has $1000 in their account.
    var amountToTransfer = 500;
    var collectionName = "account";
    var session = db.getMongo().startSession({causalConsistency: false});
    var accountCollInBankA = session.getDatabase("bankA")[collectionName];
    var accountCollInBankB = session.getDatabase("bankB")[collectionName];
    accountCollInBankA.drop();
    accountCollInBankB.drop();
    accountCollInBankA.insert({name: "Alice", balance: 1000});
    accountCollInBankB.insert({name: "Bob", balance: 1000});
    session.startTransaction();
    // deduct $500 from Alice's account
    var aliceBalance = accountCollInBankA.find({"name": "Alice"}).next().balance;
    var newAliceBalance = aliceBalance - amountToTransfer;
    accountCollInBankA.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}});
    var findAliceBalance = accountCollInBankA.find({"name": "Alice"}).next().balance;
    // add $500 to Bob's account
    var bobBalance = accountCollInBankB.find({"name": "Bob"}).next().balance;
    var newBobBalance = bobBalance + amountToTransfer;
    accountCollInBankB.update({"name": "Bob"},{"$set": {"balance": newBobBalance}});
    var findBobBalance = accountCollInBankB.find({"name": "Bob"}).next().balance;
    session.commitTransaction();
    accountCollInBankA.find(); // Alice holds $500 in bankA
    accountCollInBankB.find(); // Bob holds $1500 in bankB

    // *** Transfer $500 from Alice to Bob inside a transaction: Failure Scenario ***
    // Set up bank accounts for Alice and Bob. Each has $1000 in their account.
    var collectionName = "account";
    var amountToTransfer = 500;
    var session = db.getMongo().startSession({causalConsistency: false});
    var accountCollInBankA = session.getDatabase("bankA")[collectionName];
    var accountCollInBankB = session.getDatabase("bankB")[collectionName];
    accountCollInBankA.drop();
    accountCollInBankB.drop();
    accountCollInBankA.insert({name: "Alice", balance: 1000});
    accountCollInBankB.insert({name: "Bob", balance: 1000});
    session.startTransaction();
    // deduct $500 from Alice's account
    var aliceBalance = accountCollInBankA.find({"name": "Alice"}).next().balance;
    var newAliceBalance = aliceBalance - amountToTransfer;
    accountCollInBankA.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}});
    var findAliceBalance = accountCollInBankA.find({"name": "Alice"}).next().balance;
    // add $500 to Bob's account
    var bobBalance = accountCollInBankB.find({"name": "Bob"}).next().balance;
    var newBobBalance = bobBalance + amountToTransfer;
    accountCollInBankB.update({"name": "Bob"},{"$set": {"balance": newBobBalance}});
    var findBobBalance = accountCollInBankB.find({"name": "Bob"}).next().balance;
    session.abortTransaction();
    accountCollInBankA.find(); // Alice holds $1000 in bankA
    accountCollInBankB.find(); // Bob holds $1000 in bankB

    The callback API is available only with drivers version 4.2 and later.

    JavaScript

    The following code demonstrates how to use the Amazon DocumentDB transaction API with JavaScript.

    // *** Transfer $500 from Alice to Bob inside a transaction: Success ***
    // Set up bank accounts for Alice and Bob. Each has $1000 in their account.
    var databaseName = "bank";
    var collectionName = "account";
    var amountToTransfer = 500;
    var session = db.getMongo().startSession({causalConsistency: false});
    var bankDB = session.getDatabase(databaseName);
    var accountColl = bankDB[collectionName];
    accountColl.drop();
    accountColl.insert({name: "Alice", balance: 1000});
    accountColl.insert({name: "Bob", balance: 1000});
    session.startTransaction();
    // deduct $500 from Alice's account
    var aliceBalance = accountColl.find({"name": "Alice"}).next().balance;
    assert(aliceBalance >= amountToTransfer);
    var newAliceBalance = aliceBalance - amountToTransfer;
    accountColl.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}});
    var findAliceBalance = accountColl.find({"name": "Alice"}).next().balance;
    assert.eq(newAliceBalance, findAliceBalance);
    // add $500 to Bob's account
    var bobBalance = accountColl.find({"name": "Bob"}).next().balance;
    var newBobBalance = bobBalance + amountToTransfer;
    accountColl.update({"name": "Bob"},{"$set": {"balance": newBobBalance}});
    var findBobBalance = accountColl.find({"name": "Bob"}).next().balance;
    assert.eq(newBobBalance, findBobBalance);
    session.commitTransaction();
    accountColl.find();

    Node.js

    The following code demonstrates how to use the Amazon DocumentDB transaction API with Node.js.

    // Node.js callback API:
    // Assumes `mongoclient` is a connected MongoClient and that this code runs inside an async function.
    const assert = require('assert');
    const bankDB = await mongoclient.db("bank");
    var accountColl = await bankDB.createCollection("account");
    var amountToTransfer = 500;
    const session = mongoclient.startSession({causalConsistency: false});
    await accountColl.drop();
    await accountColl.insertOne({name: "Alice", balance: 1000}, { session });
    await accountColl.insertOne({name: "Bob", balance: 1000}, { session });
    const transactionOptions = {
        readConcern: { level: 'snapshot' },
        writeConcern: { w: 'majority' }
    };
    // deduct $500 from Alice's account
    var aliceBalance = await accountColl.findOne({name: "Alice"}, {session});
    assert(aliceBalance.balance >= amountToTransfer);
    // findOne returns the whole document, so read its balance field
    var newAliceBalance = aliceBalance.balance - amountToTransfer;
    session.startTransaction(transactionOptions);
    await accountColl.updateOne({name: "Alice"}, {$set: {balance: newAliceBalance}}, {session });
    await session.commitTransaction();
    aliceBalance = await accountColl.findOne({name: "Alice"}, {session});
    assert(newAliceBalance == aliceBalance.balance);
    // add $500 to Bob's account
    var bobBalance = await accountColl.findOne({name: "Bob"}, {session});
    var newBobBalance = bobBalance.balance + amountToTransfer;
    session.startTransaction(transactionOptions);
    await accountColl.updateOne({name: "Bob"}, {$set: {balance: newBobBalance}}, {session });
    await session.commitTransaction();
    bobBalance = await accountColl.findOne({name: "Bob"}, {session});
    assert(newBobBalance == bobBalance.balance);

    C#

    The following code demonstrates how to use the Amazon DocumentDB transaction API with C#.

    // C# Callback API
    // Assumes `client` is a connected MongoClient.
    var dbName = "bank";
    var collName = "account";
    var amountToTransfer = 500;
    using (var session = client.StartSession(new ClientSessionOptions{CausalConsistency = false}))
    {
        var bankDB = client.GetDatabase(dbName);
        var accountColl = bankDB.GetCollection<BsonDocument>(collName);
        bankDB.DropCollection(collName);
        accountColl.InsertOne(session, new BsonDocument { {"name", "Alice"}, {"balance", 1000 } });
        accountColl.InsertOne(session, new BsonDocument { {"name", "Bob"}, {"balance", 1000 } });
        // start transaction
        var transactionOptions = new TransactionOptions(
            readConcern: ReadConcern.Snapshot,
            writeConcern: WriteConcern.WMajority);
        var result = session.WithTransaction(
            (sess, cancellationtoken) =>
            {
                // deduct $500 from Alice's account
                var aliceBalance = accountColl.Find(sess, Builders<BsonDocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance");
                Debug.Assert(aliceBalance >= amountToTransfer);
                var newAliceBalance = aliceBalance.AsInt32 - amountToTransfer;
                accountColl.UpdateOne(sess, Builders<BsonDocument>.Filter.Eq("name", "Alice"),
                    Builders<BsonDocument>.Update.Set("balance", newAliceBalance));
                aliceBalance = accountColl.Find(sess, Builders<BsonDocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance");
                Debug.Assert(aliceBalance == newAliceBalance);
                // add $500 to Bob's account
                var bobBalance = accountColl.Find(sess, Builders<BsonDocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance");
                var newBobBalance = bobBalance.AsInt32 + amountToTransfer;
                accountColl.UpdateOne(sess, Builders<BsonDocument>.Filter.Eq("name", "Bob"),
                    Builders<BsonDocument>.Update.Set("balance", newBobBalance));
                bobBalance = accountColl.Find(sess, Builders<BsonDocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance");
                Debug.Assert(bobBalance == newBobBalance);
                return "Transaction committed";
            }, transactionOptions);
        // check values outside of transaction
        var aliceNewBalance = accountColl.Find(Builders<BsonDocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance");
        var bobNewBalance = accountColl.Find(Builders<BsonDocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance");
        Debug.Assert(aliceNewBalance == 500);
        Debug.Assert(bobNewBalance == 1500);
    }

    Ruby

    The following code demonstrates how to use the Amazon DocumentDB transaction API with Ruby.

    # Ruby Callback API
    dbName = "bank"
    collName = "account"
    amountToTransfer = 500
    session = client.start_session(:causal_consistency=> false)
    bankDB = Mongo::Database.new(client, dbName)
    accountColl = bankDB[collName]
    accountColl.drop()
    accountColl.insert_one({"name"=>"Alice", "balance"=>1000})
    accountColl.insert_one({"name"=>"Bob", "balance"=>1000})
    # start transaction
    session.with_transaction(read_concern: {level: :snapshot}, write_concern: {w: :majority}) do
      # deduct $500 from Alice's account
      aliceBalance = accountColl.find({"name"=>"Alice"}, :session=> session).first['balance']
      assert aliceBalance >= amountToTransfer
      newAliceBalance = aliceBalance - amountToTransfer
      accountColl.update_one({"name"=>"Alice"}, { "$set" => {"balance"=>newAliceBalance} }, :session=> session)
      aliceBalance = accountColl.find({"name"=>"Alice"}, :session=> session).first['balance']
      assert_equal(newAliceBalance, aliceBalance)
      # add $500 to Bob's account
      bobBalance = accountColl.find({"name"=>"Bob"}, :session=> session).first['balance']
      newBobBalance = bobBalance + amountToTransfer
      accountColl.update_one({"name"=>"Bob"}, { "$set" => {"balance"=>newBobBalance} }, :session=> session)
      bobBalance = accountColl.find({"name"=>"Bob"}, :session=> session).first['balance']
      assert_equal(newBobBalance, bobBalance)
    end
    # check results outside of transaction
    aliceBalance = accountColl.find({"name"=>"Alice"}).first['balance']
    bobBalance = accountColl.find({"name"=>"Bob"}).first['balance']
    assert_equal(aliceBalance, 500)
    assert_equal(bobBalance, 1500)
    session.end_session

    Go

    The following code demonstrates how to use the Amazon DocumentDB transaction API with Go.

    Java

    The following code demonstrates how to use the Amazon DocumentDB transaction API with Java.

    // Java (sync) - Callback API
    MongoDatabase bankDB = mongoClient.getDatabase("bank");
    MongoCollection<Document> accountColl = bankDB.getCollection("account");
    accountColl.drop();
    int amountToTransfer = 500;
    // add sample data
    accountColl.insertOne(new Document("name", "Alice").append("balance", 1000));
    accountColl.insertOne(new Document("name", "Bob").append("balance", 1000));
    TransactionOptions txnOptions = TransactionOptions.builder()
        .readConcern(ReadConcern.SNAPSHOT)
        .writeConcern(WriteConcern.MAJORITY)
        .build();
    ClientSessionOptions sessionOptions = ClientSessionOptions.builder().causallyConsistent(false).build();
    try ( ClientSession clientSession = mongoClient.startSession(sessionOptions) ) {
        clientSession.withTransaction(new TransactionBody<Void>() {
            @Override
            public Void execute() {
                // deduct $500 from Alice's account
                List<Document> documentList = new ArrayList<>();
                accountColl.find(clientSession, new Document("name", "Alice")).into(documentList);
                int aliceBalance = (int) documentList.get(0).get("balance");
                int newAliceBalance = aliceBalance - amountToTransfer;
                accountColl.updateOne(clientSession, new Document("name", "Alice"), new Document("$set", new Document("balance", newAliceBalance)));
                documentList = new ArrayList<>();
                accountColl.find(clientSession, new Document("name", "Alice")).into(documentList);
                int updatedBalance = (int) documentList.get(0).get("balance");
                Assert.assertEquals(updatedBalance, newAliceBalance);
                // add $500 to Bob's account
                documentList = new ArrayList<>();
                accountColl.find(clientSession, new Document("name", "Bob")).into(documentList);
                // read Bob's current balance before computing the new one
                int bobBalance = (int) documentList.get(0).get("balance");
                int newBobBalance = bobBalance + amountToTransfer;
                accountColl.updateOne(clientSession, new Document("name", "Bob"), new Document("$set", new Document("balance", newBobBalance)));
                // check Bob's new balance
                documentList = new ArrayList<>();
                accountColl.find(clientSession, new Document("name", "Bob")).into(documentList);
                updatedBalance = (int) documentList.get(0).get("balance");
                Assert.assertEquals(updatedBalance, newBobBalance);
                return null;
            }
        }, txnOptions);
    }

    C

    The following code demonstrates how to use the Amazon DocumentDB transaction API with C.

    // Sample Code for C with Callback
    #include <bson.h>
    #include <mongoc.h>
    #include <stdio.h>
    #include <string.h>
    #include <assert.h>

    typedef struct {
        int64_t balance;
        bson_t *account;
        bson_t *opts;
        mongoc_collection_t *collection;
    } ctx_t;

    bool callback_session (mongoc_client_session_t *session, void *ctx, bson_t **reply, bson_error_t *error)
    {
        bool r = true;
        ctx_t *data = (ctx_t *) ctx;
        bson_t local_reply;
        bson_t *selector = data->account;
        bson_t *update = BCON_NEW ("$set", "{", "balance", BCON_INT64 (data->balance), "}");
        // report the outcome of the update back to the caller
        r = mongoc_collection_update_one (data->collection, selector, update, data->opts, &local_reply, error);
        *reply = bson_copy (&local_reply);
        bson_destroy (&local_reply);
        bson_destroy (update);
        return r;
    }

    void test_callback_money_transfer(mongoc_client_t* client, mongoc_collection_t* collection, int64_t amount_to_transfer){
        bson_t reply;
        bool r = true;
        const bson_t *doc;
        bson_iter_t iter;
        ctx_t alice_ctx;
        ctx_t bob_ctx;
        bson_error_t error;
        // find query
        bson_t *alice_query = bson_new ();
        BSON_APPEND_UTF8(alice_query, "name", "Alice");
        bson_t *bob_query = bson_new ();
        BSON_APPEND_UTF8(bob_query, "name", "Bob");
        // create session
        // set causal consistency to false
        mongoc_session_opt_t *session_opts = mongoc_session_opts_new ();
        mongoc_session_opts_set_causal_consistency (session_opts, false);
        // start the session
        mongoc_client_session_t *client_session = mongoc_client_start_session (client, session_opts, &error);
        // add session to options
        bson_t *opts = bson_new();
        mongoc_client_session_append (client_session, opts, &error);
        // deduct 500 from Alice
        // find account balance of Alice
        mongoc_cursor_t *cursor = mongoc_collection_find_with_opts (collection, alice_query, NULL, NULL);
        mongoc_cursor_next (cursor, &doc);
        bson_iter_init (&iter, doc);
        bson_iter_find (&iter, "balance");
        int64_t alice_balance = (bson_iter_value (&iter))->value.v_int64;
        assert(alice_balance >= amount_to_transfer);
        int64_t new_alice_balance = alice_balance - amount_to_transfer;
        // set variables which will be used by callback function
        alice_ctx.collection = collection;
        alice_ctx.opts = opts;
        alice_ctx.balance = new_alice_balance;
        alice_ctx.account = alice_query;
        // callback
        r = mongoc_client_session_with_transaction (client_session, &callback_session, NULL, &alice_ctx, &reply, &error);
        assert(r);
        bson_destroy (&reply);
        // find account balance of Alice after transaction
        mongoc_cursor_destroy (cursor);
        cursor = mongoc_collection_find_with_opts (collection, alice_query, NULL, NULL);
        mongoc_cursor_next (cursor, &doc);
        bson_iter_init (&iter, doc);
        bson_iter_find (&iter, "balance");
        alice_balance = (bson_iter_value (&iter))->value.v_int64;
        assert(alice_balance == new_alice_balance);
        assert(alice_balance == 500);
        // add 500 to bob's balance
        // find account balance of Bob
        mongoc_cursor_destroy (cursor);
        cursor = mongoc_collection_find_with_opts (collection, bob_query, NULL, NULL);
        mongoc_cursor_next (cursor, &doc);
        bson_iter_init (&iter, doc);
        bson_iter_find (&iter, "balance");
        int64_t bob_balance = (bson_iter_value (&iter))->value.v_int64;
        int64_t new_bob_balance = bob_balance + amount_to_transfer;
        bob_ctx.collection = collection;
        bob_ctx.opts = opts;
        bob_ctx.balance = new_bob_balance;
        bob_ctx.account = bob_query;
        // set read & write concern
        mongoc_read_concern_t *read_concern = mongoc_read_concern_new ();
        mongoc_write_concern_t *write_concern = mongoc_write_concern_new ();
        mongoc_transaction_opt_t *txn_opts = mongoc_transaction_opts_new ();
        mongoc_write_concern_set_w(write_concern, MONGOC_WRITE_CONCERN_W_MAJORITY);
        mongoc_read_concern_set_level(read_concern, MONGOC_READ_CONCERN_LEVEL_SNAPSHOT);
        mongoc_transaction_opts_set_write_concern (txn_opts, write_concern);
        mongoc_transaction_opts_set_read_concern (txn_opts, read_concern);
        // callback
        r = mongoc_client_session_with_transaction (client_session, &callback_session, txn_opts, &bob_ctx, &reply, &error);
        assert(r);
        bson_destroy (&reply);
        // find account balance of Bob after transaction
        mongoc_cursor_destroy (cursor);
        cursor = mongoc_collection_find_with_opts (collection, bob_query, NULL, NULL);
        mongoc_cursor_next (cursor, &doc);
        bson_iter_init (&iter, doc);
        bson_iter_find (&iter, "balance");
        bob_balance = (bson_iter_value (&iter))->value.v_int64;
        assert(bob_balance == new_bob_balance);
        assert(bob_balance == 1500);
        // cleanup (doc is owned by the cursor, so it is not destroyed separately)
        bson_destroy(alice_query);
        bson_destroy(bob_query);
        mongoc_client_session_destroy(client_session);
        mongoc_session_opts_destroy(session_opts);
        bson_destroy(opts);
        mongoc_transaction_opts_destroy(txn_opts);
        mongoc_read_concern_destroy(read_concern);
        mongoc_write_concern_destroy(write_concern);
        mongoc_cursor_destroy(cursor);
    }

    int main(int argc, char* argv[]) {
        mongoc_init ();
        mongoc_client_t* client = mongoc_client_new (<connection uri>);
        bson_error_t error;
        // connect to bank db
        mongoc_database_t *database = mongoc_client_get_database (client, "bank");
        // access account collection
        mongoc_collection_t* collection = mongoc_client_get_collection(client, "bank", "account");
        // set amount to transfer
        int64_t amount_to_transfer = 500;
        // delete the collection if already existing
        mongoc_collection_drop(collection, &error);
        // open Alice account
        bson_t *alice_account = bson_new ();
        BSON_APPEND_UTF8(alice_account, "name", "Alice");
        BSON_APPEND_INT64(alice_account, "balance", 1000);
        // open Bob account
        bson_t *bob_account = bson_new ();
        BSON_APPEND_UTF8(bob_account, "name", "Bob");
        BSON_APPEND_INT64(bob_account, "balance", 1000);
        bool r = true;
        r = mongoc_collection_insert_one(collection, alice_account, NULL, NULL, &error);
        if (!r) {printf("Error encountered:%s", error.message);}
        r = mongoc_collection_insert_one(collection, bob_account, NULL, NULL, &error);
        if (!r) {printf("Error encountered:%s", error.message);}
        test_callback_money_transfer(client, collection, amount_to_transfer);
        return 0;
    }

    Python

    The following code demonstrates how to use the Amazon DocumentDB transaction API with Python.

    # Sample Python code with callback API
    import pymongo

    def callback(session, balance, query):
        collection.update_one(query, {'$set': {"balance": balance}}, session=session)

    client = pymongo.MongoClient(<connection uri>)
    rc_snapshot = pymongo.read_concern.ReadConcern('snapshot')
    wc_majority = pymongo.write_concern.WriteConcern('majority')
    # To start, drop and create an account collection and insert balances for both Alice and Bob
    collection = client.get_database("bank").get_collection("account")
    collection.drop()
    collection.insert_one({"_id": 1, "name": "Alice", "balance": 1000})
    collection.insert_one({"_id": 2, "name": "Bob", "balance": 1000})
    amount_to_transfer = 500
    # deduct 500 from Alice's account
    alice_balance = collection.find_one({"name": "Alice"}).get("balance")
    assert alice_balance >= amount_to_transfer
    new_alice_balance = alice_balance - amount_to_transfer
    # start_session takes causal_consistency as a keyword argument
    with client.start_session(causal_consistency=False) as session:
        session.with_transaction(lambda s: callback(s, new_alice_balance, {"name": "Alice"}), read_concern=rc_snapshot, write_concern=wc_majority)
    updated_alice_balance = collection.find_one({"name": "Alice"}).get("balance")
    assert updated_alice_balance == new_alice_balance
    # add 500 to Bob's account
    bob_balance = collection.find_one({"name": "Bob"}).get("balance")
    assert bob_balance >= amount_to_transfer
    new_bob_balance = bob_balance + amount_to_transfer
    with client.start_session(causal_consistency=False) as session:
        session.with_transaction(lambda s: callback(s, new_bob_balance, {"name": "Bob"}), read_concern=rc_snapshot, write_concern=wc_majority)
    updated_bob_balance = collection.find_one({"name": "Bob"}).get("balance")
    assert updated_bob_balance == new_bob_balance

    # Sample Python code with Core API
    import pymongo

    client = pymongo.MongoClient(<connection_string>)
    rc_snapshot = pymongo.read_concern.ReadConcern('snapshot')
    wc_majority = pymongo.write_concern.WriteConcern('majority')
    # To start, drop and create an account collection and insert balances for both Alice and Bob
    collection = client.get_database("bank").get_collection("account")
    collection.drop()
    collection.insert_one({"_id": 1, "name": "Alice", "balance": 1000})
    collection.insert_one({"_id": 2, "name": "Bob", "balance": 1000})
    amount_to_transfer = 500
    # deduct 500 from Alice's account
    alice_balance = collection.find_one({"name": "Alice"}).get("balance")
    assert alice_balance >= amount_to_transfer
    new_alice_balance = alice_balance - amount_to_transfer
    with client.start_session(causal_consistency=False) as session:
        session.start_transaction(read_concern=rc_snapshot, write_concern=wc_majority)
        collection.update_one({"name": "Alice"}, {'$set': {"balance": new_alice_balance}}, session=session)
        session.commit_transaction()
    updated_alice_balance = collection.find_one({"name": "Alice"}).get("balance")
    assert updated_alice_balance == new_alice_balance
    # add 500 to Bob's account
    bob_balance = collection.find_one({"name": "Bob"}).get("balance")
    assert bob_balance >= amount_to_transfer
    new_bob_balance = bob_balance + amount_to_transfer
    with client.start_session(causal_consistency=False) as session:
        session.start_transaction(read_concern=rc_snapshot, write_concern=wc_majority)
        collection.update_one({"name": "Bob"}, {'$set': {"balance": new_bob_balance}}, session=session)
        session.commit_transaction()
    updated_bob_balance = collection.find_one({"name": "Bob"}).get("balance")
    assert updated_bob_balance == new_bob_balance
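Neither Python sample retries when a transaction fails with a transient error. A bounded retry loop that checks for the `TransientTransactionError` label might look like the following sketch. `run_with_retry`, `txn_func`, `max_attempts`, and `delay_seconds` are hypothetical names. The only driver behavior assumed is that PyMongo exceptions derived from `PyMongoError` expose `has_error_label`.

```python
import time


def run_with_retry(txn_func, session, max_attempts=3, delay_seconds=0.1):
    """Call txn_func(session), retrying only on TransientTransactionError.

    `txn_func` is assumed to start, then commit or abort, its own transaction.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return txn_func(session)
        except Exception as exc:
            has_label = getattr(exc, "has_error_label", None)
            transient = bool(has_label and has_label("TransientTransactionError"))
            if not transient or attempt == max_attempts:
                # Non-transient errors and exhausted retries go to the caller.
                raise
            time.sleep(delay_seconds)  # brief pause before the next attempt
```

Capping the number of attempts avoids looping forever if the error persists, which an unbounded `while True` retry would do.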

    Transaction API Examples for Core API

    JavaScript

    The following code demonstrates how to use the Amazon DocumentDB transaction API with JavaScript.

    // *** Transfer $500 from Alice to Bob inside a transaction: Success ***
    // Set up bank accounts for Alice and Bob. Each has $1000 in their account.
    var databaseName = "bank";
    var collectionName = "account";
    var amountToTransfer = 500;
    var session = db.getMongo().startSession({causalConsistency: false});
    var bankDB = session.getDatabase(databaseName);
    var accountColl = bankDB[collectionName];
    accountColl.drop();
    accountColl.insert({name: "Alice", balance: 1000});
    accountColl.insert({name: "Bob", balance: 1000});
    session.startTransaction();
    // deduct $500 from Alice's account
    var aliceBalance = accountColl.find({"name": "Alice"}).next().balance;
    assert(aliceBalance >= amountToTransfer);
    var newAliceBalance = aliceBalance - amountToTransfer;
    accountColl.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}});
    var findAliceBalance = accountColl.find({"name": "Alice"}).next().balance;
    assert.eq(newAliceBalance, findAliceBalance);
    // add $500 to Bob's account
    var bobBalance = accountColl.find({"name": "Bob"}).next().balance;
    var newBobBalance = bobBalance + amountToTransfer;
    accountColl.update({"name": "Bob"},{"$set": {"balance": newBobBalance}});
    var findBobBalance = accountColl.find({"name": "Bob"}).next().balance;
    assert.eq(newBobBalance, findBobBalance);
    session.commitTransaction();
    accountColl.find();

    C#

    The following code demonstrates how to use the Amazon DocumentDB transaction API with C#.

    // C# Core API
    using System;
    using System.Diagnostics;
    using MongoDB.Bson;
    using MongoDB.Driver;

    public void TransferMoneyWithRetry(IMongoCollection<BsonDocument> accountColl, IClientSessionHandle session)
    {
        var amountToTransfer = 500;

        // start transaction
        var transactionOptions = new TransactionOptions(
            readConcern: ReadConcern.Snapshot,
            writeConcern: WriteConcern.WMajority);
        session.StartTransaction(transactionOptions);
        try
        {
            // deduct $500 from Alice's account
            var aliceBalance = accountColl.Find(session, Builders<BsonDocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance");
            Debug.Assert(aliceBalance >= amountToTransfer);
            var newAliceBalance = aliceBalance.AsInt32 - amountToTransfer;
            accountColl.UpdateOne(session, Builders<BsonDocument>.Filter.Eq("name", "Alice"),
                Builders<BsonDocument>.Update.Set("balance", newAliceBalance));
            aliceBalance = accountColl.Find(session, Builders<BsonDocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance");
            Debug.Assert(aliceBalance == newAliceBalance);

            // add $500 to Bob's account
            var bobBalance = accountColl.Find(session, Builders<BsonDocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance");
            var newBobBalance = bobBalance.AsInt32 + amountToTransfer;
            accountColl.UpdateOne(session, Builders<BsonDocument>.Filter.Eq("name", "Bob"),
                Builders<BsonDocument>.Update.Set("balance", newBobBalance));
            bobBalance = accountColl.Find(session, Builders<BsonDocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance");
            Debug.Assert(bobBalance == newBobBalance);
        }
        catch (Exception)
        {
            // abort on any failure and let the caller decide whether to retry
            session.AbortTransaction();
            throw;
        }
        session.CommitTransaction();
    }

    public void DoTransactionWithRetry(MongoClient client)
    {
        var dbName = "bank";
        var collName = "account";
        using (var session = client.StartSession(new ClientSessionOptions{CausalConsistency = false}))
        {
            try
            {
                var bankDB = client.GetDatabase(dbName);
                var accountColl = bankDB.GetCollection<BsonDocument>(collName);
                bankDB.DropCollection(collName);
                accountColl.InsertOne(session, new BsonDocument { {"name", "Alice"}, {"balance", 1000 } });
                accountColl.InsertOne(session, new BsonDocument { {"name", "Bob"}, {"balance", 1000 } });

                while (true)
                {
                    try
                    {
                        TransferMoneyWithRetry(accountColl, session);
                        break;
                    }
                    catch (MongoException e)
                    {
                        // retry only when the driver marks the error as transient
                        if (e.HasErrorLabel("TransientTransactionError"))
                        {
                            continue;
                        }
                        else
                        {
                            throw;
                        }
                    }
                }

                // check values outside of transaction
                var aliceNewBalance = accountColl.Find(Builders<BsonDocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance");
                var bobNewBalance = accountColl.Find(Builders<BsonDocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance");
                Debug.Assert(aliceNewBalance == 500);
                Debug.Assert(bobNewBalance == 1500);
            }
            catch (Exception e)
            {
                Console.WriteLine("Error running transaction: " + e.Message);
            }
        }
    }

    Ruby

    The following code demonstrates how to use the Amazon DocumentDB transaction API with Ruby.

    # Ruby Core API
    require 'mongo'
    require 'test/unit/assertions'
    include Test::Unit::Assertions # provides assert and assert_equal

    def transfer_money_w_retry(session, accountColl)
      amountToTransfer = 500
      session.start_transaction(read_concern: {level: :snapshot}, write_concern: {w: :majority})

      # deduct $500 from Alice's account
      aliceBalance = accountColl.find({"name"=>"Alice"}, :session=> session).first['balance']
      assert aliceBalance >= amountToTransfer
      newAliceBalance = aliceBalance - amountToTransfer
      accountColl.update_one({"name"=>"Alice"}, { "$set" => {"balance"=>newAliceBalance} }, :session=> session)
      aliceBalance = accountColl.find({"name"=>"Alice"}, :session=> session).first['balance']
      assert_equal(newAliceBalance, aliceBalance)

      # add $500 to Bob's account
      bobBalance = accountColl.find({"name"=>"Bob"}, :session=> session).first['balance']
      newBobBalance = bobBalance + amountToTransfer
      accountColl.update_one({"name"=>"Bob"}, { "$set" => {"balance"=>newBobBalance} }, :session=> session)
      bobBalance = accountColl.find({"name"=>"Bob"}, :session=> session).first['balance']
      assert_equal(newBobBalance, bobBalance)

      session.commit_transaction
    end

    def do_txn_w_retry(client)
      dbName = "bank"
      collName = "account"
      session = client.start_session(:causal_consistency=> false)
      bankDB = Mongo::Database.new(client, dbName)
      accountColl = bankDB[collName]
      accountColl.drop()
      accountColl.insert_one({"name"=>"Alice", "balance"=>1000})
      accountColl.insert_one({"name"=>"Bob", "balance"=>1000})

      begin
        transfer_money_w_retry(session, accountColl)
        puts "transaction committed"
      rescue Mongo::Error => e
        # retry only when the driver marks the error as transient
        if e.label?('TransientTransactionError')
          retry
        else
          puts "transaction failed"
          raise
        end
      end

      # check results outside of transaction
      aliceBalance = accountColl.find({"name"=>"Alice"}).first['balance']
      bobBalance = accountColl.find({"name"=>"Bob"}).first['balance']
      assert_equal(aliceBalance, 500)
      assert_equal(bobBalance, 1500)
    end

    Java

    The following code demonstrates how to use the Amazon DocumentDB transaction API with Java.

    C

    The following code demonstrates how to use the Amazon DocumentDB transaction API with C.

    // Sample C code with core session
    #include <assert.h>
    #include <stdio.h>
    #include <mongoc/mongoc.h>

    bool core_session(mongoc_client_session_t *client_session, mongoc_collection_t* collection, bson_t *selector, int64_t balance){
        bool r = true;
        bson_error_t error;
        bson_t *opts = bson_new();
        bson_t *update = BCON_NEW ("$set", "{", "balance", BCON_INT64 (balance), "}");

        // set read & write concern
        mongoc_read_concern_t *read_concern = mongoc_read_concern_new ();
        mongoc_write_concern_t *write_concern = mongoc_write_concern_new ();
        mongoc_transaction_opt_t *txn_opts = mongoc_transaction_opts_new ();
        mongoc_write_concern_set_w(write_concern, MONGOC_WRITE_CONCERN_W_MAJORITY);
        mongoc_read_concern_set_level(read_concern, MONGOC_READ_CONCERN_LEVEL_SNAPSHOT);
        mongoc_transaction_opts_set_write_concern (txn_opts, write_concern);
        mongoc_transaction_opts_set_read_concern (txn_opts, read_concern);

        // run the update inside a transaction bound to the session
        mongoc_client_session_start_transaction (client_session, txn_opts, &error);
        mongoc_client_session_append (client_session, opts, &error);
        r = mongoc_collection_update_one (collection, selector, update, opts, NULL, &error);
        mongoc_client_session_commit_transaction (client_session, NULL, &error);

        bson_destroy (opts);
        mongoc_transaction_opts_destroy(txn_opts);
        mongoc_read_concern_destroy(read_concern);
        mongoc_write_concern_destroy(write_concern);
        bson_destroy (update);
        return r;
    }

    void test_core_money_transfer(mongoc_client_t* client, mongoc_collection_t* collection, int64_t amount_to_transfer){
        bool r = true;
        const bson_t *doc;
        bson_iter_t iter;
        bson_error_t error;

        // find query
        bson_t *alice_query = bson_new ();
        BSON_APPEND_UTF8(alice_query, "name", "Alice");
        bson_t *bob_query = bson_new ();
        BSON_APPEND_UTF8(bob_query, "name", "Bob");

        // create session
        // set causal consistency to false
        mongoc_session_opt_t *session_opts = mongoc_session_opts_new ();
        mongoc_session_opts_set_causal_consistency (session_opts, false);
        // start the session
        mongoc_client_session_t *client_session = mongoc_client_start_session (client, session_opts, &error);
        // add session to options
        bson_t *opts = bson_new();
        mongoc_client_session_append (client_session, opts, &error);

        // deduct 500 from Alice
        // find account balance of Alice
        mongoc_cursor_t *cursor = mongoc_collection_find_with_opts (collection, alice_query, NULL, NULL);
        mongoc_cursor_next (cursor, &doc);
        bson_iter_init (&iter, doc);
        bson_iter_find (&iter, "balance");
        int64_t alice_balance = (bson_iter_value (&iter))->value.v_int64;
        assert(alice_balance >= amount_to_transfer);
        int64_t new_alice_balance = alice_balance - amount_to_transfer;
        mongoc_cursor_destroy(cursor); // release each cursor before reusing the variable

        // core
        r = core_session (client_session, collection, alice_query, new_alice_balance);
        assert(r);

        // find account balance of Alice after transaction
        cursor = mongoc_collection_find_with_opts (collection, alice_query, NULL, NULL);
        mongoc_cursor_next (cursor, &doc);
        bson_iter_init (&iter, doc);
        bson_iter_find (&iter, "balance");
        alice_balance = (bson_iter_value (&iter))->value.v_int64;
        assert(alice_balance == new_alice_balance);
        assert(alice_balance == 500);
        mongoc_cursor_destroy(cursor);

        // add 500 to Bob's balance
        // find account balance of Bob
        cursor = mongoc_collection_find_with_opts (collection, bob_query, NULL, NULL);
        mongoc_cursor_next (cursor, &doc);
        bson_iter_init (&iter, doc);
        bson_iter_find (&iter, "balance");
        int64_t bob_balance = (bson_iter_value (&iter))->value.v_int64;
        int64_t new_bob_balance = bob_balance + amount_to_transfer;
        mongoc_cursor_destroy(cursor);

        // core
        r = core_session (client_session, collection, bob_query, new_bob_balance);
        assert(r);

        // find account balance of Bob after transaction
        cursor = mongoc_collection_find_with_opts (collection, bob_query, NULL, NULL);
        mongoc_cursor_next (cursor, &doc);
        bson_iter_init (&iter, doc);
        bson_iter_find (&iter, "balance");
        bob_balance = (bson_iter_value (&iter))->value.v_int64;
        assert(bob_balance == new_bob_balance);
        assert(bob_balance == 1500);

        // cleanup (doc is owned by the cursor and must not be destroyed separately)
        mongoc_cursor_destroy(cursor);
        bson_destroy(alice_query);
        bson_destroy(bob_query);
        bson_destroy(opts);
        mongoc_client_session_destroy(client_session);
        mongoc_session_opts_destroy(session_opts);
    }

    int main(int argc, char* argv[]) {
        mongoc_init ();
        // replace the placeholder with your cluster's connection URI
        mongoc_client_t* client = mongoc_client_new ("<connection uri>");
        bson_error_t error;

        // connect to bank db
        mongoc_database_t *database = mongoc_client_get_database (client, "bank");
        // access account collection
        mongoc_collection_t* collection = mongoc_client_get_collection(client, "bank", "account");
        // set amount to transfer
        int64_t amount_to_transfer = 500;
        // delete the collection if already existing
        mongoc_collection_drop(collection, &error);

        // open Alice account
        bson_t *alice_account = bson_new ();
        BSON_APPEND_UTF8(alice_account, "name", "Alice");
        BSON_APPEND_INT64(alice_account, "balance", 1000);
        // open Bob account
        bson_t *bob_account = bson_new ();
        BSON_APPEND_UTF8(bob_account, "name", "Bob");
        BSON_APPEND_INT64(bob_account, "balance", 1000);

        bool r = true;
        r = mongoc_collection_insert_one(collection, alice_account, NULL, NULL, &error);
        if (!r) {printf("Error encountered:%s", error.message);}
        r = mongoc_collection_insert_one(collection, bob_account, NULL, NULL, &error);
        if (!r) {printf("Error encountered:%s", error.message);}

        test_core_money_transfer(client, collection, amount_to_transfer);

        // cleanup
        bson_destroy(alice_account);
        bson_destroy(bob_account);
        mongoc_collection_destroy(collection);
        mongoc_database_destroy(database);
        mongoc_client_destroy(client);
        mongoc_cleanup ();
        return 0;
    }

    Scala

    The following code demonstrates how to use the Amazon DocumentDB transaction API with Scala.

    // Scala Core API
    // await() and results() are assumed to be application-defined helper extension
    // methods that block until an observable completes; MongoClientWrapper is
    // likewise assumed to be an application-defined helper.
    import org.mongodb.scala._

    def transferMoneyWithRetry(sessionObservable: SingleObservable[ClientSession], database: MongoDatabase): Unit = {
      val accountColl = database.getCollection("account")
      val amountToTransfer = 500
      val transactionObservable: Observable[ClientSession] = sessionObservable.map(clientSession => {
        clientSession.startTransaction()

        // deduct $500 from Alice's account
        var aliceBalance = accountColl.find(clientSession, Document("name" -> "Alice")).await().head.getInteger("balance")
        assert(aliceBalance >= amountToTransfer)
        val newAliceBalance = aliceBalance - amountToTransfer
        accountColl.updateOne(clientSession, Document("name" -> "Alice"), Document("$set" -> Document("balance" -> newAliceBalance))).await()
        aliceBalance = accountColl.find(clientSession, Document("name" -> "Alice")).await().head.getInteger("balance")
        assert(aliceBalance == newAliceBalance)

        // add $500 to Bob's account
        var bobBalance = accountColl.find(clientSession, Document("name" -> "Bob")).await().head.getInteger("balance")
        val newBobBalance = bobBalance + amountToTransfer
        accountColl.updateOne(clientSession, Document("name" -> "Bob"), Document("$set" -> Document("balance" -> newBobBalance))).await()
        bobBalance = accountColl.find(clientSession, Document("name" -> "Bob")).await().head.getInteger("balance")
        assert(bobBalance == newBobBalance)

        clientSession
      })
      transactionObservable.flatMap(clientSession => clientSession.commitTransaction()).await()
    }

    def doTransactionWithRetry(): Unit = {
      val client: MongoClient = MongoClientWrapper.getMongoClient()
      val database: MongoDatabase = client.getDatabase("bank")
      val accountColl = database.getCollection("account")
      accountColl.drop().await()

      val sessionOptions = ClientSessionOptions.builder().causallyConsistent(false).build()
      val sessionObservable: SingleObservable[ClientSession] = client.startSession(sessionOptions)
      accountColl.insertOne(Document("name" -> "Alice", "balance" -> 1000)).await()
      accountColl.insertOne(Document("name" -> "Bob", "balance" -> 1000)).await()

      var retry = true
      while (retry) {
        try {
          transferMoneyWithRetry(sessionObservable, database)
          println("transaction committed")
          retry = false
        }
        catch {
          // retry only when the driver marks the error as transient
          case e: MongoException if e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL) => {
            println("retrying transaction")
          }
          case other: Throwable => {
            println("transaction failed")
            retry = false
            throw other
          }
        }
      }

      // check results outside of transaction
      assert(accountColl.find(Document("name" -> "Alice")).results().head.getInteger("balance") == 500)
      assert(accountColl.find(Document("name" -> "Bob")).results().head.getInteger("balance") == 1500)
      accountColl.drop().await()
    }
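
    The C#, Ruby, and Scala examples above keep retrying for as long as the error carries the TransientTransactionError label. As a hedged illustration only, the sketch below shows one way to bound that loop. It runs without a database: LabeledError, runWithRetry, and maxAttempts are hypothetical names introduced here, and the simulated transaction body stands in for a real start/commit sequence.

```javascript
// Hypothetical stand-in for a driver error that carries error labels.
class LabeledError extends Error {
  constructor(message, labels = []) {
    super(message);
    this.labels = labels;
  }
  hasErrorLabel(label) {
    return this.labels.includes(label);
  }
}

// Run txnBody, retrying only on TransientTransactionError, at most maxAttempts times.
function runWithRetry(txnBody, maxAttempts = 3) {
  for (let attempt = 1; ; attempt++) {
    try {
      return txnBody(attempt);
    } catch (err) {
      const transient =
        typeof err.hasErrorLabel === "function" &&
        err.hasErrorLabel("TransientTransactionError");
      if (!transient || attempt >= maxAttempts) {
        throw err; // non-transient error, or retry budget exhausted
      }
    }
  }
}

// Simulated transaction body: fails transiently twice, then succeeds.
let calls = 0;
const result = runWithRetry(() => {
  calls += 1;
  if (calls < 3) {
    throw new LabeledError("simulated conflict", ["TransientTransactionError"]);
  }
  return "committed";
});
console.log(result, calls);
```

    A bound such as this trades a small chance of giving up early for protection against looping indefinitely when an error keeps recurring; the right limit depends on the workload.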

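    Unsupported capabilities

    Methods                           Stages or commands
    --------------------------------  ------------------------------------------
    db.collection.aggregate()         $collStats
                                      $currentOp
                                      $indexStats
                                      $listSessions
                                      $out

    db.collection.count()             $where
    db.collection.countDocuments()    $near
                                      $nearSphere

    db.collection.insert()            insert is not supported if it is not run
                                      against an existing collection. The method
                                      is supported if it targets a pre-existing
                                      collection.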

    Sessions

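    MongoDB sessions are a framework used to support retryable writes, causal consistency, transactions, and the management of operations across shards. When a session is created, the client generates a logical session identifier (lsid) that tags all operations in that session when commands are sent to the server.

    Amazon DocumentDB supports the use of sessions to enable transactions, but does not support causal consistency or retryable writes.

    When you use transactions in Amazon DocumentDB, a transaction is initiated from within a session using the session.startTransaction() API, and a session supports a single transaction at a time. Similarly, a transaction is completed using either the commit (session.commitTransaction()) or abort (session.abortTransaction()) API.

    Causal consistency guarantees that, within a single client session, the client observes read-after-write consistency, monotonic reads and writes, and writes that follow reads, and that these guarantees apply across all instances in a cluster, not just the primary. Amazon DocumentDB does not support causal consistency, and the following statements result in an error.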

    var mySession = db.getMongo().startSession();
    var mySessionObject = mySession.getDatabase('test').getCollection('account');
    mySessionObject.updateOne({"_id": 2}, {"$inc": {"balance": 400}});
    //Result:{ "acknowledged" : true, "matchedCount" : 1, "modifiedCount" : 1 }
    mySessionObject.find()
    //Error: error: {
    //    "ok" : 0,
    //    "code" : 303,
    //    "errmsg" : "Feature not supported: 'causal consistency'",
    //    "operationTime" : Timestamp(1603461817, 493214)
    //}
    mySession.endSession()

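    You can disable causal consistency within a session. Doing so lets you use the session framework, but it provides no causal consistency guarantees for reads. With Amazon DocumentDB, reads from the primary are read-after-write consistent, and reads from replica instances are eventually consistent. Transactions are the primary use case for sessions.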

    var mySession = db.getMongo().startSession({causalConsistency: false});
    var mySessionObject = mySession.getDatabase('test').getCollection('account');
    mySessionObject.updateOne({"_id": 2}, {"$inc": {"balance": 400}});
    //Result:{ "acknowledged" : true, "matchedCount" : 1, "modifiedCount" : 1 }
    mySessionObject.find()
    //{ "_id" : 1, "name" : "Bob", "balance" : 100 }
    //{ "_id" : 2, "name" : "Alice", "balance" : 1700 }

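    Retryable writes

    Retryable writes is a capability in which the client retries a write operation one time when a network error occurs or when the client cannot find the primary. Amazon DocumentDB does not support retryable writes, and they must be disabled. You can disable them with the retryWrites=false option in the connection string. The following is an example: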

    mongodb://chimera:<insertYourPassword>@docdb-2019-01-29-02-57-28.cluster-ccuszbx3pn5e.us-east-1.docdb.amazonaws.com:27017/?ssl=true&ssl_ca_certs=rds-combined-ca-bundle.pem&replicaSet=rs0&readPreference=secondaryPreferred&retryWrites=false
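
    When connection strings are assembled in application code, it can help to set retryWrites=false in one place so that it is never omitted. The following is a minimal sketch, not an official utility: buildDocDbUri is a hypothetical helper, and the user, password, and host values are placeholders.

```javascript
// Build a MongoDB-style connection string with retryable writes disabled.
// buildDocDbUri is a hypothetical helper; all values below are placeholders.
function buildDocDbUri({ user, password, host, port = 27017, options = {} }) {
  const params = new URLSearchParams({
    ...options,
    retryWrites: "false", // applied last so caller options cannot re-enable it
  });
  // Percent-encode credentials so characters such as '@' or '/' stay valid.
  const auth = `${encodeURIComponent(user)}:${encodeURIComponent(password)}`;
  return `mongodb://${auth}@${host}:${port}/?${params.toString()}`;
}

const uri = buildDocDbUri({
  user: "exampleUser",
  password: "p@ss/word",
  host: "example-cluster.cluster-xxxxxxxx.us-east-1.docdb.amazonaws.com",
  options: { ssl: "true", replicaSet: "rs0", readPreference: "secondaryPreferred" },
});
console.log(uri);
```

    Spreading the caller's options first and writing retryWrites afterwards means a stray retryWrites=true in the options is overridden rather than passed through.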

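    When you use transactions, some scenarios can produce an error stating that a transaction number does not match any in-progress transaction.

    The error can be generated in at least two different scenarios: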

    • After the one-minute transaction timeout.
    • After an instance restart (due to patching, crash recovery, etc.), it is possible to receive this error even in cases where the transaction successfully committed. During an instance restart, the database can’t tell the difference between a transaction that successfully completed versus a transaction that aborted. In other words, the transaction completion state is ambiguous.
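
    Because the completion state can be ambiguous, a client that retries after this error may apply its writes a second time. The sketch below uses a plain in-memory object rather than a database to show why that matters: an update that sets an absolute value gives the same result when repeated, while an increment-style update does not. applySet and applyInc are hypothetical helpers introduced only for this illustration.

```javascript
// In-memory stand-in for a collection: account name -> balance.
// applySet mimics a $set-style update; applyInc mimics an increment-style update.
const applySet = (accounts, name, newBalance) => ({ ...accounts, [name]: newBalance });
const applyInc = (accounts, name, delta) => ({ ...accounts, [name]: accounts[name] + delta });

const start = { Alice: 1000 };

// The first attempt commits, but the client cannot tell and retries once.
const setOnce = applySet(start, "Alice", 500);
const setTwice = applySet(setOnce, "Alice", 500);

const incOnce = applyInc(start, "Alice", -500);
const incTwice = applyInc(incOnce, "Alice", -500);

console.log(setTwice.Alice); // 500: repeating the update changes nothing
console.log(incTwice.Alice); // 0: the retry deducted the amount twice
```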

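    The best way to handle this error is to make transactional updates idempotent, for example by using the $set mutator instead of an increment/decrement operation. The error looks like the following: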

    { "ok" : 0,
    "operationTime" : Timestamp(1603938167, 1),
    "code" : 251,
    "errmsg" : "Given transaction number 1 does not match any in-progress transactions."