TimingWheel
This article introduces the delayed operation in . Delayed operation, two options can be used:
Timer
: The timer maintains a priority queue, executes it at the time, and then stores the tasks that need to be executed in the map- The
timingWheel
incollection
maintains an array for storing task groups, and each slot maintains a doubly linked list of tasks. When the execution starts, the timer executes the tasks in a slot every specified time.
Scheme 2 reduces the maintenance task from the priority queue O(nlog(n))
to the doubly linked list O(1)
, and the execution of the task only needs to poll the tasks O(N)
at a time point. Priority queue, put and delete elements O(nlog(n))
.
First, let’s first talk about the use of TimingWheel in the cache of collection:
This is the initialization of cache
and the initialization of timingWheel
at the same time for key expiration processing. The parameters in turn represent:
interval
: Time division scalenumSlots
: time slotsexecute
: execute a function at a point in time
The function executed in the cache
is to delete the expired key, and this expiration is controlled by the timingWheel
to advance the time.
Next, let’s learn about it through the use of timingWheel by cache.
func newTimingWheelWithClock(interval time.Duration, numSlots int, execute Execute, ticker timex.Ticker) (
*TimingWheel, error) {
tw := &TimingWheel{
interval: interval, // Single time grid time interval
ticker: ticker, // Timer, do time push, advance by interval
slots: make([]*list.List, numSlots), // Time wheel
timers: NewSafeMap(), // Store the map of task{key, value} [parameters needed to execute execute]
tickedPos: numSlots - 1, // at previous virtual circle
execute: execute, // Execution function
numSlots: numSlots, // Initialize slots num
setChannel: make(chan timingEntry), // The following channels are used for task delivery
moveChannel: make(chan baseEntry),
removeChannel: make(chan interface{}),
drainChannel: make(chan func(key, value interface{})),
stopChannel: make(chan lang.PlaceholderType),
}
// Prepare all the lists stored in the slot
tw.initSlots()
// Open asynchronous coroutine, use channel for task communication and delivery
go tw.run()
return tw, nil
}
The above is a more intuitive display of the “time wheel” of the timingWheel
, and the details of the advancement will be explained later around this picture.
func (tw *TimingWheel) run() {
for {
select {
// Timer do time push -> scanAndRunTasks()
tw.onTick()
// add task will enter task into setChannel
case task := <-tw.setChannel:
tw.setTask(&task)
...
}
}
It can be seen that the timer
execution is started at the time of initialization, and it is rotated in the internal
time period, and then the bottom layer keeps getting the tasks from the list
in the slot
and handing them over to the execute
for execution.
The next step is to set the cache key
:
- First look at whether this key exists in the
data map
- If it exists, update
expire
->MoveTimer()
- Set the key for the first time ->
SetTimer()
So the use of timingWheel
is clear. Developers can add
or update
according to their needs.
At the same time, when we follow the source code, we will find that: SetTimer() MoveTimer()
all transports tasks to channel, and the coroutine opened in run()
continuously takes out the task operations of channel
.
SetTimer() -> setTask()
:
- not exist task:
getPostion -> pushBack to list -> setPosition
- exist task:
get from timers -> moveTask()
MoveTimer() -> moveTask()
From the above call chain, there is a function that will be called: moveTask()
func (tw *TimingWheel) moveTask(task baseEntry) {
// timers: Map => Get [positionEntry「pos, task」] by key
val, ok := tw.timers.Get(task.key)
if !ok {
return
}
timer := val.(*positionEntry)
// {delay <interval} => The delay time is less than a time grid interval, and there is no smaller scale, indicating that the task should be executed immediately
if task.delay < tw.interval {
threading.GoSafe(func() {
tw.execute(timer.item.key, timer.item.value)
})
return
}
// If> interval, the new pos, circle in the time wheel is calculated by the delay time delay
pos, circle := tw.getPositionAndCircle(task.delay)
if pos >= timer.pos {
timer.item.circle = circle
// Move offset before and after recording. To re-enter the team for later process
timer.item.diff = pos - timer.pos
} else if circle > 0 {
// Move to the next layer and convert circle to part of diff
circle--
// Because it is an array, add numSlots [that is equivalent to going to the next level]
timer.item.diff = tw.numSlots + pos - timer.pos
} else {
// Mark to delete the old task, and re-enter the team, waiting to be executed
timer.item.removed = true
newItem := &timingEntry{
baseEntry: task,
value: timer.item.value,
}
tw.slots[pos].PushBack(newItem)
tw.setTimerPosition(pos, newItem)
}
}
delay <internal
: Because <single time precision, it means that this task has expired and needs to be executed immediately- For changed
delay
:new >= old
:<newPos, newCircle, diff>
newCircle> 0
: Calculate diff and convert circle to the next layer, so diff + numslots- If only the delay time is shortened, delete the old task mark, re-add it to the list, and wait for the next loop to be executed
In the previous initialization, the timer in run()
kept advancing, and the process of advancing was mainly to pass the tasks in the list to the executed execute func
. Let’s start with the execution of the timer:
// Timer "It will be executed every internal"
func (tw *TimingWheel) onTick() {
// Update the current execution tick position every time it is executed
tw.tickedPos = (tw.tickedPos + 1) % tw.numSlots
// Get the doubly linked list of stored tasks in the tick position at this time
l := tw.slots[tw.tickedPos]
tw.scanAndRunTasks(l)
}
Next is how to execute execute
:
The specific branching situation is explained in the comments. When you look at it, it can be combined with the previous moveTask()
, where the circle
drops, and the calculation of diff
is the key to linking the two functions.
As for the calculation of diff
, the calculation of pos, circle
is involved:
// interval: 4min, d: 60min, numSlots: 16, tickedPos = 15
// step = 15, pos = 14, circle = 0
func (tw *TimingWheel) getPositionAndCircle(d time.Duration) (pos int, circle int) {
steps := int(d / tw.interval)
pos = (tw.tickedPos + steps) % tw.numSlots
circle = (steps - 1) / tw.numSlots
return
}
The above process can be simplified to the following:
steps = d / interval
pos = step % numSlots - 1
Summary
The timingWheel
is driven by the timer. As the time advances, the tasks of the list
“doubly linked list” in the current time grid will be taken out and passed to the execute
for execution.
In terms of time separation, the time wheel has circle
layers, so that the original numSlots
can be reused continuously, because the timer is constantly loop
, and execution can drop the upper layer slot
to the lower layer. You can execute the task up to the upper level continuously in the loop
.
There are many useful component tools in . Good use of tools is of great help to improve service performance and development efficiency. I hope this article can bring you some gains.