Merge changes Iff1ec14b,I32123c51,I517dbcba,Ic215c7fe

* changes:
  adb: try harder to fill our test sockets.
  adb: don't set has_write_error on success.
  adb: move ownership of the fdevent thread into FdeventTest.
  adb: guarantee that fdevent_run_on_main_thread happens last.
This commit is contained in:
Josh Gao 2018-04-03 01:44:14 +00:00 committed by Gerrit Code Review
commit b9b967e2fc
5 changed files with 114 additions and 87 deletions

View file

@ -75,6 +75,7 @@ static std::atomic<bool> terminate_loop(false);
static bool main_thread_valid;
static uint64_t main_thread_id;
static bool run_needs_flush = false;
static auto& run_queue_notify_fd = *new unique_fd();
static auto& run_queue_mutex = *new std::mutex();
static auto& run_queue GUARDED_BY(run_queue_mutex) = *new std::deque<std::function<void()>>();
@ -317,7 +318,8 @@ static void fdevent_run_func(int fd, unsigned ev, void* /* userdata */) {
PLOG(FATAL) << "failed to empty run queue notify fd";
}
fdevent_run_flush();
// Mark that we need to flush, and then run it at the end of fdevent_loop.
run_needs_flush = true;
}
static void fdevent_run_setup() {
@ -378,6 +380,11 @@ void fdevent_loop() {
g_pending_list.pop_front();
fdevent_call_fdfunc(fde);
}
if (run_needs_flush) {
fdevent_run_flush();
run_needs_flush = false;
}
}
}

View file

