diff --git a/adb/adb.h b/adb/adb.h index cb38e6148..9227eb172 100644 --- a/adb/adb.h +++ b/adb/adb.h @@ -188,7 +188,7 @@ void put_apacket(apacket *p); void local_init(int port); -void local_connect(int port); +bool local_connect(int port); int local_connect_arbitrary_ports(int console_port, int adb_port, std::string* error); // USB host/client interface. diff --git a/adb/sysdeps/condition_variable.h b/adb/sysdeps/condition_variable.h new file mode 100644 index 000000000..117cd40ad --- /dev/null +++ b/adb/sysdeps/condition_variable.h @@ -0,0 +1,61 @@ +/* + * Copyright (C) 2016 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "sysdeps/mutex.h" + +#if defined(_WIN32) + +#include + +#include + +// The prebuilt version of mingw we use doesn't support condition_variable. +// Therefore, implement our own using the Windows primitives. +// Put them directly into the std namespace, so that when they're actually available, the build +// breaks until they're removed. + +namespace std { + +class condition_variable { + public: + condition_variable() { + InitializeConditionVariable(&cond_); + } + + void wait(std::unique_lock& lock) { + std::mutex *m = lock.mutex(); + m->lock_count_--; + SleepConditionVariableCS(&cond_, m->native_handle(), INFINITE); + m->lock_count_++; + } + + void notify_one() { + WakeConditionVariable(&cond_); + } + + private: + CONDITION_VARIABLE cond_; + + DISALLOW_COPY_AND_ASSIGN(condition_variable); +}; + +} + +#endif // defined(_WIN32) diff --git a/adb/sysdeps/mutex.h b/adb/sysdeps/mutex.h index 73c9e6e06..226f7f194 100644 --- a/adb/sysdeps/mutex.h +++ b/adb/sysdeps/mutex.h @@ -1,5 +1,3 @@ -#pragma once - /* * Copyright (C) 2016 The Android Open Source Project * @@ -16,6 +14,7 @@ * limitations under the License. */ +#pragma once #if defined(_WIN32) #include @@ -35,34 +34,42 @@ namespace std { // CRITICAL_SECTION is recursive, so just wrap it in a Mutex-compatible class. class recursive_mutex { public: + typedef CRITICAL_SECTION* native_handle_type; + recursive_mutex() { - InitializeCriticalSection(&mutex_); + InitializeCriticalSection(&cs_); } ~recursive_mutex() { - DeleteCriticalSection(&mutex_); + DeleteCriticalSection(&cs_); } void lock() { - EnterCriticalSection(&mutex_); + EnterCriticalSection(&cs_); } bool try_lock() { - return TryEnterCriticalSection(&mutex_); + return TryEnterCriticalSection(&cs_); } void unlock() { - LeaveCriticalSection(&mutex_); + LeaveCriticalSection(&cs_); + } + + native_handle_type native_handle() { + return &cs_; } private: - CRITICAL_SECTION mutex_; + CRITICAL_SECTION cs_; DISALLOW_COPY_AND_ASSIGN(recursive_mutex); }; class mutex { public: + typedef CRITICAL_SECTION* native_handle_type; + mutex() { } @@ -97,11 +104,17 @@ class mutex { return true; } + native_handle_type native_handle() { + return mutex_.native_handle(); + } + private: recursive_mutex mutex_; size_t lock_count_ = 0; + + friend class condition_variable; }; } -#endif +#endif // defined(_WIN32) diff --git a/adb/sysdeps_test.cpp b/adb/sysdeps_test.cpp index 395d22d2a..740f28365 100644 --- a/adb/sysdeps_test.cpp +++ b/adb/sysdeps_test.cpp @@ -20,6 +20,8 @@ #include "adb_io.h" #include "sysdeps.h" +#include "sysdeps/condition_variable.h" +#include "sysdeps/mutex.h" static void increment_atomic_int(void* c) { sleep(1); @@ -245,7 +247,6 @@ TEST_F(sysdeps_poll, fd_count) { } } -#include "sysdeps/mutex.h" TEST(sysdeps_mutex, mutex_smoke) { static std::atomic finished(false); static std::mutex &m = *new std::mutex(); @@ -301,3 +302,21 @@ TEST(sysdeps_mutex, recursive_mutex_smoke) { m.lock(); m.unlock(); } + +TEST(sysdeps_condition_variable, smoke) { + static std::mutex &m = *new std::mutex; + static std::condition_variable &cond = *new std::condition_variable; + static volatile bool flag = false; + + std::unique_lock lock(m); + adb_thread_create([](void*) { + m.lock(); + flag = true; + cond.notify_one(); + m.unlock(); + }, nullptr); + + while (!flag) { + cond.wait(lock); + } +} diff --git a/adb/transport.cpp b/adb/transport.cpp index c31f655f6..ad63a6aab 100644 --- a/adb/transport.cpp +++ b/adb/transport.cpp @@ -950,6 +950,8 @@ int register_socket_transport(int s, const char *serial, int port, int local) { for (const auto& transport : pending_list) { if (transport->serial && strcmp(serial, transport->serial) == 0) { adb_mutex_unlock(&transport_lock); + VLOG(TRANSPORT) << "socket transport " << transport->serial + << " is already in pending_list and fails to register"; delete t; return -1; } @@ -958,6 +960,8 @@ int register_socket_transport(int s, const char *serial, int port, int local) { for (const auto& transport : transport_list) { if (transport->serial && strcmp(serial, transport->serial) == 0) { adb_mutex_unlock(&transport_lock); + VLOG(TRANSPORT) << "socket transport " << transport->serial + << " is already in transport_list and fails to register"; delete t; return -1; } @@ -990,8 +994,7 @@ atransport *find_transport(const char *serial) { void kick_all_tcp_devices() { adb_mutex_lock(&transport_lock); for (auto& t : transport_list) { - // TCP/IP devices have adb_port == 0. - if (t->type == kTransportLocal && t->adb_port == 0) { + if (t->IsTcpDevice()) { // Kicking breaks the read_transport thread of this transport out of any read, then // the read_transport thread will notify the main thread to make this transport // offline. Then the main thread will notify the write_transport thread to exit. diff --git a/adb/transport.h b/adb/transport.h index 35d7b505d..46d472b38 100644 --- a/adb/transport.h +++ b/adb/transport.h @@ -87,7 +87,22 @@ public: char* model = nullptr; char* device = nullptr; char* devpath = nullptr; - int adb_port = -1; // Use for emulators (local transport) + void SetLocalPortForEmulator(int port) { + CHECK_EQ(local_port_for_emulator_, -1); + local_port_for_emulator_ = port; + } + + bool GetLocalPortForEmulator(int* port) const { + if (type == kTransportLocal && local_port_for_emulator_ != -1) { + *port = local_port_for_emulator_; + return true; + } + return false; + } + + bool IsTcpDevice() const { + return type == kTransportLocal && local_port_for_emulator_ == -1; + } void* key = nullptr; unsigned char token[TOKEN_SIZE] = {}; @@ -128,6 +143,7 @@ public: bool MatchesTarget(const std::string& target) const; private: + int local_port_for_emulator_ = -1; bool kicked_ = false; void (*kick_func_)(atransport*) = nullptr; diff --git a/adb/transport_local.cpp b/adb/transport_local.cpp index 63d36c083..fc197159c 100644 --- a/adb/transport_local.cpp +++ b/adb/transport_local.cpp @@ -17,6 +17,8 @@ #define TRACE_TAG TRANSPORT #include "sysdeps.h" +#include "sysdeps/condition_variable.h" +#include "sysdeps/mutex.h" #include "transport.h" #include @@ -25,6 +27,8 @@ #include #include +#include + #include #include @@ -90,9 +94,9 @@ static int remote_write(apacket *p, atransport *t) return 0; } -void local_connect(int port) { +bool local_connect(int port) { std::string dummy; - local_connect_arbitrary_ports(port-1, port, &dummy); + return local_connect_arbitrary_ports(port-1, port, &dummy) == 0; } int local_connect_arbitrary_ports(int console_port, int adb_port, std::string* error) { @@ -126,18 +130,71 @@ int local_connect_arbitrary_ports(int console_port, int adb_port, std::string* e } #if ADB_HOST + +static void PollAllLocalPortsForEmulator() { + int port = DEFAULT_ADB_LOCAL_TRANSPORT_PORT; + int count = ADB_LOCAL_TRANSPORT_MAX; + + // Try to connect to any number of running emulator instances. + for ( ; count > 0; count--, port += 2 ) { + local_connect(port); + } +} + +// Retry the disconnected local port for 60 times, and sleep 1 second between two retries. +constexpr uint32_t LOCAL_PORT_RETRY_COUNT = 60; +constexpr uint32_t LOCAL_PORT_RETRY_INTERVAL_IN_MS = 1000; + +struct RetryPort { + int port; + uint32_t retry_count; +}; + +// Retry emulators just kicked. +static std::vector& retry_ports = *new std::vector; +std::mutex &retry_ports_lock = *new std::mutex; +std::condition_variable &retry_ports_cond = *new std::condition_variable; + static void client_socket_thread(void* x) { adb_thread_setname("client_socket_thread"); D("transport: client_socket_thread() starting"); + PollAllLocalPortsForEmulator(); while (true) { - int port = DEFAULT_ADB_LOCAL_TRANSPORT_PORT; - int count = ADB_LOCAL_TRANSPORT_MAX; - - // Try to connect to any number of running emulator instances. - for ( ; count > 0; count--, port += 2 ) { - local_connect(port); + std::vector ports; + // Collect retry ports. + { + std::unique_lock lock(retry_ports_lock); + while (retry_ports.empty()) { + retry_ports_cond.wait(lock); + } + retry_ports.swap(ports); + } + // Sleep here instead of the end of loop, because if we immediately try to reconnect + // the emulator just kicked, the adbd on the emulator may not have time to remove the + // just kicked transport. + adb_sleep_ms(LOCAL_PORT_RETRY_INTERVAL_IN_MS); + + // Try connecting retry ports. + std::vector next_ports; + for (auto& port : ports) { + VLOG(TRANSPORT) << "retry port " << port.port << ", last retry_count " + << port.retry_count; + if (local_connect(port.port)) { + VLOG(TRANSPORT) << "retry port " << port.port << " successfully"; + continue; + } + if (--port.retry_count > 0) { + next_ports.push_back(port); + } else { + VLOG(TRANSPORT) << "stop retrying port " << port.port; + } + } + + // Copy back left retry ports. + { + std::unique_lock lock(retry_ports_lock); + retry_ports.insert(retry_ports.end(), next_ports.begin(), next_ports.end()); } - sleep(1); } } @@ -346,17 +403,32 @@ static void remote_close(atransport *t) t->sfd = -1; adb_close(fd); } +#if ADB_HOST + int local_port; + if (t->GetLocalPortForEmulator(&local_port)) { + VLOG(TRANSPORT) << "remote_close, local_port = " << local_port; + std::unique_lock lock(retry_ports_lock); + RetryPort port; + port.port = local_port; + port.retry_count = LOCAL_PORT_RETRY_COUNT; + retry_ports.push_back(port); + retry_ports_cond.notify_one(); + } +#endif } #if ADB_HOST /* Only call this function if you already hold local_transports_lock. */ -atransport* find_emulator_transport_by_adb_port_locked(int adb_port) +static atransport* find_emulator_transport_by_adb_port_locked(int adb_port) { int i; for (i = 0; i < ADB_LOCAL_TRANSPORT_MAX; i++) { - if (local_transports[i] && local_transports[i]->adb_port == adb_port) { - return local_transports[i]; + int local_port; + if (local_transports[i] && local_transports[i]->GetLocalPortForEmulator(&local_port)) { + if (local_port == adb_port) { + return local_transports[i]; + } } } return NULL; @@ -403,13 +475,12 @@ int init_socket_transport(atransport *t, int s, int adb_port, int local) t->sync_token = 1; t->connection_state = kCsOffline; t->type = kTransportLocal; - t->adb_port = 0; #if ADB_HOST if (local) { adb_mutex_lock( &local_transports_lock ); { - t->adb_port = adb_port; + t->SetLocalPortForEmulator(adb_port); atransport* existing_transport = find_emulator_transport_by_adb_port_locked(adb_port); int index = get_available_local_transport_index_locked();