diff --git a/adb/fdevent.cpp b/adb/fdevent.cpp index 9776c1bec..cf441cf91 100644 --- a/adb/fdevent.cpp +++ b/adb/fdevent.cpp @@ -75,6 +75,7 @@ static std::atomic terminate_loop(false); static bool main_thread_valid; static uint64_t main_thread_id; +static bool run_needs_flush = false; static auto& run_queue_notify_fd = *new unique_fd(); static auto& run_queue_mutex = *new std::mutex(); static auto& run_queue GUARDED_BY(run_queue_mutex) = *new std::deque>(); @@ -317,7 +318,8 @@ static void fdevent_run_func(int fd, unsigned ev, void* /* userdata */) { PLOG(FATAL) << "failed to empty run queue notify fd"; } - fdevent_run_flush(); + // Mark that we need to flush, and then run it at the end of fdevent_loop. + run_needs_flush = true; } static void fdevent_run_setup() { @@ -378,6 +380,11 @@ void fdevent_loop() { g_pending_list.pop_front(); fdevent_call_fdfunc(fde); } + + if (run_needs_flush) { + fdevent_run_flush(); + run_needs_flush = false; + } } } diff --git a/adb/fdevent_test.cpp b/adb/fdevent_test.cpp index e3d5a356b..2f0ff1805 100644 --- a/adb/fdevent_test.cpp +++ b/adb/fdevent_test.cpp @@ -80,30 +80,7 @@ struct ThreadArg { TEST_F(FdeventTest, fdevent_terminate) { PrepareThread(); - - std::thread thread(fdevent_loop); - TerminateThread(thread); -} - -static void FdEventThreadFunc(ThreadArg* arg) { - std::vector read_fds; - std::vector write_fds; - - read_fds.push_back(arg->first_read_fd); - for (size_t i = 0; i < arg->middle_pipe_count; ++i) { - int fds[2]; - ASSERT_EQ(0, adb_socketpair(fds)); - read_fds.push_back(fds[0]); - write_fds.push_back(fds[1]); - } - write_fds.push_back(arg->last_write_fd); - - std::vector> fd_handlers; - for (size_t i = 0; i < read_fds.size(); ++i) { - fd_handlers.push_back(std::make_unique(read_fds[i], write_fds[i])); - } - - fdevent_loop(); + TerminateThread(); } TEST_F(FdeventTest, smoke) { @@ -122,7 +99,26 @@ TEST_F(FdeventTest, smoke) { int reader = fd_pair2[0]; PrepareThread(); - std::thread thread(FdEventThreadFunc, &thread_arg); + + std::vector> fd_handlers; + fdevent_run_on_main_thread([&thread_arg, &fd_handlers]() { + std::vector read_fds; + std::vector write_fds; + + read_fds.push_back(thread_arg.first_read_fd); + for (size_t i = 0; i < thread_arg.middle_pipe_count; ++i) { + int fds[2]; + ASSERT_EQ(0, adb_socketpair(fds)); + read_fds.push_back(fds[0]); + write_fds.push_back(fds[1]); + } + write_fds.push_back(thread_arg.last_write_fd); + + for (size_t i = 0; i < read_fds.size(); ++i) { + fd_handlers.push_back(std::make_unique(read_fds[i], write_fds[i])); + } + }); + WaitForFdeventLoop(); for (size_t i = 0; i < MESSAGE_LOOP_COUNT; ++i) { std::string read_buffer = MESSAGE; @@ -132,7 +128,10 @@ TEST_F(FdeventTest, smoke) { ASSERT_EQ(read_buffer, write_buffer); } - TerminateThread(thread); + fdevent_run_on_main_thread([&fd_handlers]() { fd_handlers.clear(); }); + WaitForFdeventLoop(); + + TerminateThread(); ASSERT_EQ(0, adb_close(writer)); ASSERT_EQ(0, adb_close(reader)); } @@ -143,7 +142,7 @@ struct InvalidFdArg { size_t* happened_event_count; }; -static void InvalidFdEventCallback(int fd, unsigned events, void* userdata) { +static void InvalidFdEventCallback(int, unsigned events, void* userdata) { InvalidFdArg* arg = reinterpret_cast(userdata); ASSERT_EQ(arg->expected_events, events); fdevent_remove(&arg->fde); @@ -179,7 +178,6 @@ TEST_F(FdeventTest, run_on_main_thread) { std::vector vec; PrepareThread(); - std::thread thread(fdevent_loop); // Block the main thread for a long time while we queue our callbacks. fdevent_run_on_main_thread([]() { @@ -194,7 +192,7 @@ TEST_F(FdeventTest, run_on_main_thread) { }); } - TerminateThread(thread); + TerminateThread(); ASSERT_EQ(1000000u, vec.size()); for (int i = 0; i < 1000000; ++i) { @@ -218,11 +216,8 @@ TEST_F(FdeventTest, run_on_main_thread_reentrant) { std::vector vec; PrepareThread(); - std::thread thread(fdevent_loop); - fdevent_run_on_main_thread(make_appender(&vec, 0)); - - TerminateThread(thread); + TerminateThread(); ASSERT_EQ(100u, vec.size()); for (int i = 0; i < 100; ++i) { diff --git a/adb/fdevent_test.h b/adb/fdevent_test.h index 1a2d41c6f..5a417e056 100644 --- a/adb/fdevent_test.h +++ b/adb/fdevent_test.h @@ -16,10 +16,31 @@ #include +#include +#include #include #include "socket.h" #include "sysdeps.h" +#include "sysdeps/chrono.h" + +static void WaitForFdeventLoop() { + // Sleep for a bit to make sure that network events have propagated. + std::this_thread::sleep_for(100ms); + + // fdevent_run_on_main_thread has a guaranteed ordering, and is guaranteed to happen after + // socket events, so as soon as our function is called, we know that we've processed all + // previous events. + std::mutex mutex; + std::condition_variable cv; + std::unique_lock lock(mutex); + fdevent_run_on_main_thread([&]() { + mutex.lock(); + mutex.unlock(); + cv.notify_one(); + }); + cv.wait(lock); +} class FdeventTest : public ::testing::Test { protected: @@ -49,6 +70,9 @@ class FdeventTest : public ::testing::Test { } dummy_socket->ready(dummy_socket); dummy = dummy_fds[0]; + + thread_ = std::thread([]() { fdevent_loop(); }); + WaitForFdeventLoop(); } size_t GetAdditionalLocalSocketCount() { @@ -56,10 +80,12 @@ class FdeventTest : public ::testing::Test { return 2; } - void TerminateThread(std::thread& thread) { + void TerminateThread() { fdevent_terminate_loop(); ASSERT_TRUE(WriteFdExactly(dummy, "", 1)); - thread.join(); + thread_.join(); ASSERT_EQ(0, adb_close(dummy)); } + + std::thread thread_; }; diff --git a/adb/socket_test.cpp b/adb/socket_test.cpp index f587fdb30..6c4a8b2c1 100644 --- a/adb/socket_test.cpp +++ b/adb/socket_test.cpp @@ -42,10 +42,6 @@ struct ThreadArg { class LocalSocketTest : public FdeventTest {}; -static void WaitForFdeventLoop() { - std::this_thread::sleep_for(100ms); -} - TEST_F(LocalSocketTest, smoke) { // Join two socketpairs with a chain of intermediate socketpairs. int first[2]; @@ -86,7 +82,6 @@ TEST_F(LocalSocketTest, smoke) { connect(prev_tail, end); PrepareThread(); - std::thread thread(fdevent_loop); for (size_t i = 0; i < MESSAGE_LOOP_COUNT; ++i) { std::string read_buffer = MESSAGE; @@ -102,7 +97,7 @@ TEST_F(LocalSocketTest, smoke) { // Wait until the local sockets are closed. WaitForFdeventLoop(); ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count()); - TerminateThread(thread); + TerminateThread(); } struct CloseWithPacketArg { @@ -111,24 +106,39 @@ struct CloseWithPacketArg { int cause_close_fd; }; -static void CloseWithPacketThreadFunc(CloseWithPacketArg* arg) { - asocket* s = create_local_socket(arg->socket_fd); - ASSERT_TRUE(s != nullptr); - arg->bytes_written = 0; +static void CreateCloser(CloseWithPacketArg* arg) { + fdevent_run_on_main_thread([arg]() { + asocket* s = create_local_socket(arg->socket_fd); + ASSERT_TRUE(s != nullptr); + arg->bytes_written = 0; - std::string data; - data.resize(MAX_PAYLOAD); - arg->bytes_written += data.size(); - int ret = s->enqueue(s, std::move(data)); - ASSERT_EQ(1, ret); + // On platforms that implement sockets via underlying sockets (e.g. Wine), + // a socket can appear to be full, and then become available for writes + // again without read being called on the other end. Loop and sleep after + // each write to give the underlying implementation time to flush. + bool socket_filled = false; + for (int i = 0; i < 128; ++i) { + std::string data; + data.resize(MAX_PAYLOAD); + arg->bytes_written += data.size(); + int ret = s->enqueue(s, std::move(data)); + if (ret == 1) { + socket_filled = true; + break; + } + ASSERT_NE(-1, ret); - asocket* cause_close_s = create_local_socket(arg->cause_close_fd); - ASSERT_TRUE(cause_close_s != nullptr); - cause_close_s->peer = s; - s->peer = cause_close_s; - cause_close_s->ready(cause_close_s); + std::this_thread::sleep_for(250ms); + } + ASSERT_TRUE(socket_filled); - fdevent_loop(); + asocket* cause_close_s = create_local_socket(arg->cause_close_fd); + ASSERT_TRUE(cause_close_s != nullptr); + cause_close_s->peer = s; + s->peer = cause_close_s; + cause_close_s->ready(cause_close_s); + }); + WaitForFdeventLoop(); } // This test checks if we can close local socket in the following situation: @@ -145,9 +155,8 @@ TEST_F(LocalSocketTest, close_socket_with_packet) { arg.cause_close_fd = cause_close_fd[1]; PrepareThread(); - std::thread thread(CloseWithPacketThreadFunc, &arg); + CreateCloser(&arg); - WaitForFdeventLoop(); ASSERT_EQ(0, adb_close(cause_close_fd[0])); WaitForFdeventLoop(); @@ -156,7 +165,7 @@ TEST_F(LocalSocketTest, close_socket_with_packet) { WaitForFdeventLoop(); ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count()); - TerminateThread(thread); + TerminateThread(); } // This test checks if we can read packets from a closing local socket. @@ -170,7 +179,7 @@ TEST_F(LocalSocketTest, read_from_closing_socket) { arg.cause_close_fd = cause_close_fd[1]; PrepareThread(); - std::thread thread(CloseWithPacketThreadFunc, &arg); + CreateCloser(&arg); WaitForFdeventLoop(); ASSERT_EQ(0, adb_close(cause_close_fd[0])); @@ -186,7 +195,7 @@ TEST_F(LocalSocketTest, read_from_closing_socket) { WaitForFdeventLoop(); ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count()); - TerminateThread(thread); + TerminateThread(); } // This test checks if we can close local socket in the following situation: @@ -203,7 +212,7 @@ TEST_F(LocalSocketTest, write_error_when_having_packets) { arg.cause_close_fd = cause_close_fd[1]; PrepareThread(); - std::thread thread(CloseWithPacketThreadFunc, &arg); + CreateCloser(&arg); WaitForFdeventLoop(); EXPECT_EQ(2u + GetAdditionalLocalSocketCount(), fdevent_installed_count()); @@ -211,7 +220,7 @@ TEST_F(LocalSocketTest, write_error_when_having_packets) { WaitForFdeventLoop(); ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count()); - TerminateThread(thread); + TerminateThread(); } // Ensure that if we fail to write output to an fd, we will still flush data coming from it. @@ -231,7 +240,6 @@ TEST_F(LocalSocketTest, flush_after_shutdown) { tail->ready(tail); PrepareThread(); - std::thread thread(fdevent_loop); EXPECT_TRUE(WriteFdExactly(head_fd[0], "foo", 3)); @@ -249,7 +257,7 @@ TEST_F(LocalSocketTest, flush_after_shutdown) { WaitForFdeventLoop(); ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count()); - TerminateThread(thread); + TerminateThread(); } #if defined(__linux__) @@ -258,21 +266,10 @@ static void ClientThreadFunc() { std::string error; int fd = network_loopback_client(5038, SOCK_STREAM, &error); ASSERT_GE(fd, 0) << error; - std::this_thread::sleep_for(200ms); + std::this_thread::sleep_for(1s); ASSERT_EQ(0, adb_close(fd)); } -struct CloseRdHupSocketArg { - int socket_fd; -}; - -static void CloseRdHupSocketThreadFunc(CloseRdHupSocketArg* arg) { - asocket* s = create_local_socket(arg->socket_fd); - ASSERT_TRUE(s != nullptr); - - fdevent_loop(); -} - // This test checks if we can close sockets in CLOSE_WAIT state. TEST_F(LocalSocketTest, close_socket_in_CLOSE_WAIT_state) { std::string error; @@ -283,11 +280,13 @@ TEST_F(LocalSocketTest, close_socket_in_CLOSE_WAIT_state) { int accept_fd = adb_socket_accept(listen_fd, nullptr, nullptr); ASSERT_GE(accept_fd, 0); - CloseRdHupSocketArg arg; - arg.socket_fd = accept_fd; PrepareThread(); - std::thread thread(CloseRdHupSocketThreadFunc, &arg); + + fdevent_run_on_main_thread([accept_fd]() { + asocket* s = create_local_socket(accept_fd); + ASSERT_TRUE(s != nullptr); + }); WaitForFdeventLoop(); EXPECT_EQ(1u + GetAdditionalLocalSocketCount(), fdevent_installed_count()); @@ -297,7 +296,7 @@ TEST_F(LocalSocketTest, close_socket_in_CLOSE_WAIT_state) { WaitForFdeventLoop(); ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count()); - TerminateThread(thread); + TerminateThread(); } #endif // defined(__linux__) diff --git a/adb/sockets.cpp b/adb/sockets.cpp index e05a3dbc6..0887e6f01 100644 --- a/adb/sockets.cpp +++ b/adb/sockets.cpp @@ -126,12 +126,12 @@ static SocketFlushResult local_socket_flush_incoming(asocket* s) { } else if (rc == -1 && errno == EAGAIN) { fdevent_add(&s->fde, FDE_WRITE); return SocketFlushResult::TryAgain; + } else { + // We failed to write, but it's possible that we can still read from the socket. + // Give that a try before giving up. + s->has_write_error = true; + break; } - - // We failed to write, but it's possible that we can still read from the socket. - // Give that a try before giving up. - s->has_write_error = true; - break; } // If we sent the last packet of a closing socket, we can now destroy it.