// append data to mem table
[[nodiscard]] virtual Status append(ChunkPtr chunk) = 0;
[[nodiscard]] virtual Status append_selective(const Chunk& src, const uint32_t* indexes, uint32_t from, uint32_t size) = 0;

// all of data has been added
// done will be called in pipeline executor threads
virtual Status done() = 0;

// flush all data to callback, then release the memory in memory table
// flush will be called in IO threads
virtual Status flush(FlushCallBack callback) = 0;
Usage:
auto mem_table = create();
while (!mem_table->is_full()) {
    mem_table->append(next_chunk());
}
mem_table->done();
mem_table->flush();
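To make the append / done / flush contract more concrete, here is a minimal, self-contained sketch of a mem-table-like buffer. It is not the StarRocks implementation: Chunk, Status, and FlushCallBack below are simplified stand-ins so the example compiles on its own, and the real interface above hands flush a callback that performs the actual IO.

#include <functional>
#include <memory>
#include <vector>

// Simplified stand-ins for the real StarRocks types (assumption, illustration only).
struct Chunk { std::vector<int> rows; };
using ChunkPtr = std::shared_ptr<Chunk>;
struct Status { bool _ok = true; bool ok() const { return _ok; } static Status OK() { return {}; } };
using FlushCallBack = std::function<Status(const ChunkPtr&)>;

class ToyMemTable {
public:
    explicit ToyMemTable(size_t max_rows) : _max_rows(max_rows) {}

    bool is_full() const { return _num_rows >= _max_rows; }

    // Buffer a chunk in memory; in the real engine this runs on pipeline executor threads.
    Status append(ChunkPtr chunk) {
        _num_rows += chunk->rows.size();
        _chunks.push_back(std::move(chunk));
        return Status::OK();
    }

    // All data has been added; a real implementation could sort or pre-aggregate here.
    Status done() {
        _done = true;
        return Status::OK();
    }

    // Hand every buffered chunk to the callback (e.g. a block writer on an IO thread),
    // then release the memory the mem table was holding.
    Status flush(FlushCallBack callback) {
        for (auto& chunk : _chunks) {
            Status st = callback(chunk);
            if (!st.ok()) return st;
        }
        _chunks.clear();  // release the pending memory
        _num_rows = 0;
        return Status::OK();
    }

private:
    size_t _max_rows;
    size_t _num_rows = 0;
    bool _done = false;
    std::vector<ChunkPtr> _chunks;
};

Keeping the IO out of the mem table itself is what lets append and done stay on executor threads while flush is scheduled on IO threads, as the comments in the interface note.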
Status Aggregator::spill_aggregate_data(RuntimeState* state, std::function<StatusOr<ChunkPtr>()> chunk_provider) {
    auto io_executor = this->spill_channel()->io_executor();
    auto spiller = this->spiller();
    auto spill_channel = this->spill_channel();

    while (!spiller->is_full()) {
        auto chunk_with_st = chunk_provider();
        if (chunk_with_st.ok()) {
            if (!chunk_with_st.value()->is_empty()) {
                RETURN_IF_ERROR(spiller->spill(state, chunk_with_st.value(), *io_executor,
                                               TRACKER_WITH_SPILLER_GUARD(state, spiller)));
            }
        } else if (chunk_with_st.status().is_end_of_file()) {
            // chunk_provider return eos means provider has output all data from hash_map/hash_set.
            // then we just return OK
            return Status::OK();
        } else {
            return chunk_with_st.status();
        }
    }
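The chunk_provider is just a callable that yields chunks and signals completion through an end-of-file status, which is what the else-if branch above checks for. Below is a hedged sketch of such a provider draining an in-memory queue; the Status and ChunkWithStatus helpers are simplified placeholders, not the actual StarRocks StatusOr types.

#include <deque>
#include <functional>
#include <memory>

// Simplified placeholders (assumption, illustration only).
struct Chunk { bool is_empty() const { return false; } };
using ChunkPtr = std::shared_ptr<Chunk>;

struct Status {
    enum Code { kOk, kEndOfFile, kError };
    Code code = kOk;
    bool ok() const { return code == kOk; }
    bool is_end_of_file() const { return code == kEndOfFile; }
    static Status OK() { return Status{}; }
    static Status EndOfFile() { Status s; s.code = kEndOfFile; return s; }
};

// StatusOr-like wrapper, just enough for this sketch.
struct ChunkWithStatus {
    Status st;
    ChunkPtr chunk;
    bool ok() const { return st.ok(); }
    const Status& status() const { return st; }
    const ChunkPtr& value() const { return chunk; }
};

// Build a provider that drains a queue of chunks and then reports end-of-file,
// mirroring the contract the spill loop relies on: OK + chunk while data
// remains, EndOfFile once everything has been produced.
inline std::function<ChunkWithStatus()> make_chunk_provider(std::deque<ChunkPtr> chunks) {
    auto queue = std::make_shared<std::deque<ChunkPtr>>(std::move(chunks));
    return [queue]() -> ChunkWithStatus {
        if (queue->empty())
            return {Status::EndOfFile(), nullptr};
        ChunkPtr next = queue->front();
        queue->pop_front();
        return {Status::OK(), std::move(next)};
    };
}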
// be careful close method return a not ok status
// then release the pending memory
// flush
{
    SCOPED_TIMER(_spiller->metrics().write_io_timer);
    RETURN_IF_ERROR(block->flush());
}
RETURN_IF_ERROR(_spiller->block_manager()->release_block(block));
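As the comment warns, flush (or close) can fail; whatever happens, the block's pending memory still has to be handed back to the block manager. One common way to keep that release on every exit path is a small scope guard. The sketch below uses hypothetical Block and BlockManager types rather than the real spiller interfaces:

#include <functional>
#include <memory>
#include <utility>

// Minimal scope guard: runs the supplied function when the scope is left,
// whether the flush succeeded or an early return happened.
class ScopeGuard {
public:
    explicit ScopeGuard(std::function<void()> fn) : _fn(std::move(fn)) {}
    ~ScopeGuard() { if (_fn) _fn(); }
    ScopeGuard(const ScopeGuard&) = delete;
    ScopeGuard& operator=(const ScopeGuard&) = delete;
private:
    std::function<void()> _fn;
};

// Hypothetical stand-ins, for illustration only.
struct Status { bool _ok = true; bool ok() const { return _ok; } };
struct Block { Status flush() { return {}; } };
struct BlockManager { void release_block(const std::shared_ptr<Block>&) {} };

Status flush_and_release(BlockManager& mgr, std::shared_ptr<Block> block) {
    // Keeps the "release the pending memory" step on every exit path.
    ScopeGuard release_guard([&] { mgr.release_block(block); });
    Status st = block->flush();
    if (!st.ok())
        return st;    // the guard still releases the block
    return {};
}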
TemporaryFileOnDiskHolder TemporaryDataOnDisk::createRegularFile(size_t max_file_size)
{
    DiskPtr disk;
    if (max_file_size > 0)
    {
        auto reservation = volume->reserve(max_file_size);
        if (!reservation)
            throw Exception(ErrorCodes::NOT_ENOUGH_SPACE, "Not enough space on temporary disk");
        disk = reservation->getDisk();
    }
    else
        disk = volume->getDisk();
    /// A disk can be remote and shared between multiple replicas.
    /// That's why we must not use Poco::TemporaryFile::tempName() here
    /// (Poco::TemporaryFile::tempName() can return the same names for different processes on different nodes).
    relative_path = prefix + toString(UUIDHelpers::generateV4());
}
void WriteBufferFromFileDescriptor::nextImpl()
{
    if (!offset())
        return;

    Stopwatch watch;

    size_t bytes_written = 0;
    while (bytes_written != offset())
    {
        ProfileEvents::increment(ProfileEvents::WriteBufferFromFileDescriptorWrite);

        ssize_t res = 0;
        {
            CurrentMetrics::Increment metric_increment{CurrentMetrics::Write};
            res = ::write(fd, working_buffer.begin() + bytes_written, offset() - bytes_written);
        }

        if ((-1 == res || 0 == res) && errno != EINTR)
        {
            ProfileEvents::increment(ProfileEvents::WriteBufferFromFileDescriptorWriteFailed);

            /// Don't use getFileName() here because this method can be called from destructor
            String error_file_name = file_name;
            if (error_file_name.empty())
                error_file_name = "(fd = " + toString(fd) + ")";
            throwFromErrnoWithPath("Cannot write to file " + error_file_name, error_file_name,
                                   ErrorCodes::CANNOT_WRITE_TO_FILE_DESCRIPTOR);
        }

        if (res > 0)
        {
            bytes_written += res;
            if (throttler)
                throttler->add(res, ProfileEvents::LocalWriteThrottlerBytes, ProfileEvents::LocalWriteThrottlerSleepMicroseconds);
        }
    }
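The loop above is the standard pattern for pushing a buffer through ::write: a single call may write fewer bytes than requested or be interrupted by a signal, so the caller keeps retrying from the current offset and treats everything except EINTR as a hard error. A stripped-down version of the same pattern, without the ClickHouse metrics, throttler, and error-message plumbing:

#include <cerrno>
#include <cstddef>
#include <stdexcept>
#include <string>
#include <unistd.h>

// Write exactly `size` bytes to `fd`, retrying on partial writes and EINTR.
void write_full(int fd, const char* data, size_t size) {
    size_t bytes_written = 0;
    while (bytes_written != size) {
        ssize_t res = ::write(fd, data + bytes_written, size - bytes_written);
        if (res <= 0) {
            if (res < 0 && errno == EINTR)
                continue;                                    // interrupted by a signal, retry
            throw std::runtime_error("cannot write to fd, errno=" + std::to_string(errno));
        }
        bytes_written += static_cast<size_t>(res);           // partial writes advance the offset
    }
}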
hash_join = makeInMemoryJoin(prev_keys_num);
auto right_reader = current_bucket->startJoining();
size_t num_rows = 0; /// count rows that were written and rehashed
while (Block block = right_reader.read())
{
    num_rows += block.rows();
    addBlockToJoinImpl(std::move(block));
}
/** Flush data to disk if too much RAM is consumed.
  * Data can only be flushed to disk if a two-level aggregation structure is used.
  */
if (params.max_bytes_before_external_group_by
    && result.isTwoLevel()
    && current_memory_usage > static_cast<Int64>(params.max_bytes_before_external_group_by)
    && worth_convert_to_two_level)
{
    size_t size = current_memory_usage + params.min_free_disk_space;
    writeToTemporaryFile(result, size);
}
static void writeData(const ISerialization & serialization, const ColumnPtr & column, WriteBuffer & ostr, UInt64 offset, UInt64 limit)
{
    /** If there are columns-constants - then we materialize them.
      * (Since the data type does not know how to serialize / deserialize constants.)
      */
    ColumnPtr full_column = column->convertToFullColumnIfConst();
void WriteBufferFromFileDescriptor::nextImpl()
{
    if (!offset())
        return;

    Stopwatch watch;

    size_t bytes_written = 0;
    while (bytes_written != offset())
    {
        ProfileEvents::increment(ProfileEvents::WriteBufferFromFileDescriptorWrite);

        ssize_t res = 0;
        {
            CurrentMetrics::Increment metric_increment{CurrentMetrics::Write};
            res = ::write(fd, working_buffer.begin() + bytes_written, offset() - bytes_written);
        }

        if ((-1 == res || 0 == res) && errno != EINTR)
        {
            ProfileEvents::increment(ProfileEvents::WriteBufferFromFileDescriptorWriteFailed);

            /// Don't use getFileName() here because this method can be called from destructor
            String error_file_name = file_name;
            if (error_file_name.empty())
                error_file_name = "(fd = " + toString(fd) + ")";
            throwFromErrnoWithPath("Cannot write to file " + error_file_name, error_file_name,
                                   ErrorCodes::CANNOT_WRITE_TO_FILE_DESCRIPTOR);
        }

        if (res > 0)
        {
            bytes_written += res;
            if (throttler)
                throttler->add(res, ProfileEvents::LocalWriteThrottlerBytes, ProfileEvents::LocalWriteThrottlerSleepMicroseconds);
        }
    }
TemporaryFileStream::Stat TemporaryFileStream::finishWriting()
{
    if (isWriteFinished())
        return stat;

    if (out_writer)
    {
        out_writer->finalize();
        /// The amount of written data can be changed after finalization, some buffers can be flushed
        /// Need to update the stat
        updateAllocAndCheck();
        out_writer.reset();

        /// reader will be created at the first read call, not to consume memory before it is needed
    }
    return stat;
}
read
AggregatingTransform::initGenerate
Pipe pipe;
{
    Pipes pipes;

    for (auto * tmp_stream : tmp_data.getStreams())
        pipes.emplace_back(Pipe(std::make_unique<SourceFromNativeStream>(tmp_stream)));
Block TemporaryFileStream::read()
{
    if (!isWriteFinished())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Writing has been not finished");

    if (isEof())
        return {};

    if (!in_reader)
    {
        in_reader = std::make_unique<InputReader>(getPath(), header, getSize());
    }

    Block block = in_reader->read();
    if (!block)
    {
        /// finalize earlier to release resources, do not wait for the destructor
        this->release();
    }
    return block;
}
    if (column->size() != rows)
        throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA,
            "Cannot read all data in NativeReader. Rows read: {}. Rows expected: {}",
            column->size(), rows);
}
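Putting the two halves together, the lifecycle shown in these snippets is: write blocks, call finishWriting() exactly once (finalizing buffers and fixing up the size statistics), then call read() repeatedly until an empty block signals end of data and the file resources are released. Below is a hedged driver sketch with a hypothetical stream object; the real ClickHouse code drives TemporaryFileStream through TemporaryDataOnDisk and the query pipeline rather than by hand like this.

// `Stream` stands in for a TemporaryFileStream-like object exposing
// write / finishWriting / read; Block must be testable in a boolean context.
template <typename Stream, typename BlockProducer, typename BlockConsumer>
void spill_then_restore(Stream& stream, BlockProducer produce, BlockConsumer consume) {
    // Write phase: append blocks until the producer runs dry.
    while (auto block = produce())
        stream.write(block);

    // Finish writing exactly once: flush remaining buffers and update the stats.
    stream.finishWriting();

    // Read phase: the reader is created lazily on the first read();
    // an empty block means end of data and the stream releases its resources.
    while (auto block = stream.read())
        consume(block);
}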