Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

延时任务管理——层级时间轮 #11

Open
JemmyH opened this issue Jul 8, 2021 · 3 comments
Open

延时任务管理——层级时间轮 #11

JemmyH opened this issue Jul 8, 2021 · 3 comments
Assignees
Labels
documentation Improvements or additions to documentation done Implementation by Go

Comments

@JemmyH
Copy link
Owner

JemmyH commented Jul 8, 2021

为什么会有 时间轮 这个概念?先看一个场景:在商品抢购项目中,短时间内会创建几百万个定时任务,创建的时候更新某个参数,一段时间以后再去核对,将与逻辑不符合的任务处理。

Golang 内置的 Timer 是采用最小堆来实现的,创建和删除的时间复杂度都为 O(log n)。如果有上万的连接,每个连接都会有很多超时任务,比如发送超时、心跳检测等,如果每个超时任务都对应一个 Timer,性能会比较低下。

论文 Hashed and Hierarchical Timing Wheels 提出了一种用于实现 Timer 的高效数据结构:时间轮。采用时间轮实现的 Timer,创建和删除的时间复杂度为 O(1)

本次借助已有的实现 RussellLuo/timingwheel: Golang implementation of Hierarchical Timing Wheels. 说明原理。该作者的实现中并没有按照常规的使用类似钟表那样的环形数组,而是参考了 kafka 的实现,在这个实现中,需要用到两个组件:

本 issue 的目标是明白层级时间轮的运作原理以及从源码层面上理解上述项目实现的细节。

@JemmyH JemmyH self-assigned this Jul 8, 2021
@JemmyH JemmyH added Digging In I'm on it. documentation Improvements or additions to documentation Implementation by Go labels Jul 8, 2021
@JemmyH
Copy link
Owner Author

JemmyH commented Jul 8, 2021

关于原理解释,参考以下文章,解释的非常清楚:
Go语言中时间轮的实现一张图理解Kafka时间轮(TimingWheel),看不懂算我输!

简单时间轮

image
image

层级时间轮

image

@JemmyH
Copy link
Owner Author

JemmyH commented Jul 12, 2021

实现

用到的数据结构:

  • bucket :表示时间轮的一个格子,里面有一个双向循环链表,每一个链表元素表示一个定时任务,这个格到期时会执行链表中的全部任务
  • Timer:链表中的元素,表示一个定时任务

在具体的实现(参考 kafka 时间轮的实现)上,kafka 的实现和上述原理还有些不同。

1. 时间轮的表示方式

image

  • 新的实现中,使用了大小为 wheelSize 的数组来表示一层时间轮,其中每一个是一个 bucket,每个 bucket 的时间单位为 tick
  • 新的实现中,currentTime 的变化并没有模拟传统的循环列表的行为,而是模拟了哈希表的行为:如上右图所示,时间轮数组会随着 currentTime 的流逝而不断向后移动,也就是说,currentTime 永远指向的是第一个 bucket,每当我们添加新的定时任务时,会根据任务的过期时间和 currentTime 计算((expiration/tick)%wheelSize)应该放在哪个 bucket

2. 时钟的驱动方式

image

  • 往层级时间轮中添加一个定时任务 task1 后,会将该任务所属的 bucket2 的到期时间设置为 task1 的到期时间 expiration(= 当前时间 currentTime + 定时任务到期间隔 duration),并将这个 bucket2 添加(Offer)到 DelayQueue 中。
  • DelayQueue(内部有一个线程)会等待 “到期时间最早(earliest)的 bucket” 到期,图中等到的是排在队首的 bucket2,于是经由 poll 返回并删除这个 bucket2;随后,时间轮会将当前时间 currentTime 往前移动到 bucket2expiration 所指向的时间(图中是 1ms 所在的位置);最后,bucket2 中包含的 task1 会被删除并执行。

@JemmyH
Copy link
Owner Author

JemmyH commented Jul 12, 2021

bucket

bucket 表示时间轮的一个时间格,相同到期时间的所有定时任务都以双链表的形式存储在同一个 bucket 中,当 bucket 对应的过期时间到达时,里面所有的任务会被逐个取出执行。

type bucket struct {
	// 以当前时间为基准,再经过 expiration 后到期。如果为 -1,表示已经过期
	expiration int64

	mu     sync.Mutex
        // 存储具体任务的双链表
	timers *list.List
}

