adb: don't immediately close a socket when write fails.

When we fail to write to a local socket peer, we might still have data
queued up to send to the other side. Defer closing the socket until
we've failed to both read and write.

Bug: http://b/74616284
Test: python test_device.py
Change-Id: Ifc4b8fe95369b4872e475c2ae4ee611dd2d8b9d7
This commit is contained in:
Josh Gao 2018-03-19 13:20:29 -07:00
parent 1222abc75b
commit 184f480547
2 changed files with 128 additions and 142 deletions

View file

@ -209,7 +209,6 @@ TEST_F(LocalSocketTest, write_error_when_having_packets) {
TerminateThread(thread);
}
#if 0
// Ensure that if we fail to write output to an fd, we will still flush data coming from it.
TEST_F(LocalSocketTest, flush_after_shutdown) {
int head_fd[2];
@ -248,7 +247,6 @@ TEST_F(LocalSocketTest, flush_after_shutdown) {
ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count());
TerminateThread(thread);
}
#endif
#if defined(__linux__)

View file

@ -106,50 +106,131 @@ restart:
}
}
enum class SocketFlushResult {
Destroyed,
TryAgain,
Completed,
};
static SocketFlushResult local_socket_flush_incoming(asocket* s) {
while (!s->packet_queue.empty()) {
Range& r = s->packet_queue.front();
int rc = adb_write(s->fd, r.data(), r.size());
if (rc == static_cast<int>(r.size())) {
s->packet_queue.pop_front();
} else if (rc > 0) {
r.drop_front(rc);
fdevent_add(&s->fde, FDE_WRITE);
return SocketFlushResult::TryAgain;
} else if (rc == -1 && errno == EAGAIN) {
fdevent_add(&s->fde, FDE_WRITE);
return SocketFlushResult::TryAgain;
}
// We failed to write, but it's possible that we can still read from the socket.
// Give that a try before giving up.
s->has_write_error = true;
break;
}
// If we sent the last packet of a closing socket, we can now destroy it.
if (s->closing) {
s->close(s);
return SocketFlushResult::Destroyed;
}
fdevent_del(&s->fde, FDE_WRITE);
return SocketFlushResult::Completed;
}
// Returns false if the socket has been closed and destroyed as a side-effect of this function.
static bool local_socket_flush_outgoing(asocket* s) {
const size_t max_payload = s->get_max_payload();
std::string data;
data.resize(max_payload);
char* x = &data[0];
size_t avail = max_payload;
int r = 0;
int is_eof = 0;
while (avail > 0) {
r = adb_read(s->fd, x, avail);
D("LS(%d): post adb_read(fd=%d,...) r=%d (errno=%d) avail=%zu", s->id, s->fd, r,
r < 0 ? errno : 0, avail);
if (r == -1) {
if (errno == EAGAIN) {
break;
}
} else if (r > 0) {
avail -= r;
x += r;
continue;
}
/* r = 0 or unhandled error */
is_eof = 1;
break;
}
D("LS(%d): fd=%d post avail loop. r=%d is_eof=%d forced_eof=%d", s->id, s->fd, r, is_eof,
s->fde.force_eof);
if (avail != max_payload && s->peer) {
data.resize(max_payload - avail);
// s->peer->enqueue() may call s->close() and free s,
// so save variables for debug printing below.
unsigned saved_id = s->id;
int saved_fd = s->fd;
r = s->peer->enqueue(s->peer, std::move(data));
D("LS(%u): fd=%d post peer->enqueue(). r=%d", saved_id, saved_fd, r);
if (r < 0) {
// Error return means they closed us as a side-effect and we must
// return immediately.
//
// Note that if we still have buffered packets, the socket will be
// placed on the closing socket list. This handler function will be
// called again to process FDE_WRITE events.
return false;
}
if (r > 0) {
/* if the remote cannot accept further events,
** we disable notification of READs. They'll
** be enabled again when we get a call to ready()
*/
fdevent_del(&s->fde, FDE_READ);
}
}
// Don't allow a forced eof if data is still there.
if ((s->fde.force_eof && !r) || is_eof) {
D(" closing because is_eof=%d r=%d s->fde.force_eof=%d", is_eof, r, s->fde.force_eof);
s->close(s);
return false;
}
return true;
}
static int local_socket_enqueue(asocket* s, std::string data) {
D("LS(%d): enqueue %zu", s->id, data.size());
Range r(std::move(data));
/* if there is already data queue'd, we will receive
** events when it's time to write. just add this to
** the tail
*/
if (!s->packet_queue.empty()) {
goto enqueue;
}
/* write as much as we can, until we
** would block or there is an error/eof
*/
while (!r.empty()) {
int rc = adb_write(s->fd, r.data(), r.size());
if (rc > 0) {
r.drop_front(rc);
continue;
}
if (rc == 0 || errno != EAGAIN) {
D("LS(%d): not ready, errno=%d: %s", s->id, errno, strerror(errno));
s->has_write_error = true;
s->close(s);
return 1; /* not ready (error) */
} else {
// errno == EAGAIN
break;
}
}
if (r.empty()) {
return 0; /* ready for more data */
}
enqueue:
/* make sure we are notified when we can drain the queue */
s->packet_queue.push_back(std::move(r));
fdevent_add(&s->fde, FDE_WRITE);
switch (local_socket_flush_incoming(s)) {
case SocketFlushResult::Destroyed:
return -1;
return 1; /* not ready (backlog) */
case SocketFlushResult::TryAgain:
return 1;
case SocketFlushResult::Completed:
return 0;
}
return !s->packet_queue.empty();
}
static void local_socket_ready(asocket* s) {
@ -224,114 +305,21 @@ static void local_socket_event_func(int fd, unsigned ev, void* _s) {
** in order to simplify the code.
*/
if (ev & FDE_WRITE) {
while (!s->packet_queue.empty()) {
Range& r = s->packet_queue.front();
while (!r.empty()) {
int rc = adb_write(fd, r.data(), r.size());
if (rc == -1) {
/* returning here is ok because FDE_READ will
** be processed in the next iteration loop
*/
if (errno == EAGAIN) {
return;
}
} else if (rc > 0) {
r.drop_front(rc);
continue;
}
D(" closing after write because rc=%d and errno is %d", rc, errno);
s->has_write_error = true;
s->close(s);
switch (local_socket_flush_incoming(s)) {
case SocketFlushResult::Destroyed:
return;
}
if (r.empty()) {
s->packet_queue.pop_front();
}
case SocketFlushResult::TryAgain:
break;
case SocketFlushResult::Completed:
s->peer->ready(s->peer);
break;
}
/* if we sent the last packet of a closing socket,
** we can now destroy it.
*/
if (s->closing) {
D(" closing because 'closing' is set after write");
s->close(s);
return;
}
/* no more packets queued, so we can ignore
** writable events again and tell our peer
** to resume writing
*/
fdevent_del(&s->fde, FDE_WRITE);
s->peer->ready(s->peer);
}
if (ev & FDE_READ) {
const size_t max_payload = s->get_max_payload();
std::string data;
data.resize(max_payload);
char* x = &data[0];
size_t avail = max_payload;
int r = 0;
int is_eof = 0;
while (avail > 0) {
r = adb_read(fd, x, avail);
D("LS(%d): post adb_read(fd=%d,...) r=%d (errno=%d) avail=%zu", s->id, s->fd, r,
r < 0 ? errno : 0, avail);
if (r == -1) {
if (errno == EAGAIN) {
break;
}
} else if (r > 0) {
avail -= r;
x += r;
continue;
}
/* r = 0 or unhandled error */
is_eof = 1;
break;
}
D("LS(%d): fd=%d post avail loop. r=%d is_eof=%d forced_eof=%d", s->id, s->fd, r, is_eof,
s->fde.force_eof);
if (avail != max_payload && s->peer) {
data.resize(max_payload - avail);
// s->peer->enqueue() may call s->close() and free s,
// so save variables for debug printing below.
unsigned saved_id = s->id;
int saved_fd = s->fd;
r = s->peer->enqueue(s->peer, std::move(data));
D("LS(%u): fd=%d post peer->enqueue(). r=%d", saved_id, saved_fd, r);
if (r < 0) {
/* error return means they closed us as a side-effect
** and we must return immediately.
**
** note that if we still have buffered packets, the
** socket will be placed on the closing socket list.
** this handler function will be called again
** to process FDE_WRITE events.
*/
return;
}
if (r > 0) {
/* if the remote cannot accept further events,
** we disable notification of READs. They'll
** be enabled again when we get a call to ready()
*/
fdevent_del(&s->fde, FDE_READ);
}
}
/* Don't allow a forced eof if data is still there */
if ((s->fde.force_eof && !r) || is_eof) {
D(" closing because is_eof=%d r=%d s->fde.force_eof=%d", is_eof, r, s->fde.force_eof);
s->close(s);
if (!local_socket_flush_outgoing(s)) {
return;
}
}