From ec44d35fde60d20d9ab1c6fb9ac7ee844335f1eb Mon Sep 17 00:00:00 2001 From: Josh Gao Date: Thu, 26 Mar 2020 22:02:03 -0700 Subject: [PATCH] adb: implement LZ4 compression. Add support for LZ4 compression, which compresses and decompresses far more quickly than brotli, at the cost of worse compression ratio. `adb sync -d system` speeds (in MB/s) on aosp_blueline-eng: none brotli lz4 USB 3.0 120 110 190 USB 2.0 38 75 63 Bug: https://issuetracker.google.com/150827486 Test: python3 -m unittest test_device.FileOperationsTest{Uncompressed,Brotli,LZ4} Change-Id: Ibef6ac15a76b4e5dcd02d7fb9433cbb1c02b8382 --- adb/Android.bp | 3 + adb/client/commandline.cpp | 2 + adb/client/file_sync_client.cpp | 30 +++++- adb/compression_utils.h | 152 +++++++++++++++++++++++++++++++ adb/daemon/file_sync_service.cpp | 30 +++++- adb/file_sync_protocol.h | 2 + adb/test_device.py | 4 + adb/transport.cpp | 2 + adb/transport.h | 2 + 9 files changed, 220 insertions(+), 7 deletions(-) diff --git a/adb/Android.bp b/adb/Android.bp index 12d9a1409..6386a78ed 100644 --- a/adb/Android.bp +++ b/adb/Android.bp @@ -470,6 +470,7 @@ cc_library { "libadbd_core", "libbrotli", "libdiagnose_usb", + "liblz4", ], shared_libs: [ @@ -571,6 +572,7 @@ cc_library { "libbrotli", "libcutils_sockets", "libdiagnose_usb", + "liblz4", "libmdnssd", ], @@ -605,6 +607,7 @@ cc_binary { "libadbd_services", "libasyncio", "libcap", + "liblz4", "libminijail", "libssl", ], diff --git a/adb/client/commandline.cpp b/adb/client/commandline.cpp index 9078ae97c..02f6e9c1d 100644 --- a/adb/client/commandline.cpp +++ b/adb/client/commandline.cpp @@ -1331,6 +1331,8 @@ static CompressionType parse_compression_type(const std::string& str, bool allow if (str == "brotli") { return CompressionType::Brotli; + } else if (str == "lz4") { + return CompressionType::LZ4; } error_exit("unexpected compression type %s", str.c_str()); diff --git a/adb/client/file_sync_client.cpp b/adb/client/file_sync_client.cpp index c71880c2e..75334d74a 100644 --- a/adb/client/file_sync_client.cpp +++ b/adb/client/file_sync_client.cpp @@ -237,6 +237,7 @@ class SyncConnection { have_ls_v2_ = CanUseFeature(features_, kFeatureLs2); have_sendrecv_v2_ = CanUseFeature(features_, kFeatureSendRecv2); have_sendrecv_v2_brotli_ = CanUseFeature(features_, kFeatureSendRecv2Brotli); + have_sendrecv_v2_lz4_ = CanUseFeature(features_, kFeatureSendRecv2LZ4); fd.reset(adb_connect("sync:", &error)); if (fd < 0) { Error("connect failed: %s", error.c_str()); @@ -262,12 +263,15 @@ class SyncConnection { bool HaveSendRecv2() const { return have_sendrecv_v2_; } bool HaveSendRecv2Brotli() const { return have_sendrecv_v2_brotli_; } + bool HaveSendRecv2LZ4() const { return have_sendrecv_v2_lz4_; } // Resolve a compression type which might be CompressionType::Any to a specific compression // algorithm. CompressionType ResolveCompressionType(CompressionType compression) const { if (compression == CompressionType::Any) { - if (HaveSendRecv2Brotli()) { + if (HaveSendRecv2LZ4()) { + return CompressionType::LZ4; + } else if (HaveSendRecv2Brotli()) { return CompressionType::Brotli; } return CompressionType::None; @@ -361,6 +365,10 @@ class SyncConnection { msg.send_v2_setup.flags = kSyncFlagBrotli; break; + case CompressionType::LZ4: + msg.send_v2_setup.flags = kSyncFlagLZ4; + break; + case CompressionType::Any: LOG(FATAL) << "unexpected CompressionType::Any"; } @@ -400,6 +408,10 @@ class SyncConnection { msg.recv_v2_setup.flags |= kSyncFlagBrotli; break; + case CompressionType::LZ4: + msg.recv_v2_setup.flags |= kSyncFlagLZ4; + break; + case CompressionType::Any: LOG(FATAL) << "unexpected CompressionType::Any"; } @@ -599,7 +611,7 @@ class SyncConnection { syncsendbuf sbuf; sbuf.id = ID_DATA; - std::variant encoder_storage; + std::variant encoder_storage; Encoder* encoder = nullptr; switch (compression) { case CompressionType::None: @@ -610,6 +622,10 @@ class SyncConnection { encoder = &encoder_storage.emplace(SYNC_DATA_MAX); break; + case CompressionType::LZ4: + encoder = &encoder_storage.emplace(SYNC_DATA_MAX); + break; + case CompressionType::Any: LOG(FATAL) << "unexpected CompressionType::Any"; } @@ -891,6 +907,7 @@ class SyncConnection { bool have_ls_v2_; bool have_sendrecv_v2_; bool have_sendrecv_v2_brotli_; + bool have_sendrecv_v2_lz4_; TransferLedger global_ledger_; TransferLedger current_ledger_; @@ -1093,7 +1110,7 @@ static bool sync_recv_v2(SyncConnection& sc, const char* rpath, const char* lpat uint64_t bytes_copied = 0; Block buffer(SYNC_DATA_MAX); - std::variant decoder_storage; + std::variant decoder_storage; Decoder* decoder = nullptr; std::span buffer_span(buffer.data(), buffer.size()); @@ -1106,6 +1123,10 @@ static bool sync_recv_v2(SyncConnection& sc, const char* rpath, const char* lpat decoder = &decoder_storage.emplace(buffer_span); break; + case CompressionType::LZ4: + decoder = &decoder_storage.emplace(buffer_span); + break; + case CompressionType::Any: LOG(FATAL) << "unexpected CompressionType::Any"; } @@ -1160,8 +1181,7 @@ static bool sync_recv_v2(SyncConnection& sc, const char* rpath, const char* lpat } bytes_copied += output.size(); - - sc.RecordBytesTransferred(msg.data.size); + sc.RecordBytesTransferred(output.size()); sc.ReportProgress(name != nullptr ? name : rpath, bytes_copied, expected_size); if (result == DecodeResult::NeedInput) { diff --git a/adb/compression_utils.h b/adb/compression_utils.h index f349697dd..a0c48a207 100644 --- a/adb/compression_utils.h +++ b/adb/compression_utils.h @@ -24,6 +24,7 @@ #include #include +#include #include "types.h" @@ -229,3 +230,154 @@ struct BrotliEncoder final : public Encoder { size_t output_bytes_left_; std::unique_ptr encoder_; }; + +struct LZ4Decoder final : public Decoder { + explicit LZ4Decoder(std::span output_buffer) + : Decoder(output_buffer), decoder_(nullptr, nullptr) { + LZ4F_dctx* dctx; + if (LZ4F_createDecompressionContext(&dctx, LZ4F_VERSION) != 0) { + LOG(FATAL) << "failed to initialize LZ4 decompression context"; + } + decoder_ = std::unique_ptr( + dctx, LZ4F_freeDecompressionContext); + } + + DecodeResult Decode(std::span* output) final { + size_t available_in = input_buffer_.front_size(); + const char* next_in = input_buffer_.front_data(); + + size_t available_out = output_buffer_.size(); + char* next_out = output_buffer_.data(); + + size_t rc = LZ4F_decompress(decoder_.get(), next_out, &available_out, next_in, + &available_in, nullptr); + if (LZ4F_isError(rc)) { + LOG(ERROR) << "LZ4F_decompress failed: " << LZ4F_getErrorName(rc); + return DecodeResult::Error; + } + + input_buffer_.drop_front(available_in); + + if (rc == 0) { + if (!input_buffer_.empty()) { + LOG(ERROR) << "LZ4 stream hit end before reading all data"; + return DecodeResult::Error; + } + lz4_done_ = true; + } + + *output = std::span(output_buffer_.data(), available_out); + + if (finished_) { + return input_buffer_.empty() && lz4_done_ ? DecodeResult::Done + : DecodeResult::MoreOutput; + } + + return DecodeResult::NeedInput; + } + + private: + bool lz4_done_ = false; + std::unique_ptr decoder_; +}; + +struct LZ4Encoder final : public Encoder { + explicit LZ4Encoder(size_t output_block_size) + : Encoder(output_block_size), encoder_(nullptr, nullptr) { + LZ4F_cctx* cctx; + if (LZ4F_createCompressionContext(&cctx, LZ4F_VERSION) != 0) { + LOG(FATAL) << "failed to initialize LZ4 compression context"; + } + encoder_ = std::unique_ptr( + cctx, LZ4F_freeCompressionContext); + Block header(LZ4F_HEADER_SIZE_MAX); + size_t rc = LZ4F_compressBegin(encoder_.get(), header.data(), header.size(), nullptr); + if (LZ4F_isError(rc)) { + LOG(FATAL) << "LZ4F_compressBegin failed: %s", LZ4F_getErrorName(rc); + } + header.resize(rc); + output_buffer_.append(std::move(header)); + } + + // As an optimization, only emit a block if we have an entire output block ready, or we're done. + bool OutputReady() const { + return output_buffer_.size() >= output_block_size_ || lz4_finalized_; + } + + // TODO: Switch the output type to IOVector to remove a copy? + EncodeResult Encode(Block* output) final { + size_t available_in = input_buffer_.front_size(); + const char* next_in = input_buffer_.front_data(); + + // LZ4 makes no guarantees about being able to recover from trying to compress with an + // insufficiently large output buffer. LZ4F_compressBound tells us how much buffer we + // need to compress a given number of bytes, but the smallest value seems to be bigger + // than SYNC_DATA_MAX, so we need to buffer ourselves. + + // Input size chosen to be a local maximum for LZ4F_compressBound (i.e. the block size). + constexpr size_t max_input_size = 65536; + const size_t encode_block_size = LZ4F_compressBound(max_input_size, nullptr); + + if (available_in != 0) { + if (lz4_finalized_) { + LOG(ERROR) << "LZ4Encoder received data after Finish?"; + return EncodeResult::Error; + } + + available_in = std::min(available_in, max_input_size); + + Block encode_block(encode_block_size); + size_t available_out = encode_block.capacity(); + char* next_out = encode_block.data(); + + size_t rc = LZ4F_compressUpdate(encoder_.get(), next_out, available_out, next_in, + available_in, nullptr); + if (LZ4F_isError(rc)) { + LOG(ERROR) << "LZ4F_compressUpdate failed: " << LZ4F_getErrorName(rc); + return EncodeResult::Error; + } + + input_buffer_.drop_front(available_in); + + available_out -= rc; + next_out += rc; + + encode_block.resize(encode_block_size - available_out); + output_buffer_.append(std::move(encode_block)); + } + + if (finished_ && !lz4_finalized_) { + lz4_finalized_ = true; + + Block final_block(encode_block_size + 4); + size_t rc = LZ4F_compressEnd(encoder_.get(), final_block.data(), final_block.size(), + nullptr); + if (LZ4F_isError(rc)) { + LOG(ERROR) << "LZ4F_compressEnd failed: " << LZ4F_getErrorName(rc); + return EncodeResult::Error; + } + + final_block.resize(rc); + output_buffer_.append(std::move(final_block)); + } + + if (OutputReady()) { + size_t len = std::min(output_block_size_, output_buffer_.size()); + *output = output_buffer_.take_front(len).coalesce(); + } else { + output->clear(); + } + + if (lz4_finalized_ && output_buffer_.empty()) { + return EncodeResult::Done; + } else if (OutputReady()) { + return EncodeResult::MoreOutput; + } + return EncodeResult::NeedInput; + } + + private: + bool lz4_finalized_ = false; + std::unique_ptr encoder_; + IOVector output_buffer_; +}; diff --git a/adb/daemon/file_sync_service.cpp b/adb/daemon/file_sync_service.cpp index 3138ab497..3436e32c0 100644 --- a/adb/daemon/file_sync_service.cpp +++ b/adb/daemon/file_sync_service.cpp @@ -272,7 +272,7 @@ static bool handle_send_file_data(borrowed_fd s, unique_fd fd, uint32_t* timesta syncmsg msg; Block buffer(SYNC_DATA_MAX); std::span buffer_span(buffer.data(), buffer.size()); - std::variant decoder_storage; + std::variant decoder_storage; Decoder* decoder = nullptr; switch (compression) { @@ -284,6 +284,10 @@ static bool handle_send_file_data(borrowed_fd s, unique_fd fd, uint32_t* timesta decoder = &decoder_storage.emplace(buffer_span); break; + case CompressionType::LZ4: + decoder = &decoder_storage.emplace(buffer_span); + break; + case CompressionType::Any: LOG(FATAL) << "unexpected CompressionType::Any"; } @@ -569,6 +573,15 @@ static bool do_send_v2(int s, const std::string& path, std::vector& buffer } compression = CompressionType::Brotli; } + if (msg.send_v2_setup.flags & kSyncFlagLZ4) { + msg.send_v2_setup.flags &= ~kSyncFlagLZ4; + if (compression) { + SendSyncFail(s, android::base::StringPrintf("multiple compression flags received: %d", + orig_flags)); + return false; + } + compression = CompressionType::LZ4; + } if (msg.send_v2_setup.flags) { SendSyncFail(s, android::base::StringPrintf("unknown flags: %d", msg.send_v2_setup.flags)); @@ -598,7 +611,7 @@ static bool recv_impl(borrowed_fd s, const char* path, CompressionType compressi syncmsg msg; msg.data.id = ID_DATA; - std::variant encoder_storage; + std::variant encoder_storage; Encoder* encoder; switch (compression) { @@ -610,6 +623,10 @@ static bool recv_impl(borrowed_fd s, const char* path, CompressionType compressi encoder = &encoder_storage.emplace(SYNC_DATA_MAX); break; + case CompressionType::LZ4: + encoder = &encoder_storage.emplace(SYNC_DATA_MAX); + break; + case CompressionType::Any: LOG(FATAL) << "unexpected CompressionType::Any"; } @@ -688,6 +705,15 @@ static bool do_recv_v2(borrowed_fd s, const char* path, std::vector& buffe } compression = CompressionType::Brotli; } + if (msg.recv_v2_setup.flags & kSyncFlagLZ4) { + msg.recv_v2_setup.flags &= ~kSyncFlagLZ4; + if (compression) { + SendSyncFail(s, android::base::StringPrintf("multiple compression flags received: %d", + orig_flags)); + return false; + } + compression = CompressionType::LZ4; + } if (msg.recv_v2_setup.flags) { SendSyncFail(s, android::base::StringPrintf("unknown flags: %d", msg.recv_v2_setup.flags)); diff --git a/adb/file_sync_protocol.h b/adb/file_sync_protocol.h index 70425f7aa..90bd76393 100644 --- a/adb/file_sync_protocol.h +++ b/adb/file_sync_protocol.h @@ -92,12 +92,14 @@ struct __attribute__((packed)) sync_dent_v2 { enum SyncFlag : uint32_t { kSyncFlagNone = 0, kSyncFlagBrotli = 1, + kSyncFlagLZ4 = 2, }; enum class CompressionType { None, Any, Brotli, + LZ4, }; // send_v1 sent the path in a buffer, followed by a comma and the mode as a string. diff --git a/adb/test_device.py b/adb/test_device.py index 496a0ffff..3be7c9aa4 100755 --- a/adb/test_device.py +++ b/adb/test_device.py @@ -1302,6 +1302,10 @@ class FileOperationsTestBrotli(FileOperationsTest.Base): compression = "brotli" +class FileOperationsTestLZ4(FileOperationsTest.Base): + compression = "lz4" + + class DeviceOfflineTest(DeviceTest): def _get_device_state(self, serialno): output = subprocess.check_output(self.device.adb_cmd + ['devices']) diff --git a/adb/transport.cpp b/adb/transport.cpp index e06dbe327..cef3850fa 100644 --- a/adb/transport.cpp +++ b/adb/transport.cpp @@ -84,6 +84,7 @@ const char* const kFeatureRemountShell = "remount_shell"; const char* const kFeatureTrackApp = "track_app"; const char* const kFeatureSendRecv2 = "sendrecv_v2"; const char* const kFeatureSendRecv2Brotli = "sendrecv_v2_brotli"; +const char* const kFeatureSendRecv2LZ4 = "sendrecv_v2_lz4"; namespace { @@ -1183,6 +1184,7 @@ const FeatureSet& supported_features() { kFeatureTrackApp, kFeatureSendRecv2, kFeatureSendRecv2Brotli, + kFeatureSendRecv2LZ4, // Increment ADB_SERVER_VERSION when adding a feature that adbd needs // to know about. Otherwise, the client can be stuck running an old // version of the server even after upgrading their copy of adb. diff --git a/adb/transport.h b/adb/transport.h index b1984db30..12803b5c3 100644 --- a/adb/transport.h +++ b/adb/transport.h @@ -88,6 +88,8 @@ extern const char* const kFeatureTrackApp; extern const char* const kFeatureSendRecv2; // adbd supports brotli for send/recv v2. extern const char* const kFeatureSendRecv2Brotli; +// adbd supports LZ4 for send/recv v2. +extern const char* const kFeatureSendRecv2LZ4; TransportId NextTransportId();