再来看 bucket 相关的一些方法:

// newBucket 创建一个 bucket
func newBucket() *bucket {
	return &bucket{
		timers:     list.New(),
		expiration: -1,
	}
}

// GetExpiration 返回当前 bucket 的 expiration
func (b *bucket) GetExpiration() int64 {
	return atomic.LoadInt64(&b.expiration)
}

// SetExpiration 设置当前 bucket 的 expiration,如果设置的值和已有的不相同,返回 true
func (b *bucket) SetExpiration(expiration int64) bool {
	return atomic.SwapInt64(&b.expiration, expiration) != expiration
}

// Add 添加任务到 bucket 中。Timer 表示一个具体的任务
func (b *bucket) Add(t *Timer) {
	b.mu.Lock()
        
        // 主要做的是将这个 Timer 放到 bucket 的双链表中,
        // 并且绑定 Timer 和 bucket 之间的关系。
	e := b.timers.PushBack(t)
	t.setBucket(b)
	t.element = e

	b.mu.Unlock()
}

func (b *bucket) remove(t *Timer) bool {
	if t.getBucket() != b {
		// If remove is called from t.Stop, and this happens just after the timing wheel's goroutine has:
		//     1. removed t from b (through b.Flush -> b.remove)
		//     2. moved t from b to another bucket ab (through b.Flush -> b.remove and ab.Add)
		// then t.getBucket will return nil for case 1, or ab (non-nil) for case 2.
		// In either case, the returned value does not equal to b.
		return false
	}
	b.timers.Remove(t.element)
	t.setBucket(nil)
	t.element = nil
	return true
}

// Remove 将一个 Timer 从 bucket 的双链表中移除
func (b *bucket) Remove(t *Timer) bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.remove(t)
}

// Flush 核心方法,bucket 对应的任务到期后,将所有的任务取出,并逐个执行传入的 reinsert 方法
func (b *bucket) Flush(reinsert func(*Timer)) {
	var ts []*Timer

	b.mu.Lock()
        // 遍历双链表,取出后删除
	for e := b.timers.Front(); e != nil; {
		next := e.Next()

		t := e.Value.(*Timer)
		b.remove(t)
		ts = append(ts, t)

		e = next
	}
	b.mu.Unlock()

	b.SetExpiration(-1) // TODO: Improve the coordination with b.Add()

        // 逐个调用回调函数处理定时任务
	for _, t := range ts {
		reinsert(t)
	}
}

Timer

Timer 表示一个具体的定时任务。

// Timer represents a single event. When the Timer expires, the given
// task will be executed.
type Timer struct {
	expiration int64 // in milliseconds
        // 到期执行的方法
	task       func()

	// The bucket that holds the list to which this timer's element belongs.
	//
	// NOTE: This field may be updated and read concurrently,
	// through Timer.Stop() and Bucket.Flush().
	b unsafe.Pointer // type: *bucket

	// The timer's element.
	element *list.Element
}

func (t *Timer) getBucket() *bucket {
	return (*bucket)(atomic.LoadPointer(&t.b))
}

func (t *Timer) setBucket(b *bucket) {
	atomic.StorePointer(&t.b, unsafe.Pointer(b))
}

// Stop 将还未执行的 Timer 移除,如果移除成功返回 true,Timer 已经过期或者已经被停止,返回 false
// 考虑到层级时间轮的实现方式,一个 Timer 有可能会被转移到不同的 bucket 上,因此,为保险起见,遍历对应的 bucket
func (t *Timer) Stop() bool {
	stopped := false
	for b := t.getBucket(); b != nil; b = t.getBucket() {
		// If b.Remove is called just after the timing wheel's goroutine has:
		//     1. removed t from b (through b.Flush -> b.remove)
		//     2. moved t from b to another bucket ab (through b.Flush -> b.remove and ab.Add)
		// this may fail to remove t due to the change of t's bucket.
		stopped = b.Remove(t)

		// Thus, here we re-get t's possibly new bucket (nil for case 1, or ab (non-nil) for case 2),
		// and retry until the bucket becomes nil, which indicates that t has finally been removed.
	}
	return stopped
}

TimingWheel

TimingWheel 表示一个层级时间轮的实例。

