From ca76859aca4b7c69139e05014c5211781cfa3d56 Mon Sep 17 00:00:00 2001 From: Bernie Innocenti Date: Wed, 23 May 2018 19:02:52 +0900 Subject: [PATCH] SocketListener: use poll() instead of select() FD_SET is limited to 1024 file descriptors in Linux, which causes processes with too many open files or connections to crash: FORTIFY: FD_ISSET: file descriptor 1024 >= FD_SETSIZE 128 The fix we used elsewhere is replacing select() with poll(), but in the case of SocketListener we additionally need to replace the SocketClient list with a map indexed by fd in order to avoid quadratic behavior on each poll() wakeup. Bug: 79838856 Test: device boots and appears to work normally, tests pass Change-Id: I4a8f1804fa990d3db3a2c96b9acd60b2c7135950 --- libsysutils/include/sysutils/SocketClient.h | 5 +- libsysutils/include/sysutils/SocketListener.h | 13 +- libsysutils/src/SocketListener.cpp | 165 +++++++----------- libsysutils/src/SocketListener_test.cpp | 10 ++ 4 files changed, 80 insertions(+), 113 deletions(-) diff --git a/libsysutils/include/sysutils/SocketClient.h b/libsysutils/include/sysutils/SocketClient.h index 1004f0611..c657526ee 100644 --- a/libsysutils/include/sysutils/SocketClient.h +++ b/libsysutils/include/sysutils/SocketClient.h @@ -1,8 +1,6 @@ #ifndef _SOCKET_CLIENT_H #define _SOCKET_CLIENT_H -#include "List.h" - #include #include #include @@ -35,7 +33,7 @@ public: SocketClient(int sock, bool owned, bool useCmdNum); virtual ~SocketClient(); - int getSocket() { return mSocket; } + int getSocket() const { return mSocket; } pid_t getPid() const { return mPid; } uid_t getUid() const { return mUid; } gid_t getGid() const { return mGid; } @@ -84,5 +82,4 @@ private: int sendDataLockedv(struct iovec *iov, int iovcnt); }; -typedef android::sysutils::List SocketClientCollection; #endif diff --git a/libsysutils/include/sysutils/SocketListener.h b/libsysutils/include/sysutils/SocketListener.h index bc93b8635..67a691a04 100644 --- a/libsysutils/include/sysutils/SocketListener.h +++ b/libsysutils/include/sysutils/SocketListener.h @@ -1,5 +1,5 @@ /* - * Copyright (C) 2008-2014 The Android Open Source Project + * Copyright (C) 2008 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,8 @@ #include +#include + #include #include "SocketClientCommand.h" @@ -25,7 +27,7 @@ class SocketListener { bool mListen; const char *mSocketName; int mSock; - SocketClientCollection *mClients; + std::unordered_map mClients; pthread_mutex_t mClientsLock; int mCtrlPipe[2]; pthread_t mThread; @@ -51,8 +53,13 @@ protected: virtual bool onDataAvailable(SocketClient *c) = 0; private: - bool release(SocketClient *c, bool wakeup); static void *threadStart(void *obj); + + // Add all clients to a separate list, so we don't have to hold the lock + // while processing it. + std::vector snapshotClients(); + + bool release(SocketClient *c, bool wakeup); void runListener(); void init(const char *socketName, int socketFd, bool listen, bool useCmdNum); }; diff --git a/libsysutils/src/SocketListener.cpp b/libsysutils/src/SocketListener.cpp index 6b3f79659..ded5adb6d 100644 --- a/libsysutils/src/SocketListener.cpp +++ b/libsysutils/src/SocketListener.cpp @@ -1,5 +1,5 @@ /* - * Copyright (C) 2008-2014 The Android Open Source Project + * Copyright (C) 2008 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,13 +19,15 @@ #include #include #include -#include +#include #include #include #include #include #include +#include + #include #include #include @@ -62,12 +64,9 @@ SocketListener::~SocketListener() { close(mCtrlPipe[0]); close(mCtrlPipe[1]); } - SocketClientCollection::iterator it; - for (it = mClients->begin(); it != mClients->end();) { - (*it)->decRef(); - it = mClients->erase(it); + for (auto pair : mClients) { + pair.second->decRef(); } - delete mClients; } int SocketListener::startListener() { @@ -94,7 +93,7 @@ int SocketListener::startListener(int backlog) { SLOGE("Unable to listen on socket (%s)", strerror(errno)); return -1; } else if (!mListen) - mClients->push_back(new SocketClient(mSock, false, mUseCmdNum)); + mClients[mSock] = new SocketClient(mSock, false, mUseCmdNum); if (pipe(mCtrlPipe)) { SLOGE("pipe failed (%s)", strerror(errno)); @@ -134,11 +133,10 @@ int SocketListener::stopListener() { mSock = -1; } - SocketClientCollection::iterator it; - for (it = mClients->begin(); it != mClients->end();) { - delete (*it); - it = mClients->erase(it); + for (auto pair : mClients) { + delete pair.second; } + mClients.clear(); return 0; } @@ -151,47 +149,30 @@ void *SocketListener::threadStart(void *obj) { } void SocketListener::runListener() { - - SocketClientCollection pendingList; - - while(1) { - SocketClientCollection::iterator it; - fd_set read_fds; - int rc = 0; - int max = -1; - - FD_ZERO(&read_fds); - - if (mListen) { - max = mSock; - FD_SET(mSock, &read_fds); - } - - FD_SET(mCtrlPipe[0], &read_fds); - if (mCtrlPipe[0] > max) - max = mCtrlPipe[0]; + while (true) { + std::vector fds; pthread_mutex_lock(&mClientsLock); - for (it = mClients->begin(); it != mClients->end(); ++it) { + fds.reserve(2 + mClients.size()); + fds.push_back({.fd = mCtrlPipe[0], .events = POLLIN}); + if (mListen) fds.push_back({.fd = mSock, .events = POLLIN}); + for (auto pair : mClients) { // NB: calling out to an other object with mClientsLock held (safe) - int fd = (*it)->getSocket(); - FD_SET(fd, &read_fds); - if (fd > max) { - max = fd; - } + const int fd = pair.second->getSocket(); + if (fd != pair.first) SLOGE("fd mismatch: %d != %d", fd, pair.first); + fds.push_back({.fd = fd, .events = POLLIN}); } pthread_mutex_unlock(&mClientsLock); - SLOGV("mListen=%d, max=%d, mSocketName=%s", mListen, max, mSocketName); - if ((rc = select(max + 1, &read_fds, nullptr, nullptr, nullptr)) < 0) { - if (errno == EINTR) - continue; - SLOGE("select failed (%s) mListen=%d, max=%d", strerror(errno), mListen, max); + + SLOGV("mListen=%d, mSocketName=%s", mListen, mSocketName); + int rc = TEMP_FAILURE_RETRY(poll(fds.data(), fds.size(), -1)); + if (rc < 0) { + SLOGE("poll failed (%s) mListen=%d", strerror(errno), mListen); sleep(1); continue; - } else if (!rc) - continue; + } - if (FD_ISSET(mCtrlPipe[0], &read_fds)) { + if (fds[0].revents & (POLLIN | POLLERR)) { char c = CtrlPipe_Shutdown; TEMP_FAILURE_RETRY(read(mCtrlPipe[0], &c, 1)); if (c == CtrlPipe_Shutdown) { @@ -199,7 +180,7 @@ void SocketListener::runListener() { } continue; } - if (mListen && FD_ISSET(mSock, &read_fds)) { + if (mListen && (fds[1].revents & (POLLIN | POLLERR))) { int c = TEMP_FAILURE_RETRY(accept4(mSock, nullptr, nullptr, SOCK_CLOEXEC)); if (c < 0) { SLOGE("accept failed (%s)", strerror(errno)); @@ -207,32 +188,33 @@ void SocketListener::runListener() { continue; } pthread_mutex_lock(&mClientsLock); - mClients->push_back(new SocketClient(c, true, mUseCmdNum)); + mClients[c] = new SocketClient(c, true, mUseCmdNum); pthread_mutex_unlock(&mClientsLock); } - /* Add all active clients to the pending list first */ - pendingList.clear(); + // Add all active clients to the pending list first, so we can release + // the lock before invoking the callbacks. + std::vector pending; pthread_mutex_lock(&mClientsLock); - for (it = mClients->begin(); it != mClients->end(); ++it) { - SocketClient* c = *it; - // NB: calling out to an other object with mClientsLock held (safe) - int fd = c->getSocket(); - if (FD_ISSET(fd, &read_fds)) { - pendingList.push_back(c); + const int size = fds.size(); + for (int i = mListen ? 2 : 1; i < size; ++i) { + const struct pollfd& p = fds[i]; + if (p.revents & (POLLIN | POLLERR)) { + auto it = mClients.find(p.fd); + if (it == mClients.end()) { + SLOGE("fd vanished: %d", p.fd); + continue; + } + SocketClient* c = it->second; + pending.push_back(c); c->incRef(); } } pthread_mutex_unlock(&mClientsLock); - /* Process the pending list, since it is owned by the thread, - * there is no need to lock it */ - while (!pendingList.empty()) { - /* Pop the first item from the list */ - it = pendingList.begin(); - SocketClient* c = *it; - pendingList.erase(it); - /* Process it, if false is returned, remove from list */ + for (SocketClient* c : pending) { + // Process it, if false is returned, remove from the map + SLOGV("processing fd %d", c->getSocket()); if (!onDataAvailable(c)) { release(c, false); } @@ -245,17 +227,10 @@ bool SocketListener::release(SocketClient* c, bool wakeup) { bool ret = false; /* if our sockets are connection-based, remove and destroy it */ if (mListen && c) { - /* Remove the client from our array */ + /* Remove the client from our map */ SLOGV("going to zap %d for %s", c->getSocket(), mSocketName); pthread_mutex_lock(&mClientsLock); - SocketClientCollection::iterator it; - for (it = mClients->begin(); it != mClients->end(); ++it) { - if (*it == c) { - mClients->erase(it); - ret = true; - break; - } - } + ret = (mClients.erase(c->getSocket()) != 0); pthread_mutex_unlock(&mClientsLock); if (ret) { ret = c->decRef(); @@ -268,26 +243,22 @@ bool SocketListener::release(SocketClient* c, bool wakeup) { return ret; } -void SocketListener::sendBroadcast(int code, const char *msg, bool addErrno) { - SocketClientCollection safeList; - - /* Add all active clients to the safe list first */ - safeList.clear(); +std::vector SocketListener::snapshotClients() { + std::vector clients; pthread_mutex_lock(&mClientsLock); - SocketClientCollection::iterator i; - - for (i = mClients->begin(); i != mClients->end(); ++i) { - SocketClient* c = *i; + clients.reserve(mClients.size()); + for (auto pair : mClients) { + SocketClient* c = pair.second; c->incRef(); - safeList.push_back(c); + clients.push_back(c); } pthread_mutex_unlock(&mClientsLock); - while (!safeList.empty()) { - /* Pop the first item from the list */ - i = safeList.begin(); - SocketClient* c = *i; - safeList.erase(i); + return clients; +} + +void SocketListener::sendBroadcast(int code, const char *msg, bool addErrno) { + for (SocketClient* c : snapshotClients()) { // broadcasts are unsolicited and should not include a cmd number if (c->sendMsg(code, msg, addErrno, false)) { SLOGW("Error sending broadcast (%s)", strerror(errno)); @@ -297,25 +268,7 @@ void SocketListener::sendBroadcast(int code, const char *msg, bool addErrno) { } void SocketListener::runOnEachSocket(SocketClientCommand *command) { - SocketClientCollection safeList; - - /* Add all active clients to the safe list first */ - safeList.clear(); - pthread_mutex_lock(&mClientsLock); - SocketClientCollection::iterator i; - - for (i = mClients->begin(); i != mClients->end(); ++i) { - SocketClient* c = *i; - c->incRef(); - safeList.push_back(c); - } - pthread_mutex_unlock(&mClientsLock); - - while (!safeList.empty()) { - /* Pop the first item from the list */ - i = safeList.begin(); - SocketClient* c = *i; - safeList.erase(i); + for (SocketClient* c : snapshotClients()) { command->runSocketCommand(c); c->decRef(); } diff --git a/libsysutils/src/SocketListener_test.cpp b/libsysutils/src/SocketListener_test.cpp index aa59e293a..fed454686 100644 --- a/libsysutils/src/SocketListener_test.cpp +++ b/libsysutils/src/SocketListener_test.cpp @@ -170,3 +170,13 @@ TEST_F(FrameworkListenerTest, RejectsInvalidCommands) { testCommand("test \"arg1 arg2", "500 Unclosed quotes error"); testCommand("test \\a", "500 Unsupported escape sequence"); } + +TEST_F(FrameworkListenerTest, MultipleClients) { + unique_fd client1 = clientSocket(mSocketPath); + unique_fd client2 = clientSocket(mSocketPath); + sendCmd(client1.get(), "test 1"); + sendCmd(client2.get(), "test 2"); + + EXPECT_EQ(std::string("42 test,2") + '\0', recvReply(client2.get())); + EXPECT_EQ(std::string("42 test,1") + '\0', recvReply(client1.get())); +}