diff --git a/adb/Android.bp b/adb/Android.bp index 06cfcbfa4..57872b0c9 100644 --- a/adb/Android.bp +++ b/adb/Android.bp @@ -142,6 +142,10 @@ libadb_posix_srcs = [ "sysdeps/posix/network.cpp", ] +libadb_linux_srcs = [ + "fdevent/fdevent_epoll.cpp", +] + libadb_test_srcs = [ "adb_io_test.cpp", "adb_listeners_test.cpp", @@ -170,12 +174,11 @@ cc_library_host_static { target: { linux: { - srcs: ["client/usb_linux.cpp"], + srcs: ["client/usb_linux.cpp"] + libadb_linux_srcs, }, darwin: { srcs: ["client/usb_osx.cpp"], }, - not_windows: { srcs: libadb_posix_srcs, }, @@ -342,7 +345,7 @@ cc_library_static { // libminadbd wants both, as it's used to build native tests. compile_multilib: "both", - srcs: libadb_srcs + libadb_posix_srcs + [ + srcs: libadb_srcs + libadb_linux_srcs + libadb_posix_srcs + [ "daemon/auth.cpp", "daemon/jdwp_service.cpp", ], diff --git a/adb/fdevent/fdevent.cpp b/adb/fdevent/fdevent.cpp index 28b8f37ce..562f5872d 100644 --- a/adb/fdevent/fdevent.cpp +++ b/adb/fdevent/fdevent.cpp @@ -26,16 +26,24 @@ #include "adb_utils.h" #include "fdevent.h" +#include "fdevent_epoll.h" #include "fdevent_poll.h" +using namespace std::chrono_literals; +using std::chrono::duration_cast; + +void invoke_fde(struct fdevent* fde, unsigned events) { + if (auto f = std::get_if(&fde->func)) { + (*f)(fde->fd.get(), events, fde->arg); + } else if (auto f = std::get_if(&fde->func)) { + (*f)(fde, events, fde->arg); + } else { + __builtin_unreachable(); + } +} + std::string dump_fde(const fdevent* fde) { std::string state; - if (fde->state & FDE_ACTIVE) { - state += "A"; - } - if (fde->state & FDE_PENDING) { - state += "P"; - } if (fde->state & FDE_READ) { state += "R"; } @@ -53,9 +61,11 @@ fdevent* fdevent_context::Create(unique_fd fd, std::variant f CheckMainThread(); CHECK_GE(fd.get(), 0); + int fd_num = fd.get(); + fdevent* fde = new fdevent(); fde->id = fdevent_id_++; - fde->state = FDE_ACTIVE; + fde->state = 0; fde->fd = std::move(fd); fde->func = func; fde->arg = arg; @@ -66,6 +76,10 @@ fdevent* fdevent_context::Create(unique_fd fd, std::variant f LOG(ERROR) << "failed to set non-blocking mode for fd " << fde->fd.get(); } + auto [it, inserted] = this->installed_fdevents_.emplace(fd_num, fde); + CHECK(inserted); + UNUSED(it); + this->Register(fde); return fde; } @@ -78,18 +92,22 @@ unique_fd fdevent_context::Destroy(fdevent* fde) { this->Unregister(fde); + auto erased = this->installed_fdevents_.erase(fde->fd.get()); + CHECK_EQ(1UL, erased); + unique_fd result = std::move(fde->fd); delete fde; return result; } void fdevent_context::Add(fdevent* fde, unsigned events) { - Set(fde, (fde->state & FDE_EVENTMASK) | events); + CHECK(!(events & FDE_TIMEOUT)); + Set(fde, fde->state | events); } void fdevent_context::Del(fdevent* fde, unsigned events) { CHECK(!(events & FDE_TIMEOUT)); - Set(fde, (fde->state & FDE_EVENTMASK) & ~events); + Set(fde, fde->state & ~events); } void fdevent_context::SetTimeout(fdevent* fde, std::optional timeout) { @@ -98,6 +116,56 @@ void fdevent_context::SetTimeout(fdevent* fde, std::optionallast_active = std::chrono::steady_clock::now(); } +std::optional fdevent_context::CalculatePollDuration() { + std::optional result = std::nullopt; + auto now = std::chrono::steady_clock::now(); + CheckMainThread(); + + for (const auto& [fd, fde] : this->installed_fdevents_) { + UNUSED(fd); + auto timeout_opt = fde->timeout; + if (timeout_opt) { + auto deadline = fde->last_active + *timeout_opt; + auto time_left = duration_cast(deadline - now); + if (time_left < 0ms) { + time_left = 0ms; + } + + if (!result) { + result = time_left; + } else { + result = std::min(*result, time_left); + } + } + } + + return result; +} + +void fdevent_context::HandleEvents(const std::vector& events) { + for (const auto& event : events) { + invoke_fde(event.fde, event.events); + } + FlushRunQueue(); +} + +void fdevent_context::FlushRunQueue() { + // We need to be careful around reentrancy here, since a function we call can queue up another + // function. + while (true) { + std::function fn; + { + std::lock_guard lock(this->run_queue_mutex_); + if (this->run_queue_.empty()) { + break; + } + fn = std::move(this->run_queue_.front()); + this->run_queue_.pop_front(); + } + fn(); + } +} + void fdevent_context::CheckMainThread() { if (main_thread_id_) { CHECK_EQ(*main_thread_id_, android::base::GetThreadId()); @@ -118,25 +186,16 @@ void fdevent_context::TerminateLoop() { Interrupt(); } -void fdevent_context::FlushRunQueue() { - // We need to be careful around reentrancy here, since a function we call can queue up another - // function. - while (true) { - std::function fn; - { - std::lock_guard lock(this->run_queue_mutex_); - if (this->run_queue_.empty()) { - break; - } - fn = this->run_queue_.front(); - this->run_queue_.pop_front(); - } - fn(); - } +static std::unique_ptr fdevent_create_context() { +#if defined(__linux__) + return std::make_unique(); +#else + return std::make_unique(); +#endif } static auto& g_ambient_fdevent_context = - *new std::unique_ptr(new fdevent_context_poll()); + *new std::unique_ptr(fdevent_create_context()); static fdevent_context* fdevent_get_ambient() { return g_ambient_fdevent_context.get(); @@ -197,5 +256,5 @@ size_t fdevent_installed_count() { } void fdevent_reset() { - g_ambient_fdevent_context.reset(new fdevent_context_poll()); + g_ambient_fdevent_context = fdevent_create_context(); } diff --git a/adb/fdevent/fdevent.h b/adb/fdevent/fdevent.h index 2424252f5..86814d7c5 100644 --- a/adb/fdevent/fdevent.h +++ b/adb/fdevent/fdevent.h @@ -26,6 +26,7 @@ #include #include #include +#include #include #include @@ -38,19 +39,19 @@ #define FDE_ERROR 0x0004 #define FDE_TIMEOUT 0x0008 -// Internal states. -#define FDE_EVENTMASK 0x00ff -#define FDE_STATEMASK 0xff00 - -#define FDE_ACTIVE 0x0100 -#define FDE_PENDING 0x0200 +struct fdevent; typedef void (*fd_func)(int fd, unsigned events, void *userdata); typedef void (*fd_func2)(struct fdevent* fde, unsigned events, void* userdata); -struct fdevent; +void invoke_fde(struct fdevent* fde, unsigned events); std::string dump_fde(const fdevent* fde); +struct fdevent_event { + fdevent* fde; + unsigned events; +}; + struct fdevent_context { public: virtual ~fdevent_context() = default; @@ -59,14 +60,13 @@ struct fdevent_context { fdevent* Create(unique_fd fd, std::variant func, void* arg); // Deallocate an fdevent object, returning the file descriptor that was owned by it. + // Note that this calls Set, which is a virtual method, so destructors that call this must be + // final. unique_fd Destroy(fdevent* fde); protected: - // Register an fdevent that is being created by Create with the fdevent_context. - virtual void Register(fdevent* fde) = 0; - - // Unregister an fdevent that is being destroyed by Destroy with the fdevent_context. - virtual void Unregister(fdevent* fde) = 0; + virtual void Register(fdevent*) {} + virtual void Unregister(fdevent*) {} public: // Change which events should cause notifications. @@ -80,6 +80,15 @@ struct fdevent_context { // trigger repeatedly every |timeout| ms. void SetTimeout(fdevent* fde, std::optional timeout); + protected: + std::optional CalculatePollDuration(); + void HandleEvents(const std::vector& events); + + private: + // Run all pending functions enqueued via Run(). + void FlushRunQueue() EXCLUDES(run_queue_mutex_); + + public: // Loop until TerminateLoop is called, handling events. // Implementations should call FlushRunQueue on every iteration, and check the value of // terminate_loop_ to determine whether to stop. @@ -100,12 +109,12 @@ struct fdevent_context { // Interrupt the run loop. virtual void Interrupt() = 0; - // Run all pending functions enqueued via Run(). - void FlushRunQueue() EXCLUDES(run_queue_mutex_); - std::optional main_thread_id_ = std::nullopt; std::atomic terminate_loop_ = false; + protected: + std::unordered_map installed_fdevents_; + private: uint64_t fdevent_id_ = 0; std::mutex run_queue_mutex_; @@ -119,7 +128,6 @@ struct fdevent { int force_eof = 0; uint16_t state = 0; - uint16_t events = 0; std::optional timeout; std::chrono::steady_clock::time_point last_active; diff --git a/adb/fdevent/fdevent_epoll.cpp b/adb/fdevent/fdevent_epoll.cpp new file mode 100644 index 000000000..e3d167424 --- /dev/null +++ b/adb/fdevent/fdevent_epoll.cpp @@ -0,0 +1,200 @@ +/* + * Copyright (C) 2019 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "fdevent_epoll.h" + +#if defined(__linux__) + +#include +#include + +#include +#include + +#include "adb_unique_fd.h" +#include "fdevent.h" + +static void fdevent_interrupt(int fd, unsigned, void*) { + uint64_t buf; + ssize_t rc = TEMP_FAILURE_RETRY(adb_read(fd, &buf, sizeof(buf))); + if (rc == -1) { + PLOG(FATAL) << "failed to read from fdevent interrupt fd"; + } +} + +fdevent_context_epoll::fdevent_context_epoll() { + epoll_fd_.reset(epoll_create1(EPOLL_CLOEXEC)); + + unique_fd interrupt_fd(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK)); + if (interrupt_fd == -1) { + PLOG(FATAL) << "failed to create fdevent interrupt eventfd"; + } + + unique_fd interrupt_fd_dup(fcntl(interrupt_fd.get(), F_DUPFD_CLOEXEC, 3)); + if (interrupt_fd_dup == -1) { + PLOG(FATAL) << "failed to dup fdevent interrupt eventfd"; + } + + this->interrupt_fd_ = std::move(interrupt_fd_dup); + fdevent* fde = this->Create(std::move(interrupt_fd), fdevent_interrupt, nullptr); + CHECK(fde != nullptr); + this->Add(fde, FDE_READ); +} + +fdevent_context_epoll::~fdevent_context_epoll() { + // Destroy calls virtual methods, but this class is final, so that's okay. + this->Destroy(this->interrupt_fde_); +} + +static epoll_event calculate_epoll_event(fdevent* fde) { + epoll_event result; + result.events = 0; + if (fde->state & FDE_READ) { + result.events |= EPOLLIN; + } + if (fde->state & FDE_WRITE) { + result.events |= EPOLLOUT; + } + if (fde->state & FDE_ERROR) { + result.events |= EPOLLERR; + } + result.events |= EPOLLRDHUP; + result.data.ptr = fde; + return result; +} + +void fdevent_context_epoll::Register(fdevent* fde) { + epoll_event ev = calculate_epoll_event(fde); + if (epoll_ctl(epoll_fd_.get(), EPOLL_CTL_ADD, fde->fd.get(), &ev) != 0) { + PLOG(FATAL) << "failed to register fd " << fde->fd.get() << " with epoll"; + } +} + +void fdevent_context_epoll::Unregister(fdevent* fde) { + if (epoll_ctl(epoll_fd_.get(), EPOLL_CTL_DEL, fde->fd.get(), nullptr) != 0) { + PLOG(FATAL) << "failed to unregister fd " << fde->fd.get() << " with epoll"; + } +} + +void fdevent_context_epoll::Set(fdevent* fde, unsigned events) { + unsigned previous_state = fde->state; + fde->state = events; + + // If the state is the same, or only differed by FDE_TIMEOUT, we don't need to modify epoll. + if ((previous_state & ~FDE_TIMEOUT) == (events & ~FDE_TIMEOUT)) { + return; + } + + epoll_event ev = calculate_epoll_event(fde); + if (epoll_ctl(epoll_fd_.get(), EPOLL_CTL_MOD, fde->fd.get(), &ev) != 0) { + PLOG(FATAL) << "failed to modify fd " << fde->fd.get() << " with epoll"; + } +} + +void fdevent_context_epoll::Loop() { + main_thread_id_ = android::base::GetThreadId(); + + std::vector fde_events; + std::vector epoll_events; + epoll_events.resize(this->installed_fdevents_.size()); + + while (true) { + if (terminate_loop_) { + break; + } + + int rc = -1; + while (rc == -1) { + std::optional timeout = CalculatePollDuration(); + int timeout_ms; + if (!timeout) { + timeout_ms = -1; + } else { + timeout_ms = timeout->count(); + } + + rc = epoll_wait(epoll_fd_.get(), epoll_events.data(), epoll_events.size(), timeout_ms); + if (rc == -1 && errno != EINTR) { + PLOG(FATAL) << "epoll_wait failed"; + } + } + + auto post_poll = std::chrono::steady_clock::now(); + std::unordered_map event_map; + for (int i = 0; i < rc; ++i) { + fdevent* fde = static_cast(epoll_events[i].data.ptr); + + unsigned events = 0; + if (epoll_events[i].events & EPOLLIN) { + CHECK(fde->state & FDE_READ); + events |= FDE_READ; + } + if (epoll_events[i].events & EPOLLOUT) { + CHECK(fde->state & FDE_WRITE); + events |= FDE_WRITE; + } + if (epoll_events[i].events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) { + // We fake a read, as the rest of the code assumes that errors will + // be detected at that point. + events |= FDE_READ | FDE_ERROR; + } + + event_map[fde] = events; + } + + for (const auto& [fd, fde] : installed_fdevents_) { + unsigned events = 0; + if (auto it = event_map.find(fde); it != event_map.end()) { + events = it->second; + } + + if (events == 0) { + if (fde->timeout) { + auto deadline = fde->last_active + *fde->timeout; + if (deadline < post_poll) { + events |= FDE_TIMEOUT; + } + } + } + + if (events != 0) { + LOG(DEBUG) << dump_fde(fde) << " got events " << std::hex << std::showbase + << events; + fde_events.push_back({fde, events}); + fde->last_active = post_poll; + } + } + this->HandleEvents(std::move(fde_events)); + fde_events.clear(); + } + + main_thread_id_.reset(); +} + +size_t fdevent_context_epoll::InstalledCount() { + // We always have an installed fde for interrupt. + return this->installed_fdevents_.size() - 1; +} + +void fdevent_context_epoll::Interrupt() { + uint64_t i = 1; + ssize_t rc = TEMP_FAILURE_RETRY(adb_write(this->interrupt_fd_, &i, sizeof(i))); + if (rc != sizeof(i)) { + PLOG(FATAL) << "failed to write to fdevent interrupt eventfd"; + } +} + +#endif // defined(__linux__) diff --git a/adb/fdevent/fdevent_epoll.h b/adb/fdevent/fdevent_epoll.h new file mode 100644 index 000000000..684fa32bc --- /dev/null +++ b/adb/fdevent/fdevent_epoll.h @@ -0,0 +1,61 @@ +#pragma once + +/* + * Copyright (C) 2019 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#if defined(__linux__) + +#include "sysdeps.h" + +#include + +#include +#include +#include +#include + +#include + +#include "adb_unique_fd.h" +#include "fdevent.h" + +struct fdevent_context_epoll final : public fdevent_context { + fdevent_context_epoll(); + virtual ~fdevent_context_epoll(); + + virtual void Register(fdevent* fde) final; + virtual void Unregister(fdevent* fde) final; + + virtual void Set(fdevent* fde, unsigned events) final; + + virtual void Loop() final; + size_t InstalledCount() final; + + protected: + virtual void Interrupt() final; + + public: + // All operations to fdevent should happen only in the main thread. + // That's why we don't need a lock for fdevent. + std::unordered_map epoll_node_map_; + std::list pending_list_; + + unique_fd epoll_fd_; + unique_fd interrupt_fd_; + fdevent* interrupt_fde_ = nullptr; +}; + +#endif // defined(__linux__) diff --git a/adb/fdevent/fdevent_poll.cpp b/adb/fdevent/fdevent_poll.cpp index 75ea08178..cc4a7a151 100644 --- a/adb/fdevent/fdevent_poll.cpp +++ b/adb/fdevent/fdevent_poll.cpp @@ -75,60 +75,14 @@ fdevent_context_poll::fdevent_context_poll() { } fdevent_context_poll::~fdevent_context_poll() { + // Destroy calls virtual methods, but this class is final, so that's okay. this->Destroy(this->interrupt_fde_); } -void fdevent_context_poll::Register(fdevent* fde) { - auto pair = poll_node_map_.emplace(fde->fd.get(), PollNode(fde)); - CHECK(pair.second) << "install existing fd " << fde->fd.get(); -} - -void fdevent_context_poll::Unregister(fdevent* fde) { - if (fde->state & FDE_ACTIVE) { - poll_node_map_.erase(fde->fd.get()); - - if (fde->state & FDE_PENDING) { - pending_list_.remove(fde); - } - fde->state = 0; - fde->events = 0; - } -} - void fdevent_context_poll::Set(fdevent* fde, unsigned events) { CheckMainThread(); - events &= FDE_EVENTMASK; - if ((fde->state & FDE_EVENTMASK) == events) { - return; - } - CHECK(fde->state & FDE_ACTIVE); - - auto it = poll_node_map_.find(fde->fd.get()); - CHECK(it != poll_node_map_.end()); - PollNode& node = it->second; - if (events & FDE_READ) { - node.pollfd.events |= POLLIN; - } else { - node.pollfd.events &= ~POLLIN; - } - - if (events & FDE_WRITE) { - node.pollfd.events |= POLLOUT; - } else { - node.pollfd.events &= ~POLLOUT; - } - fde->state = (fde->state & FDE_STATEMASK) | events; - + fde->state = events; D("fdevent_set: %s, events = %u", dump_fde(fde).c_str(), events); - - if (fde->state & FDE_PENDING) { - // If we are pending, make sure we don't signal an event that is no longer wanted. - fde->events &= events; - if (fde->events == 0) { - pending_list_.remove(fde); - fde->state &= ~FDE_PENDING; - } - } } static std::string dump_pollfds(const std::vector& pollfds) { @@ -146,204 +100,94 @@ static std::string dump_pollfds(const std::vector& pollfds) { return result; } -static std::optional calculate_timeout(fdevent_context_poll* ctx) { - std::optional result = std::nullopt; - auto now = std::chrono::steady_clock::now(); - ctx->CheckMainThread(); - - for (const auto& [fd, pollnode] : ctx->poll_node_map_) { - UNUSED(fd); - auto timeout_opt = pollnode.fde->timeout; - if (timeout_opt) { - auto deadline = pollnode.fde->last_active + *timeout_opt; - auto time_left = std::chrono::duration_cast(deadline - now); - if (time_left < std::chrono::milliseconds::zero()) { - time_left = std::chrono::milliseconds::zero(); - } - - if (!result) { - result = time_left; - } else { - result = std::min(*result, time_left); - } - } - } - - return result; -} - -static void fdevent_process(fdevent_context_poll* ctx) { - std::vector pollfds; - for (const auto& pair : ctx->poll_node_map_) { - pollfds.push_back(pair.second.pollfd); - } - CHECK_GT(pollfds.size(), 0u); - D("poll(), pollfds = %s", dump_pollfds(pollfds).c_str()); - - auto timeout = calculate_timeout(ctx); - int timeout_ms; - if (!timeout) { - timeout_ms = -1; - } else { - timeout_ms = timeout->count(); - } - - int ret = adb_poll(&pollfds[0], pollfds.size(), timeout_ms); - if (ret == -1) { - PLOG(ERROR) << "poll(), ret = " << ret; - return; - } - - auto post_poll = std::chrono::steady_clock::now(); - - for (const auto& pollfd : pollfds) { - if (pollfd.revents != 0) { - D("for fd %d, revents = %x", pollfd.fd, pollfd.revents); - } - unsigned events = 0; - if (pollfd.revents & POLLIN) { - events |= FDE_READ; - } - if (pollfd.revents & POLLOUT) { - events |= FDE_WRITE; - } - if (pollfd.revents & (POLLERR | POLLHUP | POLLNVAL)) { - // We fake a read, as the rest of the code assumes that errors will - // be detected at that point. - events |= FDE_READ | FDE_ERROR; - } -#if defined(__linux__) - if (pollfd.revents & POLLRDHUP) { - events |= FDE_READ | FDE_ERROR; - } -#endif - auto it = ctx->poll_node_map_.find(pollfd.fd); - CHECK(it != ctx->poll_node_map_.end()); - fdevent* fde = it->second.fde; - - if (events == 0) { - // Check for timeout. - if (fde->timeout) { - auto deadline = fde->last_active + *fde->timeout; - if (deadline < post_poll) { - events |= FDE_TIMEOUT; - } - } - } - - if (events != 0) { - CHECK_EQ(fde->fd.get(), pollfd.fd); - fde->events |= events; - fde->last_active = post_poll; - D("%s got events %x", dump_fde(fde).c_str(), events); - fde->state |= FDE_PENDING; - ctx->pending_list_.push_back(fde); - } - } -} - -template -struct always_false : std::false_type {}; - -static void fdevent_call_fdfunc(fdevent* fde) { - unsigned events = fde->events; - fde->events = 0; - CHECK(fde->state & FDE_PENDING); - fde->state &= (~FDE_PENDING); - D("fdevent_call_fdfunc %s", dump_fde(fde).c_str()); - std::visit( - [&](auto&& f) { - using F = std::decay_t; - if constexpr (std::is_same_v) { - f(fde->fd.get(), events, fde->arg); - } else if constexpr (std::is_same_v) { - f(fde, events, fde->arg); - } else { - static_assert(always_false::value, "non-exhaustive visitor"); - } - }, - fde->func); -} - -static void fdevent_check_spin(fdevent_context_poll* ctx, uint64_t cycle) { - // Check to see if we're spinning because we forgot about an fdevent - // by keeping track of how long fdevents have been continuously pending. - struct SpinCheck { - fdevent* fde; - android::base::boot_clock::time_point timestamp; - uint64_t cycle; - }; - - // TODO: Move this into the base fdevent_context. - static auto& g_continuously_pending = *new std::unordered_map(); - static auto last_cycle = android::base::boot_clock::now(); - - auto now = android::base::boot_clock::now(); - if (now - last_cycle > 10ms) { - // We're not spinning. - g_continuously_pending.clear(); - last_cycle = now; - return; - } - last_cycle = now; - - for (auto* fde : ctx->pending_list_) { - auto it = g_continuously_pending.find(fde->id); - if (it == g_continuously_pending.end()) { - g_continuously_pending[fde->id] = - SpinCheck{.fde = fde, .timestamp = now, .cycle = cycle}; - } else { - it->second.cycle = cycle; - } - } - - for (auto it = g_continuously_pending.begin(); it != g_continuously_pending.end();) { - if (it->second.cycle != cycle) { - it = g_continuously_pending.erase(it); - } else { - // Use an absurdly long window, since all we really care about is - // getting a bugreport eventually. - if (now - it->second.timestamp > 300s) { - LOG(FATAL_WITHOUT_ABORT) - << "detected spin in fdevent: " << dump_fde(it->second.fde); -#if defined(__linux__) - int fd = it->second.fde->fd.get(); - std::string fd_path = android::base::StringPrintf("/proc/self/fd/%d", fd); - std::string path; - if (!android::base::Readlink(fd_path, &path)) { - PLOG(FATAL_WITHOUT_ABORT) << "readlink of fd " << fd << " failed"; - } - LOG(FATAL_WITHOUT_ABORT) << "fd " << fd << " = " << path; -#endif - abort(); - } - ++it; - } - } -} - void fdevent_context_poll::Loop() { main_thread_id_ = android::base::GetThreadId(); - uint64_t cycle = 0; while (true) { if (terminate_loop_) { break; } D("--- --- waiting for events"); + std::vector pollfds; + for (const auto& [fd, fde] : this->installed_fdevents_) { + adb_pollfd pfd; + pfd.fd = fd; + pfd.events = 0; + if (fde->state & FDE_READ) { + pfd.events |= POLLIN; + } + if (fde->state & FDE_WRITE) { + pfd.events |= POLLOUT; + } + if (fde->state & FDE_ERROR) { + pfd.events |= POLLERR; + } +#if defined(__linux__) + pfd.events |= POLLRDHUP; +#endif + pfd.revents = 0; + pollfds.push_back(pfd); + } + CHECK_GT(pollfds.size(), 0u); + D("poll(), pollfds = %s", dump_pollfds(pollfds).c_str()); - fdevent_process(this); - - fdevent_check_spin(this, cycle++); - - while (!pending_list_.empty()) { - fdevent* fde = pending_list_.front(); - pending_list_.pop_front(); - fdevent_call_fdfunc(fde); + std::optional timeout = CalculatePollDuration(); + int timeout_ms; + if (!timeout) { + timeout_ms = -1; + } else { + timeout_ms = timeout->count(); } - this->FlushRunQueue(); + int ret = adb_poll(pollfds.data(), pollfds.size(), timeout_ms); + if (ret == -1) { + PLOG(ERROR) << "poll(), ret = " << ret; + return; + } + + auto post_poll = std::chrono::steady_clock::now(); + std::vector poll_events; + + for (const auto& pollfd : pollfds) { + unsigned events = 0; + if (pollfd.revents & POLLIN) { + events |= FDE_READ; + } + if (pollfd.revents & POLLOUT) { + events |= FDE_WRITE; + } + if (pollfd.revents & (POLLERR | POLLHUP | POLLNVAL)) { + // We fake a read, as the rest of the code assumes that errors will + // be detected at that point. + events |= FDE_READ | FDE_ERROR; + } +#if defined(__linux__) + if (pollfd.revents & POLLRDHUP) { + events |= FDE_READ | FDE_ERROR; + } +#endif + + auto it = this->installed_fdevents_.find(pollfd.fd); + CHECK(it != this->installed_fdevents_.end()); + fdevent* fde = it->second; + + if (events == 0) { + if (fde->timeout) { + auto deadline = fde->last_active + *fde->timeout; + if (deadline < post_poll) { + events |= FDE_TIMEOUT; + } + } + } + + if (events != 0) { + D("%s got events %x", dump_fde(fde).c_str(), events); + poll_events.push_back({fde, events}); + fde->last_active = post_poll; + } + } + this->HandleEvents(std::move(poll_events)); } main_thread_id_.reset(); @@ -351,7 +195,7 @@ void fdevent_context_poll::Loop() { size_t fdevent_context_poll::InstalledCount() { // We always have an installed fde for interrupt. - return poll_node_map_.size() - 1; + return this->installed_fdevents_.size() - 1; } void fdevent_context_poll::Interrupt() { diff --git a/adb/fdevent/fdevent_poll.h b/adb/fdevent/fdevent_poll.h index db083018b..98abab283 100644 --- a/adb/fdevent/fdevent_poll.h +++ b/adb/fdevent/fdevent_poll.h @@ -44,13 +44,10 @@ struct PollNode { } }; -struct fdevent_context_poll : public fdevent_context { +struct fdevent_context_poll final : public fdevent_context { fdevent_context_poll(); virtual ~fdevent_context_poll(); - virtual void Register(fdevent* fde) final; - virtual void Unregister(fdevent* fde) final; - virtual void Set(fdevent* fde, unsigned events) final; virtual void Loop() final; @@ -61,11 +58,6 @@ struct fdevent_context_poll : public fdevent_context { virtual void Interrupt() final; public: - // All operations to fdevent should happen only in the main thread. - // That's why we don't need a lock for fdevent. - std::unordered_map poll_node_map_; - std::list pending_list_; - unique_fd interrupt_fd_; fdevent* interrupt_fde_ = nullptr; }; diff --git a/adb/fdevent/fdevent_test.cpp b/adb/fdevent/fdevent_test.cpp index 682f06102..e06b3b329 100644 --- a/adb/fdevent/fdevent_test.cpp +++ b/adb/fdevent/fdevent_test.cpp @@ -118,8 +118,8 @@ TEST_F(FdeventTest, fdevent_terminate) { TEST_F(FdeventTest, smoke) { for (bool use_new_callback : {true, false}) { fdevent_reset(); - const size_t PIPE_COUNT = 10; - const size_t MESSAGE_LOOP_COUNT = 100; + const size_t PIPE_COUNT = 512; + const size_t MESSAGE_LOOP_COUNT = 10; const std::string MESSAGE = "fdevent_test"; int fd_pair1[2]; int fd_pair2[2]; @@ -172,44 +172,6 @@ TEST_F(FdeventTest, smoke) { } } -struct InvalidFdArg { - fdevent* fde; - unsigned expected_events; - size_t* happened_event_count; -}; - -static void InvalidFdEventCallback(int, unsigned events, void* userdata) { - InvalidFdArg* arg = reinterpret_cast(userdata); - ASSERT_EQ(arg->expected_events, events); - fdevent_destroy(arg->fde); - if (++*(arg->happened_event_count) == 2) { - fdevent_terminate_loop(); - } -} - -static void InvalidFdThreadFunc() { - const int INVALID_READ_FD = std::numeric_limits::max() - 1; - size_t happened_event_count = 0; - InvalidFdArg read_arg; - read_arg.expected_events = FDE_READ | FDE_ERROR; - read_arg.happened_event_count = &happened_event_count; - read_arg.fde = fdevent_create(INVALID_READ_FD, InvalidFdEventCallback, &read_arg); - fdevent_add(read_arg.fde, FDE_READ); - - const int INVALID_WRITE_FD = std::numeric_limits::max(); - InvalidFdArg write_arg; - write_arg.expected_events = FDE_READ | FDE_ERROR; - write_arg.happened_event_count = &happened_event_count; - write_arg.fde = fdevent_create(INVALID_WRITE_FD, InvalidFdEventCallback, &write_arg); - fdevent_add(write_arg.fde, FDE_WRITE); - fdevent_loop(); -} - -TEST_F(FdeventTest, invalid_fd) { - std::thread thread(InvalidFdThreadFunc); - thread.join(); -} - TEST_F(FdeventTest, run_on_main_thread) { std::vector vec;