在散布式体系中,业务的处理散布在不同组件、服务中,因而散布式业务的ACID确保面临着一些特别难点。本系列文章介绍了21种散布式业务规划方式,并剖析其完成原理和优缺陷,在面临详细散布式业务问题时,能够挑选合适的方式进行处理。原文: Exploring Solutions for Distributed Transactions (3)

分布式事务的21种武器 - 3

在不同业务场景下,能够有不同的解决方案,常见办法有:

  1. 堵塞重试(Blocking Retry)
  2. 二阶段和三阶段提交(Two-Phase Commit (2PC) and Three-Phase Commit (3PC))
  3. 基于后台队列的异步处理(Using Queues to Process Asynchronously in the Background)
  4. TCC补偿(TCC Compensation Matters)
  5. 本地音讯表(异步确保)/发件箱方式(Local Message Table (Asynchronously Ensured)/Outbox Pattern)
  6. MQ业务(MQ Transaction)
  7. Saga方式(Saga Pattern)
  8. 事情驱动(Event Sourcing)
  9. 指令查询责任别离(Command Query Responsibility Segregation, CQRS)
  10. 原子提交(Atomic Commitment)
  11. 并行提交(Parallel Commits)
  12. 业务复制(Transactional Replication)
  13. 一致性算法(Consensus Algorithms)
  14. 时刻戳排序(Timestamp Ordering)
  15. 乐观并发操控(Optimistic Concurrency Control)
  16. 拜占庭容错(Byzantine Fault Tolerance, BFT)
  17. 散布式锁(Distributed Locking)
  18. 分片(Sharding)
  19. 多版本并发操控(Multi-Version Concurrency Control, MVCC)
  20. 散布式快照(Distributed Snapshots)
  21. 主从复制(Leader-Follower Replication)

本文将介绍Saga、事情驱动以及CQRS三种方式。

7. Saga方式(Saga Pattern)

分布式事务的21种武器 - 3

  • 办理跨多个微服务的长期业务。
  • 将业务分解为一系列较小的、独立的进程,每个进程都由单独的微服务办理。
  • 包括如下进程:
    1. 和谐微服务担任接纳业务初始恳求。
    2. 和谐微服务经过向第一个担任处理业务的微服务发送音讯来启动业务。
    3. 第一个微服务履行业务,并将音讯发送回和谐微服务,反馈其是否成功。
    4. 假如第一步成功,和谐微服务将向担任业务下一步的微服务发送音讯。
    5. 假如第一步失利,和谐微服务发送补偿动作来吊销失利进程的影响。
    6. 重复进程3-5,直到每个微服务要么完成其进程,要么在失利时触发补偿操作(回滚)。
    7. 一旦一切进程都成功完成,和谐微服务就会发送一条音讯,表明整个业务已经成功。
    8. 假如任何进程失利而且触发了补偿操作(回滚),则和谐微服务将发送一条音讯,指示整个业务失利。
import pika
import json
# Define the RabbitMQ connection parameters
credentials = pika.PlainCredentials('guest', 'guest')
parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
# Define the messages to be sent between services
start_order_message = {'order_id': '12345', 'items': [{'id': '1', 'name': 'item1'}, {'id': '2', 'name': 'item2'}]}
payment_message = {'order_id': '12345', 'amount': 100.0}
shipping_message = {'order_id': '12345', 'items': [{'id': '1', 'name': 'item1'}, {'id': '2', 'name': 'item2'}]}
# Define the compensation messages to be sent in case of failure
cancel_payment_message = {'order_id': '12345', 'amount': 100.0}
cancel_shipping_message = {'order_id': '12345', 'items': [{'id': '1', 'name': 'item1'}, {'id': '2', 'name': 'item2'}]}
# Define the function to send messages to the RabbitMQ broker
def send_message(queue_name, message):
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    channel.queue_declare(queue=queue_name)
    channel.basic_publish(exchange='', routing_key=queue_name, body=json.dumps(message))
    connection.close()
# Define the function to handle the start of the order
def start_order():
    # Send the start order message to the Order service
    send_message('start_order', start_order_message)
# Define the function to handle the payment of the order
def payment():
    try:
        # Send the payment message to the Payment service
        send_message('payment', payment_message)
    except Exception as e:
        # Send the cancel payment message to the Payment service in case of failure
        send_message('cancel_payment', cancel_payment_message)
        raise e
