TimingWheel

    This article introduces delayed operations. Two approaches can be used to implement them:

    1. Timer: the timer maintains a priority queue ordered by execution time and runs each task when its time arrives; the tasks waiting to be executed are stored in a map.
    2. The timingWheel in collection: it maintains an array of slots for grouping tasks, and each slot holds a doubly linked list of tasks. Once it starts, a timer fires at a fixed interval and executes the tasks in one slot each time.

    Approach 2 reduces the cost of maintaining tasks from O(log n) per insertion or deletion in a priority queue to O(1) for a doubly linked list. Executing tasks only requires walking the tasks stored in the slot for the current time point, which is O(N) for the N tasks in that slot.

    First, let's look at how the cache in collection uses TimingWheel:

    The cache initializes a timingWheel when it is created and uses it to handle key expiration. The parameters, in order, are:

    • interval: the duration of one tick, i.e. the time span covered by a single slot
    • numSlots: the number of time slots
    • execute: the function to run when a task's time arrives

    In the cache, the function executed deletes the expired key, and expiration is driven by the timingWheel as it advances the time.
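
The role of execute in the cache can be sketched as follows. This is only an illustration under stated assumptions: `expiringCache` and its methods are hypothetical stand-ins for the real cache, and the callback type simply mirrors the (key, value) shape that appears in the code later in this article. No real wheel is involved; the callback is invoked by hand.

```go
package main

import (
	"fmt"
	"sync"
)

// execute has the callback shape seen in the article: it receives a task's key and value.
type execute func(key, value interface{})

// expiringCache is a hypothetical, heavily reduced cache used only for this sketch.
type expiringCache struct {
	mu   sync.Mutex
	data map[string]interface{}
}

func newExpiringCache() *expiringCache {
	return &expiringCache{data: make(map[string]interface{})}
}

func (c *expiringCache) set(key string, value interface{}) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.data[key] = value
}

func (c *expiringCache) has(key string) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	_, ok := c.data[key]
	return ok
}

// onExpire builds the callback that a timing wheel would call when a key's time arrives.
func (c *expiringCache) onExpire() execute {
	return func(k, v interface{}) {
		key, ok := k.(string)
		if !ok {
			return // ignore keys of an unexpected type
		}
		c.mu.Lock()
		defer c.mu.Unlock()
		delete(c.data, key)
	}
}

func main() {
	c := newExpiringCache()
	c.set("session", 42)

	// A real wheel would invoke the callback after the delay; here it is called directly.
	expire := c.onExpire()
	expire("session", 42)
	fmt.Println("still cached:", c.has("session"))
}
```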

    Next, let's see how timingWheel works internally, following the way the cache uses it.

    func newTimingWheelWithClock(interval time.Duration, numSlots int, execute Execute, ticker timex.Ticker) (
        *TimingWheel, error) {
        tw := &TimingWheel{
            interval: interval, // time span of a single slot
            ticker: ticker, // ticker that advances the wheel once per interval
            slots: make([]*list.List, numSlots), // the time wheel itself
            timers: NewSafeMap(), // map storing tasks {key, value}, the arguments needed to call execute
            tickedPos: numSlots - 1, // at previous virtual circle
            execute: execute, // function executed when a task is due
            numSlots: numSlots, // number of slots
            setChannel: make(chan timingEntry), // the channels below are used to deliver task operations
            moveChannel: make(chan baseEntry),
            removeChannel: make(chan interface{}),
            drainChannel: make(chan func(key, value interface{})),
            stopChannel: make(chan lang.PlaceholderType),
        }
        // prepare the list stored in every slot
        tw.initSlots()
        // start an asynchronous goroutine; tasks are communicated and delivered over channels
        go tw.run()
        return tw, nil
    }

    The figure above gives a more intuitive picture of the timingWheel's “time wheel”; the details of how it advances are explained later with reference to it.

    func (tw *TimingWheel) run() {
        for {
            select {
            // the ticker advances time -> scanAndRunTasks()
            case <-tw.ticker.Chan():
                tw.onTick()
            // adding a task sends it into setChannel
            case task := <-tw.setChannel:
                tw.setTask(&task)
            ...
            }
        }
    }

    As you can see, the ticker starts running at initialization and fires once per interval; on each tick, the run loop takes the tasks from the list in the current slot and hands them to execute.
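
The pattern behind run() is a single goroutine that owns the wheel's state and reacts to messages, so no locks are needed around the slots. Below is a reduced sketch of that pattern, not the library's code: the names `setRequest`, `loop` and `runDemo` are invented, ticks are sent by hand instead of coming from a real ticker so the result is deterministic, and tasks are placed directly into a slot index.

```go
package main

import "fmt"

// setRequest asks the loop to store a key in a given slot (hypothetical message type).
type setRequest struct {
	key  string
	slot int
}

// loop owns the slots; only this goroutine ever touches them.
func loop(numSlots int, ticks <-chan struct{}, sets <-chan setRequest,
	stop <-chan struct{}, fired chan<- string) {
	slots := make([][]string, numSlots)
	pos := numSlots - 1 // start just before slot 0, like tickedPos in the article
	for {
		select {
		case <-ticks:
			// advance one slot and hand over everything stored there
			pos = (pos + 1) % numSlots
			for _, key := range slots[pos] {
				fired <- key
			}
			slots[pos] = nil
		case req := <-sets:
			slots[req.slot] = append(slots[req.slot], req.key)
		case <-stop:
			close(fired)
			return
		}
	}
}

// runDemo schedules two keys, advances two ticks and returns the keys in firing order.
func runDemo() []string {
	ticks := make(chan struct{})
	sets := make(chan setRequest)
	stop := make(chan struct{})
	fired := make(chan string, 8) // buffered so the loop never blocks in this demo

	go loop(4, ticks, sets, stop, fired)

	sets <- setRequest{key: "b", slot: 1}
	sets <- setRequest{key: "a", slot: 0}
	ticks <- struct{}{} // reaches slot 0
	ticks <- struct{}{} // reaches slot 1
	stop <- struct{}{}

	var out []string
	for key := range fired {
		out = append(out, key)
	}
	return out
}

func main() {
	fmt.Println(runDemo())
}
```

Because the channels are unbuffered, every send returns only after the loop has received the message, which is what keeps the ordering in this demo predictable.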


    The next step is to set the cache key:

    1. First check whether the key already exists in the data map
    2. If it exists, update its expiration -> MoveTimer()
    3. Otherwise the key is being set for the first time -> SetTimer()

    So using timingWheel is straightforward: developers add a timer or update an existing one as needed.

    When we follow the source code, we also find that SetTimer() and MoveTimer() both send their tasks to a channel, and the goroutine started in run() continuously takes task operations off the channels and processes them.

    SetTimer() -> setTask()

    • task does not exist: getPosition -> pushBack to list -> setPosition
    • task exists: get from timers -> moveTask()

    MoveTimer() -> moveTask()

    Both call chains can end up in the same function: moveTask()

    func (tw *TimingWheel) moveTask(task baseEntry) {
        // timers: map => look up the positionEntry {pos, task} by key
        val, ok := tw.timers.Get(task.key)
        if !ok {
            return
        }
        timer := val.(*positionEntry)
        // delay < interval: the delay is shorter than one slot and there is no finer scale,
        // so the task should be executed immediately
        if task.delay < tw.interval {
            threading.GoSafe(func() {
                tw.execute(timer.item.key, timer.item.value)
            })
            return
        }
        // delay >= interval: compute the new pos and circle in the wheel from the delay
        pos, circle := tw.getPositionAndCircle(task.delay)
        if pos >= timer.pos {
            timer.item.circle = circle
            // record the offset between the old and new position; used later to re-queue the task
            timer.item.diff = pos - timer.pos
        } else if circle > 0 {
            // move down one layer and convert that circle into part of diff
            circle--
            timer.item.circle = circle
            // slots is an array, so adding numSlots is equivalent to going round one more layer
            timer.item.diff = tw.numSlots + pos - timer.pos
        } else {
            // mark the old task as removed and enqueue a new entry that waits to be executed
            timer.item.removed = true
            newItem := &timingEntry{
                baseEntry: task,
                value: timer.item.value,
            }
            tw.slots[pos].PushBack(newItem)
            tw.setTimerPosition(pos, newItem)
        }
    }
    • delay < interval: the delay is smaller than a single tick, so the task is treated as due and is executed immediately
    • For a changed delay:
      • newPos >= oldPos: record the new circle and set diff = newPos - oldPos
      • newPos < oldPos and newCircle > 0: take one circle off and fold it into diff, so diff = numSlots + newPos - oldPos
      • otherwise the delay has only been shortened: mark the old task as removed, add a new entry to the list, and wait for it to be executed in the next loop
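
The branch selection for a changed delay can be isolated into a small pure function, which makes it easy to try out numbers. This is a sketch of the rules listed above rather than the library's code; `move` and `planMove` are names invented for the example.

```go
package main

import "fmt"

// move describes what happens to a task whose delay changed (hypothetical type).
type move struct {
	circle  int  // remaining full rounds before the task may run
	diff    int  // slots to shift when the old slot is next scanned
	requeue bool // true when the old entry is marked removed and a new one is queued
}

// planMove applies the three rules for a task currently stored in oldPos.
func planMove(oldPos, newPos, newCircle, numSlots int) move {
	switch {
	case newPos >= oldPos:
		// same round: shift forward by the distance between the slots
		return move{circle: newCircle, diff: newPos - oldPos}
	case newCircle > 0:
		// the new slot is behind the old one: spend one circle to wrap around
		return move{circle: newCircle - 1, diff: numSlots + newPos - oldPos}
	default:
		// the delay got shorter: the old entry cannot be reused
		return move{requeue: true}
	}
}

func main() {
	fmt.Printf("%+v\n", planMove(5, 9, 1, 16))
	fmt.Printf("%+v\n", planMove(9, 5, 1, 16))
	fmt.Printf("%+v\n", planMove(9, 5, 0, 16))
}
```

With 16 slots, moving from slot 9 to slot 5 with one circle left yields circle 0 and diff 12: going 12 slots forward from slot 9 wraps around to slot 5.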

    In the initialization above, the ticker in run() keeps advancing, and each advance mainly consists of passing the tasks in the current list to the execute func. Let’s start with what happens on each tick:

    // Called by the ticker once every interval
    func (tw *TimingWheel) onTick() {
        // advance the current tick position on every call
        tw.tickedPos = (tw.tickedPos + 1) % tw.numSlots
        // get the doubly linked list of tasks stored at the current tick position
        l := tw.slots[tw.tickedPos]
        tw.scanAndRunTasks(l)
    }

    Next is how execute gets called:

    The individual branches are explained in the comments. It helps to read it together with moveTask() above: the decrementing of circle and the calculation of diff are what link the two functions.
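
As a rough illustration of how a slot scan can consume the removed, circle and diff fields that moveTask() sets, here is a self-contained sketch. It is written for this article's explanation and is not the library's source: `entry`, `newSlots` and `scanSlot` are invented names, and due tasks are returned as keys instead of being passed to execute.

```go
package main

import (
	"container/list"
	"fmt"
)

// entry is a reduced task record carrying the fields discussed above (hypothetical type).
type entry struct {
	key     string
	circle  int
	diff    int
	removed bool
}

// newSlots allocates one empty list per slot.
func newSlots(n int) []*list.List {
	slots := make([]*list.List, n)
	for i := range slots {
		slots[i] = list.New()
	}
	return slots
}

// scanSlot walks the list at tickedPos and returns the keys that are due now.
func scanSlot(slots []*list.List, tickedPos int) []string {
	var due []string
	l := slots[tickedPos]
	for e := l.Front(); e != nil; {
		next := e.Next() // capture before a possible Remove, which clears the link
		task := e.Value.(*entry)
		switch {
		case task.removed:
			// superseded by a newer entry: just drop it
			l.Remove(e)
		case task.circle > 0:
			// not due in this round: come down one layer and stay in the slot
			task.circle--
		case task.diff > 0:
			// the delay was changed: shift the task to its new slot
			l.Remove(e)
			pos := (tickedPos + task.diff) % len(slots)
			task.diff = 0
			slots[pos].PushBack(task)
		default:
			// due now
			due = append(due, task.key)
			l.Remove(e)
		}
		e = next
	}
	return due
}

func main() {
	slots := newSlots(4)
	slots[0].PushBack(&entry{key: "now"})
	slots[0].PushBack(&entry{key: "later", circle: 1})
	slots[0].PushBack(&entry{key: "moved", diff: 2})
	slots[0].PushBack(&entry{key: "gone", removed: true})

	fmt.Println("first pass over slot 0:", scanSlot(slots, 0))
	fmt.Println("second pass over slot 0:", scanSlot(slots, 0))
	fmt.Println("tasks now in slot 2:", slots[2].Len())
}
```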

    The calculation of diff in turn depends on how pos and circle are computed:

    // interval: 4min, d: 60min, numSlots: 16, tickedPos = 15
    // steps = 15, pos = 14, circle = 0
    func (tw *TimingWheel) getPositionAndCircle(d time.Duration) (pos int, circle int) {
        steps := int(d / tw.interval)
        pos = (tw.tickedPos + steps) % tw.numSlots
        circle = (steps - 1) / tw.numSlots
        return
    }

    The above process can be simplified to the following:

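    steps = d / interval
    pos = (tickedPos + steps) % numSlots    (equal to (steps - 1) % numSlots when tickedPos = numSlots - 1 and steps >= 1)
    circle = (steps - 1) / numSlots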

    Summary

    The timingWheel is driven by a ticker. As time advances, the tasks in the doubly linked list of the current slot are taken out and passed to execute for execution.

    In terms of time division, the time wheel has circle layers, so the same numSlots slots can be reused again and again: the ticker loops continuously, and each pass drops a task from an upper layer to the layer below, so tasks on upper layers eventually get executed as the loop continues.

    timingWheel is only one of many useful component tools. Using such tools well goes a long way toward improving service performance and development efficiency. I hope this article has been useful to you.