Python 中两种使用事物的方式
# 使用DNS种子列表连接格式为连接定义uriString
uri = 'mongodb+srv://server.example.com/'
client = MongoClient(uriString)
my_wc_majority = WriteConcern('majority', wtimeout=1000)
# 前提条件/步骤0:如果集合不存在,就创建集合
# 事务中的CRUD操作必须在已存在的集合中进行
client.get_database( "webshop",
write_concern=my_wc_majority).orders.insert_one({"sku":
"abc123", "qty":0})
client.get_database( "webshop",
write_concern=my_wc_majority).inventory.insert_one(
{"sku": "abc123", "qty": 1000})
# 步骤1:在事务中定义操作及其顺序
def update_orders_and_inventory(my_session):
orders = my_session.client.webshop.orders
inventory = my_session.client.webshop.inventory
with my_session.start_transaction(
read_concern=ReadConcern("snapshot"),
write_concern=WriteConcern(w="majority"),
read_preference=ReadPreference.PRIMARY):
orders.insert_one({"sku": "abc123", "qty": 100}, session=my_session)
inventory.update_one({"sku": "abc123", "qty": {"$gte": 100}},
{"$inc": {"qty": -100}}, session=my_session)
commit_with_retry(my_session)
# 步骤2:尝试使用重试逻辑运行并提交事务
def commit_with_retry(session):
while True:
try:
# 提交操作会使用事务开始时设置的写关注
session.commit_transaction()
print("Transaction committed.")
break
except (ConnectionFailure, OperationFailure) as exc:
# 可以重试提交
if exc.has_error_label("UnknownTransactionCommitResult"):
print("UnknownTransactionCommitResult, retrying "
"commit operation ...")
continue
else:
print("Error during commit ...")
raise
# 步骤3:尝试使用重试逻辑运行事务函数txn_func
def run_transaction_with_retry(txn_func, session):
while True:
try:
txn_func(session) # 运行事务
break
except (ConnectionFailure, OperationFailure) as exc:
# 如果出现暂时性错误,则重试整个事务
if exc.has_error_label("TransientTransactionError"):
print("TransientTransactionError, retrying transaction ...")
continue
else:
raise
# 步骤4:开启一个会话
with client.start_session() as my_session:
# 步骤5:调用函数run_transaction_with_retry并向其传递update_orders_and_inventory
函数的调用和my_session会话,以关联此事务
try:
run_transaction_with_retry(update_orders_and_inventory, my_session)
except Exception as exc:
# 错误处理。在核心API中没有提供错误处理代码
raise
# 使用DNS种子列表连接格式为连接定义uriString
uriString = 'mongodb+srv://server.example.com/'
client = MongoClient(uriString)
my_wc_majority = WriteConcern('majority', wtimeout=1000)
# 前提条件/步骤0:如果集合不存在,就创建集合
# 事务中的CRUD操作必须在已存在的集合中进行
client.get_database( "webshop",
write_concern=my_wc_majority).orders.insert_one(
{"sku": "abc123", "qty":0})
client.get_database( "webshop",
write_concern=my_wc_majority).inventory.insert_one(
{"sku": "abc123", "qty": 1000})
# 步骤1:定义回调方法以指定在事务内部执行的操作序列
def callback(my_session):
orders = my_session.client.webshop.orders
inventory = my_session.client.webshop.inventory
# 重要:必须将会话变量my_session传递给操作
orders.insert_one({"sku": "abc123", "qty": 100}, session=my_session)
inventory.update_one({"sku": "abc123", "qty": {"$gte": 100}},
{"$inc": {"qty": -100}}, session=my_session)
# 步骤2:启动客户端会话
with client.start_session() as session:
# 步骤3:使用with_transaction启动事务、执行回调并提交(或在发生错误时中止)
session.with_transaction(callback,
read_concern=ReadConcern('local'),
write_concern=my_write_concern_majority,
read_preference=ReadPreference.PRIMARY)