adb: implement LZ4 compression. am: ec44d35fde

Change-Id: Ie0ab6283ea843884a56bf0f0453036ae9f2e788c
This commit is contained in:
Josh Gao 2020-04-07 21:40:15 +00:00 committed by Automerger Merge Worker
commit e8d1281a48
9 changed files with 220 additions and 7 deletions

View file

@ -470,6 +470,7 @@ cc_library {
"libadbd_core",
"libbrotli",
"libdiagnose_usb",
"liblz4",
],
shared_libs: [
@ -571,6 +572,7 @@ cc_library {
"libbrotli",
"libcutils_sockets",
"libdiagnose_usb",
"liblz4",
"libmdnssd",
],
@ -605,6 +607,7 @@ cc_binary {
"libadbd_services",
"libasyncio",
"libcap",
"liblz4",
"libminijail",
"libssl",
],

View file

@ -1331,6 +1331,8 @@ static CompressionType parse_compression_type(const std::string& str, bool allow
if (str == "brotli") {
return CompressionType::Brotli;
} else if (str == "lz4") {
return CompressionType::LZ4;
}
error_exit("unexpected compression type %s", str.c_str());

View file

@ -237,6 +237,7 @@ class SyncConnection {
have_ls_v2_ = CanUseFeature(features_, kFeatureLs2);
have_sendrecv_v2_ = CanUseFeature(features_, kFeatureSendRecv2);
have_sendrecv_v2_brotli_ = CanUseFeature(features_, kFeatureSendRecv2Brotli);
have_sendrecv_v2_lz4_ = CanUseFeature(features_, kFeatureSendRecv2LZ4);
fd.reset(adb_connect("sync:", &error));
if (fd < 0) {
Error("connect failed: %s", error.c_str());
@ -262,12 +263,15 @@ class SyncConnection {
bool HaveSendRecv2() const { return have_sendrecv_v2_; }
bool HaveSendRecv2Brotli() const { return have_sendrecv_v2_brotli_; }
bool HaveSendRecv2LZ4() const { return have_sendrecv_v2_lz4_; }
// Resolve a compression type which might be CompressionType::Any to a specific compression
// algorithm.
CompressionType ResolveCompressionType(CompressionType compression) const {
if (compression == CompressionType::Any) {
if (HaveSendRecv2Brotli()) {
if (HaveSendRecv2LZ4()) {
return CompressionType::LZ4;
} else if (HaveSendRecv2Brotli()) {
return CompressionType::Brotli;
}
return CompressionType::None;
@ -361,6 +365,10 @@ class SyncConnection {
msg.send_v2_setup.flags = kSyncFlagBrotli;
break;
case CompressionType::LZ4:
msg.send_v2_setup.flags = kSyncFlagLZ4;
break;
case CompressionType::Any:
LOG(FATAL) << "unexpected CompressionType::Any";
}
@ -400,6 +408,10 @@ class SyncConnection {
msg.recv_v2_setup.flags |= kSyncFlagBrotli;
break;
case CompressionType::LZ4:
msg.recv_v2_setup.flags |= kSyncFlagLZ4;
break;
case CompressionType::Any:
LOG(FATAL) << "unexpected CompressionType::Any";
}
@ -599,7 +611,7 @@ class SyncConnection {
syncsendbuf sbuf;
sbuf.id = ID_DATA;
std::variant<std::monostate, NullEncoder, BrotliEncoder> encoder_storage;
std::variant<std::monostate, NullEncoder, BrotliEncoder, LZ4Encoder> encoder_storage;
Encoder* encoder = nullptr;
switch (compression) {
case CompressionType::None:
@ -610,6 +622,10 @@ class SyncConnection {
encoder = &encoder_storage.emplace<BrotliEncoder>(SYNC_DATA_MAX);
break;
case CompressionType::LZ4:
encoder = &encoder_storage.emplace<LZ4Encoder>(SYNC_DATA_MAX);
break;
case CompressionType::Any:
LOG(FATAL) << "unexpected CompressionType::Any";
}
@ -891,6 +907,7 @@ class SyncConnection {
bool have_ls_v2_;
bool have_sendrecv_v2_;
bool have_sendrecv_v2_brotli_;
bool have_sendrecv_v2_lz4_;
TransferLedger global_ledger_;
TransferLedger current_ledger_;
@ -1093,7 +1110,7 @@ static bool sync_recv_v2(SyncConnection& sc, const char* rpath, const char* lpat
uint64_t bytes_copied = 0;
Block buffer(SYNC_DATA_MAX);
std::variant<std::monostate, NullDecoder, BrotliDecoder> decoder_storage;
std::variant<std::monostate, NullDecoder, BrotliDecoder, LZ4Decoder> decoder_storage;
Decoder* decoder = nullptr;
std::span buffer_span(buffer.data(), buffer.size());
@ -1106,6 +1123,10 @@ static bool sync_recv_v2(SyncConnection& sc, const char* rpath, const char* lpat
decoder = &decoder_storage.emplace<BrotliDecoder>(buffer_span);
break;
case CompressionType::LZ4:
decoder = &decoder_storage.emplace<LZ4Decoder>(buffer_span);
break;
case CompressionType::Any:
LOG(FATAL) << "unexpected CompressionType::Any";
}
@ -1160,8 +1181,7 @@ static bool sync_recv_v2(SyncConnection& sc, const char* rpath, const char* lpat
}
bytes_copied += output.size();
sc.RecordBytesTransferred(msg.data.size);
sc.RecordBytesTransferred(output.size());
sc.ReportProgress(name != nullptr ? name : rpath, bytes_copied, expected_size);
if (result == DecodeResult::NeedInput) {

View file

@ -24,6 +24,7 @@
#include <brotli/decode.h>
#include <brotli/encode.h>
#include <lz4frame.h>
#include "types.h"
@ -229,3 +230,154 @@ struct BrotliEncoder final : public Encoder {
size_t output_bytes_left_;
std::unique_ptr<BrotliEncoderState, void (*)(BrotliEncoderState*)> encoder_;
};
struct LZ4Decoder final : public Decoder {
explicit LZ4Decoder(std::span<char> output_buffer)
: Decoder(output_buffer), decoder_(nullptr, nullptr) {
LZ4F_dctx* dctx;
if (LZ4F_createDecompressionContext(&dctx, LZ4F_VERSION) != 0) {
LOG(FATAL) << "failed to initialize LZ4 decompression context";
}
decoder_ = std::unique_ptr<LZ4F_dctx, decltype(&LZ4F_freeDecompressionContext)>(
dctx, LZ4F_freeDecompressionContext);
}
DecodeResult Decode(std::span<char>* output) final {
size_t available_in = input_buffer_.front_size();
const char* next_in = input_buffer_.front_data();
size_t available_out = output_buffer_.size();
char* next_out = output_buffer_.data();
size_t rc = LZ4F_decompress(decoder_.get(), next_out, &available_out, next_in,
&available_in, nullptr);
if (LZ4F_isError(rc)) {
LOG(ERROR) << "LZ4F_decompress failed: " << LZ4F_getErrorName(rc);
return DecodeResult::Error;
}
input_buffer_.drop_front(available_in);
if (rc == 0) {
if (!input_buffer_.empty()) {
LOG(ERROR) << "LZ4 stream hit end before reading all data";
return DecodeResult::Error;
}
lz4_done_ = true;
}
*output = std::span<char>(output_buffer_.data(), available_out);
if (finished_) {
return input_buffer_.empty() && lz4_done_ ? DecodeResult::Done
: DecodeResult::MoreOutput;
}
return DecodeResult::NeedInput;
}
private:
bool lz4_done_ = false;
std::unique_ptr<LZ4F_dctx, LZ4F_errorCode_t (*)(LZ4F_dctx*)> decoder_;
};
struct LZ4Encoder final : public Encoder {
explicit LZ4Encoder(size_t output_block_size)
: Encoder(output_block_size), encoder_(nullptr, nullptr) {
LZ4F_cctx* cctx;
if (LZ4F_createCompressionContext(&cctx, LZ4F_VERSION) != 0) {
LOG(FATAL) << "failed to initialize LZ4 compression context";
}
encoder_ = std::unique_ptr<LZ4F_cctx, decltype(&LZ4F_freeCompressionContext)>(
cctx, LZ4F_freeCompressionContext);
Block header(LZ4F_HEADER_SIZE_MAX);
size_t rc = LZ4F_compressBegin(encoder_.get(), header.data(), header.size(), nullptr);
if (LZ4F_isError(rc)) {
LOG(FATAL) << "LZ4F_compressBegin failed: %s", LZ4F_getErrorName(rc);
}
header.resize(rc);
output_buffer_.append(std::move(header));
}
// As an optimization, only emit a block if we have an entire output block ready, or we're done.
bool OutputReady() const {
return output_buffer_.size() >= output_block_size_ || lz4_finalized_;
}
// TODO: Switch the output type to IOVector to remove a copy?
EncodeResult Encode(Block* output) final {
size_t available_in = input_buffer_.front_size();
const char* next_in = input_buffer_.front_data();
// LZ4 makes no guarantees about being able to recover from trying to compress with an
// insufficiently large output buffer. LZ4F_compressBound tells us how much buffer we
// need to compress a given number of bytes, but the smallest value seems to be bigger
// than SYNC_DATA_MAX, so we need to buffer ourselves.
// Input size chosen to be a local maximum for LZ4F_compressBound (i.e. the block size).
constexpr size_t max_input_size = 65536;
const size_t encode_block_size = LZ4F_compressBound(max_input_size, nullptr);
if (available_in != 0) {
if (lz4_finalized_) {
LOG(ERROR) << "LZ4Encoder received data after Finish?";
return EncodeResult::Error;
}
available_in = std::min(available_in, max_input_size);
Block encode_block(encode_block_size);
size_t available_out = encode_block.capacity();
char* next_out = encode_block.data();
size_t rc = LZ4F_compressUpdate(encoder_.get(), next_out, available_out, next_in,
available_in, nullptr);
if (LZ4F_isError(rc)) {
LOG(ERROR) << "LZ4F_compressUpdate failed: " << LZ4F_getErrorName(rc);
return EncodeResult::Error;
}
input_buffer_.drop_front(available_in);
available_out -= rc;
next_out += rc;
encode_block.resize(encode_block_size - available_out);
output_buffer_.append(std::move(encode_block));
}
if (finished_ && !lz4_finalized_) {
lz4_finalized_ = true;
Block final_block(encode_block_size + 4);
size_t rc = LZ4F_compressEnd(encoder_.get(), final_block.data(), final_block.size(),
nullptr);
if (LZ4F_isError(rc)) {
LOG(ERROR) << "LZ4F_compressEnd failed: " << LZ4F_getErrorName(rc);
return EncodeResult::Error;
}
final_block.resize(rc);
output_buffer_.append(std::move(final_block));
}
if (OutputReady()) {
size_t len = std::min(output_block_size_, output_buffer_.size());
*output = output_buffer_.take_front(len).coalesce();
} else {
output->clear();
}
if (lz4_finalized_ && output_buffer_.empty()) {
return EncodeResult::Done;
} else if (OutputReady()) {
return EncodeResult::MoreOutput;
}
return EncodeResult::NeedInput;
}
private:
bool lz4_finalized_ = false;
std::unique_ptr<LZ4F_cctx, LZ4F_errorCode_t (*)(LZ4F_cctx*)> encoder_;
IOVector output_buffer_;
};

View file

@ -272,7 +272,7 @@ static bool handle_send_file_data(borrowed_fd s, unique_fd fd, uint32_t* timesta
syncmsg msg;
Block buffer(SYNC_DATA_MAX);
std::span<char> buffer_span(buffer.data(), buffer.size());
std::variant<std::monostate, NullDecoder, BrotliDecoder> decoder_storage;
std::variant<std::monostate, NullDecoder, BrotliDecoder, LZ4Decoder> decoder_storage;
Decoder* decoder = nullptr;
switch (compression) {
@ -284,6 +284,10 @@ static bool handle_send_file_data(borrowed_fd s, unique_fd fd, uint32_t* timesta
decoder = &decoder_storage.emplace<BrotliDecoder>(buffer_span);
break;
case CompressionType::LZ4:
decoder = &decoder_storage.emplace<LZ4Decoder>(buffer_span);
break;
case CompressionType::Any:
LOG(FATAL) << "unexpected CompressionType::Any";
}
@ -569,6 +573,15 @@ static bool do_send_v2(int s, const std::string& path, std::vector<char>& buffer
}
compression = CompressionType::Brotli;
}
if (msg.send_v2_setup.flags & kSyncFlagLZ4) {
msg.send_v2_setup.flags &= ~kSyncFlagLZ4;
if (compression) {
SendSyncFail(s, android::base::StringPrintf("multiple compression flags received: %d",
orig_flags));
return false;
}
compression = CompressionType::LZ4;
}
if (msg.send_v2_setup.flags) {
SendSyncFail(s, android::base::StringPrintf("unknown flags: %d", msg.send_v2_setup.flags));
@ -598,7 +611,7 @@ static bool recv_impl(borrowed_fd s, const char* path, CompressionType compressi
syncmsg msg;
msg.data.id = ID_DATA;
std::variant<std::monostate, NullEncoder, BrotliEncoder> encoder_storage;
std::variant<std::monostate, NullEncoder, BrotliEncoder, LZ4Encoder> encoder_storage;
Encoder* encoder;
switch (compression) {
@ -610,6 +623,10 @@ static bool recv_impl(borrowed_fd s, const char* path, CompressionType compressi
encoder = &encoder_storage.emplace<BrotliEncoder>(SYNC_DATA_MAX);
break;
case CompressionType::LZ4:
encoder = &encoder_storage.emplace<LZ4Encoder>(SYNC_DATA_MAX);
break;
case CompressionType::Any:
LOG(FATAL) << "unexpected CompressionType::Any";
}
@ -688,6 +705,15 @@ static bool do_recv_v2(borrowed_fd s, const char* path, std::vector<char>& buffe
}
compression = CompressionType::Brotli;
}
if (msg.recv_v2_setup.flags & kSyncFlagLZ4) {
msg.recv_v2_setup.flags &= ~kSyncFlagLZ4;
if (compression) {
SendSyncFail(s, android::base::StringPrintf("multiple compression flags received: %d",
orig_flags));
return false;
}
compression = CompressionType::LZ4;
}
if (msg.recv_v2_setup.flags) {
SendSyncFail(s, android::base::StringPrintf("unknown flags: %d", msg.recv_v2_setup.flags));

View file

@ -92,12 +92,14 @@ struct __attribute__((packed)) sync_dent_v2 {
enum SyncFlag : uint32_t {
kSyncFlagNone = 0,
kSyncFlagBrotli = 1,
kSyncFlagLZ4 = 2,
};
enum class CompressionType {
None,
Any,
Brotli,
LZ4,
};
// send_v1 sent the path in a buffer, followed by a comma and the mode as a string.

View file

@ -1302,6 +1302,10 @@ class FileOperationsTestBrotli(FileOperationsTest.Base):
compression = "brotli"
class FileOperationsTestLZ4(FileOperationsTest.Base):
compression = "lz4"
class DeviceOfflineTest(DeviceTest):
def _get_device_state(self, serialno):
output = subprocess.check_output(self.device.adb_cmd + ['devices'])

View file

@ -84,6 +84,7 @@ const char* const kFeatureRemountShell = "remount_shell";
const char* const kFeatureTrackApp = "track_app";
const char* const kFeatureSendRecv2 = "sendrecv_v2";
const char* const kFeatureSendRecv2Brotli = "sendrecv_v2_brotli";
const char* const kFeatureSendRecv2LZ4 = "sendrecv_v2_lz4";
namespace {
@ -1183,6 +1184,7 @@ const FeatureSet& supported_features() {
kFeatureTrackApp,
kFeatureSendRecv2,
kFeatureSendRecv2Brotli,
kFeatureSendRecv2LZ4,
// Increment ADB_SERVER_VERSION when adding a feature that adbd needs
// to know about. Otherwise, the client can be stuck running an old
// version of the server even after upgrading their copy of adb.

View file

@ -88,6 +88,8 @@ extern const char* const kFeatureTrackApp;
extern const char* const kFeatureSendRecv2;
// adbd supports brotli for send/recv v2.
extern const char* const kFeatureSendRecv2Brotli;
// adbd supports LZ4 for send/recv v2.
extern const char* const kFeatureSendRecv2LZ4;
TransportId NextTransportId();