When executing a query, MySQL's server layer computes the cost of each candidate index and picks the one with the lowest cost as the access path for reading data from the storage engine. The server-layer handler class gives the engine a framework for computing index costs:

  1. scan_time: estimates the execution time of a full table scan
  2. records_in_range: estimates how many rows of the index fall within the search-condition range
  3. read_time: estimates the execution time of an index range query
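The selection logic built on these three estimates can be sketched as follows. This is a minimal illustrative model, not server code; `CandidateIndex` and `choose_access_path` are invented names, and `read_time` here stands for the cost the optimizer derives from `records_in_range`:

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical sketch: the optimizer asks each candidate index for a cost
// and keeps the cheapest one as the access path.
struct CandidateIndex {
    std::string name;
    double read_time;  // cost derived from records_in_range()
};

// Returns the cheapest candidate's name, or "FULL TABLE SCAN" if the
// table-scan cost (scan_time) beats every index.
std::string choose_access_path(const std::vector<CandidateIndex>& candidates,
                               double scan_time) {
    std::string best = "FULL TABLE SCAN";
    double best_cost = scan_time;
    for (const CandidateIndex& c : candidates) {
        if (c.read_time < best_cost) {
            best_cost = c.read_time;
            best = c.name;
        }
    }
    return best;
}
```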

TokuDB's records_in_range handles the search-condition interval differently depending on its form:

  • If the search condition is NULL, i.e. both start_key and end_key are NULL, the function calls estimate_num_rows to read the FT's in-memory statistic in_memory_stats.numrows, which gives the number of pairs in the index, i.e. the number of unique keys. Since MySQL secondary-index keys have the PK appended, all keys are unique at the index layer.
  • Otherwise it wraps start_key and end_key into FT keys and calls the FT's keys_range64 function to count the keys falling in the interval: less is the number of keys below start_key, equal1 the number equal to start_key, middle the number greater than or equal to start_key and less than end_key, equal2 the number equal to end_key, and greater the number greater than or equal to end_key. The function computes these recursively; the code is long but not complex.
  • If start_key and end_key fall within the same basement node, that basement node is read and the exact count of qualifying records is returned to the server layer.
  • If the search condition spans multiple basement nodes, the stored key count (in_memory_stats.numrows) is apportioned across the nodes at each level of the root-to-leaf path, giving each node a weight; the per-level weights covered by the start_key-to-end_key interval are then summed to estimate the record count.
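The meaning of the five keys_range64 counters can be reproduced over a plain sorted array. This is illustrative only (keys_range64 itself estimates over an FT tree, not an array); `count_range` is an invented name:

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Illustrative: compute the five keys_range64-style counters over a sorted
// array of integer keys, matching the definitions given above.
struct RangeCounts {
    uint64_t less, equal1, middle, equal2, greater;
};

RangeCounts count_range(const std::vector<int>& sorted_keys,
                        int start_key, int end_key) {
    auto lo1 = std::lower_bound(sorted_keys.begin(), sorted_keys.end(), start_key);
    auto hi1 = std::upper_bound(sorted_keys.begin(), sorted_keys.end(), start_key);
    auto lo2 = std::lower_bound(sorted_keys.begin(), sorted_keys.end(), end_key);
    auto hi2 = std::upper_bound(sorted_keys.begin(), sorted_keys.end(), end_key);
    RangeCounts c;
    c.less    = lo1 - sorted_keys.begin();   // keys <  start_key
    c.equal1  = hi1 - lo1;                   // keys == start_key
    c.middle  = lo2 - lo1;                   // start_key <= key < end_key
    c.equal2  = hi2 - lo2;                   // keys == end_key
    c.greater = sorted_keys.end() - lo2;     // keys >= end_key
    return c;
}
```

Note the invariant less + middle + greater = total number of keys, with equal1 contained in middle and equal2 contained in greater.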

When start_key and end_key are not in the same basement node, keys_range64 returns an estimate. The estimate is affected by the FT tree layout: the tree may be tall and narrow or short and wide, and different layouts can produce significantly different results. Moreover, TokuDB's key count (in_memory_stats.numrows) is itself a statistic, updated on each MSN apply at the leaf node, so it too may be inaccurate.

A user table may have no index at all, or several. The optimizer's index selection works as follows: use the search condition to find the set of usable indexes, then compute the cost of each candidate, i.e. read_time, which is derived from the record count returned by records_in_range. The optimizer picks the cheapest index (called the access path at the server layer) to fetch data from the engine; the access path may be an index point query, an index range query, a full index scan, or a full table scan.

Read data

Once an index is chosen, the server layer passes its index number (keynr) to the engine, which creates a cursor on that index to read data. In general, the engine implicitly converts a full table scan into a PK index scan.

Let us first look at the full table scan functions:

  • rnd_init: calls index_init to implicitly convert to a PK index scan, and locks the table
  • rnd_next: calls get_next to read the next row; this function is discussed in detail later
  • rnd_end: ends the scan
```cpp
int ha_tokudb::rnd_init(bool scan) {
    int error = 0;
    range_lock_grabbed = false;
    error = index_init(MAX_KEY, 0);
    if (error) { goto cleanup; }
    if (scan) {
        error = prelock_range(NULL, NULL);
        if (error) { goto cleanup; }
        range_lock_grabbed = true;
    }
    error = 0;
cleanup:
    if (error) {
        index_end();
        last_cursor_error = error;
    }
    TOKUDB_HANDLER_DBUG_RETURN(error);
}

int ha_tokudb::rnd_next(uchar * buf) {
    ha_statistic_increment(&SSV::ha_read_rnd_next_count);
    int error = get_next(buf, 1, NULL, false);
    TOKUDB_HANDLER_DBUG_RETURN(error);
}

int ha_tokudb::rnd_end() {
    range_lock_grabbed = false;
    TOKUDB_HANDLER_DBUG_RETURN(index_end());
}
```

Index scan

ICP

When the search condition is non-empty, the server layer may choose to use ICP (index condition pushdown), pushing the search condition down into the engine for filtering.

  • keyno_arg: the index chosen by the server layer
  • idx_cond_arg: the server layer's search condition, which may contain filter predicates
```cpp
Item* ha_tokudb::idx_cond_push(uint keyno_arg, Item* idx_cond_arg) {
    toku_pushed_idx_cond_keyno = keyno_arg;
    toku_pushed_idx_cond = idx_cond_arg;
    return idx_cond_arg;
}
```

Index init

As mentioned earlier, a full table scan is implicitly converted into a PK index scan: rnd_init calls index_init, setting tokudb_active_index to primary_key. The handler member active_index holds the current index number; a value of MAX_KEY (64) means full table scan. The TokuDB member tokudb_active_index holds TokuDB's current index number and normally equals active_index; the full table scan is the exception, where active_index is MAX_KEY and tokudb_active_index is primary_key. The most important work in index_init is creating the cursor and resetting the bulk fetch state; bulk fetch is discussed in detail under get_next.

```cpp
int ha_tokudb::index_init(uint keynr, bool sorted) {
    int error;
    THD* thd = ha_thd();
    /*
       Under some very rare conditions (like full joins) we may already have
       an active cursor at this point
    */
    if (cursor) {
        int r = cursor->c_close(cursor);
        assert(r==0);
        remove_from_trx_handler_list();
    }
    active_index = keynr;
    if (active_index < MAX_KEY) {
        DBUG_ASSERT(keynr <= table->s->keys);
    } else {
        DBUG_ASSERT(active_index == MAX_KEY);
        keynr = primary_key;
    }
    tokudb_active_index = keynr;
#if TOKU_CLUSTERING_IS_COVERING
    if (keynr < table->s->keys && table->key_info[keynr].option_struct->clustering)
        key_read = false;
#endif
    last_cursor_error = 0;
    range_lock_grabbed = false;
    range_lock_grabbed_null = false;
    DBUG_ASSERT(share->key_file[keynr]);
    cursor_flags = get_cursor_isolation_flags(lock.type, thd);
    if (use_write_locks) {
        cursor_flags |= DB_RMW;
    }
    if (get_disable_prefetching(thd)) {
        cursor_flags |= DBC_DISABLE_PREFETCHING;
    }
    if ((error = share->key_file[keynr]->cursor(share->key_file[keynr], transaction, &cursor, cursor_flags))) {
        last_cursor_error = error;
        cursor = NULL; // Safety
        goto exit;
    }
    cursor->c_set_check_interrupt_callback(cursor, tokudb_killed_thd_callback, thd);
    memset((void *) &last_key, 0, sizeof(last_key));
    add_to_trx_handler_list();
    if (thd_sql_command(thd) == SQLCOM_SELECT) {
        set_query_columns(keynr);
        unpack_entire_row = false;
    } else {
        unpack_entire_row = true;
    }
    invalidate_bulk_fetch();
    doing_bulk_fetch = false;
    maybe_index_scan = false;
    error = 0;
exit:
    TOKUDB_HANDLER_DBUG_RETURN(error);
}
```

Prepare index

After the cursor is initialized, the server layer calls one of the following four functions to take the range lock on the interval:

  • prepare_index_scan
  • prepare_index_key_scan
  • prepare_range_scan
  • read_range_first

All four call prelock_range to take the range lock.

  • prepare_index_scan takes the range lock on (negative infinity, positive infinity), which effectively locks the whole table.
  • prepare_index_key_scan takes the range lock only on the given key.
  • prepare_range_scan and read_range_first both take the range lock on an interval; the former handles reverse index range scans, the latter forward index range scans.

start_key and end_key are the endpoints of the range passed down by the server layer, expressed in server-layer data structures; prelock_range builds the corresponding index keys and acquires the range lock on them.

For a full table scan, rnd_init calls prelock_range directly to lock (negative infinity, positive infinity), i.e. the whole table. Since the conversion from full table scan to PK index scan is done implicitly inside the engine, the server layer is unaware of it and prepare_index_scan is not involved.

```cpp
static int
c_set_bounds(DBC *dbc, const DBT *left_key, const DBT *right_key, bool pre_acquire, int out_of_range_error) {
    if (out_of_range_error != DB_NOTFOUND &&
        out_of_range_error != TOKUDB_OUT_OF_RANGE &&
        out_of_range_error != 0) {
        return toku_ydb_do_error(
            dbc->dbp->dbenv,
            EINVAL,
            "Invalid out_of_range_error [%d] for %s\n",
            out_of_range_error,
            __FUNCTION__);
    }
    if (left_key == toku_dbt_negative_infinity() && right_key == toku_dbt_positive_infinity()) {
        out_of_range_error = 0;
    }
    DB *db = dbc->dbp;
    DB_TXN *txn = dbc_struct_i(dbc)->txn;
    HANDLE_PANICKED_DB(db);
    toku_ft_cursor_set_range_lock(dbc_ftcursor(dbc), left_key, right_key,
                                  (left_key == toku_dbt_negative_infinity()),
                                  (right_key == toku_dbt_positive_infinity()),
                                  out_of_range_error);
    if (!db->i->lt || !txn || !pre_acquire)
        return 0;
    //READ_UNCOMMITTED and READ_COMMITTED transactions do not need read locks.
    if (!dbc_struct_i(dbc)->rmw && dbc_struct_i(dbc)->iso != TOKU_ISO_SERIALIZABLE)
        return 0;
    toku::lock_request::type lock_type = dbc_struct_i(dbc)->rmw ?
        toku::lock_request::type::WRITE : toku::lock_request::type::READ;
    int r = toku_db_get_range_lock(db, txn, left_key, right_key, lock_type);
    return r;
}
```

Read index

If the server layer specified the range's start_key and end_key, the handler framework accesses the index data in the way the execution plan dictates.

How the first row is fetched:

  • Index point query: the server layer calls index_read directly
  • Index range scan with a non-empty start_key: the server layer usually calls read_range_first to fetch the first row; read_range_first ultimately calls index_read as well
  • Index range scan with an empty start_key: the server layer calls index_first directly
  • Reverse index range scan with a non-empty end_key: the server layer usually calls index_read
  • Reverse index range scan with an empty end_key: the server layer calls index_last directly
  • Full index scan: the server layer calls index_first directly
  • Reverse full index scan: the server layer calls index_last directly

index_read is fairly long, so a few common cases illustrate it. 1) index point query:

  • HA_READ_KEY_EXACT

2) index range query:

  • HA_READ_AFTER_KEY: handles keys greater than start_key
  • HA_READ_KEY_OR_NEXT: handles keys greater than or equal to start_key

3) reverse index range query:

  • HA_READ_BEFORE_KEY: handles keys less than end_key
  • HA_READ_PREFIX_LAST_OR_PREV: handles keys less than or equal to end_key

When index_read invokes the callback functions in ydb_cursor.cc, the flags argument is initialized to 0. The callbacks registered in ydb_cursor.cc check the tokudb_cursor->rmw flag; if tokudb_cursor->rmw is 0 and the isolation level is not SERIALIZABLE, query_context_with_input_init sets the context's do_locking field, telling toku_ft_cursor_set_range to take the range lock before returning successfully.

```cpp
static int
c_getf_set_range_with_bound(DBC *c, uint32_t flag, DBT *key, DBT *key_bound, YDB_CALLBACK_FUNCTION f, void *extra) {
    HANDLE_PANICKED_DB(c->dbp);
    HANDLE_CURSOR_ILLEGAL_WORKING_PARENT_TXN(c);
    int r = 0;
    QUERY_CONTEXT_WITH_INPUT_S context; //Describes the context of this query.
    query_context_with_input_init(&context, c, flag, key, NULL, f, extra);
    while (r == 0) {
        //toku_ft_cursor_set_range will call c_getf_set_range_callback(..., context) (if query is successful)
        r = toku_ft_cursor_set_range(dbc_ftcursor(c), key, key_bound, c_getf_set_range_callback, &context);
        if (r == DB_LOCK_NOTGRANTED) {
            r = toku_db_wait_range_lock(context.base.db, context.base.txn, &context.base.request);
        } else {
            break;
        }
    }
    query_context_base_destroy(&context.base);
    return r;
}

static int
c_getf_set_range_callback(uint32_t keylen, const void *key, uint32_t vallen, const void *val, void *extra, bool lock_only) {
    QUERY_CONTEXT_WITH_INPUT super_context = (QUERY_CONTEXT_WITH_INPUT) extra;
    QUERY_CONTEXT_BASE context = &super_context->base;
    int r;
    DBT found_key = { .data = (void *) key, .size = keylen };
    //Lock:
    //  left(key,val)  = (input_key, -infinity)
    //  right(key) = found ? found_key : infinity
    //  right(val) = found ? found_val : infinity
    if (context->do_locking) {
        const DBT *left_key = super_context->input_key;
        const DBT *right_key = key != NULL ? &found_key : toku_dbt_positive_infinity();
        r = toku_db_start_range_lock(context->db, context->txn, left_key, right_key, query_context_determine_lock_type(context), &context->request);
    } else {
        r = 0;
    }
    //Call application-layer callback if found and locks were successfully obtained.
    if (r==0 && key!=NULL && !lock_only) {
        DBT found_val = { .data = (void *) val, .size = vallen };
        context->r_user_callback = context->f(&found_key, &found_val, context->f_extra);
        r = context->r_user_callback;
    }
    //Give ft-layer an error (if any) to return from toku_ft_cursor_set_range
    return r;
}
```

Get next

After the first row is fetched, the server layer calls index_next (or index_next_same) or index_prev, as dictated by the execution plan, to fetch subsequent records.

  • index_next_same: reads the next record with the same index key.
  • index_next: reads the next record within the range; if ICP is set, it also matches the found record against the filter condition.

There are two more read methods:

  • index_first: reads the first record of the index
  • index_last: reads the last record of the index

These functions are fairly simple; we analyze only index_next here and leave the rest to the interested reader. index_next simply calls get_next to read the next record.

```cpp
int ha_tokudb::index_next(uchar * buf) {
    TOKUDB_HANDLER_DBUG_ENTER("");
    ha_statistic_increment(&SSV::ha_read_next_count);
    int error = get_next(buf, 1, NULL, key_read);
    TOKUDB_HANDLER_DBUG_RETURN(error);
}
```

Bulk fetch

TokuDB implements an optimization for range queries called bulk fetch: the keys on the current basement node that fall within the range are read in bulk, amortizing one MSG apply over many key reads, and reducing contention on the leaf node's read-write lock by avoiding repeated lock acquisition and release. To support bulk fetch, TokuDB adds the following data members:

  • doing_bulk_fetch: flags whether a bulk fetch is in progress
  • range_query_buff: buffer caching the bulk-read data
  • size_range_query_buff: the allocated (malloc) size of range_query_buff
  • bytes_used_in_range_query_buff: the number of bytes actually used in range_query_buff
  • curr_range_query_buff_offset: the current read position in range_query_buff
  • bulk_fetch_iteration and rows_fetched_using_bulk_fetch: statistics that control the batch size
```cpp
class ha_tokudb : public handler {
private:
    ...
    uchar* range_query_buff;                 // range query buffer
    uint32_t size_range_query_buff;          // size of the allocated range query buffer
    uint32_t bytes_used_in_range_query_buff; // number of bytes used in the range query buffer
    uint32_t curr_range_query_buff_offset;   // current offset into the range query buffer for queries to read
    uint64_t bulk_fetch_iteration;
    uint64_t rows_fetched_using_bulk_fetch;
    bool doing_bulk_fetch;
    ...
};
```
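The two statistics members drive an exponentially growing batch size: a batch stops once the rows fetched reach 1 shifted left by bulk_fetch_iteration. A stand-alone sketch of that bound check (illustrative only; `batch_full` is an invented name and `kIterationMax` is a stand-in for the engine's HA_TOKU_BULK_FETCH_ITERATION_MAX constant):

```cpp
#include <cassert>
#include <cstdint>

// Stand-in for HA_TOKU_BULK_FETCH_ITERATION_MAX (assumed value).
const uint64_t kIterationMax = 63;

// Returns true when the current batch has hit this iteration's row cap,
// i.e. rows_fetched >= 2^bulk_fetch_iteration.
bool batch_full(uint64_t bulk_fetch_iteration, uint64_t rows_fetched) {
    if (bulk_fetch_iteration >= kIterationMax)
        return false;  // no per-iteration cap once the shift would overflow
    return rows_fetched >= (1ULL << bulk_fetch_iteration);
}
```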

get_next first checks whether data can be read from the current bulk fetch buffer; the test is bytes_used_in_range_query_buff - curr_range_query_buff_offset > 0, meaning the buffer still holds unread data.

  1. If the condition holds, read_data_from_range_query_buff reads directly from the bulk fetch buffer.
  2. If the bulk fetch buffer is exhausted, icp_went_out_of_range is checked to see whether the range has been exceeded; if so, there is no more data and get_next can return immediately.
  3. If neither condition holds, the cursor must be used to read further data. In the bulk fetch case, invalidate_bulk_fetch is called first to reset the bulk fetch state.

What if the user has disabled bulk fetch? Then conditions 1 and 2 never hold: invalidate_bulk_fetch runs directly, doing_bulk_fetch is found to be false, and the cursor reads the data. This part is straightforward and left to the reader.
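The three-way decision above can be sketched as a small pure function. This is a hypothetical control-flow model, not engine code; `NextAction` and `decide_get_next` are invented names standing in for the branch structure of get_next:

```cpp
#include <cassert>
#include <cstdint>

// Illustrative: the three outcomes get_next() distinguishes.
enum class NextAction { ReadFromBuffer, OutOfRange, ReadFromCursor };

NextAction decide_get_next(uint32_t bytes_used_in_range_query_buff,
                           uint32_t curr_range_query_buff_offset,
                           bool icp_went_out_of_range) {
    // 1) buffered rows remain: read from the bulk fetch buffer
    if (bytes_used_in_range_query_buff - curr_range_query_buff_offset > 0)
        return NextAction::ReadFromBuffer;
    // 2) range exhausted: nothing more to return
    if (icp_went_out_of_range)
        return NextAction::OutOfRange;
    // 3) otherwise refill via the cursor (after invalidate_bulk_fetch)
    return NextAction::ReadFromCursor;
}
```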

The smart_dbt_bf_info structure tells the callback how to cache the current row and how to read the next one.

  • ha: the tokudb handler pointer
  • need_val: whether the bulk fetch buffer should cache the value; set to true for the PK and clustering indexes, false otherwise
  • direction: the read direction; 1 means next, -1 means prev
  • thd: the server-layer thread pointer
  • buf: the buffer provided by the server layer
  • key_to_compare: the comparison key; only index_next_same sets this parameter
```cpp
typedef struct smart_dbt_bf_info {
    ha_tokudb* ha;
    bool need_val;
    int direction;
    THD* thd;
    uchar* buf;
    DBT* key_to_compare;
} *SMART_DBT_BF_INFO;
```

After a batch completes, get_next calls read_data_from_range_query_buff to take rows out of the bulk fetch buffer.

After get_next successfully reads a row, it must decide whether a further lookup of the full row is needed. For the PK and clustering indexes, the index stores the complete row, so no extra lookup is required.

Now let us look at the callback smart_dbt_bf_callback. It is a thin wrapper around fill_range_query_buf: after an index row is read successfully, the result is cached in the bulk fetch buffer and the next row is read.

```cpp
static int smart_dbt_bf_callback(
    DBT const* key,
    DBT const* row,
    void* context) {
    SMART_DBT_BF_INFO info = (SMART_DBT_BF_INFO)context;
    return
        info->ha->fill_range_query_buf(
            info->need_val,
            key,
            row,
            info->direction,
            info->thd,
            info->buf,
            info->key_to_compare);
}
```

Next, let us focus on fill_range_query_buf. The key and row parameters are the index key and value just read; the remaining parameters are the server-layer call information extracted from the smart_dbt_bf_info structure.

If key_to_compare is specified, the current key must be compared against key_to_compare. Because a secondary-index key has the PK appended, this is a prefix comparison. If the prefix does not match, a new key has been reached: icp_went_out_of_range is set and the function returns.
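The prefix match can be sketched with plain byte strings. This is illustrative only; the real comparison goes through tokudb_prefix_cmp_dbt_key and the key file's comparison function, and `same_prefix` is an invented name:

```cpp
#include <cassert>
#include <cstring>
#include <string>

// Illustrative: a secondary-index key is (user key ++ pk), so
// index_next_same only needs to compare the user-key prefix.
bool same_prefix(const std::string& probe, const std::string& stored_key) {
    return stored_key.size() >= probe.size() &&
           memcmp(stored_key.data(), probe.data(), probe.size()) == 0;
}
```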

If the server layer set up ICP, the current index key must be checked against the range. Normally this check compares against prelocked_right_range (range scan) or prelocked_left_range (reverse range scan); in the ICP case, however, it compares against end_range. If the index key is out of range, icp_went_out_of_range is set and the function returns.

If the current index key is within range, the ICP case additionally evaluates the filter condition: a matching record is stored into the bulk fetch buffer; a non-matching one is skipped and the next record is read.

When storing a key into the bulk fetch buffer, need_val is checked: if true, the key is stored first and then the value; otherwise only the key is stored. The value to store may be the whole row, or only the columns recorded by set_query_columns; in the latter case those column values must be extracted. Data in the bulk fetch buffer follows a fixed format: a 4-byte size, then the data. The current key is written at offset bytes_used_in_range_query_buff. After caching the key/value, bytes_used_in_range_query_buff is advanced to the next write position.
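The length-prefixed layout just described can be modeled stand-alone. This is an illustrative sketch of the format (4-byte size, then data, appended at the used-bytes offset), not the engine code; `append_entry` and `read_entry` are invented names:

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Append one entry to the buffer: a 4-byte size followed by the raw bytes.
void append_entry(std::vector<uint8_t>& buf, const void* data, uint32_t size) {
    uint32_t off = buf.size();  // analogous to bytes_used_in_range_query_buff
    buf.resize(off + sizeof(uint32_t) + size);
    memcpy(buf.data() + off, &size, sizeof(uint32_t));        // 4-byte length
    memcpy(buf.data() + off + sizeof(uint32_t), data, size);  // payload
}

// Read the entry at *offset and advance *offset past it
// (analogous to curr_range_query_buff_offset on the read side).
std::string read_entry(const std::vector<uint8_t>& buf, uint32_t* offset) {
    uint32_t size;
    memcpy(&size, buf.data() + *offset, sizeof(uint32_t));
    std::string out(
        reinterpret_cast<const char*>(buf.data() + *offset + sizeof(uint32_t)),
        size);
    *offset += sizeof(uint32_t) + size;
    return out;
}
```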

In the non-ICP case, the range check happens at the end of fill_range_query_buf, comparing against prelocked_right_range (range scan) or prelocked_left_range (reverse range scan). These two values are set in prelock_range and also define the range-lock bounds.

If the current key is within range, the next record should be read into the bulk fetch buffer: fill_range_query_buf returns TOKUDB_CURSOR_CONTINUE, telling toku_ft_search to keep reading the next record in the current basement node. Bulk fetch cannot cross basement nodes, because there is no guarantee that MSG apply has been performed on other basement nodes.

```cpp
int ha_tokudb::fill_range_query_buf(
    bool need_val,
    DBT const* key,
    DBT const* row,
    int direction,
    THD* thd,
    uchar* buf,
    DBT* key_to_compare) {
    int error;
    //
    // first put the value into range_query_buf
    //
    uint32_t size_remaining =
        size_range_query_buff - bytes_used_in_range_query_buff;
    uint32_t size_needed;
    uint32_t user_defined_size = tokudb::sysvars::read_buf_size(thd);
    uchar* curr_pos = NULL;
    if (key_to_compare) {
        int cmp = tokudb_prefix_cmp_dbt_key(
            share->key_file[tokudb_active_index],
            key_to_compare,
            key);
        if (cmp) {
            icp_went_out_of_range = true;
            error = 0;
            goto cleanup;
        }
    }
    // if we have an index condition pushed down, we check it
    if (toku_pushed_idx_cond &&
        (tokudb_active_index == toku_pushed_idx_cond_keyno)) {
        unpack_key(buf, key, tokudb_active_index);
        enum icp_result result =
            toku_handler_index_cond_check(toku_pushed_idx_cond);
        // If we have reason to stop, we set icp_went_out_of_range and get out
        // otherwise, if we simply see that the current key is no match,
        // we tell the cursor to continue and don't store
        // the key locally
        if (result == ICP_OUT_OF_RANGE || thd_killed(thd)) {
            icp_went_out_of_range = true;
            error = 0;
            DEBUG_SYNC(ha_thd(), "tokudb_icp_asc_scan_out_of_range");
            goto cleanup;
        } else if (result == ICP_NO_MATCH) {
            // if we are performing a DESC ICP scan and have no end_range
            // to compare to stop using ICP filtering as there isn't much more
            // that we can do without going through contortions with remembering
            // and comparing key parts.
            if (!end_range &&
                direction < 0) {
                cancel_pushed_idx_cond();
                DEBUG_SYNC(ha_thd(), "tokudb_icp_desc_scan_invalidate");
            }
            error = TOKUDB_CURSOR_CONTINUE;
            goto cleanup;
        }
    }
    // at this point, if ICP is on, we have verified that the key is one
    // we are interested in, so we proceed with placing the data
    // into the range query buffer
    if (need_val) {
        if (unpack_entire_row) {
            size_needed = 2*sizeof(uint32_t) + key->size + row->size;
        } else {
            // this is an upper bound
            size_needed =
                // size of key length
                sizeof(uint32_t) +
                // key and row
                key->size + row->size +
                // lengths of varchars stored
                num_var_cols_for_query * (sizeof(uint32_t)) +
                // length of blobs
                sizeof(uint32_t);
        }
    } else {
        size_needed = sizeof(uint32_t) + key->size;
    }
    if (size_remaining < size_needed) {
        range_query_buff =
            static_cast<uchar*>(tokudb::memory::realloc(
                static_cast<void*>(range_query_buff),
                bytes_used_in_range_query_buff + size_needed,
                MYF(MY_WME)));
        if (range_query_buff == NULL) {
            error = ENOMEM;
            invalidate_bulk_fetch();
            goto cleanup;
        }
        size_range_query_buff = bytes_used_in_range_query_buff + size_needed;
    }
    //
    // now we know we have the size, let's fill the buffer, starting with the key
    //
    curr_pos = range_query_buff + bytes_used_in_range_query_buff;
    *reinterpret_cast<uint32_t*>(curr_pos) = key->size;
    curr_pos += sizeof(uint32_t);
    memcpy(curr_pos, key->data, key->size);
    curr_pos += key->size;
    if (need_val) {
        if (unpack_entire_row) {
            *reinterpret_cast<uint32_t*>(curr_pos) = row->size;
            curr_pos += sizeof(uint32_t);
            memcpy(curr_pos, row->data, row->size);
            curr_pos += row->size;
        } else {
            // need to unpack just the data we care about
            const uchar* fixed_field_ptr = static_cast<const uchar*>(row->data);
            fixed_field_ptr += table_share->null_bytes;
            const uchar* var_field_offset_ptr = NULL;
            const uchar* var_field_data_ptr = NULL;
            var_field_offset_ptr =
                fixed_field_ptr +
                share->kc_info.mcp_info[tokudb_active_index].fixed_field_size;
            var_field_data_ptr =
                var_field_offset_ptr +
                share->kc_info.mcp_info[tokudb_active_index].len_of_offsets;
            // first the null bytes
            memcpy(curr_pos, row->data, table_share->null_bytes);
            curr_pos += table_share->null_bytes;
            //
            // now the fixed fields
            //
            for (uint32_t i = 0; i < num_fixed_cols_for_query; i++) {
                uint field_index = fixed_cols_for_query[i];
                memcpy(
                    curr_pos,
                    fixed_field_ptr + share->kc_info.cp_info[tokudb_active_index][field_index].col_pack_val,
                    share->kc_info.field_lengths[field_index]);
                curr_pos += share->kc_info.field_lengths[field_index];
            }
            //
            // now the var fields
            //
            for (uint32_t i = 0; i < num_var_cols_for_query; i++) {
                uint field_index = var_cols_for_query[i];
                uint32_t var_field_index =
                    share->kc_info.cp_info[tokudb_active_index][field_index].col_pack_val;
                uint32_t data_start_offset;
                uint32_t field_len;
                get_var_field_info(
                    &field_len,
                    &data_start_offset,
                    var_field_index,
                    var_field_offset_ptr,
                    share->kc_info.num_offset_bytes);
                memcpy(curr_pos, &field_len, sizeof(field_len));
                curr_pos += sizeof(field_len);
                memcpy(
                    curr_pos,
                    var_field_data_ptr + data_start_offset,
                    field_len);
                curr_pos += field_len;
            }
            if (read_blobs) {
                uint32_t blob_offset = 0;
                uint32_t data_size = 0;
                //
                // now the blobs
                //
                get_blob_field_info(
                    &blob_offset,
                    share->kc_info.mcp_info[tokudb_active_index].len_of_offsets,
                    var_field_data_ptr,
                    share->kc_info.num_offset_bytes);
                data_size =
                    row->size -
                    blob_offset -
                    static_cast<uint32_t>((var_field_data_ptr -
                        static_cast<const uchar*>(row->data)));
                memcpy(curr_pos, &data_size, sizeof(data_size));
                curr_pos += sizeof(data_size);
                memcpy(curr_pos, var_field_data_ptr + blob_offset, data_size);
            }
        }
    }
    bytes_used_in_range_query_buff = curr_pos - range_query_buff;
    assert_always(bytes_used_in_range_query_buff <= size_range_query_buff);
    //
    // now determine if we should continue with the bulk fetch
    // we want to stop under these conditions:
    //  - we overran the prelocked range
    //  - we are close to the end of the buffer
    //  - we have fetched an exponential amount of rows with
    //    respect to the bulk fetch iteration, which is initialized
    //    to 0 in index_init() and prelock_range().
    rows_fetched_using_bulk_fetch++;
    // if the iteration is less than the number of possible shifts on
    // a 64 bit integer, check that we haven't exceeded this iterations
    // row fetch upper bound.
    if (bulk_fetch_iteration < HA_TOKU_BULK_FETCH_ITERATION_MAX) {
        uint64_t row_fetch_upper_bound = 1LLU << bulk_fetch_iteration;
        assert_always(row_fetch_upper_bound > 0);
        if (rows_fetched_using_bulk_fetch >= row_fetch_upper_bound) {
            error = 0;
            goto cleanup;
        }
    }
    if (bytes_used_in_range_query_buff +
        table_share->rec_buff_length >
        user_defined_size) {
        error = 0;
        goto cleanup;
    }
    if (direction > 0) {
        // compare what we got to the right endpoint of prelocked range
        // because we are searching keys in ascending order
        if (prelocked_right_range_size == 0) {
            error = TOKUDB_CURSOR_CONTINUE;
            goto cleanup;
        }
        DBT right_range;
        memset(&right_range, 0, sizeof(right_range));
        right_range.size = prelocked_right_range_size;
        right_range.data = prelocked_right_range;
        int cmp = tokudb_cmp_dbt_key(
            share->key_file[tokudb_active_index],
            key,
            &right_range);
        error = (cmp > 0) ? 0 : TOKUDB_CURSOR_CONTINUE;
    } else {
        // compare what we got to the left endpoint of prelocked range
        // because we are searching keys in descending order
        if (prelocked_left_range_size == 0) {
            error = TOKUDB_CURSOR_CONTINUE;
            goto cleanup;
        }
        DBT left_range;
        memset(&left_range, 0, sizeof(left_range));
        left_range.size = prelocked_left_range_size;
        left_range.data = prelocked_left_range;
        int cmp = tokudb_cmp_dbt_key(
            share->key_file[tokudb_active_index],
            key,
            &left_range);
        error = (cmp < 0) ? 0 : TOKUDB_CURSOR_CONTINUE;
    }
cleanup:
    return error;
}
```

With the bulk fetch buffer populated, read_data_from_range_query_buff can extract rows from it. curr_range_query_buff_offset marks the current read position. The key is read first; if need_val is true, the value is read as well, which may be the whole row or just the columns set up by set_query_columns. After reading, curr_range_query_buff_offset is advanced to the next read position.

```cpp
int ha_tokudb::read_data_from_range_query_buff(uchar* buf, bool need_val, bool do_key_read) {
    // buffer has the next row, get it from there
    int error;
    uchar* curr_pos = range_query_buff+curr_range_query_buff_offset;
    DBT curr_key;
    memset((void *) &curr_key, 0, sizeof(curr_key));
    // get key info
    uint32_t key_size = *(uint32_t *)curr_pos;
    curr_pos += sizeof(key_size);
    uchar* curr_key_buff = curr_pos;
    curr_pos += key_size;
    curr_key.data = curr_key_buff;
    curr_key.size = key_size;
    // if this is a covering index, this is all we need
    if (do_key_read) {
        assert_always(!need_val);
        extract_hidden_primary_key(tokudb_active_index, &curr_key);
        read_key_only(buf, tokudb_active_index, &curr_key);
        error = 0;
    }
    // we need to get more data
    else {
        DBT curr_val;
        memset((void *) &curr_val, 0, sizeof(curr_val));
        uchar* curr_val_buff = NULL;
        uint32_t val_size = 0;
        // in this case, we don't have a val, we are simply extracting the pk
        if (!need_val) {
            curr_val.data = curr_val_buff;
            curr_val.size = val_size;
            extract_hidden_primary_key(tokudb_active_index, &curr_key);
            error = read_primary_key( buf, tokudb_active_index, &curr_val, &curr_key);
        }
        else {
            extract_hidden_primary_key(tokudb_active_index, &curr_key);
            // need to extract a val and place it into buf
            if (unpack_entire_row) {
                // get val info
                val_size = *(uint32_t *)curr_pos;
                curr_pos += sizeof(val_size);
                curr_val_buff = curr_pos;
                curr_pos += val_size;
                curr_val.data = curr_val_buff;
                curr_val.size = val_size;
                error = unpack_row(buf, &curr_val, &curr_key, tokudb_active_index);
            }
            else {
                if (!(hidden_primary_key && tokudb_active_index == primary_key)) {
                    unpack_key(buf, &curr_key, tokudb_active_index);
                }
                // read rows we care about
                // first the null bytes;
                memcpy(buf, curr_pos, table_share->null_bytes);
                curr_pos += table_share->null_bytes;
                // now the fixed sized rows
                for (uint32_t i = 0; i < num_fixed_cols_for_query; i++) {
                    uint field_index = fixed_cols_for_query[i];
                    Field* field = table->field[field_index];
                    unpack_fixed_field(
                        buf + field_offset(field, table),
                        curr_pos,
                        share->kc_info.field_lengths[field_index]);
                    curr_pos += share->kc_info.field_lengths[field_index];
                }
                // now the variable sized rows
                for (uint32_t i = 0; i < num_var_cols_for_query; i++) {
                    uint field_index = var_cols_for_query[i];
                    Field* field = table->field[field_index];
                    uint32_t field_len = *(uint32_t *)curr_pos;
                    curr_pos += sizeof(field_len);
                    unpack_var_field(
                        buf + field_offset(field, table),
                        curr_pos,
                        field_len,
                        share->kc_info.length_bytes[field_index]);
                    curr_pos += field_len;
                }
                // now the blobs
                if (read_blobs) {
                    uint32_t blob_size = *(uint32_t *)curr_pos;
                    curr_pos += sizeof(blob_size);
                    error = unpack_blobs(
                        buf,
                        curr_pos,
                        blob_size,
                        true);
                    curr_pos += blob_size;
                    if (error) {
                        invalidate_bulk_fetch();
                        goto exit;
                    }
                }
                error = 0;
            }
        }
    }
    curr_range_query_buff_offset = curr_pos - range_query_buff;
exit:
    return error;
}
```

Index end

```cpp
int ha_tokudb::index_end() {
    range_lock_grabbed = false;
    range_lock_grabbed_null = false;
    if (cursor) {
        int r = cursor->c_close(cursor);
        assert_always(r==0);
        cursor = NULL;
        remove_from_trx_handler_list();
        last_cursor_error = 0;
    }
    active_index = tokudb_active_index = MAX_KEY;
    //
    // reset query variables
    //
    unpack_entire_row = true;
    read_blobs = true;
    read_key = true;
    num_fixed_cols_for_query = 0;
    num_var_cols_for_query = 0;
    invalidate_bulk_fetch();
    invalidate_icp();
    doing_bulk_fetch = false;
    close_dsmrr();
    TOKUDB_HANDLER_DBUG_RETURN(0);
}
```

That is, in broad strokes, how a query statement executes inside the TokuDB engine. See you next month!