Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement the redis features of subscribe, psubscribe, spublish #4746

Open
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

POABOB
Copy link
Contributor

@POABOB POABOB commented Mar 30, 2025

Issue #4708, #4653.

Why add cmd interface

The reason why I add Subscribe and PSubscribe interface is that every time I need to use Subscribe/PSubscribe, I have to use switch case to change the type of redis client.

	RedisNode interface {
		red.Cmdable
		red.BitMapCmdable

		Subscribe(ctx context.Context, channels ...string) *red.PubSub
		PSubscribe(ctx context.Context, patterns ...string) *red.PubSub
	}

	// If we don't add Subscribe/PSubscribe to interface, I need to use switch case to change the type of redis client.
	var pubsub *redis.PubSub
	switch c := conn.(type) {
	case *redis.Client:
		pubsub = c.Subscribe(ctx, channels...)
	case *redis.ClusterClient:
		pubsub = c.Subscribe(ctx, channels...)
	default:
		return nil, fmt.Errorf("unsupported redis connection type for subscription")
	}

Discuss how to encapsulate the subscribe and psubscribe

  1. return the redis primitive PubSub type and user handle with its needs
// Subscribe subscribes to one or more specific channels.
// Returns a PubSub object to receive messages, or an error if it fails.
func (s *Redis) Subscribe(channels ...string) (*red.PubSub, error) {
	return s.SubscribeCtx(context.Background(), channels...)
}

// SubscribeCtx subscribes to one or more specific channels with context control.
// Returns a PubSub object to receive messages, or an error if it fails.
func (s *Redis) SubscribeCtx(ctx context.Context, channels ...string) (*red.PubSub, error) {
	conn, err := getRedis(s)
	if err != nil {
		return nil, err
	}
	return conn.Subscribe(ctx, channels...), nil
}

func exampleSubscribe(client *Redis) {
	ctx := context.Background()

	// Subscribe to "my.channel"
	pubSub, err := client.SubscribeCtx(ctx, "my.channel")
	if err != nil {
		fmt.Printf("Subscribe failed: %v\n", err)
		return
	}
	defer pubSub.Close()

	// Start a goroutine to publish a message
	go func() {
		time.Sleep(100 * time.Millisecond) // Wait for subscription to be ready
		client.Publish(ctx, "my.channel", "Hello from Subscribe!")
	}()

	// Receive messages
	ch := pubSub.Channel()
	for msg := range ch {
		fmt.Printf("Received from Subscribe: Channel=%s, Message=%s\n", msg.Channel, msg.Payload)
		break // Exit after one message for simplicity
	}
}
  1. user pass the params of func to handle the message
func (s *Redis) Subscribe(ctx context.Context, handler func(channel string, message string), channels ...string) (<-chan error, error) {
    conn, err := getRedis(s)
    if err != nil {
        return nil, err
    }

    pubsub := conn.Subscribe(ctx, channels...) // 直接調用接口方法
    errChan := make(chan error, 1)

    go func() {
        defer pubsub.Close()
        ch := pubsub.Channel()
        for {
            select {
            case <-ctx.Done():
                return
            case msg, ok := <-ch:
                if !ok {
                    errChan <- fmt.Errorf("subscription channel closed")
                    return
                }
                if msg != nil {
                    handler(msg.Channel, msg.Payload)
                }
            }
        }
    }()

    return errChan, nil
}

func exampleSubscribe(client *Redis) {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    errChan, err := r.Subscribe(ctx, func(channel, message string) {
        fmt.Printf("Received message from %s: %s\n", channel, message)
    }, "channel1", "channel2")
    if err != nil {
        log.Fatal(err)
    }

    // 監聽錯誤
    go func() {
        for err := range errChan {
            fmt.Println("Subscription error:", err)
        }
    }()

    // 發布消息
    r.Publish(ctx, "channel1", "Hello, world!")
}

SPublish

SPublish is the method of sending message to a sharding channel. In the other words, Publish will broadcast the whole cluster to publish, but SPublish won't.
It cost less network but it only support redis above v7.0. When I test with the miniredis lib, it didn't support it...

Copy link

codecov bot commented Mar 30, 2025

Codecov Report

Attention: Patch coverage is 95.83333% with 1 line in your changes missing coverage. Please review.

Project coverage is 94.58%. Comparing base (8690859) to head (4bd0450).
Report is 291 commits behind head on master.

Files with missing lines Patch % Lines
core/stores/redis/redis.go 95.83% 1 Missing ⚠️
Additional details and impacted files
Files with missing lines Coverage Δ
core/stores/redis/redis.go 99.50% <95.83%> (-0.28%) ⬇️

... and 14 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant