Merge changes from topic 'adb_thread_safety'

* changes:
  adb: wait for devices to come up instead of sleeping for 3s.
  adb: initialize mDNS asynchronously.
  adb: add fdevent_run_on_main_thread.
This commit is contained in:
Josh Gao 2017-05-04 23:47:30 +00:00 committed by Gerrit Code Review
commit 9083ff1c24
12 changed files with 238 additions and 57 deletions

View file

@ -31,6 +31,8 @@
#include <time.h>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
@ -48,6 +50,7 @@
#include "adb_io.h"
#include "adb_listeners.h"
#include "adb_utils.h"
#include "sysdeps/chrono.h"
#include "transport.h"
#if !ADB_HOST
@ -313,19 +316,15 @@ void parse_banner(const std::string& banner, atransport* t) {
if (type == "bootloader") {
D("setting connection_state to kCsBootloader");
t->SetConnectionState(kCsBootloader);
update_transports();
} else if (type == "device") {
D("setting connection_state to kCsDevice");
t->SetConnectionState(kCsDevice);
update_transports();
} else if (type == "recovery") {
D("setting connection_state to kCsRecovery");
t->SetConnectionState(kCsRecovery);
update_transports();
} else if (type == "sideload") {
D("setting connection_state to kCsSideload");
t->SetConnectionState(kCsSideload);
update_transports();
} else {
D("setting connection_state to kCsHost");
t->SetConnectionState(kCsHost);
@ -353,6 +352,8 @@ static void handle_new_connection(atransport* t, apacket* p) {
send_auth_request(t);
}
#endif
update_transports();
}
void handle_packet(apacket *p, atransport *t)
@ -1229,4 +1230,50 @@ int handle_host_request(const char* service, TransportType type,
return ret - 1;
return -1;
}
static auto& init_mutex = *new std::mutex();
static auto& init_cv = *new std::condition_variable();
static bool device_scan_complete = false;
static bool transports_ready = false;
void update_transport_status() {
bool result = iterate_transports([](const atransport* t) {
if (t->type == kTransportUsb && t->online != 1) {
return false;
}
return true;
});
D("update_transport_status: transports_ready = %s", result ? "true" : "false");
bool ready;
{
std::lock_guard<std::mutex> lock(init_mutex);
transports_ready = result;
ready = transports_ready && device_scan_complete;
}
if (ready) {
D("update_transport_status: notifying");
init_cv.notify_all();
}
}
void adb_notify_device_scan_complete() {
D("device scan complete");
{
std::lock_guard<std::mutex> lock(init_mutex);
device_scan_complete = true;
}
update_transport_status();
}
void adb_wait_for_device_initialization() {
std::unique_lock<std::mutex> lock(init_mutex);
init_cv.wait_for(lock, 3s, []() { return device_scan_complete && transports_ready; });
}
#endif // ADB_HOST

View file

@ -228,4 +228,18 @@ void SendConnectOnHost(atransport* t);
void parse_banner(const std::string&, atransport* t);
// On startup, the adb server needs to wait until all of the connected devices are ready.
// To do this, we need to know when the scan has identified all of the potential new transports, and
// when each transport becomes ready.
// TODO: Do this for mDNS as well, instead of just USB?
// We've found all of the transports we potentially care about.
void adb_notify_device_scan_complete();
// One or more transports have changed status, check to see if we're ready.
void update_transport_status();
// Wait until device scan has completed and every transport is ready, or a timeout elapses.
void adb_wait_for_device_initialization();
#endif

View file

@ -28,12 +28,15 @@
#include <sys/stat.h>
#include <sys/types.h>
#include <condition_variable>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
#include <android-base/stringprintf.h>
#include <android-base/strings.h>
#include <android-base/thread_annotations.h>
#include <cutils/sockets.h>
#include "adb_io.h"
@ -177,9 +180,8 @@ int adb_connect(const std::string& service, std::string* error) {
} else {
fprintf(stderr, "* daemon started successfully\n");
}
// Give the server some time to start properly and detect devices.
std::this_thread::sleep_for(3s);
// fall through to _adb_connect
// The server will wait until it detects all of its connected devices before acking.
// Fall through to _adb_connect.
} else {
// If a server is already running, check its version matches.
int version = ADB_SERVER_VERSION - 1;

View file

@ -156,33 +156,38 @@ int adb_server_main(int is_daemon, const std::string& socket_spec, int ack_reply
}
#endif
// Inform our parent that we are up and running.
// Wait for the USB scan to complete before notifying the parent that we're up.
// We need to perform this in a thread, because we would otherwise block the event loop.
std::thread notify_thread([ack_reply_fd]() {
adb_wait_for_device_initialization();
// Any error output written to stderr now goes to adb.log. We could
// keep around a copy of the stderr fd and use that to write any errors
// encountered by the following code, but that is probably overkill.
// Any error output written to stderr now goes to adb.log. We could
// keep around a copy of the stderr fd and use that to write any errors
// encountered by the following code, but that is probably overkill.
#if defined(_WIN32)
const HANDLE ack_reply_handle = cast_int_to_handle(ack_reply_fd);
const CHAR ack[] = "OK\n";
const DWORD bytes_to_write = arraysize(ack) - 1;
DWORD written = 0;
if (!WriteFile(ack_reply_handle, ack, bytes_to_write, &written, NULL)) {
fatal("adb: cannot write ACK to handle 0x%p: %s", ack_reply_handle,
android::base::SystemErrorCodeToString(GetLastError()).c_str());
}
if (written != bytes_to_write) {
fatal("adb: cannot write %lu bytes of ACK: only wrote %lu bytes",
bytes_to_write, written);
}
CloseHandle(ack_reply_handle);
const HANDLE ack_reply_handle = cast_int_to_handle(ack_reply_fd);
const CHAR ack[] = "OK\n";
const DWORD bytes_to_write = arraysize(ack) - 1;
DWORD written = 0;
if (!WriteFile(ack_reply_handle, ack, bytes_to_write, &written, NULL)) {
fatal("adb: cannot write ACK to handle 0x%p: %s", ack_reply_handle,
android::base::SystemErrorCodeToString(GetLastError()).c_str());
}
if (written != bytes_to_write) {
fatal("adb: cannot write %lu bytes of ACK: only wrote %lu bytes", bytes_to_write,
written);
}
CloseHandle(ack_reply_handle);
#else
// TODO(danalbert): Can't use SendOkay because we're sending "OK\n", not
// "OKAY".
if (!android::base::WriteStringToFd("OK\n", ack_reply_fd)) {
fatal_errno("error writing ACK to fd %d", ack_reply_fd);
}
unix_close(ack_reply_fd);
// TODO(danalbert): Can't use SendOkay because we're sending "OK\n", not
// "OKAY".
if (!android::base::WriteStringToFd("OK\n", ack_reply_fd)) {
fatal_errno("error writing ACK to fd %d", ack_reply_fd);
}
unix_close(ack_reply_fd);
#endif
});
notify_thread.detach();
}
D("Event loop starting");

