Merge "Looper: Use sequence numbers in epoll_event to track requests"

This commit is contained in:
Treehugger Robot 2021-09-13 11:58:45 +00:00 committed by Gerrit Code Review
commit 663c692a2d
3 changed files with 232 additions and 107 deletions

View file

@ -20,6 +20,16 @@
namespace android {
namespace {
constexpr uint64_t WAKE_EVENT_FD_SEQ = 1;
epoll_event createEpollEvent(uint32_t events, uint64_t seq) {
return {.events = events, .data = {.u64 = seq}};
}
} // namespace
// --- WeakMessageHandler ---
WeakMessageHandler::WeakMessageHandler(const wp<MessageHandler>& handler) :
@ -64,7 +74,7 @@ Looper::Looper(bool allowNonCallbacks)
mSendingMessage(false),
mPolling(false),
mEpollRebuildRequired(false),
mNextRequestSeq(0),
mNextRequestSeq(WAKE_EVENT_FD_SEQ + 1),
mResponseIndex(0),
mNextMessageUptime(LLONG_MAX) {
mWakeEventFd.reset(eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC));
@ -137,22 +147,17 @@ void Looper::rebuildEpollLocked() {
mEpollFd.reset();
}
// Allocate the new epoll instance and register the wake pipe.
// Allocate the new epoll instance and register the WakeEventFd.
mEpollFd.reset(epoll_create1(EPOLL_CLOEXEC));
LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance: %s", strerror(errno));
struct epoll_event eventItem;
memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
eventItem.events = EPOLLIN;
eventItem.data.fd = mWakeEventFd.get();
int result = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, mWakeEventFd.get(), &eventItem);
epoll_event wakeEvent = createEpollEvent(EPOLLIN, WAKE_EVENT_FD_SEQ);
int result = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, mWakeEventFd.get(), &wakeEvent);
LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance: %s",
strerror(errno));
for (size_t i = 0; i < mRequests.size(); i++) {
const Request& request = mRequests.valueAt(i);
struct epoll_event eventItem;
request.initEventItem(&eventItem);
for (const auto& [seq, request] : mRequests) {
epoll_event eventItem = createEpollEvent(request.getEpollEvents(), seq);
int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, request.fd, &eventItem);
if (epollResult < 0) {
@ -276,26 +281,28 @@ int Looper::pollInner(int timeoutMillis) {
#endif
for (int i = 0; i < eventCount; i++) {
int fd = eventItems[i].data.fd;
const SequenceNumber seq = eventItems[i].data.u64;
uint32_t epollEvents = eventItems[i].events;
if (fd == mWakeEventFd.get()) {
if (seq == WAKE_EVENT_FD_SEQ) {
if (epollEvents & EPOLLIN) {
awoken();
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
}
} else {
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex >= 0) {
const auto& request_it = mRequests.find(seq);
if (request_it != mRequests.end()) {
const auto& request = request_it->second;
int events = 0;
if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
pushResponse(events, mRequests.valueAt(requestIndex));
mResponses.push({.seq = seq, .events = events, .request = request});
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
"no longer registered.", epollEvents, fd);
ALOGW("Ignoring unexpected epoll events 0x%x for sequence number %" PRIu64
" that is no longer registered.",
epollEvents, seq);
}
}
}
@ -354,7 +361,8 @@ Done: ;
// we need to be a little careful when removing the file descriptor afterwards.
int callbackResult = response.request.callback->handleEvent(fd, events, data);
if (callbackResult == 0) {
removeFd(fd, response.request.seq);
AutoMutex _l(mLock);
removeSequenceNumberLocked(response.seq);
}
// Clear the callback reference in the response structure promptly because we
@ -416,13 +424,6 @@ void Looper::awoken() {
TEMP_FAILURE_RETRY(read(mWakeEventFd.get(), &counter, sizeof(uint64_t)));
}
void Looper::pushResponse(int events, const Request& request) {
Response response;
response.events = events;
response.request = request;
mResponses.push(response);
}
int Looper::addFd(int fd, int ident, int events, Looper_callbackFunc callback, void* data) {
return addFd(fd, ident, events, callback ? new SimpleLooperCallback(callback) : nullptr, data);
}
@ -449,27 +450,27 @@ int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callb
{ // acquire lock
AutoMutex _l(mLock);
// There is a sequence number reserved for the WakeEventFd.
if (mNextRequestSeq == WAKE_EVENT_FD_SEQ) mNextRequestSeq++;
const SequenceNumber seq = mNextRequestSeq++;
Request request;
request.fd = fd;
request.ident = ident;
request.events = events;
request.seq = mNextRequestSeq++;
request.callback = callback;
request.data = data;
if (mNextRequestSeq == -1) mNextRequestSeq = 0; // reserve sequence number -1
struct epoll_event eventItem;
request.initEventItem(&eventItem);
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex < 0) {
epoll_event eventItem = createEpollEvent(request.getEpollEvents(), seq);
auto seq_it = mSequenceNumberByFd.find(fd);
if (seq_it == mSequenceNumberByFd.end()) {
int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, fd, &eventItem);
if (epollResult < 0) {
ALOGE("Error adding epoll events for fd %d: %s", fd, strerror(errno));
return -1;
}
mRequests.add(fd, request);
mRequests.emplace(seq, request);
mSequenceNumberByFd.emplace(fd, seq);
} else {
int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_MOD, fd, &eventItem);
if (epollResult < 0) {
@ -486,7 +487,7 @@ int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callb
// set from scratch because it may contain an old file handle that we are
// now unable to remove since its file descriptor is no longer valid.
// No such problem would have occurred if we were using the poll system
// call instead, but that approach carries others disadvantages.
// call instead, but that approach carries other disadvantages.
#if DEBUG_CALLBACKS
ALOGD("%p ~ addFd - EPOLL_CTL_MOD failed due to file descriptor "
"being recycled, falling back on EPOLL_CTL_ADD: %s",
@ -504,71 +505,69 @@ int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callb
return -1;
}
}
mRequests.replaceValueAt(requestIndex, request);
const SequenceNumber oldSeq = seq_it->second;
mRequests.erase(oldSeq);
mRequests.emplace(seq, request);
seq_it->second = seq;
}
} // release lock
return 1;
}
int Looper::removeFd(int fd) {
return removeFd(fd, -1);
AutoMutex _l(mLock);
const auto& it = mSequenceNumberByFd.find(fd);
if (it == mSequenceNumberByFd.end()) {
return 0;
}
return removeSequenceNumberLocked(it->second);
}
int Looper::removeFd(int fd, int seq) {
int Looper::removeSequenceNumberLocked(SequenceNumber seq) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ removeFd - fd=%d, seq=%d", this, fd, seq);
ALOGD("%p ~ removeFd - fd=%d, seq=%u", this, fd, seq);
#endif
{ // acquire lock
AutoMutex _l(mLock);
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex < 0) {
return 0;
}
const auto& request_it = mRequests.find(seq);
if (request_it == mRequests.end()) {
return 0;
}
const int fd = request_it->second.fd;
// Check the sequence number if one was given.
if (seq != -1 && mRequests.valueAt(requestIndex).seq != seq) {
// Always remove the FD from the request map even if an error occurs while
// updating the epoll set so that we avoid accidentally leaking callbacks.
mRequests.erase(request_it);
mSequenceNumberByFd.erase(fd);
int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_DEL, fd, nullptr);
if (epollResult < 0) {
if (errno == EBADF || errno == ENOENT) {
// Tolerate EBADF or ENOENT because it means that the file descriptor was closed
// before its callback was unregistered. This error may occur naturally when a
// callback has the side-effect of closing the file descriptor before returning and
// unregistering itself.
//
// Unfortunately due to kernel limitations we need to rebuild the epoll
// set from scratch because it may contain an old file handle that we are
// now unable to remove since its file descriptor is no longer valid.
// No such problem would have occurred if we were using the poll system
// call instead, but that approach carries other disadvantages.
#if DEBUG_CALLBACKS
ALOGD("%p ~ removeFd - sequence number mismatch, oldSeq=%d",
this, mRequests.valueAt(requestIndex).seq);
ALOGD("%p ~ removeFd - EPOLL_CTL_DEL failed due to file descriptor "
"being closed: %s",
this, strerror(errno));
#endif
return 0;
scheduleEpollRebuildLocked();
} else {
// Some other error occurred. This is really weird because it means
// our list of callbacks got out of sync with the epoll set somehow.
// We defensively rebuild the epoll set to avoid getting spurious
// notifications with nowhere to go.
ALOGE("Error removing epoll events for fd %d: %s", fd, strerror(errno));
scheduleEpollRebuildLocked();
return -1;
}
// Always remove the FD from the request map even if an error occurs while
// updating the epoll set so that we avoid accidentally leaking callbacks.
mRequests.removeItemsAt(requestIndex);
int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_DEL, fd, nullptr);
if (epollResult < 0) {
if (seq != -1 && (errno == EBADF || errno == ENOENT)) {
// Tolerate EBADF or ENOENT when the sequence number is known because it
// means that the file descriptor was closed before its callback was
// unregistered. This error may occur naturally when a callback has the
// side-effect of closing the file descriptor before returning and
// unregistering itself.
//
// Unfortunately due to kernel limitations we need to rebuild the epoll
// set from scratch because it may contain an old file handle that we are
// now unable to remove since its file descriptor is no longer valid.
// No such problem would have occurred if we were using the poll system
// call instead, but that approach carries others disadvantages.
#if DEBUG_CALLBACKS
ALOGD("%p ~ removeFd - EPOLL_CTL_DEL failed due to file descriptor "
"being closed: %s", this, strerror(errno));
#endif
scheduleEpollRebuildLocked();
} else {
// Some other error occurred. This is really weird because it means
// our list of callbacks got out of sync with the epoll set somehow.
// We defensively rebuild the epoll set to avoid getting spurious
// notifications with nowhere to go.
ALOGE("Error removing epoll events for fd %d: %s", fd, strerror(errno));
scheduleEpollRebuildLocked();
return -1;
}
}
} // release lock
}
return 1;
}
@ -656,14 +655,11 @@ bool Looper::isPolling() const {
return mPolling;
}
void Looper::Request::initEventItem(struct epoll_event* eventItem) const {
int epollEvents = 0;
uint32_t Looper::Request::getEpollEvents() const {
uint32_t epollEvents = 0;
if (events & EVENT_INPUT) epollEvents |= EPOLLIN;
if (events & EVENT_OUTPUT) epollEvents |= EPOLLOUT;
memset(eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
eventItem->events = epollEvents;
eventItem->data.fd = fd;
return epollEvents;
}
MessageHandler::~MessageHandler() { }

View file

@ -8,6 +8,9 @@
#include <utils/Looper.h>
#include <utils/StopWatch.h>
#include <utils/Timers.h>
#include <thread>
#include <unordered_map>
#include <utility>
#include "Looper_test_pipe.h"
#include <utils/threads.h>
@ -710,4 +713,123 @@ TEST_F(LooperTest, RemoveMessage_WhenRemovingSomeMessagesForHandler_ShouldRemove
<< "no more messages to handle";
}
class LooperEventCallback : public LooperCallback {
public:
using Callback = std::function<int(int fd, int events)>;
explicit LooperEventCallback(Callback callback) : mCallback(std::move(callback)) {}
int handleEvent(int fd, int events, void* /*data*/) override { return mCallback(fd, events); }
private:
Callback mCallback;
};
// A utility class that allows for pipes to be added and removed from the looper, and polls the
// looper from a different thread.
class ThreadedLooperUtil {
public:
explicit ThreadedLooperUtil(const sp<Looper>& looper) : mLooper(looper), mRunning(true) {
mThread = std::thread([this]() {
while (mRunning) {
static constexpr std::chrono::milliseconds POLL_TIMEOUT(500);
mLooper->pollOnce(POLL_TIMEOUT.count());
}
});
}
~ThreadedLooperUtil() {
mRunning = false;
mThread.join();
}
// Create a new pipe, and return the write end of the pipe and the id used to track the pipe.
// The read end of the pipe is added to the looper.
std::pair<int /*id*/, base::unique_fd> createPipe() {
int pipeFd[2];
if (pipe(pipeFd)) {
ADD_FAILURE() << "pipe() failed.";
return {};
}
const int readFd = pipeFd[0];
const int writeFd = pipeFd[1];
int id;
{ // acquire lock
std::scoped_lock l(mLock);
id = mNextId++;
mFds.emplace(id, readFd);
auto removeCallback = [this, id, readFd](int fd, int events) {
EXPECT_EQ(readFd, fd) << "Received callback for incorrect fd.";
if ((events & Looper::EVENT_HANGUP) == 0) {
return 1; // Not a hangup, keep the callback.
}
removePipe(id);
return 0; // Remove the callback.
};
mLooper->addFd(readFd, 0, Looper::EVENT_INPUT,
new LooperEventCallback(std::move(removeCallback)), nullptr);
} // release lock
return {id, base::unique_fd(writeFd)};
}
// Remove the pipe with the given id.
void removePipe(int id) {
std::scoped_lock l(mLock);
if (mFds.find(id) == mFds.end()) {
return;
}
mLooper->removeFd(mFds[id].get());
mFds.erase(id);
}
// Check if the pipe with the given id exists and has not been removed.
bool hasPipe(int id) {
std::scoped_lock l(mLock);
return mFds.find(id) != mFds.end();
}
private:
sp<Looper> mLooper;
std::atomic<bool> mRunning;
std::thread mThread;
std::mutex mLock;
std::unordered_map<int, base::unique_fd> mFds GUARDED_BY(mLock);
int mNextId GUARDED_BY(mLock) = 0;
};
TEST_F(LooperTest, MultiThreaded_NoUnexpectedFdRemoval) {
ThreadedLooperUtil util(mLooper);
// Iterate repeatedly to try to recreate a flaky instance.
for (int i = 0; i < 1000; i++) {
auto [firstPipeId, firstPipeFd] = util.createPipe();
const int firstFdNumber = firstPipeFd.get();
// Close the first pipe's fd, causing a fd hangup.
firstPipeFd.reset();
// Request to remove the pipe from this test thread. This causes a race for pipe removal
// between the hangup in the looper's thread and this remove request from the test thread.
util.removePipe(firstPipeId);
// Create the second pipe. Since the fds for the first pipe are closed, this pipe should
// have the same fd numbers as the first pipe because the lowest unused fd number is used.
const auto [secondPipeId, fd] = util.createPipe();
EXPECT_EQ(firstFdNumber, fd.get())
<< "The first and second fds must match for the purposes of this test.";
// Wait for unexpected hangup to occur.
std::this_thread::sleep_for(std::chrono::milliseconds(1));
ASSERT_TRUE(util.hasPipe(secondPipeId)) << "The second pipe was removed unexpectedly.";
util.removePipe(secondPipeId);
}
SUCCEED() << "No unexpectedly removed fds.";
}
} // namespace android

View file

@ -17,15 +17,16 @@
#ifndef UTILS_LOOPER_H
#define UTILS_LOOPER_H
#include <utils/threads.h>
#include <utils/RefBase.h>
#include <utils/KeyedVector.h>
#include <utils/Timers.h>
#include <utils/Vector.h>
#include <utils/threads.h>
#include <sys/epoll.h>
#include <android-base/unique_fd.h>
#include <unordered_map>
#include <utility>
namespace android {
@ -421,18 +422,20 @@ public:
static sp<Looper> getForThread();
private:
struct Request {
int fd;
int ident;
int events;
int seq;
sp<LooperCallback> callback;
void* data;
using SequenceNumber = uint64_t;
void initEventItem(struct epoll_event* eventItem) const;
};
struct Request {
int fd;
int ident;
int events;
sp<LooperCallback> callback;
void* data;
uint32_t getEpollEvents() const;
};
struct Response {
SequenceNumber seq;
int events;
Request request;
};
@ -463,9 +466,14 @@ private:
android::base::unique_fd mEpollFd; // guarded by mLock but only modified on the looper thread
bool mEpollRebuildRequired; // guarded by mLock
// Locked list of file descriptor monitoring requests.
KeyedVector<int, Request> mRequests; // guarded by mLock
int mNextRequestSeq;
// Locked maps of fds and sequence numbers monitoring requests.
// Both maps must be kept in sync at all times.
std::unordered_map<SequenceNumber, Request> mRequests; // guarded by mLock
std::unordered_map<int /*fd*/, SequenceNumber> mSequenceNumberByFd; // guarded by mLock
// The sequence number to use for the next fd that is added to the looper.
// The sequence number 0 is reserved for the WakeEventFd.
SequenceNumber mNextRequestSeq; // guarded by mLock
// This state is only used privately by pollOnce and does not require a lock since
// it runs on a single thread.
@ -474,9 +482,8 @@ private:
nsecs_t mNextMessageUptime; // set to LLONG_MAX when none
int pollInner(int timeoutMillis);
int removeFd(int fd, int seq);
int removeSequenceNumberLocked(SequenceNumber seq); // requires mLock
void awoken();
void pushResponse(int events, const Request& request);
void rebuildEpollLocked();
void scheduleEpollRebuildLocked();