- 前言
实际上我只分析到第六天,最后一天使用protobuf其实是锦上添花的东西,实际到day6就结束了。
1. 项目构成剖析
1.1 整体流程
是
接收 key --> 检查是否被缓存 -----> 返回缓存值 ⑴
| 否 是
|-----> 是否应当从远程节点获取 -----> 与远程节点交互 --> 返回缓存值 ⑵
| 否
|-----> 调用`回调函数`,获取值并添加到缓存 --> 返回缓存值 ⑶
第二步详细分解:
使用一致性哈希选择节点 是 是
|-----> key是否在远程节点 -----> 利用HTTP 客户端访问远程节点 --> 成功?-----> 服务端返回返回值
| 否 ↓ 否
|----------------------------> 回退到本地节点处理。
1.2 结合代码剖析
1. 节点创建一个缓存空间Group,启动与用户交互的接口和可被远程访问的缓存服务器;
(main文件中,"http://localhost:9999"是本地节点对用户开放的访问地址(api地址);"http://localhost:8001"、"http://localhost:8002"、"http://localhost:8003",实际上是模拟的三个不同的远程节点)
2. 用户有访问请求了,是采用"http://localhost:9999/api?key=Tom"格式的get请求来访问9999的本地交互接口,如果本地节点有缓存值,那么返回;如果本地节点没有再去远程节点找,找不到就回调函数使用源数据。
(main中的startAPIServer就是开启的本地交互接口,startCacheServer实际上是创建和启动3个远程节点的服务器,并且把它注册到Group中,这样本地找不到就可以看看远程节点有没有)
3. 本地缓存没有,会调用Group的load方法找远程节点或采用回调函数找缓存值;
4. 找远程节点就涉及到从一致性哈希找节点,然后返回对应的远程节点,再拿到远程节点的HTTP客户端,用它去访问远程节点。
(HTTPPool理解为HTTP节点池比较好,里面存有一致性哈希,一致性哈希存有key和对应节点的关系;还存有节点和HTTP客户端的映射表,找到对应的节点然后返回对应的HTTP客户端给Group,Group负责与用户的交互,并且控制缓存值存储和获取的流程)
- 一个容易搞混的地方是Group里面的peers字段是存了能够访问的远程节点(HTTPPool),HTTPPool里面的peers字段是一致性哈希算法实例;区别是Group要记录远程节点是因为它要知道能否从这些远程节点找到自己想要的值,而它怎么判断这个远程节点上有这个缓存呢?就是基于HTTPPool里面的peers一致性哈希去计算key存在哪个节点上,然后返回这个HTTP客户端,利用HTTP客户端去访问这个远程节点取值。
2. 代码详解(含详细备注)
2.1 项目文件结构
geecache/
|--consistenthash/
|--consistenthash.go // 一致性哈希算法
|--lru/
|--lru.go // lru 缓存淘汰策略
|--singleflight/
|--singleflight.go // 防止缓存击穿策略
|--byteview.go // 缓存值的抽象与封装
|--cache.go // 并发控制
|--geecache.go // 负责与外部交互,控制缓存存储和获取的主流程
|--http.go // 提供被其他节点访问的能力(基于http)
|--peers.go // 抽象的两个接口PeerPicker和PeerGetter
|--main.go
- PeerPicker接口的目的是查找拥有key的节点,HTTPPool实现了此接口;
- PeerGetter接口的目的是查找缓存值,httpGetter(http客户端)实现了此接口。
2.2 代码注释
consistenthash.go
package consistenthash
import (
"hash/crc32"
"sort"
"strconv"
)
// Hash maps bytes to uint32
type Hash func(data []byte) uint32
// Map constains all hashed keys
type Map struct {
// 哈希函数
hash Hash
// 虚拟节点倍数
// 即根据真实节点创建replicas个虚拟节点
replicas int
// 哈希环
keys []int // Sorted
// 虚拟节点与真实节点的映射表
// 键是虚拟节点的哈希值,值是真实节点的名称
hashMap map[int]string
}
// New creates a Map instance
// 实例化一致性哈希算法
func New(replicas int, fn Hash) *Map {
m := &Map{
replicas: replicas,
hash: fn,
hashMap: make(map[int]string),
}
if m.hash == nil {
m.hash = crc32.ChecksumIEEE
}
return m
}
// Add adds some keys to the hash.
// 添加节点到哈希环上
// 允许可变长的参数keys(函数允许传入0或多个真实节点的名称)
// keys是一个字符串数组["http://localhost:8001","http://localhost:8002","http://localhost:8003"]
func (m *Map) Add(keys ...string) {
for _, key := range keys {
// 对每一个真实节点key,对应创建m.replicas个虚拟节点
for i := 0; i < m.replicas; i++ {
// 虚拟节点的名称是:strconv.Itoa(i)+key
// m.hash()计算虚拟节点的哈希值
hash := int(m.hash([]byte(strconv.Itoa(i) + key)))
// 将节点哈希值添加到环上
m.keys = append(m.keys, hash)
// 增加虚拟节点和真实节点的映射关系
m.hashMap[hash] = key
}
}
// 对环上的哈希值排序(升序)
sort.Ints(m.keys)
}
// Get gets the closest item in the hash to the provided key.
func (m *Map) Get(key string) string {
if len(m.keys) == 0 {
return ""
}
// key是节点名称,通过其计算得到key的哈希值
hash := int(m.hash([]byte(key)))
// Binary search for appropriate replica.
/*
func Search(n int, f func(int) bool) int
Search函数采用二分法搜索找到[0,n)区间内最小的满足f(i)==true的值i
*/
// idx是顺时针找到的第一个匹配的虚拟节点下标
idx := sort.Search(len(m.keys), func(i int) bool {
return m.keys[i] >= hash
})
// 因为是环,所以要取余
// 返回虚拟节点在hashMap映射到的真实节点
return m.hashMap[m.keys[idx % len(m.keys)]]
}
lru.go
package lru
import "container/list"
// Cache is a LRU cache. It is not safe for concurrent access.
type Cache struct {
maxBytes int64
nbytes int64
ll *list.List
cache map[string]*list.Element
// optional and executed when an entry is purged.
OnEvicted func(key string, value Value)
}
type entry struct {
key string
value Value
}
// Value use Len to count how many bytes it takes
type Value interface {
Len() int
}
// New is the Constructor of Cache
func New(maxBytes int64, onEvicted func(string, Value)) *Cache {
return &Cache{
maxBytes: maxBytes,
ll: list.New(),
cache: make(map[string]*list.Element),
OnEvicted: onEvicted,
}
}
// Add adds a value to the cache.
func (c *Cache) Add(key string, value Value) {
if ele, ok := c.cache[key]; ok {
c.ll.MoveToFront(ele)
kv := ele.Value.(*entry)
c.nbytes += int64(value.Len()) - int64(kv.value.Len())
kv.value = value
} else {
ele := c.ll.PushFront(&entry{key, value})
c.cache[key] = ele
c.nbytes += int64(len(key)) + int64(value.Len())
}
for c.maxBytes != 0 && c.maxBytes < c.nbytes {
c.RemoveOldest()
}
}
// Get look ups a key's value
func (c *Cache) Get(key string) (value Value, ok bool) {
if ele, ok := c.cache[key]; ok {
c.ll.MoveToFront(ele)
kv := ele.Value.(*entry)
return kv.value, true
}
return
}
// RemoveOldest removes the oldest item
func (c *Cache) RemoveOldest() {
ele := c.ll.Back()
if ele != nil {
c.ll.Remove(ele)
kv := ele.Value.(*entry)
delete(c.cache, kv.key)
c.nbytes -= int64(len(kv.key)) + int64(kv.value.Len())
if c.OnEvicted != nil {
c.OnEvicted(kv.key, kv.value)
}
}
}
// Len the number of cache entries
func (c *Cache) Len() int {
return c.ll.Len()
}
singleflight.go
package singleflight
import "sync"
// call is an in-flight or completed Do call
type call struct {
wg sync.WaitGroup
val interface{}
err error
}
// Group represents a class of work and forms a namespace in which
// units of work can be executed with duplicate suppression.
type Group struct {
mu sync.Mutex // protects m
m map[string]*call // lazily initialized
}
// Do executes and returns the results of the given function, making
// sure that only one execution is in-flight for a given key at a
// time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
// Do方法传入key和回调函数,返回找到的缓存值;
// 能够保证一个key只会调用一个回调函数进行查找,防止缓存击穿;
func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
g.mu.Unlock()
c.wg.Wait()
return c.val, c.err
}
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
c.val, c.err = fn()
c.wg.Done()
g.mu.Lock()
delete(g.m, key)
g.mu.Unlock()
return c.val, c.err
}
byteview.go
package geecache
// A ByteView holds an immutable view of bytes.
type ByteView struct {
b []byte
}
// Len returns the view's length
func (v ByteView) Len() int {
return len(v.b)
}
// ByteSlice returns a copy of the data as a byte slice.
func (v ByteView) ByteSlice() []byte {
return cloneBytes(v.b)
}
// String returns the data as a string, making a copy if necessary.
func (v ByteView) String() string {
return string(v.b)
}
func cloneBytes(b []byte) []byte {
c := make([]byte, len(b))
copy(c, b)
return c
}
cache.go
package lru
import "container/list"
/*
本文件实现的是lru缓存淘汰策略
*/
// Cache is a LRU cache. It is not safe for concurrent access.
type Cache struct {
// 允许使用的最大内存
maxBytes int64
// 当前已使用的内存
nbytes int64
// 双向链表
// List represents a doubly linked list. The zero value for List is an empty list ready to use.
ll *list.List
// 键是字符串,值是双向链表的对应节点指针
// Element is an element of a linked list.
cache map[string]*list.Element
// 某条记录被移除时的回调函数,可为nil
// optional and executed when an entry is purged.
OnEvicted func(key string, value Value)
}
// entry表示双向链表节点的数据类型
// 在链表中仍保存每个值对应的key的好处在于,淘汰队首节点时,需要用key从字典中删除对应的映射
type entry struct {
key string
// 值是实现了Value接口的任意类型,该接口只包含了一个方法 Len() int,用于返回值所占用的内存大小。
value Value
}
// Value use Len to count how many bytes it takes
// 返回值所占用的内存大小
type Value interface {
Len() int
}
// New is the Constructor of Cache
func New(maxBytes int64, onEvicted func(string, Value)) *Cache {
return &Cache{
maxBytes: maxBytes,
ll: list.New(),
cache: make(map[string]*list.Element),
OnEvicted: onEvicted,
}
}
/*
约定:代码中约定front作为队尾,back作为队头
因此访问到的元素要移动到front,淘汰元素直接删除back
说明:计算内存nbytes时,包含了双向链表条目中的key和value两者所占内存之和
*/
// Add adds a value to the cache.
// 增加&修改
func (c *Cache) Add(key string, value Value) {
// 若存在则更新节点对应的值
if ele, ok := c.cache[key]; ok {
c.ll.MoveToFront(ele)
kv := ele.Value.(*entry)
// 计算新加值与原值的大小差
c.nbytes += int64(value.Len()) - int64(kv.value.Len())
kv.value = value
} else {
// 不存在,则新建一个节点,添加到队尾
ele := c.ll.PushFront(&entry{key, value})
// 添加映射关系
c.cache[key] = ele
// key和value占内存之和
c.nbytes += int64(len(key)) + int64(value.Len())
}
// 更新c.nbytes,如果超过了设定的最大值c.maxBytes,则移除最少访问的节点
// 当maxBytes等于0时,不触发,即默认无限添加
for c.maxBytes != 0 && c.maxBytes < c.nbytes {
c.RemoveOldest()
}
}
// Get look ups a key's value
// 查找
func (c *Cache) Get(key string) (value Value, ok bool) {
// 若节点存在,移动到front,并返回找到的值
if ele, ok := c.cache[key]; ok {
c.ll.MoveToFront(ele)
kv := ele.Value.(*entry)
return kv.value, true
}
return
}
// RemoveOldest removes the oldest item
// 删除
func (c *Cache) RemoveOldest() {
// 取队首节点
ele := c.ll.Back()
if ele != nil {
// 从双向链表中删除
c.ll.Remove(ele)
kv := ele.Value.(*entry)
// 从map中删除
delete(c.cache, kv.key)
// 更新占用的内存(减去key和value所占的内存)
c.nbytes -= int64(len(kv.key)) + int64(kv.value.Len())
// 如果回调函数OnEvicted不为nil,则调用回调函数
if c.OnEvicted != nil {
c.OnEvicted(kv.key, kv.value)
}
}
}
// Len the number of cache entries
// 实现Value接口的方法Len
// 获取添加了多少条数据
func (c *Cache) Len() int {
return c.ll.Len()
}
geecache.go
package geecache
import (
"fmt"
"geecache/singleflight"
"log"
"sync"
)
// A Group is a cache namespace and associated data loaded spread over
type Group struct {
// Group的名称
name string
// 回调,用于缓存未命中时获取源数据
getter Getter
// 并发LRU缓存
mainCache cache
// HTTPPool对象(HTTP服务端),它实现了PeerPicker
// 记录可访问的远程节点
peers PeerPicker
// use singleflight.Group to make sure that
// each key is only fetched once
loader *singleflight.Group
}
// A Getter loads data for a key.
type Getter interface {
Get(key string) ([]byte, error)
}
// A GetterFunc implements Getter with a function.
// 接口型函数
type GetterFunc func(key string) ([]byte, error)
// Get implements Getter interface function
func (f GetterFunc) Get(key string) ([]byte, error) {
return f(key)
}
var (
mu sync.RWMutex
groups = make(map[string]*Group)
)
// NewGroup create a new instance of Group
// 创建Group实例
func NewGroup(name string, cacheBytes int64, getter Getter) *Group {
if getter == nil {
panic("nil Getter")
}
mu.Lock()
defer mu.Unlock()
g := &Group{
name: name,
getter: getter,
mainCache: cache{cacheBytes: cacheBytes},
loader: &singleflight.Group{},
}
groups[name] = g
return g
}
// GetGroup returns the named group previously created with NewGroup, or
// nil if there's no such group.
func GetGroup(name string) *Group {
mu.RLock()
g := groups[name]
mu.RUnlock()
return g
}
// Get value for a key from cache
// 从LRU缓存中找缓存值,若缓存不存在,则调用load方法
func (g *Group) Get(key string) (ByteView, error) {
if key == "" {
return ByteView{}, fmt.Errorf("key is required")
}
// 从LRU缓存中获取缓存值
if v, ok := g.mainCache.get(key); ok {
log.Println("[GeeCache] hit")
return v, nil
}
// 缓存中没有,则调用load从远程节点 或 调用回调函数
return g.load(key)
}
// RegisterPeers registers a PeerPicker for choosing remote peer
// 将实现了 PeerPicker 接口的 HTTPPool 注入到 Group 中
func (g *Group) RegisterPeers(peers PeerPicker) {
if g.peers != nil {
panic("RegisterPeerPicker called more than once")
}
g.peers = peers
}
// 若缓存值在远程节点上存在,则用对应的HTTP客户端从远程节点上访问获取缓存值
// 若不能则调用回调函数,获取值并添加到缓存
func (g *Group) load(key string) (value ByteView, err error) {
// each key is only fetched once (either locally or remotely)
// regardless of the number of concurrent callers.
// 调用防止缓存击穿的查找缓存值方法
viewi, err := g.loader.Do(key, func() (interface{}, error) {
// 有HTTPPool(HTTP服务端),则从远程节点找缓存值
if g.peers != nil {
// 通过一致性哈希找到存储key的节点客户端peer
if peer, ok := g.peers.PickPeer(key); ok {
// 利用HTTP客户端访问远程节点
if value, err = g.getFromPeer(peer, key); err == nil {
return value, nil
}
log.Println("[GeeCache] Failed to get from peer", err)
}
}
// 没有远程节点,则调用回调函数获取源数据
return g.getLocally(key)
})
if err == nil {
return viewi.(ByteView), nil
}
return
}
// 将源数据添加到LRU缓存 mainCache 中
func (g *Group) populateCache(key string, value ByteView) {
g.mainCache.add(key, value)
}
// 调用回调函数获取源数据
func (g *Group) getLocally(key string) (ByteView, error) {
// 调用回调函数
bytes, err := g.getter.Get(key)
if err != nil {
return ByteView{}, err
}
// 采用回调返回值的副本,因为[]byte是切片,传的是引用地址,如果后面有改动,所有的值都会变化
value := ByteView{b: cloneBytes(bytes)}
// 将获取到的源数据加入到缓存中
g.populateCache(key, value)
return value, nil
}
// 使用实现了 PeerGetter 接口的 httpGetter(HTTP客户端)访问远程节点,获取缓存值
func (g *Group) getFromPeer(peer PeerGetter, key string) (ByteView, error) {
bytes, err := peer.Get(g.name, key)
if err != nil {
return ByteView{}, err
}
return ByteView{b: bytes}, nil
}
http.go
package geecache
import (
"fmt"
"geecache/consistenthash"
"io/ioutil"
"log"
"net/http"
"net/url"
"strings"
"sync"
)
const (
defaultBasePath = "/_geecache/"
defaultReplicas = 50
)
// HTTPPool implements PeerPicker for a pool of HTTP peers.
// HTTP节点池
type HTTPPool struct {
// this peer's base URL, e.g. "https://example.net:8000"
self string
basePath string
mu sync.Mutex // guards peers and httpGetters
peers *consistenthash.Map // 一致性哈希
// key是"http://10.0.0.2:8008",value是对应的HTTP客户端
// 即,从一致性哈希里面找到了key存在"http://10.0.0.2:8008"这个远程节点上,利用此字段就可获取到访问这个远程节点的HTTP客户端
httpGetters map[string]*httpGetter // keyed by e.g. "http://10.0.0.2:8008"
}
// NewHTTPPool initializes an HTTP pool of peers.
// 实例化HTTPPool
func NewHTTPPool(self string) *HTTPPool {
return &HTTPPool{
self: self,
basePath: defaultBasePath,
}
}
// Log info with server name
func (p *HTTPPool) Log(format string, v ...interface{}) {
log.Printf("[Server %s] %s", p.self, fmt.Sprintf(format, v...))
}
// ServeHTTP handle all http requests
func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// 判断访问路径的前缀是否是basePath,不是则返回错误信息
if !strings.HasPrefix(r.URL.Path, p.basePath) {
panic("HTTPPool serving unexpected path: " + r.URL.Path)
}
p.Log("%s %s", r.Method, r.URL.Path)
// /<basepath>/<groupname>/<key> required
// 第一个参数实际的输入是<groupname>/<key>,默认的basePath是/_geecache/
// 然后,/作为分隔符,将<groupname>/<key>字符串分割出2个子串,即<groupname>和<key>
parts := strings.SplitN(r.URL.Path[len(p.basePath):], "/", 2)
if len(parts) != 2 {
http.Error(w, "bad request", http.StatusBadRequest)
return
}
// 分别取出字符串
groupName := parts[0]
key := parts[1]
// 返回指定name的Group
group := GetGroup(groupName)
if group == nil {
http.Error(w, "no such group: "+groupName, http.StatusNotFound)
return
}
// 根据key取缓存值
view, err := group.Get(key)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// 设置httpResponse的头部
// Content-Type:内容类型;application/octet-stream:二进制流数据(如常见的文件下载)
// 头部字段详解见:https://www.runoob.com/http/http-content-type.html
w.Header().Set("Content-Type", "application/octet-stream")
// 将缓存值作为 httpResponse的body返回
w.Write(view.ByteSlice())
}
// Set updates the pool's list of peers.
// 实例化一致性哈希算法,并添加传入的节点
// peers是一个字符串数组["http://localhost:8001","http://localhost:8002","http://localhost:8003"]
func (p *HTTPPool) Set(peers ...string) {
p.mu.Lock()
defer p.mu.Unlock()
p.peers = consistenthash.New(defaultReplicas, nil)
p.peers.Add(peers...)
p.httpGetters = make(map[string]*httpGetter, len(peers))
for _, peer := range peers {
p.httpGetters[peer] = &httpGetter{baseURL: peer + p.basePath}
}
}
// PickPeer picks a peer according to key
// 实现了PeerPicker接口,在哈希环上找key对应的节点,然后返回这个节点的http客户端
func (p *HTTPPool) PickPeer(key string) (PeerGetter, bool) {
p.mu.Lock()
defer p.mu.Unlock()
// Get方法是在一致性哈希上面找存储key的节点,返回的peer是string,如"http://localhost:8001"
if peer := p.peers.Get(key); peer != "" && peer != p.self {
p.Log("Pick peer %s", peer)
return p.httpGetters[peer], true
}
return nil, false
}
// 检查 HTTPPool 是否实现了接口 PeerPicker ,若没有则会编译出错
var _ PeerPicker = (*HTTPPool)(nil)
// http客户端类,实现了PeerGetter接口
type httpGetter struct {
baseURL string
}
// Get HTTP客户端httpGetter提交的get请求,访问远程节点
func (h *httpGetter) Get(group string, key string) ([]byte, error) {
// 进行字符串拼接,格式:http://example.com/_geecache/group/key
u := fmt.Sprintf(
"%v%v/%v",
h.baseURL,
url.QueryEscape(group),
url.QueryEscape(key),
)
// 发起http的get请求
res, err := http.Get(u)
if err != nil {
return nil, err
}
defer res.Body.Close()
// 检测状态码
if res.StatusCode != http.StatusOK {
return nil, fmt.Errorf("server returned: %v", res.Status)
}
// 读取消息体响应内容
bytes, err := ioutil.ReadAll(res.Body)
if err != nil {
return nil, fmt.Errorf("reading response body: %v", err)
}
return bytes, nil
}
// 检查 httpGetter 是否实现了接口 PeerGetter ,若没有则会编译出错
var _ PeerGetter = (*httpGetter)(nil)
peers.go
package geecache
// PeerPicker is the interface that must be implemented to locate
// the peer that owns a specific key.
// PeerPicker 是必须实现的接口,用于查找拥有特定key的节点
// 此项目中由 HTTPPool 实现此接口,HTTPPool是HTTP池(也可理解为HTTP服务器)
type PeerPicker interface {
// PickPeer 根据传入的key选择相应节点PeerGetter(http客户端)
PickPeer(key string) (peer PeerGetter, ok bool)
}
// PeerGetter is the interface that must be implemented by a peer.
// http客户端接口,http客户端必须实现Get方法
// 此项目中的http客户端类是httpGetter
type PeerGetter interface {
// Get 从对应group查找缓存值
Get(group string, key string) ([]byte, error)
}
main.go
package main
/*
$ curl "http://localhost:9999/api?key=Tom"
630
$ curl "http://localhost:9999/api?key=kkk"
kkk not exist
*/
import (
"flag"
"fmt"
"geecache"
"log"
"net/http"
)
// 本地数据源
// 如果在缓存中没有找到,就会通过回调函数访问这里的数据源
var db = map[string]string{
"Tom": "630",
"Jack": "589",
"Sam": "567",
}
// 创建一个缓存空间Group,返回*geecache.Group
func createGroup() *geecache.Group {
// Group的名称是scores,缓存大小是2的10次方,回调函数是从上述本地数据源的map找对应的键值对
return geecache.NewGroup("scores", 2<<10, geecache.GetterFunc(
func(key string) ([]byte, error) {
log.Println("[SlowDB] search key", key)
if v, ok := db[key]; ok {
return []byte(v), nil
}
return nil, fmt.Errorf("%s not exist", key)
}))
}
// 启动缓存服务器:创建HTTPPool,添加节点信息,注册到gee中,启动HTTP服务(共3个端口,8001/8002/8003),用户不感知
func startCacheServer(addr string, addrs []string, gee *geecache.Group) {
// 创建HTTPPool
peers := geecache.NewHTTPPool(addr)
// 添加节点信息
peers.Set(addrs...)
// 将节点注册到Group
gee.RegisterPeers(peers)
log.Println("geecache is running at", addr)
// 启动HTTP服务
log.Fatal(http.ListenAndServe(addr[7:], peers))
}
// 开启api服务,与用户进行交互
func startAPIServer(apiAddr string, gee *geecache.Group) {
// 第一个参数是路由匹配规则,第二个参数是调用接口型函数HandlerFunc,传入一个处理请求的方法
http.Handle("/api", http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
// 解析Get请求的URL,并找到匹配的值
// 如http://localhost:9999/api?key=Tom解析找到"key"对应的是Tom,返回Tom
key := r.URL.Query().Get("key")
// 从缓存空间Group找key对应的缓存,返回找到的缓存值
view, err := gee.Get(key)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// 设置HTTP头
w.Header().Set("Content-Type", "application/octet-stream")
// 响应http请求并传输找到的缓存值
w.Write(view.ByteSlice())
}))
log.Println("fontend server is running at", apiAddr)
// 开启监听服务,第二个参数不用指定是因为上面http.Handle已经指定了请求处理逻辑
log.Fatal(http.ListenAndServe(apiAddr[7:], nil))
}
func main() {
var port int
var api bool
/*
使用flag包,解析命令行参数,分为两步:
(1)绑定;(2)解析。
使用效果:
go build -o server
./server -port=8003 -api=1
变量port会被赋值为8003,api赋值为1
*/
// 将命令行中的port(第二个参数)绑定在变量port(第一个参数)上,默认值是8001,usage是帮助信息
flag.IntVar(&port, "port", 8001, "Geecache server port")
flag.BoolVar(&api, "api", false, "Start a api server?")
// 把用户传递的命令行参数解析为对应变量的值
flag.Parse()
// 本节点开启节点服务的地址和端口
apiAddr := "http://localhost:9999"
addrMap := map[int]string{
8001: "http://localhost:8001",
8002: "http://localhost:8002",
8003: "http://localhost:8003",
}
// 取map中的所有value,放入切片中
var addrs []string
for _, v := range addrMap {
addrs = append(addrs, v)
}
// 创建一个缓存空间Group,返回*geecache.Group
gee := createGroup()
// 若api是true,开启api服务,用户可通过端口9999进行访问
if api {
go startAPIServer(apiAddr, gee)
}
// 启动缓存服务器
// addrs的值是["http://localhost:8001","http://localhost:8002","http://localhost:8003"]
startCacheServer(addrMap[port], addrs, gee)
}
3. 知识点答疑
- 1.http.Handle()和http.HandFunc()
- 2.接口型函数
/*
1. http.Handle()和http.HandFunc()
它们的作用是当接收到一个匹配路由的请求时,会调用该方法。
1) func Handle(pattern string, handler Handler)
func Handle(pattern string, handler Handler) {
DefaultServeMux.Handle(pattern, handler)
}
第二个参数是Handler接口,如下:
type Handler interface {
ServeHTTP(ResponseWriter, *Request)
}
2) 这里有一个容易搞混的是和Handle相同作用的http.HandleFunc()
func HandleFunc(pattern string, handler func(ResponseWriter, *Request)) {
DefaultServeMux.HandleFunc(pattern, handler)
}
可以看到第一个参数都是路由匹配的字符串,与Handle的区别就在于第二个参数!
深挖HandleFunc,发现HandleFunc最后是将handler转换成了HandlerFunc类型;
func (mux *ServeMux) HandleFunc(pattern string, handler func(ResponseWriter, *Request)) {
...
mux.Handle(pattern, HandlerFunc(handler))
}
到这里发现调用了Handle,从源码中发现mux.Handle和上面的DefaultServeMux.Handle是一个函数
所以到这里Handle和HandleFunc就一样了。
现在就很明确,HandleFunc第二个参数传入的是一个函数,然后在内部将函数转为了HandlerFunc类型,然后利用handle进行和(1)方法一样的流程。
那么现在就需要搞清楚HandlerFunc类型?在2中详解
总结:
使用http.Handle()传入的第二个参数必须实现Handler接口的ServeHTTP方法;
使用http.HandleFunc()传入的第二个参数是一个函数。
它们的核心调用逻辑是一样的,只是http.HandleFunc()将函数做了一次类型转换。
2. http.HandlerFunc()
// HandlerFunc类型是一个适配器,允许将普通函数用作HTTP处理程序。
// 如果f是一个具有适当签名(参数、返回值)的函数,HandlerFunc(f)是一个调用f的Handler。
// 这个f其实对应的是Handler接口中的方法。
// The HandlerFunc type is an adapter to allow the use of
// ordinary functions as HTTP handlers. If f is a function
// with the appropriate signature, HandlerFunc(f) is a
// Handler that calls f.
type HandlerFunc func(ResponseWriter, *Request)
// ServeHTTP calls f(w, r).
func (f HandlerFunc) ServeHTTP(w ResponseWriter, r *Request) {
f(w, r)
}
其实,HandlerFunc就是一个接口型函数,在本项目也有使用到。
接口型函数就是用函数实现接口,这里HandlerFunc就实现了Handler接口的ServeHTTP方法。
接口型函数的参数、返回值和要实现的接口中的方法时一致的。
调用逻辑:
HandlerFunc实现了Handler接口,ServeHTTP方法调用的是HandlerFunc本身,因为HandlerFunc类型的变量就是一个方法。
接口型函数解决的问题:
1. 实现一个接口,通常的方式是定义一个新类型,然后这个类型来实现接口方法;
2. 定义新类型实现接口的方法,方法名要和接口内的方法名一致。(如果接口方法名变了,那么也失效了)
使用接口型函数可以自定义方法名,而不用一定要与接口内方法名一致。
使用接口型函数可以将调用者转换为指定类型,如1中的HandlerFunc(handler),因此可以直接接收函数类型,不用定义类型实现方法了,因此可以去掉类型定义。
接口型函数总结:
1. 定义接口型函数,其签名和接口中的方法签名要一致;
2. 接口型函数实现接口方法,内部调用接口型函数自己;
3. 自定义调用者使用的函数,参数中含有对应的函数,这一步是将传入的函数强制转为接口型函数类型;(可不要)
4. 调用者使用上述函数传入方法参数,这个传入的方法是接口中的方法的处理逻辑;
推荐文章:https://cloud.tencent.com/developer/article/1196581
*/
4. 个人思考
- 借用一张图
这张图很明确了,本项目是一个分布式缓存,它存的是客户端频繁访问的数据库中的数据,客户端先在本地分布式缓冲找,找不到去远程节点找,再找不到则去数据源(DB)中找,并将结构缓存到本地,如果缓存已满,则涉及到如本项目的LRU策略淘汰机制。
那么对于实际部署,其实我就有两个疑问引出一连串问题,(1)一个节点怎么知道其他节点上哪个节点存了对应的key?不同的节点需要信息同步吧?他们要相互知道对方有哪些key?是每次有一个节点添加了缓存,其他节点对应的要在其一致性哈希上做记录吗?不做记录怎么(2)一个节点上线或下线需要同步它的状态信息到其他节点吗?否则其他节点不知道对方能不能访问,不可能就永久存储不改变远程节点信息吧。 - 一些个人理解
- 现在的单一节点,其实Group拥有所有信息,不存在分布式节点同步的问题,因此有上诉问题。
- 在项目中没有缓存的删除、更新操作;
- 有类似TCP通信的队头阻塞问题,即多个请求来到同一个cache,第一个没有返回时后续的请求都会阻塞;
- 还有一个重要的理解是:一个节点定义的Group,其实它既含客户端也含服务端;
- 不支持节点间同步信息,如果要同步需要借用第三方工具。(这里其实就回答了上面一些疑问,本项目和groupcache都不支持节点信息同步,需要借助第三方工具,节点上下线可能引起的数据迁移问题被一致性哈希解决了,不会引起大规模的信息迁移。所以按理来说每个节点还是要知道其他各节点和它们的key,否则怎么在一致性哈希上面做运算,是吧?
- 这里我没有搜到相关的资料,先去问问作者,后序有解答会在这里更新!
参考