Merge "adb: fix sync."

This commit is contained in:
Treehugger Robot 2020-03-23 06:06:25 +00:00 committed by Gerrit Code Review
commit 4abafe2230

View file

@ -204,7 +204,8 @@ struct TransferLedger {
class SyncConnection {
public:
SyncConnection() {
SyncConnection() : acknowledgement_buffer_(sizeof(sync_status) + SYNC_DATA_MAX) {
acknowledgement_buffer_.resize(0);
max = SYNC_DATA_MAX; // TODO: decide at runtime.
std::string error;
@ -502,34 +503,6 @@ class SyncConnection {
return WriteOrDie(lpath, rpath, &msg.data, sizeof(msg.data));
}
bool ReadAcknowledgments() {
bool result = true;
while (!deferred_acknowledgements_.empty()) {
auto [from, to] = std::move(deferred_acknowledgements_.front());
deferred_acknowledgements_.pop_front();
result &= CopyDone(from, to);
}
return result;
}
bool CopyDone(const std::string& from, const std::string& to) {
syncmsg msg;
if (!ReadFdExactly(fd, &msg.status, sizeof(msg.status))) {
Error("failed to copy '%s' to '%s': couldn't read from device", from.c_str(),
to.c_str());
return false;
}
if (msg.status.id == ID_OKAY) {
return true;
}
if (msg.status.id != ID_FAIL) {
Error("failed to copy '%s' to '%s': unknown reason %d", from.c_str(), to.c_str(),
msg.status.id);
return false;
}
return ReportCopyFailure(from, to, msg);
}
bool ReportCopyFailure(const std::string& from, const std::string& to, const syncmsg& msg) {
std::vector<char> buf(msg.status.msglen + 1);
if (!ReadFdExactly(fd, &buf[0], msg.status.msglen)) {
@ -542,6 +515,97 @@ class SyncConnection {
return false;
}
void CopyDone() { deferred_acknowledgements_.pop_front(); }
void ReportDeferredCopyFailure(const std::string& msg) {
auto& [from, to] = deferred_acknowledgements_.front();
Error("failed to copy '%s' to '%s': remote %s", from.c_str(), to.c_str(), msg.c_str());
deferred_acknowledgements_.pop_front();
}
bool ReadAcknowledgements(bool read_all = false) {
// We need to read enough such that adbd's intermediate socket's write buffer can't be
// full. The default buffer on Linux is 212992 bytes, but there's 576 bytes of bookkeeping
// overhead per write. The worst case scenario is a continuous string of failures, since
// each logical packet is divided into two writes. If our packet size if conservatively 512
// bytes long, this leaves us with space for 128 responses.
constexpr size_t max_deferred_acks = 128;
auto& buf = acknowledgement_buffer_;
adb_pollfd pfd = {.fd = fd.get(), .events = POLLIN};
while (!deferred_acknowledgements_.empty()) {
bool should_block = read_all || deferred_acknowledgements_.size() >= max_deferred_acks;
ssize_t rc = adb_poll(&pfd, 1, should_block ? -1 : 0);
if (rc == 0) {
CHECK(!should_block);
return true;
}
if (acknowledgement_buffer_.size() < sizeof(sync_status)) {
const ssize_t header_bytes_left = sizeof(sync_status) - buf.size();
ssize_t rc = adb_read(fd, buf.end(), header_bytes_left);
if (rc <= 0) {
Error("failed to read copy response");
return false;
}
buf.resize(buf.size() + rc);
if (rc != header_bytes_left) {
// Early exit if we run out of data in the socket.
return true;
}
if (!should_block) {
// We don't want to read again yet, because the socket might be empty.
continue;
}
}
auto* hdr = reinterpret_cast<sync_status*>(buf.data());
if (hdr->id == ID_OKAY) {
buf.resize(0);
if (hdr->msglen != 0) {
Error("received ID_OKAY with msg_len (%" PRIu32 " != 0", hdr->msglen);
return false;
}
CopyDone();
continue;
} else if (hdr->id != ID_FAIL) {
Error("unexpected response from daemon: id = %#" PRIx32, hdr->id);
return false;
} else if (hdr->msglen > SYNC_DATA_MAX) {
Error("too-long message length from daemon: msglen = %" PRIu32, hdr->msglen);
return false;
}
const ssize_t msg_bytes_left = hdr->msglen + sizeof(sync_status) - buf.size();
CHECK_GE(msg_bytes_left, 0);
if (msg_bytes_left > 0) {
ssize_t rc = adb_read(fd, buf.end(), msg_bytes_left);
if (rc <= 0) {
Error("failed to read copy failure message");
return false;
}
buf.resize(buf.size() + rc);
if (rc != msg_bytes_left) {
if (should_block) {
continue;
} else {
return true;
}
}
std::string msg(buf.begin() + sizeof(sync_status), buf.end());
ReportDeferredCopyFailure(msg);
buf.resize(0);
return false;
}
}
return true;
}
void Printf(const char* fmt, ...) __attribute__((__format__(__printf__, 2, 3))) {
std::string s;
@ -608,6 +672,7 @@ class SyncConnection {
private:
std::deque<std::pair<std::string, std::string>> deferred_acknowledgements_;
Block acknowledgement_buffer_;
FeatureSet features_;
bool have_stat_v2_;
bool have_ls_v2_;
@ -716,7 +781,7 @@ static bool sync_send(SyncConnection& sc, const std::string& lpath, const std::s
if (!sc.SendSmallFile(rpath, mode, lpath, rpath, mtime, buf, data_length)) {
return false;
}
return true;
return sc.ReadAcknowledgements();
#endif
}
@ -739,7 +804,7 @@ static bool sync_send(SyncConnection& sc, const std::string& lpath, const std::s
return false;
}
}
return true;
return sc.ReadAcknowledgements();
}
static bool sync_recv(SyncConnection& sc, const char* rpath, const char* lpath,
@ -966,8 +1031,9 @@ static bool copy_local_dir_remote(SyncConnection& sc, std::string lpath,
}
sc.RecordFilesSkipped(skipped);
bool success = sc.ReadAcknowledgements(true);
sc.ReportTransferRate(lpath, TransferDirection::push);
return true;
return success;
}
bool do_sync_push(const std::vector<const char*>& srcs, const char* dst, bool sync) {
@ -1060,7 +1126,7 @@ bool do_sync_push(const std::vector<const char*>& srcs, const char* dst, bool sy
sc.ReportTransferRate(src_path, TransferDirection::push);
}
success &= sc.ReadAcknowledgments();
success &= sc.ReadAcknowledgements(true);
sc.ReportOverallTransferRate(TransferDirection::push);
return success;
}