一、说明

1.redis是支持分布式、高并发,但redis做消息队列要明白缓存的数据有可能丢失,并不能做为专业的mq消息中间件。

2.实现方式,生产者将消息发送到redis某个队列,消费者从redis队列中取出一个消息进行消费。消息只能给到一个线程并消费,没有确认机制。

二、代码实现

1.先下载对应的redis操作包,用于与redis建立tcp连接,并将发送接收封装到一个包里

package redispkgimport ("fmt"mredis "github.com/garyburd/redigo/redis"
)type MyRedisTool struct {Addr     stringPass     stringDataBase intConn     mredis.Conn
}
// RedisInit 与redis建立连接
func (r *MyRedisTool) RedisInit() {conn, err := mredis.Dial("tcp", r.Addr, mredis.DialPassword(r.Pass), mredis.DialDatabase(r.DataBase))if err != nil {fmt.Println("redis conn fail:", err)return}r.Conn = conn
}// SendMsg 发送消息 queueName-队列名称
func (r *MyRedisTool) SendMsg(queueName, msg string) error {_, err := r.Conn.Do("rpush", queueName, msg)return err
}// GetMsg 接收消息 queueName-队列名称
func (r *MyRedisTool) GetMsg(queueName string) (msg string, err error) {return mredis.String(r.Conn.Do("lpop", queueName))
}

2.生产者发送消息

package mainimport ("fmt""go_code/code/redispkg"
)func main() {// 初始化redis操作对象var r redispkg.MyRedisToolr.Addr = "127.0.0.1:6379"r.Pass = ""r.DataBase = 0r.RedisInit()// 循环发送消息(根据实际业务发送)for i := 0; i < 10; i++ {err := r.SendMsg("test", strconv.Itoa(i))if err != nil {r.Conn.Close()fmt.Println("send msg fail:", err)break}}r.Conn.Close()
}

2.消费者接收消息

package mainimport ("fmt""go_code/code/redispkg"
)func main() {// 初始化redis操作对象var r redispkg.MyRedisToolr.Addr = "127.0.0.1:6379"r.Pass = ""r.DataBase = 0r.RedisInit()// 循环接收消息for {msg, err := r.GetMsg("test")if err != nil {fmt.Println("接收消息发生错误:", err)}else{fmt.Println("成功收到msg:", msg)}}
}