• one-thread-per-connection 每个连接一个线程

  • no-threads 所有连接共用一个线程

  • pool-of-threads 线程池

no-threads 只适用于简单的系统,并发数稍高性能就会严重下降

one-thread-per-connection 在多数情况下性能优良,是个合适的选择,生产系统也常用此配置。但在高并发、短连接的业务场景下,使用 one-thread-per-connection 会频繁得创建和销毁线程,严重影响性能

pool-of-threads 适用于高并发短连接的业务场景,线程复用,避免频繁创建和销毁线程带来的性能损耗

MariaDB 的 thread pool 在 win 和 unix 系统的实现不同,本文分析 unix 系统下的实现

thread pool 由若干个 thread_group 组成,每个 thread_group 有若干个 worker 线程,和 0~1 个 listenr 线程

server 接收到连接请求时,将这个连接分配给一个 group 处理,listener 线程负责监听请求,worker 线程处理请求内容

struct scheduler_functions 内部声明了连接建立相关的属性和方法,这里使用函数指针实现多态

每种线程调度方式,分别实例化一个struct scheduler_functions,给相关变量和函数指针赋值

  • tp_init

初始化 thread_group

  1. mysqld_main
  2. network_init
  3. /* 调用相应 thread_scheduler 下的 init 函数,这里是 tp_init */
  4. MYSQL_CALLBACK_ELSE(thread_scheduler, init, (), 0)
  5. tp_init
  6. /* thread_group_t 申请内存 */
  7. all_groups= (thread_group_t *)
  8. my_malloc(sizeof(thread_group_t) * threadpool_max_size, MYF(MY_WME|MY_ZEROFILL));
  9. /* 初始化 thread_group_t 数组 */
  10. for (uint i= 0; i < threadpool_max_size; i++)
  11. {
  12. thread_group_init(&all_groups[i], get_connection_attrib());
  13. }
  14. /* 设置最大执行时间,worker thread */
  15. pool_timer.tick_interval= threadpool_stall_limit;
  16. /* 开启 timer thread*/
  17. start_timer(&pool_timer);
  • tp_add_connection

创建 connection,将 connection 分配到一个 thread_group,插入 group 队列

唤醒或创建一个 worker thread,如果 group 中没有活跃的 worker thread

  • worker_main

从 queue 中取出和处理 event

在 group 没有 listener 时会变成 listener

没取到 event 时,进入休眠状态,等待被唤醒

最后进入休眠状态的 worker thread 最先被唤醒

  1. worker_main
  2. for(;;)
  3. {
  4. connection = get_event(&this_thread, thread_group, &ts);
  5. /* 没有 event 时,跳出循环,结束 worker thread */
  6. if (!connection)
  7. break;
  8. handle_event(connection);
  9. }
  10. get_event
  11. for(;;)
  12. {
  13. /*
  14. 取出 tp_add_connection/listener 插入的 connection
  15. */
  16. connection = queue_get(thread_group);
  17. /* 取到 connection,跳出循环,进入下一步 handle_event */
  18. if(connection)
  19. break;
  20. /* 如果没有 listener thread,这个线程变为 listener */
  21. if(!thread_group->listener)
  22. {
  23. connection = listener(current_thread, thread_group);
  24. /* listener 已变为 worker */
  25. thread_group->listener= NULL;
  26. /* 跳出循环,进入 handle_event 处理 connection */
  27. break;
  28. }
  29. /* 进入休眠状态前,尝试非阻塞方式获取一个 connection */
  30. if (io_poll_wait(thread_group->pollfd,&nev,1, 0) == 1)
  31. {
  32. thread_group->io_event_count++;
  33. connection = (connection_t *)native_event_get_userdata(&nev);
  34. break;
  35. }
  36. /*
  37. 进入休眠状态
  38. 最后休眠的 worker thread 最先被唤醒,更容易命中 cache
  39. */
  40. current_thread->woken = false;
  41. thread_group->waiting_threads.push_front(current_thread);
  42. /* 等待被唤醒 */
  43. mysql_cond_wait
  44. }
  45. /* 返回获取到的 connection */
  46. DBUG_RETURN(connection);
  47. handle_event
  48. if (!connection->logged_in)
  49. {
  50. /* login connection */
  51. err= threadpool_add_connection(connection->thd);
  52. login_connection
  53. connection->logged_in= true;
  54. }
  55. else
  56. {
  57. /* 处理 connection 请求 */
  58. err= threadpool_process_request(connection->thd);
  59. do_command
  60. }
  61. /*
  62. 告诉客户端可以读
  63. 可能会变动 connection 所属的 group
  64. */
  65. err= start_io(connection);
  66. /*
  67. group_count 允许动态改变,所以处理 connection 的 group 可能发生变动
  68. 这里检查 group 是否需要变动
  69. */
  70. thread_group_t *group =
  71. &all_groups[connection->thd->thread_id%group_count];
  72. change_group(connection, connection->thread_group, group)
  73. return io_poll_start_read(group->pollfd, fd, connection);
  • listener

