分布式事务的实现是现代分布式系统中的一个重要问题。由于分布式系统中存在多个独立的服务,每个服务可能有自己的数据库,因此在这些服务之间执行跨越多个数据库的事务时,保证一致性和可靠性变得尤为复杂。分布式事务的目标是确保所有参与的服务要么都成功提交,要么都回滚,以维持系统的一致性。
分布式事务的实现主要有两种经典的方式:
1. 两阶段提交协议(2PC,Two-Phase Commit)
两阶段提交协议是一种经典的分布式事务协议,用于确保所有参与者(通常是各个数据库)在事务中达到一致。
工作原理:
两阶段提交协议分为两个阶段:
第一阶段(准备阶段):
事务协调者(Coordinator)向所有参与者(Participants)发送 "准备提交" 请求。每个参与者执行事务并将结果保存到本地,但不提交。如果没有错误,参与者返回一个 "准备提交" 的响应,表示可以提交;否则返回 "回滚"。
第二阶段(提交/回滚阶段):
如果所有参与者都返回 "准备提交",协调者就会向所有参与者发送 "提交" 请求,要求各参与者提交事务。如果有任何参与者返回 "回滚",协调者会向所有参与者发送 "回滚" 请求,要求所有参与者撤销事务。优缺点:
优点:
保证了事务的一致性,所有参与者要么提交,要么回滚。缺点:
阻塞问题:如果协调者失败,所有参与者会一直等待,直到协调者恢复。单点故障:协调者如果发生故障,事务无法继续进行。性能问题:由于等待各个参与者的确认和响应,可能会引起较高的延迟。 两阶段提交协议(2PC)示例
这个示例演示了如何使用 Spring 来实现一个简化版的两阶段提交协议。
1.1. 步骤 1:创建数据库和事务管理
首先,我们需要准备两个数据库,分别是 db1 和 db2,并使用 Spring 的事务管理器来确保分布式事务的一致性。
@Configuration
@EnableTransactionManagement
public class DataSourceConfig {
@Bean
public DataSource dataSource1() {
// 数据库1的配置
HikariDataSource dataSource = new HikariDataSource();
dataSource.setJdbcUrl("jdbc:mysql://localhost:3306/db1");
dataSource.setUsername("root");
dataSource.setPassword("password");
return dataSource;
}
@Bean
public DataSource dataSource2() {
// 数据库2的配置
HikariDataSource dataSource = new HikariDataSource();
dataSource.setJdbcUrl("jdbc:mysql://localhost:3306/db2");
dataSource.setUsername("root");
dataSource.setPassword("password");
return dataSource;
}
@Bean
public PlatformTransactionManager transactionManager1() {
return new DataSourceTransactionManager(dataSource1());
}
@Bean
public PlatformTransactionManager transactionManager2() {
return new DataSourceTransactionManager(dataSource2());
}
}
1.2. 步骤 2:实现两阶段提交逻辑
接下来,创建一个服务类,模拟两阶段提交协议。我们将通过协调者(Coordinator)来管理分布式事务。
@Service
public class TwoPhaseCommitService {
@Autowired
private PlatformTransactionManager transactionManager1;
@Autowired
private PlatformTransactionManager transactionManager2;
@Autowired
private JdbcTemplate jdbcTemplate1;
@Autowired
private JdbcTemplate jdbcTemplate2;
public void beginTransaction() {
// Step 1: First phase - Prepare
// 1.1 Begin transaction in db1
TransactionStatus status1 = transactionManager1.getTransaction(new DefaultTransactionDefinition());
// 1.2 Begin transaction in db2
TransactionStatus status2 = transactionManager2.getTransaction(new DefaultTransactionDefinition());
try {
// Simulate a database operation in db1
jdbcTemplate1.update("INSERT INTO table1 (id, name) VALUES (?, ?)", 1, "John");
// Simulate a database operation in db2
jdbcTemplate2.update("INSERT INTO table2 (id, description) VALUES (?, ?)", 1, "Test description");
// Step 2: Second phase - Commit if no errors
transactionManager1.commit(status1);
transactionManager2.commit(status2);
System.out.println("Transaction committed successfully in both databases.");
} catch (Exception e) {
// Rollback in case of any failure
transactionManager1.rollback(status1);
transactionManager2.rollback(status2);
System.out.println("Transaction failed, rolled back.");
}
}
}
1.3. 步骤 3:调用分布式事务
你可以在 Controller 或其他地方调用上述服务来开始事务。
@RestController
@RequestMapping("/transaction")
public class TransactionController {
@Autowired
private TwoPhaseCommitService twoPhaseCommitService;
@PostMapping("/start")
public String startTransaction() {
twoPhaseCommitService.beginTransaction();
return "Transaction started.";
}
}
2. 三阶段提交协议(3PC,Three-Phase Commit)
三阶段提交协议在两阶段提交协议的基础上进行了优化,减少了阻塞的问题,增加了容错性。
工作原理:
三阶段提交协议与两阶段提交类似,但加入了一个中间状态,以处理协调者和参与者之间的通信故障。
第一阶段(CanCommit阶段):
协调者向所有参与者发送 "CanCommit" 请求,询问是否可以提交事务。如果参与者准备好提交,就回复 "Yes";如果不能提交,则回复 "No"。
第二阶段(PreCommit阶段):
如果所有参与者都回复 "Yes",协调者向所有参与者发送 "PreCommit" 请求,准备提交事务,但还没有真正提交。
第三阶段(Commit阶段):
如果所有参与者都回复 "PreCommit",协调者向所有参与者发送 "Commit" 请求,表示可以最终提交事务。如果任何参与者在第二阶段后发生故障,协调者可以回滚事务。优缺点:
优点:
通过引入中间状态,减少了两阶段提交中可能的阻塞问题。缺点:
协议更复杂,涉及更多的通信和状态。性能可能受到更多的网络通信和协调操作的影响。三阶段提交协议实现实例
我们创建一个服务类 ThreePhaseCommitService 来处理协议的三个阶段:CanCommit、PreCommit 和 DoCommit。
2.1. ThreePhaseCommitService
@Service
public class ThreePhaseCommitService {
@Autowired
private PlatformTransactionManager transactionManager1;
@Autowired
private PlatformTransactionManager transactionManager2;
@Autowired
private JdbcTemplate jdbcTemplate1;
@Autowired
private JdbcTemplate jdbcTemplate2;
// 1. CanCommit 阶段:检查是否可以提交事务
public boolean canCommit() {
// 这里可以检查一些条件(例如,数据库中的状态)
return true; // 假设可以提交
}
// 2. PreCommit 阶段:参与者准备提交,锁定资源
public boolean preCommit() {
// 可以执行一些准备工作,如锁定资源等
try {
jdbcTemplate1.update("LOCK TABLE table1 WRITE");
jdbcTemplate2.update("LOCK TABLE table2 WRITE");
return true;
} catch (Exception e) {
return false; // 锁定资源失败,返回 false
}
}
// 3. DoCommit 阶段:提交事务
public void doCommit() {
// 这里可以提交事务
try {
jdbcTemplate1.update("INSERT INTO table1 (id, name) VALUES (?, ?)", 1, "John");
jdbcTemplate2.update("INSERT INTO table2 (id, description) VALUES (?, ?)", 1, "Test description");
} catch (Exception e) {
throw new RuntimeException("Commit failed", e);
}
}
// 开始执行三阶段提交
public void beginTransaction() {
if (!canCommit()) {
System.out.println("Transaction cannot proceed in CanCommit phase.");
return;
}
// Step 1: CanCommit phase
System.out.println("Phase 1: CanCommit - All participants are asked if they can commit.");
// 如果 CanCommit 阶段无法通过,直接返回
if (!preCommit()) {
System.out.println("Transaction failed in PreCommit phase.");
return;
}
// Step 2: PreCommit phase
System.out.println("Phase 2: PreCommit - Resources are locked for transaction.");
// 一旦 PreCommit 阶段成功,可以继续提交
doCommit(); // DoCommit
System.out.println("Phase 3: DoCommit - Commit transaction in both databases.");
}
}
3. 基于消息队列的最终一致性方案(异步消息机制)
分布式事务的另一种常见方式是基于消息队列的异步事务。这种方式的核心思想是:通过消息传递,确保系统中各个服务的最终一致性,而不是要求立即的一致性。
工作原理:
系统中的每个服务执行本地事务(即操作本地数据库)。操作完成后,服务通过消息队列将消息发送到其他服务,通知它们进行必要的操作。消息接收方服务接收到消息后,执行本地事务并返回执行结果。通过一些补偿机制(例如消息重试、回滚等)来保证各个服务之间的最终一致性。这种方式特别适合于 高可用性 和 高扩展性 的系统,在服务间没有严格的实时一致性要求时,使用异步消息机制可以实现更高的吞吐量和更低的延迟。
优缺点:
优点:
高可用、无阻塞:各个服务可以异步执行,减少了系统的负担。可以容忍一定程度的失败,通过消息重试或补偿机制来最终恢复一致性。缺点:
实现复杂:需要设计和实现补偿机制、消息幂等性等问题。一致性是最终的而非立即的:并非每个服务都能在同一时刻保持一致,可能需要等待一定时间才能达到一致性。 消息队列实现:用于处理分布式事务
为了简化模拟,我们将使用 RabbitMQ 来传递一些操作信息,从而触发跨数据库的事务处理。
3.1. 配置 RabbitMQ 消息队列
@Configuration
public class RabbitConfig {
@Bean
public Queue transactionQueue() {
return new Queue("transactionQueue", false);
}
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory("localhost");
factory.setUsername("guest");
factory.setPassword("guest");
return factory;
}
@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(connectionFactory());
}
}
3.2. 发送消息触发事务
我们将通过消息队列将事务处理的控制信息发送到各个服务。
@Service
public class TransactionService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private ThreePhaseCommitService threePhaseCommitService;
public void startTransaction() {
// 发送消息到队列,让其他服务知道需要开始事务
rabbitTemplate.convertAndSend("transactionQueue", "Start Transaction");
System.out.println("Message sent to start transaction.");
// 启动三阶段提交协议
threePhaseCommitService.beginTransaction();
}
}
3.3. 消费消息并执行事务
当 RabbitMQ 收到消息后,会触发事务处理。
@Component
public class TransactionMessageListener {
@Autowired
private ThreePhaseCommitService threePhaseCommitService;
@RabbitListener(queues = "transactionQueue")
public void handleTransactionMessage(String message) {
if (message.equals("Start Transaction")) {
System.out.println("Received message to start transaction.");
threePhaseCommitService.beginTransaction();
}
}
}
4. 控制器触发事务
最后,我们通过 HTTP 接口来启动分布式事务。
@RestController
@RequestMapping("/transaction")
public class TransactionController {
@Autowired
private TransactionService transactionService;
@PostMapping("/start")
public String startTransaction() {
transactionService.startTransaction();
return "Transaction started.";
}
}
4. TCC(Try-Confirm/Cancel)协议
TCC 是一种适用于分布式系统的补偿事务模型,通常用于在微服务架构中实现分布式事务。
工作原理:
TCC 将事务分为三个阶段:
Try:执行本地事务的预处理,如资源的占用和锁定,确保事务在后续可以正常完成。Confirm:确认事务操作,即执行真正的业务操作,确保资源最终被提交。Cancel:如果 Try 阶段无法成功,执行回滚操作,撤销资源的占用和锁定。TCC 协议的核心在于每个步骤都是幂等的,服务必须能够处理部分失败,并能够补偿操作。
优缺点:
优点:
支持强一致性,相比于最终一致性,它能够提供更高的可靠性。容错性强:失败时可以通过 Cancel 来补偿事务。缺点:
实现复杂,需要确保每个操作的幂等性,尤其是取消和确认阶段的处理。TCC 服务实现案例
在服务层,我们实现 TCC 的三个阶段。
4.1. Try、Confirm 和 Cancel 方法
@Service
public class TccService {
@Autowired
private JdbcTemplate jdbcTemplate1;
@Autowired
private JdbcTemplate jdbcTemplate2;
// Try 阶段:预备操作,锁定资源
public boolean tryTransaction() {
try {
// 锁定资源
jdbcTemplate1.update("LOCK TABLE table1 WRITE");
jdbcTemplate2.update("LOCK TABLE table2 WRITE");
return true; // 返回 true 表示 Try 阶段成功
} catch (Exception e) {
return false; // 失败则返回 false
}
}
// Confirm 阶段:确认提交事务
public void confirmTransaction() {
try {
// 执行实际的事务提交
jdbcTemplate1.update("INSERT INTO table1 (id, name) VALUES (?, ?)", 1, "Alice");
jdbcTemplate2.update("INSERT INTO table2 (id, description) VALUES (?, ?)", 1, "Test record");
} catch (Exception e) {
throw new RuntimeException("Confirm phase failed", e);
}
}
// Cancel 阶段:回滚操作
public void cancelTransaction() {
try {
// 释放资源或撤销操作
jdbcTemplate1.update("ROLLBACK");
jdbcTemplate2.update("ROLLBACK");
} catch (Exception e) {
throw new RuntimeException("Cancel phase failed", e);
}
}
// 执行 TCC 事务
public void executeTccTransaction() {
// Step 1: Try 阶段
if (!tryTransaction()) {
System.out.println("Try phase failed. Aborting transaction.");
cancelTransaction(); // Try 失败时回滚
return;
}
// Step 2: Confirm 阶段
try {
confirmTransaction();
System.out.println("Transaction committed successfully.");
} catch (Exception e) {
// Step 3: 如果 Confirm 失败,则执行 Cancel 阶段
System.out.println("Confirm phase failed. Rolling back.");
cancelTransaction();
}
}
}
在分布式系统中,消息队列用于服务间通信。以下是一个 RabbitMQ 消息队列的配置,用于触发 TCC 事务。
配置 RabbitMQ
@Configuration
public class RabbitConfig {
@Bean
public Queue transactionQueue() {
return new Queue("transactionQueue", false);
}
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory("localhost");
factory.setUsername("guest");
factory.setPassword("guest");
return factory;
}
@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(connectionFactory());
}
}
消息发送触发 TCC 事务
@Service
public class TransactionService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private TccService tccService;
public void startTransaction() {
// 发送消息到队列
rabbitTemplate.convertAndSend("transactionQueue", "Start TCC Transaction");
System.out.println("Message sent to start TCC transaction.");
// 执行 TCC 事务
tccService.executeTccTransaction();
}
}
消费者监听消息
@Component
public class TransactionMessageListener {
@Autowired
private TccService tccService;
@RabbitListener(queues = "transactionQueue")
public void handleTransactionMessage(String message) {
if (message.equals("Start TCC Transaction")) {
System.out.println("Received message to start TCC transaction.");
tccService.executeTccTransaction();
}
}
}
控制器接口
通过 HTTP 接口触发 TCC 事务。
@RestController
@RequestMapping("/transaction")
public class TransactionController {
@Autowired
private TransactionService transactionService;
@PostMapping("/start")
public String startTransaction() {
transactionService.startTransaction();
return "TCC Transaction started.";
}
}
5. Saga(补偿事务)
Saga 是另一种分布式事务处理模式,它通过将一个长事务分解为多个小事务(局部事务),每个局部事务的执行与提交都有对应的补偿操作来保证整个过程的一致性。
工作原理:
Saga 将事务拆分成多个小的子事务,每个子事务可以独立提交。如果某个子事务失败,则会通过执行补偿操作回滚已经成功的子事务,保证系统的一致性。
每个子事务都有一个补偿事务,如果某个子事务失败,会执行补偿事务来撤销已执行的操作。可以采用两种模式:
长事务补偿模式:每个子事务执行后都通过补偿来保证一致性。事件驱动模式:使用事件驱动来触发补偿操作。优缺点:
优点:
适用于长时间运行的分布式事务(如预订、支付等)。相比于传统的 2PC,Saga 更加灵活,可以容忍失败并进行补偿。缺点:
需要管理和设计补偿事务。可能需要增加一些业务复杂性,确保子事务能够独立回滚。5.1 Saga示例场景:预订服务
我们设计一个简化的订单系统,订单需要跨多个服务进行处理,比如:
库存服务:扣减库存。支付服务:支付金额。通知服务:发送通知。假设我们有以下几个步骤:
扣减库存。
扣款支付。
发送通知。如果其中的任意一步失败,前面已经成功的操作需要进行补偿(例如,退款和恢复库存)。
5.2 关键代码实现:Saga 模式
5.2.1 模拟服务实现
库存服务:
@Service
public class InventoryService {
@Autowired
private JdbcTemplate jdbcTemplate;
// 扣减库存
public boolean deductInventory(Long productId, int quantity) {
try {
jdbcTemplate.update("UPDATE inventory SET stock = stock - ? WHERE product_id = ? AND stock >= ?", quantity, productId, quantity);
return true;
} catch (Exception e) {
return false;
}
}
// 恢复库存(补偿操作)
public void compensateInventory(Long productId, int quantity) {
jdbcTemplate.update("UPDATE inventory SET stock = stock + ? WHERE product_id = ?", quantity, productId);
}
}
支付服务:
@Service
public class PaymentService {
@Autowired
private JdbcTemplate jdbcTemplate;
// 扣款操作
public boolean deductPayment(Long orderId, BigDecimal amount) {
try {
jdbcTemplate.update("UPDATE account SET balance = balance - ? WHERE account_id = (SELECT account_id FROM orders WHERE order_id = ?)", amount, orderId);
return true;
} catch (Exception e) {
return false;
}
}
// 退款操作(补偿)
public void refundPayment(Long orderId, BigDecimal amount) {
jdbcTemplate.update("UPDATE account SET balance = balance + ? WHERE account_id = (SELECT account_id FROM orders WHERE order_id = ?)", amount, orderId);
}
}
通知服务:
@Service
public class NotificationService {
// 发送订单确认通知
public boolean sendOrderConfirmation(Long orderId) {
try {
// 发送通知的实现
System.out.println("Sending order confirmation for orderId: " + orderId);
return true;
} catch (Exception e) {
return false;
}
}
// 发送退款通知(补偿)
public void sendRefundNotification(Long orderId) {
System.out.println("Sending refund notification for orderId: " + orderId);
}
}
5.3 Saga 管理器
在 Saga 模式中,我们需要一个服务来协调这些局部事务,并确保它们的顺利执行。如果某个局部事务失败,我们需要执行相应的补偿操作。
@Service
public class OrderSagaService {
@Autowired
private InventoryService inventoryService;
@Autowired
private PaymentService paymentService;
@Autowired
private NotificationService notificationService;
// 执行整个订单事务
public void processOrder(Long orderId, Long productId, int quantity, BigDecimal amount) {
// 1. 扣减库存
if (!inventoryService.deductInventory(productId, quantity)) {
System.out.println("Failed to deduct inventory, compensating...");
return; // 结束事务
}
// 2. 扣款支付
if (!paymentService.deductPayment(orderId, amount)) {
System.out.println("Failed to deduct payment, compensating...");
// 执行补偿操作:恢复库存
inventoryService.compensateInventory(productId, quantity);
return; // 结束事务
}
// 3. 发送通知
if (!notificationService.sendOrderConfirmation(orderId)) {
System.out.println("Failed to send notification, compensating...");
// 执行补偿操作:退款并恢复库存
paymentService.refundPayment(orderId, amount);
inventoryService.compensateInventory(productId, quantity);
return; // 结束事务
}
System.out.println("Order processed successfully!");
}
}
控制器接口
通过 HTTP 接口触发订单处理。
@RestController
@RequestMapping("/order")
public class OrderController {
@Autowired
private OrderSagaService orderSagaService;
@PostMapping("/process")
public String processOrder(@RequestParam Long orderId, @RequestParam Long productId, @RequestParam int quantity, @RequestParam BigDecimal amount) {
orderSagaService.processOrder(orderId, productId, quantity, amount);
return "Order processing started.";
}
}
处理流程
用户发起订单请求,通过 OrderController 调用 OrderSagaService。OrderSagaService 按照顺序调用三个服务:
库存服务:扣减库存,成功则继续;失败则执行补偿。支付服务:扣减支付,成功则继续;失败则执行补偿(退款并恢复库存)。通知服务:发送通知,成功则完成事务;失败则执行补偿(退款、恢复库存,并发送退款通知)。如果所有步骤成功,则最终提交。如果有步骤失败,Saga 会执行补偿操作,回滚前面的步骤,保证数据一致性。总结
不同的分布式事务方案适用于不同的场景和需求:
2PC 适用于需要严格一致性的场景,但它存在阻塞和性能问题。3PC 在 2PC 的基础上解决了阻塞问题,但依然存在一些性能和复杂性问题。基于消息队列的最终一致性 适用于高可用、低延迟要求的系统,适合“最终一致性”的场景。TCC 和 Saga 适合于复杂的分布式事务,需要确保数据的准确性和一致性,尤其是在微服务架构中。选择哪种方案取决于你系统的特性、可用性需求、性能要求以及一致性保证的程度。