From 96ff54bf347378025d1e55bf0e6364835005d585 Mon Sep 17 00:00:00 2001 From: Alex Buynytskyy Date: Thu, 13 Feb 2020 06:52:04 -0800 Subject: [PATCH] Incremental installations in adb, client/host side. Test: adb install --incremental Change-Id: I74e3eaf9718a16272bc533bc8298dfcf81120caa --- adb/Android.bp | 3 + adb/adb_trace.cpp | 32 +- adb/adb_trace.h | 9 +- adb/client/adb_install.cpp | 104 +++++- adb/client/commandline.cpp | 13 + adb/client/fastdeploy.cpp | 22 +- adb/client/incremental.cpp | 213 ++++++++++++ adb/client/incremental.h | 30 ++ adb/client/incremental_server.cpp | 545 ++++++++++++++++++++++++++++++ adb/client/incremental_server.h | 26 ++ adb/sysdeps.h | 65 +++- adb/sysdeps_unix.cpp | 34 ++ adb/sysdeps_win32.cpp | 60 ++++ 13 files changed, 1108 insertions(+), 48 deletions(-) create mode 100644 adb/client/incremental.cpp create mode 100644 adb/client/incremental.h create mode 100644 adb/client/incremental_server.cpp create mode 100644 adb/client/incremental_server.h diff --git a/adb/Android.bp b/adb/Android.bp index c71138af1..32581a255 100644 --- a/adb/Android.bp +++ b/adb/Android.bp @@ -341,6 +341,8 @@ cc_binary_host { "client/line_printer.cpp", "client/fastdeploy.cpp", "client/fastdeploycallbacks.cpp", + "client/incremental.cpp", + "client/incremental_server.cpp", "shell_service_protocol.cpp", ], @@ -360,6 +362,7 @@ cc_binary_host { "libfastdeploy_host", "libdiagnose_usb", "liblog", + "liblz4", "libmdnssd", "libprotobuf-cpp-lite", "libusb", diff --git a/adb/adb_trace.cpp b/adb/adb_trace.cpp index 80f146c3f..cea24fe2f 100644 --- a/adb/adb_trace.cpp +++ b/adb/adb_trace.cpp @@ -118,22 +118,22 @@ static void setup_trace_mask() { return; } - std::unordered_map trace_flags = { - {"1", -1}, - {"all", -1}, - {"adb", ADB}, - {"sockets", SOCKETS}, - {"packets", PACKETS}, - {"rwx", RWX}, - {"usb", USB}, - {"sync", SYNC}, - {"sysdeps", SYSDEPS}, - {"transport", TRANSPORT}, - {"jdwp", JDWP}, - {"services", SERVICES}, - {"auth", AUTH}, - {"fdevent", FDEVENT}, - {"shell", SHELL}}; + std::unordered_map trace_flags = {{"1", -1}, + {"all", -1}, + {"adb", ADB}, + {"sockets", SOCKETS}, + {"packets", PACKETS}, + {"rwx", RWX}, + {"usb", USB}, + {"sync", SYNC}, + {"sysdeps", SYSDEPS}, + {"transport", TRANSPORT}, + {"jdwp", JDWP}, + {"services", SERVICES}, + {"auth", AUTH}, + {"fdevent", FDEVENT}, + {"shell", SHELL}, + {"incremental", INCREMENTAL}}; std::vector elements = android::base::Split(trace_setting, " "); for (const auto& elem : elements) { diff --git a/adb/adb_trace.h b/adb/adb_trace.h index 1d2c8c704..ed4be88a6 100644 --- a/adb/adb_trace.h +++ b/adb/adb_trace.h @@ -25,19 +25,20 @@ * the adb_trace_init() function implemented in adb_trace.cpp. */ enum AdbTrace { - ADB = 0, /* 0x001 */ + ADB = 0, /* 0x001 */ SOCKETS, PACKETS, TRANSPORT, - RWX, /* 0x010 */ + RWX, /* 0x010 */ USB, SYNC, SYSDEPS, - JDWP, /* 0x100 */ + JDWP, /* 0x100 */ SERVICES, AUTH, FDEVENT, - SHELL + SHELL, + INCREMENTAL, }; #define VLOG_IS_ON(TAG) \ diff --git a/adb/client/adb_install.cpp b/adb/client/adb_install.cpp index 2bcd0a682..982a96b74 100644 --- a/adb/client/adb_install.cpp +++ b/adb/client/adb_install.cpp @@ -36,8 +36,10 @@ #include "client/file_sync_client.h" #include "commandline.h" #include "fastdeploy.h" +#include "incremental.h" static constexpr int kFastDeployMinApi = 24; +static constexpr int kIncrementalMinApi = 29; namespace { @@ -45,8 +47,8 @@ enum InstallMode { INSTALL_DEFAULT, INSTALL_PUSH, INSTALL_STREAM, + INSTALL_INCREMENTAL, }; - } static bool can_use_feature(const char* feature) { @@ -70,6 +72,10 @@ static bool is_apex_supported() { return can_use_feature(kFeatureApex); } +static bool is_abb_exec_supported() { + return can_use_feature(kFeatureAbbExec); +} + static int pm_command(int argc, const char** argv) { std::string cmd = "pm"; @@ -193,14 +199,14 @@ static int install_app_streamed(int argc, const char** argv, bool use_fastdeploy posix_fadvise(local_fd.get(), 0, 0, POSIX_FADV_SEQUENTIAL | POSIX_FADV_NOREUSE); #endif - const bool use_abb = can_use_feature(kFeatureAbbExec); + const bool use_abb_exec = is_abb_exec_supported(); std::string error; - std::vector cmd_args = {use_abb ? "package" : "exec:cmd package"}; + std::vector cmd_args = {use_abb_exec ? "package" : "exec:cmd package"}; cmd_args.reserve(argc + 3); // don't copy the APK name, but, copy the rest of the arguments as-is while (argc-- > 1) { - if (use_abb) { + if (use_abb_exec) { cmd_args.push_back(*argv++); } else { cmd_args.push_back(escape_arg(*argv++)); @@ -217,7 +223,7 @@ static int install_app_streamed(int argc, const char** argv, bool use_fastdeploy } unique_fd remote_fd; - if (use_abb) { + if (use_abb_exec) { remote_fd = send_abb_exec_command(cmd_args, &error); } else { remote_fd.reset(adb_connect(android::base::Join(cmd_args, " "), &error)); @@ -287,8 +293,60 @@ static int install_app_legacy(int argc, const char** argv, bool use_fastdeploy) return result; } +template +static int msBetween(TimePoint start, TimePoint end) { + return std::chrono::duration_cast(end - start).count(); +} + +static int install_app_incremental(int argc, const char** argv) { + printf("Performing Incremental Install\n"); + using clock = std::chrono::high_resolution_clock; + const auto start = clock::now(); + int first_apk = -1; + int last_apk = -1; + std::string cert_path; + bool wait = false; + std::vector args = {"package"}; + for (int i = 0; i < argc; ++i) { + const auto arg = std::string_view(argv[i]); + if (android::base::EndsWithIgnoreCase(arg, ".apk")) { + last_apk = i; + if (first_apk == -1) { + first_apk = i; + } + } else if (arg == "--wait") { + wait = true; + } else if (arg.starts_with("install-")) { + // incremental installation command on the device is the same for all its variations in + // the adb, e.g. install-multiple or install-multi-package + args.push_back("install"); + } else { + args.push_back(arg); + } + } + + if (first_apk == -1) error_exit("Need at least one APK file on command line"); + + const auto afterApk = clock::now(); + + auto server_process = incremental::install({argv + first_apk, argv + last_apk + 1}); + if (!server_process) { + return -1; + } + + const auto end = clock::now(); + printf("Install command complete (ms: %d total, %d apk prep, %d install)\n", + msBetween(start, end), msBetween(start, afterApk), msBetween(afterApk, end)); + + if (wait) { + (*server_process).wait(); + } + + return 0; +} + int install_app(int argc, const char** argv) { - std::vector processedArgIndicies; + std::vector processedArgIndices; InstallMode installMode = INSTALL_DEFAULT; bool use_fastdeploy = false; bool is_reinstall = false; @@ -296,30 +354,42 @@ int install_app(int argc, const char** argv) { for (int i = 1; i < argc; i++) { if (!strcmp(argv[i], "--streaming")) { - processedArgIndicies.push_back(i); + processedArgIndices.push_back(i); installMode = INSTALL_STREAM; } else if (!strcmp(argv[i], "--no-streaming")) { - processedArgIndicies.push_back(i); + processedArgIndices.push_back(i); installMode = INSTALL_PUSH; } else if (!strcmp(argv[i], "-r")) { - // Note that this argument is not added to processedArgIndicies because it + // Note that this argument is not added to processedArgIndices because it // must be passed through to pm is_reinstall = true; } else if (!strcmp(argv[i], "--fastdeploy")) { - processedArgIndicies.push_back(i); + processedArgIndices.push_back(i); use_fastdeploy = true; } else if (!strcmp(argv[i], "--no-fastdeploy")) { - processedArgIndicies.push_back(i); + processedArgIndices.push_back(i); use_fastdeploy = false; } else if (!strcmp(argv[i], "--force-agent")) { - processedArgIndicies.push_back(i); + processedArgIndices.push_back(i); agent_update_strategy = FastDeploy_AgentUpdateAlways; } else if (!strcmp(argv[i], "--date-check-agent")) { - processedArgIndicies.push_back(i); + processedArgIndices.push_back(i); agent_update_strategy = FastDeploy_AgentUpdateNewerTimeStamp; } else if (!strcmp(argv[i], "--version-check-agent")) { - processedArgIndicies.push_back(i); + processedArgIndices.push_back(i); agent_update_strategy = FastDeploy_AgentUpdateDifferentVersion; + } else if (!strcmp(argv[i], "--incremental")) { + processedArgIndices.push_back(i); + installMode = INSTALL_INCREMENTAL; + } else if (!strcmp(argv[i], "--no-incremental")) { + processedArgIndices.push_back(i); + installMode = INSTALL_DEFAULT; + } + } + + if (installMode == INSTALL_INCREMENTAL) { + if (get_device_api_level() < kIncrementalMinApi || !is_abb_exec_supported()) { + error_exit("Attempting to use incremental install on unsupported device"); } } @@ -341,8 +411,8 @@ int install_app(int argc, const char** argv) { std::vector passthrough_argv; for (int i = 0; i < argc; i++) { - if (std::find(processedArgIndicies.begin(), processedArgIndicies.end(), i) == - processedArgIndicies.end()) { + if (std::find(processedArgIndices.begin(), processedArgIndices.end(), i) == + processedArgIndices.end()) { passthrough_argv.push_back(argv[i]); } } @@ -357,6 +427,8 @@ int install_app(int argc, const char** argv) { case INSTALL_STREAM: return install_app_streamed(passthrough_argv.size(), passthrough_argv.data(), use_fastdeploy); + case INSTALL_INCREMENTAL: + return install_app_incremental(passthrough_argv.size(), passthrough_argv.data()); case INSTALL_DEFAULT: default: return 1; diff --git a/adb/client/commandline.cpp b/adb/client/commandline.cpp index c3029652c..84c0e0134 100644 --- a/adb/client/commandline.cpp +++ b/adb/client/commandline.cpp @@ -60,6 +60,7 @@ #include "client/file_sync_client.h" #include "commandline.h" #include "fastdeploy.h" +#include "incremental_server.h" #include "services.h" #include "shell_protocol.h" #include "sysdeps/chrono.h" @@ -1959,6 +1960,18 @@ int adb_commandline(int argc, const char** argv) { error_exit("usage: adb reconnect [device|offline]"); } } + } else if (!strcmp(argv[0], "inc-server")) { + if (argc < 3) { + error_exit("usage: adb inc-server FD FILE1 FILE2 ..."); + } + int fd = atoi(argv[1]); + if (fd < 3) { + // Disallow invalid FDs and stdin/out/err as well. + error_exit("Invalid fd number given: %d", fd); + } + fd = adb_register_socket(fd); + close_on_exec(fd); + return incremental::serve(fd, argc - 2, argv + 2); } error_exit("unknown command %s", argv[0]); diff --git a/adb/client/fastdeploy.cpp b/adb/client/fastdeploy.cpp index 5fa0edbac..c5fc12f0a 100644 --- a/adb/client/fastdeploy.cpp +++ b/adb/client/fastdeploy.cpp @@ -81,17 +81,21 @@ struct FileDeleter { } // namespace int get_device_api_level() { - REPORT_FUNC_TIME(); - std::vector sdk_version_output_buffer; - std::vector sdk_version_error_buffer; - int api_level = -1; + static const int api_level = [] { + REPORT_FUNC_TIME(); + std::vector sdk_version_output_buffer; + std::vector sdk_version_error_buffer; + int api_level = -1; - int statusCode = capture_shell_command("getprop ro.build.version.sdk", - &sdk_version_output_buffer, &sdk_version_error_buffer); - if (statusCode == 0 && sdk_version_output_buffer.size() > 0) { - api_level = strtol((char*)sdk_version_output_buffer.data(), NULL, 10); - } + int status_code = + capture_shell_command("getprop ro.build.version.sdk", &sdk_version_output_buffer, + &sdk_version_error_buffer); + if (status_code == 0 && sdk_version_output_buffer.size() > 0) { + api_level = strtol((char*)sdk_version_output_buffer.data(), nullptr, 10); + } + return api_level; + }(); return api_level; } diff --git a/adb/client/incremental.cpp b/adb/client/incremental.cpp new file mode 100644 index 000000000..cffd4bd0b --- /dev/null +++ b/adb/client/incremental.cpp @@ -0,0 +1,213 @@ +/* + * Copyright (C) 2020 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "incremental.h" + +#include +#include +#include + +#include "adb_client.h" +#include "adb_io.h" +#include "adb_utils.h" +#include "commandline.h" +#include "sysdeps.h" + +#ifndef _WIN32 +#include +#else +#define be32toh(x) _byteswap_ulong(x) +#endif + +using namespace std::literals; + +namespace incremental { + +namespace { + +static constexpr auto IDSIG = ".idsig"sv; + +using android::base::StringPrintf; + +using Size = int64_t; + +static inline int32_t read_int32(borrowed_fd fd) { + int32_t result; + ReadFully(fd, &result, sizeof(result)); + return result; +} + +static inline int32_t read_be_int32(borrowed_fd fd) { + return int32_t(be32toh(read_int32(fd))); +} + +static inline void append_bytes_with_size(borrowed_fd fd, std::vector* bytes) { + int32_t be_size = read_int32(fd); + int32_t size = int32_t(be32toh(be_size)); + auto old_size = bytes->size(); + bytes->resize(old_size + sizeof(be_size) + size); + memcpy(bytes->data() + old_size, &be_size, sizeof(be_size)); + ReadFully(fd, bytes->data() + old_size + sizeof(be_size), size); +} + +static inline std::pair, int32_t> read_id_sig_headers(borrowed_fd fd) { + std::vector result; + append_bytes_with_size(fd, &result); // verityRootHash + append_bytes_with_size(fd, &result); // v3Digest + append_bytes_with_size(fd, &result); // pkcs7SignatureBlock + auto tree_size = read_be_int32(fd); // size of the verity tree + return {std::move(result), tree_size}; +} + +static inline Size verity_tree_size_for_file(Size fileSize) { + constexpr int INCFS_DATA_FILE_BLOCK_SIZE = 4096; + constexpr int SHA256_DIGEST_SIZE = 32; + constexpr int digest_size = SHA256_DIGEST_SIZE; + constexpr int hash_per_block = INCFS_DATA_FILE_BLOCK_SIZE / digest_size; + + Size total_tree_block_count = 0; + + auto block_count = 1 + (fileSize - 1) / INCFS_DATA_FILE_BLOCK_SIZE; + auto hash_block_count = block_count; + for (auto i = 0; hash_block_count > 1; i++) { + hash_block_count = (hash_block_count + hash_per_block - 1) / hash_per_block; + total_tree_block_count += hash_block_count; + } + return total_tree_block_count * INCFS_DATA_FILE_BLOCK_SIZE; +} + +// Base64-encode signature bytes. Keeping fd at the position of start of verity tree. +static std::pair read_and_encode_signature(Size file_size, + std::string signature_file) { + signature_file += IDSIG; + + struct stat st; + if (stat(signature_file.c_str(), &st)) { + fprintf(stderr, "Failed to stat signature file %s. Abort.\n", signature_file.c_str()); + return {}; + } + + unique_fd fd(adb_open(signature_file.c_str(), O_RDONLY | O_CLOEXEC)); + if (fd < 0) { + fprintf(stderr, "Failed to open signature file: %s. Abort.\n", signature_file.c_str()); + return {}; + } + + auto [signature, tree_size] = read_id_sig_headers(fd); + if (auto expected = verity_tree_size_for_file(file_size); tree_size != expected) { + fprintf(stderr, + "Verity tree size mismatch in signature file: %s [was %lld, expected %lld].\n", + signature_file.c_str(), (long long)tree_size, (long long)expected); + return {}; + } + + size_t base64_len = 0; + if (!EVP_EncodedLength(&base64_len, signature.size())) { + fprintf(stderr, "Fail to estimate base64 encoded length. Abort.\n"); + return {}; + } + std::string encoded_signature; + encoded_signature.resize(base64_len); + encoded_signature.resize(EVP_EncodeBlock((uint8_t*)encoded_signature.data(), + (const uint8_t*)signature.data(), signature.size())); + + return {std::move(fd), std::move(encoded_signature)}; +} + +// Send install-incremental to the device along with properly configured file descriptors in +// streaming format. Once connection established, send all fs-verity tree bytes. +static unique_fd start_install(const std::vector& files) { + std::vector command_args{"package", "install-incremental"}; + + // fd's with positions at the beginning of fs-verity + std::vector signature_fds; + signature_fds.reserve(files.size()); + for (int i = 0, size = files.size(); i < size; ++i) { + const auto& file = files[i]; + + struct stat st; + if (stat(file.c_str(), &st)) { + fprintf(stderr, "Failed to stat input file %s. Abort.\n", file.c_str()); + return {}; + } + + auto [signature_fd, signature] = read_and_encode_signature(st.st_size, file); + if (!signature_fd.ok()) { + return {}; + } + + auto file_desc = + StringPrintf("%s:%lld:%s:%s", android::base::Basename(file).c_str(), + (long long)st.st_size, std::to_string(i).c_str(), signature.c_str()); + command_args.push_back(std::move(file_desc)); + + signature_fds.push_back(std::move(signature_fd)); + } + + std::string error; + auto connection_fd = unique_fd(send_abb_exec_command(command_args, &error)); + if (connection_fd < 0) { + fprintf(stderr, "Failed to run: %s, error: %s\n", + android::base::Join(command_args, " ").c_str(), error.c_str()); + return {}; + } + + // Pushing verity trees for all installation files. + for (auto&& local_fd : signature_fds) { + if (!copy_to_file(local_fd.get(), connection_fd.get())) { + fprintf(stderr, "Failed to stream tree bytes: %s. Abort.\n", strerror(errno)); + return {}; + } + } + + return connection_fd; +} + +} // namespace + +std::optional install(std::vector files) { + auto connection_fd = start_install(files); + if (connection_fd < 0) { + fprintf(stderr, "adb: failed to initiate installation on device.\n"); + return {}; + } + + std::string adb_path = android::base::GetExecutablePath(); + + auto osh = adb_get_os_handle(connection_fd.get()); +#ifdef _WIN32 + auto fd_param = std::to_string(reinterpret_cast(osh)); +#else /* !_WIN32 a.k.a. Unix */ + auto fd_param = std::to_string(osh); +#endif + + std::vector args(std::move(files)); + args.insert(args.begin(), {"inc-server", fd_param}); + auto child = adb_launch_process(adb_path, std::move(args), {connection_fd.get()}); + if (!child) { + fprintf(stderr, "adb: failed to fork: %s\n", strerror(errno)); + return {}; + } + + auto killOnExit = [](Process* p) { p->kill(); }; + std::unique_ptr serverKiller(&child, killOnExit); + // TODO: Terminate server process if installation fails. + serverKiller.release(); + + return child; +} + +} // namespace incremental diff --git a/adb/client/incremental.h b/adb/client/incremental.h new file mode 100644 index 000000000..4b9f6bde0 --- /dev/null +++ b/adb/client/incremental.h @@ -0,0 +1,30 @@ +/* + * Copyright (C) 2020 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "adb_unique_fd.h" + +#include +#include + +#include "sysdeps.h" + +namespace incremental { + +std::optional install(std::vector files); + +} // namespace incremental diff --git a/adb/client/incremental_server.cpp b/adb/client/incremental_server.cpp new file mode 100644 index 000000000..d9fd77ae2 --- /dev/null +++ b/adb/client/incremental_server.cpp @@ -0,0 +1,545 @@ +/* + * Copyright (C) 2020 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define TRACE_TAG INCREMENTAL + +#include "incremental_server.h" + +#include "adb.h" +#include "adb_io.h" +#include "adb_trace.h" +#include "adb_unique_fd.h" +#include "adb_utils.h" +#include "sysdeps.h" + +#ifndef _WIN32 +#include +#else +#define be64toh(x) _byteswap_uint64(x) +#define be32toh(x) _byteswap_ulong(x) +#define be16toh(x) _byteswap_ushort(x) +#define htobe64(x) _byteswap_uint64(x) +#define htobe32(x) _byteswap_ulong(x) +#define htobe16(x) _byteswap_ushort(x) +#endif + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +namespace incremental { + +static constexpr int kBlockSize = 4096; +static constexpr int kCompressedSizeMax = kBlockSize * 0.95; +static constexpr short kCompressionNone = 0; +static constexpr short kCompressionLZ4 = 1; +static constexpr int kCompressBound = std::max(kBlockSize, LZ4_COMPRESSBOUND(kBlockSize)); +static constexpr auto kReadBufferSize = 128 * 1024; + +using BlockSize = int16_t; +using FileId = int16_t; +using BlockIdx = int32_t; +using NumBlocks = int32_t; +using CompressionType = int16_t; +using RequestType = int16_t; +using ChunkHeader = int32_t; +using MagicType = uint32_t; + +static constexpr MagicType INCR = 0x494e4352; // LE INCR + +static constexpr RequestType EXIT = 0; +static constexpr RequestType BLOCK_MISSING = 1; +static constexpr RequestType PREFETCH = 2; + +static constexpr inline int64_t roundDownToBlockOffset(int64_t val) { + return val & ~(kBlockSize - 1); +} + +static constexpr inline int64_t roundUpToBlockOffset(int64_t val) { + return roundDownToBlockOffset(val + kBlockSize - 1); +} + +static constexpr inline NumBlocks numBytesToNumBlocks(int64_t bytes) { + return roundUpToBlockOffset(bytes) / kBlockSize; +} + +static constexpr inline off64_t blockIndexToOffset(BlockIdx blockIdx) { + return static_cast(blockIdx) * kBlockSize; +} + +template +static inline constexpr T toBigEndian(T t) { + using unsigned_type = std::make_unsigned_t; + if constexpr (std::is_same_v) { + return htobe16(static_cast(t)); + } else if constexpr (std::is_same_v) { + return htobe32(static_cast(t)); + } else if constexpr (std::is_same_v) { + return htobe64(static_cast(t)); + } else { + return t; + } +} + +template +static inline constexpr T readBigEndian(void* data) { + using unsigned_type = std::make_unsigned_t; + if constexpr (std::is_same_v) { + return static_cast(be16toh(*reinterpret_cast(data))); + } else if constexpr (std::is_same_v) { + return static_cast(be32toh(*reinterpret_cast(data))); + } else if constexpr (std::is_same_v) { + return static_cast(be64toh(*reinterpret_cast(data))); + } else { + return T(); + } +} + +// Received from device +// !Does not include magic! +struct RequestCommand { + RequestType request_type; // 2 bytes + FileId file_id; // 2 bytes + union { + BlockIdx block_idx; + NumBlocks num_blocks; + }; // 4 bytes +} __attribute__((packed)); + +// Placed before actual data bytes of each block +struct ResponseHeader { + FileId file_id; // 2 bytes + CompressionType compression_type; // 2 bytes + BlockIdx block_idx; // 4 bytes + BlockSize block_size; // 2 bytes +} __attribute__((packed)); + +// Holds streaming state for a file +class File { + public: + // Plain file + File(const char* filepath, FileId id, int64_t size, unique_fd fd) : File(filepath, id, size) { + this->fd_ = std::move(fd); + } + int64_t ReadBlock(BlockIdx block_idx, void* buf, bool* is_zip_compressed, + std::string* error) const { + char* buf_ptr = static_cast(buf); + int64_t bytes_read = -1; + const off64_t offsetStart = blockIndexToOffset(block_idx); + bytes_read = adb_pread(fd_, &buf_ptr[sizeof(ResponseHeader)], kBlockSize, offsetStart); + return bytes_read; + } + + const unique_fd& RawFd() const { return fd_; } + + std::vector sentBlocks; + NumBlocks sentBlocksCount; + + const char* const filepath; + const FileId id; + const int64_t size; + + private: + File(const char* filepath, FileId id, int64_t size) : filepath(filepath), id(id), size(size) { + sentBlocks.resize(numBytesToNumBlocks(size)); + } + unique_fd fd_; +}; + +class IncrementalServer { + public: + IncrementalServer(unique_fd fd, std::vector files) + : adb_fd_(std::move(fd)), files_(std::move(files)) { + buffer_.reserve(kReadBufferSize); + } + + bool Serve(); + + private: + struct PrefetchState { + const File* file; + BlockIdx overallIndex = 0; + BlockIdx overallEnd = 0; + + PrefetchState(const File& f) : file(&f), overallEnd((BlockIdx)f.sentBlocks.size()) {} + PrefetchState(const File& f, BlockIdx start, int count) + : file(&f), + overallIndex(start), + overallEnd(std::min(start + count, f.sentBlocks.size())) {} + + bool done() const { return overallIndex >= overallEnd; } + }; + + bool SkipToRequest(void* buffer, size_t* size, bool blocking); + std::optional ReadRequest(bool blocking); + + void erase_buffer_head(int count) { buffer_.erase(buffer_.begin(), buffer_.begin() + count); } + + enum class SendResult { Sent, Skipped, Error }; + SendResult SendBlock(FileId fileId, BlockIdx blockIdx, bool flush = false); + bool SendDone(); + void RunPrefetching(); + + void Send(const void* data, size_t size, bool flush); + void Flush(); + using TimePoint = decltype(std::chrono::high_resolution_clock::now()); + bool Exit(std::optional startTime, int missesCount, int missesSent); + + unique_fd const adb_fd_; + std::vector files_; + + // Incoming data buffer. + std::vector buffer_; + + std::deque prefetches_; + int compressed_ = 0, uncompressed_ = 0; + long long sentSize_ = 0; + + std::vector pendingBlocks_; +}; + +bool IncrementalServer::SkipToRequest(void* buffer, size_t* size, bool blocking) { + while (true) { + // Looking for INCR magic. + bool magic_found = false; + int bcur = 0; + for (int bsize = buffer_.size(); bcur + 4 < bsize; ++bcur) { + uint32_t magic = be32toh(*(uint32_t*)(buffer_.data() + bcur)); + if (magic == INCR) { + magic_found = true; + break; + } + } + + if (bcur > 0) { + // Stream the rest to stderr. + fprintf(stderr, "%.*s", bcur, buffer_.data()); + erase_buffer_head(bcur); + } + + if (magic_found && buffer_.size() >= *size + sizeof(INCR)) { + // fine, return + memcpy(buffer, buffer_.data() + sizeof(INCR), *size); + erase_buffer_head(*size + sizeof(INCR)); + return true; + } + + adb_pollfd pfd = {adb_fd_.get(), POLLIN, 0}; + auto res = adb_poll(&pfd, 1, blocking ? -1 : 0); + if (res != 1) { + if (res < 0) { + fprintf(stderr, "Failed to poll: %s\n", strerror(errno)); + return false; + } + *size = 0; + return true; + } + + auto bsize = buffer_.size(); + buffer_.resize(kReadBufferSize); + int r = adb_read(adb_fd_, buffer_.data() + bsize, kReadBufferSize - bsize); + if (r > 0) { + buffer_.resize(bsize + r); + continue; + } + + if (r == -1) { + fprintf(stderr, "Failed to read from fd %d: %d. Exit\n", adb_fd_.get(), errno); + return true; + } + + // socket is closed + return false; + } +} + +std::optional IncrementalServer::ReadRequest(bool blocking) { + uint8_t commandBuf[sizeof(RequestCommand)]; + auto size = sizeof(commandBuf); + if (!SkipToRequest(&commandBuf, &size, blocking)) { + return {{EXIT}}; + } + if (size < sizeof(RequestCommand)) { + return {}; + } + RequestCommand request; + request.request_type = readBigEndian(&commandBuf[0]); + request.file_id = readBigEndian(&commandBuf[2]); + request.block_idx = readBigEndian(&commandBuf[4]); + return request; +} + +auto IncrementalServer::SendBlock(FileId fileId, BlockIdx blockIdx, bool flush) -> SendResult { + auto& file = files_[fileId]; + if (blockIdx >= static_cast(file.sentBlocks.size())) { + fprintf(stderr, "Failed to read file %s at block %" PRId32 " (past end).\n", file.filepath, + blockIdx); + return SendResult::Error; + } + if (file.sentBlocks[blockIdx]) { + return SendResult::Skipped; + } + std::string error; + char raw[sizeof(ResponseHeader) + kBlockSize]; + bool isZipCompressed = false; + const int64_t bytesRead = file.ReadBlock(blockIdx, &raw, &isZipCompressed, &error); + if (bytesRead < 0) { + fprintf(stderr, "Failed to get data for %s at blockIdx=%d (%s).\n", file.filepath, blockIdx, + error.c_str()); + return SendResult::Error; + } + + ResponseHeader* header = nullptr; + char data[sizeof(ResponseHeader) + kCompressBound]; + char* compressed = data + sizeof(*header); + int16_t compressedSize = 0; + if (!isZipCompressed) { + compressedSize = + LZ4_compress_default(raw + sizeof(*header), compressed, bytesRead, kCompressBound); + } + int16_t blockSize; + if (compressedSize > 0 && compressedSize < kCompressedSizeMax) { + ++compressed_; + blockSize = compressedSize; + header = reinterpret_cast(data); + header->compression_type = toBigEndian(kCompressionLZ4); + } else { + ++uncompressed_; + blockSize = bytesRead; + header = reinterpret_cast(raw); + header->compression_type = toBigEndian(kCompressionNone); + } + + header->file_id = toBigEndian(fileId); + header->block_size = toBigEndian(blockSize); + header->block_idx = toBigEndian(blockIdx); + + file.sentBlocks[blockIdx] = true; + file.sentBlocksCount += 1; + Send(header, sizeof(*header) + blockSize, flush); + return SendResult::Sent; +} + +bool IncrementalServer::SendDone() { + ResponseHeader header; + header.file_id = -1; + header.compression_type = 0; + header.block_idx = 0; + header.block_size = 0; + Send(&header, sizeof(header), true); + return true; +} + +void IncrementalServer::RunPrefetching() { + constexpr auto kPrefetchBlocksPerIteration = 128; + + int blocksToSend = kPrefetchBlocksPerIteration; + while (!prefetches_.empty() && blocksToSend > 0) { + auto& prefetch = prefetches_.front(); + const auto& file = *prefetch.file; + for (auto& i = prefetch.overallIndex; blocksToSend > 0 && i < prefetch.overallEnd; ++i) { + if (auto res = SendBlock(file.id, i); res == SendResult::Sent) { + --blocksToSend; + } else if (res == SendResult::Error) { + fprintf(stderr, "Failed to send block %" PRId32 "\n", i); + } + } + if (prefetch.done()) { + prefetches_.pop_front(); + } + } +} + +void IncrementalServer::Send(const void* data, size_t size, bool flush) { + constexpr auto kChunkFlushSize = 31 * kBlockSize; + + if (pendingBlocks_.empty()) { + pendingBlocks_.resize(sizeof(ChunkHeader)); + } + pendingBlocks_.insert(pendingBlocks_.end(), static_cast(data), + static_cast(data) + size); + if (flush || pendingBlocks_.size() > kChunkFlushSize) { + Flush(); + } +} + +void IncrementalServer::Flush() { + if (pendingBlocks_.empty()) { + return; + } + + *(ChunkHeader*)pendingBlocks_.data() = + toBigEndian(pendingBlocks_.size() - sizeof(ChunkHeader)); + if (!WriteFdExactly(adb_fd_, pendingBlocks_.data(), pendingBlocks_.size())) { + fprintf(stderr, "Failed to write %d bytes\n", int(pendingBlocks_.size())); + } + sentSize_ += pendingBlocks_.size(); + pendingBlocks_.clear(); +} + +bool IncrementalServer::Exit(std::optional startTime, int missesCount, int missesSent) { + using namespace std::chrono; + auto endTime = high_resolution_clock::now(); + fprintf(stderr, + "Connection failed or received exit command. Exit.\n" + "Misses: %d, of those unique: %d; sent compressed: %d, uncompressed: " + "%d, mb: %.3f\n" + "Total time taken: %.3fms\n", + missesCount, missesSent, compressed_, uncompressed_, sentSize_ / 1024.0 / 1024.0, + duration_cast(endTime - (startTime ? *startTime : endTime)).count() / + 1000.0); + return true; +} + +bool IncrementalServer::Serve() { + // Initial handshake to verify connection is still alive + if (!SendOkay(adb_fd_)) { + fprintf(stderr, "Connection is dead. Abort.\n"); + return false; + } + + std::unordered_set prefetchedFiles; + bool doneSent = false; + int missesCount = 0; + int missesSent = 0; + + using namespace std::chrono; + std::optional startTime; + + while (true) { + if (!doneSent && prefetches_.empty() && + std::all_of(files_.begin(), files_.end(), [](const File& f) { + return f.sentBlocksCount == NumBlocks(f.sentBlocks.size()); + })) { + fprintf(stdout, "All files should be loaded. Notifying the device.\n"); + SendDone(); + doneSent = true; + } + + const bool blocking = prefetches_.empty(); + if (blocking) { + // We've no idea how long the blocking call is, so let's flush whatever is still unsent. + Flush(); + } + auto request = ReadRequest(blocking); + + if (!startTime) { + startTime = high_resolution_clock::now(); + } + + if (request) { + FileId fileId = request->file_id; + BlockIdx blockIdx = request->block_idx; + + switch (request->request_type) { + case EXIT: { + // Stop everything. + return Exit(startTime, missesCount, missesSent); + } + case BLOCK_MISSING: { + ++missesCount; + // Sends one single block ASAP. + if (fileId < 0 || fileId >= (FileId)files_.size() || blockIdx < 0 || + blockIdx >= (BlockIdx)files_[fileId].sentBlocks.size()) { + fprintf(stderr, + "Received invalid data request for file_id %" PRId16 + " block_idx %" PRId32 ".\n", + fileId, blockIdx); + break; + } + // fprintf(stderr, "\treading file %d block %04d\n", (int)fileId, + // (int)blockIdx); + if (auto res = SendBlock(fileId, blockIdx, true); res == SendResult::Error) { + fprintf(stderr, "Failed to send block %" PRId32 ".\n", blockIdx); + } else if (res == SendResult::Sent) { + ++missesSent; + // Make sure we send more pages from this place onward, in case if the OS is + // reading a bigger block. + prefetches_.emplace_front(files_[fileId], blockIdx + 1, 7); + } + break; + } + case PREFETCH: { + // Start prefetching for a file + if (fileId < 0) { + fprintf(stderr, + "Received invalid prefetch request for file_id %" PRId16 "\n", + fileId); + break; + } + if (!prefetchedFiles.insert(fileId).second) { + fprintf(stderr, + "Received duplicate prefetch request for file_id %" PRId16 "\n", + fileId); + break; + } + D("Received prefetch request for file_id %" PRId16 ".\n", fileId); + prefetches_.emplace_back(files_[fileId]); + break; + } + default: + fprintf(stderr, "Invalid request %" PRId16 ",%" PRId16 ",%" PRId32 ".\n", + request->request_type, fileId, blockIdx); + break; + } + } + + RunPrefetching(); + } +} + +bool serve(int adb_fd, int argc, const char** argv) { + auto connection_fd = unique_fd(adb_fd); + if (argc <= 0) { + error_exit("inc-server: must specify at least one file."); + } + + std::vector files; + files.reserve(argc); + for (int i = 0; i < argc; ++i) { + auto filepath = argv[i]; + + struct stat st; + if (stat(filepath, &st)) { + fprintf(stderr, "Failed to stat input file %s. Abort.\n", filepath); + return {}; + } + + unique_fd fd(adb_open(filepath, O_RDONLY)); + if (fd < 0) { + error_exit("inc-server: failed to open file '%s'.", filepath); + } + files.emplace_back(filepath, i, st.st_size, std::move(fd)); + } + + IncrementalServer server(std::move(connection_fd), std::move(files)); + printf("Serving...\n"); + fclose(stdin); + fclose(stdout); + return server.Serve(); +} + +} // namespace incremental diff --git a/adb/client/incremental_server.h b/adb/client/incremental_server.h new file mode 100644 index 000000000..53f011eb3 --- /dev/null +++ b/adb/client/incremental_server.h @@ -0,0 +1,26 @@ +/* + * Copyright (C) 2020 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +namespace incremental { + +// Expecting arguments like: +// {FILE1 FILE2 ...} +// Where FILE* are files to serve. +bool serve(int adbFd, int argc, const char** argv); + +} // namespace incremental diff --git a/adb/sysdeps.h b/adb/sysdeps.h index 0c5a6b4c1..231839505 100644 --- a/adb/sysdeps.h +++ b/adb/sysdeps.h @@ -267,6 +267,39 @@ inline void seekdir(DIR*, long) { #define getcwd adb_getcwd +// A very simple wrapper over a launched child process +class Process { + public: + constexpr explicit Process(HANDLE h = nullptr) : h_(h) {} + ~Process() { close(); } + constexpr explicit operator bool() const { return h_ != nullptr; } + + void wait() { + if (*this) { + ::WaitForSingleObject(h_, INFINITE); + close(); + } + } + void kill() { + if (*this) { + ::TerminateProcess(h_, -1); + } + } + + private: + void close() { + if (*this) { + ::CloseHandle(h_); + h_ = nullptr; + } + } + + HANDLE h_; +}; + +Process adb_launch_process(std::string_view executable, std::vector args, + std::initializer_list fds_to_inherit = {}); + // Helper class to convert UTF-16 argv from wmain() to UTF-8 args that can be // passed to main(). class NarrowArgs { @@ -432,11 +465,11 @@ static __inline__ int adb_read(borrowed_fd fd, void* buf, size_t len) { return TEMP_FAILURE_RETRY(read(fd.get(), buf, len)); } -static __inline__ int adb_pread(int fd, void* buf, size_t len, off64_t offset) { +static __inline__ int adb_pread(borrowed_fd fd, void* buf, size_t len, off64_t offset) { #if defined(__APPLE__) - return TEMP_FAILURE_RETRY(pread(fd, buf, len, offset)); + return TEMP_FAILURE_RETRY(pread(fd.get(), buf, len, offset)); #else - return TEMP_FAILURE_RETRY(pread64(fd, buf, len, offset)); + return TEMP_FAILURE_RETRY(pread64(fd.get(), buf, len, offset)); #endif } @@ -612,6 +645,32 @@ static __inline__ int adb_get_os_handle(borrowed_fd fd) { return fd.get(); } +// A very simple wrapper over a launched child process +class Process { + public: + constexpr explicit Process(pid_t pid) : pid_(pid) {} + constexpr explicit operator bool() const { return pid_ >= 0; } + + void wait() { + if (*this) { + int status; + ::waitpid(pid_, &status, 0); + pid_ = -1; + } + } + void kill() { + if (*this) { + ::kill(pid_, SIGTERM); + } + } + + private: + pid_t pid_; +}; + +Process adb_launch_process(std::string_view executable, std::vector args, + std::initializer_list fds_to_inherit = {}); + #endif /* !_WIN32 */ static inline void disable_tcp_nagle(borrowed_fd fd) { diff --git a/adb/sysdeps_unix.cpp b/adb/sysdeps_unix.cpp index 3fdc917dd..e56570676 100644 --- a/adb/sysdeps_unix.cpp +++ b/adb/sysdeps_unix.cpp @@ -56,3 +56,37 @@ bool set_tcp_keepalive(borrowed_fd fd, int interval_sec) { return true; } + +static __inline__ void disable_close_on_exec(borrowed_fd fd) { + const auto oldFlags = fcntl(fd.get(), F_GETFD); + const auto newFlags = (oldFlags & ~FD_CLOEXEC); + if (newFlags != oldFlags) { + fcntl(fd.get(), F_SETFD, newFlags); + } +} + +Process adb_launch_process(std::string_view executable, std::vector args, + std::initializer_list fds_to_inherit) { + const auto pid = fork(); + if (pid != 0) { + // parent, includes the case when failed to fork() + return Process(pid); + } + // child + std::vector copies; + copies.reserve(args.size() + 1); + copies.emplace_back(executable); + copies.insert(copies.end(), std::make_move_iterator(args.begin()), + std::make_move_iterator(args.end())); + + std::vector rawArgs; + rawArgs.reserve(copies.size() + 1); + for (auto&& str : copies) { + rawArgs.push_back(str.data()); + } + rawArgs.push_back(nullptr); + for (auto fd : fds_to_inherit) { + disable_close_on_exec(fd); + } + exit(execv(copies.front().data(), rawArgs.data())); +} diff --git a/adb/sysdeps_win32.cpp b/adb/sysdeps_win32.cpp index d9cc36f83..e33d51cc3 100644 --- a/adb/sysdeps_win32.cpp +++ b/adb/sysdeps_win32.cpp @@ -2771,6 +2771,66 @@ char* adb_getcwd(char* buf, int size) { return buf; } +void enable_inherit(borrowed_fd fd) { + auto osh = adb_get_os_handle(fd); + const auto h = reinterpret_cast(osh); + ::SetHandleInformation(h, HANDLE_FLAG_INHERIT, HANDLE_FLAG_INHERIT); +} + +void disable_inherit(borrowed_fd fd) { + auto osh = adb_get_os_handle(fd); + const auto h = reinterpret_cast(osh); + ::SetHandleInformation(h, HANDLE_FLAG_INHERIT, 0); +} + +Process adb_launch_process(std::string_view executable, std::vector args, + std::initializer_list fds_to_inherit) { + std::wstring wexe; + if (!android::base::UTF8ToWide(executable.data(), executable.size(), &wexe)) { + return Process(); + } + + std::wstring wargs = L"\"" + wexe + L"\""; + std::wstring warg; + for (auto arg : args) { + warg.clear(); + if (!android::base::UTF8ToWide(arg.data(), arg.size(), &warg)) { + return Process(); + } + wargs += L" \""; + wargs += warg; + wargs += L'\"'; + } + + STARTUPINFOW sinfo = {sizeof(sinfo)}; + PROCESS_INFORMATION pinfo = {}; + + // TODO: use the Vista+ API to pass the list of inherited handles explicitly; + // see http://blogs.msdn.com/b/oldnewthing/archive/2011/12/16/10248328.aspx + for (auto fd : fds_to_inherit) { + enable_inherit(fd); + } + const auto created = CreateProcessW(wexe.c_str(), wargs.data(), + nullptr, // process attributes + nullptr, // thread attributes + fds_to_inherit.size() > 0, // inherit any handles? + 0, // flags + nullptr, // environment + nullptr, // current directory + &sinfo, // startup info + &pinfo); + for (auto fd : fds_to_inherit) { + disable_inherit(fd); + } + + if (!created) { + return Process(); + } + + ::CloseHandle(pinfo.hThread); + return Process(pinfo.hProcess); +} + // The SetThreadDescription API was brought in version 1607 of Windows 10. typedef HRESULT(WINAPI* SetThreadDescription)(HANDLE hThread, PCWSTR lpThreadDescription);