@ -80,30 +80,7 @@ struct ThreadArg {
TEST_F(FdeventTest, fdevent_terminate) {
PrepareThread();
std::thread thread(fdevent_loop);
TerminateThread(thread);
}
static void FdEventThreadFunc(ThreadArg* arg) {
std::vector<int> read_fds;
std::vector<int> write_fds;
read_fds.push_back(arg->first_read_fd);
for (size_t i = 0; i < arg->middle_pipe_count; ++i) {
int fds[2];
ASSERT_EQ(0, adb_socketpair(fds));
read_fds.push_back(fds[0]);
write_fds.push_back(fds[1]);
}
write_fds.push_back(arg->last_write_fd);
std::vector<std::unique_ptr<FdHandler>> fd_handlers;
for (size_t i = 0; i < read_fds.size(); ++i) {
fd_handlers.push_back(std::make_unique<FdHandler>(read_fds[i], write_fds[i]));
}
fdevent_loop();
TerminateThread();
}
TEST_F(FdeventTest, smoke) {
@ -122,7 +99,26 @@ TEST_F(FdeventTest, smoke) {
int reader = fd_pair2[0];
PrepareThread();
std::thread thread(FdEventThreadFunc, &thread_arg);
std::vector<std::unique_ptr<FdHandler>> fd_handlers;
fdevent_run_on_main_thread([&thread_arg, &fd_handlers]() {
std::vector<int> read_fds;
std::vector<int> write_fds;
read_fds.push_back(thread_arg.first_read_fd);
for (size_t i = 0; i < thread_arg.middle_pipe_count; ++i) {
int fds[2];
ASSERT_EQ(0, adb_socketpair(fds));
read_fds.push_back(fds[0]);
write_fds.push_back(fds[1]);
}
write_fds.push_back(thread_arg.last_write_fd);
for (size_t i = 0; i < read_fds.size(); ++i) {
fd_handlers.push_back(std::make_unique<FdHandler>(read_fds[i], write_fds[i]));
}
});
WaitForFdeventLoop();
for (size_t i = 0; i < MESSAGE_LOOP_COUNT; ++i) {
std::string read_buffer = MESSAGE;
@ -132,7 +128,10 @@ TEST_F(FdeventTest, smoke) {
ASSERT_EQ(read_buffer, write_buffer);
}
TerminateThread(thread);
fdevent_run_on_main_thread([&fd_handlers]() { fd_handlers.clear(); });
WaitForFdeventLoop();
TerminateThread();
ASSERT_EQ(0, adb_close(writer));
ASSERT_EQ(0, adb_close(reader));
}
@ -143,7 +142,7 @@ struct InvalidFdArg {
size_t* happened_event_count;
};
static void InvalidFdEventCallback(int fd, unsigned events, void* userdata) {
static void InvalidFdEventCallback(int, unsigned events, void* userdata) {
InvalidFdArg* arg = reinterpret_cast<InvalidFdArg*>(userdata);
ASSERT_EQ(arg->expected_events, events);
fdevent_remove(&arg->fde);
@ -179,7 +178,6 @@ TEST_F(FdeventTest, run_on_main_thread) {
std::vector<int> vec;
PrepareThread();
std::thread thread(fdevent_loop);
// Block the main thread for a long time while we queue our callbacks.
fdevent_run_on_main_thread([]() {
@ -194,7 +192,7 @@ TEST_F(FdeventTest, run_on_main_thread) {
});
}
TerminateThread(thread);
TerminateThread();
ASSERT_EQ(1000000u, vec.size());
for (int i = 0; i < 1000000; ++i) {
@ -218,11 +216,8 @@ TEST_F(FdeventTest, run_on_main_thread_reentrant) {
std::vector<int> vec;
PrepareThread();
std::thread thread(fdevent_loop);
fdevent_run_on_main_thread(make_appender(&vec, 0));
TerminateThread(thread);
TerminateThread();
ASSERT_EQ(100u, vec.size());
for (int i = 0; i < 100; ++i) {

View file

@ -16,10 +16,31 @@
#include <gtest/gtest.h>
#include <condition_variable>
#include <mutex>
#include <thread>
#include "socket.h"
#include "sysdeps.h"
#include "sysdeps/chrono.h"
static void WaitForFdeventLoop() {
// Sleep for a bit to make sure that network events have propagated.
std::this_thread::sleep_for(100ms);
// fdevent_run_on_main_thread has a guaranteed ordering, and is guaranteed to happen after
// socket events, so as soon as our function is called, we know that we've processed all
// previous events.
std::mutex mutex;
std::condition_variable cv;
std::unique_lock<std::mutex> lock(mutex);
fdevent_run_on_main_thread([&]() {
mutex.lock();
mutex.unlock();
cv.notify_one();
});
cv.wait(lock);
}
class FdeventTest : public ::testing::Test {
protected:
@ -49,6 +70,9 @@ class FdeventTest : public ::testing::Test {
}
dummy_socket->ready(dummy_socket);
dummy = dummy_fds[0];
thread_ = std::thread([]() { fdevent_loop(); });
WaitForFdeventLoop();
}
size_t GetAdditionalLocalSocketCount() {
@ -56,10 +80,12 @@ class FdeventTest : public ::testing::Test {
return 2;
}
void TerminateThread(std::thread& thread) {
void TerminateThread() {
fdevent_terminate_loop();
ASSERT_TRUE(WriteFdExactly(dummy, "", 1));
thread.join();
thread_.join();
ASSERT_EQ(0, adb_close(dummy));
}
std::thread thread_;
};

View file

@ -42,10 +42,6 @@ struct ThreadArg {
class LocalSocketTest : public FdeventTest {};
static void WaitForFdeventLoop() {
std::this_thread::sleep_for(100ms);
}
TEST_F(LocalSocketTest, smoke) {
// Join two socketpairs with a chain of intermediate socketpairs.
int first[2];
@ -86,7 +82,6 @@ TEST_F(LocalSocketTest, smoke) {
connect(prev_tail, end);
PrepareThread();
std::thread thread(fdevent_loop);
for (size_t i = 0; i < MESSAGE_LOOP_COUNT; ++i) {
std::string read_buffer = MESSAGE;
@ -102,7 +97,7 @@ TEST_F(LocalSocketTest, smoke) {
// Wait until the local sockets are closed.
WaitForFdeventLoop();
ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count());
TerminateThread(thread);
TerminateThread();
}
struct CloseWithPacketArg {
@ -111,24 +106,39 @@ struct CloseWithPacketArg {
int cause_close_fd;
};
static void CloseWithPacketThreadFunc(CloseWithPacketArg* arg) {
asocket* s = create_local_socket(arg->socket_fd);
ASSERT_TRUE(s != nullptr);
arg->bytes_written = 0;
static void CreateCloser(CloseWithPacketArg* arg) {
fdevent_run_on_main_thread([arg]() {
asocket* s = create_local_socket(arg->socket_fd);
ASSERT_TRUE(s != nullptr);
arg->bytes_written = 0;
std::string data;
data.resize(MAX_PAYLOAD);
arg->bytes_written += data.size();
int ret = s->enqueue(s, std::move(data));
ASSERT_EQ(1, ret);
// On platforms that implement sockets via underlying sockets (e.g. Wine),
// a socket can appear to be full, and then become available for writes
// again without read being called on the other end. Loop and sleep after
// each write to give the underlying implementation time to flush.
bool socket_filled = false;
for (int i = 0; i < 128; ++i) {
std::string data;
data.resize(MAX_PAYLOAD);
arg->bytes_written += data.size();
int ret = s->enqueue(s, std::move(data));
if (ret == 1) {
socket_filled = true;
break;
}
ASSERT_NE(-1, ret);
asocket* cause_close_s = create_local_socket(arg->cause_close_fd);
ASSERT_TRUE(cause_close_s != nullptr);
cause_close_s->peer = s;
s->peer = cause_close_s;
cause_close_s->ready(cause_close_s);
std::this_thread::sleep_for(250ms);
}
ASSERT_TRUE(socket_filled);
fdevent_loop();
asocket* cause_close_s = create_local_socket(arg->cause_close_fd);
ASSERT_TRUE(cause_close_s != nullptr);
cause_close_s->peer = s;
s->peer = cause_close_s;
cause_close_s->ready(cause_close_s);
});
WaitForFdeventLoop();
}
// This test checks if we can close local socket in the following situation:
@ -145,9 +155,8 @@ TEST_F(LocalSocketTest, close_socket_with_packet) {
arg.cause_close_fd = cause_close_fd[1];
PrepareThread();
std::thread thread(CloseWithPacketThreadFunc, &arg);
CreateCloser(&arg);
WaitForFdeventLoop();
ASSERT_EQ(0, adb_close(cause_close_fd[0]));
WaitForFdeventLoop();
@ -156,7 +165,7 @@ TEST_F(LocalSocketTest, close_socket_with_packet) {
WaitForFdeventLoop();
ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count());
TerminateThread(thread);
TerminateThread();
}
// This test checks if we can read packets from a closing local socket.
@ -170,7 +179,7 @@ TEST_F(LocalSocketTest, read_from_closing_socket) {
arg.cause_close_fd = cause_close_fd[1];
PrepareThread();
std::thread thread(CloseWithPacketThreadFunc, &arg);
CreateCloser(&arg);
WaitForFdeventLoop();
ASSERT_EQ(0, adb_close(cause_close_fd[0]));
@ -186,7 +195,7 @@ TEST_F(LocalSocketTest, read_from_closing_socket) {
WaitForFdeventLoop();
ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count());
TerminateThread(thread);
TerminateThread();
}
// This test checks if we can close local socket in the following situation:
@ -203,7 +212,7 @@ TEST_F(LocalSocketTest, write_error_when_having_packets) {
arg.cause_close_fd = cause_close_fd[1];
PrepareThread();
std::thread thread(CloseWithPacketThreadFunc, &arg);
CreateCloser(&arg);
WaitForFdeventLoop();
EXPECT_EQ(2u + GetAdditionalLocalSocketCount(), fdevent_installed_count());
@ -211,7 +220,7 @@ TEST_F(LocalSocketTest, write_error_when_having_packets) {
WaitForFdeventLoop();
ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count());
TerminateThread(thread);
TerminateThread();
}
// Ensure that if we fail to write output to an fd, we will still flush data coming from it.
@ -231,7 +240,6 @@ TEST_F(LocalSocketTest, flush_after_shutdown) {
tail->ready(tail);
PrepareThread();
std::thread thread(fdevent_loop);
EXPECT_TRUE(WriteFdExactly(head_fd[0], "foo", 3));
@ -249,7 +257,7 @@ TEST_F(LocalSocketTest, flush_after_shutdown) {
WaitForFdeventLoop();
ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count());
TerminateThread(thread);
TerminateThread();
}
#if defined(__linux__)
@ -258,21 +266,10 @@ static void ClientThreadFunc() {
std::string error;
int fd = network_loopback_client(5038, SOCK_STREAM, &error);
ASSERT_GE(fd, 0) << error;
std::this_thread::sleep_for(200ms);
std::this_thread::sleep_for(1s);
ASSERT_EQ(0, adb_close(fd));
}
struct CloseRdHupSocketArg {
int socket_fd;
};
static void CloseRdHupSocketThreadFunc(CloseRdHupSocketArg* arg) {
asocket* s = create_local_socket(arg->socket_fd);
ASSERT_TRUE(s != nullptr);
fdevent_loop();
}
// This test checks if we can close sockets in CLOSE_WAIT state.
TEST_F(LocalSocketTest, close_socket_in_CLOSE_WAIT_state) {
std::string error;
@ -283,11 +280,13 @@ TEST_F(LocalSocketTest, close_socket_in_CLOSE_WAIT_state) {
int accept_fd = adb_socket_accept(listen_fd, nullptr, nullptr);
ASSERT_GE(accept_fd, 0);
CloseRdHupSocketArg arg;
arg.socket_fd = accept_fd;
PrepareThread();
std::thread thread(CloseRdHupSocketThreadFunc, &arg);
fdevent_run_on_main_thread([accept_fd]() {
asocket* s = create_local_socket(accept_fd);
ASSERT_TRUE(s != nullptr);
});
WaitForFdeventLoop();
EXPECT_EQ(1u + GetAdditionalLocalSocketCount(), fdevent_installed_count());
@ -297,7 +296,7 @@ TEST_F(LocalSocketTest, close_socket_in_CLOSE_WAIT_state) {
WaitForFdeventLoop();
ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count());
TerminateThread(thread);
TerminateThread();
}
#endif // defined(__linux__)

View file

@ -126,12 +126,12 @@ static SocketFlushResult local_socket_flush_incoming(asocket* s) {
} else if (rc == -1 && errno == EAGAIN) {
fdevent_add(&s->fde, FDE_WRITE);
return SocketFlushResult::TryAgain;
} else {
// We failed to write, but it's possible that we can still read from the socket.
// Give that a try before giving up.
s->has_write_error = true;
break;
}
// We failed to write, but it's possible that we can still read from the socket.
// Give that a try before giving up.
s->has_write_error = true;
break;
}
// If we sent the last packet of a closing socket, we can now destroy it.