adb: fdevent: move run queue to fdevent_context.

Make the run queue logic reusable between implementations of fdevent by
moving it to the abstract base class.

Test: adb_test
Change-Id: If2f72e3ddc8007304bca63aa75446fa117267b25
This commit is contained in:
Josh Gao 2019-07-08 17:37:23 -07:00
parent 7adca93fe9
commit 95eef6b097
5 changed files with 99 additions and 90 deletions

View file

@ -49,6 +49,32 @@ std::string dump_fde(const fdevent* fde) {
state.c_str());
}
void fdevent_context::Run(std::function<void()> fn) {
{
std::lock_guard<std::mutex> lock(run_queue_mutex_);
run_queue_.push_back(std::move(fn));
}
Interrupt();
}
void fdevent_context::FlushRunQueue() {
// We need to be careful around reentrancy here, since a function we call can queue up another
// function.
while (true) {
std::function<void()> fn;
{
std::lock_guard<std::mutex> lock(this->run_queue_mutex_);
if (this->run_queue_.empty()) {
break;
}
fn = this->run_queue_.front();
this->run_queue_.pop_front();
}
fn();
}
}
static auto& g_ambient_fdevent_context =
*new std::unique_ptr<fdevent_context>(new fdevent_context_poll());

View file

@ -21,10 +21,14 @@
#include <stdint.h>
#include <chrono>
#include <deque>
#include <functional>
#include <mutex>
#include <optional>
#include <variant>
#include <android-base/thread_annotations.h>
#include "adb_unique_fd.h"
// Events that may be observed
@ -48,6 +52,7 @@ struct fdevent;
std::string dump_fde(const fdevent* fde);
struct fdevent_context {
public:
virtual ~fdevent_context() = default;
// Allocate and initialize a new fdevent object.
@ -68,17 +73,29 @@ struct fdevent_context {
virtual void SetTimeout(fdevent* fde, std::optional<std::chrono::milliseconds> timeout) = 0;
// Loop forever, handling events.
// Implementations should call FlushRunQueue on every iteration.
virtual void Loop() = 0;
// Assert that the caller is running on the context's main thread.
virtual void CheckMainThread() = 0;
// Queue an operation to be run on the main thread.
virtual void Run(std::function<void()> fn) = 0;
void Run(std::function<void()> fn);
// Test-only functionality:
virtual void TerminateLoop() = 0;
virtual size_t InstalledCount() = 0;
protected:
// Interrupt the run loop.
virtual void Interrupt() = 0;
// Run all pending functions enqueued via Run().
void FlushRunQueue() EXCLUDES(run_queue_mutex_);
private:
std::mutex run_queue_mutex_;
std::deque<std::function<void()>> run_queue_ GUARDED_BY(run_queue_mutex_);
};
struct fdevent {

View file

@ -50,6 +50,35 @@
#include "fdevent.h"
#include "sysdeps/chrono.h"
static void fdevent_interrupt(int fd, unsigned, void*) {
char buf[BUFSIZ];
ssize_t rc = TEMP_FAILURE_RETRY(adb_read(fd, buf, sizeof(buf)));
if (rc == -1) {
PLOG(FATAL) << "failed to read from fdevent interrupt fd";
}
}
fdevent_context_poll::fdevent_context_poll() {
int s[2];
if (adb_socketpair(s) != 0) {
PLOG(FATAL) << "failed to create fdevent interrupt socketpair";
}
if (!set_file_block_mode(s[0], false) || !set_file_block_mode(s[1], false)) {
PLOG(FATAL) << "failed to make fdevent interrupt socket nonblocking";
}
this->interrupt_fd_.reset(s[0]);
fdevent* fde = this->Create(unique_fd(s[1]), fdevent_interrupt, nullptr);
CHECK(fde != nullptr);
this->Add(fde, FDE_READ);
}
fdevent_context_poll::~fdevent_context_poll() {
main_thread_valid_ = false;
this->Destroy(this->interrupt_fde_);
}
void fdevent_context_poll::CheckMainThread() {
if (main_thread_valid_) {
CHECK_EQ(main_thread_id_, android::base::GetThreadId());
@ -291,79 +320,6 @@ static void fdevent_call_fdfunc(fdevent* fde) {
fde->func);
}
static void fdevent_run_flush(fdevent_context_poll* ctx) EXCLUDES(ctx->run_queue_mutex_) {
// We need to be careful around reentrancy here, since a function we call can queue up another
// function.
while (true) {
std::function<void()> fn;
{
std::lock_guard<std::mutex> lock(ctx->run_queue_mutex_);
if (ctx->run_queue_.empty()) {
break;
}
fn = ctx->run_queue_.front();
ctx->run_queue_.pop_front();
}
fn();
}
}
static void fdevent_run_func(int fd, unsigned ev, void* data) {
CHECK_GE(fd, 0);
CHECK(ev & FDE_READ);
bool* run_needs_flush = static_cast<bool*>(data);
char buf[1024];
// Empty the fd.
if (adb_read(fd, buf, sizeof(buf)) == -1) {
PLOG(FATAL) << "failed to empty run queue notify fd";
}
// Mark that we need to flush, and then run it at the end of fdevent_loop.
*run_needs_flush = true;
}
static void fdevent_run_setup(fdevent_context_poll* ctx) {
{
std::lock_guard<std::mutex> lock(ctx->run_queue_mutex_);
CHECK(ctx->run_queue_notify_fd_.get() == -1);
int s[2];
if (adb_socketpair(s) != 0) {
PLOG(FATAL) << "failed to create run queue notify socketpair";
}
if (!set_file_block_mode(s[0], false) || !set_file_block_mode(s[1], false)) {
PLOG(FATAL) << "failed to make run queue notify socket nonblocking";
}
ctx->run_queue_notify_fd_.reset(s[0]);
fdevent* fde = ctx->Create(unique_fd(s[1]), fdevent_run_func, &ctx->run_needs_flush_);
CHECK(fde != nullptr);
ctx->Add(fde, FDE_READ);
}
fdevent_run_flush(ctx);
}
void fdevent_context_poll::Run(std::function<void()> fn) {
std::lock_guard<std::mutex> lock(run_queue_mutex_);
run_queue_.push_back(std::move(fn));
// run_queue_notify_fd could still be -1 if we're called before fdevent has finished setting up.
// In that case, rely on the setup code to flush the queue without a notification being needed.
if (run_queue_notify_fd_ != -1) {
int rc = adb_write(run_queue_notify_fd_.get(), "", 1);
// It's possible that we get EAGAIN here, if lots of notifications came in while handling.
if (rc == 0) {
PLOG(FATAL) << "run queue notify fd was closed?";
} else if (rc == -1 && errno != EAGAIN) {
PLOG(FATAL) << "failed to write to run queue notify fd";
}
}
}
static void fdevent_check_spin(fdevent_context_poll* ctx, uint64_t cycle) {
// Check to see if we're spinning because we forgot about an fdevent
// by keeping track of how long fdevents have been continuously pending.
@ -424,7 +380,6 @@ static void fdevent_check_spin(fdevent_context_poll* ctx, uint64_t cycle) {
void fdevent_context_poll::Loop() {
this->main_thread_id_ = android::base::GetThreadId();
this->main_thread_valid_ = true;
fdevent_run_setup(this);
uint64_t cycle = 0;
while (true) {
@ -444,17 +399,27 @@ void fdevent_context_poll::Loop() {
fdevent_call_fdfunc(fde);
}
if (run_needs_flush_) {
fdevent_run_flush(this);
run_needs_flush_ = false;
}
this->FlushRunQueue();
}
}
void fdevent_context_poll::TerminateLoop() {
terminate_loop_ = true;
Interrupt();
}
size_t fdevent_context_poll::InstalledCount() {
return poll_node_map_.size();
// We always have an installed fde for interrupt.
return poll_node_map_.size() - 1;
}
void fdevent_context_poll::Interrupt() {
int rc = adb_write(this->interrupt_fd_, "", 1);
// It's possible that we get EAGAIN here, if lots of notifications came in while handling.
if (rc == 0) {
PLOG(FATAL) << "fdevent interrupt fd was closed?";
} else if (rc == -1 && errno != EAGAIN) {
PLOG(FATAL) << "failed to write to fdevent interrupt fd";
}
}

View file

@ -25,6 +25,7 @@
#include <android-base/thread_annotations.h>
#include "adb_unique_fd.h"
#include "fdevent.h"
struct PollNode {
@ -44,7 +45,8 @@ struct PollNode {
};
struct fdevent_context_poll : public fdevent_context {
virtual ~fdevent_context_poll() = default;
fdevent_context_poll();
virtual ~fdevent_context_poll();
virtual fdevent* Create(unique_fd fd, std::variant<fd_func, fd_func2> func, void* arg) final;
virtual unique_fd Destroy(fdevent* fde) final;
@ -58,11 +60,13 @@ struct fdevent_context_poll : public fdevent_context {
virtual void CheckMainThread() final;
virtual void Run(std::function<void()> fn) final;
virtual void TerminateLoop() final;
virtual size_t InstalledCount() final;
protected:
virtual void Interrupt() final;
public:
// All operations to fdevent should happen only in the main thread.
// That's why we don't need a lock for fdevent.
std::unordered_map<int, PollNode> poll_node_map_;
@ -71,10 +75,7 @@ struct fdevent_context_poll : public fdevent_context {
uint64_t main_thread_id_ = 0;
uint64_t fdevent_id_ = 0;
bool run_needs_flush_ = false;
unique_fd run_queue_notify_fd_;
std::mutex run_queue_mutex_;
std::deque<std::function<void()>> run_queue_ GUARDED_BY(run_queue_mutex_);
unique_fd interrupt_fd_;
fdevent* interrupt_fde_ = nullptr;
std::atomic<bool> terminate_loop_ = false;
};

View file

@ -78,8 +78,8 @@ class FdeventTest : public ::testing::Test {
}
size_t GetAdditionalLocalSocketCount() {
// dummy socket installed in PrepareThread() + fdevent_run_on_main_thread socket
return 2;
// dummy socket installed in PrepareThread()
return 1;
}
void TerminateThread() {