diff --git a/adb/Android.mk b/adb/Android.mk index b792bd44b..e0997eafd 100644 --- a/adb/Android.mk +++ b/adb/Android.mk @@ -50,6 +50,7 @@ LIBADB_SRC_FILES := \ adb_listeners.cpp \ adb_trace.cpp \ adb_utils.cpp \ + fdevent.cpp \ sockets.cpp \ transport.cpp \ transport_local.cpp \ @@ -58,6 +59,8 @@ LIBADB_SRC_FILES := \ LIBADB_TEST_SRCS := \ adb_io_test.cpp \ adb_utils_test.cpp \ + fdevent_test.cpp \ + socket_test.cpp \ sysdeps_test.cpp \ transport_test.cpp \ @@ -75,12 +78,10 @@ LIBADB_windows_CFLAGS := \ $(ADB_COMMON_windows_CFLAGS) \ LIBADB_darwin_SRC_FILES := \ - fdevent.cpp \ get_my_path_darwin.cpp \ usb_osx.cpp \ LIBADB_linux_SRC_FILES := \ - fdevent.cpp \ get_my_path_linux.cpp \ usb_linux.cpp \ @@ -88,14 +89,6 @@ LIBADB_windows_SRC_FILES := \ sysdeps_win32.cpp \ usb_windows.cpp \ -LIBADB_TEST_linux_SRCS := \ - fdevent_test.cpp \ - socket_test.cpp \ - -LIBADB_TEST_darwin_SRCS := \ - fdevent_test.cpp \ - socket_test.cpp \ - LIBADB_TEST_windows_SRCS := \ sysdeps_win32_test.cpp \ diff --git a/adb/adb_utils.cpp b/adb/adb_utils.cpp index 8a16e51c0..26e376ce6 100644 --- a/adb/adb_utils.cpp +++ b/adb/adb_utils.cpp @@ -213,6 +213,7 @@ std::string perror_str(const char* msg) { } #if !defined(_WIN32) +// Windows version provided in sysdeps_win32.cpp bool set_file_block_mode(int fd, bool block) { int flags = fcntl(fd, F_GETFL, 0); if (flags == -1) { diff --git a/adb/fdevent.cpp b/adb/fdevent.cpp index 386f22186..902548ec5 100644 --- a/adb/fdevent.cpp +++ b/adb/fdevent.cpp @@ -21,12 +21,11 @@ #include "fdevent.h" #include -#include #include #include -#include #include +#include #include #include #include @@ -54,7 +53,7 @@ int SHELL_EXIT_NOTIFY_FD = -1; struct PollNode { fdevent* fde; - ::pollfd pollfd; + adb_pollfd pollfd; PollNode(fdevent* fde) : fde(fde) { memset(&pollfd, 0, sizeof(pollfd)); @@ -72,18 +71,19 @@ struct PollNode { // That's why we don't need a lock for fdevent. static auto& g_poll_node_map = *new std::unordered_map(); static auto& g_pending_list = *new std::list(); +static std::atomic terminate_loop(false); static bool main_thread_valid; -static pthread_t main_thread; +static unsigned long main_thread_id; static void check_main_thread() { if (main_thread_valid) { - CHECK_NE(0, pthread_equal(main_thread, pthread_self())); + CHECK_EQ(main_thread_id, adb_thread_id()); } } static void set_main_thread() { main_thread_valid = true; - main_thread = pthread_self(); + main_thread_id = adb_thread_id(); } static std::string dump_fde(const fdevent* fde) { @@ -217,7 +217,7 @@ void fdevent_del(fdevent* fde, unsigned events) { fdevent_set(fde, (fde->state & FDE_EVENTMASK) & ~events); } -static std::string dump_pollfds(const std::vector& pollfds) { +static std::string dump_pollfds(const std::vector& pollfds) { std::string result; for (const auto& pollfd : pollfds) { std::string op; @@ -233,13 +233,13 @@ static std::string dump_pollfds(const std::vector& pollfds) { } static void fdevent_process() { - std::vector pollfds; + std::vector pollfds; for (const auto& pair : g_poll_node_map) { pollfds.push_back(pair.second.pollfd); } CHECK_GT(pollfds.size(), 0u); D("poll(), pollfds = %s", dump_pollfds(pollfds).c_str()); - int ret = TEMP_FAILURE_RETRY(poll(&pollfds[0], pollfds.size(), -1)); + int ret = adb_poll(&pollfds[0], pollfds.size(), -1); if (ret == -1) { PLOG(ERROR) << "poll(), ret = " << ret; return; @@ -289,6 +289,9 @@ static void fdevent_call_fdfunc(fdevent* fde) } #if !ADB_HOST + +#include + static void fdevent_subproc_event_func(int fd, unsigned ev, void* /* userdata */) { @@ -363,6 +366,10 @@ void fdevent_loop() #endif // !ADB_HOST while (true) { + if (terminate_loop) { + return; + } + D("--- --- waiting for events"); fdevent_process(); @@ -375,6 +382,10 @@ void fdevent_loop() } } +void fdevent_terminate_loop() { + terminate_loop = true; +} + size_t fdevent_installed_count() { return g_poll_node_map.size(); } @@ -383,4 +394,5 @@ void fdevent_reset() { g_poll_node_map.clear(); g_pending_list.clear(); main_thread_valid = false; + terminate_loop = false; } diff --git a/adb/fdevent.h b/adb/fdevent.h index 657fde5e9..207f9b702 100644 --- a/adb/fdevent.h +++ b/adb/fdevent.h @@ -76,9 +76,9 @@ void fdevent_set_timeout(fdevent *fde, int64_t timeout_ms); */ void fdevent_loop(); -// For debugging only. +// The following functions are used only for tests. +void fdevent_terminate_loop(); size_t fdevent_installed_count(); -// For debugging only. void fdevent_reset(); #endif diff --git a/adb/fdevent_test.cpp b/adb/fdevent_test.cpp index 7fe3d37d2..c933ed528 100644 --- a/adb/fdevent_test.cpp +++ b/adb/fdevent_test.cpp @@ -18,15 +18,13 @@ #include -#include -#include - #include #include #include #include #include "adb_io.h" +#include "fdevent_test.h" class FdHandler { public: @@ -48,7 +46,7 @@ class FdHandler { if (events & FDE_READ) { ASSERT_EQ(fd, handler->read_fd_); char c; - ASSERT_EQ(1, read(fd, &c, 1)); + ASSERT_EQ(1, adb_read(fd, &c, 1)); handler->queue_.push(c); fdevent_add(&handler->write_fde_, FDE_WRITE); } @@ -57,7 +55,7 @@ class FdHandler { ASSERT_FALSE(handler->queue_.empty()); char c = handler->queue_.front(); handler->queue_.pop(); - ASSERT_EQ(1, write(fd, &c, 1)); + ASSERT_EQ(1, adb_write(fd, &c, 1)); if (handler->queue_.empty()) { fdevent_del(&handler->write_fde_, FDE_WRITE); } @@ -72,29 +70,19 @@ class FdHandler { std::queue queue_; }; -static void signal_handler(int) { - pthread_exit(nullptr); -} - -class FdeventTest : public ::testing::Test { - protected: - static void SetUpTestCase() { - ASSERT_NE(SIG_ERR, signal(SIGUSR1, signal_handler)); - ASSERT_NE(SIG_ERR, signal(SIGPIPE, SIG_IGN)); - } - - virtual void SetUp() { - fdevent_reset(); - ASSERT_EQ(0u, fdevent_installed_count()); - } -}; - struct ThreadArg { int first_read_fd; int last_write_fd; size_t middle_pipe_count; }; +TEST_F(FdeventTest, fdevent_terminate) { + adb_thread_t thread; + PrepareThread(); + ASSERT_TRUE(adb_thread_create([](void*) { fdevent_loop(); }, nullptr, &thread)); + TerminateThread(thread); +} + static void FdEventThreadFunc(ThreadArg* arg) { std::vector read_fds; std::vector write_fds; @@ -102,7 +90,7 @@ static void FdEventThreadFunc(ThreadArg* arg) { read_fds.push_back(arg->first_read_fd); for (size_t i = 0; i < arg->middle_pipe_count; ++i) { int fds[2]; - ASSERT_EQ(0, pipe(fds)); + ASSERT_EQ(0, adb_socketpair(fds)); read_fds.push_back(fds[0]); write_fds.push_back(fds[1]); } @@ -122,9 +110,9 @@ TEST_F(FdeventTest, smoke) { const std::string MESSAGE = "fdevent_test"; int fd_pair1[2]; int fd_pair2[2]; - ASSERT_EQ(0, pipe(fd_pair1)); - ASSERT_EQ(0, pipe(fd_pair2)); - pthread_t thread; + ASSERT_EQ(0, adb_socketpair(fd_pair1)); + ASSERT_EQ(0, adb_socketpair(fd_pair2)); + adb_thread_t thread; ThreadArg thread_arg; thread_arg.first_read_fd = fd_pair1[0]; thread_arg.last_write_fd = fd_pair2[1]; @@ -132,9 +120,9 @@ TEST_F(FdeventTest, smoke) { int writer = fd_pair1[1]; int reader = fd_pair2[0]; - ASSERT_EQ(0, pthread_create(&thread, nullptr, - reinterpret_cast(FdEventThreadFunc), - &thread_arg)); + PrepareThread(); + ASSERT_TRUE(adb_thread_create(reinterpret_cast(FdEventThreadFunc), &thread_arg, + &thread)); for (size_t i = 0; i < MESSAGE_LOOP_COUNT; ++i) { std::string read_buffer = MESSAGE; @@ -144,10 +132,9 @@ TEST_F(FdeventTest, smoke) { ASSERT_EQ(read_buffer, write_buffer); } - ASSERT_EQ(0, pthread_kill(thread, SIGUSR1)); - ASSERT_EQ(0, pthread_join(thread, nullptr)); - ASSERT_EQ(0, close(writer)); - ASSERT_EQ(0, close(reader)); + TerminateThread(thread); + ASSERT_EQ(0, adb_close(writer)); + ASSERT_EQ(0, adb_close(reader)); } struct InvalidFdArg { @@ -161,7 +148,7 @@ static void InvalidFdEventCallback(int fd, unsigned events, void* userdata) { ASSERT_EQ(arg->expected_events, events); fdevent_remove(&arg->fde); if (++*(arg->happened_event_count) == 2) { - pthread_exit(nullptr); + fdevent_terminate_loop(); } } @@ -184,9 +171,7 @@ static void InvalidFdThreadFunc(void*) { } TEST_F(FdeventTest, invalid_fd) { - pthread_t thread; - ASSERT_EQ(0, pthread_create(&thread, nullptr, - reinterpret_cast(InvalidFdThreadFunc), - nullptr)); - ASSERT_EQ(0, pthread_join(thread, nullptr)); + adb_thread_t thread; + ASSERT_TRUE(adb_thread_create(InvalidFdThreadFunc, nullptr, &thread)); + ASSERT_TRUE(adb_thread_join(thread)); } diff --git a/adb/fdevent_test.h b/adb/fdevent_test.h new file mode 100644 index 000000000..c853bceec --- /dev/null +++ b/adb/fdevent_test.h @@ -0,0 +1,58 @@ +/* + * Copyright (C) 2016 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "socket.h" +#include "sysdeps.h" + +class FdeventTest : public ::testing::Test { + protected: + int dummy = -1; + + static void SetUpTestCase() { +#if !defined(_WIN32) + ASSERT_NE(SIG_ERR, signal(SIGPIPE, SIG_IGN)); +#endif + } + + void SetUp() override { + fdevent_reset(); + ASSERT_EQ(0u, fdevent_installed_count()); + } + + // Register a dummy socket used to wake up the fdevent loop to tell it to die. + void PrepareThread() { + int dummy_fds[2]; + if (adb_socketpair(dummy_fds) != 0) { + FAIL() << "failed to create socketpair: " << strerror(errno); + } + + asocket* dummy_socket = create_local_socket(dummy_fds[1]); + if (!dummy_socket) { + FAIL() << "failed to create local socket: " << strerror(errno); + } + dummy_socket->ready(dummy_socket); + dummy = dummy_fds[0]; + } + + void TerminateThread(adb_thread_t thread) { + fdevent_terminate_loop(); + ASSERT_TRUE(WriteFdExactly(dummy, "", 1)); + ASSERT_TRUE(adb_thread_join(thread)); + ASSERT_EQ(0, adb_close(dummy)); + } +}; diff --git a/adb/socket_test.cpp b/adb/socket_test.cpp index 03cab64a5..471ca09e4 100644 --- a/adb/socket_test.cpp +++ b/adb/socket_test.cpp @@ -18,119 +18,89 @@ #include +#include #include #include #include #include -#include -#include #include #include "adb.h" #include "adb_io.h" +#include "fdevent_test.h" #include "socket.h" #include "sysdeps.h" -static void signal_handler(int) { - ASSERT_EQ(1u, fdevent_installed_count()); - pthread_exit(nullptr); -} - -// On host, register a dummy socket, so fdevet_loop() will not abort when previously -// registered local sockets are all closed. On device, fdevent_subproc_setup() installs -// one fdevent which can be considered as dummy socket. -static void InstallDummySocket() { -#if ADB_HOST - int dummy_fds[2]; - ASSERT_EQ(0, pipe(dummy_fds)); - asocket* dummy_socket = create_local_socket(dummy_fds[0]); - ASSERT_TRUE(dummy_socket != nullptr); - dummy_socket->ready(dummy_socket); -#endif -} - struct ThreadArg { int first_read_fd; int last_write_fd; size_t middle_pipe_count; }; -static void FdEventThreadFunc(ThreadArg* arg) { - std::vector read_fds; - std::vector write_fds; +class LocalSocketTest : public FdeventTest {}; - read_fds.push_back(arg->first_read_fd); - for (size_t i = 0; i < arg->middle_pipe_count; ++i) { - int fds[2]; - ASSERT_EQ(0, adb_socketpair(fds)); - read_fds.push_back(fds[0]); - write_fds.push_back(fds[1]); - } - write_fds.push_back(arg->last_write_fd); - - for (size_t i = 0; i < read_fds.size(); ++i) { - asocket* reader = create_local_socket(read_fds[i]); - ASSERT_TRUE(reader != nullptr); - asocket* writer = create_local_socket(write_fds[i]); - ASSERT_TRUE(writer != nullptr); - reader->peer = writer; - writer->peer = reader; - reader->ready(reader); - } - - InstallDummySocket(); +static void FdEventThreadFunc(void*) { fdevent_loop(); } -class LocalSocketTest : public ::testing::Test { - protected: - static void SetUpTestCase() { - ASSERT_NE(SIG_ERR, signal(SIGUSR1, signal_handler)); - ASSERT_NE(SIG_ERR, signal(SIGPIPE, SIG_IGN)); - } - - virtual void SetUp() { - fdevent_reset(); - ASSERT_EQ(0u, fdevent_installed_count()); - } -}; - TEST_F(LocalSocketTest, smoke) { - const size_t PIPE_COUNT = 100; - const size_t MESSAGE_LOOP_COUNT = 100; + // Join two socketpairs with a chain of intermediate socketpairs. + int first[2]; + std::vector> intermediates; + int last[2]; + + constexpr size_t INTERMEDIATE_COUNT = 50; + constexpr size_t MESSAGE_LOOP_COUNT = 100; const std::string MESSAGE = "socket_test"; - int fd_pair1[2]; - int fd_pair2[2]; - ASSERT_EQ(0, adb_socketpair(fd_pair1)); - ASSERT_EQ(0, adb_socketpair(fd_pair2)); - pthread_t thread; - ThreadArg thread_arg; - thread_arg.first_read_fd = fd_pair1[0]; - thread_arg.last_write_fd = fd_pair2[1]; - thread_arg.middle_pipe_count = PIPE_COUNT; - int writer = fd_pair1[1]; - int reader = fd_pair2[0]; - ASSERT_EQ(0, pthread_create(&thread, nullptr, - reinterpret_cast(FdEventThreadFunc), - &thread_arg)); + intermediates.resize(INTERMEDIATE_COUNT); + ASSERT_EQ(0, adb_socketpair(first)) << strerror(errno); + ASSERT_EQ(0, adb_socketpair(last)) << strerror(errno); + asocket* prev_tail = create_local_socket(first[1]); + ASSERT_NE(nullptr, prev_tail); + + auto connect = [](asocket* tail, asocket* head) { + tail->peer = head; + head->peer = tail; + tail->ready(tail); + }; + + for (auto& intermediate : intermediates) { + ASSERT_EQ(0, adb_socketpair(intermediate.data())) << strerror(errno); + + asocket* head = create_local_socket(intermediate[0]); + ASSERT_NE(nullptr, head); + + asocket* tail = create_local_socket(intermediate[1]); + ASSERT_NE(nullptr, tail); + + connect(prev_tail, head); + prev_tail = tail; + } + + asocket* end = create_local_socket(last[0]); + ASSERT_NE(nullptr, end); + connect(prev_tail, end); + + PrepareThread(); + adb_thread_t thread; + ASSERT_TRUE(adb_thread_create(FdEventThreadFunc, nullptr, &thread)); - usleep(1000); for (size_t i = 0; i < MESSAGE_LOOP_COUNT; ++i) { std::string read_buffer = MESSAGE; std::string write_buffer(MESSAGE.size(), 'a'); - ASSERT_TRUE(WriteFdExactly(writer, read_buffer.c_str(), read_buffer.size())); - ASSERT_TRUE(ReadFdExactly(reader, &write_buffer[0], write_buffer.size())); + ASSERT_TRUE(WriteFdExactly(first[0], &read_buffer[0], read_buffer.size())); + ASSERT_TRUE(ReadFdExactly(last[1], &write_buffer[0], write_buffer.size())); ASSERT_EQ(read_buffer, write_buffer); } - ASSERT_EQ(0, adb_close(writer)); - ASSERT_EQ(0, adb_close(reader)); - // Wait until the local sockets are closed. - sleep(1); - ASSERT_EQ(0, pthread_kill(thread, SIGUSR1)); - ASSERT_EQ(0, pthread_join(thread, nullptr)); + ASSERT_EQ(0, adb_close(first[0])); + ASSERT_EQ(0, adb_close(last[1])); + + // Wait until the local sockets are closed. + adb_sleep_ms(100); + TerminateThread(thread); } struct CloseWithPacketArg { @@ -160,7 +130,6 @@ static void CloseWithPacketThreadFunc(CloseWithPacketArg* arg) { s->peer = cause_close_s; cause_close_s->ready(cause_close_s); - InstallDummySocket(); fdevent_loop(); } @@ -176,21 +145,19 @@ TEST_F(LocalSocketTest, close_socket_with_packet) { CloseWithPacketArg arg; arg.socket_fd = socket_fd[1]; arg.cause_close_fd = cause_close_fd[1]; - pthread_t thread; - ASSERT_EQ(0, pthread_create(&thread, nullptr, - reinterpret_cast(CloseWithPacketThreadFunc), - &arg)); - // Wait until the fdevent_loop() starts. - sleep(1); - ASSERT_EQ(0, adb_close(cause_close_fd[0])); - sleep(1); - ASSERT_EQ(2u, fdevent_installed_count()); - ASSERT_EQ(0, adb_close(socket_fd[0])); - // Wait until the socket is closed. - sleep(1); - ASSERT_EQ(0, pthread_kill(thread, SIGUSR1)); - ASSERT_EQ(0, pthread_join(thread, nullptr)); + PrepareThread(); + adb_thread_t thread; + ASSERT_TRUE(adb_thread_create(reinterpret_cast(CloseWithPacketThreadFunc), + &arg, &thread)); + // Wait until the fdevent_loop() starts. + adb_sleep_ms(100); + ASSERT_EQ(0, adb_close(cause_close_fd[0])); + adb_sleep_ms(100); + EXPECT_EQ(2u, fdevent_installed_count()); + ASSERT_EQ(0, adb_close(socket_fd[0])); + + TerminateThread(thread); } // This test checks if we can read packets from a closing local socket. @@ -203,26 +170,23 @@ TEST_F(LocalSocketTest, read_from_closing_socket) { arg.socket_fd = socket_fd[1]; arg.cause_close_fd = cause_close_fd[1]; - pthread_t thread; - ASSERT_EQ(0, pthread_create(&thread, nullptr, - reinterpret_cast(CloseWithPacketThreadFunc), - &arg)); + PrepareThread(); + adb_thread_t thread; + ASSERT_TRUE(adb_thread_create(reinterpret_cast(CloseWithPacketThreadFunc), + &arg, &thread)); // Wait until the fdevent_loop() starts. - sleep(1); + adb_sleep_ms(100); ASSERT_EQ(0, adb_close(cause_close_fd[0])); - sleep(1); - ASSERT_EQ(2u, fdevent_installed_count()); + adb_sleep_ms(100); + EXPECT_EQ(2u, fdevent_installed_count()); // Verify if we can read successfully. std::vector buf(arg.bytes_written); + ASSERT_NE(0u, arg.bytes_written); ASSERT_EQ(true, ReadFdExactly(socket_fd[0], buf.data(), buf.size())); ASSERT_EQ(0, adb_close(socket_fd[0])); - // Wait until the socket is closed. - sleep(1); - - ASSERT_EQ(0, pthread_kill(thread, SIGUSR1)); - ASSERT_EQ(0, pthread_join(thread, nullptr)); + TerminateThread(thread); } // This test checks if we can close local socket in the following situation: @@ -238,20 +202,17 @@ TEST_F(LocalSocketTest, write_error_when_having_packets) { arg.socket_fd = socket_fd[1]; arg.cause_close_fd = cause_close_fd[1]; - pthread_t thread; - ASSERT_EQ(0, pthread_create(&thread, nullptr, - reinterpret_cast(CloseWithPacketThreadFunc), - &arg)); + PrepareThread(); + adb_thread_t thread; + ASSERT_TRUE(adb_thread_create(reinterpret_cast(CloseWithPacketThreadFunc), + &arg, &thread)); + // Wait until the fdevent_loop() starts. - sleep(1); - ASSERT_EQ(3u, fdevent_installed_count()); + adb_sleep_ms(100); + EXPECT_EQ(3u, fdevent_installed_count()); ASSERT_EQ(0, adb_close(socket_fd[0])); - // Wait until the socket is closed. - sleep(1); - - ASSERT_EQ(0, pthread_kill(thread, SIGUSR1)); - ASSERT_EQ(0, pthread_join(thread, nullptr)); + TerminateThread(thread); } #if defined(__linux__) @@ -260,50 +221,52 @@ static void ClientThreadFunc() { std::string error; int fd = network_loopback_client(5038, SOCK_STREAM, &error); ASSERT_GE(fd, 0) << error; - sleep(2); + adb_sleep_ms(200); ASSERT_EQ(0, adb_close(fd)); } struct CloseRdHupSocketArg { - int socket_fd; + int socket_fd; }; static void CloseRdHupSocketThreadFunc(CloseRdHupSocketArg* arg) { - asocket* s = create_local_socket(arg->socket_fd); - ASSERT_TRUE(s != nullptr); + asocket* s = create_local_socket(arg->socket_fd); + ASSERT_TRUE(s != nullptr); - InstallDummySocket(); - fdevent_loop(); + fdevent_loop(); } // This test checks if we can close sockets in CLOSE_WAIT state. TEST_F(LocalSocketTest, close_socket_in_CLOSE_WAIT_state) { - std::string error; - int listen_fd = network_inaddr_any_server(5038, SOCK_STREAM, &error); - ASSERT_GE(listen_fd, 0); - pthread_t client_thread; - ASSERT_EQ(0, pthread_create(&client_thread, nullptr, - reinterpret_cast(ClientThreadFunc), nullptr)); + std::string error; + int listen_fd = network_inaddr_any_server(5038, SOCK_STREAM, &error); + ASSERT_GE(listen_fd, 0); - struct sockaddr addr; - socklen_t alen; - alen = sizeof(addr); - int accept_fd = adb_socket_accept(listen_fd, &addr, &alen); - ASSERT_GE(accept_fd, 0); - CloseRdHupSocketArg arg; - arg.socket_fd = accept_fd; - pthread_t thread; - ASSERT_EQ(0, pthread_create(&thread, nullptr, - reinterpret_cast(CloseRdHupSocketThreadFunc), - &arg)); - // Wait until the fdevent_loop() starts. - sleep(1); - ASSERT_EQ(2u, fdevent_installed_count()); - // Wait until the client closes its socket. - ASSERT_EQ(0, pthread_join(client_thread, nullptr)); - sleep(2); - ASSERT_EQ(0, pthread_kill(thread, SIGUSR1)); - ASSERT_EQ(0, pthread_join(thread, nullptr)); + adb_thread_t client_thread; + ASSERT_TRUE(adb_thread_create(reinterpret_cast(ClientThreadFunc), nullptr, + &client_thread)); + + struct sockaddr addr; + socklen_t alen; + alen = sizeof(addr); + int accept_fd = adb_socket_accept(listen_fd, &addr, &alen); + ASSERT_GE(accept_fd, 0); + CloseRdHupSocketArg arg; + arg.socket_fd = accept_fd; + + PrepareThread(); + adb_thread_t thread; + ASSERT_TRUE(adb_thread_create(reinterpret_cast(CloseRdHupSocketThreadFunc), + &arg, &thread)); + + // Wait until the fdevent_loop() starts. + adb_sleep_ms(100); + EXPECT_EQ(2u, fdevent_installed_count()); + + // Wait until the client closes its socket. + ASSERT_TRUE(adb_thread_join(client_thread)); + + TerminateThread(thread); } #endif // defined(__linux__) diff --git a/adb/sysdeps.h b/adb/sysdeps.h index 3bae09ea1..7af2979ec 100644 --- a/adb/sysdeps.h +++ b/adb/sysdeps.h @@ -180,6 +180,14 @@ static __inline__ int adb_thread_setname(const std::string& name) { return 0; } +static __inline__ adb_thread_t adb_thread_self() { + return GetCurrentThread(); +} + +static __inline__ bool adb_thread_equal(adb_thread_t lhs, adb_thread_t rhs) { + return GetThreadId(lhs) == GetThreadId(rhs); +} + static __inline__ unsigned long adb_thread_id() { return GetCurrentThreadId(); @@ -263,24 +271,6 @@ int unix_isatty(int fd); /* normally provided by */ extern void* load_file(const char* pathname, unsigned* psize); -/* normally provided by "fdevent.h" */ - -#define FDE_READ 0x0001 -#define FDE_WRITE 0x0002 -#define FDE_ERROR 0x0004 -#define FDE_DONT_CLOSE 0x0080 - -typedef void (*fd_func)(int fd, unsigned events, void *userdata); - -fdevent *fdevent_create(int fd, fd_func func, void *arg); -void fdevent_destroy(fdevent *fde); -void fdevent_install(fdevent *fde, int fd, fd_func func, void *arg); -void fdevent_remove(fdevent *item); -void fdevent_set(fdevent *fde, unsigned events); -void fdevent_add(fdevent *fde, unsigned events); -void fdevent_del(fdevent *fde, unsigned events); -void fdevent_loop(); - static __inline__ void adb_sleep_ms( int mseconds ) { Sleep( mseconds ); @@ -304,6 +294,14 @@ extern int adb_setsockopt(int fd, int level, int optname, const void* optva extern int adb_socketpair( int sv[2] ); +struct adb_pollfd { + int fd; + short events; + short revents; +}; +extern int adb_poll(adb_pollfd* fds, size_t nfds, int timeout); +#define poll ___xxx_poll + static __inline__ int adb_is_absolute_host_path(const char* path) { return isalpha(path[0]) && path[1] == ':' && path[2] == '\\'; } @@ -456,14 +454,14 @@ size_t ParseCompleteUTF8(const char* first, const char* last, std::vector* #else /* !_WIN32 a.k.a. Unix */ -#include "fdevent.h" #include #include #include -#include -#include -#include #include +#include +#include +#include +#include #include #include @@ -803,6 +801,13 @@ static __inline__ int adb_socketpair( int sv[2] ) #undef socketpair #define socketpair ___xxx_socketpair +typedef struct pollfd adb_pollfd; +static __inline__ int adb_poll(adb_pollfd* fds, size_t nfds, int timeout) { + return TEMP_FAILURE_RETRY(poll(fds, nfds, timeout)); +} + +#define poll ___xxx_poll + static __inline__ void adb_sleep_ms( int mseconds ) { usleep( mseconds*1000 ); diff --git a/adb/sysdeps_test.cpp b/adb/sysdeps_test.cpp index 360eaa7f9..253d62fc2 100644 --- a/adb/sysdeps_test.cpp +++ b/adb/sysdeps_test.cpp @@ -18,6 +18,7 @@ #include #include +#include "adb_io.h" #include "sysdeps.h" static void increment_atomic_int(void* c) { @@ -67,3 +68,125 @@ TEST(sysdeps_thread, exit) { nullptr, &thread)); ASSERT_TRUE(adb_thread_join(thread)); } + +TEST(sysdeps_socketpair, smoke) { + int fds[2]; + ASSERT_EQ(0, adb_socketpair(fds)) << strerror(errno); + ASSERT_TRUE(WriteFdExactly(fds[0], "foo", 4)); + ASSERT_TRUE(WriteFdExactly(fds[1], "bar", 4)); + + char buf[4]; + ASSERT_TRUE(ReadFdExactly(fds[1], buf, 4)); + ASSERT_STREQ(buf, "foo"); + ASSERT_TRUE(ReadFdExactly(fds[0], buf, 4)); + ASSERT_STREQ(buf, "bar"); + ASSERT_EQ(0, adb_close(fds[0])); + ASSERT_EQ(0, adb_close(fds[1])); +} + +TEST(sysdeps_fd, exhaustion) { + std::vector fds; + int socketpair[2]; + + while (adb_socketpair(socketpair) == 0) { + fds.push_back(socketpair[0]); + fds.push_back(socketpair[1]); + } + + ASSERT_EQ(EMFILE, errno) << strerror(errno); + for (int fd : fds) { + ASSERT_EQ(0, adb_close(fd)); + } + ASSERT_EQ(0, adb_socketpair(socketpair)); + ASSERT_EQ(socketpair[0], fds[0]); + ASSERT_EQ(socketpair[1], fds[1]); + ASSERT_EQ(0, adb_close(socketpair[0])); + ASSERT_EQ(0, adb_close(socketpair[1])); +} + +class sysdeps_poll : public ::testing::Test { + protected: + int fds[2]; + void SetUp() override { + ASSERT_EQ(0, adb_socketpair(fds)) << strerror(errno); + } + + void TearDown() override { + ASSERT_EQ(0, adb_close(fds[0])); + ASSERT_EQ(0, adb_close(fds[1])); + } +}; + +TEST_F(sysdeps_poll, smoke) { + adb_pollfd pfd[2]; + pfd[0].fd = fds[0]; + pfd[0].events = POLLRDNORM; + pfd[1].fd = fds[1]; + pfd[1].events = POLLWRNORM; + + EXPECT_EQ(1, adb_poll(pfd, 2, 0)); + EXPECT_EQ(0, pfd[0].revents); + EXPECT_EQ(POLLWRNORM, pfd[1].revents); + + ASSERT_TRUE(WriteFdExactly(fds[1], "foo", 4)); + + // Wait for the socketpair to be flushed. + EXPECT_EQ(1, adb_poll(pfd, 1, 100)); + EXPECT_EQ(POLLRDNORM, pfd[0].revents); + + EXPECT_EQ(2, adb_poll(pfd, 2, 0)); + EXPECT_EQ(POLLRDNORM, pfd[0].revents); + EXPECT_EQ(POLLWRNORM, pfd[1].revents); +} + +TEST_F(sysdeps_poll, timeout) { + adb_pollfd pfd; + pfd.fd = fds[0]; + pfd.events = POLLRDNORM; + + EXPECT_EQ(0, adb_poll(&pfd, 1, 100)); + EXPECT_EQ(0, pfd.revents); + + ASSERT_TRUE(WriteFdExactly(fds[1], "foo", 4)); + + EXPECT_EQ(1, adb_poll(&pfd, 1, 100)); + EXPECT_EQ(POLLRDNORM, pfd.revents); +} + +TEST_F(sysdeps_poll, invalid_fd) { + adb_pollfd pfd[3]; + pfd[0].fd = fds[0]; + pfd[0].events = POLLRDNORM; + pfd[1].fd = INT_MAX; + pfd[1].events = POLLRDNORM; + pfd[2].fd = fds[1]; + pfd[2].events = POLLWRNORM; + + ASSERT_TRUE(WriteFdExactly(fds[1], "foo", 4)); + + // Wait for the socketpair to be flushed. + EXPECT_EQ(1, adb_poll(pfd, 1, 100)); + EXPECT_EQ(POLLRDNORM, pfd[0].revents); + + EXPECT_EQ(3, adb_poll(pfd, 3, 0)); + EXPECT_EQ(POLLRDNORM, pfd[0].revents); + EXPECT_EQ(POLLNVAL, pfd[1].revents); + EXPECT_EQ(POLLWRNORM, pfd[2].revents); +} + +TEST_F(sysdeps_poll, duplicate_fd) { + adb_pollfd pfd[2]; + pfd[0].fd = fds[0]; + pfd[0].events = POLLRDNORM; + pfd[1] = pfd[0]; + + EXPECT_EQ(0, adb_poll(pfd, 2, 0)); + EXPECT_EQ(0, pfd[0].revents); + EXPECT_EQ(0, pfd[1].revents); + + ASSERT_TRUE(WriteFdExactly(fds[1], "foo", 4)); + + EXPECT_EQ(2, adb_poll(pfd, 2, 100)); + EXPECT_EQ(POLLRDNORM, pfd[0].revents); + EXPECT_EQ(POLLRDNORM, pfd[1].revents); +} diff --git a/adb/sysdeps_win32.cpp b/adb/sysdeps_win32.cpp index 0b0898186..7eae186c4 100644 --- a/adb/sysdeps_win32.cpp +++ b/adb/sysdeps_win32.cpp @@ -29,6 +29,7 @@ #include #include #include +#include #include @@ -39,6 +40,7 @@ #include #include "adb.h" +#include "adb_utils.h" extern void fatal(const char *fmt, ...); @@ -54,7 +56,6 @@ typedef struct FHClassRec_ { int (*_fh_lseek)(FH, int, int); int (*_fh_read)(FH, void*, int); int (*_fh_write)(FH, const void*, int); - void (*_fh_hook)(FH, int, EventHook); } FHClassRec; static void _fh_file_init(FH); @@ -62,7 +63,6 @@ static int _fh_file_close(FH); static int _fh_file_lseek(FH, int, int); static int _fh_file_read(FH, void*, int); static int _fh_file_write(FH, const void*, int); -static void _fh_file_hook(FH, int, EventHook); static const FHClassRec _fh_file_class = { _fh_file_init, @@ -70,7 +70,6 @@ static const FHClassRec _fh_file_class = { _fh_file_lseek, _fh_file_read, _fh_file_write, - _fh_file_hook }; static void _fh_socket_init(FH); @@ -78,7 +77,6 @@ static int _fh_socket_close(FH); static int _fh_socket_lseek(FH, int, int); static int _fh_socket_read(FH, void*, int); static int _fh_socket_write(FH, const void*, int); -static void _fh_socket_hook(FH, int, EventHook); static const FHClassRec _fh_socket_class = { _fh_socket_init, @@ -86,7 +84,6 @@ static const FHClassRec _fh_socket_class = { _fh_socket_lseek, _fh_socket_read, _fh_socket_write, - _fh_socket_hook }; #define assert(cond) \ @@ -174,9 +171,6 @@ void *load_file(const char *fn, unsigned *_sz) /**************************************************************************/ /**************************************************************************/ -/* used to emulate unix-domain socket pairs */ -typedef struct SocketPairRec_* SocketPair; - typedef struct FHRec_ { FHClass clazz; @@ -185,10 +179,8 @@ typedef struct FHRec_ union { HANDLE handle; SOCKET socket; - SocketPair pair; } u; - HANDLE event; int mask; char name[32]; @@ -197,10 +189,8 @@ typedef struct FHRec_ #define fh_handle u.handle #define fh_socket u.socket -#define fh_pair u.pair - -#define WIN32_FH_BASE 100 +#define WIN32_FH_BASE 2048 #define WIN32_MAX_FHS 128 static adb_mutex_t _win32_lock; @@ -250,17 +240,10 @@ _fh_alloc( FHClass clazz ) adb_mutex_lock( &_win32_lock ); - // Search entire array, starting from _win32_fh_next. - for (int nn = 0; nn < WIN32_MAX_FHS; nn++) { - // Keep incrementing _win32_fh_next to avoid giving out an index that - // was recently closed, to try to avoid use-after-free. - const int index = _win32_fh_next++; - // Handle wrap-around of _win32_fh_next. - if (_win32_fh_next == WIN32_MAX_FHS) { - _win32_fh_next = 0; - } - if (_win32_fhs[index].clazz == NULL) { - f = &_win32_fhs[index]; + for (int i = _win32_fh_next; i < WIN32_MAX_FHS; ++i) { + if (_win32_fhs[i].clazz == NULL) { + f = &_win32_fhs[i]; + _win32_fh_next = i + 1; goto Exit; } } @@ -285,6 +268,12 @@ _fh_close( FH f ) // Use lock so that closing only happens once and so that _fh_alloc can't // allocate a FH that we're in the middle of closing. adb_mutex_lock(&_win32_lock); + + int offset = f - _win32_fhs; + if (_win32_fh_next > offset) { + _win32_fh_next = offset; + } + if (f->used) { f->clazz->_fh_close( f ); f->name[0] = '\0'; @@ -672,19 +661,56 @@ static void _socket_set_errno( const DWORD err ) { } } -static void _fh_socket_init( FH f ) { - f->fh_socket = INVALID_SOCKET; - f->event = WSACreateEvent(); - if (f->event == WSA_INVALID_EVENT) { - D("WSACreateEvent failed: %s", - android::base::SystemErrorCodeToString(WSAGetLastError()).c_str()); - - // _event_socket_start assumes that this field is INVALID_HANDLE_VALUE - // on failure, instead of NULL which is what Windows really returns on - // error. It might be better to change all the other code to look for - // NULL, but that is a much riskier change. - f->event = INVALID_HANDLE_VALUE; +extern int adb_poll(adb_pollfd* fds, size_t nfds, int timeout) { + // WSAPoll doesn't handle invalid/non-socket handles, so we need to handle them ourselves. + int skipped = 0; + std::vector sockets; + std::vector original; + for (size_t i = 0; i < nfds; ++i) { + FH fh = _fh_from_int(fds[i].fd, __func__); + if (!fh || !fh->used || fh->clazz != &_fh_socket_class) { + D("adb_poll received bad FD %d", fds[i].fd); + fds[i].revents = POLLNVAL; + ++skipped; + } else { + WSAPOLLFD wsapollfd = { + .fd = fh->u.socket, + .events = static_cast(fds[i].events) + }; + sockets.push_back(wsapollfd); + original.push_back(&fds[i]); + } } + + if (sockets.empty()) { + return skipped; + } + + int result = WSAPoll(sockets.data(), sockets.size(), timeout); + if (result == SOCKET_ERROR) { + _socket_set_errno(WSAGetLastError()); + return -1; + } + + // Map the results back onto the original set. + for (size_t i = 0; i < sockets.size(); ++i) { + original[i]->revents = sockets[i].revents; + } + + // WSAPoll appears to return the number of unique FDs with avaiable events, instead of how many + // of the pollfd elements have a non-zero revents field, which is what it and poll are specified + // to do. Ignore its result and calculate the proper return value. + result = 0; + for (size_t i = 0; i < nfds; ++i) { + if (fds[i].revents != 0) { + ++result; + } + } + return result; +} + +static void _fh_socket_init(FH f) { + f->fh_socket = INVALID_SOCKET; f->mask = 0; } @@ -700,18 +726,12 @@ static int _fh_socket_close( FH f ) { #endif } if (closesocket(f->fh_socket) == SOCKET_ERROR) { - D("closesocket failed: %s", - android::base::SystemErrorCodeToString(WSAGetLastError()).c_str()); + // Don't set errno here, since adb_close will ignore it. + const DWORD err = WSAGetLastError(); + D("closesocket failed: %s", android::base::SystemErrorCodeToString(err).c_str()); } f->fh_socket = INVALID_SOCKET; } - if (f->event != NULL) { - if (!CloseHandle(f->event)) { - D("CloseHandle failed: %s", - android::base::SystemErrorCodeToString(GetLastError()).c_str()); - } - f->event = NULL; - } f->mask = 0; return 0; } @@ -820,16 +840,15 @@ static int GetSocketProtocolFromSocketType(int type) { int network_loopback_client(int port, int type, std::string* error) { struct sockaddr_in addr; - SOCKET s; + SOCKET s; - unique_fh f(_fh_alloc(&_fh_socket_class)); + unique_fh f(_fh_alloc(&_fh_socket_class)); if (!f) { *error = strerror(errno); return -1; } - if (!_winsock_init) - _init_winsock(); + if (!_winsock_init) _init_winsock(); memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; @@ -837,30 +856,32 @@ int network_loopback_client(int port, int type, std::string* error) { addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); s = socket(AF_INET, type, GetSocketProtocolFromSocketType(type)); - if(s == INVALID_SOCKET) { + if (s == INVALID_SOCKET) { + const DWORD err = WSAGetLastError(); *error = android::base::StringPrintf("cannot create socket: %s", - android::base::SystemErrorCodeToString(WSAGetLastError()).c_str()); + android::base::SystemErrorCodeToString(err).c_str()); D("%s", error->c_str()); + _socket_set_errno(err); return -1; } f->fh_socket = s; - if(connect(s, (struct sockaddr *) &addr, sizeof(addr)) == SOCKET_ERROR) { + if (connect(s, (struct sockaddr*)&addr, sizeof(addr)) == SOCKET_ERROR) { // Save err just in case inet_ntoa() or ntohs() changes the last error. const DWORD err = WSAGetLastError(); *error = android::base::StringPrintf("cannot connect to %s:%u: %s", - inet_ntoa(addr.sin_addr), ntohs(addr.sin_port), - android::base::SystemErrorCodeToString(err).c_str()); - D("could not connect to %s:%d: %s", - type != SOCK_STREAM ? "udp" : "tcp", port, error->c_str()); + inet_ntoa(addr.sin_addr), ntohs(addr.sin_port), + android::base::SystemErrorCodeToString(err).c_str()); + D("could not connect to %s:%d: %s", type != SOCK_STREAM ? "udp" : "tcp", port, + error->c_str()); + _socket_set_errno(err); return -1; } const int fd = _fh_to_int(f.get()); - snprintf( f->name, sizeof(f->name), "%d(lo-client:%s%d)", fd, - type != SOCK_STREAM ? "udp:" : "", port ); - D( "port %d type %s => fd %d", port, type != SOCK_STREAM ? "udp" : "tcp", - fd ); + snprintf(f->name, sizeof(f->name), "%d(lo-client:%s%d)", fd, type != SOCK_STREAM ? "udp:" : "", + port); + D("port %d type %s => fd %d", port, type != SOCK_STREAM ? "udp" : "tcp", fd); f.release(); return fd; } @@ -868,20 +889,18 @@ int network_loopback_client(int port, int type, std::string* error) { #define LISTEN_BACKLOG 4 // interface_address is INADDR_LOOPBACK or INADDR_ANY. -static int _network_server(int port, int type, u_long interface_address, - std::string* error) { +static int _network_server(int port, int type, u_long interface_address, std::string* error) { struct sockaddr_in addr; - SOCKET s; - int n; + SOCKET s; + int n; - unique_fh f(_fh_alloc(&_fh_socket_class)); + unique_fh f(_fh_alloc(&_fh_socket_class)); if (!f) { *error = strerror(errno); return -1; } - if (!_winsock_init) - _init_winsock(); + if (!_winsock_init) _init_winsock(); memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; @@ -892,9 +911,11 @@ static int _network_server(int port, int type, u_long interface_address, // IPv4 and IPv6. s = socket(AF_INET, type, GetSocketProtocolFromSocketType(type)); if (s == INVALID_SOCKET) { + const DWORD err = WSAGetLastError(); *error = android::base::StringPrintf("cannot create socket: %s", - android::base::SystemErrorCodeToString(WSAGetLastError()).c_str()); + android::base::SystemErrorCodeToString(err).c_str()); D("%s", error->c_str()); + _socket_set_errno(err); return -1; } @@ -903,40 +924,41 @@ static int _network_server(int port, int type, u_long interface_address, // Note: SO_REUSEADDR on Windows allows multiple processes to bind to the // same port, so instead use SO_EXCLUSIVEADDRUSE. n = 1; - if (setsockopt(s, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, (const char*)&n, - sizeof(n)) == SOCKET_ERROR) { - *error = android::base::StringPrintf( - "cannot set socket option SO_EXCLUSIVEADDRUSE: %s", - android::base::SystemErrorCodeToString(WSAGetLastError()).c_str()); + if (setsockopt(s, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, (const char*)&n, sizeof(n)) == SOCKET_ERROR) { + const DWORD err = WSAGetLastError(); + *error = android::base::StringPrintf("cannot set socket option SO_EXCLUSIVEADDRUSE: %s", + android::base::SystemErrorCodeToString(err).c_str()); D("%s", error->c_str()); + _socket_set_errno(err); return -1; } - if (bind(s, (struct sockaddr *) &addr, sizeof(addr)) == SOCKET_ERROR) { + if (bind(s, (struct sockaddr*)&addr, sizeof(addr)) == SOCKET_ERROR) { // Save err just in case inet_ntoa() or ntohs() changes the last error. const DWORD err = WSAGetLastError(); - *error = android::base::StringPrintf("cannot bind to %s:%u: %s", - inet_ntoa(addr.sin_addr), ntohs(addr.sin_port), - android::base::SystemErrorCodeToString(err).c_str()); - D("could not bind to %s:%d: %s", - type != SOCK_STREAM ? "udp" : "tcp", port, error->c_str()); + *error = android::base::StringPrintf("cannot bind to %s:%u: %s", inet_ntoa(addr.sin_addr), + ntohs(addr.sin_port), + android::base::SystemErrorCodeToString(err).c_str()); + D("could not bind to %s:%d: %s", type != SOCK_STREAM ? "udp" : "tcp", port, error->c_str()); + _socket_set_errno(err); return -1; } if (type == SOCK_STREAM) { if (listen(s, LISTEN_BACKLOG) == SOCKET_ERROR) { - *error = android::base::StringPrintf("cannot listen on socket: %s", - android::base::SystemErrorCodeToString(WSAGetLastError()).c_str()); - D("could not listen on %s:%d: %s", - type != SOCK_STREAM ? "udp" : "tcp", port, error->c_str()); + const DWORD err = WSAGetLastError(); + *error = android::base::StringPrintf( + "cannot listen on socket: %s", android::base::SystemErrorCodeToString(err).c_str()); + D("could not listen on %s:%d: %s", type != SOCK_STREAM ? "udp" : "tcp", port, + error->c_str()); + _socket_set_errno(err); return -1; } } const int fd = _fh_to_int(f.get()); - snprintf( f->name, sizeof(f->name), "%d(%s-server:%s%d)", fd, - interface_address == INADDR_LOOPBACK ? "lo" : "any", - type != SOCK_STREAM ? "udp:" : "", port ); - D( "port %d type %s => fd %d", port, type != SOCK_STREAM ? "udp" : "tcp", - fd ); + snprintf(f->name, sizeof(f->name), "%d(%s-server:%s%d)", fd, + interface_address == INADDR_LOOPBACK ? "lo" : "any", type != SOCK_STREAM ? "udp:" : "", + port); + D("port %d type %s => fd %d", port, type != SOCK_STREAM ? "udp" : "tcp", fd); f.release(); return fd; } @@ -970,54 +992,57 @@ int network_connect(const std::string& host, int port, int type, int timeout, st struct addrinfo* addrinfo_ptr = nullptr; #if (NTDDI_VERSION >= NTDDI_WINXPSP2) || (_WIN32_WINNT >= _WIN32_WINNT_WS03) - // TODO: When the Android SDK tools increases the Windows system - // requirements >= WinXP SP2, switch to android::base::UTF8ToWide() + GetAddrInfoW(). +// TODO: When the Android SDK tools increases the Windows system +// requirements >= WinXP SP2, switch to android::base::UTF8ToWide() + GetAddrInfoW(). #else - // Otherwise, keep using getaddrinfo(), or do runtime API detection - // with GetProcAddress("GetAddrInfoW"). +// Otherwise, keep using getaddrinfo(), or do runtime API detection +// with GetProcAddress("GetAddrInfoW"). #endif if (getaddrinfo(host.c_str(), port_str, &hints, &addrinfo_ptr) != 0) { - *error = android::base::StringPrintf( - "cannot resolve host '%s' and port %s: %s", host.c_str(), - port_str, android::base::SystemErrorCodeToString(WSAGetLastError()).c_str()); + const DWORD err = WSAGetLastError(); + *error = android::base::StringPrintf("cannot resolve host '%s' and port %s: %s", + host.c_str(), port_str, + android::base::SystemErrorCodeToString(err).c_str()); + D("%s", error->c_str()); + _socket_set_errno(err); return -1; } - std::unique_ptr - addrinfo(addrinfo_ptr, freeaddrinfo); + std::unique_ptr addrinfo(addrinfo_ptr, freeaddrinfo); addrinfo_ptr = nullptr; // TODO: Try all the addresses if there's more than one? This just uses // the first. Or, could call WSAConnectByName() (Windows Vista and newer) // which tries all addresses, takes a timeout and more. - SOCKET s = socket(addrinfo->ai_family, addrinfo->ai_socktype, - addrinfo->ai_protocol); - if(s == INVALID_SOCKET) { + SOCKET s = socket(addrinfo->ai_family, addrinfo->ai_socktype, addrinfo->ai_protocol); + if (s == INVALID_SOCKET) { + const DWORD err = WSAGetLastError(); *error = android::base::StringPrintf("cannot create socket: %s", - android::base::SystemErrorCodeToString(WSAGetLastError()).c_str()); + android::base::SystemErrorCodeToString(err).c_str()); D("%s", error->c_str()); + _socket_set_errno(err); return -1; } f->fh_socket = s; // TODO: Implement timeouts for Windows. Seems like the default in theory // (according to http://serverfault.com/a/671453) and in practice is 21 sec. - if(connect(s, addrinfo->ai_addr, addrinfo->ai_addrlen) == SOCKET_ERROR) { + if (connect(s, addrinfo->ai_addr, addrinfo->ai_addrlen) == SOCKET_ERROR) { // TODO: Use WSAAddressToString or inet_ntop on address. - *error = android::base::StringPrintf("cannot connect to %s:%s: %s", - host.c_str(), port_str, - android::base::SystemErrorCodeToString(WSAGetLastError()).c_str()); - D("could not connect to %s:%s:%s: %s", - type != SOCK_STREAM ? "udp" : "tcp", host.c_str(), port_str, - error->c_str()); + const DWORD err = WSAGetLastError(); + *error = android::base::StringPrintf("cannot connect to %s:%s: %s", host.c_str(), port_str, + android::base::SystemErrorCodeToString(err).c_str()); + D("could not connect to %s:%s:%s: %s", type != SOCK_STREAM ? "udp" : "tcp", host.c_str(), + port_str, error->c_str()); + _socket_set_errno(err); return -1; } const int fd = _fh_to_int(f.get()); - snprintf( f->name, sizeof(f->name), "%d(net-client:%s%d)", fd, - type != SOCK_STREAM ? "udp:" : "", port ); - D( "host '%s' port %d type %s => fd %d", host.c_str(), port, - type != SOCK_STREAM ? "udp" : "tcp", fd ); + snprintf(f->name, sizeof(f->name), "%d(net-client:%s%d)", fd, type != SOCK_STREAM ? "udp:" : "", + port); + D("host '%s' port %d type %s => fd %d", host.c_str(), port, type != SOCK_STREAM ? "udp" : "tcp", + fd); f.release(); return fd; } @@ -1083,6 +1108,25 @@ int adb_setsockopt( int fd, int level, int optname, const void* optval, soc return result; } +int adb_getsockname(int fd, struct sockaddr* sockaddr, socklen_t* optlen) { + FH fh = _fh_from_int(fd, __func__); + + if (!fh || fh->clazz != &_fh_socket_class) { + D("adb_getsockname: invalid fd %d", fd); + errno = EBADF; + return -1; + } + + int result = getsockname(fh->fh_socket, sockaddr, optlen); + if (result == SOCKET_ERROR) { + const DWORD err = WSAGetLastError(); + D("adb_getsockname: setsockopt on fd %d failed: %s\n", fd, + android::base::SystemErrorCodeToString(err).c_str()); + _socket_set_errno(err); + result = -1; + } + return result; +} int adb_shutdown(int fd) { @@ -1105,1352 +1149,85 @@ int adb_shutdown(int fd) return 0; } -/**************************************************************************/ -/**************************************************************************/ -/***** *****/ -/***** emulated socketpairs *****/ -/***** *****/ -/**************************************************************************/ -/**************************************************************************/ +// Emulate socketpair(2) by binding and connecting to a socket. +int adb_socketpair(int sv[2]) { + int server = -1; + int client = -1; + int accepted = -1; + sockaddr_storage addr_storage; + socklen_t addr_len = sizeof(addr_storage); + sockaddr_in* addr = nullptr; + std::string error; -/* we implement socketpairs directly in use space for the following reasons: - * - it avoids copying data from/to the Nt kernel - * - it allows us to implement fdevent hooks easily and cheaply, something - * that is not possible with standard Win32 pipes !! - * - * basically, we use two circular buffers, each one corresponding to a given - * direction. - * - * each buffer is implemented as two regions: - * - * region A which is (a_start,a_end) - * region B which is (0, b_end) with b_end <= a_start - * - * an empty buffer has: a_start = a_end = b_end = 0 - * - * a_start is the pointer where we start reading data - * a_end is the pointer where we start writing data, unless it is BUFFER_SIZE, - * then you start writing at b_end - * - * the buffer is full when b_end == a_start && a_end == BUFFER_SIZE - * - * there is room when b_end < a_start || a_end < BUFER_SIZE - * - * when reading, a_start is incremented, it a_start meets a_end, then - * we do: a_start = 0, a_end = b_end, b_end = 0, and keep going on.. - */ - -#define BIP_BUFFER_SIZE 4096 - -#if 0 -#include -# define BIPD(x) D x -# define BIPDUMP bip_dump_hex - -static void bip_dump_hex( const unsigned char* ptr, size_t len ) -{ - int nn, len2 = len; - - if (len2 > 8) len2 = 8; - - for (nn = 0; nn < len2; nn++) - printf("%02x", ptr[nn]); - printf(" "); - - for (nn = 0; nn < len2; nn++) { - int c = ptr[nn]; - if (c < 32 || c > 127) - c = '.'; - printf("%c", c); - } - printf("\n"); - fflush(stdout); -} - -#else -# define BIPD(x) do {} while (0) -# define BIPDUMP(p,l) BIPD(p) -#endif - -typedef struct BipBufferRec_ -{ - int a_start; - int a_end; - int b_end; - int fdin; - int fdout; - int closed; - int can_write; /* boolean */ - HANDLE evt_write; /* event signaled when one can write to a buffer */ - int can_read; /* boolean */ - HANDLE evt_read; /* event signaled when one can read from a buffer */ - CRITICAL_SECTION lock; - unsigned char buff[ BIP_BUFFER_SIZE ]; - -} BipBufferRec, *BipBuffer; - -static void -bip_buffer_init( BipBuffer buffer ) -{ - D( "bit_buffer_init %p", buffer ); - buffer->a_start = 0; - buffer->a_end = 0; - buffer->b_end = 0; - buffer->can_write = 1; - buffer->can_read = 0; - buffer->fdin = 0; - buffer->fdout = 0; - buffer->closed = 0; - buffer->evt_write = CreateEvent( NULL, TRUE, TRUE, NULL ); - buffer->evt_read = CreateEvent( NULL, TRUE, FALSE, NULL ); - InitializeCriticalSection( &buffer->lock ); -} - -static void -bip_buffer_close( BipBuffer bip ) -{ - bip->closed = 1; - - if (!bip->can_read) { - SetEvent( bip->evt_read ); - } - if (!bip->can_write) { - SetEvent( bip->evt_write ); - } -} - -static void -bip_buffer_done( BipBuffer bip ) -{ - BIPD(( "bip_buffer_done: %d->%d", bip->fdin, bip->fdout )); - CloseHandle( bip->evt_read ); - CloseHandle( bip->evt_write ); - DeleteCriticalSection( &bip->lock ); -} - -static int -bip_buffer_write( BipBuffer bip, const void* src, int len ) -{ - int avail, count = 0; - - if (len <= 0) - return 0; - - BIPD(( "bip_buffer_write: enter %d->%d len %d", bip->fdin, bip->fdout, len )); - BIPDUMP( src, len ); - - if (bip->closed) { - errno = EPIPE; - return -1; + server = network_loopback_server(0, SOCK_STREAM, &error); + if (server < 0) { + D("adb_socketpair: failed to create server: %s", error.c_str()); + goto fail; } - EnterCriticalSection( &bip->lock ); - - while (!bip->can_write) { - int ret; - LeaveCriticalSection( &bip->lock ); - - if (bip->closed) { - errno = EPIPE; - return -1; - } - /* spinlocking here is probably unfair, but let's live with it */ - ret = WaitForSingleObject( bip->evt_write, INFINITE ); - if (ret != WAIT_OBJECT_0) { /* buffer probably closed */ - D( "bip_buffer_write: error %d->%d WaitForSingleObject returned %d, error %ld", bip->fdin, bip->fdout, ret, GetLastError() ); - return 0; - } - if (bip->closed) { - errno = EPIPE; - return -1; - } - EnterCriticalSection( &bip->lock ); + if (adb_getsockname(server, reinterpret_cast(&addr_storage), &addr_len) < 0) { + D("adb_socketpair: adb_getsockname failed: %s", strerror(errno)); + goto fail; } - BIPD(( "bip_buffer_write: exec %d->%d len %d", bip->fdin, bip->fdout, len )); - - avail = BIP_BUFFER_SIZE - bip->a_end; - if (avail > 0) - { - /* we can append to region A */ - if (avail > len) - avail = len; - - memcpy( bip->buff + bip->a_end, src, avail ); - src = (const char *)src + avail; - count += avail; - len -= avail; - - bip->a_end += avail; - if (bip->a_end == BIP_BUFFER_SIZE && bip->a_start == 0) { - bip->can_write = 0; - ResetEvent( bip->evt_write ); - goto Exit; - } + if (addr_storage.ss_family != AF_INET) { + D("adb_socketpair: unknown address family received: %d", addr_storage.ss_family); + errno = ECONNABORTED; + goto fail; } - if (len == 0) - goto Exit; - - avail = bip->a_start - bip->b_end; - assert( avail > 0 ); /* since can_write is TRUE */ - - if (avail > len) - avail = len; - - memcpy( bip->buff + bip->b_end, src, avail ); - count += avail; - bip->b_end += avail; - - if (bip->b_end == bip->a_start) { - bip->can_write = 0; - ResetEvent( bip->evt_write ); + addr = reinterpret_cast(&addr_storage); + D("adb_socketpair: bound on port %d", ntohs(addr->sin_port)); + client = network_loopback_client(ntohs(addr->sin_port), SOCK_STREAM, &error); + if (client < 0) { + D("adb_socketpair: failed to connect client: %s", error.c_str()); + goto fail; } -Exit: - assert( count > 0 ); - - if ( !bip->can_read ) { - bip->can_read = 1; - SetEvent( bip->evt_read ); - } - - BIPD(( "bip_buffer_write: exit %d->%d count %d (as=%d ae=%d be=%d cw=%d cr=%d", - bip->fdin, bip->fdout, count, bip->a_start, bip->a_end, bip->b_end, bip->can_write, bip->can_read )); - LeaveCriticalSection( &bip->lock ); - - return count; - } - -static int -bip_buffer_read( BipBuffer bip, void* dst, int len ) -{ - int avail, count = 0; - - if (len <= 0) - return 0; - - BIPD(( "bip_buffer_read: enter %d->%d len %d", bip->fdin, bip->fdout, len )); - - EnterCriticalSection( &bip->lock ); - while ( !bip->can_read ) - { -#if 0 - LeaveCriticalSection( &bip->lock ); - errno = EAGAIN; - return -1; -#else - int ret; - LeaveCriticalSection( &bip->lock ); - - if (bip->closed) { - errno = EPIPE; - return -1; - } - - ret = WaitForSingleObject( bip->evt_read, INFINITE ); - if (ret != WAIT_OBJECT_0) { /* probably closed buffer */ - D( "bip_buffer_read: error %d->%d WaitForSingleObject returned %d, error %ld", bip->fdin, bip->fdout, ret, GetLastError()); - return 0; - } - if (bip->closed) { - errno = EPIPE; - return -1; - } - EnterCriticalSection( &bip->lock ); -#endif - } - - BIPD(( "bip_buffer_read: exec %d->%d len %d", bip->fdin, bip->fdout, len )); - - avail = bip->a_end - bip->a_start; - assert( avail > 0 ); /* since can_read is TRUE */ - - if (avail > len) - avail = len; - - memcpy( dst, bip->buff + bip->a_start, avail ); - dst = (char *)dst + avail; - count += avail; - len -= avail; - - bip->a_start += avail; - if (bip->a_start < bip->a_end) - goto Exit; - - bip->a_start = 0; - bip->a_end = bip->b_end; - bip->b_end = 0; - - avail = bip->a_end; - if (avail > 0) { - if (avail > len) - avail = len; - memcpy( dst, bip->buff, avail ); - count += avail; - bip->a_start += avail; - - if ( bip->a_start < bip->a_end ) - goto Exit; - - bip->a_start = bip->a_end = 0; - } - - bip->can_read = 0; - ResetEvent( bip->evt_read ); - -Exit: - assert( count > 0 ); - - if (!bip->can_write ) { - bip->can_write = 1; - SetEvent( bip->evt_write ); - } - - BIPDUMP( (const unsigned char*)dst - count, count ); - BIPD(( "bip_buffer_read: exit %d->%d count %d (as=%d ae=%d be=%d cw=%d cr=%d", - bip->fdin, bip->fdout, count, bip->a_start, bip->a_end, bip->b_end, bip->can_write, bip->can_read )); - LeaveCriticalSection( &bip->lock ); - - return count; -} - -typedef struct SocketPairRec_ -{ - BipBufferRec a2b_bip; - BipBufferRec b2a_bip; - FH a_fd; - int used; - -} SocketPairRec; - -void _fh_socketpair_init( FH f ) -{ - f->fh_pair = NULL; -} - -static int -_fh_socketpair_close( FH f ) -{ - if ( f->fh_pair ) { - SocketPair pair = f->fh_pair; - - if ( f == pair->a_fd ) { - pair->a_fd = NULL; - } - - bip_buffer_close( &pair->b2a_bip ); - bip_buffer_close( &pair->a2b_bip ); - - if ( --pair->used == 0 ) { - bip_buffer_done( &pair->b2a_bip ); - bip_buffer_done( &pair->a2b_bip ); - free( pair ); - } - f->fh_pair = NULL; + accepted = adb_socket_accept(server, nullptr, nullptr); + if (accepted < 0) { + D("adb_socketpair: failed to accept: %s", strerror(errno)); + goto fail; } + adb_close(server); + sv[0] = client; + sv[1] = accepted; return 0; -} -static int -_fh_socketpair_lseek( FH f, int pos, int origin ) -{ - errno = ESPIPE; +fail: + if (server >= 0) { + adb_close(server); + } + if (client >= 0) { + adb_close(client); + } + if (accepted >= 0) { + adb_close(accepted); + } return -1; } -static int -_fh_socketpair_read( FH f, void* buf, int len ) -{ - SocketPair pair = f->fh_pair; - BipBuffer bip; +bool set_file_block_mode(int fd, bool block) { + FH fh = _fh_from_int(fd, __func__); - if (!pair) - return -1; - - if ( f == pair->a_fd ) - bip = &pair->b2a_bip; - else - bip = &pair->a2b_bip; - - return bip_buffer_read( bip, buf, len ); -} - -static int -_fh_socketpair_write( FH f, const void* buf, int len ) -{ - SocketPair pair = f->fh_pair; - BipBuffer bip; - - if (!pair) - return -1; - - if ( f == pair->a_fd ) - bip = &pair->a2b_bip; - else - bip = &pair->b2a_bip; - - return bip_buffer_write( bip, buf, len ); -} - - -static void _fh_socketpair_hook( FH f, int event, EventHook hook ); /* forward */ - -static const FHClassRec _fh_socketpair_class = -{ - _fh_socketpair_init, - _fh_socketpair_close, - _fh_socketpair_lseek, - _fh_socketpair_read, - _fh_socketpair_write, - _fh_socketpair_hook -}; - - -int adb_socketpair(int sv[2]) { - SocketPair pair; - - unique_fh fa(_fh_alloc(&_fh_socketpair_class)); - if (!fa) { - return -1; - } - unique_fh fb(_fh_alloc(&_fh_socketpair_class)); - if (!fb) { - return -1; + if (!fh || !fh->used) { + errno = EBADF; + return false; } - pair = reinterpret_cast(malloc(sizeof(*pair))); - if (pair == NULL) { - D("adb_socketpair: not enough memory to allocate pipes" ); - return -1; - } - - bip_buffer_init( &pair->a2b_bip ); - bip_buffer_init( &pair->b2a_bip ); - - fa->fh_pair = pair; - fb->fh_pair = pair; - pair->used = 2; - pair->a_fd = fa.get(); - - sv[0] = _fh_to_int(fa.get()); - sv[1] = _fh_to_int(fb.get()); - - pair->a2b_bip.fdin = sv[0]; - pair->a2b_bip.fdout = sv[1]; - pair->b2a_bip.fdin = sv[1]; - pair->b2a_bip.fdout = sv[0]; - - snprintf( fa->name, sizeof(fa->name), "%d(pair:%d)", sv[0], sv[1] ); - snprintf( fb->name, sizeof(fb->name), "%d(pair:%d)", sv[1], sv[0] ); - D( "adb_socketpair: returns (%d, %d)", sv[0], sv[1] ); - fa.release(); - fb.release(); - return 0; -} - -/**************************************************************************/ -/**************************************************************************/ -/***** *****/ -/***** fdevents emulation *****/ -/***** *****/ -/***** this is a very simple implementation, we rely on the fact *****/ -/***** that ADB doesn't use FDE_ERROR. *****/ -/***** *****/ -/**************************************************************************/ -/**************************************************************************/ - -#define FATAL(fmt, ...) fatal("%s: " fmt, __FUNCTION__, ##__VA_ARGS__) - -#if DEBUG -static void dump_fde(fdevent *fde, const char *info) -{ - fprintf(stderr,"FDE #%03d %c%c%c %s\n", fde->fd, - fde->state & FDE_READ ? 'R' : ' ', - fde->state & FDE_WRITE ? 'W' : ' ', - fde->state & FDE_ERROR ? 'E' : ' ', - info); -} -#else -#define dump_fde(fde, info) do { } while(0) -#endif - -#define FDE_EVENTMASK 0x00ff -#define FDE_STATEMASK 0xff00 - -#define FDE_ACTIVE 0x0100 -#define FDE_PENDING 0x0200 -#define FDE_CREATED 0x0400 - -static void fdevent_plist_enqueue(fdevent *node); -static void fdevent_plist_remove(fdevent *node); -static fdevent *fdevent_plist_dequeue(void); - -static fdevent list_pending = { - .next = &list_pending, - .prev = &list_pending, -}; - -static fdevent **fd_table = 0; -static int fd_table_max = 0; - -typedef struct EventLooperRec_* EventLooper; - -typedef struct EventHookRec_ -{ - EventHook next; - FH fh; - HANDLE h; - int wanted; /* wanted event flags */ - int ready; /* ready event flags */ - void* aux; - void (*prepare)( EventHook hook ); - int (*start) ( EventHook hook ); - void (*stop) ( EventHook hook ); - int (*check) ( EventHook hook ); - int (*peek) ( EventHook hook ); -} EventHookRec; - -static EventHook _free_hooks; - -static EventHook -event_hook_alloc(FH fh) { - EventHook hook = _free_hooks; - if (hook != NULL) { - _free_hooks = hook->next; + if (fh->clazz == &_fh_socket_class) { + u_long x = !block; + if (ioctlsocket(fh->u.socket, FIONBIO, &x) != 0) { + _socket_set_errno(WSAGetLastError()); + return false; + } + return true; } else { - hook = reinterpret_cast(malloc(sizeof(*hook))); - if (hook == NULL) - fatal( "could not allocate event hook\n" ); - } - hook->next = NULL; - hook->fh = fh; - hook->wanted = 0; - hook->ready = 0; - hook->h = INVALID_HANDLE_VALUE; - hook->aux = NULL; - - hook->prepare = NULL; - hook->start = NULL; - hook->stop = NULL; - hook->check = NULL; - hook->peek = NULL; - - return hook; -} - -static void -event_hook_free( EventHook hook ) -{ - hook->fh = NULL; - hook->wanted = 0; - hook->ready = 0; - hook->next = _free_hooks; - _free_hooks = hook; -} - - -static void -event_hook_signal( EventHook hook ) -{ - FH f = hook->fh; - int fd = _fh_to_int(f); - fdevent* fde = fd_table[ fd - WIN32_FH_BASE ]; - - if (fde != NULL && fde->fd == fd) { - if ((fde->state & FDE_PENDING) == 0) { - fde->state |= FDE_PENDING; - fdevent_plist_enqueue( fde ); - } - fde->events |= hook->wanted; + errno = ENOTSOCK; + return false; } } - -#define MAX_LOOPER_HANDLES WIN32_MAX_FHS - -typedef struct EventLooperRec_ -{ - EventHook hooks; - HANDLE htab[ MAX_LOOPER_HANDLES ]; - int htab_count; - -} EventLooperRec; - -static EventHook* -event_looper_find_p( EventLooper looper, FH fh ) -{ - EventHook *pnode = &looper->hooks; - EventHook node = *pnode; - for (;;) { - if ( node == NULL || node->fh == fh ) - break; - pnode = &node->next; - node = *pnode; - } - return pnode; -} - -static void -event_looper_hook( EventLooper looper, int fd, int events ) -{ - FH f = _fh_from_int(fd, __func__); - EventHook *pnode; - EventHook node; - - if (f == NULL) /* invalid arg */ { - D("event_looper_hook: invalid fd=%d", fd); - return; - } - - pnode = event_looper_find_p( looper, f ); - node = *pnode; - if ( node == NULL ) { - node = event_hook_alloc( f ); - node->next = *pnode; - *pnode = node; - } - - if ( (node->wanted & events) != events ) { - /* this should update start/stop/check/peek */ - D("event_looper_hook: call hook for %d (new=%x, old=%x)", - fd, node->wanted, events); - f->clazz->_fh_hook( f, events & ~node->wanted, node ); - node->wanted |= events; - } else { - D("event_looper_hook: ignoring events %x for %d wanted=%x)", - events, fd, node->wanted); - } -} - -static void -event_looper_unhook( EventLooper looper, int fd, int events ) -{ - FH fh = _fh_from_int(fd, __func__); - EventHook *pnode = event_looper_find_p( looper, fh ); - EventHook node = *pnode; - - if (node != NULL) { - int events2 = events & node->wanted; - if ( events2 == 0 ) { - D( "event_looper_unhook: events %x not registered for fd %d", events, fd ); - return; - } - node->wanted &= ~events2; - if (!node->wanted) { - *pnode = node->next; - event_hook_free( node ); - } - } -} - -/* - * A fixer for WaitForMultipleObjects on condition that there are more than 64 - * handles to wait on. - * - * In cetain cases DDMS may establish more than 64 connections with ADB. For - * instance, this may happen if there are more than 64 processes running on a - * device, or there are multiple devices connected (including the emulator) with - * the combined number of running processes greater than 64. In this case using - * WaitForMultipleObjects to wait on connection events simply wouldn't cut, - * because of the API limitations (64 handles max). So, we need to provide a way - * to scale WaitForMultipleObjects to accept an arbitrary number of handles. The - * easiest (and "Microsoft recommended") way to do that would be dividing the - * handle array into chunks with the chunk size less than 64, and fire up as many - * waiting threads as there are chunks. Then each thread would wait on a chunk of - * handles, and will report back to the caller which handle has been set. - * Here is the implementation of that algorithm. - */ - -/* Number of handles to wait on in each wating thread. */ -#define WAIT_ALL_CHUNK_SIZE 63 - -/* Descriptor for a wating thread */ -typedef struct WaitForAllParam { - /* A handle to an event to signal when waiting is over. This handle is shared - * accross all the waiting threads, so each waiting thread knows when any - * other thread has exited, so it can exit too. */ - HANDLE main_event; - /* Upon exit from a waiting thread contains the index of the handle that has - * been signaled. The index is an absolute index of the signaled handle in - * the original array. This pointer is shared accross all the waiting threads - * and it's not guaranteed (due to a race condition) that when all the - * waiting threads exit, the value contained here would indicate the first - * handle that was signaled. This is fine, because the caller cares only - * about any handle being signaled. It doesn't care about the order, nor - * about the whole list of handles that were signaled. */ - LONG volatile *signaled_index; - /* Array of handles to wait on in a waiting thread. */ - HANDLE* handles; - /* Number of handles in 'handles' array to wait on. */ - int handles_count; - /* Index inside the main array of the first handle in the 'handles' array. */ - int first_handle_index; - /* Waiting thread handle. */ - HANDLE thread; -} WaitForAllParam; - -/* Waiting thread routine. */ -static unsigned __stdcall -_in_waiter_thread(void* arg) -{ - HANDLE wait_on[WAIT_ALL_CHUNK_SIZE + 1]; - int res; - WaitForAllParam* const param = (WaitForAllParam*)arg; - - /* We have to wait on the main_event in order to be notified when any of the - * sibling threads is exiting. */ - wait_on[0] = param->main_event; - /* The rest of the handles go behind the main event handle. */ - memcpy(wait_on + 1, param->handles, param->handles_count * sizeof(HANDLE)); - - res = WaitForMultipleObjects(param->handles_count + 1, wait_on, FALSE, INFINITE); - if (res > 0 && res < (param->handles_count + 1)) { - /* One of the original handles got signaled. Save its absolute index into - * the output variable. */ - InterlockedCompareExchange(param->signaled_index, - res - 1L + param->first_handle_index, -1L); - } - - /* Notify the caller (and the siblings) that the wait is over. */ - SetEvent(param->main_event); - - _endthreadex(0); - return 0; -} - -/* WaitForMultipeObjects fixer routine. - * Param: - * handles Array of handles to wait on. - * handles_count Number of handles in the array. - * Return: - * (>= 0 && < handles_count) - Index of the signaled handle in the array, or - * WAIT_FAILED on an error. - */ -static int -_wait_for_all(HANDLE* handles, int handles_count) -{ - WaitForAllParam* threads; - HANDLE main_event; - int chunks, chunk, remains; - - /* This variable is going to be accessed by several threads at the same time, - * this is bound to fail randomly when the core is run on multi-core machines. - * To solve this, we need to do the following (1 _and_ 2): - * 1. Use the "volatile" qualifier to ensure the compiler doesn't optimize - * out the reads/writes in this function unexpectedly. - * 2. Ensure correct memory ordering. The "simple" way to do that is to wrap - * all accesses inside a critical section. But we can also use - * InterlockedCompareExchange() which always provide a full memory barrier - * on Win32. - */ - volatile LONG sig_index = -1; - - /* Calculate number of chunks, and allocate thread param array. */ - chunks = handles_count / WAIT_ALL_CHUNK_SIZE; - remains = handles_count % WAIT_ALL_CHUNK_SIZE; - threads = (WaitForAllParam*)malloc((chunks + (remains ? 1 : 0)) * - sizeof(WaitForAllParam)); - if (threads == NULL) { - D("Unable to allocate thread array for %d handles.", handles_count); - return (int)WAIT_FAILED; - } - - /* Create main event to wait on for all waiting threads. This is a "manualy - * reset" event that will remain set once it was set. */ - main_event = CreateEvent(NULL, TRUE, FALSE, NULL); - if (main_event == NULL) { - D("Unable to create main event. Error: %ld", GetLastError()); - free(threads); - return (int)WAIT_FAILED; - } - - /* - * Initialize waiting thread parameters. - */ - - for (chunk = 0; chunk < chunks; chunk++) { - threads[chunk].main_event = main_event; - threads[chunk].signaled_index = &sig_index; - threads[chunk].first_handle_index = WAIT_ALL_CHUNK_SIZE * chunk; - threads[chunk].handles = handles + threads[chunk].first_handle_index; - threads[chunk].handles_count = WAIT_ALL_CHUNK_SIZE; - } - if (remains) { - threads[chunk].main_event = main_event; - threads[chunk].signaled_index = &sig_index; - threads[chunk].first_handle_index = WAIT_ALL_CHUNK_SIZE * chunk; - threads[chunk].handles = handles + threads[chunk].first_handle_index; - threads[chunk].handles_count = remains; - chunks++; - } - - /* Start the waiting threads. */ - for (chunk = 0; chunk < chunks; chunk++) { - /* Note that using adb_thread_create is not appropriate here, since we - * need a handle to wait on for thread termination. */ - threads[chunk].thread = (HANDLE)_beginthreadex(NULL, 0, _in_waiter_thread, - &threads[chunk], 0, NULL); - if (threads[chunk].thread == NULL) { - /* Unable to create a waiter thread. Collapse. */ - D("Unable to create a waiting thread %d of %d. errno=%d", - chunk, chunks, errno); - chunks = chunk; - SetEvent(main_event); - break; - } - } - - /* Wait on any of the threads to get signaled. */ - WaitForSingleObject(main_event, INFINITE); - - /* Wait on all the waiting threads to exit. */ - for (chunk = 0; chunk < chunks; chunk++) { - WaitForSingleObject(threads[chunk].thread, INFINITE); - CloseHandle(threads[chunk].thread); - } - - CloseHandle(main_event); - free(threads); - - - const int ret = (int)InterlockedCompareExchange(&sig_index, -1, -1); - return (ret >= 0) ? ret : (int)WAIT_FAILED; -} - -static EventLooperRec win32_looper; - -static void fdevent_init(void) -{ - win32_looper.htab_count = 0; - win32_looper.hooks = NULL; -} - -static void fdevent_connect(fdevent *fde) -{ - EventLooper looper = &win32_looper; - int events = fde->state & FDE_EVENTMASK; - - if (events != 0) - event_looper_hook( looper, fde->fd, events ); -} - -static void fdevent_disconnect(fdevent *fde) -{ - EventLooper looper = &win32_looper; - int events = fde->state & FDE_EVENTMASK; - - if (events != 0) - event_looper_unhook( looper, fde->fd, events ); -} - -static void fdevent_update(fdevent *fde, unsigned events) -{ - EventLooper looper = &win32_looper; - unsigned events0 = fde->state & FDE_EVENTMASK; - - if (events != events0) { - int removes = events0 & ~events; - int adds = events & ~events0; - if (removes) { - D("fdevent_update: remove %x from %d", removes, fde->fd); - event_looper_unhook( looper, fde->fd, removes ); - } - if (adds) { - D("fdevent_update: add %x to %d", adds, fde->fd); - event_looper_hook ( looper, fde->fd, adds ); - } - } -} - -static void fdevent_process() -{ - EventLooper looper = &win32_looper; - EventHook hook; - int gotone = 0; - - /* if we have at least one ready hook, execute it/them */ - for (hook = looper->hooks; hook; hook = hook->next) { - hook->ready = 0; - if (hook->prepare) { - hook->prepare(hook); - if (hook->ready != 0) { - event_hook_signal( hook ); - gotone = 1; - } - } - } - - /* nothing's ready yet, so wait for something to happen */ - if (!gotone) - { - looper->htab_count = 0; - - for (hook = looper->hooks; hook; hook = hook->next) - { - if (hook->start && !hook->start(hook)) { - D( "fdevent_process: error when starting a hook" ); - return; - } - if (hook->h != INVALID_HANDLE_VALUE) { - int nn; - - for (nn = 0; nn < looper->htab_count; nn++) - { - if ( looper->htab[nn] == hook->h ) - goto DontAdd; - } - looper->htab[ looper->htab_count++ ] = hook->h; - DontAdd: - ; - } - } - - if (looper->htab_count == 0) { - D( "fdevent_process: nothing to wait for !!" ); - return; - } - - do - { - int wait_ret; - - D( "adb_win32: waiting for %d events", looper->htab_count ); - if (looper->htab_count > MAXIMUM_WAIT_OBJECTS) { - D("handle count %d exceeds MAXIMUM_WAIT_OBJECTS.", looper->htab_count); - wait_ret = _wait_for_all(looper->htab, looper->htab_count); - } else { - wait_ret = WaitForMultipleObjects( looper->htab_count, looper->htab, FALSE, INFINITE ); - } - if (wait_ret == (int)WAIT_FAILED) { - D( "adb_win32: wait failed, error %ld", GetLastError() ); - } else { - D( "adb_win32: got one (index %d)", wait_ret ); - - /* according to Cygwin, some objects like consoles wake up on "inappropriate" events - * like mouse movements. we need to filter these with the "check" function - */ - if ((unsigned)wait_ret < (unsigned)looper->htab_count) - { - for (hook = looper->hooks; hook; hook = hook->next) - { - if ( looper->htab[wait_ret] == hook->h && - (!hook->check || hook->check(hook)) ) - { - D( "adb_win32: signaling %s for %x", hook->fh->name, hook->ready ); - event_hook_signal( hook ); - gotone = 1; - break; - } - } - } - } - } - while (!gotone); - - for (hook = looper->hooks; hook; hook = hook->next) { - if (hook->stop) - hook->stop( hook ); - } - } - - for (hook = looper->hooks; hook; hook = hook->next) { - if (hook->peek && hook->peek(hook)) - event_hook_signal( hook ); - } -} - - -static void fdevent_register(fdevent *fde) -{ - int fd = fde->fd - WIN32_FH_BASE; - - if(fd < 0) { - FATAL("bogus negative fd (%d)\n", fde->fd); - } - - if(fd >= fd_table_max) { - int oldmax = fd_table_max; - if(fde->fd > 32000) { - FATAL("bogus huuuuge fd (%d)\n", fde->fd); - } - if(fd_table_max == 0) { - fdevent_init(); - fd_table_max = 256; - } - while(fd_table_max <= fd) { - fd_table_max *= 2; - } - fd_table = reinterpret_cast(realloc(fd_table, sizeof(fdevent*) * fd_table_max)); - if(fd_table == 0) { - FATAL("could not expand fd_table to %d entries\n", fd_table_max); - } - memset(fd_table + oldmax, 0, sizeof(int) * (fd_table_max - oldmax)); - } - - fd_table[fd] = fde; -} - -static void fdevent_unregister(fdevent *fde) -{ - int fd = fde->fd - WIN32_FH_BASE; - - if((fd < 0) || (fd >= fd_table_max)) { - FATAL("fd out of range (%d)\n", fde->fd); - } - - if(fd_table[fd] != fde) { - FATAL("fd_table out of sync"); - } - - fd_table[fd] = 0; - - if(!(fde->state & FDE_DONT_CLOSE)) { - dump_fde(fde, "close"); - adb_close(fde->fd); - } -} - -static void fdevent_plist_enqueue(fdevent *node) -{ - fdevent *list = &list_pending; - - node->next = list; - node->prev = list->prev; - node->prev->next = node; - list->prev = node; -} - -static void fdevent_plist_remove(fdevent *node) -{ - node->prev->next = node->next; - node->next->prev = node->prev; - node->next = 0; - node->prev = 0; -} - -static fdevent *fdevent_plist_dequeue(void) -{ - fdevent *list = &list_pending; - fdevent *node = list->next; - - if(node == list) return 0; - - list->next = node->next; - list->next->prev = list; - node->next = 0; - node->prev = 0; - - return node; -} - -fdevent *fdevent_create(int fd, fd_func func, void *arg) -{ - fdevent *fde = (fdevent*) malloc(sizeof(fdevent)); - if(fde == 0) return 0; - fdevent_install(fde, fd, func, arg); - fde->state |= FDE_CREATED; - return fde; -} - -void fdevent_destroy(fdevent *fde) -{ - if(fde == 0) return; - if(!(fde->state & FDE_CREATED)) { - FATAL("fde %p not created by fdevent_create()\n", fde); - } - fdevent_remove(fde); -} - -void fdevent_install(fdevent *fde, int fd, fd_func func, void *arg) -{ - memset(fde, 0, sizeof(fdevent)); - fde->state = FDE_ACTIVE; - fde->fd = fd; - fde->func = func; - fde->arg = arg; - - fdevent_register(fde); - dump_fde(fde, "connect"); - fdevent_connect(fde); - fde->state |= FDE_ACTIVE; -} - -void fdevent_remove(fdevent *fde) -{ - if(fde->state & FDE_PENDING) { - fdevent_plist_remove(fde); - } - - if(fde->state & FDE_ACTIVE) { - fdevent_disconnect(fde); - dump_fde(fde, "disconnect"); - fdevent_unregister(fde); - } - - fde->state = 0; - fde->events = 0; -} - - -void fdevent_set(fdevent *fde, unsigned events) -{ - events &= FDE_EVENTMASK; - - if((fde->state & FDE_EVENTMASK) == (int)events) return; - - if(fde->state & FDE_ACTIVE) { - fdevent_update(fde, events); - dump_fde(fde, "update"); - } - - fde->state = (fde->state & FDE_STATEMASK) | events; - - if(fde->state & FDE_PENDING) { - /* if we're pending, make sure - ** we don't signal an event that - ** is no longer wanted. - */ - fde->events &= (~events); - if(fde->events == 0) { - fdevent_plist_remove(fde); - fde->state &= (~FDE_PENDING); - } - } -} - -void fdevent_add(fdevent *fde, unsigned events) -{ - fdevent_set( - fde, (fde->state & FDE_EVENTMASK) | (events & FDE_EVENTMASK)); -} - -void fdevent_del(fdevent *fde, unsigned events) -{ - fdevent_set( - fde, (fde->state & FDE_EVENTMASK) & (~(events & FDE_EVENTMASK))); -} - -void fdevent_loop() -{ - fdevent *fde; - - for(;;) { -#if DEBUG - fprintf(stderr,"--- ---- waiting for events\n"); -#endif - fdevent_process(); - - while((fde = fdevent_plist_dequeue())) { - unsigned events = fde->events; - fde->events = 0; - fde->state &= (~FDE_PENDING); - dump_fde(fde, "callback"); - fde->func(fde->fd, events, fde->arg); - } - } -} - -/** FILE EVENT HOOKS - **/ - -static void _event_file_prepare( EventHook hook ) -{ - if (hook->wanted & (FDE_READ|FDE_WRITE)) { - /* we can always read/write */ - hook->ready |= hook->wanted & (FDE_READ|FDE_WRITE); - } -} - -static int _event_file_peek( EventHook hook ) -{ - return (hook->wanted & (FDE_READ|FDE_WRITE)); -} - -static void _fh_file_hook( FH f, int events, EventHook hook ) -{ - hook->h = f->fh_handle; - hook->prepare = _event_file_prepare; - hook->peek = _event_file_peek; -} - -/** SOCKET EVENT HOOKS - **/ - -static void _event_socket_verify( EventHook hook, WSANETWORKEVENTS* evts ) -{ - if ( evts->lNetworkEvents & (FD_READ|FD_ACCEPT|FD_CLOSE) ) { - if (hook->wanted & FDE_READ) - hook->ready |= FDE_READ; - if ((evts->iErrorCode[FD_READ] != 0) && hook->wanted & FDE_ERROR) - hook->ready |= FDE_ERROR; - } - if ( evts->lNetworkEvents & (FD_WRITE|FD_CONNECT|FD_CLOSE) ) { - if (hook->wanted & FDE_WRITE) - hook->ready |= FDE_WRITE; - if ((evts->iErrorCode[FD_WRITE] != 0) && hook->wanted & FDE_ERROR) - hook->ready |= FDE_ERROR; - } - if ( evts->lNetworkEvents & FD_OOB ) { - if (hook->wanted & FDE_ERROR) - hook->ready |= FDE_ERROR; - } -} - -static void _event_socket_prepare( EventHook hook ) -{ - WSANETWORKEVENTS evts; - - /* look if some of the events we want already happened ? */ - if (!WSAEnumNetworkEvents( hook->fh->fh_socket, NULL, &evts )) - _event_socket_verify( hook, &evts ); -} - -static int _socket_wanted_to_flags( int wanted ) -{ - int flags = 0; - if (wanted & FDE_READ) - flags |= FD_READ | FD_ACCEPT | FD_CLOSE; - - if (wanted & FDE_WRITE) - flags |= FD_WRITE | FD_CONNECT | FD_CLOSE; - - if (wanted & FDE_ERROR) - flags |= FD_OOB; - - return flags; -} - -static int _event_socket_start( EventHook hook ) -{ - /* create an event which we're going to wait for */ - FH fh = hook->fh; - long flags = _socket_wanted_to_flags( hook->wanted ); - - hook->h = fh->event; - if (hook->h == INVALID_HANDLE_VALUE) { - D( "_event_socket_start: no event for %s", fh->name ); - return 0; - } - - if ( flags != fh->mask ) { - D( "_event_socket_start: hooking %s for %x (flags %ld)", hook->fh->name, hook->wanted, flags ); - if ( WSAEventSelect( fh->fh_socket, hook->h, flags ) ) { - D( "_event_socket_start: WSAEventSelect() for %s failed, error %d", hook->fh->name, WSAGetLastError() ); - CloseHandle( hook->h ); - hook->h = INVALID_HANDLE_VALUE; - exit(1); - return 0; - } - fh->mask = flags; - } - return 1; -} - -static void _event_socket_stop( EventHook hook ) -{ - hook->h = INVALID_HANDLE_VALUE; -} - -static int _event_socket_check( EventHook hook ) -{ - int result = 0; - FH fh = hook->fh; - WSANETWORKEVENTS evts; - - if (!WSAEnumNetworkEvents( fh->fh_socket, hook->h, &evts ) ) { - _event_socket_verify( hook, &evts ); - result = (hook->ready != 0); - if (result) { - ResetEvent( hook->h ); - } - } - D( "_event_socket_check %s returns %d", fh->name, result ); - return result; -} - -static int _event_socket_peek( EventHook hook ) -{ - WSANETWORKEVENTS evts; - FH fh = hook->fh; - - /* look if some of the events we want already happened ? */ - if (!WSAEnumNetworkEvents( fh->fh_socket, NULL, &evts )) { - _event_socket_verify( hook, &evts ); - if (hook->ready) - ResetEvent( hook->h ); - } - - return hook->ready != 0; -} - - - -static void _fh_socket_hook( FH f, int events, EventHook hook ) -{ - hook->prepare = _event_socket_prepare; - hook->start = _event_socket_start; - hook->stop = _event_socket_stop; - hook->check = _event_socket_check; - hook->peek = _event_socket_peek; - - // TODO: check return value? - _event_socket_start( hook ); -} - -/** SOCKETPAIR EVENT HOOKS - **/ - -static void _event_socketpair_prepare( EventHook hook ) -{ - FH fh = hook->fh; - SocketPair pair = fh->fh_pair; - BipBuffer rbip = (pair->a_fd == fh) ? &pair->b2a_bip : &pair->a2b_bip; - BipBuffer wbip = (pair->a_fd == fh) ? &pair->a2b_bip : &pair->b2a_bip; - - if (hook->wanted & FDE_READ && rbip->can_read) - hook->ready |= FDE_READ; - - if (hook->wanted & FDE_WRITE && wbip->can_write) - hook->ready |= FDE_WRITE; - } - - static int _event_socketpair_start( EventHook hook ) - { - FH fh = hook->fh; - SocketPair pair = fh->fh_pair; - BipBuffer rbip = (pair->a_fd == fh) ? &pair->b2a_bip : &pair->a2b_bip; - BipBuffer wbip = (pair->a_fd == fh) ? &pair->a2b_bip : &pair->b2a_bip; - - if (hook->wanted == FDE_READ) - hook->h = rbip->evt_read; - - else if (hook->wanted == FDE_WRITE) - hook->h = wbip->evt_write; - - else { - D("_event_socketpair_start: can't handle FDE_READ+FDE_WRITE" ); - return 0; - } - D( "_event_socketpair_start: hook %s for %x wanted=%x", - hook->fh->name, _fh_to_int(fh), hook->wanted); - return 1; -} - -static int _event_socketpair_peek( EventHook hook ) -{ - _event_socketpair_prepare( hook ); - return hook->ready != 0; -} - -static void _fh_socketpair_hook( FH fh, int events, EventHook hook ) -{ - hook->prepare = _event_socketpair_prepare; - hook->start = _event_socketpair_start; - hook->peek = _event_socketpair_peek; -} - - static adb_mutex_t g_console_output_buffer_lock; void