SocketListener: use poll() instead of select()
FD_SET is limited to 1024 file descriptors in Linux, which causes processes with too many open files or connections to crash: FORTIFY: FD_ISSET: file descriptor 1024 >= FD_SETSIZE 128 The fix we used elsewhere is replacing select() with poll(), but in the case of SocketListener we additionally need to replace the SocketClient list with a map indexed by fd in order to avoid quadratic behavior on each poll() wakeup. Bug: 79838856 Test: device boots and appears to work normally, tests pass Change-Id: I4a8f1804fa990d3db3a2c96b9acd60b2c7135950
This commit is contained in:
parent
763ccaa104
commit
ca76859aca
4 changed files with 80 additions and 113 deletions
|
@ -1,8 +1,6 @@
|
|||
#ifndef _SOCKET_CLIENT_H
|
||||
#define _SOCKET_CLIENT_H
|
||||
|
||||
#include "List.h"
|
||||
|
||||
#include <pthread.h>
|
||||
#include <cutils/atomic.h>
|
||||
#include <sys/types.h>
|
||||
|
@ -35,7 +33,7 @@ public:
|
|||
SocketClient(int sock, bool owned, bool useCmdNum);
|
||||
virtual ~SocketClient();
|
||||
|
||||
int getSocket() { return mSocket; }
|
||||
int getSocket() const { return mSocket; }
|
||||
pid_t getPid() const { return mPid; }
|
||||
uid_t getUid() const { return mUid; }
|
||||
gid_t getGid() const { return mGid; }
|
||||
|
@ -84,5 +82,4 @@ private:
|
|||
int sendDataLockedv(struct iovec *iov, int iovcnt);
|
||||
};
|
||||
|
||||
typedef android::sysutils::List<SocketClient *> SocketClientCollection;
|
||||
#endif
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright (C) 2008-2014 The Android Open Source Project
|
||||
* Copyright (C) 2008 The Android Open Source Project
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -18,6 +18,8 @@
|
|||
|
||||
#include <pthread.h>
|
||||
|
||||
#include <unordered_map>
|
||||
|
||||
#include <sysutils/SocketClient.h>
|
||||
#include "SocketClientCommand.h"
|
||||
|
||||
|
@ -25,7 +27,7 @@ class SocketListener {
|
|||
bool mListen;
|
||||
const char *mSocketName;
|
||||
int mSock;
|
||||
SocketClientCollection *mClients;
|
||||
std::unordered_map<int, SocketClient*> mClients;
|
||||
pthread_mutex_t mClientsLock;
|
||||
int mCtrlPipe[2];
|
||||
pthread_t mThread;
|
||||
|
@ -51,8 +53,13 @@ protected:
|
|||
virtual bool onDataAvailable(SocketClient *c) = 0;
|
||||
|
||||
private:
|
||||
bool release(SocketClient *c, bool wakeup);
|
||||
static void *threadStart(void *obj);
|
||||
|
||||
// Add all clients to a separate list, so we don't have to hold the lock
|
||||
// while processing it.
|
||||
std::vector<SocketClient*> snapshotClients();
|
||||
|
||||
bool release(SocketClient *c, bool wakeup);
|
||||
void runListener();
|
||||
void init(const char *socketName, int socketFd, bool listen, bool useCmdNum);
|
||||
};
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright (C) 2008-2014 The Android Open Source Project
|
||||
* Copyright (C) 2008 The Android Open Source Project
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -19,13 +19,15 @@
|
|||
#include <errno.h>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <sys/select.h>
|
||||
#include <sys/poll.h>
|
||||
#include <sys/socket.h>
|
||||
#include <sys/time.h>
|
||||
#include <sys/types.h>
|
||||
#include <sys/un.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#include <vector>
|
||||
|
||||
#include <cutils/sockets.h>
|
||||
#include <log/log.h>
|
||||
#include <sysutils/SocketListener.h>
|
||||
|
@ -62,12 +64,9 @@ SocketListener::~SocketListener() {
|
|||
close(mCtrlPipe[0]);
|
||||
close(mCtrlPipe[1]);
|
||||
}
|
||||
SocketClientCollection::iterator it;
|
||||
for (it = mClients->begin(); it != mClients->end();) {
|
||||
(*it)->decRef();
|
||||
it = mClients->erase(it);
|
||||
for (auto pair : mClients) {
|
||||
pair.second->decRef();
|
||||
}
|
||||
delete mClients;
|
||||
}
|
||||
|
||||
int SocketListener::startListener() {
|
||||
|
@ -94,7 +93,7 @@ int SocketListener::startListener(int backlog) {
|
|||
SLOGE("Unable to listen on socket (%s)", strerror(errno));
|
||||
return -1;
|
||||
} else if (!mListen)
|
||||
mClients->push_back(new SocketClient(mSock, false, mUseCmdNum));
|
||||
mClients[mSock] = new SocketClient(mSock, false, mUseCmdNum);
|
||||
|
||||
if (pipe(mCtrlPipe)) {
|
||||
SLOGE("pipe failed (%s)", strerror(errno));
|
||||
|
@ -134,11 +133,10 @@ int SocketListener::stopListener() {
|
|||
mSock = -1;
|
||||
}
|
||||
|
||||
SocketClientCollection::iterator it;
|
||||
for (it = mClients->begin(); it != mClients->end();) {
|
||||
delete (*it);
|
||||
it = mClients->erase(it);
|
||||
for (auto pair : mClients) {
|
||||
delete pair.second;
|
||||
}
|
||||
mClients.clear();
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -151,47 +149,30 @@ void *SocketListener::threadStart(void *obj) {
|
|||
}
|
||||
|
||||
void SocketListener::runListener() {
|
||||
|
||||
SocketClientCollection pendingList;
|
||||
|
||||
while(1) {
|
||||
SocketClientCollection::iterator it;
|
||||
fd_set read_fds;
|
||||
int rc = 0;
|
||||
int max = -1;
|
||||
|
||||
FD_ZERO(&read_fds);
|
||||
|
||||
if (mListen) {
|
||||
max = mSock;
|
||||
FD_SET(mSock, &read_fds);
|
||||
}
|
||||
|
||||
FD_SET(mCtrlPipe[0], &read_fds);
|
||||
if (mCtrlPipe[0] > max)
|
||||
max = mCtrlPipe[0];
|
||||
while (true) {
|
||||
std::vector<pollfd> fds;
|
||||
|
||||
pthread_mutex_lock(&mClientsLock);
|
||||
for (it = mClients->begin(); it != mClients->end(); ++it) {
|
||||
fds.reserve(2 + mClients.size());
|
||||
fds.push_back({.fd = mCtrlPipe[0], .events = POLLIN});
|
||||
if (mListen) fds.push_back({.fd = mSock, .events = POLLIN});
|
||||
for (auto pair : mClients) {
|
||||
// NB: calling out to an other object with mClientsLock held (safe)
|
||||
int fd = (*it)->getSocket();
|
||||
FD_SET(fd, &read_fds);
|
||||
if (fd > max) {
|
||||
max = fd;
|
||||
}
|
||||
const int fd = pair.second->getSocket();
|
||||
if (fd != pair.first) SLOGE("fd mismatch: %d != %d", fd, pair.first);
|
||||
fds.push_back({.fd = fd, .events = POLLIN});
|
||||
}
|
||||
pthread_mutex_unlock(&mClientsLock);
|
||||
SLOGV("mListen=%d, max=%d, mSocketName=%s", mListen, max, mSocketName);
|
||||
if ((rc = select(max + 1, &read_fds, nullptr, nullptr, nullptr)) < 0) {
|
||||
if (errno == EINTR)
|
||||
continue;
|
||||
SLOGE("select failed (%s) mListen=%d, max=%d", strerror(errno), mListen, max);
|
||||
|
||||
SLOGV("mListen=%d, mSocketName=%s", mListen, mSocketName);
|
||||
int rc = TEMP_FAILURE_RETRY(poll(fds.data(), fds.size(), -1));
|
||||
if (rc < 0) {
|
||||
SLOGE("poll failed (%s) mListen=%d", strerror(errno), mListen);
|
||||
sleep(1);
|
||||
continue;
|
||||
} else if (!rc)
|
||||
continue;
|
||||
}
|
||||
|
||||
if (FD_ISSET(mCtrlPipe[0], &read_fds)) {
|
||||
if (fds[0].revents & (POLLIN | POLLERR)) {
|
||||
char c = CtrlPipe_Shutdown;
|
||||
TEMP_FAILURE_RETRY(read(mCtrlPipe[0], &c, 1));
|
||||
if (c == CtrlPipe_Shutdown) {
|
||||
|
@ -199,7 +180,7 @@ void SocketListener::runListener() {
|
|||
}
|
||||
continue;
|
||||
}
|
||||
if (mListen && FD_ISSET(mSock, &read_fds)) {
|
||||
if (mListen && (fds[1].revents & (POLLIN | POLLERR))) {
|
||||
int c = TEMP_FAILURE_RETRY(accept4(mSock, nullptr, nullptr, SOCK_CLOEXEC));
|
||||
if (c < 0) {
|
||||
SLOGE("accept failed (%s)", strerror(errno));
|
||||
|
@ -207,32 +188,33 @@ void SocketListener::runListener() {
|
|||
continue;
|
||||
}
|
||||
pthread_mutex_lock(&mClientsLock);
|
||||
mClients->push_back(new SocketClient(c, true, mUseCmdNum));
|
||||
mClients[c] = new SocketClient(c, true, mUseCmdNum);
|
||||
pthread_mutex_unlock(&mClientsLock);
|
||||
}
|
||||
|
||||
/* Add all active clients to the pending list first */
|
||||
pendingList.clear();
|
||||
// Add all active clients to the pending list first, so we can release
|
||||
// the lock before invoking the callbacks.
|
||||
std::vector<SocketClient*> pending;
|
||||
pthread_mutex_lock(&mClientsLock);
|
||||
for (it = mClients->begin(); it != mClients->end(); ++it) {
|
||||
SocketClient* c = *it;
|
||||
// NB: calling out to an other object with mClientsLock held (safe)
|
||||
int fd = c->getSocket();
|
||||
if (FD_ISSET(fd, &read_fds)) {
|
||||
pendingList.push_back(c);
|
||||
const int size = fds.size();
|
||||
for (int i = mListen ? 2 : 1; i < size; ++i) {
|
||||
const struct pollfd& p = fds[i];
|
||||
if (p.revents & (POLLIN | POLLERR)) {
|
||||
auto it = mClients.find(p.fd);
|
||||
if (it == mClients.end()) {
|
||||
SLOGE("fd vanished: %d", p.fd);
|
||||
continue;
|
||||
}
|
||||
SocketClient* c = it->second;
|
||||
pending.push_back(c);
|
||||
c->incRef();
|
||||
}
|
||||
}
|
||||
pthread_mutex_unlock(&mClientsLock);
|
||||
|
||||
/* Process the pending list, since it is owned by the thread,
|
||||
* there is no need to lock it */
|
||||
while (!pendingList.empty()) {
|
||||
/* Pop the first item from the list */
|
||||
it = pendingList.begin();
|
||||
SocketClient* c = *it;
|
||||
pendingList.erase(it);
|
||||
/* Process it, if false is returned, remove from list */
|
||||
for (SocketClient* c : pending) {
|
||||
// Process it, if false is returned, remove from the map
|
||||
SLOGV("processing fd %d", c->getSocket());
|
||||
if (!onDataAvailable(c)) {
|
||||
release(c, false);
|
||||
}
|
||||
|
@ -245,17 +227,10 @@ bool SocketListener::release(SocketClient* c, bool wakeup) {
|
|||
bool ret = false;
|
||||
/* if our sockets are connection-based, remove and destroy it */
|
||||
if (mListen && c) {
|
||||
/* Remove the client from our array */
|
||||
/* Remove the client from our map */
|
||||
SLOGV("going to zap %d for %s", c->getSocket(), mSocketName);
|
||||
pthread_mutex_lock(&mClientsLock);
|
||||
SocketClientCollection::iterator it;
|
||||
for (it = mClients->begin(); it != mClients->end(); ++it) {
|
||||
if (*it == c) {
|
||||
mClients->erase(it);
|
||||
ret = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
ret = (mClients.erase(c->getSocket()) != 0);
|
||||
pthread_mutex_unlock(&mClientsLock);
|
||||
if (ret) {
|
||||
ret = c->decRef();
|
||||
|
@ -268,26 +243,22 @@ bool SocketListener::release(SocketClient* c, bool wakeup) {
|
|||
return ret;
|
||||
}
|
||||
|
||||
void SocketListener::sendBroadcast(int code, const char *msg, bool addErrno) {
|
||||
SocketClientCollection safeList;
|
||||
|
||||
/* Add all active clients to the safe list first */
|
||||
safeList.clear();
|
||||
std::vector<SocketClient*> SocketListener::snapshotClients() {
|
||||
std::vector<SocketClient*> clients;
|
||||
pthread_mutex_lock(&mClientsLock);
|
||||
SocketClientCollection::iterator i;
|
||||
|
||||
for (i = mClients->begin(); i != mClients->end(); ++i) {
|
||||
SocketClient* c = *i;
|
||||
clients.reserve(mClients.size());
|
||||
for (auto pair : mClients) {
|
||||
SocketClient* c = pair.second;
|
||||
c->incRef();
|
||||
safeList.push_back(c);
|
||||
clients.push_back(c);
|
||||
}
|
||||
pthread_mutex_unlock(&mClientsLock);
|
||||
|
||||
while (!safeList.empty()) {
|
||||
/* Pop the first item from the list */
|
||||
i = safeList.begin();
|
||||
SocketClient* c = *i;
|
||||
safeList.erase(i);
|
||||
return clients;
|
||||
}
|
||||
|
||||
void SocketListener::sendBroadcast(int code, const char *msg, bool addErrno) {
|
||||
for (SocketClient* c : snapshotClients()) {
|
||||
// broadcasts are unsolicited and should not include a cmd number
|
||||
if (c->sendMsg(code, msg, addErrno, false)) {
|
||||
SLOGW("Error sending broadcast (%s)", strerror(errno));
|
||||
|
@ -297,25 +268,7 @@ void SocketListener::sendBroadcast(int code, const char *msg, bool addErrno) {
|
|||
}
|
||||
|
||||
void SocketListener::runOnEachSocket(SocketClientCommand *command) {
|
||||
SocketClientCollection safeList;
|
||||
|
||||
/* Add all active clients to the safe list first */
|
||||
safeList.clear();
|
||||
pthread_mutex_lock(&mClientsLock);
|
||||
SocketClientCollection::iterator i;
|
||||
|
||||
for (i = mClients->begin(); i != mClients->end(); ++i) {
|
||||
SocketClient* c = *i;
|
||||
c->incRef();
|
||||
safeList.push_back(c);
|
||||
}
|
||||
pthread_mutex_unlock(&mClientsLock);
|
||||
|
||||
while (!safeList.empty()) {
|
||||
/* Pop the first item from the list */
|
||||
i = safeList.begin();
|
||||
SocketClient* c = *i;
|
||||
safeList.erase(i);
|
||||
for (SocketClient* c : snapshotClients()) {
|
||||
command->runSocketCommand(c);
|
||||
c->decRef();
|
||||
}
|
||||
|
|
|
@ -170,3 +170,13 @@ TEST_F(FrameworkListenerTest, RejectsInvalidCommands) {
|
|||
testCommand("test \"arg1 arg2", "500 Unclosed quotes error");
|
||||
testCommand("test \\a", "500 Unsupported escape sequence");
|
||||
}
|
||||
|
||||
TEST_F(FrameworkListenerTest, MultipleClients) {
|
||||
unique_fd client1 = clientSocket(mSocketPath);
|
||||
unique_fd client2 = clientSocket(mSocketPath);
|
||||
sendCmd(client1.get(), "test 1");
|
||||
sendCmd(client2.get(), "test 2");
|
||||
|
||||
EXPECT_EQ(std::string("42 test,2") + '\0', recvReply(client2.get()));
|
||||
EXPECT_EQ(std::string("42 test,1") + '\0', recvReply(client1.get()));
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue