事务

Python 中两种使用事物的方式

# 使用DNS种子列表连接格式为连接定义uriString
uri = 'mongodb+srv://server.example.com/'
client = MongoClient(uriString)

my_wc_majority = WriteConcern('majority', wtimeout=1000)

# 前提条件/步骤0:如果集合不存在,就创建集合
# 事务中的CRUD操作必须在已存在的集合中进行

client.get_database( "webshop",
                     write_concern=my_wc_majority).orders.insert_one({"sku":
                     "abc123", "qty":0})
client.get_database( "webshop",
                     write_concern=my_wc_majority).inventory.insert_one(
                     {"sku": "abc123", "qty": 1000})

# 步骤1:在事务中定义操作及其顺序
def update_orders_and_inventory(my_session):
    orders = my_session.client.webshop.orders
    inventory = my_session.client.webshop.inventory
    with my_session.start_transaction(
        read_concern=ReadConcern("snapshot"),
        write_concern=WriteConcern(w="majority"),
        read_preference=ReadPreference.PRIMARY):
        orders.insert_one({"sku": "abc123", "qty": 100}, session=my_session)
        inventory.update_one({"sku": "abc123", "qty": {"$gte": 100}},
                             {"$inc": {"qty": -100}}, session=my_session)
        commit_with_retry(my_session)

# 步骤2:尝试使用重试逻辑运行并提交事务
def commit_with_retry(session):
    while True:
        try:
            # 提交操作会使用事务开始时设置的写关注
            session.commit_transaction()
            print("Transaction committed.")
            break
        except (ConnectionFailure, OperationFailure) as exc:
            # 可以重试提交
            if exc.has_error_label("UnknownTransactionCommitResult"):
                print("UnknownTransactionCommitResult, retrying "
                      "commit operation ...")
                continue
            else:
                print("Error during commit ...")
                raise

# 步骤3:尝试使用重试逻辑运行事务函数txn_func
def run_transaction_with_retry(txn_func, session):
    while True:
        try:
            txn_func(session) # 运行事务
            break
        except (ConnectionFailure, OperationFailure) as exc:
            # 如果出现暂时性错误,则重试整个事务
            if exc.has_error_label("TransientTransactionError"):
                print("TransientTransactionError, retrying transaction ...")
                continue
            else:
                raise

# 步骤4:开启一个会话
with client.start_session() as my_session:

# 步骤5:调用函数run_transaction_with_retry并向其传递update_orders_and_inventory
函数的调用和my_session会话,以关联此事务

    try:
        run_transaction_with_retry(update_orders_and_inventory, my_session)
    except Exception as exc:
        # 错误处理。在核心API中没有提供错误处理代码
        raise
# 使用DNS种子列表连接格式为连接定义uriString
uriString = 'mongodb+srv://server.example.com/'
client = MongoClient(uriString)

my_wc_majority = WriteConcern('majority', wtimeout=1000)

# 前提条件/步骤0:如果集合不存在,就创建集合
# 事务中的CRUD操作必须在已存在的集合中进行

client.get_database( "webshop",
                     write_concern=my_wc_majority).orders.insert_one(
                     {"sku": "abc123", "qty":0})
client.get_database( "webshop",
                     write_concern=my_wc_majority).inventory.insert_one(
                     {"sku": "abc123", "qty": 1000})

# 步骤1:定义回调方法以指定在事务内部执行的操作序列

def callback(my_session):
    orders = my_session.client.webshop.orders
    inventory = my_session.client.webshop.inventory

    # 重要:必须将会话变量my_session传递给操作

    orders.insert_one({"sku": "abc123", "qty": 100}, session=my_session)
    inventory.update_one({"sku": "abc123", "qty": {"$gte": 100}},
                         {"$inc": {"qty": -100}}, session=my_session)

# 步骤2:启动客户端会话

with client.start_session() as session:

# 步骤3:使用with_transaction启动事务、执行回调并提交(或在发生错误时中止)

    session.with_transaction(callback,
                             read_concern=ReadConcern('local'),
                             write_concern=my_write_concern_majority,
                             read_preference=ReadPreference.PRIMARY)