# Define the function to handle the shipping of the order
def shipping():
    try:
        # Send the shipping message to the Shipping service
        send_message('shipping', shipping_message)
    except Exception as e:
        # Send the cancel shipping message to the Shipping service in case of failure
        send_message('cancel_shipping', cancel_shipping_message)
        raise e
# Define the function to handle the cancellation of the order
def cancel_order():
    # Send the cancel payment message to the Payment service
    send_message('cancel_payment', cancel_payment_message)
    # Send the cancel shipping message to the Shipping service
    send_message('cancel_shipping', cancel_shipping_message)
# Define the main function to execute the Saga
def execute_saga():
    try:
        # Start the order
        start_order()
        # Perform the payment
        payment()
        # Perform the shipping
        shipping()
    except Exception as e:
        # Cancel the order in case of failure
        cancel_order()
        raise e
# Call the main function to execute the Saga
execute_saga()

示例代码

  • 用RabbitMQ作为简略音讯署理
  • 界说了在服务之间发送的五条音讯: start_order_messagepayment_messageshipping_messagecancel_payment_message以及cancel_shipping_message
  • start_order函数将start_order_message发送给order_service
  • 在收到来自start_order函数的音讯后,order_service创立订单,并发送回包括order_id的承认音讯。
  • 一旦start_order函数接纳到承认音讯,将发送payment_messagepayment_service来处理订单付出。
  • 假如付出成功,payment_service将回来一条包括payment_id的承认音讯。
  • start_order函数将shipping_message发送给shipping_service,以便在付款成功后发货。
  • 假如发货成功,shipping_service将回来一条包括shipping_id的承认音讯。
  • 假如上述任何进程失利,则回滚业务,别离给shipping_servicepayment_service发送cancel_shipping_messagecancel_payment_message,吊销所做的更改。
  • 经过向start_order发送初始音讯、监听承认音讯以及在产生毛病时处理补偿来处理整个Saga流程。换句话说,Saga方式触及一系列补偿操作,以便在产生毛病时吊销业务的影响。

长处

  • 办理跨多个微服务的长期业务
  • 避免服务独立运转时呈现不一致或数据损坏
  • 假如业务中的某个进程失利,则供给补偿操作
  • 答应服务自主、独立运转

缺陷

  • 完成这种方式或许比较杂乱
  • 很难规划和测试补偿逻辑
  • 会给体系添加额外的杂乱性,使保护和毛病排除变得愈加困难

适用场景

  • 触及多个服务(如付出处理、订单履约、物流)的电子商务交易
  • 触及多个体系和服务的杂乱金融交易
  • 触及多个供应商、制造商和物流供应商的供应链办理体系

测验-承认-撤销(TCC)方式与Saga方式的相似之处

  • 两种方式都保护散布式业务中触及多个微服务的数据一致性
  • 两种方式都要求每个服务界说一组需求作为业务一部分的操作

测验-承认-撤销(TCC)方式与Saga方式的不同之处

  • Saga方式运用前向康复法,每个服务在呈现毛病时启动一个补偿业务,而TCC方式运用后向康复法,每个服务验证业务是否能够继续,然后才承认或撤销。
  • Saga方式将业务表明为事情序列,事情由相关服务之间发送的音讯表明。TCC方式将业务表明为由所触及的每个服务履行的操作序列。
  • Saga方式适用于触及多个服务的长期业务,而TCC方式适用于触及较少服务的短时刻业务。
  • Saga方式的完成或许比TCC方式更杂乱,要求每个服务能够发起补偿业务并处理潜在毛病。

8. 事情驱动(Event Sourcing)

分布式事务的21种武器 - 3

  • 对运用程序状况所做的一切更改作为一系列事情。
  • 将这些事情存储在数据库或事情日志中,然后供给运用程序状况随时刻改动的完整审计盯梢。
  • 触及如下进程:
    1. 每当运用程序状况产生改动时,就会捕获相应事情,事情包括一切更改相关信息(例如已修改的数据和进行更改的用户)。
    2. 事情存储在事情日志中,能够用数据库或音讯署理完成,每个事情都有一个唯一标识符,并带有时刻戳,以确保事情有序。
    3. 经过按时刻次序重播事情日志中的事情来重构运用程序当时状况。该进程包括将运用程序的状况初始化为其初始状况,然后顺次运用每个事情来更新状况。
    4. 一旦状况被重构,就能够对其进行查询,以供给有关运用程序当时状况的信息。
    5. 能够实时处理事情,触发其他动作或更新。
    6. 事情处理完成后,能够将其归档或删除以释放存储空间。

