adb: implement fdevent_set_timeout.

Test: adb_test
Change-Id: I9ae61465617b6a2fe0a1c161ad88c4feae49ec80
This commit is contained in:
Josh Gao 2019-01-31 15:51:52 -08:00
parent 66e76443a0
commit 1a90118bbb
4 changed files with 177 additions and 35 deletions

View file

@ -32,6 +32,7 @@
#include <functional>
#include <list>
#include <mutex>
#include <optional>
#include <unordered_map>
#include <utility>
#include <variant>
@ -225,14 +226,22 @@ void fdevent_set(fdevent* fde, unsigned events) {
void fdevent_add(fdevent* fde, unsigned events) {
check_main_thread();
CHECK(!(events & FDE_TIMEOUT));
fdevent_set(fde, (fde->state & FDE_EVENTMASK) | events);
}
void fdevent_del(fdevent* fde, unsigned events) {
check_main_thread();
CHECK(!(events & FDE_TIMEOUT));
fdevent_set(fde, (fde->state & FDE_EVENTMASK) & ~events);
}
void fdevent_set_timeout(fdevent* fde, std::optional<std::chrono::milliseconds> timeout) {
check_main_thread();
fde->timeout = timeout;
fde->last_active = std::chrono::steady_clock::now();
}
static std::string dump_pollfds(const std::vector<adb_pollfd>& pollfds) {
std::string result;
for (const auto& pollfd : pollfds) {
@ -248,6 +257,32 @@ static std::string dump_pollfds(const std::vector<adb_pollfd>& pollfds) {
return result;
}
static std::optional<std::chrono::milliseconds> calculate_timeout() {
std::optional<std::chrono::milliseconds> result = std::nullopt;
auto now = std::chrono::steady_clock::now();
check_main_thread();
for (const auto& [fd, pollnode] : g_poll_node_map) {
UNUSED(fd);
auto timeout_opt = pollnode.fde->timeout;
if (timeout_opt) {
auto deadline = pollnode.fde->last_active + *timeout_opt;
auto time_left = std::chrono::duration_cast<std::chrono::milliseconds>(deadline - now);
if (time_left < std::chrono::milliseconds::zero()) {
time_left = std::chrono::milliseconds::zero();
}
if (!result) {
result = time_left;
} else {
result = std::min(*result, time_left);
}
}
}
return result;
}
static void fdevent_process() {
std::vector<adb_pollfd> pollfds;
for (const auto& pair : g_poll_node_map) {
@ -256,11 +291,22 @@ static void fdevent_process() {
CHECK_GT(pollfds.size(), 0u);
D("poll(), pollfds = %s", dump_pollfds(pollfds).c_str());
int ret = adb_poll(&pollfds[0], pollfds.size(), -1);
auto timeout = calculate_timeout();
int timeout_ms;
if (!timeout) {
timeout_ms = -1;
} else {
timeout_ms = timeout->count();
}
int ret = adb_poll(&pollfds[0], pollfds.size(), timeout_ms);
if (ret == -1) {
PLOG(ERROR) << "poll(), ret = " << ret;
return;
}
auto post_poll = std::chrono::steady_clock::now();
for (const auto& pollfd : pollfds) {
if (pollfd.revents != 0) {
D("for fd %d, revents = %x", pollfd.fd, pollfd.revents);
@ -282,12 +328,24 @@ static void fdevent_process() {
events |= FDE_READ | FDE_ERROR;
}
#endif
auto it = g_poll_node_map.find(pollfd.fd);
CHECK(it != g_poll_node_map.end());
fdevent* fde = it->second.fde;
if (events == 0) {
// Check for timeout.
if (fde->timeout) {
auto deadline = fde->last_active + *fde->timeout;
if (deadline < post_poll) {
events |= FDE_TIMEOUT;
}
}
}
if (events != 0) {
auto it = g_poll_node_map.find(pollfd.fd);
CHECK(it != g_poll_node_map.end());
fdevent* fde = it->second.fde;
CHECK_EQ(fde->fd.get(), pollfd.fd);
fde->events |= events;
fde->last_active = post_poll;
D("%s got events %x", dump_fde(fde).c_str(), events);
fde->state |= FDE_PENDING;
g_pending_list.push_back(fde);

View file

@ -18,17 +18,20 @@
#define __FDEVENT_H
#include <stddef.h>
#include <stdint.h> /* for int64_t */
#include <stdint.h>
#include <chrono>
#include <functional>
#include <optional>
#include <variant>
#include "adb_unique_fd.h"
/* events that may be observed */
#define FDE_READ 0x0001
#define FDE_WRITE 0x0002
#define FDE_ERROR 0x0004
// Events that may be observed
#define FDE_READ 0x0001
#define FDE_WRITE 0x0002
#define FDE_ERROR 0x0004
#define FDE_TIMEOUT 0x0008
typedef void (*fd_func)(int fd, unsigned events, void *userdata);
typedef void (*fd_func2)(struct fdevent* fde, unsigned events, void* userdata);
@ -41,6 +44,8 @@ struct fdevent {
uint16_t state = 0;
uint16_t events = 0;
std::optional<std::chrono::milliseconds> timeout;
std::chrono::steady_clock::time_point last_active;
std::variant<fd_func, fd_func2> func;
void* arg = nullptr;
@ -62,7 +67,11 @@ void fdevent_set(fdevent *fde, unsigned events);
void fdevent_add(fdevent *fde, unsigned events);
void fdevent_del(fdevent *fde, unsigned events);
void fdevent_set_timeout(fdevent *fde, int64_t timeout_ms);
// Set a timeout on an fdevent.
// If no events are triggered by the timeout, an FDE_TIMEOUT will be generated.
// Note timeouts are not defused automatically; if a timeout is set on an fdevent, it will
// trigger repeatedly every |timeout| ms.
void fdevent_set_timeout(fdevent* fde, std::optional<std::chrono::milliseconds> timeout);
// Loop forever, handling events.
void fdevent_loop();

View file

@ -18,6 +18,7 @@
#include <gtest/gtest.h>
#include <chrono>
#include <limits>
#include <memory>
#include <queue>
@ -28,6 +29,8 @@
#include "adb_io.h"
#include "fdevent_test.h"
using namespace std::chrono_literals;
class FdHandler {
public:
FdHandler(int read_fd, int write_fd, bool use_new_callback)
@ -257,3 +260,100 @@ TEST_F(FdeventTest, run_on_main_thread_reentrant) {
ASSERT_EQ(i, vec[i]);
}
}
TEST_F(FdeventTest, timeout) {
fdevent_reset();
PrepareThread();
enum class TimeoutEvent {
read,
timeout,
done,
};
struct TimeoutTest {
std::vector<std::pair<TimeoutEvent, std::chrono::steady_clock::time_point>> events;
fdevent* fde;
};
TimeoutTest test;
int fds[2];
ASSERT_EQ(0, adb_socketpair(fds));
static constexpr auto delta = 100ms;
fdevent_run_on_main_thread([&]() {
test.fde = fdevent_create(fds[0], [](fdevent* fde, unsigned events, void* arg) {
auto test = static_cast<TimeoutTest*>(arg);
auto now = std::chrono::steady_clock::now();
CHECK((events & FDE_READ) ^ (events & FDE_TIMEOUT));
TimeoutEvent event;
if ((events & FDE_READ)) {
char buf[2];
ssize_t rc = adb_read(fde->fd.get(), buf, sizeof(buf));
if (rc == 0) {
event = TimeoutEvent::done;
} else if (rc == 1) {
event = TimeoutEvent::read;
} else {
abort();
}
} else if ((events & FDE_TIMEOUT)) {
event = TimeoutEvent::timeout;
} else {
abort();
}
CHECK_EQ(fde, test->fde);
test->events.emplace_back(event, now);
if (event == TimeoutEvent::done) {
fdevent_destroy(fde);
}
}, &test);
fdevent_add(test.fde, FDE_READ);
fdevent_set_timeout(test.fde, delta);
});
ASSERT_EQ(1, adb_write(fds[1], "", 1));
// Timeout should happen here
std::this_thread::sleep_for(delta);
// and another.
std::this_thread::sleep_for(delta);
// No timeout should happen here.
std::this_thread::sleep_for(delta / 2);
adb_close(fds[1]);
TerminateThread();
ASSERT_EQ(4ULL, test.events.size());
ASSERT_EQ(TimeoutEvent::read, test.events[0].first);
ASSERT_EQ(TimeoutEvent::timeout, test.events[1].first);
ASSERT_EQ(TimeoutEvent::timeout, test.events[2].first);
ASSERT_EQ(TimeoutEvent::done, test.events[3].first);
std::vector<int> time_deltas;
for (size_t i = 0; i < test.events.size() - 1; ++i) {
auto before = test.events[i].second;
auto after = test.events[i + 1].second;
auto diff = std::chrono::duration_cast<std::chrono::milliseconds>(after - before);
time_deltas.push_back(diff.count());
}
std::vector<int> expected = {
delta.count(),
delta.count(),
delta.count() / 2,
};
std::vector<int> diff;
ASSERT_EQ(time_deltas.size(), expected.size());
for (size_t i = 0; i < time_deltas.size(); ++i) {
diff.push_back(std::abs(time_deltas[i] - expected[i]));
}
ASSERT_LT(diff[0], delta.count() * 0.5);
ASSERT_LT(diff[1], delta.count() * 0.5);
ASSERT_LT(diff[2], delta.count() * 0.5);
}

View file

@ -18,29 +18,4 @@
#include <chrono>
#if defined(_WIN32)
// We don't have C++14 on Windows yet.
// Reimplement std::chrono_literals ourselves until we do.
// Silence the following warning (which gets promoted to an error):
// error: literal operator suffixes not preceded by _ are reserved for future standardization
#pragma GCC system_header
constexpr std::chrono::seconds operator"" s(unsigned long long s) {
return std::chrono::seconds(s);
}
constexpr std::chrono::duration<long double> operator"" s(long double s) {
return std::chrono::duration<long double>(s);
}
constexpr std::chrono::milliseconds operator"" ms(unsigned long long ms) {
return std::chrono::milliseconds(ms);
}
constexpr std::chrono::duration<long double, std::milli> operator"" ms(long double ms) {
return std::chrono::duration<long double, std::milli>(ms);
}
#else
using namespace std::chrono_literals;
#endif