Redis 集成:高性能缓存与数据存储
Redis 是一个高性能的内存数据库,广泛用于缓存、会话存储、消息队列等场景。在 Go 应用中集成 Redis,可以显著提升系统性能和响应速度。
本文将介绍如何使用 go-redis 库与 Redis 交互,并实现常见的应用场景。
安装 go-redis
go get github.com/redis/go-redis/v9
基础连接
package main
import (
"context"
"fmt"
"log"
"github.com/redis/go-redis/v9"
)
var ctx = context.Background()
func main() {
// 创建 Redis 客户端
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "", // 无密码
DB: 0, // 默认数据库
})
// 测试连接
pong, err := rdb.Ping(ctx).Result()
if err != nil {
log.Fatal(err)
}
fmt.Println("Connected to Redis:", pong)
}
基本数据类型操作
String(字符串)
// SET
err := rdb.Set(ctx, "username", "alice", 0).Err()
if err != nil {
panic(err)
}
// GET
val, err := rdb.Get(ctx, "username").Result()
if err == redis.Nil {
fmt.Println("Key does not exist")
} else if err != nil {
panic(err)
} else {
fmt.Println("username:", val)
}
// SET with expiration (1 hour)
err = rdb.Set(ctx, "session:abc123", "user_data", 1*time.Hour).Err()
// SETNX (SET if Not eXists)
ok, err := rdb.SetNX(ctx, "counter", 1, 0).Result()
if ok {
fmt.Println("Key was set")
}
// INCR / DECR
rdb.Set(ctx, "visits", 0, 0)
rdb.Incr(ctx, "visits")
rdb.IncrBy(ctx, "visits", 10)
visits, _ := rdb.Get(ctx, "visits").Int()
fmt.Println("Visits:", visits) // 11
Hash(哈希)
// HSET
rdb.HSet(ctx, "user:123", map[string]interface{}{
"name": "Alice",
"email": "alice@example.com",
"age": 30,
})
// HGET
name, err := rdb.HGet(ctx, "user:123", "name").Result()
fmt.Println("Name:", name)
// HGETALL
user, err := rdb.HGetAll(ctx, "user:123").Result()
fmt.Println("User:", user)
// HMGET (get multiple fields)
vals, err := rdb.HMGet(ctx, "user:123", "name", "email").Result()
fmt.Println("Values:", vals)
// HINCRBY
rdb.HIncrBy(ctx, "user:123", "login_count", 1)
List(列表)
// LPUSH (left push)
rdb.LPush(ctx, "queue:tasks", "task1", "task2", "task3")
// RPUSH (right push)
rdb.RPush(ctx, "queue:tasks", "task4")
// LPOP (left pop)
task, err := rdb.LPop(ctx, "queue:tasks").Result()
fmt.Println("Task:", task)
// RPOP (right pop)
task, err = rdb.RPop(ctx, "queue:tasks").Result()
// LRANGE (get range)
tasks, err := rdb.LRange(ctx, "queue:tasks", 0, -1).Result()
fmt.Println("All tasks:", tasks)
// LLEN (length)
length, err := rdb.LLen(ctx, "queue:tasks").Result()
fmt.Println("Queue length:", length)
// BRPOP (blocking pop, timeout 10 seconds)
result, err := rdb.BRPop(ctx, 10*time.Second, "queue:tasks").Result()
if err == redis.Nil {
fmt.Println("Timeout")
} else {
fmt.Println("Popped:", result[1])
}
Set(集合)
// SADD
rdb.SAdd(ctx, "tags:post:1", "go", "redis", "tutorial")
// SMEMBERS
tags, err := rdb.SMembers(ctx, "tags:post:1").Result()
fmt.Println("Tags:", tags)
// SISMEMBER
isMember, err := rdb.SIsMember(ctx, "tags:post:1", "go").Result()
fmt.Println("Has 'go' tag:", isMember)
// SINTER (intersection)
rdb.SAdd(ctx, "tags:post:2", "go", "web", "api")
common, err := rdb.SInter(ctx, "tags:post:1", "tags:post:2").Result()
fmt.Println("Common tags:", common)
// SUNION (union)
all, err := rdb.SUnion(ctx, "tags:post:1", "tags:post:2").Result()
fmt.Println("All tags:", all)
// SREM (remove)
rdb.SRem(ctx, "tags:post:1", "tutorial")
Sorted Set(有序集合)
// ZADD
rdb.ZAdd(ctx, "leaderboard",
redis.Z{Score: 100, Member: "alice"},
redis.Z{Score: 200, Member: "bob"},
redis.Z{Score: 150, Member: "charlie"},
)
// ZRANGE (get range by index, ascending)
leaders, err := rdb.ZRange(ctx, "leaderboard", 0, -1).Result()
fmt.Println("Leaders:", leaders)
// ZREVRANGE (get range by index, descending)
topLeaders, err := rdb.ZRevRange(ctx, "leaderboard", 0, 2).Result()
fmt.Println("Top 3:", topLeaders)
// ZRANGEBYSCORE (get range by score)
highScorers, err := rdb.ZRangeByScore(ctx, "leaderboard", &redis.ZRangeBy{
Min: "150",
Max: "+inf",
}).Result()
fmt.Println("High scorers:", highScorers)
// ZINCRBY (increment score)
rdb.ZIncrBy(ctx, "leaderboard", 50, "alice")
// ZRANK (get rank, 0-based)
rank, err := rdb.ZRank(ctx, "leaderboard", "bob").Result()
fmt.Println("Bob's rank:", rank+1)
实战应用
1. 缓存层
package main
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/redis/go-redis/v9"
)
type Cache struct {
rdb *redis.Client
}
func NewCache(rdb *redis.Client) *Cache {
return &Cache{rdb: rdb}
}
// Get 从缓存获取
func (c *Cache) Get(ctx context.Context, key string, dest interface{}) error {
val, err := c.rdb.Get(ctx, key).Result()
if err == redis.Nil {
return fmt.Errorf("cache miss")
}
if err != nil {
return err
}
return json.Unmarshal([]byte(val), dest)
}
// Set 设置缓存
func (c *Cache) Set(ctx context.Context, key string, value interface{}, expiration time.Duration) error {
data, err := json.Marshal(value)
if err != nil {
return err
}
return c.rdb.Set(ctx, key, data, expiration).Err()
}
// Delete 删除缓存
func (c *Cache) Delete(ctx context.Context, key string) error {
return c.rdb.Del(ctx, key).Err()
}
// GetOrSet 获取或设置缓存
func (c *Cache) GetOrSet(ctx context.Context, key string, dest interface{},
loader func() (interface{}, error), expiration time.Duration) error {
// 尝试从缓存获取
err := c.Get(ctx, key, dest)
if err == nil {
return nil // 缓存命中
}
// 缓存未命中,从数据源加载
value, err := loader()
if err != nil {
return err
}
// 设置缓存
if err := c.Set(ctx, key, value, expiration); err != nil {
return err
}
// 将值复制到 dest
data, _ := json.Marshal(value)
return json.Unmarshal(data, dest)
}
// 使用示例
type User struct {
ID int `json:"id"`
Name string `json:"name"`
Email string `json:"email"`
}
func getUserFromDB(id int) (*User, error) {
// 模拟数据库查询
time.Sleep(100 * time.Millisecond)
return &User{ID: id, Name: "Alice", Email: "alice@example.com"}, nil
}
func main() {
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
cache := NewCache(rdb)
ctx := context.Background()
var user User
err := cache.GetOrSet(ctx, "user:123", &user, func() (interface{}, error) {
return getUserFromDB(123)
}, 1*time.Hour)
if err != nil {
panic(err)
}
fmt.Printf("User: %+v\n", user)
}
2. 会话存储
package main
import (
"context"
"crypto/rand"
"encoding/hex"
"encoding/json"
"net/http"
"time"
"github.com/redis/go-redis/v9"
)
type Session struct {
UserID int `json:"user_id"`
Username string `json:"username"`
CreatedAt time.Time `json:"created_at"`
}
type SessionStore struct {
rdb *redis.Client
}
func NewSessionStore(rdb *redis.Client) *SessionStore {
return &SessionStore{rdb: rdb}
}
func (s *SessionStore) generateSessionID() string {
bytes := make([]byte, 32)
rand.Read(bytes)
return hex.EncodeToString(bytes)
}
// CreateSession 创建会话
func (s *SessionStore) CreateSession(ctx context.Context, userID int, username string) (string, error) {
sessionID := s.generateSessionID()
session := Session{
UserID: userID,
Username: username,
CreatedAt: time.Now(),
}
data, err := json.Marshal(session)
if err != nil {
return "", err
}
key := "session:" + sessionID
err = s.rdb.Set(ctx, key, data, 24*time.Hour).Err()
if err != nil {
return "", err
}
return sessionID, nil
}
// GetSession 获取会话
func (s *SessionStore) GetSession(ctx context.Context, sessionID string) (*Session, error) {
key := "session:" + sessionID
data, err := s.rdb.Get(ctx, key).Result()
if err == redis.Nil {
return nil, nil
}
if err != nil {
return nil, err
}
var session Session
err = json.Unmarshal([]byte(data), &session)
if err != nil {
return nil, err
}
// 刷新过期时间
s.rdb.Expire(ctx, key, 24*time.Hour)
return &session, nil
}
// DeleteSession 删除会话
func (s *SessionStore) DeleteSession(ctx context.Context, sessionID string) error {
key := "session:" + sessionID
return s.rdb.Del(ctx, key).Err()
}
var sessionStore *SessionStore
func sessionMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
cookie, err := r.Cookie("session_id")
if err == nil {
session, err := sessionStore.GetSession(r.Context(), cookie.Value)
if err == nil && session != nil {
// 将会话信息添加到 context
ctx := context.WithValue(r.Context(), "session", session)
r = r.WithContext(ctx)
}
}
next.ServeHTTP(w, r)
})
}
func loginHandler(w http.ResponseWriter, r *http.Request) {
// 验证用户凭证(简化)
userID := 123
username := "alice"
// 创建会话
sessionID, err := sessionStore.CreateSession(r.Context(), userID, username)
if err != nil {
http.Error(w, "Failed to create session", http.StatusInternalServerError)
return
}
// 设置 Cookie
http.SetCookie(w, &http.Cookie{
Name: "session_id",
Value: sessionID,
Path: "/",
HttpOnly: true,
Secure: true,
MaxAge: 86400, // 24 hours
})
w.Write([]byte("Login successful"))
}
func profileHandler(w http.ResponseWriter, r *http.Request) {
session := r.Context().Value("session").(*Session)
if session == nil {
http.Error(w, "Not authenticated", http.StatusUnauthorized)
return
}
w.Write([]byte(fmt.Sprintf("Welcome, %s!", session.Username)))
}
func logoutHandler(w http.ResponseWriter, r *http.Request) {
cookie, err := r.Cookie("session_id")
if err == nil {
sessionStore.DeleteSession(r.Context(), cookie.Value)
}
// 删除 Cookie
http.SetCookie(w, &http.Cookie{
Name: "session_id",
Value: "",
Path: "/",
MaxAge: -1,
})
w.Write([]byte("Logged out"))
}
func main() {
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
sessionStore = NewSessionStore(rdb)
mux := http.NewServeMux()
mux.HandleFunc("/login", loginHandler)
mux.HandleFunc("/logout", logoutHandler)
mux.Handle("/profile", sessionMiddleware(http.HandlerFunc(profileHandler)))
http.ListenAndServe(":8080", mux)
}
3. 分布式锁
package main
import (
"context"
"fmt"
"time"
"github.com/redis/go-redis/v9"
)
type RedisLock struct {
rdb *redis.Client
}
func NewRedisLock(rdb *redis.Client) *RedisLock {
return &RedisLock{rdb: rdb}
}
// Lock 获取锁
func (l *RedisLock) Lock(ctx context.Context, key string, value string, expiration time.Duration) (bool, error) {
return l.rdb.SetNX(ctx, key, value, expiration).Result()
}
// Unlock 释放锁
func (l *RedisLock) Unlock(ctx context.Context, key string, value string) error {
// 使用 Lua 脚本确保只释放自己的锁
script := redis.NewScript(`
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
`)
result, err := script.Run(ctx, l.rdb, []string{key}, value).Int()
if err != nil {
return err
}
if result == 0 {
return fmt.Errorf("lock not held or already expired")
}
return nil
}
// 使用示例
func main() {
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
lock := NewRedisLock(rdb)
ctx := context.Background()
lockKey := "lock:order:123"
lockValue := fmt.Sprintf("%d", time.Now().UnixNano())
// 尝试获取锁
acquired, err := lock.Lock(ctx, lockKey, lockValue, 10*time.Second)
if err != nil {
panic(err)
}
if !acquired {
fmt.Println("Failed to acquire lock")
return
}
defer func() {
// 释放锁
if err := lock.Unlock(ctx, lockKey, lockValue); err != nil {
fmt.Println("Failed to release lock:", err)
}
}()
// 执行受保护的操作
fmt.Println("Processing order...")
time.Sleep(5 * time.Second)
fmt.Println("Order processed")
}
4. 消息队列
package main
import (
"context"
"fmt"
"time"
"github.com/redis/go-redis/v9"
)
type MessageQueue struct {
rdb *redis.Client
}
func NewMessageQueue(rdb *redis.Client) *MessageQueue {
return &MessageQueue{rdb: rdb}
}
// Publish 发布消息
func (q *MessageQueue) Publish(ctx context.Context, channel string, message string) error {
return q.rdb.Publish(ctx, channel, message).Err()
}
// Subscribe 订阅消息
func (q *MessageQueue) Subscribe(ctx context.Context, channels ...string) *redis.PubSub {
return q.rdb.Subscribe(ctx, channels...)
}
// Producer 生产者
func producer(rdb *redis.Client) {
mq := NewMessageQueue(rdb)
ctx := context.Background()
for i := 0; i < 10; i++ {
message := fmt.Sprintf("Message %d", i)
err := mq.Publish(ctx, "notifications", message)
if err != nil {
fmt.Println("Publish error:", err)
continue
}
fmt.Println("Published:", message)
time.Sleep(1 * time.Second)
}
}
// Consumer 消费者
func consumer(rdb *redis.Client) {
mq := NewMessageQueue(rdb)
ctx := context.Background()
sub := mq.Subscribe(ctx, "notifications")
defer sub.Close()
ch := sub.Channel()
for msg := range ch {
fmt.Println("Received:", msg.Payload)
}
}
func main() {
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
// 在不同的 goroutine 中运行生产者和消费者
go producer(rdb)
consumer(rdb)
}
连接池配置
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "",
DB: 0,
PoolSize: 10, // 连接池大小
MinIdleConns: 5, // 最小空闲连接数
MaxIdleConns: 10, // 最大空闲连接数
PoolTimeout: 4 * time.Second, // 连接池超时
IdleTimeout: 5 * time.Minute, // 空闲连接超时
})
总结
Redis 在 Go 应用中扮演着重要角色,主要应用场景包括:
- 缓存:减少数据库压力,提升响应速度
- 会话存储:支持分布式会话管理
- 消息队列:实现发布/订阅和任务队列
- 分布式锁:协调分布式系统中的并发操作
- 计数器:实时统计和限流
- 排行榜:有序集合实现排名系统
最佳实践:
- 合理设置过期时间,避免内存泄漏
- 使用连接池提高性能
- 实现缓存穿透、雪崩、击穿的防护
- 监控 Redis 内存使用情况
- 使用 Pipeline 批量操作提升性能
记住:Redis 是内存数据库,要注意数据持久化和内存管理。
继续阅读
探索更多技术文章
浏览归档,发现更多关于系统设计、工具链和工程实践的内容。