type TimingWheel struct {
	// 轮中每一个格的跨度,单位 毫秒
	tick int64
	// 格子个数
	wheelSize int64
	// 一轮总跨度,单位 毫秒
	interval int64
	// 当前指针时间,单位 毫秒。必须是 tick 的倍数
	currentTime int64
	// 格子
	buckets []*bucket
	// 延时队列
	queen *DelayQueue

	// 上级的时间轮
	overflowWheel unsafe.Pointer // type: *TimingWheel

	// 用于退出当前 TimingWheel
	exitC     chan struct{}
	waitGroup waitGroupWrapper
}

在实现具体的逻辑之前,我们定义以下的工具函数:

// timeToMs 将 t 变成时间戳,单位为毫秒
func timeToMs(t time.Time) int64 {
	return t.UnixNano() / int64(time.Millisecond)
}

// truncate 返回小于等于 x 的 m 的整数倍,如传入 (100,11),最靠近且小于 100 的 11 的倍数是 99,所以返回 99
func truncate(x, m int64) int64 {
	if m <= 0 {
		return x
	}
	return x - x%m
}

// waitGroupWrapper 其实是对 sync.WaitGroup 的封装
type waitGroupWrapper struct {
	sync.WaitGroup
}

func (w *waitGroupWrapper) Wrap(cb func()) {
	w.Add(1)
	go func() {
		cb()
		w.Done()
	}()
}

创建一个层级时间轮实例

func NewTimingWheel(tick time.Duration, wheelSize int64) *TimingWheel {
	// 将 tick 转化为毫秒
	tickMs := int64(tick / time.Millisecond)
	if tickMs <= 0 {
		panic("tick must be greater than 0ms")
	}
	if wheelSize < 0 {
		panic("wheelSize must be greater than 0")
	}
	startMs := timeToMs(time.Now())
	return newTimingWheelInternal(tickMs, wheelSize, startMs, NewDelayQueen(int(wheelSize)))
}

func newTimingWheelInternal(tickMs int64, wheelSize int64, startMs int64, dq *DelayQueue) *TimingWheel {
        // 创建并初始化 wheelSize 长度的 bucket
	buckets := make([]*bucket, wheelSize)
	for i := range buckets {
		buckets[i] = newBucket()
	}
	return &TimingWheel{
		tick:        tickMs,
		wheelSize:   wheelSize,
		interval:    tickMs * wheelSize,
		currentTime: truncate(startMs, tickMs),
		buckets:     buckets,
		queen:       dq,
		exitC:       make(chan struct{}),
		waitGroup:   waitGroupWrapper{sync.WaitGroup{}},
	}
}

时间轮启动

func (tw *TimingWheel) Start() {
	// 第一个 worker,不断从优先级队列中取出到期的元素,将其放在 queen.C 中
	tw.waitGroup.Wrap(func() {
		tw.queen.Poll(tw.exitC, func() int64 {
			return timeToMs(time.Now().UTC())
		})
	})
	tw.waitGroup.Wrap(func() {
		for {
			select {
			case item := <-tw.queen.C:
				// b 表示到达执行时间的时间格
				b := item.(*bucket)
				// 时间轮会将 currentTime 往前移动到 bucket到期的时间 或 下一个时间格
				tw.advanceClock(b.GetExpiration())
				// 取出bucket队列的数据,并调用 addOrRun 方法执行
				b.Flush(tw.addOrRun)
			case <-tw.exitC:
                                // 退出时间轮
				return
			}
		}
	})
}

Start 中会启动两个 goroutine:第一个会持续接收从优先级队列中的弹出的到期的 bucket,并将其放到 timingWheel.C 中;第二个会持续阻塞运行,监听 timingWheel.C 中是否有 bucket 传入,拿到 bucket 后进行具体的逻辑处理。
先看 advanceClock,这个方法主要更新当前时间轮的 currentTime为最先到期的 bucket 的到期时间:

// advanceClock 表示有 bucket 事件到期,需要更新 tw.currentTime 为 bucket 的 expiration
func (tw *TimingWheel) advanceClock(expiration int64) {
	curTime := atomic.LoadInt64(&tw.currentTime)
	// 过期时间大于等于(当前时间+tick)
	if expiration >= curTime+tw.tick {
		// 更新 tw.currentTime 为 expiration,从而推动时间轮的转动
		curTime = truncate(expiration, tw.tick)
		atomic.StoreInt64(&tw.currentTime, curTime)

		// 如果有上层时间轮,也要递归处理上层时间轮的推动
		overflowWheel := atomic.LoadPointer(&tw.overflowWheel)
		if overflowWheel != nil {
			(*TimingWheel)(overflowWheel).advanceClock(curTime)
		}
	}
}

