From fa30bf3932a53d98537f2c2bf878484fb81b4c9c Mon Sep 17 00:00:00 2001 From: Josh Gao Date: Thu, 29 Mar 2018 16:23:43 -0700 Subject: [PATCH 1/4] adb: guarantee that fdevent_run_on_main_thread happens last. Make it so that we handle run_on_main_thread calls after regular socket events, so that we can use it as a way to ensure we've processed all pending socket events. Test: adb_test Test: wine adb_test.exe Change-Id: Ic215c7fed19a8e1699e759970658b3775aa08c45 --- adb/fdevent.cpp | 9 ++++++++- adb/fdevent_test.h | 18 ++++++++++++++++++ adb/socket_test.cpp | 4 ---- 3 files changed, 26 insertions(+), 5 deletions(-) diff --git a/adb/fdevent.cpp b/adb/fdevent.cpp index 9776c1bec..cf441cf91 100644 --- a/adb/fdevent.cpp +++ b/adb/fdevent.cpp @@ -75,6 +75,7 @@ static std::atomic terminate_loop(false); static bool main_thread_valid; static uint64_t main_thread_id; +static bool run_needs_flush = false; static auto& run_queue_notify_fd = *new unique_fd(); static auto& run_queue_mutex = *new std::mutex(); static auto& run_queue GUARDED_BY(run_queue_mutex) = *new std::deque>(); @@ -317,7 +318,8 @@ static void fdevent_run_func(int fd, unsigned ev, void* /* userdata */) { PLOG(FATAL) << "failed to empty run queue notify fd"; } - fdevent_run_flush(); + // Mark that we need to flush, and then run it at the end of fdevent_loop. + run_needs_flush = true; } static void fdevent_run_setup() { @@ -378,6 +380,11 @@ void fdevent_loop() { g_pending_list.pop_front(); fdevent_call_fdfunc(fde); } + + if (run_needs_flush) { + fdevent_run_flush(); + run_needs_flush = false; + } } } diff --git a/adb/fdevent_test.h b/adb/fdevent_test.h index 1a2d41c6f..15ded21d0 100644 --- a/adb/fdevent_test.h +++ b/adb/fdevent_test.h @@ -21,6 +21,24 @@ #include "socket.h" #include "sysdeps.h" +static void WaitForFdeventLoop() { + // Sleep for a bit to make sure that network events have propagated. + std::this_thread::sleep_for(100ms); + + // fdevent_run_on_main_thread has a guaranteed ordering, and is guaranteed to happen after + // socket events, so as soon as our function is called, we know that we've processed all + // previous events. + std::mutex mutex; + std::condition_variable cv; + std::unique_lock lock(mutex); + fdevent_run_on_main_thread([&]() { + mutex.lock(); + mutex.unlock(); + cv.notify_one(); + }); + cv.wait(lock); +} + class FdeventTest : public ::testing::Test { protected: int dummy = -1; diff --git a/adb/socket_test.cpp b/adb/socket_test.cpp index f587fdb30..068c17543 100644 --- a/adb/socket_test.cpp +++ b/adb/socket_test.cpp @@ -42,10 +42,6 @@ struct ThreadArg { class LocalSocketTest : public FdeventTest {}; -static void WaitForFdeventLoop() { - std::this_thread::sleep_for(100ms); -} - TEST_F(LocalSocketTest, smoke) { // Join two socketpairs with a chain of intermediate socketpairs. int first[2]; From 7ab55713ccd194b9906f834c5f62df14716f6dec Mon Sep 17 00:00:00 2001 From: Josh Gao Date: Thu, 29 Mar 2018 16:27:13 -0700 Subject: [PATCH 2/4] adb: move ownership of the fdevent thread into FdeventTest. Previously, each of the tests was spawning the fdevent thread manually, in order to be able to set up listeners and such before running fdevent_loop. Now that we have a way to run arbitrary code on the fdevent thread, switch to having a generic fdevent thread and running setup code via fdevent_run_on_main_thread. Test: adb_test Test: wine adb_test.exe Change-Id: I517dbcbad31067b45087d9fbed67a75b75a75aec --- adb/fdevent_test.cpp | 61 +++++++++++++++++------------------ adb/fdevent_test.h | 12 +++++-- adb/socket_test.cpp | 75 +++++++++++++++++++------------------------- 3 files changed, 70 insertions(+), 78 deletions(-) diff --git a/adb/fdevent_test.cpp b/adb/fdevent_test.cpp index e3d5a356b..2f0ff1805 100644 --- a/adb/fdevent_test.cpp +++ b/adb/fdevent_test.cpp @@ -80,30 +80,7 @@ struct ThreadArg { TEST_F(FdeventTest, fdevent_terminate) { PrepareThread(); - - std::thread thread(fdevent_loop); - TerminateThread(thread); -} - -static void FdEventThreadFunc(ThreadArg* arg) { - std::vector read_fds; - std::vector write_fds; - - read_fds.push_back(arg->first_read_fd); - for (size_t i = 0; i < arg->middle_pipe_count; ++i) { - int fds[2]; - ASSERT_EQ(0, adb_socketpair(fds)); - read_fds.push_back(fds[0]); - write_fds.push_back(fds[1]); - } - write_fds.push_back(arg->last_write_fd); - - std::vector> fd_handlers; - for (size_t i = 0; i < read_fds.size(); ++i) { - fd_handlers.push_back(std::make_unique(read_fds[i], write_fds[i])); - } - - fdevent_loop(); + TerminateThread(); } TEST_F(FdeventTest, smoke) { @@ -122,7 +99,26 @@ TEST_F(FdeventTest, smoke) { int reader = fd_pair2[0]; PrepareThread(); - std::thread thread(FdEventThreadFunc, &thread_arg); + + std::vector> fd_handlers; + fdevent_run_on_main_thread([&thread_arg, &fd_handlers]() { + std::vector read_fds; + std::vector write_fds; + + read_fds.push_back(thread_arg.first_read_fd); + for (size_t i = 0; i < thread_arg.middle_pipe_count; ++i) { + int fds[2]; + ASSERT_EQ(0, adb_socketpair(fds)); + read_fds.push_back(fds[0]); + write_fds.push_back(fds[1]); + } + write_fds.push_back(thread_arg.last_write_fd); + + for (size_t i = 0; i < read_fds.size(); ++i) { + fd_handlers.push_back(std::make_unique(read_fds[i], write_fds[i])); + } + }); + WaitForFdeventLoop(); for (size_t i = 0; i < MESSAGE_LOOP_COUNT; ++i) { std::string read_buffer = MESSAGE; @@ -132,7 +128,10 @@ TEST_F(FdeventTest, smoke) { ASSERT_EQ(read_buffer, write_buffer); } - TerminateThread(thread); + fdevent_run_on_main_thread([&fd_handlers]() { fd_handlers.clear(); }); + WaitForFdeventLoop(); + + TerminateThread(); ASSERT_EQ(0, adb_close(writer)); ASSERT_EQ(0, adb_close(reader)); } @@ -143,7 +142,7 @@ struct InvalidFdArg { size_t* happened_event_count; }; -static void InvalidFdEventCallback(int fd, unsigned events, void* userdata) { +static void InvalidFdEventCallback(int, unsigned events, void* userdata) { InvalidFdArg* arg = reinterpret_cast(userdata); ASSERT_EQ(arg->expected_events, events); fdevent_remove(&arg->fde); @@ -179,7 +178,6 @@ TEST_F(FdeventTest, run_on_main_thread) { std::vector vec; PrepareThread(); - std::thread thread(fdevent_loop); // Block the main thread for a long time while we queue our callbacks. fdevent_run_on_main_thread([]() { @@ -194,7 +192,7 @@ TEST_F(FdeventTest, run_on_main_thread) { }); } - TerminateThread(thread); + TerminateThread(); ASSERT_EQ(1000000u, vec.size()); for (int i = 0; i < 1000000; ++i) { @@ -218,11 +216,8 @@ TEST_F(FdeventTest, run_on_main_thread_reentrant) { std::vector vec; PrepareThread(); - std::thread thread(fdevent_loop); - fdevent_run_on_main_thread(make_appender(&vec, 0)); - - TerminateThread(thread); + TerminateThread(); ASSERT_EQ(100u, vec.size()); for (int i = 0; i < 100; ++i) { diff --git a/adb/fdevent_test.h b/adb/fdevent_test.h index 15ded21d0..5a417e056 100644 --- a/adb/fdevent_test.h +++ b/adb/fdevent_test.h @@ -16,10 +16,13 @@ #include +#include +#include #include #include "socket.h" #include "sysdeps.h" +#include "sysdeps/chrono.h" static void WaitForFdeventLoop() { // Sleep for a bit to make sure that network events have propagated. @@ -67,6 +70,9 @@ class FdeventTest : public ::testing::Test { } dummy_socket->ready(dummy_socket); dummy = dummy_fds[0]; + + thread_ = std::thread([]() { fdevent_loop(); }); + WaitForFdeventLoop(); } size_t GetAdditionalLocalSocketCount() { @@ -74,10 +80,12 @@ class FdeventTest : public ::testing::Test { return 2; } - void TerminateThread(std::thread& thread) { + void TerminateThread() { fdevent_terminate_loop(); ASSERT_TRUE(WriteFdExactly(dummy, "", 1)); - thread.join(); + thread_.join(); ASSERT_EQ(0, adb_close(dummy)); } + + std::thread thread_; }; diff --git a/adb/socket_test.cpp b/adb/socket_test.cpp index 068c17543..f09e4b3df 100644 --- a/adb/socket_test.cpp +++ b/adb/socket_test.cpp @@ -82,7 +82,6 @@ TEST_F(LocalSocketTest, smoke) { connect(prev_tail, end); PrepareThread(); - std::thread thread(fdevent_loop); for (size_t i = 0; i < MESSAGE_LOOP_COUNT; ++i) { std::string read_buffer = MESSAGE; @@ -98,7 +97,7 @@ TEST_F(LocalSocketTest, smoke) { // Wait until the local sockets are closed. WaitForFdeventLoop(); ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count()); - TerminateThread(thread); + TerminateThread(); } struct CloseWithPacketArg { @@ -107,24 +106,25 @@ struct CloseWithPacketArg { int cause_close_fd; }; -static void CloseWithPacketThreadFunc(CloseWithPacketArg* arg) { - asocket* s = create_local_socket(arg->socket_fd); - ASSERT_TRUE(s != nullptr); - arg->bytes_written = 0; +static void CreateCloser(CloseWithPacketArg* arg) { + fdevent_run_on_main_thread([arg]() { + asocket* s = create_local_socket(arg->socket_fd); + ASSERT_TRUE(s != nullptr); + arg->bytes_written = 0; - std::string data; - data.resize(MAX_PAYLOAD); - arg->bytes_written += data.size(); - int ret = s->enqueue(s, std::move(data)); - ASSERT_EQ(1, ret); + std::string data; + data.resize(MAX_PAYLOAD); + arg->bytes_written += data.size(); + int ret = s->enqueue(s, std::move(data)); + ASSERT_EQ(1, ret); - asocket* cause_close_s = create_local_socket(arg->cause_close_fd); - ASSERT_TRUE(cause_close_s != nullptr); - cause_close_s->peer = s; - s->peer = cause_close_s; - cause_close_s->ready(cause_close_s); - - fdevent_loop(); + asocket* cause_close_s = create_local_socket(arg->cause_close_fd); + ASSERT_TRUE(cause_close_s != nullptr); + cause_close_s->peer = s; + s->peer = cause_close_s; + cause_close_s->ready(cause_close_s); + }); + WaitForFdeventLoop(); } // This test checks if we can close local socket in the following situation: @@ -141,9 +141,8 @@ TEST_F(LocalSocketTest, close_socket_with_packet) { arg.cause_close_fd = cause_close_fd[1]; PrepareThread(); - std::thread thread(CloseWithPacketThreadFunc, &arg); + CreateCloser(&arg); - WaitForFdeventLoop(); ASSERT_EQ(0, adb_close(cause_close_fd[0])); WaitForFdeventLoop(); @@ -152,7 +151,7 @@ TEST_F(LocalSocketTest, close_socket_with_packet) { WaitForFdeventLoop(); ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count()); - TerminateThread(thread); + TerminateThread(); } // This test checks if we can read packets from a closing local socket. @@ -166,7 +165,7 @@ TEST_F(LocalSocketTest, read_from_closing_socket) { arg.cause_close_fd = cause_close_fd[1]; PrepareThread(); - std::thread thread(CloseWithPacketThreadFunc, &arg); + CreateCloser(&arg); WaitForFdeventLoop(); ASSERT_EQ(0, adb_close(cause_close_fd[0])); @@ -182,7 +181,7 @@ TEST_F(LocalSocketTest, read_from_closing_socket) { WaitForFdeventLoop(); ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count()); - TerminateThread(thread); + TerminateThread(); } // This test checks if we can close local socket in the following situation: @@ -199,7 +198,7 @@ TEST_F(LocalSocketTest, write_error_when_having_packets) { arg.cause_close_fd = cause_close_fd[1]; PrepareThread(); - std::thread thread(CloseWithPacketThreadFunc, &arg); + CreateCloser(&arg); WaitForFdeventLoop(); EXPECT_EQ(2u + GetAdditionalLocalSocketCount(), fdevent_installed_count()); @@ -207,7 +206,7 @@ TEST_F(LocalSocketTest, write_error_when_having_packets) { WaitForFdeventLoop(); ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count()); - TerminateThread(thread); + TerminateThread(); } // Ensure that if we fail to write output to an fd, we will still flush data coming from it. @@ -227,7 +226,6 @@ TEST_F(LocalSocketTest, flush_after_shutdown) { tail->ready(tail); PrepareThread(); - std::thread thread(fdevent_loop); EXPECT_TRUE(WriteFdExactly(head_fd[0], "foo", 3)); @@ -245,7 +243,7 @@ TEST_F(LocalSocketTest, flush_after_shutdown) { WaitForFdeventLoop(); ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count()); - TerminateThread(thread); + TerminateThread(); } #if defined(__linux__) @@ -254,21 +252,10 @@ static void ClientThreadFunc() { std::string error; int fd = network_loopback_client(5038, SOCK_STREAM, &error); ASSERT_GE(fd, 0) << error; - std::this_thread::sleep_for(200ms); + std::this_thread::sleep_for(1s); ASSERT_EQ(0, adb_close(fd)); } -struct CloseRdHupSocketArg { - int socket_fd; -}; - -static void CloseRdHupSocketThreadFunc(CloseRdHupSocketArg* arg) { - asocket* s = create_local_socket(arg->socket_fd); - ASSERT_TRUE(s != nullptr); - - fdevent_loop(); -} - // This test checks if we can close sockets in CLOSE_WAIT state. TEST_F(LocalSocketTest, close_socket_in_CLOSE_WAIT_state) { std::string error; @@ -279,11 +266,13 @@ TEST_F(LocalSocketTest, close_socket_in_CLOSE_WAIT_state) { int accept_fd = adb_socket_accept(listen_fd, nullptr, nullptr); ASSERT_GE(accept_fd, 0); - CloseRdHupSocketArg arg; - arg.socket_fd = accept_fd; PrepareThread(); - std::thread thread(CloseRdHupSocketThreadFunc, &arg); + + fdevent_run_on_main_thread([accept_fd]() { + asocket* s = create_local_socket(accept_fd); + ASSERT_TRUE(s != nullptr); + }); WaitForFdeventLoop(); EXPECT_EQ(1u + GetAdditionalLocalSocketCount(), fdevent_installed_count()); @@ -293,7 +282,7 @@ TEST_F(LocalSocketTest, close_socket_in_CLOSE_WAIT_state) { WaitForFdeventLoop(); ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count()); - TerminateThread(thread); + TerminateThread(); } #endif // defined(__linux__) From 954e1280d7d6244c659896fb6c6b415e40865133 Mon Sep 17 00:00:00 2001 From: Josh Gao Date: Fri, 30 Mar 2018 13:56:24 -0700 Subject: [PATCH 3/4] adb: don't set has_write_error on success. Whoops. Test: adb_test Change-Id: I32123c51446a22d4423eef0753b0a0b00b500a90 --- adb/sockets.cpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/adb/sockets.cpp b/adb/sockets.cpp index e05a3dbc6..0887e6f01 100644 --- a/adb/sockets.cpp +++ b/adb/sockets.cpp @@ -126,12 +126,12 @@ static SocketFlushResult local_socket_flush_incoming(asocket* s) { } else if (rc == -1 && errno == EAGAIN) { fdevent_add(&s->fde, FDE_WRITE); return SocketFlushResult::TryAgain; + } else { + // We failed to write, but it's possible that we can still read from the socket. + // Give that a try before giving up. + s->has_write_error = true; + break; } - - // We failed to write, but it's possible that we can still read from the socket. - // Give that a try before giving up. - s->has_write_error = true; - break; } // If we sent the last packet of a closing socket, we can now destroy it. From 7651eb9ef743e92fb68897f81cb06dc8c4553504 Mon Sep 17 00:00:00 2001 From: Josh Gao Date: Fri, 30 Mar 2018 14:05:40 -0700 Subject: [PATCH 4/4] adb: try harder to fill our test sockets. On platforms that implement sockets via underlying sockets (e.g. Wine), a socket can appear to be full, and then become available for writes again without read being called on the other end. Add a sleep after each write to give the underlying implementation time to flush. This doesn't help us if the buffer size is smaller than MAX_PAYLOAD, but at least in the case of Wine, that doesn't seem to be the case. Test: adb_test Test: wine adb_test.exe Change-Id: Iff1ec14bbf318b9742ec7e2fb72e34e3d6bbe6ad --- adb/socket_test.cpp | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/adb/socket_test.cpp b/adb/socket_test.cpp index f09e4b3df..6c4a8b2c1 100644 --- a/adb/socket_test.cpp +++ b/adb/socket_test.cpp @@ -112,11 +112,25 @@ static void CreateCloser(CloseWithPacketArg* arg) { ASSERT_TRUE(s != nullptr); arg->bytes_written = 0; - std::string data; - data.resize(MAX_PAYLOAD); - arg->bytes_written += data.size(); - int ret = s->enqueue(s, std::move(data)); - ASSERT_EQ(1, ret); + // On platforms that implement sockets via underlying sockets (e.g. Wine), + // a socket can appear to be full, and then become available for writes + // again without read being called on the other end. Loop and sleep after + // each write to give the underlying implementation time to flush. + bool socket_filled = false; + for (int i = 0; i < 128; ++i) { + std::string data; + data.resize(MAX_PAYLOAD); + arg->bytes_written += data.size(); + int ret = s->enqueue(s, std::move(data)); + if (ret == 1) { + socket_filled = true; + break; + } + ASSERT_NE(-1, ret); + + std::this_thread::sleep_for(250ms); + } + ASSERT_TRUE(socket_filled); asocket* cause_close_s = create_local_socket(arg->cause_close_fd); ASSERT_TRUE(cause_close_s != nullptr);