Heap
heap用于实现调度队列中的 activeQ 和 unschedulableQ 。heap的实现主要依赖结构题 data。
data
data 是一个内部结构体,它实现了标准的堆接口并将数据保存在堆中。
type KeyFunc func(obj interface{}) (string, error)
type heapItem struct {
obj interface{} // The object which is stored in the heap.存储在堆中的对象
index int // 该对象的键
}
type lessFunc = func(item1, item2 interface{}) bool
type data struct {
// items 是从对象的键到对象及其索引的map。
items map[string]*heapItem
// queue 实现了一个堆数据结构,并根据堆不变式保持元素的顺序。queue将对象的键保存在上面的“item”字段中。
queue []string
// keyFunc 用于queue item插入和检索的key的。
keyFunc KeyFunc
// 比较两个object的大小
lessFunc lessFunc
}
相关函数
实现排序
// Less compares two objects and returns true if the first one should go
// in front of the second one in the heap.
// Less 比较两个对象,如果第一个对象应该在堆中的第二个对象之前,则返回 true。
func (h *data) Less(i, j int) bool {
if i > len(h.queue) || j > len(h.queue) {
return false
}
itemi, ok := h.items[h.queue[i]]
if !ok {
return false
}
itemj, ok := h.items[h.queue[j]]
if !ok {
return false
}
return h.lessFunc(itemi.obj, itemj.obj)
}
// Len returns the number of items in the Heap.
func (h *data) Len() int { return len(h.queue) }
// 同时修改data中的queue和item字段
func (h *data) Swap(i, j int) {
h.queue[i], h.queue[j] = h.queue[j], h.queue[i]
item := h.items[h.queue[i]]
item.index = i
item = h.items[h.queue[j]]
item.index = j
}
push/pop/peek,每次同时修改data的queue和items字段。
type itemKeyValue struct {
key string
obj interface{}
}
func (h *data) Push(kv interface{}) {
keyValue := kv.(*itemKeyValue)
n := len(h.queue)
// 处理items
h.items[keyValue.key] = &heapItem{keyValue.obj, n}
// 处理queue
h.queue = append(h.queue, keyValue.key)
}
func (h *data) Pop() interface{} {
// 处理queue
key := h.queue[len(h.queue)-1]
h.queue = h.queue[0 : len(h.queue)-1]
// 处理item
item, ok := h.items[key]
if !ok {
return nil
}
delete(h.items, key)
return item.obj
}
func (h *data) Peek() interface{} {
// 拿出object
if len(h.queue) > 0 {
return h.items[h.queue[0]].obj
}
return nil
}
Heap
Heap 是一个生产者/消费者队列,实现了堆数据结构。可以用来实现优先级队列和类似的数据结构。相当于在之前的data上面包了一层。
type Heap struct {
// 数据存储对象并有一个队列,基于堆的不变性保持它们的顺序。
data *data
// 当堆的元素被添加或删除时,metricRecorder 更新计数器,如果它是 nil 则什么都不做
metricRecorder metrics.MetricRecorder
}
相关函数
func (h *Heap) Add(obj interface{}) error {
// 获取obj的key
key, err := h.data.keyFunc(obj)
if err != nil {
return cache.KeyError{Obj: obj, Err: err}
}
// 如果已经存在于items中
if _, exists := h.data.items[key]; exists {
// 更新这个obj
h.data.items[key].obj = obj
// Fix 在索引 h.data.items[key].index 处的元素更改其值后重新建立堆排序。
heap.Fix(h.data, h.data.items[key].index)
} else {
// 直接push
heap.Push(h.data, &itemKeyValue{key, obj})
if h.metricRecorder != nil {
h.metricRecorder.Inc()
}
}
return nil
}
AddIfNotPresent 插入一个item,并将其放入queue中。
func (h *Heap) AddIfNotPresent(obj interface{}) error {
key, err := h.data.keyFunc(obj)
if err != nil {
return cache.KeyError{Obj: obj, Err: err}
}
// 只对不存在该key时进行push操作
if _, exists := h.data.items[key]; !exists {
heap.Push(h.data, &itemKeyValue{key, obj})
if h.metricRecorder != nil {
h.metricRecorder.Inc()
}
}
return nil
}
Delete/Peek/Pop/Get/GetByKey/List/Len 相关操作
func (h *Heap) Delete(obj interface{}) error {
key, err := h.data.keyFunc(obj)
if err != nil {
return cache.KeyError{Obj: obj, Err: err}
}
if item, ok := h.data.items[key]; ok {
// 直接调用heap包中的函数
heap.Remove(h.data, item.index)
if h.metricRecorder != nil {
h.metricRecorder.Dec()
}
return nil
}
return fmt.Errorf("object not found")
}
func (h *Heap) Peek() interface{} {
// 在data中已实现
return h.data.Peek()
}
func (h *Heap) Pop() (interface{}, error) {
// 这里直接调用 container/heap 包中的函数
obj := heap.Pop(h.data)
if obj != nil {
if h.metricRecorder != nil {
h.metricRecorder.Dec()
}
return obj, nil
}
return nil, fmt.Errorf("object was removed from heap data")
}
func (h *Heap) Get(obj interface{}) (interface{}, bool, error) {
// 获取key
key, err := h.data.keyFunc(obj)
if err != nil {
return nil, false, cache.KeyError{Obj: obj, Err: err}
}
return h.GetByKey(key)
}
func (h *Heap) GetByKey(key string) (interface{}, bool, error) {
// 根据key从data中获取obj
item, exists := h.data.items[key]
if !exists {
return nil, false, nil
}
return item.obj, true, nil
}
// 返回所有的items
func (h *Heap) List() []interface{} {
list := make([]interface{}, 0, len(h.data.items))
for _, item := range h.data.items {
list = append(list, item.obj)
}
return list
}
func (h *Heap) Len() int {
return len(h.data.queue)
}