调度器的内部结构

Posted by Boenn on October 7, 2021

Heap

heap用于实现调度队列中的 activeQ 和 unschedulableQ 。heap的实现主要依赖结构题 data

data

data 是一个内部结构体,它实现了标准的堆接口并将数据保存在堆中。

type KeyFunc func(obj interface{}) (string, error)

type heapItem struct {
	obj   interface{} // The object which is stored in the heap.存储在堆中的对象
	index int         // 该对象的键
}

type lessFunc = func(item1, item2 interface{}) bool

type data struct {
   // items 是从对象的键到对象及其索引的map。
   items map[string]*heapItem
   // queue 实现了一个堆数据结构,并根据堆不变式保持元素的顺序。queue将对象的键保存在上面的“item”字段中。
   queue []string
   // keyFunc 用于queue item插入和检索的key的。
   keyFunc KeyFunc
   // 比较两个object的大小
   lessFunc lessFunc
}

相关函数

实现排序

// Less compares two objects and returns true if the first one should go
// in front of the second one in the heap.
// Less 比较两个对象,如果第一个对象应该在堆中的第二个对象之前,则返回 true。
func (h *data) Less(i, j int) bool {
   if i > len(h.queue) || j > len(h.queue) {
      return false
   }
   itemi, ok := h.items[h.queue[i]]
   if !ok {
      return false
   }
   itemj, ok := h.items[h.queue[j]]
   if !ok {
      return false
   }
   return h.lessFunc(itemi.obj, itemj.obj)
}

// Len returns the number of items in the Heap.
func (h *data) Len() int { return len(h.queue) }

// 同时修改data中的queue和item字段
func (h *data) Swap(i, j int) {
   h.queue[i], h.queue[j] = h.queue[j], h.queue[i]
   item := h.items[h.queue[i]]
   item.index = i
   item = h.items[h.queue[j]]
   item.index = j
}

push/pop/peek,每次同时修改data的queue和items字段。

type itemKeyValue struct {
	key string
	obj interface{}
}

func (h *data) Push(kv interface{}) {
   keyValue := kv.(*itemKeyValue)
   n := len(h.queue)
   // 处理items
   h.items[keyValue.key] = &heapItem{keyValue.obj, n}
   // 处理queue 
   h.queue = append(h.queue, keyValue.key)
}

func (h *data) Pop() interface{} {
   // 处理queue
   key := h.queue[len(h.queue)-1]
   h.queue = h.queue[0 : len(h.queue)-1]
   // 处理item
   item, ok := h.items[key]
   if !ok {
      return nil
   }
   delete(h.items, key)
   return item.obj
}

func (h *data) Peek() interface{} {
   // 拿出object
   if len(h.queue) > 0 {
      return h.items[h.queue[0]].obj
   }
   return nil
}

Heap

Heap 是一个生产者/消费者队列,实现了堆数据结构。可以用来实现优先级队列和类似的数据结构。相当于在之前的data上面包了一层。

type Heap struct {
   // 数据存储对象并有一个队列,基于堆的不变性保持它们的顺序。
   data *data
   // 当堆的元素被添加或删除时,metricRecorder 更新计数器,如果它是 nil 则什么都不做
   metricRecorder metrics.MetricRecorder
}

相关函数

func (h *Heap) Add(obj interface{}) error {
   // 获取obj的key
   key, err := h.data.keyFunc(obj)
   if err != nil {
      return cache.KeyError{Obj: obj, Err: err}
   }
   // 如果已经存在于items中
   if _, exists := h.data.items[key]; exists {
      // 更新这个obj
      h.data.items[key].obj = obj
      // Fix 在索引 h.data.items[key].index 处的元素更改其值后重新建立堆排序。
      heap.Fix(h.data, h.data.items[key].index)
   } else {
      // 直接push
      heap.Push(h.data, &itemKeyValue{key, obj})
      if h.metricRecorder != nil {
         h.metricRecorder.Inc()
      }
   }
   return nil
}

AddIfNotPresent 插入一个item,并将其放入queue中。

func (h *Heap) AddIfNotPresent(obj interface{}) error {
   key, err := h.data.keyFunc(obj)
   if err != nil {
      return cache.KeyError{Obj: obj, Err: err}
   }
   // 只对不存在该key时进行push操作
   if _, exists := h.data.items[key]; !exists {
      heap.Push(h.data, &itemKeyValue{key, obj})
      if h.metricRecorder != nil {
         h.metricRecorder.Inc()
      }
   }
   return nil
}

Delete/Peek/Pop/Get/GetByKey/List/Len 相关操作

func (h *Heap) Delete(obj interface{}) error {
   key, err := h.data.keyFunc(obj)
   if err != nil {
      return cache.KeyError{Obj: obj, Err: err}
   }
   if item, ok := h.data.items[key]; ok {
      // 直接调用heap包中的函数
      heap.Remove(h.data, item.index)
      if h.metricRecorder != nil {
         h.metricRecorder.Dec()
      }
      return nil
   }
   return fmt.Errorf("object not found")
}

func (h *Heap) Peek() interface{} {
   // 在data中已实现
   return h.data.Peek()
}

func (h *Heap) Pop() (interface{}, error) {
   // 这里直接调用 container/heap 包中的函数
   obj := heap.Pop(h.data)
   if obj != nil {
      if h.metricRecorder != nil {
         h.metricRecorder.Dec()
      }
      return obj, nil
   }
   return nil, fmt.Errorf("object was removed from heap data")
}

func (h *Heap) Get(obj interface{}) (interface{}, bool, error) {
   // 获取key
   key, err := h.data.keyFunc(obj)
   if err != nil {
      return nil, false, cache.KeyError{Obj: obj, Err: err}
   }
   return h.GetByKey(key)
}

func (h *Heap) GetByKey(key string) (interface{}, bool, error) {
   // 根据key从data中获取obj
   item, exists := h.data.items[key]
   if !exists {
      return nil, false, nil
   }
   return item.obj, true, nil
}

// 返回所有的items
func (h *Heap) List() []interface{} {
   list := make([]interface{}, 0, len(h.data.items))
   for _, item := range h.data.items {
      list = append(list, item.obj)
   }
   return list
}

func (h *Heap) Len() int {
   return len(h.data.queue)
}