Merge "adb: use poll instead of select in shell_service."

This commit is contained in:
Josh Gao 2019-12-19 20:22:13 +00:00 committed by Gerrit Code Review
commit 27fa358d41
2 changed files with 84 additions and 40 deletions

View file

@ -85,7 +85,6 @@
#include <paths.h>
#include <pty.h>
#include <pwd.h>
#include <sys/select.h>
#include <termios.h>
#include <memory>
@ -141,6 +140,20 @@ bool CreateSocketpair(unique_fd* fd1, unique_fd* fd2) {
return true;
}
struct SubprocessPollfds {
adb_pollfd pfds[3];
adb_pollfd* data() { return pfds; }
size_t size() { return 3; }
adb_pollfd* begin() { return pfds; }
adb_pollfd* end() { return pfds + size(); }
adb_pollfd& stdinout_pfd() { return pfds[0]; }
adb_pollfd& stderr_pfd() { return pfds[1]; }
adb_pollfd& protocol_pfd() { return pfds[2]; }
};
class Subprocess {
public:
Subprocess(std::string command, const char* terminal_type, SubprocessType type,
@ -176,8 +189,7 @@ class Subprocess {
void PassDataStreams();
void WaitForExit();
unique_fd* SelectLoop(fd_set* master_read_set_ptr,
fd_set* master_write_set_ptr);
unique_fd* PollLoop(SubprocessPollfds* pfds);
// Input/output stream handlers. Success returns nullptr, failure returns
// a pointer to the failed FD.
@ -545,23 +557,23 @@ void Subprocess::PassDataStreams() {
}
// Start by trying to read from the protocol FD, stdout, and stderr.
fd_set master_read_set, master_write_set;
FD_ZERO(&master_read_set);
FD_ZERO(&master_write_set);
for (unique_fd* sfd : {&protocol_sfd_, &stdinout_sfd_, &stderr_sfd_}) {
if (*sfd != -1) {
FD_SET(sfd->get(), &master_read_set);
}
}
SubprocessPollfds pfds;
pfds.stdinout_pfd() = {.fd = stdinout_sfd_.get(), .events = POLLIN};
pfds.stderr_pfd() = {.fd = stderr_sfd_.get(), .events = POLLIN};
pfds.protocol_pfd() = {.fd = protocol_sfd_.get(), .events = POLLIN};
// Pass data until the protocol FD or both the subprocess pipes die, at
// which point we can't pass any more data.
while (protocol_sfd_ != -1 && (stdinout_sfd_ != -1 || stderr_sfd_ != -1)) {
unique_fd* dead_sfd = SelectLoop(&master_read_set, &master_write_set);
unique_fd* dead_sfd = PollLoop(&pfds);
if (dead_sfd) {
D("closing FD %d", dead_sfd->get());
FD_CLR(dead_sfd->get(), &master_read_set);
FD_CLR(dead_sfd->get(), &master_write_set);
auto it = std::find_if(pfds.begin(), pfds.end(), [=](const adb_pollfd& pfd) {
return pfd.fd == dead_sfd->get();
});
CHECK(it != pfds.end());
it->fd = -1;
it->events = 0;
if (dead_sfd == &protocol_sfd_) {
// Using SIGHUP is a decent general way to indicate that the
// controlling process is going away. If specific signals are
@ -583,30 +595,19 @@ void Subprocess::PassDataStreams() {
}
}
namespace {
inline bool ValidAndInSet(const unique_fd& sfd, fd_set* set) {
return sfd != -1 && FD_ISSET(sfd.get(), set);
}
} // namespace
unique_fd* Subprocess::SelectLoop(fd_set* master_read_set_ptr,
fd_set* master_write_set_ptr) {
fd_set read_set, write_set;
int select_n =
std::max(std::max(protocol_sfd_.get(), stdinout_sfd_.get()), stderr_sfd_.get()) + 1;
unique_fd* Subprocess::PollLoop(SubprocessPollfds* pfds) {
unique_fd* dead_sfd = nullptr;
adb_pollfd& stdinout_pfd = pfds->stdinout_pfd();
adb_pollfd& stderr_pfd = pfds->stderr_pfd();
adb_pollfd& protocol_pfd = pfds->protocol_pfd();
// Keep calling select() and passing data until an FD closes/errors.
// Keep calling poll() and passing data until an FD closes/errors.
while (!dead_sfd) {
memcpy(&read_set, master_read_set_ptr, sizeof(read_set));
memcpy(&write_set, master_write_set_ptr, sizeof(write_set));
if (select(select_n, &read_set, &write_set, nullptr, nullptr) < 0) {
if (adb_poll(pfds->data(), pfds->size(), -1) < 0) {
if (errno == EINTR) {
continue;
} else {
PLOG(ERROR) << "select failed, closing subprocess pipes";
PLOG(ERROR) << "poll failed, closing subprocess pipes";
stdinout_sfd_.reset(-1);
stderr_sfd_.reset(-1);
return nullptr;
@ -614,34 +615,47 @@ unique_fd* Subprocess::SelectLoop(fd_set* master_read_set_ptr,
}
// Read stdout, write to protocol FD.
if (ValidAndInSet(stdinout_sfd_, &read_set)) {
if (stdinout_pfd.fd != -1 && (stdinout_pfd.revents & POLLIN)) {
dead_sfd = PassOutput(&stdinout_sfd_, ShellProtocol::kIdStdout);
}
// Read stderr, write to protocol FD.
if (!dead_sfd && ValidAndInSet(stderr_sfd_, &read_set)) {
if (!dead_sfd && stderr_pfd.fd != 1 && (stderr_pfd.revents & POLLIN)) {
dead_sfd = PassOutput(&stderr_sfd_, ShellProtocol::kIdStderr);
}
// Read protocol FD, write to stdin.
if (!dead_sfd && ValidAndInSet(protocol_sfd_, &read_set)) {
if (!dead_sfd && protocol_pfd.fd != -1 && (protocol_pfd.revents & POLLIN)) {
dead_sfd = PassInput();
// If we didn't finish writing, block on stdin write.
if (input_bytes_left_) {
FD_CLR(protocol_sfd_.get(), master_read_set_ptr);
FD_SET(stdinout_sfd_.get(), master_write_set_ptr);
protocol_pfd.events &= ~POLLIN;
stdinout_pfd.events |= POLLOUT;
}
}
// Continue writing to stdin; only happens if a previous write blocked.
if (!dead_sfd && ValidAndInSet(stdinout_sfd_, &write_set)) {
if (!dead_sfd && stdinout_pfd.fd != -1 && (stdinout_pfd.revents & POLLOUT)) {
dead_sfd = PassInput();
// If we finished writing, go back to blocking on protocol read.
if (!input_bytes_left_) {
FD_SET(protocol_sfd_.get(), master_read_set_ptr);
FD_CLR(stdinout_sfd_.get(), master_write_set_ptr);
protocol_pfd.events |= POLLIN;
stdinout_pfd.events &= ~POLLOUT;
}
}
// After handling all of the events we've received, check to see if any fds have died.
if (stdinout_pfd.revents & (POLLHUP | POLLRDHUP | POLLERR | POLLNVAL)) {
return &stdinout_sfd_;
}
if (stderr_pfd.revents & (POLLHUP | POLLRDHUP | POLLERR | POLLNVAL)) {
return &stderr_sfd_;
}
if (protocol_pfd.revents & (POLLHUP | POLLRDHUP | POLLERR | POLLNVAL)) {
return &protocol_sfd_;
}
} // while (!dead_sfd)
return dead_sfd;

View file

@ -536,6 +536,36 @@ class ShellTest(DeviceTest):
for i, success in result.iteritems():
self.assertTrue(success)
def disabled_test_parallel(self):
"""Spawn a bunch of `adb shell` instances in parallel.
This was broken historically due to the use of select, which only works
for fds that are numerically less than 1024.
Bug: http://b/141955761"""
n_procs = 2048
procs = dict()
for i in xrange(0, n_procs):
procs[i] = subprocess.Popen(
['adb', 'shell', 'read foo; echo $foo; read rc; exit $rc'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE
)
for i in xrange(0, n_procs):
procs[i].stdin.write("%d\n" % i)
for i in xrange(0, n_procs):
response = procs[i].stdout.readline()
assert(response == "%d\n" % i)
for i in xrange(0, n_procs):
procs[i].stdin.write("%d\n" % (i % 256))
for i in xrange(0, n_procs):
assert(procs[i].wait() == i % 256)
class ArgumentEscapingTest(DeviceTest):
def test_shell_escaping(self):