数据库事务:保证数据一致性的关键
在数据库操作中,我们经常需要执行多个相关的操作,这些操作要么全部成功,要么全部失败。事务(Transaction)就是用来保证这种原子性的机制。
本文将深入探讨数据库事务的原理和在 Go 中的实现。
什么是事务?
事务是一组数据库操作的逻辑单元,具有以下 ACID 特性:
- 原子性(Atomicity):事务中的操作要么全部成功,要么全部失败
- 一致性(Consistency):事务执行前后,数据库保持一致状态
- 隔离性(Isolation):并发事务之间互不干扰
- 持久性(Durability):事务提交后,数据永久保存
基础事务操作
package main
import (
"context"
"database/sql"
"log"
_ "github.com/go-sql-driver/mysql"
)
func main() {
db, err := sql.Open("mysql", "user:password@/dbname")
if err != nil {
log.Fatal(err)
}
defer db.Close()
// 开始事务
tx, err := db.Begin()
if err != nil {
log.Fatal(err)
}
// 执行操作
_, err = tx.Exec("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
if err != nil {
tx.Rollback()
log.Fatal(err)
}
_, err = tx.Exec("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
if err != nil {
tx.Rollback()
log.Fatal(err)
}
// 提交事务
err = tx.Commit()
if err != nil {
log.Fatal(err)
}
log.Println("Transaction committed")
}
使用 defer 确保回滚
func transfer(db *sql.DB, fromID, toID int, amount float64) (err error) {
tx, err := db.Begin()
if err != nil {
return err
}
// 确保在函数退出时回滚(如果未提交)
defer func() {
if err != nil {
tx.Rollback()
return
}
err = tx.Commit()
}()
// 扣款
_, err = tx.Exec("UPDATE accounts SET balance = balance - ? WHERE id = ?", amount, fromID)
if err != nil {
return err
}
// 检查余额
var balance float64
err = tx.QueryRow("SELECT balance FROM accounts WHERE id = ?", fromID).Scan(&balance)
if err != nil {
return err
}
if balance < 0 {
return fmt.Errorf("insufficient balance")
}
// 入账
_, err = tx.Exec("UPDATE accounts SET balance = balance + ? WHERE id = ?", amount, toID)
if err != nil {
return err
}
return nil
}
事务与 Context
func transferWithContext(ctx context.Context, db *sql.DB, fromID, toID int, amount float64) error {
// 使用 context 开始事务
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
// 执行操作(带 context)
_, err = tx.ExecContext(ctx,
"UPDATE accounts SET balance = balance - ? WHERE id = ?",
amount, fromID)
if err != nil {
return err
}
_, err = tx.ExecContext(ctx,
"UPDATE accounts SET balance = balance + ? WHERE id = ?",
amount, toID)
if err != nil {
return err
}
return tx.Commit()
}
事务隔离级别
// 设置隔离级别
tx, err := db.BeginTx(ctx, &sql.TxOptions{
Isolation: sql.LevelSerializable,
})
// 常用隔离级别
// LevelReadUncommitted - 读未提交
// LevelReadCommitted - 读已提交
// LevelRepeatableRead - 可重复读
// LevelSerializable - 串行化
隔离级别与并发问题
// 脏读(Dirty Read)
// 事务 A 读取了事务 B 未提交的数据
// 不可重复读(Non-repeatable Read)
// 事务 A 两次读取同一数据,结果不同(事务 B 修改了数据)
// 幻读(Phantom Read)
// 事务 A 两次查询,结果集不同(事务 B 插入了新数据)
乐观锁与悲观锁
悲观锁
// 使用 SELECT ... FOR UPDATE
func updateWithPessimisticLock(ctx context.Context, db *sql.DB, id int, amount float64) error {
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
// 锁定行
var balance float64
err = tx.QueryRowContext(ctx,
"SELECT balance FROM accounts WHERE id = ? FOR UPDATE", id).
Scan(&balance)
if err != nil {
return err
}
// 更新
newBalance := balance + amount
_, err = tx.ExecContext(ctx,
"UPDATE accounts SET balance = ? WHERE id = ?",
newBalance, id)
if err != nil {
return err
}
return tx.Commit()
}
乐观锁
// 使用版本号
func updateWithOptimisticLock(ctx context.Context, db *sql.DB, id int, amount float64) error {
for i := 0; i < 3; i++ { // 最多重试 3 次
// 读取当前值和版本号
var balance float64
var version int
err := db.QueryRowContext(ctx,
"SELECT balance, version FROM accounts WHERE id = ?", id).
Scan(&balance, &version)
if err != nil {
return err
}
// 更新(带版本号检查)
newBalance := balance + amount
result, err := db.ExecContext(ctx,
"UPDATE accounts SET balance = ?, version = version + 1 WHERE id = ? AND version = ?",
newBalance, id, version)
if err != nil {
return err
}
rows, err := result.RowsAffected()
if err != nil {
return err
}
if rows > 0 {
return nil // 更新成功
}
// 版本号不匹配,重试
time.Sleep(100 * time.Millisecond)
}
return fmt.Errorf("failed to update after retries")
}
嵌套事务(Savepoint)
func nestedTransaction(ctx context.Context, db *sql.DB) error {
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
// 外层事务操作
_, err = tx.ExecContext(ctx, "INSERT INTO orders (user_id) VALUES (1)")
if err != nil {
return err
}
// 创建保存点
_, err = tx.ExecContext(ctx, "SAVEPOINT sp1")
if err != nil {
return err
}
// 内层事务操作
_, err = tx.ExecContext(ctx, "INSERT INTO order_items (order_id, product_id) VALUES (1, 1)")
if err != nil {
// 回滚到保存点
tx.ExecContext(ctx, "ROLLBACK TO SAVEPOINT sp1")
log.Println("Rolled back to savepoint")
}
// 继续外层事务
_, err = tx.ExecContext(ctx, "UPDATE users SET order_count = order_count + 1 WHERE id = 1")
if err != nil {
return err
}
return tx.Commit()
}
实战:订单系统
package main
import (
"context"
"database/sql"
"fmt"
"log"
"time"
)
type OrderService struct {
db *sql.DB
}
func NewOrderService(db *sql.DB) *OrderService {
return &OrderService{db: db}
}
// CreateOrder 创建订单(事务)
func (s *OrderService) CreateOrder(ctx context.Context, userID int, items []OrderItem) error {
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
// 1. 创建订单
var orderID int
err = tx.QueryRowContext(ctx,
"INSERT INTO orders (user_id, created_at) VALUES (?, ?) RETURNING id",
userID, time.Now()).
Scan(&orderID)
if err != nil {
return fmt.Errorf("create order: %w", err)
}
// 2. 创建订单项并扣减库存
totalAmount := 0.0
for _, item := range items {
// 检查库存(悲观锁)
var stock int
var price float64
err = tx.QueryRowContext(ctx,
"SELECT stock, price FROM products WHERE id = ? FOR UPDATE",
item.ProductID).
Scan(&stock, &price)
if err != nil {
return fmt.Errorf("check stock: %w", err)
}
if stock < item.Quantity {
return fmt.Errorf("insufficient stock for product %d", item.ProductID)
}
// 扣减库存
_, err = tx.ExecContext(ctx,
"UPDATE products SET stock = stock - ? WHERE id = ?",
item.Quantity, item.ProductID)
if err != nil {
return fmt.Errorf("update stock: %w", err)
}
// 创建订单项
amount := price * float64(item.Quantity)
_, err = tx.ExecContext(ctx,
"INSERT INTO order_items (order_id, product_id, quantity, price, amount) VALUES (?, ?, ?, ?, ?)",
orderID, item.ProductID, item.Quantity, price, amount)
if err != nil {
return fmt.Errorf("create order item: %w", err)
}
totalAmount += amount
}
// 3. 更新订单总金额
_, err = tx.ExecContext(ctx,
"UPDATE orders SET total_amount = ? WHERE id = ?",
totalAmount, orderID)
if err != nil {
return fmt.Errorf("update order amount: %w", err)
}
// 4. 记录用户购买历史
_, err = tx.ExecContext(ctx,
"INSERT INTO purchase_history (user_id, order_id, amount) VALUES (?, ?, ?)",
userID, orderID, totalAmount)
if err != nil {
return fmt.Errorf("record purchase: %w", err)
}
return tx.Commit()
}
type OrderItem struct {
ProductID int
Quantity int
}
func main() {
db, err := sql.Open("mysql", "user:password@/shop")
if err != nil {
log.Fatal(err)
}
defer db.Close()
service := NewOrderService(db)
ctx := context.Background()
items := []OrderItem{
{ProductID: 1, Quantity: 2},
{ProductID: 2, Quantity: 1},
}
err = service.CreateOrder(ctx, 123, items)
if err != nil {
log.Printf("Create order failed: %v", err)
return
}
log.Println("Order created successfully")
}
事务最佳实践
1. 保持事务简短
// ❌ 不好:事务中包含耗时操作
func badTransaction(db *sql.DB) error {
tx, err := db.Begin()
if err != nil {
return err
}
defer tx.Rollback()
_, err = tx.Exec("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
if err != nil {
return err
}
// 耗时的外部调用
err = callExternalService() // 可能很慢
if err != nil {
return err
}
_, err = tx.Exec("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
if err != nil {
return err
}
return tx.Commit()
}
// ✅ 好:事务只包含数据库操作
func goodTransaction(db *sql.DB) error {
// 先执行外部调用
err := callExternalService()
if err != nil {
return err
}
// 再执行事务
tx, err := db.Begin()
if err != nil {
return err
}
defer tx.Rollback()
_, err = tx.Exec("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
if err != nil {
return err
}
_, err = tx.Exec("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
if err != nil {
return err
}
return tx.Commit()
}
2. 处理死锁
func handleDeadlock(db *sql.DB, maxRetries int) error {
for i := 0; i < maxRetries; i++ {
err := doTransaction(db)
if err == nil {
return nil
}
// 检查是否是死锁错误
if isDeadlockError(err) {
log.Printf("Deadlock detected, retry %d/%d", i+1, maxRetries)
time.Sleep(time.Duration(i+1) * 100 * time.Millisecond)
continue
}
return err
}
return fmt.Errorf("max retries exceeded")
}
func isDeadlockError(err error) bool {
// MySQL: Error 1213
// PostgreSQL: Error 40P01
return strings.Contains(err.Error(), "deadlock")
}
3. 使用连接池
db.SetMaxOpenConns(25)
db.SetMaxIdleConns(5)
db.SetConnMaxLifetime(5 * time.Minute)
总结
事务是保证数据一致性的关键机制:
- ACID 特性:原子性、一致性、隔离性、持久性
- 隔离级别:从低到高依次为读未提交、读已提交、可重复读、串行化
- 并发控制:悲观锁(SELECT FOR UPDATE)和乐观锁(版本号)
- 最佳实践:保持事务简短、处理死锁、使用连接池
记住:事务虽然强大,但也会带来性能开销。要在一致性和性能之间找到平衡。
继续阅读
探索更多技术文章
浏览归档,发现更多关于系统设计、工具链和工程实践的内容。