分布式事务的21种武器 - 3

import uuid
import json
import time
class BankAccount:
    def __init__(self):
        self.balance = 0
        self.event_sourcing = EventSourcing()
    def deposit(self, amount):
        event = Event('deposit', {'amount': amount})
        self.event_sourcing.add_event(event)
        self.balance += amount
    def withdraw(self, amount):
        if self.balance < amount:
            raise ValueError('Insufficient balance')
        event = Event('withdraw', {'amount': amount})
        self.event_sourcing.add_event(event)
        self.balance -= amount
    def get_balance(self):
        return self.balance
    def get_events(self):
        return self.event_sourcing.get_events()
    def get_event_by_id(self, event_id):
        return self.event_sourcing.get_event_by_id(event_id)
    def replay_events(self):
        self.balance = 0
        for event in self.event_sourcing.get_events():
            if event.type == 'deposit':
                self.balance += event.data['amount']
            elif event.type == 'withdraw':
                self.balance -= event.data['amount']
class Event:
    def __init__(self, type, data):
        self.id = uuid.uuid4()
        self.timestamp = int(time.time())
        self.type = type
        self.data = data
class EventSourcing:
    def __init__(self):
        self.event_store = EventStore()
    def add_event(self, event):
        self.event_store.store_event(event)
    def get_events(self):
        return self.event_store.get_events()
    def get_event_by_id(self, event_id):
        return self.event_store.get_event_by_id(event_id)
class EventStore:
    def __init__(self):
        self.events = []
    def store_event(self, event):
        self.events.append(event)
    def get_events(self):
        return self.events
    def get_event_by_id(self, event_id):
        for event in self.events:
            if event.id == event_id:
                return event
        raise ValueError('Event not found')
class Auditor:
    def __init__(self, event_store):
        self.event_store = event_store
    def log_events(self):
        for event in self.event_store.get_events():
            print(json.dumps({'id': str(event.id), 'type': event.type, 'data': event.data}))
account = BankAccount()
auditor = Auditor(account.event_sourcing.event_store)
account.deposit(100)
account.withdraw(50)
account.deposit(75)
print('Current balance:', account.get_balance())
print('All events:')
auditor.log_events()
event_id = account.get_events()[1].id
event = account.get_event_by_id(event_id)
print('Event details:', json.dumps({'id': str(event.id), 'type': event.type, 'data': event.data}))

示例代码

  • BankAccount类用来演示如何运用事情来重建实体状况,包括balance特点,支持deposit(存款)和withdraw(取款)两种操作。
  • Event类有类型和数据特点。
  • EventSourcing类界说了BankAccount的event_sourcing特点
  • EventSourcing类包括event_store特点作为事情列表。
  • EventStore类有两个主要办法: store_event()(在列表中存储事情)和get_events()(从列表中检索事情)。
  • add_event办法向event_store添加新事情。
  • get_eventsget_event_by_id办法能够经过ID检索一切事情或特定事情。
  • depositwithdraw办法创立具有唯一ID、时刻戳和字典(包括操作信息,在本例中为操作类型和金额)的新Event目标,事情被添加到BankAccount实例的event_sourcing特点中。
  • 每次进行depositwithdraw时,都会创立相应事情,并经过EventStore类将其存储在事情存储中。
  • get_balance办法回来帐户当时余额。
  • replay_events()办法从事情存储中检索一切事情,并核算当时余额。遍历事情存储中的一切事情,并根据每个事情的类型和数据更新BankAccountbalance特点。
  • Auditor类监听存储在事情存储库中的一切事情,并在终端上输出相应log。
  • 以JSON格局打印当时余额和一切事情,经过ID检索特定事情并打印其详细信息。
  • 事情源方式是创立事情来表明对体系状况的更改,将这些事情存储在事情存储中,并重播事情以重建体系当时状况。

