diff --git a/libutils/Looper.cpp b/libutils/Looper.cpp index 14e3e35c7..292425a44 100644 --- a/libutils/Looper.cpp +++ b/libutils/Looper.cpp @@ -20,6 +20,16 @@ namespace android { +namespace { + +constexpr uint64_t WAKE_EVENT_FD_SEQ = 1; + +epoll_event createEpollEvent(uint32_t events, uint64_t seq) { + return {.events = events, .data = {.u64 = seq}}; +} + +} // namespace + // --- WeakMessageHandler --- WeakMessageHandler::WeakMessageHandler(const wp& handler) : @@ -64,7 +74,7 @@ Looper::Looper(bool allowNonCallbacks) mSendingMessage(false), mPolling(false), mEpollRebuildRequired(false), - mNextRequestSeq(0), + mNextRequestSeq(WAKE_EVENT_FD_SEQ + 1), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) { mWakeEventFd.reset(eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC)); @@ -137,22 +147,17 @@ void Looper::rebuildEpollLocked() { mEpollFd.reset(); } - // Allocate the new epoll instance and register the wake pipe. + // Allocate the new epoll instance and register the WakeEventFd. mEpollFd.reset(epoll_create1(EPOLL_CLOEXEC)); LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance: %s", strerror(errno)); - struct epoll_event eventItem; - memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union - eventItem.events = EPOLLIN; - eventItem.data.fd = mWakeEventFd.get(); - int result = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, mWakeEventFd.get(), &eventItem); + epoll_event wakeEvent = createEpollEvent(EPOLLIN, WAKE_EVENT_FD_SEQ); + int result = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, mWakeEventFd.get(), &wakeEvent); LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance: %s", strerror(errno)); - for (size_t i = 0; i < mRequests.size(); i++) { - const Request& request = mRequests.valueAt(i); - struct epoll_event eventItem; - request.initEventItem(&eventItem); + for (const auto& [seq, request] : mRequests) { + epoll_event eventItem = createEpollEvent(request.getEpollEvents(), seq); int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, request.fd, &eventItem); if (epollResult < 0) { @@ -276,26 +281,28 @@ int Looper::pollInner(int timeoutMillis) { #endif for (int i = 0; i < eventCount; i++) { - int fd = eventItems[i].data.fd; + const SequenceNumber seq = eventItems[i].data.u64; uint32_t epollEvents = eventItems[i].events; - if (fd == mWakeEventFd.get()) { + if (seq == WAKE_EVENT_FD_SEQ) { if (epollEvents & EPOLLIN) { awoken(); } else { ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents); } } else { - ssize_t requestIndex = mRequests.indexOfKey(fd); - if (requestIndex >= 0) { + const auto& request_it = mRequests.find(seq); + if (request_it != mRequests.end()) { + const auto& request = request_it->second; int events = 0; if (epollEvents & EPOLLIN) events |= EVENT_INPUT; if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT; if (epollEvents & EPOLLERR) events |= EVENT_ERROR; if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP; - pushResponse(events, mRequests.valueAt(requestIndex)); + mResponses.push({.seq = seq, .events = events, .request = request}); } else { - ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is " - "no longer registered.", epollEvents, fd); + ALOGW("Ignoring unexpected epoll events 0x%x for sequence number %" PRIu64 + " that is no longer registered.", + epollEvents, seq); } } } @@ -354,7 +361,8 @@ Done: ; // we need to be a little careful when removing the file descriptor afterwards. int callbackResult = response.request.callback->handleEvent(fd, events, data); if (callbackResult == 0) { - removeFd(fd, response.request.seq); + AutoMutex _l(mLock); + removeSequenceNumberLocked(response.seq); } // Clear the callback reference in the response structure promptly because we @@ -416,13 +424,6 @@ void Looper::awoken() { TEMP_FAILURE_RETRY(read(mWakeEventFd.get(), &counter, sizeof(uint64_t))); } -void Looper::pushResponse(int events, const Request& request) { - Response response; - response.events = events; - response.request = request; - mResponses.push(response); -} - int Looper::addFd(int fd, int ident, int events, Looper_callbackFunc callback, void* data) { return addFd(fd, ident, events, callback ? new SimpleLooperCallback(callback) : nullptr, data); } @@ -449,27 +450,27 @@ int Looper::addFd(int fd, int ident, int events, const sp& callb { // acquire lock AutoMutex _l(mLock); + // There is a sequence number reserved for the WakeEventFd. + if (mNextRequestSeq == WAKE_EVENT_FD_SEQ) mNextRequestSeq++; + const SequenceNumber seq = mNextRequestSeq++; Request request; request.fd = fd; request.ident = ident; request.events = events; - request.seq = mNextRequestSeq++; request.callback = callback; request.data = data; - if (mNextRequestSeq == -1) mNextRequestSeq = 0; // reserve sequence number -1 - struct epoll_event eventItem; - request.initEventItem(&eventItem); - - ssize_t requestIndex = mRequests.indexOfKey(fd); - if (requestIndex < 0) { + epoll_event eventItem = createEpollEvent(request.getEpollEvents(), seq); + auto seq_it = mSequenceNumberByFd.find(fd); + if (seq_it == mSequenceNumberByFd.end()) { int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, fd, &eventItem); if (epollResult < 0) { ALOGE("Error adding epoll events for fd %d: %s", fd, strerror(errno)); return -1; } - mRequests.add(fd, request); + mRequests.emplace(seq, request); + mSequenceNumberByFd.emplace(fd, seq); } else { int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_MOD, fd, &eventItem); if (epollResult < 0) { @@ -486,7 +487,7 @@ int Looper::addFd(int fd, int ident, int events, const sp& callb // set from scratch because it may contain an old file handle that we are // now unable to remove since its file descriptor is no longer valid. // No such problem would have occurred if we were using the poll system - // call instead, but that approach carries others disadvantages. + // call instead, but that approach carries other disadvantages. #if DEBUG_CALLBACKS ALOGD("%p ~ addFd - EPOLL_CTL_MOD failed due to file descriptor " "being recycled, falling back on EPOLL_CTL_ADD: %s", @@ -504,71 +505,69 @@ int Looper::addFd(int fd, int ident, int events, const sp& callb return -1; } } - mRequests.replaceValueAt(requestIndex, request); + const SequenceNumber oldSeq = seq_it->second; + mRequests.erase(oldSeq); + mRequests.emplace(seq, request); + seq_it->second = seq; } } // release lock return 1; } int Looper::removeFd(int fd) { - return removeFd(fd, -1); + AutoMutex _l(mLock); + const auto& it = mSequenceNumberByFd.find(fd); + if (it == mSequenceNumberByFd.end()) { + return 0; + } + return removeSequenceNumberLocked(it->second); } -int Looper::removeFd(int fd, int seq) { +int Looper::removeSequenceNumberLocked(SequenceNumber seq) { #if DEBUG_CALLBACKS - ALOGD("%p ~ removeFd - fd=%d, seq=%d", this, fd, seq); + ALOGD("%p ~ removeFd - fd=%d, seq=%u", this, fd, seq); #endif - { // acquire lock - AutoMutex _l(mLock); - ssize_t requestIndex = mRequests.indexOfKey(fd); - if (requestIndex < 0) { - return 0; - } + const auto& request_it = mRequests.find(seq); + if (request_it == mRequests.end()) { + return 0; + } + const int fd = request_it->second.fd; - // Check the sequence number if one was given. - if (seq != -1 && mRequests.valueAt(requestIndex).seq != seq) { + // Always remove the FD from the request map even if an error occurs while + // updating the epoll set so that we avoid accidentally leaking callbacks. + mRequests.erase(request_it); + mSequenceNumberByFd.erase(fd); + + int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_DEL, fd, nullptr); + if (epollResult < 0) { + if (errno == EBADF || errno == ENOENT) { + // Tolerate EBADF or ENOENT because it means that the file descriptor was closed + // before its callback was unregistered. This error may occur naturally when a + // callback has the side-effect of closing the file descriptor before returning and + // unregistering itself. + // + // Unfortunately due to kernel limitations we need to rebuild the epoll + // set from scratch because it may contain an old file handle that we are + // now unable to remove since its file descriptor is no longer valid. + // No such problem would have occurred if we were using the poll system + // call instead, but that approach carries other disadvantages. #if DEBUG_CALLBACKS - ALOGD("%p ~ removeFd - sequence number mismatch, oldSeq=%d", - this, mRequests.valueAt(requestIndex).seq); + ALOGD("%p ~ removeFd - EPOLL_CTL_DEL failed due to file descriptor " + "being closed: %s", + this, strerror(errno)); #endif - return 0; + scheduleEpollRebuildLocked(); + } else { + // Some other error occurred. This is really weird because it means + // our list of callbacks got out of sync with the epoll set somehow. + // We defensively rebuild the epoll set to avoid getting spurious + // notifications with nowhere to go. + ALOGE("Error removing epoll events for fd %d: %s", fd, strerror(errno)); + scheduleEpollRebuildLocked(); + return -1; } - - // Always remove the FD from the request map even if an error occurs while - // updating the epoll set so that we avoid accidentally leaking callbacks. - mRequests.removeItemsAt(requestIndex); - - int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_DEL, fd, nullptr); - if (epollResult < 0) { - if (seq != -1 && (errno == EBADF || errno == ENOENT)) { - // Tolerate EBADF or ENOENT when the sequence number is known because it - // means that the file descriptor was closed before its callback was - // unregistered. This error may occur naturally when a callback has the - // side-effect of closing the file descriptor before returning and - // unregistering itself. - // - // Unfortunately due to kernel limitations we need to rebuild the epoll - // set from scratch because it may contain an old file handle that we are - // now unable to remove since its file descriptor is no longer valid. - // No such problem would have occurred if we were using the poll system - // call instead, but that approach carries others disadvantages. -#if DEBUG_CALLBACKS - ALOGD("%p ~ removeFd - EPOLL_CTL_DEL failed due to file descriptor " - "being closed: %s", this, strerror(errno)); -#endif - scheduleEpollRebuildLocked(); - } else { - // Some other error occurred. This is really weird because it means - // our list of callbacks got out of sync with the epoll set somehow. - // We defensively rebuild the epoll set to avoid getting spurious - // notifications with nowhere to go. - ALOGE("Error removing epoll events for fd %d: %s", fd, strerror(errno)); - scheduleEpollRebuildLocked(); - return -1; - } - } - } // release lock + } return 1; } @@ -656,14 +655,11 @@ bool Looper::isPolling() const { return mPolling; } -void Looper::Request::initEventItem(struct epoll_event* eventItem) const { - int epollEvents = 0; +uint32_t Looper::Request::getEpollEvents() const { + uint32_t epollEvents = 0; if (events & EVENT_INPUT) epollEvents |= EPOLLIN; if (events & EVENT_OUTPUT) epollEvents |= EPOLLOUT; - - memset(eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union - eventItem->events = epollEvents; - eventItem->data.fd = fd; + return epollEvents; } MessageHandler::~MessageHandler() { } diff --git a/libutils/Looper_test.cpp b/libutils/Looper_test.cpp index 34f424b83..c859f9c61 100644 --- a/libutils/Looper_test.cpp +++ b/libutils/Looper_test.cpp @@ -8,6 +8,9 @@ #include #include #include +#include +#include +#include #include "Looper_test_pipe.h" #include @@ -710,4 +713,123 @@ TEST_F(LooperTest, RemoveMessage_WhenRemovingSomeMessagesForHandler_ShouldRemove << "no more messages to handle"; } +class LooperEventCallback : public LooperCallback { + public: + using Callback = std::function; + explicit LooperEventCallback(Callback callback) : mCallback(std::move(callback)) {} + int handleEvent(int fd, int events, void* /*data*/) override { return mCallback(fd, events); } + + private: + Callback mCallback; +}; + +// A utility class that allows for pipes to be added and removed from the looper, and polls the +// looper from a different thread. +class ThreadedLooperUtil { + public: + explicit ThreadedLooperUtil(const sp& looper) : mLooper(looper), mRunning(true) { + mThread = std::thread([this]() { + while (mRunning) { + static constexpr std::chrono::milliseconds POLL_TIMEOUT(500); + mLooper->pollOnce(POLL_TIMEOUT.count()); + } + }); + } + + ~ThreadedLooperUtil() { + mRunning = false; + mThread.join(); + } + + // Create a new pipe, and return the write end of the pipe and the id used to track the pipe. + // The read end of the pipe is added to the looper. + std::pair createPipe() { + int pipeFd[2]; + if (pipe(pipeFd)) { + ADD_FAILURE() << "pipe() failed."; + return {}; + } + const int readFd = pipeFd[0]; + const int writeFd = pipeFd[1]; + + int id; + { // acquire lock + std::scoped_lock l(mLock); + + id = mNextId++; + mFds.emplace(id, readFd); + + auto removeCallback = [this, id, readFd](int fd, int events) { + EXPECT_EQ(readFd, fd) << "Received callback for incorrect fd."; + if ((events & Looper::EVENT_HANGUP) == 0) { + return 1; // Not a hangup, keep the callback. + } + removePipe(id); + return 0; // Remove the callback. + }; + + mLooper->addFd(readFd, 0, Looper::EVENT_INPUT, + new LooperEventCallback(std::move(removeCallback)), nullptr); + } // release lock + + return {id, base::unique_fd(writeFd)}; + } + + // Remove the pipe with the given id. + void removePipe(int id) { + std::scoped_lock l(mLock); + if (mFds.find(id) == mFds.end()) { + return; + } + mLooper->removeFd(mFds[id].get()); + mFds.erase(id); + } + + // Check if the pipe with the given id exists and has not been removed. + bool hasPipe(int id) { + std::scoped_lock l(mLock); + return mFds.find(id) != mFds.end(); + } + + private: + sp mLooper; + std::atomic mRunning; + std::thread mThread; + + std::mutex mLock; + std::unordered_map mFds GUARDED_BY(mLock); + int mNextId GUARDED_BY(mLock) = 0; +}; + +TEST_F(LooperTest, MultiThreaded_NoUnexpectedFdRemoval) { + ThreadedLooperUtil util(mLooper); + + // Iterate repeatedly to try to recreate a flaky instance. + for (int i = 0; i < 1000; i++) { + auto [firstPipeId, firstPipeFd] = util.createPipe(); + const int firstFdNumber = firstPipeFd.get(); + + // Close the first pipe's fd, causing a fd hangup. + firstPipeFd.reset(); + + // Request to remove the pipe from this test thread. This causes a race for pipe removal + // between the hangup in the looper's thread and this remove request from the test thread. + util.removePipe(firstPipeId); + + // Create the second pipe. Since the fds for the first pipe are closed, this pipe should + // have the same fd numbers as the first pipe because the lowest unused fd number is used. + const auto [secondPipeId, fd] = util.createPipe(); + EXPECT_EQ(firstFdNumber, fd.get()) + << "The first and second fds must match for the purposes of this test."; + + // Wait for unexpected hangup to occur. + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + + ASSERT_TRUE(util.hasPipe(secondPipeId)) << "The second pipe was removed unexpectedly."; + + util.removePipe(secondPipeId); + } + SUCCEED() << "No unexpectedly removed fds."; +} + } // namespace android diff --git a/libutils/include/utils/Looper.h b/libutils/include/utils/Looper.h index 466fbb726..b387d68c6 100644 --- a/libutils/include/utils/Looper.h +++ b/libutils/include/utils/Looper.h @@ -17,15 +17,16 @@ #ifndef UTILS_LOOPER_H #define UTILS_LOOPER_H -#include #include -#include #include +#include +#include #include #include +#include #include namespace android { @@ -421,18 +422,20 @@ public: static sp getForThread(); private: - struct Request { - int fd; - int ident; - int events; - int seq; - sp callback; - void* data; + using SequenceNumber = uint64_t; - void initEventItem(struct epoll_event* eventItem) const; - }; + struct Request { + int fd; + int ident; + int events; + sp callback; + void* data; + + uint32_t getEpollEvents() const; + }; struct Response { + SequenceNumber seq; int events; Request request; }; @@ -463,9 +466,14 @@ private: android::base::unique_fd mEpollFd; // guarded by mLock but only modified on the looper thread bool mEpollRebuildRequired; // guarded by mLock - // Locked list of file descriptor monitoring requests. - KeyedVector mRequests; // guarded by mLock - int mNextRequestSeq; + // Locked maps of fds and sequence numbers monitoring requests. + // Both maps must be kept in sync at all times. + std::unordered_map mRequests; // guarded by mLock + std::unordered_map mSequenceNumberByFd; // guarded by mLock + + // The sequence number to use for the next fd that is added to the looper. + // The sequence number 0 is reserved for the WakeEventFd. + SequenceNumber mNextRequestSeq; // guarded by mLock // This state is only used privately by pollOnce and does not require a lock since // it runs on a single thread. @@ -474,9 +482,8 @@ private: nsecs_t mNextMessageUptime; // set to LLONG_MAX when none int pollInner(int timeoutMillis); - int removeFd(int fd, int seq); + int removeSequenceNumberLocked(SequenceNumber seq); // requires mLock void awoken(); - void pushResponse(int events, const Request& request); void rebuildEpollLocked(); void scheduleEpollRebuildLocked();