Merge "Bluetooth: Watch multiple FDs with AsyncFdWatcher"
am: 1143a3fec6
Change-Id: Ib0f1a7f2a87eb82896d69501b5c33bbddebdf154
This commit is contained in:
commit
14d63bfc76
4 changed files with 91 additions and 19 deletions
|
@ -19,6 +19,7 @@
|
|||
#include <algorithm>
|
||||
#include <atomic>
|
||||
#include <condition_variable>
|
||||
#include <map>
|
||||
#include <mutex>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
|
@ -26,6 +27,8 @@
|
|||
#include "sys/select.h"
|
||||
#include "unistd.h"
|
||||
|
||||
static const int INVALID_FD = -1;
|
||||
|
||||
namespace android {
|
||||
namespace hardware {
|
||||
namespace bluetooth {
|
||||
|
@ -36,8 +39,7 @@ int AsyncFdWatcher::WatchFdForNonBlockingReads(
|
|||
// Add file descriptor and callback
|
||||
{
|
||||
std::unique_lock<std::mutex> guard(internal_mutex_);
|
||||
read_fd_ = file_descriptor;
|
||||
cb_ = on_read_fd_ready_callback;
|
||||
watched_fds_[file_descriptor] = on_read_fd_ready_callback;
|
||||
}
|
||||
|
||||
// Start the thread if not started yet
|
||||
|
@ -58,7 +60,7 @@ int AsyncFdWatcher::ConfigureTimeout(
|
|||
return 0;
|
||||
}
|
||||
|
||||
void AsyncFdWatcher::StopWatchingFileDescriptor() { stopThread(); }
|
||||
void AsyncFdWatcher::StopWatchingFileDescriptors() { stopThread(); }
|
||||
|
||||
AsyncFdWatcher::~AsyncFdWatcher() {}
|
||||
|
||||
|
@ -90,8 +92,7 @@ int AsyncFdWatcher::stopThread() {
|
|||
|
||||
{
|
||||
std::unique_lock<std::mutex> guard(internal_mutex_);
|
||||
cb_ = nullptr;
|
||||
read_fd_ = -1;
|
||||
watched_fds_.clear();
|
||||
}
|
||||
|
||||
{
|
||||
|
@ -115,7 +116,11 @@ void AsyncFdWatcher::ThreadRoutine() {
|
|||
fd_set read_fds;
|
||||
FD_ZERO(&read_fds);
|
||||
FD_SET(notification_listen_fd_, &read_fds);
|
||||
FD_SET(read_fd_, &read_fds);
|
||||
int max_read_fd = INVALID_FD;
|
||||
for (auto& it : watched_fds_) {
|
||||
FD_SET(it.first, &read_fds);
|
||||
max_read_fd = std::max(max_read_fd, it.first);
|
||||
}
|
||||
|
||||
struct timeval timeout;
|
||||
struct timeval* timeout_ptr = NULL;
|
||||
|
@ -126,7 +131,7 @@ void AsyncFdWatcher::ThreadRoutine() {
|
|||
}
|
||||
|
||||
// Wait until there is data available to read on some FD.
|
||||
int nfds = std::max(notification_listen_fd_, read_fd_);
|
||||
int nfds = std::max(notification_listen_fd_, max_read_fd);
|
||||
int retval = select(nfds + 1, &read_fds, NULL, NULL, timeout_ptr);
|
||||
|
||||
// There was some error.
|
||||
|
@ -153,10 +158,21 @@ void AsyncFdWatcher::ThreadRoutine() {
|
|||
continue;
|
||||
}
|
||||
|
||||
// Invoke the data ready callback if appropriate.
|
||||
if (FD_ISSET(read_fd_, &read_fds)) {
|
||||
// Invoke the data ready callbacks if appropriate.
|
||||
std::vector<decltype(watched_fds_)::value_type> saved_callbacks;
|
||||
{
|
||||
std::unique_lock<std::mutex> guard(internal_mutex_);
|
||||
if (cb_) cb_(read_fd_);
|
||||
for (auto& it : watched_fds_) {
|
||||
if (FD_ISSET(it.first, &read_fds)) {
|
||||
saved_callbacks.push_back(it);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (auto& it : saved_callbacks) {
|
||||
if (it.second) {
|
||||
it.second(it.first);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
|
||||
#pragma once
|
||||
|
||||
#include <map>
|
||||
#include <mutex>
|
||||
#include <thread>
|
||||
|
||||
|
@ -36,7 +37,7 @@ class AsyncFdWatcher {
|
|||
const ReadCallback& on_read_fd_ready_callback);
|
||||
int ConfigureTimeout(const std::chrono::milliseconds timeout,
|
||||
const TimeoutCallback& on_timeout_callback);
|
||||
void StopWatchingFileDescriptor();
|
||||
void StopWatchingFileDescriptors();
|
||||
|
||||
private:
|
||||
AsyncFdWatcher(const AsyncFdWatcher&) = delete;
|
||||
|
@ -52,10 +53,9 @@ class AsyncFdWatcher {
|
|||
std::mutex internal_mutex_;
|
||||
std::mutex timeout_mutex_;
|
||||
|
||||
int read_fd_;
|
||||
std::map<int, ReadCallback> watched_fds_;
|
||||
int notification_listen_fd_;
|
||||
int notification_write_fd_;
|
||||
ReadCallback cb_;
|
||||
TimeoutCallback timeout_cb_;
|
||||
std::chrono::milliseconds timeout_ms_;
|
||||
};
|
||||
|
|
|
@ -14,6 +14,8 @@
|
|||
// limitations under the License.
|
||||
//
|
||||
|
||||
#define LOG_TAG "async_fd_watcher_unittest"
|
||||
|
||||
#include "async_fd_watcher.h"
|
||||
#include <gtest/gtest.h>
|
||||
#include <cstdint>
|
||||
|
@ -122,8 +124,8 @@ class AsyncFdWatcherSocketTest : public ::testing::Test {
|
|||
}
|
||||
|
||||
void CleanUpServer() {
|
||||
async_fd_watcher_.StopWatchingFileDescriptor();
|
||||
conn_watcher_.StopWatchingFileDescriptor();
|
||||
async_fd_watcher_.StopWatchingFileDescriptors();
|
||||
conn_watcher_.StopWatchingFileDescriptors();
|
||||
close(socket_fd_);
|
||||
}
|
||||
|
||||
|
@ -211,7 +213,7 @@ TEST_F(AsyncFdWatcherSocketTest, Connect) {
|
|||
});
|
||||
|
||||
ConnectClient();
|
||||
conn_watcher.StopWatchingFileDescriptor();
|
||||
conn_watcher.StopWatchingFileDescriptors();
|
||||
close(socket_fd);
|
||||
}
|
||||
|
||||
|
@ -233,7 +235,7 @@ TEST_F(AsyncFdWatcherSocketTest, TimedOutConnect) {
|
|||
EXPECT_FALSE(timed_out);
|
||||
sleep(1);
|
||||
EXPECT_TRUE(timed_out);
|
||||
conn_watcher.StopWatchingFileDescriptor();
|
||||
conn_watcher.StopWatchingFileDescriptors();
|
||||
close(socket_fd);
|
||||
}
|
||||
|
||||
|
@ -265,10 +267,64 @@ TEST_F(AsyncFdWatcherSocketTest, TimedOutSchedulesTimeout) {
|
|||
sleep(1);
|
||||
EXPECT_TRUE(timed_out);
|
||||
EXPECT_TRUE(timed_out2);
|
||||
conn_watcher.StopWatchingFileDescriptor();
|
||||
conn_watcher.StopWatchingFileDescriptors();
|
||||
close(socket_fd);
|
||||
}
|
||||
|
||||
// Use a single AsyncFdWatcher to watch two file descriptors.
|
||||
TEST_F(AsyncFdWatcherSocketTest, WatchTwoFileDescriptors) {
|
||||
int sockfd[2];
|
||||
socketpair(AF_LOCAL, SOCK_STREAM, 0, sockfd);
|
||||
bool cb1_called = false;
|
||||
bool* cb1_called_ptr = &cb1_called;
|
||||
bool cb2_called = false;
|
||||
bool* cb2_called_ptr = &cb2_called;
|
||||
|
||||
AsyncFdWatcher watcher;
|
||||
watcher.WatchFdForNonBlockingReads(sockfd[0], [cb1_called_ptr](int fd) {
|
||||
char read_buf[1] = {0};
|
||||
int n = TEMP_FAILURE_RETRY(read(fd, read_buf, sizeof(read_buf)));
|
||||
ASSERT_TRUE(n == sizeof(read_buf));
|
||||
ASSERT_TRUE(read_buf[0] == '1');
|
||||
*cb1_called_ptr = true;
|
||||
});
|
||||
|
||||
watcher.WatchFdForNonBlockingReads(sockfd[1], [cb2_called_ptr](int fd) {
|
||||
char read_buf[1] = {0};
|
||||
int n = TEMP_FAILURE_RETRY(read(fd, read_buf, sizeof(read_buf)));
|
||||
ASSERT_TRUE(n == sizeof(read_buf));
|
||||
ASSERT_TRUE(read_buf[0] == '2');
|
||||
*cb2_called_ptr = true;
|
||||
});
|
||||
|
||||
// Fail if the test doesn't pass within 3 seconds
|
||||
watcher.ConfigureTimeout(std::chrono::seconds(3), [this]() {
|
||||
bool connection_timeout = true;
|
||||
ASSERT_FALSE(connection_timeout);
|
||||
});
|
||||
|
||||
EXPECT_FALSE(cb1_called);
|
||||
EXPECT_FALSE(cb2_called);
|
||||
|
||||
char one_buf[1] = {'1'};
|
||||
TEMP_FAILURE_RETRY(write(sockfd[1], one_buf, sizeof(one_buf)));
|
||||
|
||||
sleep(1);
|
||||
|
||||
EXPECT_TRUE(cb1_called);
|
||||
EXPECT_FALSE(cb2_called);
|
||||
|
||||
char two_buf[1] = {'2'};
|
||||
TEMP_FAILURE_RETRY(write(sockfd[0], two_buf, sizeof(two_buf)));
|
||||
|
||||
sleep(1);
|
||||
|
||||
EXPECT_TRUE(cb1_called);
|
||||
EXPECT_TRUE(cb2_called);
|
||||
|
||||
watcher.StopWatchingFileDescriptors();
|
||||
}
|
||||
|
||||
// Use two AsyncFdWatchers to set up a server socket.
|
||||
TEST_F(AsyncFdWatcherSocketTest, ClientServer) {
|
||||
ConfigureServer();
|
||||
|
|
|
@ -274,7 +274,7 @@ bool VendorInterface::Open(InitializeCompleteCallback initialize_complete_cb,
|
|||
}
|
||||
|
||||
void VendorInterface::Close() {
|
||||
fd_watcher_.StopWatchingFileDescriptor();
|
||||
fd_watcher_.StopWatchingFileDescriptors();
|
||||
|
||||
if (lib_interface_ != nullptr) {
|
||||
bt_vendor_lpm_mode_t mode = BT_VND_LPM_DISABLE;
|
||||
|
|
Loading…
Reference in a new issue