长处

  • 捕获、存储一切事情,以便进行审计和确保一致性
  • 能够重播一切事情以创立数据的不同视图
  • 经过存储事情来处理大量数据
  • 能够重播事情以将体系康复到曾经的状况
  • 事情日志能够作为一段时刻内体系行为的文档,使其更简单保护和扩展。

缺陷

  • 完成或许比较杂乱,特别是在处理事情规划和数据搬迁的杂乱性时
  • 存储一切事情需求更多存储空间
  • 有必要重放一切事情以确定当时状况,或许导致额外功能开支

适用场景

  • 记载交易和财务事项
  • 记载健康事情和医疗程序
  • 记载订单事情和付款事情

注意事项

  • 事情规划 —— 以细粒度方式捕捉体系状况的改动。事情应该是不可变的,这意味着事情被创立后不能被修改。事情的规划应该支持简略的查询和剖析。
  • 存储需求 —— 一切对体系状况的更改都以事情序列的方式存储。存储空间明显大于传统数据库。
  • 数据搬迁 —— 提前计划数据搬迁,并考虑如何将数据从旧体系搬迁到新的事情源体系。

9. 指令查询责任别离(Command Query Responsibility Segregation, CQRS)

分布式事务的21种武器 - 3

  • 将读写操作别离到单独的服务或模型中
  • 指令: 改动体系状况
  • 查询: 回来数据
  • 触及如下进程:
    1. 用户向体系宣布读取或写入数据的恳求
    2. 假如恳求是指令(写操作),则将该指令发送给指令服务,指令服务处理恳求并更新体系状况。
    3. 指令服务更新写入模型,其中包括体系的当时状况,并创立描述更改的事情。事情被添加到事情流中,事情流是体系中产生的一切事情的日志。
    4. 指令服务将事情发布到音讯署理,音讯署理将事情传递给感兴趣的订阅者。
    5. 假如恳求是查询(读操作),则将查询发送给查询服务,查询服务从读模型中检索数据。
    6. 查询服务从读模型中检索数据并将其回来给用户。
    7. 假如用户想履行另一个写操作,则从进程2开端重复该进程。
    8. 假如用户想要履行读操作,则从进程5开端重复该进程。
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional
class Command(ABC):
    pass
class CreateProductCommand(Command):
    def __init__(self, name: str, price: float):
        self.name = name
        self.price = price
class UpdateProductCommand(Command):
    def __init__(self, product_id: str, name: Optional[str] = None, price: Optional[float] = None):
        self.product_id = product_id
        self.name = name
        self.price = price
class DeleteProductCommand(Command):
    def __init__(self, product_id: str):
        self.product_id = product_id
class Query(ABC):
    pass
class GetProductQuery(Query):
    def __init__(self, product_id: str):
        self.product_id = product_id
class GetAllProductsQuery(Query):
    pass
class Product:
    def __init__(self, id: str, name: str, price: float):
        self.id = id
        self.name = name
        self.price = price
class ProductRepository:
    def __init__(self):
        self.products = []
    def create(self, name: str, price: float) -> Product:
        product = Product(str(len(self.products) + 1), name, price)
        self.products.append(product)
        return product
    def get(self, id: str) -> Optional[Product]:
        for product in self.products:
            if product.id == id:
                return product
        return None
    def get_all(self) -> List[Product]:
        return self.products
    def update(self, id: str, name: Optional[str] = None, price: Optional[float] = None) -> Optional[Product]:
        for product in self.products:
            if product.id == id:
                if name is not None:
                    product.name = name
                if price is not None:
                    product.price = price
                return product
        return None
    def delete(self, id: str) -> bool:
        for product in self.products:
            if product.id == id:
                self.products.remove(product)
                return True
        return False
class ProductCommandHandler:
    def __init__(self, repository: ProductRepository):
        self.repository = repository
    def handle(self, command: Command) -> Optional[Product]:
        if isinstance(command, CreateProductCommand):
            return self.repository.create(command.name, command.price)
        elif isinstance(command, UpdateProductCommand):
            return self.repository.update(command.product_id, command.name, command.price)
        elif isinstance(command, DeleteProductCommand):
            success = self.repository.delete(command.product_id)
            return success
