使用Golang实现一个消息队列,具体要满足以下功能:
-
可以向消息队列发送消息和拉取消息
-
可以向消息队列发送消息和拉取消息
-
发送的消息不能超过指定的容量
-
拉取消息支持拉取指定数量的消息,如果不满足指定的数量,等待超时以后返回超时时间内拉取的所有消息(不会超过指定的数量)
-
使用Golang中的基本数据结构和功能来实现
package main
import (
"time"
)
// MessageQueue define the interface for the message queue.
type MessageQueue interface {
// Send the message into the MessageQueue.
Send(message interface{})
// Pull the messages with the given size and timeout.
// size indicates the maximum number of messages pulled at one time
// timeout indicates the timeout when pulled the messages
// So, the number of messages returned depends on the maximum number of
// messages that can be pulled during the timeout period,
// which must be less than or equal to the given size.
Pull(size int, timeout time.Duration) []interface{}
// Size the current number of messages in MessageQueue.
Size() int
// Capacity the maximum number of messages in MessageQueue.
Capacity() int
}
type MyMessageQueue struct {
queue chan interface{}
capacity int
}
func (this *MyMessageQueue) Send(message interface{}) {
select {
case this.queue <- message:
default:
}
}
func (this *MyMessageQueue) Pull(size int, timeout time.Duration) []interface{} {
ret := make([]interface{}, 0)
for i := 0; i < size; i++ {
select {
case msg := <-this.queue:
ret = append(ret, msg)
case <-time.After(timeout):
return ret
}
}
return ret
}
func (this *MyMessageQueue) Size() int {
return len(this.queue)
}
func (this *MyMessageQueue) Capacity() int {
return this.capacity
}
func NewMessageQueue(capacity int) MessageQueue {
var mq MessageQueue
mq = &MyMessageQueue{
queue: make(chan interface{}, capacity),
capacity: capacity,
}
return mq
}