adb: add fdevent callback that passes the fdevent.

This is useful for when we don't want to actually store the fdevent
into a separate struct to be able to destroy it, but instead want to
destroy it immediately from the callback.

Test: adb_test
Change-Id: Ief3dbf8ea6a6bd72dc7e73f9ab9b7429e48fc181
This commit is contained in:
Josh Gao 2019-01-25 15:30:25 -08:00
parent 92ee52cc38
commit c162c713ef
3 changed files with 114 additions and 63 deletions

View file

@ -33,6 +33,8 @@
#include <list>
#include <mutex>
#include <unordered_map>
#include <utility>
#include <variant>
#include <vector>
#include <android-base/chrono_utils.h>
@ -121,13 +123,8 @@ static std::string dump_fde(const fdevent* fde) {
state.c_str());
}
void fdevent_install(fdevent* fde, int fd, fd_func func, void* arg) {
check_main_thread();
CHECK_GE(fd, 0);
memset(fde, 0, sizeof(fdevent));
}
fdevent* fdevent_create(int fd, fd_func func, void* arg) {
template <typename F>
static fdevent* fdevent_create_impl(int fd, F func, void* arg) {
check_main_thread();
CHECK_GE(fd, 0);
@ -150,6 +147,14 @@ fdevent* fdevent_create(int fd, fd_func func, void* arg) {
return fde;
}
fdevent* fdevent_create(int fd, fd_func func, void* arg) {
return fdevent_create_impl(fd, func, arg);
}
fdevent* fdevent_create(int fd, fd_func2 func, void* arg) {
return fdevent_create_impl(fd, func, arg);
}
unique_fd fdevent_release(fdevent* fde) {
check_main_thread();
if (!fde) {
@ -290,13 +295,27 @@ static void fdevent_process() {
}
}
template <class T>
struct always_false : std::false_type {};
static void fdevent_call_fdfunc(fdevent* fde) {
unsigned events = fde->events;
fde->events = 0;
CHECK(fde->state & FDE_PENDING);
fde->state &= (~FDE_PENDING);
D("fdevent_call_fdfunc %s", dump_fde(fde).c_str());
fde->func(fde->fd.get(), events, fde->arg);
std::visit(
[&](auto&& f) {
using F = std::decay_t<decltype(f)>;
if constexpr (std::is_same_v<fd_func, F>) {
f(fde->fd.get(), events, fde->arg);
} else if constexpr (std::is_same_v<fd_func2, F>) {
f(fde, events, fde->arg);
} else {
static_assert(always_false<F>::value, "non-exhaustive visitor");
}
},
fde->func);
}
static void fdevent_run_flush() EXCLUDES(run_queue_mutex) {

View file

@ -21,6 +21,7 @@
#include <stdint.h> /* for int64_t */
#include <functional>
#include <variant>
#include "adb_unique_fd.h"
@ -30,6 +31,7 @@
#define FDE_ERROR 0x0004
typedef void (*fd_func)(int fd, unsigned events, void *userdata);
typedef void (*fd_func2)(struct fdevent* fde, unsigned events, void* userdata);
struct fdevent {
uint64_t id;
@ -40,15 +42,14 @@ struct fdevent {
uint16_t state = 0;
uint16_t events = 0;
fd_func func = nullptr;
std::variant<fd_func, fd_func2> func;
void* arg = nullptr;
};
/* Allocate and initialize a new fdevent object
* Note: use FD_TIMER as 'fd' to create a fd-less object
* (used to implement timers).
*/
// Allocate and initialize a new fdevent object
// TODO: Switch these to unique_fd.
fdevent *fdevent_create(int fd, fd_func func, void *arg);
fdevent* fdevent_create(int fd, fd_func2 func, void* arg);
// Deallocate an fdevent object that was created by fdevent_create.
void fdevent_destroy(fdevent *fde);
@ -56,16 +57,14 @@ void fdevent_destroy(fdevent *fde);
// fdevent_destroy, except releasing the file descriptor previously owned by the fdevent.
unique_fd fdevent_release(fdevent* fde);
/* Change which events should cause notifications
*/
// Change which events should cause notifications
void fdevent_set(fdevent *fde, unsigned events);
void fdevent_add(fdevent *fde, unsigned events);
void fdevent_del(fdevent *fde, unsigned events);
void fdevent_set_timeout(fdevent *fde, int64_t timeout_ms);
/* loop forever, handling events.
*/
// Loop forever, handling events.
void fdevent_loop();
void check_main_thread();

View file

@ -30,10 +30,16 @@
class FdHandler {
public:
FdHandler(int read_fd, int write_fd) : read_fd_(read_fd), write_fd_(write_fd) {
read_fde_ = fdevent_create(read_fd_, FdEventCallback, this);
FdHandler(int read_fd, int write_fd, bool use_new_callback)
: read_fd_(read_fd), write_fd_(write_fd) {
if (use_new_callback) {
read_fde_ = fdevent_create(read_fd_, FdEventNewCallback, this);
write_fde_ = fdevent_create(write_fd_, FdEventNewCallback, this);
} else {
read_fde_ = fdevent_create(read_fd_, FdEventCallback, this);
write_fde_ = fdevent_create(write_fd_, FdEventCallback, this);
}
fdevent_add(read_fde_, FDE_READ);
write_fde_ = fdevent_create(write_fd_, FdEventCallback, this);
}
~FdHandler() {
@ -64,6 +70,29 @@ class FdHandler {
}
}
static void FdEventNewCallback(fdevent* fde, unsigned events, void* userdata) {
int fd = fde->fd.get();
FdHandler* handler = reinterpret_cast<FdHandler*>(userdata);
ASSERT_EQ(0u, (events & ~(FDE_READ | FDE_WRITE))) << "unexpected events: " << events;
if (events & FDE_READ) {
ASSERT_EQ(fd, handler->read_fd_);
char c;
ASSERT_EQ(1, adb_read(fd, &c, 1));
handler->queue_.push(c);
fdevent_add(handler->write_fde_, FDE_WRITE);
}
if (events & FDE_WRITE) {
ASSERT_EQ(fd, handler->write_fd_);
ASSERT_FALSE(handler->queue_.empty());
char c = handler->queue_.front();
handler->queue_.pop();
ASSERT_EQ(1, adb_write(fd, &c, 1));
if (handler->queue_.empty()) {
fdevent_del(handler->write_fde_, FDE_WRITE);
}
}
}
private:
const int read_fd_;
const int write_fd_;
@ -84,56 +113,60 @@ TEST_F(FdeventTest, fdevent_terminate) {
}
TEST_F(FdeventTest, smoke) {
const size_t PIPE_COUNT = 10;
const size_t MESSAGE_LOOP_COUNT = 100;
const std::string MESSAGE = "fdevent_test";
int fd_pair1[2];
int fd_pair2[2];
ASSERT_EQ(0, adb_socketpair(fd_pair1));
ASSERT_EQ(0, adb_socketpair(fd_pair2));
ThreadArg thread_arg;
thread_arg.first_read_fd = fd_pair1[0];
thread_arg.last_write_fd = fd_pair2[1];
thread_arg.middle_pipe_count = PIPE_COUNT;
int writer = fd_pair1[1];
int reader = fd_pair2[0];
for (bool use_new_callback : {true, false}) {
fdevent_reset();
const size_t PIPE_COUNT = 10;
const size_t MESSAGE_LOOP_COUNT = 100;
const std::string MESSAGE = "fdevent_test";
int fd_pair1[2];
int fd_pair2[2];
ASSERT_EQ(0, adb_socketpair(fd_pair1));
ASSERT_EQ(0, adb_socketpair(fd_pair2));
ThreadArg thread_arg;
thread_arg.first_read_fd = fd_pair1[0];
thread_arg.last_write_fd = fd_pair2[1];
thread_arg.middle_pipe_count = PIPE_COUNT;
int writer = fd_pair1[1];
int reader = fd_pair2[0];
PrepareThread();
PrepareThread();
std::vector<std::unique_ptr<FdHandler>> fd_handlers;
fdevent_run_on_main_thread([&thread_arg, &fd_handlers]() {
std::vector<int> read_fds;
std::vector<int> write_fds;
std::vector<std::unique_ptr<FdHandler>> fd_handlers;
fdevent_run_on_main_thread([&thread_arg, &fd_handlers, use_new_callback]() {
std::vector<int> read_fds;
std::vector<int> write_fds;
read_fds.push_back(thread_arg.first_read_fd);
for (size_t i = 0; i < thread_arg.middle_pipe_count; ++i) {
int fds[2];
ASSERT_EQ(0, adb_socketpair(fds));
read_fds.push_back(fds[0]);
write_fds.push_back(fds[1]);
read_fds.push_back(thread_arg.first_read_fd);
for (size_t i = 0; i < thread_arg.middle_pipe_count; ++i) {
int fds[2];
ASSERT_EQ(0, adb_socketpair(fds));
read_fds.push_back(fds[0]);
write_fds.push_back(fds[1]);
}
write_fds.push_back(thread_arg.last_write_fd);
for (size_t i = 0; i < read_fds.size(); ++i) {
fd_handlers.push_back(
std::make_unique<FdHandler>(read_fds[i], write_fds[i], use_new_callback));
}
});
WaitForFdeventLoop();
for (size_t i = 0; i < MESSAGE_LOOP_COUNT; ++i) {
std::string read_buffer = MESSAGE;
std::string write_buffer(MESSAGE.size(), 'a');
ASSERT_TRUE(WriteFdExactly(writer, read_buffer.c_str(), read_buffer.size()));
ASSERT_TRUE(ReadFdExactly(reader, &write_buffer[0], write_buffer.size()));
ASSERT_EQ(read_buffer, write_buffer);
}
write_fds.push_back(thread_arg.last_write_fd);
for (size_t i = 0; i < read_fds.size(); ++i) {
fd_handlers.push_back(std::make_unique<FdHandler>(read_fds[i], write_fds[i]));
}
});
WaitForFdeventLoop();
fdevent_run_on_main_thread([&fd_handlers]() { fd_handlers.clear(); });
WaitForFdeventLoop();
for (size_t i = 0; i < MESSAGE_LOOP_COUNT; ++i) {
std::string read_buffer = MESSAGE;
std::string write_buffer(MESSAGE.size(), 'a');
ASSERT_TRUE(WriteFdExactly(writer, read_buffer.c_str(), read_buffer.size()));
ASSERT_TRUE(ReadFdExactly(reader, &write_buffer[0], write_buffer.size()));
ASSERT_EQ(read_buffer, write_buffer);
TerminateThread();
ASSERT_EQ(0, adb_close(writer));
ASSERT_EQ(0, adb_close(reader));
}
fdevent_run_on_main_thread([&fd_handlers]() { fd_handlers.clear(); });
WaitForFdeventLoop();
TerminateThread();
ASSERT_EQ(0, adb_close(writer));
ASSERT_EQ(0, adb_close(reader));
}
struct InvalidFdArg {