之后调用 bucketFlush 方法,将该 bucket 中的任务全部取出,执行、删除。这里,传入的具体执行的逻辑是 tw.addOrRun,我们看下这个逻辑:

func (tw *TimingWheel) addOrRun(t *Timer) {
	if !tw.add(t) {
		go t.task()
	}
}

首先调用 tw.add,表示将这个 Timer 再次添加到时间轮中,如果成功添加,说明这个 Timer 还没到执行时间,返回 false;如果返回 true,说明没添加成功,没添加成功的唯一原因就是这个 Timer 到期了,该执行了,此时就会调用 Timer.task 回调函数,执行此 Timer 设定好的具体任务逻辑。

添加一个任务到时间轮

核心实现在 tw.add(t *Timer) 方法中。正如前面所说,这里的设计与常规的设计(环形转圈)并不一致,而是参考 kafka 的设计:每一个格子使用数组去存储,而不是环形链表,currentTime 永远指向数组的第一个元素,当我们添加一个任务(Timer) 时:

  1. 如果到期时间在第一轮,那么通过 t.expirationtw.tick 之间的关系,马上就能确定在哪个格子中,也就是 virtualID = t.expiration / tw.tick
  2. 如果在更高的层级时间轮,那么新创建一个 overflowTimingWheel,设定 tick 为 低一层的 interval。什么时候更新 tw.currentTime? 从 DelayQueen 中收到一个 bucket 时,将 currentTime 设置为 bucket.expiration
// add 尝试将 t 添加到时间轮中,如果成功添加,说明 t 还没到执行的时间,插入到了某个 bucket,返回 true;如果返回 false,说明 t 已经到
// 执行的时间了,没必要添加到时间轮中
func (tw *TimingWheel) add(t *Timer) bool {
	currentTime := atomic.LoadInt64(&tw.currentTime)
	if t.expiration < currentTime+tw.tick {
		//  case A: 已经过期或在当前时间格执行
		return false
	} else if t.expiration < currentTime+tw.interval {
		// case B:到期时间在第一层时间轮之内,这里也包含上层时间轮的降级
		virtualID := t.expiration / tw.tick
		b := tw.buckets[virtualID%tw.wheelSize]
		b.Add(t)

		if b.SetExpiration(virtualID * tw.tick) {
			tw.queen.Offer(b, b.expiration)
		}
		return true
	} else {
		// case C: 在更上层的时间轮中(过期时间超过了 tw.interval)
		overflowWheel := atomic.LoadPointer(&tw.overflowWheel)
		if overflowWheel == nil {
			// 创建一个上层时间轮,注意这里创建的 TimingWheel 的 tick 为 底下一层的 interval
			atomic.CompareAndSwapPointer(
				&tw.overflowWheel,
				nil,
				unsafe.Pointer(newTimingWheelInternal(tw.interval, tw.wheelSize, currentTime, tw.queen)),
			)
			overflowWheel = atomic.LoadPointer(&tw.overflowWheel)
		}
		// 递归调用,将这个 Timer 插入到更上层时间轮的某个 bucket 中
		return (*TimingWheel)(overflowWheel).add(t)
	}
}

手动添加延时任务

有了以上的逻辑,添加一个延时任务就变得非常简单:创建一个 Timer,设定好 expiration,调用 addOrRun将其添加到时间轮中:

func (tw *TimingWheel) AfterFunc(d time.Duration, h func()) *Timer {
	t := &Timer{
		expiration: timeToMs(time.Now().UTC().Add(d)),
		task:       h,
	}
	tw.addOrRun(t)
	return t
}

停止时间轮

给信号给 Start 中的两个 worker 并等待结束即可:

func (tw *TimingWheel) Stop() {
	close(tw.exitC)
	tw.waitGroup.Wait()
}

@JemmyH JemmyH added done and removed Digging In I'm on it. labels Jul 12, 2021
@JemmyH JemmyH pinned this issue Jul 12, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
documentation Improvements or additions to documentation done Implementation by Go
Projects
None yet
Development

No branches or pull requests

1 participant