class ProductQueryHandler:
    def __init__(self, repository: ProductRepository):
        self.repository = repository
    def handle(self, query: Query) -> Optional[Any]:
        if isinstance(query, GetProductQuery):
            return self.repository.get(query.product_id)
        elif isinstance(query, GetAllProductsQuery):
            return self.repository.get_all()
class ProductService:
    def __init__(self, command_handler: ProductCommandHandler, query_handler: ProductQueryHandler):
        self.command_handler = command_handler
        self.query_handler = query_handler
    def create_product(self, name: str, price: float) -> Product:
        command = CreateProductCommand(name, price)
        return self.command_handler.handle(command)
    def get_product(self, id: str) -> Optional[Product]:
        query = GetProductQuery(id)
        return self.query_handler.handle(query)
    def get_all_products(self) -> List[Product]:
        query = GetAllProductsQuery()
        return self

示例代码

  • 一个产品办理体系。
  • 抽象类CommandQuery别离由详细类完成。
  • 3个指令类的完成: CreateProductCommandUpdateProductCommandDeleteProductCommand
  • CreateProductCommand创立产品
  • UpdateProductCommand更新产品
  • DeleteProductCommand删除产品
  • 2个查询类的完成: GetProductQueryGetAllProductQuery
  • GetProductQuery检索关于特定产品的信息
  • GetAllProductQuery检索一切产品的信息
  • Product类表明一个产品,包括idnameprice
  • ProductRepository类处理产品数据的持久性,具有创立、检索、更新和删除产品的办法
  • ProductCommandHandler类处理指令并将ProductRepository作为依靠项
  • ProductQueryHandler类处理查询并将ProductRepository作为依靠项
  • 两个handle办法担任接受指令或查询,并回来适当的响应
  • ProductService类作为客户端与产品办理体系交互的进口,将ProductCommandHandlerProductQueryHandler作为依靠项,并揭露用于创立、检索和列出产品的办法,这些办法只是对适当指令或查询的包装,并将其传递给相应的处理程序。

长处

  • 别离读写模型以别离优化两个进程,能够获得更好的功能。
  • 别离读写模型使代码更简单保护。
  • 此方式可针对特定用例进行优化。

缺陷

  • 体系的规划和完成比较杂乱
  • 比传统的整体架构需求更多时刻和资源
  • 在产生写操作和更新读模型之间或许存在延迟,然后导致只能确保最终一致性

适用场景

  • 具有杂乱域逻辑和高读写负载的运用程序
  • 具有不同用户界面的体系,例如移动和web运用程序,其中每个界面都有自己特定的读取要求
  • 电子商务体系中的产品目录办理和订单办理
  • 医疗保健体系中的患者数据检索和数据输入
  • 金融交易体系中的实时市场数据检索和订单履行

CQRS和事情驱动的结合

分布式事务的21种武器 - 3


参考文献

Saga pattern

Microservices Pattern: Sagas

Saga Pattern

Saga Pattern Microservices

Saga Pattern for Microservices Distributed Transactions

Microservice Design Pattern – Saga

Event Driven Saga Pattern

How to Use Saga Pattern in Microservices

Saga Orchestration for Microservices Using the Outbox Pattern

Saga Without the Headaches

Event Sourcing

Event Sourcing – why a dedicated event store?

Beginner’s Guide to Event Sourcing

Microservices Pattern: Event Sourcing

Event Sourcing pattern

Event Sourcing

Event Sourcing explained

CQRS Event Sourcing JAVA

Introduction to CQRS

CQRS Pattern

bliki: CQRS

Microservices Pattern: Command Query Responsibility Segregation (CQRS)

A Beginner’s Guide to CQRS

CQRS Desgin Pattern in Microservices Architecture

The Command and Query Responsibility Segregation(CQRS)

Event Driven CQRS Pattern

CQRS Pattern

CQRS Software Architecture Pattern: The Good, the Bad, and the Ugly


你好,我是俞凡,在Motorola做过研制,现在在Mavenir做技能工作,对通讯、网络、后端架构、云原生、DevOps、CICD、区块链、AI等技能一直保持着浓厚的兴趣,平常喜爱阅读、考虑,相信继续学习、终身成长,欢迎一同交流学习。
微信公众号:DeepNoMind