diff --git a/adb/adb.cpp b/adb/adb.cpp index 19300f660..38c6f62c9 100644 --- a/adb/adb.cpp +++ b/adb/adb.cpp @@ -920,13 +920,45 @@ int launch_server(const std::string& socket_spec) { } #endif /* ADB_HOST */ +bool handle_forward_request(const char* service, atransport* transport, int reply_fd) { + return handle_forward_request(service, [transport](std::string*) { return transport; }, + reply_fd); +} + // Try to handle a network forwarding request. -// This returns 1 on success, 0 on failure, and -1 to indicate this is not -// a forwarding-related request. -int handle_forward_request(const char* service, atransport* transport, int reply_fd) { +bool handle_forward_request(const char* service, + std::function transport_acquirer, + int reply_fd) { + if (!strcmp(service, "list-forward")) { + // Create the list of forward redirections. + std::string listeners = format_listeners(); +#if ADB_HOST + SendOkay(reply_fd); +#endif + SendProtocolString(reply_fd, listeners); + return true; + } + + if (!strcmp(service, "killforward-all")) { + remove_all_listeners(); +#if ADB_HOST + /* On the host: 1st OKAY is connect, 2nd OKAY is status */ + SendOkay(reply_fd); +#endif + SendOkay(reply_fd); + return true; + } + if (!strncmp(service, "forward:", 8) || !strncmp(service, "killforward:", 12)) { // killforward:local // forward:(norebind:)?local;remote + std::string error; + atransport* transport = transport_acquirer(&error); + if (!transport) { + SendFail(reply_fd, error); + return true; + } + bool kill_forward = false; bool no_rebind = false; if (android::base::StartsWith(service, "killforward:")) { @@ -946,17 +978,16 @@ int handle_forward_request(const char* service, atransport* transport, int reply // Check killforward: parameter format: '' if (pieces.size() != 1 || pieces[0].empty()) { SendFail(reply_fd, android::base::StringPrintf("bad killforward: %s", service)); - return 1; + return true; } } else { // Check forward: parameter format: ';' if (pieces.size() != 2 || pieces[0].empty() || pieces[1].empty() || pieces[1][0] == '*') { SendFail(reply_fd, android::base::StringPrintf("bad forward: %s", service)); - return 1; + return true; } } - std::string error; InstallStatus r; int resolved_tcp_port = 0; if (kill_forward) { @@ -977,7 +1008,7 @@ int handle_forward_request(const char* service, atransport* transport, int reply SendProtocolString(reply_fd, android::base::StringPrintf("%d", resolved_tcp_port)); } - return 1; + return true; } std::string message; @@ -996,9 +1027,10 @@ int handle_forward_request(const char* service, atransport* transport, int reply break; } SendFail(reply_fd, message); - return 1; + return true; } - return 0; + + return false; } #if ADB_HOST @@ -1186,35 +1218,15 @@ int handle_host_request(const char* service, TransportType type, const char* ser return SendOkay(reply_fd, response); } - if (!strcmp(service, "list-forward")) { - // Create the list of forward redirections. - std::string listeners = format_listeners(); -#if ADB_HOST - SendOkay(reply_fd); -#endif - return SendProtocolString(reply_fd, listeners); + if (handle_forward_request(service, + [=](std::string* error) { + return acquire_one_transport(type, serial, transport_id, nullptr, + error); + }, + reply_fd)) { + return 0; } - if (!strcmp(service, "killforward-all")) { - remove_all_listeners(); -#if ADB_HOST - /* On the host: 1st OKAY is connect, 2nd OKAY is status */ - SendOkay(reply_fd); -#endif - SendOkay(reply_fd); - return 1; - } - - std::string error; - atransport* t = acquire_one_transport(type, serial, transport_id, nullptr, &error); - if (!t) { - SendFail(reply_fd, error); - return 1; - } - - int ret = handle_forward_request(service, t, reply_fd); - if (ret >= 0) - return ret - 1; return -1; } diff --git a/adb/adb.h b/adb/adb.h index 13ca4d7e0..e6af780c4 100644 --- a/adb/adb.h +++ b/adb/adb.h @@ -158,7 +158,10 @@ asocket* create_jdwp_tracker_service_socket(); unique_fd create_jdwp_connection_fd(int jdwp_pid); #endif -int handle_forward_request(const char* service, atransport* transport, int reply_fd); +bool handle_forward_request(const char* service, atransport* transport, int reply_fd); +bool handle_forward_request(const char* service, + std::function transport_acquirer, + int reply_fd); /* packet allocator */ apacket* get_apacket(void); diff --git a/adb/client/commandline.cpp b/adb/client/commandline.cpp index 2375ebce4..da273fd3f 100644 --- a/adb/client/commandline.cpp +++ b/adb/client/commandline.cpp @@ -1614,9 +1614,9 @@ int adb_commandline(int argc, const char** argv) { return bugreport.DoIt(argc, argv); } else if (!strcmp(argv[0], "forward") || !strcmp(argv[0], "reverse")) { bool reverse = !strcmp(argv[0], "reverse"); - ++argv; --argc; if (argc < 1) return syntax_error("%s requires an argument", argv[0]); + ++argv; // Determine the for this command. std::string host_prefix; diff --git a/adb/daemon/services.cpp b/adb/daemon/services.cpp index 25024b05a..1f59d6446 100644 --- a/adb/daemon/services.cpp +++ b/adb/daemon/services.cpp @@ -157,7 +157,7 @@ unique_fd reverse_service(const char* command, atransport* transport) { return unique_fd{}; } VLOG(SERVICES) << "service socketpair: " << s[0] << ", " << s[1]; - if (handle_forward_request(command, transport, s[1]) < 0) { + if (!handle_forward_request(command, transport, s[1])) { SendFail(s[1], "not a reverse forwarding command"); } adb_close(s[1]); diff --git a/adb/transport.cpp b/adb/transport.cpp index 793c283fc..922200826 100644 --- a/adb/transport.cpp +++ b/adb/transport.cpp @@ -50,6 +50,7 @@ #include "adb_trace.h" #include "adb_utils.h" #include "fdevent.h" +#include "sysdeps/chrono.h" static void register_transport(atransport* transport); static void remove_transport(atransport* transport); @@ -80,6 +81,7 @@ class SCOPED_CAPABILITY ScopedAssumeLocked { ~ScopedAssumeLocked() RELEASE() {} }; +#if ADB_HOST // Tracks and handles atransport*s that are attempting reconnection. class ReconnectHandler { public: @@ -102,12 +104,18 @@ class ReconnectHandler { // Tracks a reconnection attempt. struct ReconnectAttempt { atransport* transport; - std::chrono::system_clock::time_point deadline; + std::chrono::steady_clock::time_point reconnect_time; size_t attempts_left; + + bool operator<(const ReconnectAttempt& rhs) const { + // std::priority_queue returns the largest element first, so we want attempts that have + // less time remaining (i.e. smaller time_points) to compare greater. + return reconnect_time > rhs.reconnect_time; + } }; // Only retry for up to one minute. - static constexpr const std::chrono::seconds kDefaultTimeout = std::chrono::seconds(10); + static constexpr const std::chrono::seconds kDefaultTimeout = 10s; static constexpr const size_t kMaxAttempts = 6; // Protects all members. @@ -115,7 +123,7 @@ class ReconnectHandler { bool running_ GUARDED_BY(reconnect_mutex_) = true; std::thread handler_thread_; std::condition_variable reconnect_cv_; - std::queue reconnect_queue_ GUARDED_BY(reconnect_mutex_); + std::priority_queue reconnect_queue_ GUARDED_BY(reconnect_mutex_); DISALLOW_COPY_AND_ASSIGN(ReconnectHandler); }; @@ -137,7 +145,7 @@ void ReconnectHandler::Stop() { // Drain the queue to free all resources. std::lock_guard lock(reconnect_mutex_); while (!reconnect_queue_.empty()) { - ReconnectAttempt attempt = reconnect_queue_.front(); + ReconnectAttempt attempt = reconnect_queue_.top(); reconnect_queue_.pop(); remove_transport(attempt.transport); } @@ -148,9 +156,10 @@ void ReconnectHandler::TrackTransport(atransport* transport) { { std::lock_guard lock(reconnect_mutex_); if (!running_) return; - reconnect_queue_.emplace(ReconnectAttempt{ - transport, std::chrono::system_clock::now() + ReconnectHandler::kDefaultTimeout, - ReconnectHandler::kMaxAttempts}); + // Arbitrary sleep to give adbd time to get ready, if we disconnected because it exited. + auto reconnect_time = std::chrono::steady_clock::now() + 250ms; + reconnect_queue_.emplace( + ReconnectAttempt{transport, reconnect_time, ReconnectHandler::kMaxAttempts}); } reconnect_cv_.notify_one(); } @@ -162,15 +171,27 @@ void ReconnectHandler::Run() { std::unique_lock lock(reconnect_mutex_); ScopedAssumeLocked assume_lock(reconnect_mutex_); - auto deadline = std::chrono::time_point::max(); - if (!reconnect_queue_.empty()) deadline = reconnect_queue_.front().deadline; - reconnect_cv_.wait_until(lock, deadline, [&]() REQUIRES(reconnect_mutex_) { - return !running_ || - (!reconnect_queue_.empty() && reconnect_queue_.front().deadline < deadline); - }); + if (!reconnect_queue_.empty()) { + // FIXME: libstdc++ (used on Windows) implements condition_variable with + // system_clock as its clock, so we're probably hosed if the clock changes, + // even if we use steady_clock throughout. This problem goes away once we + // switch to libc++. + reconnect_cv_.wait_until(lock, reconnect_queue_.top().reconnect_time); + } else { + reconnect_cv_.wait(lock); + } if (!running_) return; - attempt = reconnect_queue_.front(); + if (reconnect_queue_.empty()) continue; + + // Go back to sleep in case |reconnect_cv_| woke up spuriously and we still + // have more time to wait for the current attempt. + auto now = std::chrono::steady_clock::now(); + if (reconnect_queue_.top().reconnect_time > now) { + continue; + } + + attempt = reconnect_queue_.top(); reconnect_queue_.pop(); if (attempt.transport->kicked()) { D("transport %s was kicked. giving up on it.", attempt.transport->serial.c_str()); @@ -191,9 +212,9 @@ void ReconnectHandler::Run() { std::lock_guard lock(reconnect_mutex_); reconnect_queue_.emplace(ReconnectAttempt{ - attempt.transport, - std::chrono::system_clock::now() + ReconnectHandler::kDefaultTimeout, - attempt.attempts_left - 1}); + attempt.transport, + std::chrono::steady_clock::now() + ReconnectHandler::kDefaultTimeout, + attempt.attempts_left - 1}); continue; } @@ -204,6 +225,8 @@ void ReconnectHandler::Run() { static auto& reconnect_handler = *new ReconnectHandler(); +#endif + } // namespace TransportId NextTransportId() { @@ -677,9 +700,11 @@ static void transport_registration_func(int _fd, unsigned ev, void*) { update_transports(); } +#if ADB_HOST void init_reconnect_handler(void) { reconnect_handler.Start(); } +#endif void init_transport_registration(void) { int s[2]; @@ -698,7 +723,9 @@ void init_transport_registration(void) { } void kick_all_transports() { +#if ADB_HOST reconnect_handler.Stop(); +#endif // To avoid only writing part of a packet to a transport after exit, kick all transports. std::lock_guard lock(transport_lock); for (auto t : transport_list) { @@ -736,13 +763,19 @@ static void transport_unref(atransport* t) { t->ref_count--; if (t->ref_count == 0) { t->connection()->Stop(); +#if ADB_HOST if (t->IsTcpDevice() && !t->kicked()) { - D("transport: %s unref (attempting reconnection) %d", t->serial.c_str(), t->kicked()); + D("transport: %s unref (attempting reconnection)", t->serial.c_str()); reconnect_handler.TrackTransport(t); } else { D("transport: %s unref (kicking and closing)", t->serial.c_str()); remove_transport(t); } +#else + D("transport: %s unref (kicking and closing)", t->serial.c_str()); + remove_transport(t); +#endif + } else { D("transport: %s unref (count=%zu)", t->serial.c_str(), t->ref_count); }