listener 线程通过 epoll_wait 监听 group 关联的描述符,epoll使用一个文件描述符管理多个描述符

监听获得的 event 插入队列,如果插入前队列为空,listener 变成 worker 线程处理第一个event,其余插入队列,否则所有 event 插入队列

如果 group 中没有活跃线程,唤醒或者创建一个 worker 线程

这里考虑一种情况,如果监听只获得一个 event 且队列为空,那么这个 event 将会被 listener 处理,队列中不会插入新的 event,此时只需要 listener 变成 worker 线程处理 event,不需要再唤醒其他 worker 线程

  • timer_thread

如果 listener 不存在,且检查周期内没有监听到新的 event,则创建一个 worker thread(自动变成 listener)

如果没有活跃线程(运行状态的 worker thread),且检查周期内没有处理 event,创建一个 worker thread

关闭长时间空闲的 connection

  1. timer_thread
  2. for(;;)
  3. {
  4. /*
  5. 等待一个 tick_interval,即 thread_pool_stall_limit
  6. 正常情况下回等待至超时,只有 stop_timer 会发出 timer->cond 信号
  7. */
  8. set_timespec_nsec(ts,timer->tick_interval*1000000);
  9. err= mysql_cond_timedwait(&timer->cond, &timer->mutex, &ts);
  10. /* ETIMEDOUT 等待超时,代表 timer 没有被 shutdown */
  11. if (err == ETIMEDOUT)
  12. {
  13. timer->current_microtime= microsecond_interval_timer();
  14. /* 遍历 thread_group,查看是否 stall */
  15. for (i= 0; i < threadpool_max_size; i++)
  16. {
  17. if(all_groups[i].connection_count)
  18. check_stall(&all_groups[i]);
  19. }
  20. /* 关闭空闲连接 */
  21. if (timer->next_timeout_check <= timer->current_microtime)
  22. timeout_check(timer);
  23. }
  24. }
  25. check_stall
  26. /*
  27. thread_group->io_event_count 代表插入队列的 event 数量
  28. thread_group 没有 listener,且没有 event 在队列中,这个检查周期内 group 没有监听连接信息
  29. 唤醒或者创建一个 worker 线程,这个线程会自动成为 listener
  30. */
  31. if (!thread_group->listener && !thread_group->io_event_count)
  32. {
  33. wake_or_create_thread(thread_group);
  34. return;
  35. }
  36. /* Reset io event count */
  37. thread_group->io_event_count= 0;
  38. /*
  39. thread_group->queue_event_count 代表已经出队的 event 数量
  40. 队列非空,并且出队 event 数量为0,这个检查周期内 worker thread 没有处理 event
  41. 唤醒或创建一个 worker thread, thread_group 标记为 stalled, 下个 event 出队时 reset stalled标记
  42. */
  43. if (!thread_group->queue.is_empty() && !thread_group->queue_event_count)
  44. {
  45. thread_group->stalled= true;
  46. wake_or_create_thread(thread_group);
  47. }
  • tp_wait_begin/tp_wait_end

线程进入阻塞状态前/阻塞状态结束后,分别调用者两个函数汇报状态,用于维护 active_thread_count 即活跃线程数量

tp_wait_begin 将活跃线程数 -1,活跃线程为 0 时,唤醒或创建一个线程

tp_wait_eng 将活跃线程数 +1

这里需要注意,不是所有的等待都需要调用 tp_wait_begin,预期内短时间结束的等待,比如 mutex,可以不调用 tp_wait_begin

行锁或者磁盘IO这种长时间的等待,需要调用 tp_wait_begin,汇报这个线程暂时无法处理请求,需要开启新的线程

线程池是在 server 端的优化,避免频繁的创建和销毁线程

连接池是在 client 端的优化,减少连接创建时间,节省资源

连接池和线程池是两个不同的概念,可以同时使用