其中sql_query是sphinx每次从mysql拉取数据的sql,而sql_query_range则是取得需要从mysql拉取的数据条目,而sql_rang_step则是表示每次从mysql拉取多少数据。sql_rang_range执行分两种情况,第一种是第一次拉取数据的时候,第二种是当当前的range数据读取完毕之后。

    首先来看CSphSource_SQL::NextDocument函数,这个函数的主要作用是从mysql读取数据然后切分保存,首先我们来看读取数据这一部分,这里步骤很简单,就是执行对应的sql,然后判断当前range的数据是否读取完毕,如果读取完毕则继续执行sql_query_rang(RunQueryStep)。这里要注意的是,sphinx读取数据是一条一条的读取然后执行的.

    1. {
    2. // try to get next row
    3. bool bGotRow = SqlFetchRow ();
    4. // when the party's over...
    5. while ( !bGotRow )
    6. {
    7. // is that an error?
    8. if ( SqlIsError() )
    9. {
    10. sError.SetSprintf ( "sql_fetch_row: %s", SqlError() );
    11. m_tDocInfo.m_uDocID = 1; // 0 means legal eof
    12. return NULL;
    13. }
    14. // maybe we can do next step yet?
    15. if ( !RunQueryStep ( m_tParams.m_sQuery.cstr(), sError ) )
    16. {
    17. // if there's a message, there's an error
    18. // otherwise, we're just over
    19. if ( !sError.IsEmpty() )
    20. {
    21. m_tDocInfo.m_uDocID = 1; // 0 means legal eof
    22. return NULL;
    23. }
    24. } else
    25. {
    26. // step went fine; try to fetch
    27. bGotRow = SqlFetchRow ();
    28. continue;
    29. }
    30. SqlDismissResult ();
    31. // ok, we're over
    32. ARRAY_FOREACH ( i, m_tParams.m_dQueryPost )
    33. {
    34. if ( !SqlQuery ( m_tParams.m_dQueryPost[i].cstr() ) )
    35. {
    36. sphWarn ( "sql_query_post[%d]: error=%s, query=%s",
    37. i, SqlError(), m_tParams.m_dQueryPost[i].cstr() );
    38. }
    39. SqlDismissResult ();
    40. }
    41. m_tDocInfo.m_uDocID = 0; // 0 means legal eof
    42. return NULL;
    43. }
    44. // get him!
    45. m_tDocInfo.m_uDocID = VerifyID ( sphToDocid ( SqlColumn(0) ) );
    46. m_uMaxFetchedID = Max ( m_uMaxFetchedID, m_tDocInfo.m_uDocID );
    47. } while ( !m_tDocInfo.m_uDocID );

    上面的代码我们可以看到一个很关键的字段m_uDocID,这个字段表示当前doc的id(因此数据库的表设计必须有这个id字段).

    读取完毕数据之后,开始处理读取的数据,这里会按照字段来切分,主要是将对应的数据库字段保存到索引fielld

    1. // split columns into fields and attrs
    2. for ( int i=0; i<m_iPlainFieldsLength; i++ )
    3. {
    4. // get that field
    5. #if USE_ZLIB
    6. if ( m_dUnpack[i]!=SPH_UNPACK_NONE )
    7. {
    8. DWORD uUnpackedLen = 0;
    9. m_dFields[i] = (BYTE*) SqlUnpackColumn ( i, uUnpackedLen, m_dUnpack[i] );
    10. m_dFieldLengths[i] = (int)uUnpackedLen;
    11. continue;
    12. }
    13. #endif
    14. m_dFields[i] = (BYTE*) SqlColumn ( m_tSchema.m_dFields[i].m_iIndex );
    15. m_dFieldLengths[i] = SqlColumnLength ( m_tSchema.m_dFields[i].m_iIndex );
    16. }

    紧接着就是处理attribute,后续我们会详细介绍attribute,现在我们只需要知道它是一个类似二级索引的东西(不进入全文索引).

    这里需要注意两个地方,第一个是m_dAccum这个域,这个域是一个vector,而这个vector里面保存了CSphWordHit这个结构,我们来看这个结构的定义

    1. struct CSphWordHit
    2. {
    3. SphDocID_t m_uDocID; ///< document ID
    4. SphWordID_t m_uWordID; ///< word ID in current dictionary
    5. Hitpos_t m_uWordPos; ///< word position in current document
    6. };

    可以看到其实这个结构也就是保存了对应分词的信息.

    然后我们来看核心代码,这里主要是便利刚才从mysql得到的数据,去重然后保存数据.

    1. int iHits = 0;
    2. if ( pHits && pHits->Length() )
    3. {
    4. CSphWordHit tLastHit;
    5. tLastHit.m_uDocID = 0;
    6. tLastHit.m_uWordID = 0;
    7. tLastHit.m_uWordPos = 0;
    8. m_dAccum.Reserve ( m_dAccum.GetLength()+iHits );
    9. for ( const CSphWordHit * pHit = pHits->First(); pHit<=pHits->Last(); pHit++ )
    10. {
    11. // ignore duplicate hits
    12. if ( pHit->m_uDocID==tLastHit.m_uDocID && pHit->m_uWordID==tLastHit.m_uWordID && pHit->m_uWordPos==tLastHit.m_uWordPos )
    13. continue;
    14. // update field lengths
    15. if ( pFieldLens && HITMAN::GetField ( pHit->m_uWordPos )!=HITMAN::GetField ( tLastHit.m_uWordPos ) )
    16. pFieldLens [ HITMAN::GetField ( tLastHit.m_uWordPos ) ] = HITMAN::GetPos ( tLastHit.m_uWordPos );
    17. // accumulate
    18. m_dAccum.Add ( *pHit );
    19. tLastHit = *pHit;
    20. }
    21. if ( pFieldLens )
    22. pFieldLens [ HITMAN::GetField ( tLastHit.m_uWordPos ) ] = HITMAN::GetPos ( tLastHit.m_uWordPos );
    23. }

    做完上面这些事情之后,就需要提交数据给索引处理引擎了,这里核心的代码都是在RtIndex_t::Commit中.

    这个函数主要做两个事情,第一个提取出前面我们构造好的RtAccum_t,然后对于所有的doc进行排序,创建segment,也就是对应的索引块(ram chunk),最后调用CommitReplayable来提交ram chunk到磁盘.

    然后我们来看RtAccum_t::CreateSegment函数,这个函数用来将分词好的数据保存到ram chunk,这里需要注意两个数据结构分别是RtDoc_t和RtWord_t,这两个数据结构分别表示doc信息和分词信息.

    结构很简单,后面的注释都很详细

    1. template < typename DOCID = SphDocID_t >
    2. struct RtDoc_T
    3. {
    4. DOCID m_uDocID; ///< my document id
    5. DWORD m_uDocFields; ///< fields mask
    6. DWORD m_uHits; ///< hit count
    7. DWORD m_uHit; ///< either index into segment hits, or the only hit itself (if hit count is 1)
    8. };
    9. template < typename WORDID=SphWordID_t >
    10. struct RtWord_T
    11. {
    12. union
    13. {
    14. WORDID m_uWordID; ///< my keyword id
    15. const BYTE * m_sWord;
    16. };
    17. DWORD m_uDocs; ///< document count (for stats and/or BM25)
    18. DWORD m_uHits; ///< hit count (for stats and/or BM25)
    19. DWORD m_uDoc; ///< index into segment docs
    20. };

    然后来看代码,首先是初始化对应的写结构,可以看到都是会写到我们创建好的segment中.

    1. RtDocWriter_t tOutDoc ( pSeg );
    2. RtWordWriter_t tOutWord ( pSeg, m_bKeywordDict, iWordsCheckpoint );
    3. RtHitWriter_t tOutHit ( pSeg );

    然后就是写数据了,这里主要是做一个聚合,也就是将相同的keyword对应的属性聚合起来.

    这次就分析到这里,下次我们将会分析最核心的部分就是Sphinx如何刷新数据到磁盘.