PushingAsyncPipelineExecutor::PushingAsyncPipelineExecutor(QueryPipeline & pipeline_) : pipeline(pipeline_)
{
    if (!pipeline.pushing())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Pipeline for PushingPipelineExecutor must be pushing");
if (state.io.pipeline.pushing())
{
    /// FIXME: check explicitly that insert query suggests to receive data via native protocol,
    state.need_receive_data_for_insert = true;
    processInsertQuery();
}
else if (state.io.pipeline.pulling())
{
    processOrdinaryQueryWithProcessors();
}
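The branch above picks the driving mode: for an INSERT the server pushes the client's data into the pipeline (processInsertQuery), for a SELECT it pulls result blocks out of it (processOrdinaryQueryWithProcessors). A minimal toy sketch of the two driving loops, using made-up types rather than the real executors, looks like this:

#include <iostream>
#include <optional>
#include <string>
#include <vector>

/// Toy stand-in for a Chunk: just a batch of rows.
using Chunk = std::vector<std::string>;

/// Pushing mode: the caller owns the data and feeds chunks into the pipeline.
struct ToyPushingExecutor
{
    void push(Chunk chunk) { std::cout << "inserted " << chunk.size() << " rows\n"; }
    void finish() { std::cout << "insert finished\n"; }
};

/// Pulling mode: the pipeline owns the data and the caller drains it.
struct ToyPullingExecutor
{
    std::vector<Chunk> results;
    size_t next = 0;

    std::optional<Chunk> pull()
    {
        if (next == results.size())
            return std::nullopt;
        return results[next++];
    }
};

int main()
{
    /// INSERT-like flow: data comes from the client, so we push.
    ToyPushingExecutor pushing;
    pushing.push({"row1", "row2"});
    pushing.finish();

    /// SELECT-like flow: data comes from the pipeline, so we pull until exhausted.
    ToyPullingExecutor pulling;
    pulling.results = {{"a"}, {"b", "c"}};
    while (auto chunk = pulling.pull())
        std::cout << "received a chunk of " << chunk->size() << " rows\n";
}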
enum class Status
{
    /// Processor needs some data at its inputs to proceed.
    /// You need to run another processor to generate required input and then call 'prepare' again.
    NeedData,

    /// Processor cannot proceed because output port is full or not isNeeded().
    /// You need to transfer data from output port to the input port of another processor and then call 'prepare' again.
    PortFull,

    /// All work is done (all data is processed or all output are closed), nothing more to do.
    Finished,

    /// No one needs data on output ports.
    /// Unneeded,

    /// You may call 'work' method and processor will do some work synchronously.
    Ready,

    /// You may call 'schedule' method and processor will return descriptor.
    /// You need to poll this descriptor and call work() afterwards.
    Async,

    /// Processor wants to add other processors to pipeline.
    /// New processors must be obtained by expandPipeline() call.
    ExpandPipeline,
};
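To make these statuses concrete, here is a small self-contained sketch (a toy processor of my own, not ClickHouse's IProcessor) of the prepare()/work() loop an executor drives. Async and ExpandPipeline are left out to keep it short; the switch shows how each status tells the scheduler what to do next.

#include <iostream>

/// Simplified subset of the status values above.
enum class Status { NeedData, PortFull, Ready, Finished };

/// A toy processor that "transforms" a fixed number of input chunks.
struct ToyProcessor
{
    int pending_input = 0;   /// chunks waiting at the input port
    int pending_output = 0;  /// chunks waiting at the output port
    int chunks_left = 3;     /// how many chunks the upstream source still has

    Status prepare()
    {
        if (pending_output > 0)
            return Status::PortFull;   /// downstream must consume first
        if (pending_input > 0)
            return Status::Ready;      /// we can do synchronous work
        if (chunks_left == 0)
            return Status::Finished;   /// nothing more to process
        return Status::NeedData;       /// ask upstream for more input
    }

    void work()                        /// called only when prepare() said Ready
    {
        --pending_input;
        ++pending_output;
    }
};

int main()
{
    ToyProcessor p;
    while (true)
    {
        switch (p.prepare())
        {
            case Status::NeedData:   /// "run another processor to generate input"
                --p.chunks_left;
                ++p.pending_input;
                std::cout << "pulled a chunk from upstream\n";
                break;
            case Status::PortFull:   /// "transfer data from the output port"
                --p.pending_output;
                std::cout << "pushed a chunk downstream\n";
                break;
            case Status::Ready:      /// "call work(), the processor runs synchronously"
                p.work();
                std::cout << "processor did some work\n";
                break;
            case Status::Finished:
                std::cout << "done\n";
                return 0;
        }
    }
}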
SCOPE_EXIT_SAFE(
    if (!finished_flag)
    {
        finish();
        joinThreads();
    }
);
if (num_threads > 1)
{
    spawnThreads(); // start at least one thread
    tasks.processAsyncTasks();
    joinThreads();
}
else
{
    auto slot = slots->tryAcquire();
    executeSingleThread(0);
}
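In other words: with more than one thread, spawn workers and join them; otherwise execute the pipeline on the calling thread. A rough stand-alone sketch of that decision with std::thread (no slots or task queue, just the branching):

#include <iostream>
#include <thread>
#include <vector>

void executeSingleThread(size_t thread_num)
{
    std::cout << "thread " << thread_num << " is executing the pipeline\n";
}

void execute(size_t num_threads)
{
    if (num_threads > 1)
    {
        std::vector<std::thread> threads;       /// spawnThreads()
        threads.reserve(num_threads);
        for (size_t i = 0; i < num_threads; ++i)
            threads.emplace_back([i] { executeSingleThread(i); });
        for (auto & t : threads)                /// joinThreads()
            t.join();
    }
    else
    {
        executeSingleThread(0);  /// no extra threads: run on the caller's thread
    }
}

int main()
{
    execute(1);
    execute(4);
}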
std::optional<Chunk> RemoteSource::tryGenerate()
{
    /// onCancel() will do the cancel if the query was sent.
    if (was_query_canceled)
        return {};

    if (!was_query_sent)
    {
        /// Progress method will be called on Progress packet.
        query_executor->setProgressCallback([this](const Progress & value)
        {
            if (value.total_rows_to_read)
                addTotalRowsApprox(value.total_rows_to_read);
            progress(value.read_rows, value.read_bytes);
        });

        /// Get rows_before_limit result for remote query from ProfileInfo packet.
        query_executor->setProfileInfoCallback([this](const ProfileInfo & info)
        {
            if (rows_before_limit && info.hasAppliedLimit())
                rows_before_limit->set(info.getRowsBeforeLimit());
        });

        query_executor->sendQuery();

        was_query_sent = true;
    }

    Block block;

    if (async_read)
    {
        auto res = query_executor->read(read_context);
        if (std::holds_alternative<int>(res))
        {
            fd = std::get<int>(res);
            is_async_state = true;
            return Chunk();
        }

    if (add_aggregation_info)
    {
        auto info = std::make_shared<AggregatedChunkInfo>();
        info->bucket_num = block.info.bucket_num;
        info->is_overflows = block.info.is_overflows;
        chunk.setChunkInfo(std::move(info));
    }
ISource::Status RemoteSource::prepare()
{
    /// Check if query was cancelled before returning Async status. Otherwise it may lead to infinite loop.
    if (was_query_canceled)
    {
        getPort().finish();
        return Status::Finished;
    }

    if (is_async_state)
        return Status::Async;

    Status status = ISource::prepare();
    /// To avoid resetting the connection (because of "unfinished" query) in the
    /// RemoteQueryExecutor it should be finished explicitly.
    if (status == Status::Finished)
    {
        query_executor->finish(&read_context);
        is_async_state = false;
    }
    return status;
}
    do
    {
        if (!read_context->resumeRoutine())
            return Block();

        if (read_context->is_read_in_progress.load(std::memory_order_relaxed))
        {
            read_context->setTimer();
            return read_context->epoll.getFileDescriptor();
        }
        else
        {
            /// We need to check that query was not cancelled again,
            /// to avoid the race between cancel() thread and read() thread.
            /// (since cancel() thread will steal the fiber and may update the packet).
            if (was_cancelled)
                return Block();

            if (auto data = processPacket(std::move(read_context->packet)))
                return std::move(*data);
            else if (got_duplicated_part_uuids)
                return restartQueryWithoutDuplicatedUUIDs(&read_context);
        }
    } while (true);
#else
    return read();
#endif
}
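So when a read is still in progress, the async read() does not return a Block at all: it hands the executor an epoll file descriptor, the executor waits on it, and only then is the source resumed. A minimal Linux-only sketch of that "return an fd, wait, then continue" idea using a plain pipe (no ClickHouse types involved):

#include <sys/epoll.h>
#include <unistd.h>
#include <chrono>
#include <cstdio>
#include <thread>

int main()
{
    int fds[2];
    if (pipe(fds) != 0)
        return 1;

    /// Register the read end of the pipe in an epoll instance.
    int epoll_fd = epoll_create1(0);
    epoll_event ev{};
    ev.events = EPOLLIN;
    ev.data.fd = fds[0];
    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fds[0], &ev);

    /// The "remote server" sends data a bit later.
    std::thread writer([fd = fds[1]]
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        if (write(fd, "x", 1) != 1)
            return;
    });

    /// The executor's side: instead of blocking inside the source, wait on the
    /// descriptor the source returned and resume the read afterwards.
    epoll_event ready{};
    epoll_wait(epoll_fd, &ready, 1, /* timeout */ -1);

    char byte = 0;
    if (read(fds[0], &byte, 1) == 1)
        std::printf("descriptor became ready, the source can continue reading\n");

    writer.join();
    close(fds[0]);
    close(fds[1]);
    close(epoll_fd);
}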
    Fiber operator()(Fiber && sink) const
    {
        try
        {
            while (true)
            {
                read_context.packet = connections.receivePacketUnlocked(ReadCallback{read_context, sink}, false /* is_draining */);
                sink = std::move(sink).resume();
            }
        }
        catch (const boost::context::detail::forced_unwind &)
        {
            /// This exception is thrown by fiber implementation in case if fiber is being deleted but hasn't exited.
            /// It should not be caught or it will segfault.
            /// Other exceptions must be caught.
            throw;
        }
        catch (...)
        {
            read_context.exception = std::current_exception();
        }

        return std::move(sink);
    }
};
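This routine runs inside a Boost.Context fiber: it gives control back to the caller ("sink") either when a whole packet has been received, or from ReadCallback when the socket would block, and the caller resumes it again later. A minimal example of that suspend/resume handshake (assumes linking with -lboost_context; none of the ClickHouse types are involved):

// Build with: g++ -std=c++17 fiber_demo.cpp -lboost_context
#include <boost/context/fiber.hpp>
#include <iostream>

namespace ctx = boost::context;

int main()
{
    /// The fiber body receives the caller's context ("sink") and must hand it back on exit.
    ctx::fiber producer{[](ctx::fiber && sink)
    {
        for (int i = 0; i < 3; ++i)
        {
            std::cout << "  fiber: produced packet " << i << '\n';
            /// Suspend: jump back to whoever called resume() on us.
            sink = std::move(sink).resume();
        }
        return std::move(sink);
    }};

    /// Each resume() runs the fiber until its next suspension point,
    /// much like resumeRoutine() drives the remote-read routine.
    while (producer)
    {
        std::cout << "caller: resuming the fiber\n";
        producer = std::move(producer).resume();
    }
}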
Then let's look at the resumeRoutine() function:
bool RemoteQueryExecutorReadContext::resumeRoutine()
{
    if (is_read_in_progress.load(std::memory_order_relaxed) && !checkTimeout())
        return false;

    {
        std::lock_guard guard(fiber_lock);
        if (!fiber)
            return false;

        fiber = std::move(fiber).resume();

        if (exception)
            std::rethrow_exception(exception);
    }
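Note how exceptions travel across the fiber boundary: the routine must not let them escape the context switch, so it stores them in read_context.exception (see the catch(...) above), and resumeRoutine() rethrows them on the caller's side. The mechanism is just std::exception_ptr; stripped of the fibers, it looks like this:

#include <exception>
#include <iostream>
#include <stdexcept>

struct ReadContext
{
    std::exception_ptr exception;  /// filled by the routine, rethrown by the caller
};

/// Stands in for the fiber body: it must not let exceptions escape.
void routine(ReadContext & ctx)
{
    try
    {
        throw std::runtime_error("connection reset by peer");
    }
    catch (...)
    {
        ctx.exception = std::current_exception();  /// remember, don't propagate
    }
}

/// Stands in for resumeRoutine(): run the routine, then rethrow on this side.
void resumeRoutine(ReadContext & ctx)
{
    routine(ctx);
    if (ctx.exception)
        std::rethrow_exception(ctx.exception);
}

int main()
{
    ReadContext ctx;
    try
    {
        resumeRoutine(ctx);
    }
    catch (const std::exception & e)
    {
        std::cout << "caller sees the routine's exception: " << e.what() << '\n';
    }
}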