adb: fdevent: move run queue to fdevent_context.
Make the run queue logic reusable between implementations of fdevent by moving it to the abstract base class. Test: adb_test Change-Id: If2f72e3ddc8007304bca63aa75446fa117267b25
This commit is contained in:
parent
7adca93fe9
commit
95eef6b097
5 changed files with 99 additions and 90 deletions
|
@ -49,6 +49,32 @@ std::string dump_fde(const fdevent* fde) {
|
|||
state.c_str());
|
||||
}
|
||||
|
||||
void fdevent_context::Run(std::function<void()> fn) {
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(run_queue_mutex_);
|
||||
run_queue_.push_back(std::move(fn));
|
||||
}
|
||||
|
||||
Interrupt();
|
||||
}
|
||||
|
||||
void fdevent_context::FlushRunQueue() {
|
||||
// We need to be careful around reentrancy here, since a function we call can queue up another
|
||||
// function.
|
||||
while (true) {
|
||||
std::function<void()> fn;
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(this->run_queue_mutex_);
|
||||
if (this->run_queue_.empty()) {
|
||||
break;
|
||||
}
|
||||
fn = this->run_queue_.front();
|
||||
this->run_queue_.pop_front();
|
||||
}
|
||||
fn();
|
||||
}
|
||||
}
|
||||
|
||||
static auto& g_ambient_fdevent_context =
|
||||
*new std::unique_ptr<fdevent_context>(new fdevent_context_poll());
|
||||
|
||||
|
|
|
@ -21,10 +21,14 @@
|
|||
#include <stdint.h>
|
||||
|
||||
#include <chrono>
|
||||
#include <deque>
|
||||
#include <functional>
|
||||
#include <mutex>
|
||||
#include <optional>
|
||||
#include <variant>
|
||||
|
||||
#include <android-base/thread_annotations.h>
|
||||
|
||||
#include "adb_unique_fd.h"
|
||||
|
||||
// Events that may be observed
|
||||
|
@ -48,6 +52,7 @@ struct fdevent;
|
|||
std::string dump_fde(const fdevent* fde);
|
||||
|
||||
struct fdevent_context {
|
||||
public:
|
||||
virtual ~fdevent_context() = default;
|
||||
|
||||
// Allocate and initialize a new fdevent object.
|
||||
|
@ -68,17 +73,29 @@ struct fdevent_context {
|
|||
virtual void SetTimeout(fdevent* fde, std::optional<std::chrono::milliseconds> timeout) = 0;
|
||||
|
||||
// Loop forever, handling events.
|
||||
// Implementations should call FlushRunQueue on every iteration.
|
||||
virtual void Loop() = 0;
|
||||
|
||||
// Assert that the caller is running on the context's main thread.
|
||||
virtual void CheckMainThread() = 0;
|
||||
|
||||
// Queue an operation to be run on the main thread.
|
||||
virtual void Run(std::function<void()> fn) = 0;
|
||||
void Run(std::function<void()> fn);
|
||||
|
||||
// Test-only functionality:
|
||||
virtual void TerminateLoop() = 0;
|
||||
virtual size_t InstalledCount() = 0;
|
||||
|
||||
protected:
|
||||
// Interrupt the run loop.
|
||||
virtual void Interrupt() = 0;
|
||||
|
||||
// Run all pending functions enqueued via Run().
|
||||
void FlushRunQueue() EXCLUDES(run_queue_mutex_);
|
||||
|
||||
private:
|
||||
std::mutex run_queue_mutex_;
|
||||
std::deque<std::function<void()>> run_queue_ GUARDED_BY(run_queue_mutex_);
|
||||
};
|
||||
|
||||
struct fdevent {
|
||||
|
|
|
@ -50,6 +50,35 @@
|
|||
#include "fdevent.h"
|
||||
#include "sysdeps/chrono.h"
|
||||
|
||||
static void fdevent_interrupt(int fd, unsigned, void*) {
|
||||
char buf[BUFSIZ];
|
||||
ssize_t rc = TEMP_FAILURE_RETRY(adb_read(fd, buf, sizeof(buf)));
|
||||
if (rc == -1) {
|
||||
PLOG(FATAL) << "failed to read from fdevent interrupt fd";
|
||||
}
|
||||
}
|
||||
|
||||
fdevent_context_poll::fdevent_context_poll() {
|
||||
int s[2];
|
||||
if (adb_socketpair(s) != 0) {
|
||||
PLOG(FATAL) << "failed to create fdevent interrupt socketpair";
|
||||
}
|
||||
|
||||
if (!set_file_block_mode(s[0], false) || !set_file_block_mode(s[1], false)) {
|
||||
PLOG(FATAL) << "failed to make fdevent interrupt socket nonblocking";
|
||||
}
|
||||
|
||||
this->interrupt_fd_.reset(s[0]);
|
||||
fdevent* fde = this->Create(unique_fd(s[1]), fdevent_interrupt, nullptr);
|
||||
CHECK(fde != nullptr);
|
||||
this->Add(fde, FDE_READ);
|
||||
}
|
||||
|
||||
fdevent_context_poll::~fdevent_context_poll() {
|
||||
main_thread_valid_ = false;
|
||||
this->Destroy(this->interrupt_fde_);
|
||||
}
|
||||
|
||||
void fdevent_context_poll::CheckMainThread() {
|
||||
if (main_thread_valid_) {
|
||||
CHECK_EQ(main_thread_id_, android::base::GetThreadId());
|
||||
|
@ -291,79 +320,6 @@ static void fdevent_call_fdfunc(fdevent* fde) {
|
|||
fde->func);
|
||||
}
|
||||
|
||||
static void fdevent_run_flush(fdevent_context_poll* ctx) EXCLUDES(ctx->run_queue_mutex_) {
|
||||
// We need to be careful around reentrancy here, since a function we call can queue up another
|
||||
// function.
|
||||
while (true) {
|
||||
std::function<void()> fn;
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(ctx->run_queue_mutex_);
|
||||
if (ctx->run_queue_.empty()) {
|
||||
break;
|
||||
}
|
||||
fn = ctx->run_queue_.front();
|
||||
ctx->run_queue_.pop_front();
|
||||
}
|
||||
fn();
|
||||
}
|
||||
}
|
||||
|
||||
static void fdevent_run_func(int fd, unsigned ev, void* data) {
|
||||
CHECK_GE(fd, 0);
|
||||
CHECK(ev & FDE_READ);
|
||||
|
||||
bool* run_needs_flush = static_cast<bool*>(data);
|
||||
char buf[1024];
|
||||
|
||||
// Empty the fd.
|
||||
if (adb_read(fd, buf, sizeof(buf)) == -1) {
|
||||
PLOG(FATAL) << "failed to empty run queue notify fd";
|
||||
}
|
||||
|
||||
// Mark that we need to flush, and then run it at the end of fdevent_loop.
|
||||
*run_needs_flush = true;
|
||||
}
|
||||
|
||||
static void fdevent_run_setup(fdevent_context_poll* ctx) {
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(ctx->run_queue_mutex_);
|
||||
CHECK(ctx->run_queue_notify_fd_.get() == -1);
|
||||
int s[2];
|
||||
if (adb_socketpair(s) != 0) {
|
||||
PLOG(FATAL) << "failed to create run queue notify socketpair";
|
||||
}
|
||||
|
||||
if (!set_file_block_mode(s[0], false) || !set_file_block_mode(s[1], false)) {
|
||||
PLOG(FATAL) << "failed to make run queue notify socket nonblocking";
|
||||
}
|
||||
|
||||
ctx->run_queue_notify_fd_.reset(s[0]);
|
||||
fdevent* fde = ctx->Create(unique_fd(s[1]), fdevent_run_func, &ctx->run_needs_flush_);
|
||||
CHECK(fde != nullptr);
|
||||
ctx->Add(fde, FDE_READ);
|
||||
}
|
||||
|
||||
fdevent_run_flush(ctx);
|
||||
}
|
||||
|
||||
void fdevent_context_poll::Run(std::function<void()> fn) {
|
||||
std::lock_guard<std::mutex> lock(run_queue_mutex_);
|
||||
run_queue_.push_back(std::move(fn));
|
||||
|
||||
// run_queue_notify_fd could still be -1 if we're called before fdevent has finished setting up.
|
||||
// In that case, rely on the setup code to flush the queue without a notification being needed.
|
||||
if (run_queue_notify_fd_ != -1) {
|
||||
int rc = adb_write(run_queue_notify_fd_.get(), "", 1);
|
||||
|
||||
// It's possible that we get EAGAIN here, if lots of notifications came in while handling.
|
||||
if (rc == 0) {
|
||||
PLOG(FATAL) << "run queue notify fd was closed?";
|
||||
} else if (rc == -1 && errno != EAGAIN) {
|
||||
PLOG(FATAL) << "failed to write to run queue notify fd";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void fdevent_check_spin(fdevent_context_poll* ctx, uint64_t cycle) {
|
||||
// Check to see if we're spinning because we forgot about an fdevent
|
||||
// by keeping track of how long fdevents have been continuously pending.
|
||||
|
@ -424,7 +380,6 @@ static void fdevent_check_spin(fdevent_context_poll* ctx, uint64_t cycle) {
|
|||
void fdevent_context_poll::Loop() {
|
||||
this->main_thread_id_ = android::base::GetThreadId();
|
||||
this->main_thread_valid_ = true;
|
||||
fdevent_run_setup(this);
|
||||
|
||||
uint64_t cycle = 0;
|
||||
while (true) {
|
||||
|
@ -444,17 +399,27 @@ void fdevent_context_poll::Loop() {
|
|||
fdevent_call_fdfunc(fde);
|
||||
}
|
||||
|
||||
if (run_needs_flush_) {
|
||||
fdevent_run_flush(this);
|
||||
run_needs_flush_ = false;
|
||||
}
|
||||
this->FlushRunQueue();
|
||||
}
|
||||
}
|
||||
|
||||
void fdevent_context_poll::TerminateLoop() {
|
||||
terminate_loop_ = true;
|
||||
Interrupt();
|
||||
}
|
||||
|
||||
size_t fdevent_context_poll::InstalledCount() {
|
||||
return poll_node_map_.size();
|
||||
// We always have an installed fde for interrupt.
|
||||
return poll_node_map_.size() - 1;
|
||||
}
|
||||
|
||||
void fdevent_context_poll::Interrupt() {
|
||||
int rc = adb_write(this->interrupt_fd_, "", 1);
|
||||
|
||||
// It's possible that we get EAGAIN here, if lots of notifications came in while handling.
|
||||
if (rc == 0) {
|
||||
PLOG(FATAL) << "fdevent interrupt fd was closed?";
|
||||
} else if (rc == -1 && errno != EAGAIN) {
|
||||
PLOG(FATAL) << "failed to write to fdevent interrupt fd";
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,6 +25,7 @@
|
|||
|
||||
#include <android-base/thread_annotations.h>
|
||||
|
||||
#include "adb_unique_fd.h"
|
||||
#include "fdevent.h"
|
||||
|
||||
struct PollNode {
|
||||
|
@ -44,7 +45,8 @@ struct PollNode {
|
|||
};
|
||||
|
||||
struct fdevent_context_poll : public fdevent_context {
|
||||
virtual ~fdevent_context_poll() = default;
|
||||
fdevent_context_poll();
|
||||
virtual ~fdevent_context_poll();
|
||||
|
||||
virtual fdevent* Create(unique_fd fd, std::variant<fd_func, fd_func2> func, void* arg) final;
|
||||
virtual unique_fd Destroy(fdevent* fde) final;
|
||||
|
@ -58,11 +60,13 @@ struct fdevent_context_poll : public fdevent_context {
|
|||
|
||||
virtual void CheckMainThread() final;
|
||||
|
||||
virtual void Run(std::function<void()> fn) final;
|
||||
|
||||
virtual void TerminateLoop() final;
|
||||
virtual size_t InstalledCount() final;
|
||||
|
||||
protected:
|
||||
virtual void Interrupt() final;
|
||||
|
||||
public:
|
||||
// All operations to fdevent should happen only in the main thread.
|
||||
// That's why we don't need a lock for fdevent.
|
||||
std::unordered_map<int, PollNode> poll_node_map_;
|
||||
|
@ -71,10 +75,7 @@ struct fdevent_context_poll : public fdevent_context {
|
|||
uint64_t main_thread_id_ = 0;
|
||||
uint64_t fdevent_id_ = 0;
|
||||
|
||||
bool run_needs_flush_ = false;
|
||||
unique_fd run_queue_notify_fd_;
|
||||
std::mutex run_queue_mutex_;
|
||||
std::deque<std::function<void()>> run_queue_ GUARDED_BY(run_queue_mutex_);
|
||||
|
||||
unique_fd interrupt_fd_;
|
||||
fdevent* interrupt_fde_ = nullptr;
|
||||
std::atomic<bool> terminate_loop_ = false;
|
||||
};
|
||||
|
|
|
@ -78,8 +78,8 @@ class FdeventTest : public ::testing::Test {
|
|||
}
|
||||
|
||||
size_t GetAdditionalLocalSocketCount() {
|
||||
// dummy socket installed in PrepareThread() + fdevent_run_on_main_thread socket
|
||||
return 2;
|
||||
// dummy socket installed in PrepareThread()
|
||||
return 1;
|
||||
}
|
||||
|
||||
void TerminateThread() {
|
||||
|
|
Loading…
Reference in a new issue