sr-spill

StarRocks spill (TBD)

SpillableMemTable

A temporary buffer for spill data; depending on what the upper layer needs, it can be either ordered or unordered.

Main interfaces:

bool is_full() const { return _tracker->consumption() >= _max_buffer_size; };
virtual bool is_empty() = 0;
size_t mem_usage() { return _tracker->consumption(); }

// append data to mem table
[[nodiscard]] virtual Status append(ChunkPtr chunk) = 0;
[[nodiscard]] virtual Status append_selective(const Chunk& src, const uint32_t* indexes, uint32_t from,
                                              uint32_t size) = 0;
// all of data has been added
// done will be called in pipeline executor threads
virtual Status done() = 0;

// flush all data to callback, then release the memory in memory table
// flush will be called in IO threads
virtual Status flush(FlushCallBack callback) = 0;

Usage:

auto mem_table = create();
while (!mem_table->is_full()) {
    RETURN_IF_ERROR(mem_table->append(next_chunk()));
}
RETURN_IF_ERROR(mem_table->done());
// flush() takes a FlushCallBack that is invoked once per drained chunk
RETURN_IF_ERROR(mem_table->flush([](const ChunkPtr& chunk) { return Status::OK(); }));

UnorderedMemTable

OrderedMemTable

SpillerWriter

RawSpillerWriter

PartitionedSpillerWriter

SpillableHashJoinProbeOperator

SpillableAggregateBlockingSinkOperator

SpillableAggregateBlockingSinkOperator::_spill_all_data(RuntimeState* state, bool should_spill_hash_table)

Aggregator::spill_aggregate_data
Status Aggregator::spill_aggregate_data(RuntimeState* state, std::function<StatusOr<ChunkPtr>()> chunk_provider) {
    auto io_executor = this->spill_channel()->io_executor();
    auto spiller = this->spiller();
    auto spill_channel = this->spill_channel();

    while (!spiller->is_full()) {
        auto chunk_with_st = chunk_provider();
        if (chunk_with_st.ok()) {
            if (!chunk_with_st.value()->is_empty()) {
                RETURN_IF_ERROR(spiller->spill(state, chunk_with_st.value(), *io_executor,
                                               TRACKER_WITH_SPILLER_GUARD(state, spiller)));
            }
        } else if (chunk_with_st.status().is_end_of_file()) {
            // chunk_provider return eos means provider has output all data from hash_map/hash_set.
            // then we just return OK
            return Status::OK();
        } else {
            return chunk_with_st.status();
        }
    }

    spill_channel->add_spill_task(std::move(chunk_provider));

    return Status::OK();
}
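
The chunk_provider here is just a callable that either hands back the next chunk drained from the hash_map/hash_set, or an end-of-file status once everything has been emitted; whatever is left after the spiller reports is_full() is queued as a spill task and finished on the IO threads. A minimal sketch of such a provider (AggState and drain_next_chunk are hypothetical stand-ins, not StarRocks types):

// Sketch only: a chunk provider that drains an in-memory aggregation state chunk by chunk.
// `AggState` and `drain_next_chunk` are hypothetical stand-ins for the real hash_map/hash_set.
std::function<StatusOr<ChunkPtr>()> make_chunk_provider(AggState* agg_state) {
    return [agg_state]() -> StatusOr<ChunkPtr> {
        ChunkPtr chunk = drain_next_chunk(agg_state); // hypothetical helper
        if (chunk == nullptr) {
            // EOF tells spill_aggregate_data that all data has been emitted.
            return Status::EndOfFile("no more aggregate data");
        }
        return chunk;
    };
}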

Spiller

Status Spiller::spill(RuntimeState* state, const ChunkPtr& chunk, TaskExecutor&& executor, MemGuard&& guard) {
    SCOPED_TIMER(_metrics.append_data_timer);
    RETURN_IF_ERROR(task_status());
    DCHECK(!chunk->is_empty());
    DCHECK(!is_full());

    COUNTER_UPDATE(_metrics.spill_rows, chunk->num_rows());
    _spilled_append_rows += chunk->num_rows();
    TRACE_SPILL_LOG << "spilled rows:" << chunk->num_rows() << ",cumulative:" << _spilled_append_rows
                    << ",spiller:" << this;

    if (_chunk_builder.chunk_schema()->empty()) {
        _chunk_builder.chunk_schema()->set_schema(chunk);
        RETURN_IF_ERROR(_serde->prepare());
    }

    if (_opts.init_partition_nums > 0) {
        return _writer->as<PartitionedSpillerWriter*>()->spill(state, chunk, executor, guard);
    } else {
        return _writer->as<RawSpillerWriter*>()->spill(state, chunk, executor, guard);
    }
}
template <class TaskExecutor, class MemGuard>
Status PartitionedSpillerWriter::spill(RuntimeState* state, const ChunkPtr& chunk, TaskExecutor&& executor,
                                       MemGuard&& guard) {
    DCHECK(!chunk->is_empty());
    DCHECK(!is_full());

    // the last column was hash column
    auto hash_column = chunk->columns().back();

    {
        SCOPED_TIMER(_spiller->metrics().shuffle_timer);
        std::vector<uint32_t> shuffle_result;
        shuffle(shuffle_result, down_cast<SpillHashColumn*>(hash_column.get()));
        process_partition_data(chunk, shuffle_result,
                               [&chunk](SpilledPartition* partition, const std::vector<uint32_t>& selection,
                                        int32_t from, int32_t size) {
                                   auto mem_table = partition->spill_writer->mem_table();
                                   (void)mem_table->append_selective(*chunk, selection.data(), from, size);
                                   partition->mem_size = mem_table->mem_usage();
                                   partition->num_rows += size;
                               });
    }

    DCHECK_EQ(_spiller->spilled_append_rows(), _partition_rows());

    RETURN_IF_ERROR(flush_if_full(state, executor, guard));

    return Status::OK();
}
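
The shuffle step maps every row to a target partition via the hash column appended as the chunk's last column; process_partition_data then appends each row selectively into its partition's mem table. A simplified illustration of that mapping, assuming a power-of-two partition count (the real PartitionedSpillerWriter selects partitions by hash bits according to each partition's split level, so this is only a sketch):

// Simplified illustration only, not the real StarRocks shuffle: assign each row's hash
// to a partition index. With a power-of-two partition count, masking equals modulo.
static void shuffle_by_hash(std::vector<uint32_t>& shuffle_result,
                            const std::vector<uint64_t>& row_hashes, uint32_t num_partitions) {
    const uint32_t mask = num_partitions - 1;
    shuffle_result.resize(row_hashes.size());
    for (size_t i = 0; i < row_hashes.size(); ++i) {
        shuffle_result[i] = static_cast<uint32_t>(row_hashes[i]) & mask;
    }
}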

Status RawSpillerWriter::flush_task(RuntimeState* state, const MemTablePtr& mem_table) {
    if (state->is_cancelled()) {
        return Status::OK();
    }

    const auto& serde = _spiller->serde();
    spill::AcquireBlockOptions opts;
    opts.query_id = state->query_id();
    opts.plan_node_id = options().plan_node_id;
    opts.name = options().name;
    ASSIGN_OR_RETURN(auto block, _spiller->block_manager()->acquire_block(opts));
    COUNTER_UPDATE(_spiller->metrics().block_count, 1);

    // TODO: reuse io context
    SerdeContext spill_ctx;
    {
        TRY_CATCH_ALLOC_SCOPE_START()
        // flush all pending result to spilled files
        size_t num_rows_flushed = 0;
        RETURN_IF_ERROR(mem_table->flush([&](const auto& chunk) {
            num_rows_flushed += chunk->num_rows();
            RETURN_IF_ERROR(serde->serialize(spill_ctx, chunk, block));
            return Status::OK();
        }));
        TRACE_SPILL_LOG << "spill flush rows:" << num_rows_flushed << ",spiller:" << this;
        TRY_CATCH_ALLOC_SCOPE_END();
    }

    // be careful close method return a not ok status
    // then release the pending memory
    // flush
    {
        SCOPED_TIMER(_spiller->metrics().write_io_timer);
        RETURN_IF_ERROR(block->flush());
    }
    RETURN_IF_ERROR(_spiller->block_manager()->release_block(block));

    {
        std::lock_guard<std::mutex> l(_mutex);
        _block_group.append(std::move(block));
    }

    return Status::OK();
}

ClickHouse spill

Grace hash join

write

void GraceHashJoin::initBuckets()
{
    if (!buckets.empty())
        return;

    const auto & settings = context->getSettingsRef();

    size_t initial_num_buckets = roundUpToPowerOfTwoOrZero(std::clamp<size_t>(settings.grace_hash_join_initial_buckets, 1, settings.grace_hash_join_max_buckets));

    addBuckets(initial_num_buckets);

    if (buckets.empty())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "No buckets created");

    LOG_TRACE(log, "Initialize {} bucket{}", buckets.size(), buckets.size() > 1 ? "s" : "");

    current_bucket = buckets.front().get();
    current_bucket->startJoining();
}
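
So the initial bucket count is grace_hash_join_initial_buckets clamped to [1, grace_hash_join_max_buckets] and then rounded up to the next power of two (e.g. a setting of 3 yields 4 buckets). A small standalone sketch of that computation (roundUpToPowerOfTwoOrZero approximated here with std::bit_ceil):

#include <algorithm>
#include <bit>
#include <cstddef>

// Illustration of the bucket-count computation: clamp the setting, then round up to a
// power of two. For non-zero input std::bit_ceil behaves like roundUpToPowerOfTwoOrZero.
static size_t initial_bucket_count(size_t initial_setting, size_t max_setting)
{
    size_t clamped = std::clamp<size_t>(initial_setting, 1, max_setting);
    return std::bit_ceil(clamped); // e.g. 3 -> 4, 8 -> 8
}
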
for (size_t i = 0; i < bucket_count; ++i)
{
    auto & left_file = tmp_data->createStream(left_sample_block);
    auto & right_file = tmp_data->createStream(prepareRightBlock(right_sample_block));
    BucketPtr new_bucket = std::make_shared<FileBucket>(current_size + i, left_file, right_file, log);
    tmp_buckets.emplace_back(std::move(new_bucket));
}
auto tmp_file = createRegularFile(max_file_size);
std::lock_guard lock(mutex);
TemporaryFileStreamPtr & tmp_stream = streams.emplace_back(std::make_unique<TemporaryFileStream>(std::move(tmp_file), header, this));
return *tmp_stream;

TemporaryFileOnDiskHolder TemporaryDataOnDisk::createRegularFile(size_t max_file_size)
{
    DiskPtr disk;
    if (max_file_size > 0)
    {
        auto reservation = volume->reserve(max_file_size);
        if (!reservation)
            throw Exception(ErrorCodes::NOT_ENOUGH_SPACE, "Not enough space on temporary disk");
        disk = reservation->getDisk();
    }
    else
        disk = volume->getDisk();

    return std::make_unique<TemporaryFileOnDisk>(disk, current_metric_scope);
}

TemporaryFileOnDisk represents a single file on disk.

TemporaryFileOnDisk::TemporaryFileOnDisk(const DiskPtr & disk_, const String & prefix)
    : disk(disk_)
    , metric_increment(CurrentMetrics::TotalTemporaryFiles)
{
    if (!disk)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Disk is not specified");

    if (fs::path prefix_path(prefix); prefix_path.has_parent_path())
        disk->createDirectories(prefix_path.parent_path());

    ProfileEvents::increment(ProfileEvents::ExternalProcessingFilesTotal);

    /// A disk can be remote and shared between multiple replicas.
    /// That's why we must not use Poco::TemporaryFile::tempName() here (Poco::TemporaryFile::tempName() can return the same names for different processes on different nodes).
    relative_path = prefix + toString(UUIDHelpers::generateV4());
}

TemporaryFileStream::TemporaryFileStream(TemporaryFileOnDiskHolder file_, const Block & header_, TemporaryDataOnDisk * parent_)
    : parent(parent_)
    , header(header_)
    , file(std::move(file_))
    , out_writer(std::make_unique<OutputWriter>(std::make_unique<WriteBufferFromFile>(file->getAbsolutePath()), header))
{
    LOG_TEST(&Poco::Logger::get("TemporaryFileStream"), "Writing to temporary file {}", file->getAbsolutePath());
}
void WriteBufferFromFileDescriptor::nextImpl()
{
    if (!offset())
        return;

    Stopwatch watch;

    size_t bytes_written = 0;
    while (bytes_written != offset())
    {
        ProfileEvents::increment(ProfileEvents::WriteBufferFromFileDescriptorWrite);

        ssize_t res = 0;
        {
            CurrentMetrics::Increment metric_increment{CurrentMetrics::Write};
            res = ::write(fd, working_buffer.begin() + bytes_written, offset() - bytes_written);
        }

        if ((-1 == res || 0 == res) && errno != EINTR)
        {
            ProfileEvents::increment(ProfileEvents::WriteBufferFromFileDescriptorWriteFailed);

            /// Don't use getFileName() here because this method can be called from destructor
            String error_file_name = file_name;
            if (error_file_name.empty())
                error_file_name = "(fd = " + toString(fd) + ")";
            throwFromErrnoWithPath("Cannot write to file " + error_file_name, error_file_name,
                                   ErrorCodes::CANNOT_WRITE_TO_FILE_DESCRIPTOR);
        }

        if (res > 0)
        {
            bytes_written += res;
            if (throttler)
                throttler->add(res, ProfileEvents::LocalWriteThrottlerBytes, ProfileEvents::LocalWriteThrottlerSleepMicroseconds);
        }
    }

    ProfileEvents::increment(ProfileEvents::DiskWriteElapsedMicroseconds, watch.elapsedMicroseconds());
    ProfileEvents::increment(ProfileEvents::WriteBufferFromFileDescriptorWriteBytes, bytes_written);
}

read

DelayedJoinedBlocksTransform

This method reads back the delayed (spilled) blocks.

void DelayedJoinedBlocksTransform::work()
{
    if (finished)
        return;

    delayed_blocks = join->getDelayedBlocks();
    finished = finished || delayed_blocks == nullptr;
}

getDelayedBlocks

This is where the data is actually read back and the right-side (build) table is reconstructed.

hash_join = makeInMemoryJoin(prev_keys_num);
auto right_reader = current_bucket->startJoining();
size_t num_rows = 0; /// count rows that were written and rehashed
while (Block block = right_reader.read())
{
    num_rows += block.rows();
    addBlockToJoinImpl(std::move(block));
}

DelayedJoinedBlocksWorkerTransform

Aggregate

write

Aggregator::executeOnBlock/mergeOnBlock

  1. Only two-level aggregation states will spill.
  2. The check uses the query-level memory tracker. One problem: when other operators in the same query already hold a lot of memory, the aggregation spills much more easily (once the state is two-level), to the point that essentially every block triggers a spill. For example, if max_bytes_before_external_group_by is 10 GB and a join elsewhere in the query already holds 9.5 GB, the aggregation will spill on nearly every block even though its own hash table is small.
/** Flush data to disk if too much RAM is consumed.
  * Data can only be flushed to disk if a two-level aggregation structure is used.
  */
if (params.max_bytes_before_external_group_by
    && result.isTwoLevel()
    && current_memory_usage > static_cast<Int64>(params.max_bytes_before_external_group_by)
    && worth_convert_to_two_level)
{
    size_t size = current_memory_usage + params.min_free_disk_space;
    writeToTemporaryFile(result, size);
}

Aggregator::writeToTemporaryFileImpl; NUM_BUCKETS is 256.

for (UInt32 bucket = 0; bucket < Method::Data::NUM_BUCKETS; ++bucket)
{
    Block block = convertOneBucketToBlock(data_variants, method, data_variants.aggregates_pool, false, bucket);
    out.write(block);
    update_max_sizes(block);
}

size_t TemporaryFileStream::write(const Block & block)
{
    if (!out_writer)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Writing has been finished");

    updateAllocAndCheck();
    size_t bytes_written = out_writer->write(block);
    return bytes_written;
}

NativeWriter

NativeWriter is constructed with a write buffer; for spill this is always a WriteBufferFromFile, and each WriteBufferFromFileDescriptor wraps a single fd.

NativeWriter::write only serializes the block into the write buffer and does not touch the file; the actual file write happens in nextImpl, which issues the write system call.

static void writeData(const ISerialization & serialization, const ColumnPtr & column, WriteBuffer & ostr, UInt64 offset, UInt64 limit)
{
    /** If there are columns-constants - then we materialize them.
      * (Since the data type does not know how to serialize / deserialize constants.)
      */
    ColumnPtr full_column = column->convertToFullColumnIfConst();

    ISerialization::SerializeBinaryBulkSettings settings;
    settings.getter = [&ostr](ISerialization::SubstreamPath) -> WriteBuffer * { return &ostr; };
    settings.position_independent_encoding = false;
    settings.low_cardinality_max_dictionary_size = 0;

    ISerialization::SerializeBinaryBulkStatePtr state;
    serialization.serializeBinaryBulkStatePrefix(*full_column, settings, state);
    serialization.serializeBinaryBulkWithMultipleStreams(*full_column, offset, limit, settings, state);
    serialization.serializeBinaryBulkStateSuffix(settings, state);
}
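
To make the buffering explicit: serializeBinaryBulkWithMultipleStreams above only writes into the WriteBuffer's in-memory working region; nothing reaches the file descriptor until that region is exhausted or an explicit flush/finalize runs, at which point nextImpl issues the ::write. A toy sketch of that contract, deliberately not ClickHouse's WriteBuffer API:

#include <cerrno>
#include <cstddef>
#include <stdexcept>
#include <unistd.h>
#include <vector>

/// Toy illustration of the WriteBuffer contract: append() only copies bytes into memory,
/// and flush() is the only place that calls ::write on the fd (analogous to nextImpl()).
class ToyBufferedWriter
{
public:
    ToyBufferedWriter(int fd, size_t capacity) : fd_(fd), buf_(capacity) {}

    void append(const char * data, size_t len)
    {
        for (size_t i = 0; i < len; ++i)
        {
            if (pos_ == buf_.size())
                flush(); /// working buffer full: push it to the file before continuing
            buf_[pos_++] = data[i];
        }
    }

    void flush() /// plays the role of WriteBufferFromFileDescriptor::nextImpl
    {
        size_t written = 0;
        while (written < pos_)
        {
            ssize_t res = ::write(fd_, buf_.data() + written, pos_ - written);
            if (res < 0)
            {
                if (errno == EINTR)
                    continue; /// retry on signal interruption, as nextImpl does
                throw std::runtime_error("Cannot write to file descriptor");
            }
            written += static_cast<size_t>(res);
        }
        pos_ = 0;
    }

private:
    int fd_;
    std::vector<char> buf_;
    size_t pos_ = 0;
};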

After writeToTemporaryFile has finished calling writeToTemporaryFileImpl, it calls auto stat = out_stream.finishWriting();, which is what actually forces the data out to the file: NativeWriter::flush() and, ultimately, WriteBufferFromFileDescriptor::nextImpl shown above.

TemporaryFileStream::Stat TemporaryFileStream::finishWriting()
{
    if (isWriteFinished())
        return stat;

    if (out_writer)
    {
        out_writer->finalize();
        /// The amount of written data can be changed after finalization, some buffers can be flushed
        /// Need to update the stat
        updateAllocAndCheck();
        out_writer.reset();

        /// reader will be created at the first read call, not to consume memory before it is needed
    }
    return stat;
}

read

AggregatingTransform::initGenerate

Pipe pipe;
{
    Pipes pipes;

    for (auto * tmp_stream : tmp_data.getStreams())
        pipes.emplace_back(Pipe(std::make_unique<SourceFromNativeStream>(tmp_stream)));

    pipe = Pipe::unitePipes(std::move(pipes));
}

Block TemporaryFileStream::read()
{
    if (!isWriteFinished())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Writing has been not finished");

    if (isEof())
        return {};

    if (!in_reader)
    {
        in_reader = std::make_unique<InputReader>(getPath(), header, getSize());
    }

    Block block = in_reader->read();
    if (!block)
    {
        /// finalize earlier to release resources, do not wait for the destructor
        this->release();
    }
    return block;
}

ReadBufferFromFile

void NativeReader::readData(const ISerialization & serialization, ColumnPtr & column, ReadBuffer & istr, size_t rows, double avg_value_size_hint)
{
    ISerialization::DeserializeBinaryBulkSettings settings;
    settings.getter = [&](ISerialization::SubstreamPath) -> ReadBuffer * { return &istr; };
    settings.avg_value_size_hint = avg_value_size_hint;
    settings.position_independent_encoding = false;
    settings.native_format = true;

    ISerialization::DeserializeBinaryBulkStatePtr state;

    serialization.deserializeBinaryBulkStatePrefix(settings, state);
    serialization.deserializeBinaryBulkWithMultipleStreams(column, rows, settings, state, nullptr);

    if (column->size() != rows)
        throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA,
                        "Cannot read all data in NativeReader. Rows read: {}. Rows expected: {}", column->size(), rows);
}