Merge "libsnapshot: stride compression" into main
This commit is contained in:
commit
4edb9c0088
1 changed files with 27 additions and 21 deletions
|
@ -603,41 +603,47 @@ std::vector<CowWriterV3::CompressedBuffer> CowWriterV3::ProcessBlocksWithCompres
|
|||
std::vector<CowWriterV3::CompressedBuffer> CowWriterV3::ProcessBlocksWithThreadedCompression(
|
||||
const size_t num_blocks, const void* data, CowOperationType type) {
|
||||
const size_t num_threads = num_compress_threads_;
|
||||
const size_t blocks_per_thread = DivRoundUp(num_blocks, num_threads);
|
||||
const uint8_t* iter = reinterpret_cast<const uint8_t*>(data);
|
||||
|
||||
// We will alternate which thread to send compress work to. E.g. alternate between T1 and T2
|
||||
// until all blocks are processed
|
||||
std::vector<CompressedBuffer> compressed_vec;
|
||||
// Submit the blocks per thread. The retrieval of
|
||||
// compressed buffers has to be done in the same order.
|
||||
// We should not poll for completed buffers in a different order as the
|
||||
// buffers are tightly coupled with block ordering.
|
||||
for (size_t i = 0; i < num_threads; i++) {
|
||||
CompressWorker* worker = compress_threads_[i].get();
|
||||
auto blocks_in_batch = std::min(num_blocks - i * blocks_per_thread, blocks_per_thread);
|
||||
// Enqueue the blocks to be compressed for each thread.
|
||||
while (blocks_in_batch) {
|
||||
CompressedBuffer buffer;
|
||||
int iteration = 0;
|
||||
int blocks_to_compress = static_cast<int>(num_blocks);
|
||||
while (blocks_to_compress) {
|
||||
CompressedBuffer buffer;
|
||||
CompressWorker* worker = compress_threads_[iteration % num_threads].get();
|
||||
|
||||
const size_t compression_factor = GetCompressionFactor(blocks_in_batch, type);
|
||||
size_t num_blocks = compression_factor / header_.block_size;
|
||||
const size_t compression_factor = GetCompressionFactor(blocks_to_compress, type);
|
||||
size_t num_blocks = compression_factor / header_.block_size;
|
||||
|
||||
buffer.compression_factor = compression_factor;
|
||||
worker->EnqueueCompressBlocks(iter, compression_factor, 1);
|
||||
compressed_vec.push_back(std::move(buffer));
|
||||
blocks_in_batch -= num_blocks;
|
||||
iter += compression_factor;
|
||||
}
|
||||
worker->EnqueueCompressBlocks(iter, compression_factor, 1);
|
||||
buffer.compression_factor = compression_factor;
|
||||
compressed_vec.push_back(std::move(buffer));
|
||||
|
||||
iteration++;
|
||||
iter += compression_factor;
|
||||
blocks_to_compress -= num_blocks;
|
||||
}
|
||||
|
||||
// Fetch compressed buffers from the threads
|
||||
std::vector<std::vector<uint8_t>> compressed_buf;
|
||||
std::vector<std::vector<std::vector<uint8_t>>> worker_buffers(num_threads);
|
||||
compressed_buf.clear();
|
||||
for (size_t i = 0; i < num_threads; i++) {
|
||||
CompressWorker* worker = compress_threads_[i].get();
|
||||
if (!worker->GetCompressedBuffers(&compressed_buf)) {
|
||||
if (!worker->GetCompressedBuffers(&worker_buffers[i])) {
|
||||
return {};
|
||||
}
|
||||
}
|
||||
// compressed_vec | CB 1 | CB 2 | CB 3 | CB 4 | <-compressed buffers
|
||||
// t1 t2 t1 t2 <- processed by these threads
|
||||
// Ordering is important here. We need to retrieve the compressed data in the same order we
|
||||
// processed it and assume that we submit data beginning with the first thread and then
|
||||
// round robin the consecutive data calls. We need to fetch compressed buffers from the threads
|
||||
// via the same ordering
|
||||
for (size_t i = 0; i < compressed_vec.size(); i++) {
|
||||
compressed_buf.emplace_back(worker_buffers[i % num_threads][i / num_threads]);
|
||||
}
|
||||
|
||||
if (compressed_vec.size() != compressed_buf.size()) {
|
||||
LOG(ERROR) << "Compressed buffer size: " << compressed_buf.size()
|
||||
|
|
Loading…
Reference in a new issue