This function (`HashJoinIterator::BuildHashTable()`) lives in hash_join_iterator.cc, at roughly lines 403–560.
step1: If the build input iterator has no more rows to give, set m_state to State::END_OF_ROWS and return false. Note that returning false here does not mean "building the hash table failed": in this code, returning true signals an error, so false simply means no error occurred and there is nothing left to build.
```cpp
if (!m_build_iterator_has_more_rows) {
  m_state = State::END_OF_ROWS;
  return false;
}
```
**step2:** Restore the last row that was inserted into the row buffer. This is needed when the build input is a nested loop join with a filter on its inner side. (This part is not entirely clear to me yet; roughly, the idea seems to be that such a filter evaluates against the outer table's record buffer, and probing the hash table loads stored rows back into that same buffer, so before reading more rows from the build input the last row it actually produced has to be put back. See the sketch after the code below.)
```cpp
if (m_row_buffer.Initialized() &&
    m_row_buffer.LastRowStored() != m_row_buffer.end()) {
  hash_join_buffer::LoadIntoTableBuffers(
      m_build_input_tables, m_row_buffer.LastRowStored()->second);
}
```
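Here is a small standalone sketch of the situation this restore guards against (plain C++, not MySQL's API; `RecordBuffer`, `t1_buffer` and `filter_inner_row` are invented names): the build-side filter reads the outer table's record buffer, and probing the hash table loads stored rows back into that same buffer, so the last row the build input actually produced has to be restored before the build input is read again.

```cpp
#include <cstdio>
#include <vector>

struct RecordBuffer { int i = 0; };  // stand-in for t1's record buffer
RecordBuffer t1_buffer;              // shared by the filter and the hash table code

// A filter like "t1.i < t2.i" on the inner side of the nested loop evaluates
// against whatever currently sits in t1's record buffer.
bool filter_inner_row(int t2_i) { return t1_buffer.i < t2_i; }

int main() {
  std::vector<int> stored_rows;  // the hash table's copies of t1.i

  t1_buffer.i = 7;               // last row read from t1 during the build phase
  stored_rows.push_back(t1_buffer.i);

  t1_buffer.i = 3;               // probing loaded an older stored row back

  // Without the restore, the filter would see t1.i == 3 instead of 7.
  std::printf("before restore: filter(5) = %d\n", filter_inner_row(5));

  // The restore: load the last row that was stored back into the record
  // buffer, mirroring LoadIntoTableBuffers(..., LastRowStored()).
  t1_buffer.i = stored_rows.back();
  std::printf("after restore:  filter(5) = %d\n", filter_inner_row(5));
  return 0;
}
```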
step3: Clear the row buffer and make all iterators point at it again. If this initialization fails, return true (error).
```cpp
if (InitRowBuffer()) {
  return true;
}
```
step4: Initialize two local flags, `reject_duplicate_keys` and `store_rows_with_null_in_join_key`:
```cpp
const bool reject_duplicate_keys = RejectDuplicateKeys();
const bool store_rows_with_null_in_join_key = m_join_type == JoinType::OUTER;
```
If `RejectDuplicateKeys()` returns true, duplicate keys are rejected in the hash table: for semijoins and antijoins, each join key only ever needs to produce a single match, so there is no point in storing extra rows with the same key. (For background on semijoin and antijoin, see the separate note: Semijoin & Antijoin.)

`store_rows_with_null_in_join_key` records whether the current join type is an outer join (JoinType::OUTER); rows whose join key contains NULL are only kept in the row buffer in that case.
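As an illustration of why duplicate keys can be rejected, here is a small standalone sketch (plain C++ with std::unordered_map, not MySQL's hash_join_buffer API): a semijoin only asks whether at least one matching build row exists, so keeping one stored row per join key is enough.

```cpp
#include <cstdio>
#include <string>
#include <unordered_map>
#include <vector>

int main() {
  // Build side with duplicate join keys (key 2 appears twice).
  const std::vector<std::pair<int, std::string>> build = {
      {1, "b1"}, {2, "b2a"}, {2, "b2b"}, {3, "b3"}};
  const std::vector<int> probe = {2, 3, 4};

  // "reject_duplicate_keys": keep only the first row stored for each key
  // (unordered_map::emplace does not overwrite an existing key).
  std::unordered_map<int, std::string> hash_table;
  for (const auto &row : build) {
    hash_table.emplace(row.first, row.second);
  }

  // Semijoin semantics: emit each probe row at most once if any match exists,
  // so the extra build row with key 2 could never contribute anything.
  for (int key : probe) {
    if (hash_table.count(key) != 0) {
      std::printf("probe key %d has a match\n", key);
    }
  }
  return 0;
}
```

An antijoin asks the opposite question (no match exists), but it needs exactly the same per-key information, which is why both join types can reject duplicate keys.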
step5: Clear the null-row flag on the build input with SetNullRowFlag. This guards against the case where Init() is called multiple times, e.g. when the hash join sits inside a dependent (correlated) subquery; otherwise the flag could still be polluted by a previous execution of this hash join.
```cpp
m_build_input->SetNullRowFlag(/*is_null_row=*/false);
```
step6: Loop over the build input, reading rows from m_build_input through its iterator:

1. If Read() returns 1, an error occurred or the thread was killed, so return true.
2. If the build input turns out to be empty, the result of inner joins and semijoins is also empty, so the state is set to END_OF_ROWS and we stop. For antijoins (and outer joins, which the code checks as well) the output still includes rows from the probe input, so we must carry on.
3. Once the build iterator has been read to the end, this is the last time the probe iterator will be read, so probe row saving can be disabled again (it was only enabled if the hash table ran out of memory and spilling to disk was not allowed).
```cpp
PFSBatchMode batch_mode(m_build_input.get());
for (;;) {  // Termination condition within loop.
  int res = m_build_input->Read();
  if (res == 1) {
    DBUG_ASSERT(thd()->is_error() ||
                thd()->killed);  // my_error should have been called.
    return true;
  }

  if (res == -1) {
    m_build_iterator_has_more_rows = false;
    // If the build input was empty, the result of inner joins and semijoins
    // will also be empty. However, if the build input was empty, the output
    // of antijoins will be all the rows from the probe input.
    if (m_row_buffer.empty() && m_join_type != JoinType::ANTI &&
        m_join_type != JoinType::OUTER) {
      m_state = State::END_OF_ROWS;
      return false;
    }

    // As we managed to read to the end of the build iterator, this is the
    // last time we will read from the probe iterator. Thus, we can disable
    // probe row saving again (it was enabled if the hash table ran out of
    // memory _and_ we were not allowed to spill to disk).
    m_write_to_probe_row_saving = false;
    SetReadingProbeRowState();
    return false;
  }
```
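For readers who have not worked with MySQL's executor before, the following toy stand-in (VectorIterator is an invented class, not the real RowIterator) illustrates the return-code convention the loop above relies on: 0 means a row was read into the record buffer, -1 means end of rows, and 1 means an error or a killed thread that must be propagated by returning true.

```cpp
#include <cstddef>
#include <cstdio>
#include <utility>
#include <vector>

// Toy iterator mimicking the 0 / -1 / 1 convention of Read().
class VectorIterator {
 public:
  explicit VectorIterator(std::vector<int> rows) : m_rows(std::move(rows)) {}
  int Read() {
    if (m_pos == m_rows.size()) return -1;  // end of rows
    m_current = m_rows[m_pos++];
    return 0;                               // a row was produced
  }
  int current() const { return m_current; }

 private:
  std::vector<int> m_rows;
  size_t m_pos = 0;
  int m_current = 0;
};

int main() {
  VectorIterator build_input({10, 20, 30});
  for (;;) {
    const int res = build_input.Read();
    if (res == 1) return 1;  // error: stop and propagate
    if (res == -1) break;    // end of rows: the build phase is complete
    std::printf("store build row %d in the hash table\n", build_input.current());
  }
  return 0;
}
```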
step7: Still inside the loop, for each build row that was read:

1. Request row IDs for all tables that need them.
2. Store the row currently sitting in the tables' record buffers into the row buffer, and keep the returned status in store_row_result.
3. Branch on the value of store_row_result.
If it is ROW_STORED, the row was stored successfully, and we simply break out of the switch and read the next build row.
```cpp
case hash_join_buffer::StoreRowResult::ROW_STORED:
  break;
```
If it is BUFFER_FULL, the row buffer is full. If spilling to disk is allowed, the remaining build rows are written out to chunk files on disk (see below). If spilling is not allowed, we just go on to reading from the probe iterator and, for non-inner joins, enable probe row saving so that unmatched probe rows are written to the probe row saving file; on the next refill of the hash table, probe rows are read from that file, ensuring that only unmatched probe rows are re-read.
```cpp
// If we are not allowed to spill to disk, just go on to reading from
// the probe iterator.
if (!m_allow_spill_to_disk) {
  if (m_join_type != JoinType::INNER) {
    // Enable probe row saving, so that unmatched probe rows are written
    // to the probe row saving file. After the next refill of the hash
    // table, we will read rows from the probe row saving file, ensuring
    // that we only read unmatched probe rows.
    InitWritingToProbeRowSavingFile();
  }
  SetReadingProbeRowState();
  return false;
}
```
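A minimal sketch of what probe row saving buys, assuming a semijoin and using in-memory vectors where MySQL uses an on-disk probe row saving file (kHashTableCapacity, probe_source and saved are invented names): each refill of the hash table only re-probes the rows that have not found a match yet.

```cpp
#include <cstddef>
#include <cstdio>
#include <unordered_set>
#include <utility>
#include <vector>

int main() {
  const std::vector<int> build = {1, 2, 3, 4, 5, 6};  // build-side join keys
  std::vector<int> probe_source = {2, 6, 7};          // probe-side join keys
  const size_t kHashTableCapacity = 2;  // pretend only 2 build rows fit in memory

  size_t build_pos = 0;
  while (build_pos < build.size()) {
    // Refill the hash table with the next batch of build rows.
    std::unordered_set<int> hash_table;
    while (build_pos < build.size() && hash_table.size() < kHashTableCapacity) {
      hash_table.insert(build[build_pos++]);
    }
    const bool last_refill = (build_pos == build.size());

    // Probe. For a semijoin, a probe row is emitted at most once, so a row
    // that matched never has to be looked at again; unmatched rows are
    // "saved" (MySQL writes them to the probe row saving file instead).
    std::vector<int> saved;
    for (int key : probe_source) {
      if (hash_table.count(key) != 0) {
        std::printf("emit probe row with key %d\n", key);
      } else if (!last_refill) {
        saved.push_back(key);  // re-probe against the next refill only
      }
    }
    probe_source = std::move(saved);
  }
  return 0;
}
```

This also shows why the code above only enables probe row saving for non-inner joins: in an inner join, a probe row that already matched can still match build rows in a later refill, so every probe row has to be read again anyway.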
Otherwise, initialize HashJoinChunk files for both inputs. To decide how many chunks are needed, the planner provides a row estimate up front (m_estimated_build_rows), and the count is recomputed here against the number of rows that actually fit in the row buffer, so that each on-disk chunk ends up at a suitable size.
```cpp
if (InitializeChunkFiles(
        m_estimated_build_rows, m_row_buffer.size(), kMaxChunks,
        m_probe_input_tables, m_build_input_tables,
        /*include_match_flag_for_probe=*/m_join_type == JoinType::OUTER,
        &m_chunk_files_on_disk)) {
  DBUG_ASSERT(thd()->is_error());  // my_error should have been called.
  return true;
}
```
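The following is only a rough standalone sketch of the kind of calculation described above; EstimateChunkCount, the 0.9 reduction factor and the rounding are my own assumptions, and the real InitializeChunkFiles may compute this differently.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdio>

// Hypothetical sketch: pick a chunk count so that each build chunk should fit
// back into the in-memory row buffer when it is read from disk later.
size_t EstimateChunkCount(double estimated_rows, size_t rows_in_buffer,
                          size_t max_chunks) {
  // Be a little pessimistic about how many rows a refilled buffer will hold,
  // since the planner's estimate can be off (the factor is an assumption).
  const double reduced_capacity = std::max<double>(1.0, rows_in_buffer * 0.9);
  const size_t chunks_needed =
      static_cast<size_t>(estimated_rows / reduced_capacity) + 1;
  // Never create more chunk files than the hard upper limit (kMaxChunks).
  return std::min(chunks_needed, max_chunks);
}

int main() {
  std::printf("%zu\n", EstimateChunkCount(1'000'000.0, 10'000, 128));
  return 0;
}
```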
Write the rows remaining in the build iterator out to the build chunk files on disk; if an I/O error occurs (or the thread is killed), return true.
```cpp
if (WriteRowsToChunks(thd(), m_build_input.get(), m_build_input_tables,
                      m_join_conditions, kChunkPartitioningHashSeed,
                      &m_chunk_files_on_disk,
                      true /* write_to_build_chunks */,
                      false /* write_rows_with_null_in_join_key */,
                      m_tables_to_get_rowid_for,
                      &m_temporary_row_and_join_key_buffer)) {
  DBUG_ASSERT(thd()->is_error() ||
              thd()->killed);  // my_error should have been called.
  return true;
}
```
Flush all build chunk files and rewind them to the beginning, so they can later be read back from the start.
```cpp
for (ChunkPair &chunk_pair : m_chunk_files_on_disk) {
  if (chunk_pair.build_chunk.Rewind()) {
    DBUG_ASSERT(thd()->is_error() ||
                thd()->killed);  // my_error should have been called.
    return true;
  }
}

SetReadingProbeRowState();
return false;
}
```
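The partitioning idea itself can be pictured with this small standalone sketch (in-memory vectors stand in for the on-disk chunk files, and std::hash stands in for the real hashing of the serialized join key with kChunkPartitioningHashSeed): rows whose join keys hash to the same chunk index are the only ones that can possibly match each other, so a build chunk only ever has to be joined against the probe chunk with the same index.

```cpp
#include <cstddef>
#include <cstdio>
#include <functional>
#include <string>
#include <vector>

int main() {
  const size_t num_chunks = 4;
  const std::vector<std::string> join_keys = {"alice", "bob",  "carol",
                                              "dave",  "erin", "frank"};

  // One "file" per chunk; MySQL appends serialized rows to HashJoinChunk files.
  std::vector<std::vector<std::string>> chunks(num_chunks);
  for (const std::string &key : join_keys) {
    const size_t chunk_index = std::hash<std::string>{}(key) % num_chunks;
    chunks[chunk_index].push_back(key);
  }

  for (size_t i = 0; i < num_chunks; ++i) {
    std::printf("chunk %zu:", i);
    for (const std::string &key : chunks[i]) std::printf(" %s", key.c_str());
    std::printf("\n");
  }
  return 0;
}
```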
If the result is FATAL_ERROR, an unrecoverable error occurred, most likely a failed malloc. Report out-of-memory (using join_buffer_size as the reported allocation size, since the exact amount is unknown) and return true.
```cpp
case hash_join_buffer::StoreRowResult::FATAL_ERROR:
  // An unrecoverable error. Most likely, malloc failed, so report OOM.
  // Note that we cannot say for sure how much memory we tried to allocate
  // when failing, so just report 'join_buffer_size' as the amount of
  // memory we tried to allocate.
  my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR),
           thd()->variables.join_buff_size);
  return true;
}
```