diff --git a/adb/socket_test.cpp b/adb/socket_test.cpp index 44d327659..6b400565f 100644 --- a/adb/socket_test.cpp +++ b/adb/socket_test.cpp @@ -209,7 +209,6 @@ TEST_F(LocalSocketTest, write_error_when_having_packets) { TerminateThread(thread); } -#if 0 // Ensure that if we fail to write output to an fd, we will still flush data coming from it. TEST_F(LocalSocketTest, flush_after_shutdown) { int head_fd[2]; @@ -248,7 +247,6 @@ TEST_F(LocalSocketTest, flush_after_shutdown) { ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count()); TerminateThread(thread); } -#endif #if defined(__linux__) diff --git a/adb/sockets.cpp b/adb/sockets.cpp index 0007fed7b..e05a3dbc6 100644 --- a/adb/sockets.cpp +++ b/adb/sockets.cpp @@ -106,50 +106,131 @@ restart: } } +enum class SocketFlushResult { + Destroyed, + TryAgain, + Completed, +}; + +static SocketFlushResult local_socket_flush_incoming(asocket* s) { + while (!s->packet_queue.empty()) { + Range& r = s->packet_queue.front(); + + int rc = adb_write(s->fd, r.data(), r.size()); + if (rc == static_cast(r.size())) { + s->packet_queue.pop_front(); + } else if (rc > 0) { + r.drop_front(rc); + fdevent_add(&s->fde, FDE_WRITE); + return SocketFlushResult::TryAgain; + } else if (rc == -1 && errno == EAGAIN) { + fdevent_add(&s->fde, FDE_WRITE); + return SocketFlushResult::TryAgain; + } + + // We failed to write, but it's possible that we can still read from the socket. + // Give that a try before giving up. + s->has_write_error = true; + break; + } + + // If we sent the last packet of a closing socket, we can now destroy it. + if (s->closing) { + s->close(s); + return SocketFlushResult::Destroyed; + } + + fdevent_del(&s->fde, FDE_WRITE); + return SocketFlushResult::Completed; +} + +// Returns false if the socket has been closed and destroyed as a side-effect of this function. +static bool local_socket_flush_outgoing(asocket* s) { + const size_t max_payload = s->get_max_payload(); + std::string data; + data.resize(max_payload); + char* x = &data[0]; + size_t avail = max_payload; + int r = 0; + int is_eof = 0; + + while (avail > 0) { + r = adb_read(s->fd, x, avail); + D("LS(%d): post adb_read(fd=%d,...) r=%d (errno=%d) avail=%zu", s->id, s->fd, r, + r < 0 ? errno : 0, avail); + if (r == -1) { + if (errno == EAGAIN) { + break; + } + } else if (r > 0) { + avail -= r; + x += r; + continue; + } + + /* r = 0 or unhandled error */ + is_eof = 1; + break; + } + D("LS(%d): fd=%d post avail loop. r=%d is_eof=%d forced_eof=%d", s->id, s->fd, r, is_eof, + s->fde.force_eof); + + if (avail != max_payload && s->peer) { + data.resize(max_payload - avail); + + // s->peer->enqueue() may call s->close() and free s, + // so save variables for debug printing below. + unsigned saved_id = s->id; + int saved_fd = s->fd; + r = s->peer->enqueue(s->peer, std::move(data)); + D("LS(%u): fd=%d post peer->enqueue(). r=%d", saved_id, saved_fd, r); + + if (r < 0) { + // Error return means they closed us as a side-effect and we must + // return immediately. + // + // Note that if we still have buffered packets, the socket will be + // placed on the closing socket list. This handler function will be + // called again to process FDE_WRITE events. + return false; + } + + if (r > 0) { + /* if the remote cannot accept further events, + ** we disable notification of READs. They'll + ** be enabled again when we get a call to ready() + */ + fdevent_del(&s->fde, FDE_READ); + } + } + + // Don't allow a forced eof if data is still there. + if ((s->fde.force_eof && !r) || is_eof) { + D(" closing because is_eof=%d r=%d s->fde.force_eof=%d", is_eof, r, s->fde.force_eof); + s->close(s); + return false; + } + + return true; +} + static int local_socket_enqueue(asocket* s, std::string data) { D("LS(%d): enqueue %zu", s->id, data.size()); Range r(std::move(data)); - - /* if there is already data queue'd, we will receive - ** events when it's time to write. just add this to - ** the tail - */ - if (!s->packet_queue.empty()) { - goto enqueue; - } - - /* write as much as we can, until we - ** would block or there is an error/eof - */ - while (!r.empty()) { - int rc = adb_write(s->fd, r.data(), r.size()); - if (rc > 0) { - r.drop_front(rc); - continue; - } - - if (rc == 0 || errno != EAGAIN) { - D("LS(%d): not ready, errno=%d: %s", s->id, errno, strerror(errno)); - s->has_write_error = true; - s->close(s); - return 1; /* not ready (error) */ - } else { - // errno == EAGAIN - break; - } - } - - if (r.empty()) { - return 0; /* ready for more data */ - } - -enqueue: - /* make sure we are notified when we can drain the queue */ s->packet_queue.push_back(std::move(r)); - fdevent_add(&s->fde, FDE_WRITE); + switch (local_socket_flush_incoming(s)) { + case SocketFlushResult::Destroyed: + return -1; - return 1; /* not ready (backlog) */ + case SocketFlushResult::TryAgain: + return 1; + + case SocketFlushResult::Completed: + return 0; + } + + return !s->packet_queue.empty(); } static void local_socket_ready(asocket* s) { @@ -224,114 +305,21 @@ static void local_socket_event_func(int fd, unsigned ev, void* _s) { ** in order to simplify the code. */ if (ev & FDE_WRITE) { - while (!s->packet_queue.empty()) { - Range& r = s->packet_queue.front(); - while (!r.empty()) { - int rc = adb_write(fd, r.data(), r.size()); - if (rc == -1) { - /* returning here is ok because FDE_READ will - ** be processed in the next iteration loop - */ - if (errno == EAGAIN) { - return; - } - } else if (rc > 0) { - r.drop_front(rc); - continue; - } - - D(" closing after write because rc=%d and errno is %d", rc, errno); - s->has_write_error = true; - s->close(s); + switch (local_socket_flush_incoming(s)) { + case SocketFlushResult::Destroyed: return; - } - if (r.empty()) { - s->packet_queue.pop_front(); - } + case SocketFlushResult::TryAgain: + break; + + case SocketFlushResult::Completed: + s->peer->ready(s->peer); + break; } - - /* if we sent the last packet of a closing socket, - ** we can now destroy it. - */ - if (s->closing) { - D(" closing because 'closing' is set after write"); - s->close(s); - return; - } - - /* no more packets queued, so we can ignore - ** writable events again and tell our peer - ** to resume writing - */ - fdevent_del(&s->fde, FDE_WRITE); - s->peer->ready(s->peer); } if (ev & FDE_READ) { - const size_t max_payload = s->get_max_payload(); - std::string data; - data.resize(max_payload); - char* x = &data[0]; - size_t avail = max_payload; - int r = 0; - int is_eof = 0; - - while (avail > 0) { - r = adb_read(fd, x, avail); - D("LS(%d): post adb_read(fd=%d,...) r=%d (errno=%d) avail=%zu", s->id, s->fd, r, - r < 0 ? errno : 0, avail); - if (r == -1) { - if (errno == EAGAIN) { - break; - } - } else if (r > 0) { - avail -= r; - x += r; - continue; - } - - /* r = 0 or unhandled error */ - is_eof = 1; - break; - } - D("LS(%d): fd=%d post avail loop. r=%d is_eof=%d forced_eof=%d", s->id, s->fd, r, is_eof, - s->fde.force_eof); - - if (avail != max_payload && s->peer) { - data.resize(max_payload - avail); - - // s->peer->enqueue() may call s->close() and free s, - // so save variables for debug printing below. - unsigned saved_id = s->id; - int saved_fd = s->fd; - r = s->peer->enqueue(s->peer, std::move(data)); - D("LS(%u): fd=%d post peer->enqueue(). r=%d", saved_id, saved_fd, r); - - if (r < 0) { - /* error return means they closed us as a side-effect - ** and we must return immediately. - ** - ** note that if we still have buffered packets, the - ** socket will be placed on the closing socket list. - ** this handler function will be called again - ** to process FDE_WRITE events. - */ - return; - } - - if (r > 0) { - /* if the remote cannot accept further events, - ** we disable notification of READs. They'll - ** be enabled again when we get a call to ready() - */ - fdevent_del(&s->fde, FDE_READ); - } - } - /* Don't allow a forced eof if data is still there */ - if ((s->fde.force_eof && !r) || is_eof) { - D(" closing because is_eof=%d r=%d s->fde.force_eof=%d", is_eof, r, s->fde.force_eof); - s->close(s); + if (!local_socket_flush_outgoing(s)) { return; } }