Merge "adb: add nonblocking fd Connection."
This commit is contained in:
commit
812ba6a469
4 changed files with 258 additions and 5 deletions
|
@ -104,6 +104,7 @@ libadb_srcs = [
|
|||
"socket_spec.cpp",
|
||||
"sysdeps/errno.cpp",
|
||||
"transport.cpp",
|
||||
"transport_fd.cpp",
|
||||
"transport_local.cpp",
|
||||
"transport_usb.cpp",
|
||||
]
|
||||
|
|
|
@ -92,6 +92,8 @@ struct Connection {
|
|||
std::string transport_name_;
|
||||
ReadCallback read_callback_;
|
||||
ErrorCallback error_callback_;
|
||||
|
||||
static std::unique_ptr<Connection> FromFd(unique_fd fd);
|
||||
};
|
||||
|
||||
// Abstraction for a blocking packet transport.
|
||||
|
|
|
@ -24,13 +24,19 @@
|
|||
#include "sysdeps.h"
|
||||
#include "transport.h"
|
||||
|
||||
#define ADB_CONNECTION_BENCHMARK(benchmark_name, ...) \
|
||||
BENCHMARK_TEMPLATE(benchmark_name, FdConnection, ##__VA_ARGS__) \
|
||||
->Arg(1) \
|
||||
->Arg(16384) \
|
||||
->Arg(MAX_PAYLOAD) \
|
||||
#define ADB_CONNECTION_BENCHMARK(benchmark_name, ...) \
|
||||
BENCHMARK_TEMPLATE(benchmark_name, FdConnection, ##__VA_ARGS__) \
|
||||
->Arg(1) \
|
||||
->Arg(16384) \
|
||||
->Arg(MAX_PAYLOAD) \
|
||||
->UseRealTime(); \
|
||||
BENCHMARK_TEMPLATE(benchmark_name, NonblockingFdConnection, ##__VA_ARGS__) \
|
||||
->Arg(1) \
|
||||
->Arg(16384) \
|
||||
->Arg(MAX_PAYLOAD) \
|
||||
->UseRealTime()
|
||||
|
||||
struct NonblockingFdConnection;
|
||||
template <typename ConnectionType>
|
||||
std::unique_ptr<Connection> MakeConnection(unique_fd fd);
|
||||
|
||||
|
@ -40,6 +46,11 @@ std::unique_ptr<Connection> MakeConnection<FdConnection>(unique_fd fd) {
|
|||
return std::make_unique<BlockingConnectionAdapter>(std::move(fd_connection));
|
||||
}
|
||||
|
||||
template <>
|
||||
std::unique_ptr<Connection> MakeConnection<NonblockingFdConnection>(unique_fd fd) {
|
||||
return Connection::FromFd(std::move(fd));
|
||||
}
|
||||
|
||||
template <typename ConnectionType>
|
||||
void BM_Connection_Unidirectional(benchmark::State& state) {
|
||||
int fds[2];
|
||||
|
|
239
adb/transport_fd.cpp
Normal file
239
adb/transport_fd.cpp
Normal file
|
@ -0,0 +1,239 @@
|
|||
/*
|
||||
* Copyright (C) 2018 The Android Open Source Project
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include <stdint.h>
|
||||
|
||||
#include <deque>
|
||||
#include <mutex>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
|
||||
#include <android-base/logging.h>
|
||||
#include <android-base/stringprintf.h>
|
||||
#include <android-base/thread_annotations.h>
|
||||
|
||||
#include "adb_unique_fd.h"
|
||||
#include "adb_utils.h"
|
||||
#include "sysdeps.h"
|
||||
#include "sysdeps/memory.h"
|
||||
#include "transport.h"
|
||||
#include "types.h"
|
||||
|
||||
static void CreateWakeFds(unique_fd* read, unique_fd* write) {
|
||||
// TODO: eventfd on linux?
|
||||
int wake_fds[2];
|
||||
int rc = adb_socketpair(wake_fds);
|
||||
set_file_block_mode(wake_fds[0], false);
|
||||
set_file_block_mode(wake_fds[1], false);
|
||||
CHECK_EQ(0, rc);
|
||||
*read = unique_fd(wake_fds[0]);
|
||||
*write = unique_fd(wake_fds[1]);
|
||||
}
|
||||
|
||||
struct NonblockingFdConnection : public Connection {
|
||||
NonblockingFdConnection(unique_fd fd) : started_(false), fd_(std::move(fd)) {
|
||||
set_file_block_mode(fd_.get(), false);
|
||||
CreateWakeFds(&wake_fd_read_, &wake_fd_write_);
|
||||
}
|
||||
|
||||
void SetRunning(bool value) {
|
||||
std::lock_guard<std::mutex> lock(run_mutex_);
|
||||
running_ = value;
|
||||
}
|
||||
|
||||
bool IsRunning() {
|
||||
std::lock_guard<std::mutex> lock(run_mutex_);
|
||||
return running_;
|
||||
}
|
||||
|
||||
void Run(std::string* error) {
|
||||
SetRunning(true);
|
||||
while (IsRunning()) {
|
||||
adb_pollfd pfds[2] = {
|
||||
{.fd = fd_.get(), .events = POLLIN},
|
||||
{.fd = wake_fd_read_.get(), .events = POLLIN},
|
||||
};
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(this->write_mutex_);
|
||||
if (!writable_) {
|
||||
pfds[0].events |= POLLOUT;
|
||||
}
|
||||
}
|
||||
|
||||
int rc = adb_poll(pfds, 2, -1);
|
||||
if (rc == -1) {
|
||||
*error = android::base::StringPrintf("poll failed: %s", strerror(errno));
|
||||
return;
|
||||
} else if (rc == 0) {
|
||||
LOG(FATAL) << "poll timed out with an infinite timeout?";
|
||||
}
|
||||
|
||||
if (pfds[0].revents) {
|
||||
if ((pfds[0].revents & POLLOUT)) {
|
||||
std::lock_guard<std::mutex> lock(this->write_mutex_);
|
||||
WriteResult result = DispatchWrites();
|
||||
switch (result) {
|
||||
case WriteResult::Error:
|
||||
*error = "write failed";
|
||||
return;
|
||||
|
||||
case WriteResult::Completed:
|
||||
writable_ = true;
|
||||
break;
|
||||
|
||||
case WriteResult::TryAgain:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (pfds[0].revents & POLLIN) {
|
||||
// TODO: Should we be getting blocks from a free list?
|
||||
auto block = std::make_unique<IOVector::block_type>(MAX_PAYLOAD);
|
||||
rc = adb_read(fd_.get(), &(*block)[0], block->size());
|
||||
if (rc == -1) {
|
||||
*error = std::string("read failed: ") + strerror(errno);
|
||||
return;
|
||||
} else if (rc == 0) {
|
||||
*error = "read failed: EOF";
|
||||
return;
|
||||
}
|
||||
block->resize(rc);
|
||||
read_buffer_.append(std::move(block));
|
||||
|
||||
if (!read_header_ && read_buffer_.size() >= sizeof(amessage)) {
|
||||
auto header_buf = read_buffer_.take_front(sizeof(amessage)).coalesce();
|
||||
CHECK_EQ(sizeof(amessage), header_buf.size());
|
||||
read_header_ = std::make_unique<amessage>();
|
||||
memcpy(read_header_.get(), header_buf.data(), sizeof(amessage));
|
||||
}
|
||||
|
||||
if (read_header_ && read_buffer_.size() >= read_header_->data_length) {
|
||||
auto data_chain = read_buffer_.take_front(read_header_->data_length);
|
||||
|
||||
// TODO: Make apacket carry around a IOVector instead of coalescing.
|
||||
auto payload = data_chain.coalesce<apacket::payload_type>();
|
||||
auto packet = std::make_unique<apacket>();
|
||||
packet->msg = *read_header_;
|
||||
packet->payload = std::move(payload);
|
||||
read_header_ = nullptr;
|
||||
read_callback_(this, std::move(packet));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (pfds[1].revents) {
|
||||
uint64_t buf;
|
||||
rc = adb_read(wake_fd_read_.get(), &buf, sizeof(buf));
|
||||
CHECK_EQ(static_cast<int>(sizeof(buf)), rc);
|
||||
|
||||
// We were woken up either to add POLLOUT to our events, or to exit.
|
||||
// Do nothing.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void Start() override final {
|
||||
if (started_.exchange(true)) {
|
||||
LOG(FATAL) << "Connection started multiple times?";
|
||||
}
|
||||
|
||||
thread_ = std::thread([this]() {
|
||||
std::string error = "connection closed";
|
||||
Run(&error);
|
||||
this->error_callback_(this, error);
|
||||
});
|
||||
}
|
||||
|
||||
void Stop() override final {
|
||||
SetRunning(false);
|
||||
WakeThread();
|
||||
thread_.join();
|
||||
}
|
||||
|
||||
void WakeThread() {
|
||||
uint64_t buf = 0;
|
||||
if (TEMP_FAILURE_RETRY(adb_write(wake_fd_write_.get(), &buf, sizeof(buf))) != sizeof(buf)) {
|
||||
LOG(FATAL) << "failed to wake up thread";
|
||||
}
|
||||
}
|
||||
|
||||
enum class WriteResult {
|
||||
Error,
|
||||
Completed,
|
||||
TryAgain,
|
||||
};
|
||||
|
||||
WriteResult DispatchWrites() REQUIRES(write_mutex_) {
|
||||
CHECK(!write_buffer_.empty());
|
||||
if (!writable_) {
|
||||
return WriteResult::TryAgain;
|
||||
}
|
||||
|
||||
auto iovs = write_buffer_.iovecs();
|
||||
ssize_t rc = adb_writev(fd_.get(), iovs.data(), iovs.size());
|
||||
if (rc == -1) {
|
||||
return WriteResult::Error;
|
||||
} else if (rc == 0) {
|
||||
errno = 0;
|
||||
return WriteResult::Error;
|
||||
}
|
||||
|
||||
// TODO: Implement a more efficient drop_front?
|
||||
write_buffer_.take_front(rc);
|
||||
if (write_buffer_.empty()) {
|
||||
return WriteResult::Completed;
|
||||
}
|
||||
|
||||
// There's data left in the range, which means our write returned early.
|
||||
return WriteResult::TryAgain;
|
||||
}
|
||||
|
||||
bool Write(std::unique_ptr<apacket> packet) final {
|
||||
std::lock_guard<std::mutex> lock(write_mutex_);
|
||||
const char* header_begin = reinterpret_cast<const char*>(&packet->msg);
|
||||
const char* header_end = header_begin + sizeof(packet->msg);
|
||||
auto header_block = std::make_unique<IOVector::block_type>(header_begin, header_end);
|
||||
write_buffer_.append(std::move(header_block));
|
||||
if (!packet->payload.empty()) {
|
||||
write_buffer_.append(std::make_unique<IOVector::block_type>(std::move(packet->payload)));
|
||||
}
|
||||
return DispatchWrites() != WriteResult::Error;
|
||||
}
|
||||
|
||||
std::thread thread_;
|
||||
|
||||
std::atomic<bool> started_;
|
||||
std::mutex run_mutex_;
|
||||
bool running_ GUARDED_BY(run_mutex_);
|
||||
|
||||
std::unique_ptr<amessage> read_header_;
|
||||
IOVector read_buffer_;
|
||||
|
||||
unique_fd fd_;
|
||||
unique_fd wake_fd_read_;
|
||||
unique_fd wake_fd_write_;
|
||||
|
||||
std::mutex write_mutex_;
|
||||
bool writable_ GUARDED_BY(write_mutex_) = true;
|
||||
IOVector write_buffer_ GUARDED_BY(write_mutex_);
|
||||
|
||||
IOVector incoming_queue_;
|
||||
};
|
||||
|
||||
std::unique_ptr<Connection> Connection::FromFd(unique_fd fd) {
|
||||
return std::make_unique<NonblockingFdConnection>(std::move(fd));
|
||||
}
|
Loading…
Reference in a new issue