View file

@ -352,6 +352,8 @@ static void poll_for_devices() {
}
libusb_free_device_list(list, 1);
adb_notify_device_scan_complete();
std::this_thread::sleep_for(500ms);
}
}

View file

@ -26,15 +26,19 @@
#include <unistd.h>
#include <atomic>
#include <functional>
#include <list>
#include <mutex>
#include <unordered_map>
#include <vector>
#include <android-base/logging.h>
#include <android-base/stringprintf.h>
#include <android-base/thread_annotations.h>
#include "adb_io.h"
#include "adb_trace.h"
#include "adb_unique_fd.h"
#include "adb_utils.h"
#if !ADB_HOST
@ -75,6 +79,10 @@ static std::atomic<bool> terminate_loop(false);
static bool main_thread_valid;
static unsigned long main_thread_id;
static auto& run_queue_notify_fd = *new unique_fd();
static auto& run_queue_mutex = *new std::mutex();
static auto& run_queue GUARDED_BY(run_queue_mutex) = *new std::vector<std::function<void()>>();
void check_main_thread() {
if (main_thread_valid) {
CHECK_EQ(main_thread_id, adb_thread_id());
@ -112,8 +120,7 @@ static std::string dump_fde(const fdevent* fde) {
return android::base::StringPrintf("(fdevent %d %s)", fde->fd, state.c_str());
}
fdevent *fdevent_create(int fd, fd_func func, void *arg)
{
fdevent* fdevent_create(int fd, fd_func func, void* arg) {
check_main_thread();
fdevent *fde = (fdevent*) malloc(sizeof(fdevent));
if(fde == 0) return 0;
@ -122,8 +129,7 @@ fdevent *fdevent_create(int fd, fd_func func, void *arg)
return fde;
}
void fdevent_destroy(fdevent *fde)
{
void fdevent_destroy(fdevent* fde) {
check_main_thread();
if(fde == 0) return;
if(!(fde->state & FDE_CREATED)) {
@ -278,8 +284,7 @@ static void fdevent_process() {
}
}
static void fdevent_call_fdfunc(fdevent* fde)
{
static void fdevent_call_fdfunc(fdevent* fde) {
unsigned events = fde->events;
fde->events = 0;
CHECK(fde->state & FDE_PENDING);
@ -292,10 +297,7 @@ static void fdevent_call_fdfunc(fdevent* fde)
#include <sys/ioctl.h>
static void fdevent_subproc_event_func(int fd, unsigned ev,
void* /* userdata */)
{
static void fdevent_subproc_event_func(int fd, unsigned ev, void* /* userdata */) {
D("subproc handling on fd = %d, ev = %x", fd, ev);
CHECK_GE(fd, 0);
@ -342,8 +344,7 @@ static void fdevent_subproc_event_func(int fd, unsigned ev,
}
}
void fdevent_subproc_setup()
{
static void fdevent_subproc_setup() {
int s[2];
if(adb_socketpair(s)) {
@ -358,12 +359,63 @@ void fdevent_subproc_setup()
}
#endif // !ADB_HOST
void fdevent_loop()
{
static void fdevent_run_flush() REQUIRES(run_queue_mutex) {
for (auto& f : run_queue) {
f();
}
run_queue.clear();
}
static void fdevent_run_func(int fd, unsigned ev, void* /* userdata */) {
CHECK_GE(fd, 0);
CHECK(ev & FDE_READ);
char buf[1024];
// Empty the fd.
if (adb_read(fd, buf, sizeof(buf)) == -1) {
PLOG(FATAL) << "failed to empty run queue notify fd";
}
std::lock_guard<std::mutex> lock(run_queue_mutex);
fdevent_run_flush();
}
static void fdevent_run_setup() {
std::lock_guard<std::mutex> lock(run_queue_mutex);
CHECK(run_queue_notify_fd.get() == -1);
int s[2];
if (adb_socketpair(s) != 0) {
PLOG(FATAL) << "failed to create run queue notify socketpair";
}
run_queue_notify_fd.reset(s[0]);
fdevent* fde = fdevent_create(s[1], fdevent_run_func, nullptr);
CHECK(fde != nullptr);
fdevent_add(fde, FDE_READ);
fdevent_run_flush();
}
void fdevent_run_on_main_thread(std::function<void()> fn) {
std::lock_guard<std::mutex> lock(run_queue_mutex);
run_queue.push_back(std::move(fn));
// run_queue_notify_fd could still be -1 if we're called before fdevent has finished setting up.
// In that case, rely on the setup code to flush the queue without a notification being needed.
if (run_queue_notify_fd != -1) {
if (adb_write(run_queue_notify_fd.get(), "", 1) != 1) {
PLOG(FATAL) << "failed to write to run queue notify fd";
}
}
}
void fdevent_loop() {
set_main_thread();
#if !ADB_HOST
fdevent_subproc_setup();
#endif // !ADB_HOST
fdevent_run_setup();
while (true) {
if (terminate_loop) {
@ -393,6 +445,11 @@ size_t fdevent_installed_count() {
void fdevent_reset() {
g_poll_node_map.clear();
g_pending_list.clear();
std::lock_guard<std::mutex> lock(run_queue_mutex);
run_queue_notify_fd.reset();
run_queue.clear();
main_thread_valid = false;
terminate_loop = false;
}

View file

@ -20,6 +20,8 @@
#include <stddef.h>
#include <stdint.h> /* for int64_t */
#include <functional>
/* events that may be observed */
#define FDE_READ 0x0001
#define FDE_WRITE 0x0002
@ -78,6 +80,9 @@ void fdevent_loop();
void check_main_thread();
// Queue an operation to run on the main thread.
void fdevent_run_on_main_thread(std::function<void()> fn);
// The following functions are used only for tests.
void fdevent_terminate_loop();
size_t fdevent_installed_count();

View file

@ -173,3 +173,24 @@ TEST_F(FdeventTest, invalid_fd) {
std::thread thread(InvalidFdThreadFunc);
thread.join();
}
TEST_F(FdeventTest, run_on_main_thread) {
std::vector<int> vec;
PrepareThread();
std::thread thread(fdevent_loop);
for (int i = 0; i < 100; ++i) {
fdevent_run_on_main_thread([i, &vec]() {
check_main_thread();
vec.push_back(i);
});
}
TerminateThread(thread);
ASSERT_EQ(100u, vec.size());
for (int i = 0; i < 100; ++i) {
ASSERT_EQ(i, vec[i]);
}
}

View file

@ -53,11 +53,11 @@ class FdeventTest : public ::testing::Test {
size_t GetAdditionalLocalSocketCount() {
#if ADB_HOST
// dummy socket installed in PrepareThread()
return 1;
#else
// dummy socket and one more socket installed in fdevent_subproc_setup()
// dummy socket installed in PrepareThread() + fdevent_run_on_main_thread socket
return 2;
#else
// dummy socket + fdevent_run_on_main_thread + fdevent_subproc_setup() sockets
return 3;
#endif
}

View file

@ -400,8 +400,27 @@ asocket* create_device_tracker(void) {
return &tracker->socket;
}
// Check if all of the USB transports are connected.
bool iterate_transports(std::function<bool(const atransport*)> fn) {
std::lock_guard<std::mutex> lock(transport_lock);
for (const auto& t : transport_list) {
if (!fn(t)) {
return false;
}
}
for (const auto& t : pending_list) {
if (!fn(t)) {
return false;
}
}
return true;
}
// Call this function each time the transport list has changed.
void update_transports() {
update_transport_status();
// Notify `adb track-devices` clients.
std::string transports = list_transports(false);
device_tracker* tracker = device_tracker_list;

View file

@ -198,6 +198,10 @@ atransport* acquire_one_transport(TransportType type, const char* serial, bool*
void kick_transport(atransport* t);
void update_transports(void);
// Iterates across all of the current and pending transports.
// Stops iteration and returns false if fn returns false, otherwise returns true.
bool iterate_transports(std::function<bool(const atransport*)> fn);
void init_transport_registration(void);
void init_mdns_transport_discovery(void);
std::string list_transports(bool long_listing);

View file

@ -24,6 +24,8 @@
#include <arpa/inet.h>
#endif
#include <thread>
#include <android-base/stringprintf.h>
#include <dns_sd.h>
@ -262,19 +264,22 @@ static void DNSSD_API register_mdns_transport(DNSServiceRef sdRef,
}
}
void init_mdns_transport_discovery(void) {
DNSServiceErrorType errorCode =
DNSServiceBrowse(&service_ref, 0, 0, kADBServiceType, nullptr,
register_mdns_transport, nullptr);
void init_mdns_transport_discovery_thread(void) {
DNSServiceErrorType errorCode = DNSServiceBrowse(&service_ref, 0, 0, kADBServiceType, nullptr,
register_mdns_transport, nullptr);
if (errorCode != kDNSServiceErr_NoError) {
D("Got %d initiating mDNS browse.", errorCode);
return;
}
fdevent_install(&service_ref_fde,
adb_DNSServiceRefSockFD(service_ref),
pump_service_ref,
&service_ref);
fdevent_set(&service_ref_fde, FDE_READ);
fdevent_run_on_main_thread([]() {
fdevent_install(&service_ref_fde, adb_DNSServiceRefSockFD(service_ref), pump_service_ref,
&service_ref);
fdevent_set(&service_ref_fde, FDE_READ);
});
}
void init_mdns_transport_discovery(void) {
std::thread(init_mdns_transport_discovery_thread).detach();
}