本文主要研究一下dapr的consistent hash
dapr/pkg/placement/hashing/consistent_hash.go
var (
	// replicationFactor is the number of ring entries (virtual nodes) that
	// Add creates for every host.
	replicationFactor int

	// ErrNoHosts is the error returned when a lookup is attempted on a ring
	// that has no hosts.
	ErrNoHosts = errors.New("no hosts added")
)

// ConsistentHashTables groups a set of consistent-hash rings, keyed by
// string, under a single version identifier.
type ConsistentHashTables struct {
	Version string
	Entries map[string]*Consistent
}

// Host describes one host of stateful entities: its name, port, current
// load, and the id of the app it belongs to.
type Host struct {
	Name  string
	Port  int64
	Load  int64
	AppID string
}

// Consistent is a consistent-hash ring. All fields are guarded by the
// embedded RWMutex.
type Consistent struct {
	// hosts maps a ring hash to the name of the host that owns it.
	hosts map[uint64]string
	// sortedSet holds every ring hash in ascending order.
	sortedSet []uint64
	// loadMap maps a host name to its Host record.
	loadMap map[string]*Host
	// totalLoad is the aggregate load counter returned by GetInternals.
	totalLoad int64
	sync.RWMutex
}
ConsistentHashTables定义了Version、Entries属性,其中Entries是个map,key为string,value为*Consistent;Consistent定义了hosts、sortedSet、loadMap、totalLoad属性,并内嵌了sync.RWMutex用于并发保护
dapr/pkg/placement/hashing/consistent_hash.go
// GetInternals returns the internal data structure of the consistent hash func (c *Consistent) GetInternals() (map[uint64]string, []uint64, map[string]*Host, int64) { c.RLock() defer c.RUnlock() return c.hosts, c.sortedSet, c.loadMap, c.totalLoad }
GetInternals方法返回hosts、sortedSet、loadMap、totalLoad属性
dapr/pkg/placement/hashing/consistent_hash.go
// Add adds a host with port to the table func (c *Consistent) Add(host, id string, port int64) bool { c.Lock() defer c.Unlock() if _, ok := c.loadMap[host]; ok { return true } c.loadMap[host] = &Host{Name: host, AppID: id, Load: 0, Port: port} for i := 0; i < replicationFactor; i++ { h := c.hash(fmt.Sprintf("%s%d", host, i)) c.hosts[h] = host c.sortedSet = append(c.sortedSet, h) } // sort hashes ascendingly sort.Slice(c.sortedSet, func(i int, j int) bool { return c.sortedSet[i] < c.sortedSet[j] }) return false }
Add方法先判断host是否已存在于loadMap中,已存在则直接返回true;否则创建Host并添加到loadMap中,之后循环replicationFactor次,每次对host拼接序号后进行hash,并把结果添加到hosts及sortedSet中,最后对sortedSet进行升序排序并返回false
dapr/pkg/placement/hashing/consistent_hash.go
// Get returns the host that owns `key`. // // As described in https://en.wikipedia.org/wiki/Consistent_hashing // // It returns ErrNoHosts if the ring has no hosts in it. func (c *Consistent) Get(key string) (string, error) { c.RLock() defer c.RUnlock() if len(c.hosts) == 0 { return "", ErrNoHosts } h := c.hash(key) idx := c.search(h) return c.hosts[c.sortedSet[idx]], nil }
Get方法在hosts为空时返回ErrNoHosts;否则先对key进行hash,再通过search在sortedSet中查找对应的idx,最后以sortedSet[idx]这个hash值为key,从hosts中取出并返回对应的host
dapr/pkg/placement/hashing/consistent_hash.go
// GetHost gets a host func (c *Consistent) GetHost(key string) (*Host, error) { h, err := c.Get(key) if err != nil { return nil, err } return c.loadMap[h], nil }
GetHost方法先get key,之后再去loadMap获取host
dapr/pkg/placement/hashing/consistent_hash.go
// GetLeast uses Consistent Hashing With Bounded loads // // https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html // // to pick the least loaded host that can serve the key // // It returns ErrNoHosts if the ring has no hosts in it. // func (c *Consistent) GetLeast(key string) (string, error) { c.RLock() defer c.RUnlock() if len(c.hosts) == 0 { return "", ErrNoHosts } h := c.hash(key) idx := c.search(h) i := idx for { host := c.hosts[c.sortedSet[i]] if c.loadOK(host) { return host, nil } i++ if i >= len(c.hosts) { i = 0 } } }
GetLeast方法在hosts为空时返回ErrNoHosts;否则先对key进行hash,通过search获取起始idx,然后从idx开始沿sortedSet依次向后遍历(到末尾后回绕到0),返回第一个满足loadOK的host,即负载未超限的host
dapr/pkg/placement/hashing/consistent_hash.go
func (c *Consistent) search(key uint64) int { idx := sort.Search(len(c.sortedSet), func(i int) bool { return c.sortedSet[i] >= key }) if idx >= len(c.sortedSet) { idx = 0 } return idx }
search方法通过sort.Search在sortedSet中二分查找第一个大于等于key的hash的idx;如果不存在这样的hash(idx越界),则回绕返回0
dapr的Consistent定义了hosts、sortedSet、loadMap、totalLoad、sync.RWMutex属性;它定义了GetInternals、Add、Get、GetHost、GetLeast、search等方法。