Merge changes I4d6da40d,I91c7ced5,I7b9f6d18

* changes:
  adb: Add a test for emulator connection
  adb: Improve test_adb a bit more
  adb: Add a way to reconnect TCP transports
This commit is contained in:
Treehugger Robot 2018-05-21 22:37:21 +00:00 committed by Gerrit Code Review
commit 2fa9770c3a
5 changed files with 428 additions and 106 deletions

View file

@ -117,6 +117,7 @@ int adb_server_main(int is_daemon, const std::string& socket_spec, int ack_reply
atexit(adb_server_cleanup);
init_transport_registration();
init_reconnect_handler();
init_mdns_transport_discovery();
usb_init();

View file

@ -36,10 +36,11 @@ import adb
@contextlib.contextmanager
def fake_adb_server(protocol=socket.AF_INET, port=0):
"""Creates a fake ADB server that just replies with a CNXN packet."""
def fake_adbd(protocol=socket.AF_INET, port=0):
"""Creates a fake ADB daemon that just replies with a CNXN packet."""
serversock = socket.socket(protocol, socket.SOCK_STREAM)
serversock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
if protocol == socket.AF_INET:
serversock.bind(('127.0.0.1', port))
else:
@ -60,31 +61,33 @@ def fake_adb_server(protocol=socket.AF_INET, port=0):
rlist = [readpipe, serversock]
cnxn_sent = {}
while True:
ready, _, _ = select.select(rlist, [], [])
for r in ready:
if r == readpipe:
read_ready, _, _ = select.select(rlist, [], [])
for ready in read_ready:
if ready == readpipe:
# Closure pipe
os.close(r)
os.close(ready)
serversock.shutdown(socket.SHUT_RDWR)
serversock.close()
return
elif r == serversock:
elif ready == serversock:
# Server socket
conn, _ = r.accept()
conn, _ = ready.accept()
rlist.append(conn)
else:
# Client socket
data = r.recv(1024)
if not data:
if r in cnxn_sent:
del cnxn_sent[r]
rlist.remove(r)
data = ready.recv(1024)
if not data or data.startswith('OPEN'):
if ready in cnxn_sent:
del cnxn_sent[ready]
ready.shutdown(socket.SHUT_RDWR)
ready.close()
rlist.remove(ready)
continue
if r in cnxn_sent:
if ready in cnxn_sent:
continue
cnxn_sent[r] = True
r.sendall(_adb_packet('CNXN', 0x01000001, 1024 * 1024,
'device::ro.product.name=fakeadb'))
cnxn_sent[ready] = True
ready.sendall(_adb_packet('CNXN', 0x01000001, 1024 * 1024,
'device::ro.product.name=fakeadb'))
port = serversock.getsockname()[1]
server_thread = threading.Thread(target=_handle)
@ -97,8 +100,52 @@ def fake_adb_server(protocol=socket.AF_INET, port=0):
server_thread.join()
class NonApiTest(unittest.TestCase):
"""Tests for ADB that aren't a part of the AndroidDevice API."""
@contextlib.contextmanager
def adb_connect(unittest, serial):
"""Context manager for an ADB connection.
This automatically disconnects when done with the connection.
"""
output = subprocess.check_output(['adb', 'connect', serial])
unittest.assertEqual(output.strip(), 'connected to {}'.format(serial))
try:
yield
finally:
# Perform best-effort disconnection. Discard the output.
subprocess.Popen(['adb', 'disconnect', serial],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE).communicate()
@contextlib.contextmanager
def adb_server():
"""Context manager for an ADB server.
This creates an ADB server and returns the port it's listening on.
"""
port = 5038
# Kill any existing server on this non-default port.
subprocess.check_output(['adb', '-P', str(port), 'kill-server'],
stderr=subprocess.STDOUT)
read_pipe, write_pipe = os.pipe()
proc = subprocess.Popen(['adb', '-L', 'tcp:localhost:{}'.format(port),
'fork-server', 'server',
'--reply-fd', str(write_pipe)])
try:
os.close(write_pipe)
greeting = os.read(read_pipe, 1024)
assert greeting == 'OK\n', repr(greeting)
yield port
finally:
proc.terminate()
proc.wait()
class CommandlineTest(unittest.TestCase):
"""Tests for the ADB commandline."""
def test_help(self):
"""Make sure we get _something_ out of help."""
@ -120,28 +167,37 @@ class NonApiTest(unittest.TestCase):
revision_line, r'^Revision [0-9a-f]{12}-android$')
def test_tcpip_error_messages(self):
p = subprocess.Popen(['adb', 'tcpip'], stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
out, _ = p.communicate()
self.assertEqual(1, p.returncode)
"""Make sure 'adb tcpip' parsing is sane."""
proc = subprocess.Popen(['adb', 'tcpip'], stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
out, _ = proc.communicate()
self.assertEqual(1, proc.returncode)
self.assertIn('requires an argument', out)
p = subprocess.Popen(['adb', 'tcpip', 'foo'], stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
out, _ = p.communicate()
self.assertEqual(1, p.returncode)
proc = subprocess.Popen(['adb', 'tcpip', 'foo'], stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
out, _ = proc.communicate()
self.assertEqual(1, proc.returncode)
self.assertIn('invalid port', out)
# Helper method that reads a pipe until it is closed, then sets the event.
def _read_pipe_and_set_event(self, pipe, event):
x = pipe.read()
class ServerTest(unittest.TestCase):
"""Tests for the ADB server."""
@staticmethod
def _read_pipe_and_set_event(pipe, event):
"""Reads a pipe until it is closed, then sets the event."""
pipe.read()
event.set()
# Test that launch_server() does not let the adb server inherit
# stdin/stdout/stderr handles which can cause callers of adb.exe to hang.
# This test also runs fine on unix even though the impetus is an issue
# unique to Windows.
def test_handle_inheritance(self):
"""Test that launch_server() does not inherit handles.
launch_server() should not let the adb server inherit
stdin/stdout/stderr handles, which can cause callers of adb.exe to hang.
This test also runs fine on unix even though the impetus is an issue
unique to Windows.
"""
# This test takes 5 seconds to run on Windows: if there is no adb server
# running on the the port used below, adb kill-server tries to make a
# TCP connection to a closed port and that takes 1 second on Windows;
@ -163,29 +219,30 @@ class NonApiTest(unittest.TestCase):
try:
# Run the adb client and have it start the adb server.
p = subprocess.Popen(['adb', '-P', str(port), 'start-server'],
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
proc = subprocess.Popen(['adb', '-P', str(port), 'start-server'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
# Start threads that set events when stdout/stderr are closed.
stdout_event = threading.Event()
stdout_thread = threading.Thread(
target=self._read_pipe_and_set_event,
args=(p.stdout, stdout_event))
target=ServerTest._read_pipe_and_set_event,
args=(proc.stdout, stdout_event))
stdout_thread.daemon = True
stdout_thread.start()
stderr_event = threading.Event()
stderr_thread = threading.Thread(
target=self._read_pipe_and_set_event,
args=(p.stderr, stderr_event))
target=ServerTest._read_pipe_and_set_event,
args=(proc.stderr, stderr_event))
stderr_thread.daemon = True
stderr_thread.start()
# Wait for the adb client to finish. Once that has occurred, if
# stdin/stderr/stdout are still open, it must be open in the adb
# server.
p.wait()
proc.wait()
# Try to write to stdin which we expect is closed. If it isn't
# closed, we should get an IOError. If we don't get an IOError,
@ -193,7 +250,7 @@ class NonApiTest(unittest.TestCase):
# probably letting the adb server inherit stdin which would be
# wrong.
with self.assertRaises(IOError):
p.stdin.write('x')
proc.stdin.write('x')
# Wait a few seconds for stdout/stderr to be closed (in the success
# case, this won't wait at all). If there is a timeout, that means
@ -207,8 +264,12 @@ class NonApiTest(unittest.TestCase):
subprocess.check_output(['adb', '-P', str(port), 'kill-server'],
stderr=subprocess.STDOUT)
# Use SO_LINGER to cause TCP RST segment to be sent on socket close.
class EmulatorTest(unittest.TestCase):
"""Tests for the emulator connection."""
def _reset_socket_on_close(self, sock):
"""Use SO_LINGER to cause TCP RST segment to be sent on socket close."""
# The linger structure is two shorts on Windows, but two ints on Unix.
linger_format = 'hh' if os.name == 'nt' else 'ii'
l_onoff = 1
@ -227,7 +288,7 @@ class NonApiTest(unittest.TestCase):
Bug: https://code.google.com/p/android/issues/detail?id=21021
"""
with contextlib.closing(
socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as listener:
socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as listener:
# Use SO_REUSEADDR so subsequent runs of the test can grab the port
# even if it is in TIME_WAIT.
listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
@ -237,7 +298,7 @@ class NonApiTest(unittest.TestCase):
# Now that listening has started, start adb emu kill, telling it to
# connect to our mock emulator.
p = subprocess.Popen(
proc = subprocess.Popen(
['adb', '-s', 'emulator-' + str(port), 'emu', 'kill'],
stderr=subprocess.STDOUT)
@ -246,12 +307,16 @@ class NonApiTest(unittest.TestCase):
# If WSAECONNABORTED (10053) is raised by any socket calls,
# then adb probably isn't reading the data that we sent it.
conn.sendall('Android Console: type \'help\' for a list ' +
'of commands\r\n')
'of commands\r\n')
conn.sendall('OK\r\n')
with contextlib.closing(conn.makefile()) as f:
self.assertEqual('kill\n', f.readline())
self.assertEqual('quit\n', f.readline())
with contextlib.closing(conn.makefile()) as connf:
line = connf.readline()
if line.startswith('auth'):
# Ignore the first auth line.
line = connf.readline()
self.assertEqual('kill\n', line)
self.assertEqual('quit\n', connf.readline())
conn.sendall('OK: killing emulator, bye bye\r\n')
@ -264,11 +329,48 @@ class NonApiTest(unittest.TestCase):
self._reset_socket_on_close(conn)
# Wait for adb to finish, so we can check return code.
p.communicate()
proc.communicate()
# If this fails, adb probably isn't ignoring WSAECONNRESET when
# reading the response from the adb emu kill command (on Windows).
self.assertEqual(0, p.returncode)
self.assertEqual(0, proc.returncode)
def test_emulator_connect(self):
"""Ensure that the emulator can connect.
Bug: http://b/78991667
"""
with adb_server() as server_port:
with fake_adbd() as port:
serial = 'emulator-{}'.format(port - 1)
# Ensure that the emulator is not there.
try:
subprocess.check_output(['adb', '-P', str(server_port),
'-s', serial, 'get-state'],
stderr=subprocess.STDOUT)
self.fail('Device should not be available')
except subprocess.CalledProcessError as err:
self.assertEqual(
err.output.strip(),
'error: device \'{}\' not found'.format(serial))
# Let the ADB server know that the emulator has started.
with contextlib.closing(
socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
sock.connect(('localhost', server_port))
command = 'host:emulator:{}'.format(port)
sock.sendall('%04x%s' % (len(command), command))
# Ensure the emulator is there.
subprocess.check_call(['adb', '-P', str(server_port),
'-s', serial, 'wait-for-device'])
output = subprocess.check_output(['adb', '-P', str(server_port),
'-s', serial, 'get-state'])
self.assertEqual(output.strip(), 'device')
class ConnectionTest(unittest.TestCase):
"""Tests for adb connect."""
def test_connect_ipv4_ipv6(self):
"""Ensure that `adb connect localhost:1234` will try both IPv4 and IPv6.
@ -277,38 +379,67 @@ class NonApiTest(unittest.TestCase):
"""
for protocol in (socket.AF_INET, socket.AF_INET6):
try:
with fake_adb_server(protocol=protocol) as port:
output = subprocess.check_output(
['adb', 'connect', 'localhost:{}'.format(port)])
self.assertEqual(
output.strip(), 'connected to localhost:{}'.format(port))
with fake_adbd(protocol=protocol) as port:
serial = 'localhost:{}'.format(port)
with adb_connect(self, serial):
pass
except socket.error:
print("IPv6 not available, skipping")
continue
def test_already_connected(self):
with fake_adb_server() as port:
output = subprocess.check_output(
['adb', 'connect', 'localhost:{}'.format(port)])
"""Ensure that an already-connected device stays connected."""
self.assertEqual(
output.strip(), 'connected to localhost:{}'.format(port))
with fake_adbd() as port:
serial = 'localhost:{}'.format(port)
with adb_connect(self, serial):
# b/31250450: this always returns 0 but probably shouldn't.
output = subprocess.check_output(['adb', 'connect', serial])
self.assertEqual(
output.strip(), 'already connected to {}'.format(serial))
# b/31250450: this always returns 0 but probably shouldn't.
output = subprocess.check_output(
['adb', 'connect', 'localhost:{}'.format(port)])
def test_reconnect(self):
"""Ensure that a disconnected device reconnects."""
with fake_adbd() as port:
serial = 'localhost:{}'.format(port)
with adb_connect(self, serial):
output = subprocess.check_output(['adb', '-s', serial,
'get-state'])
self.assertEqual(output.strip(), 'device')
# This will fail.
proc = subprocess.Popen(['adb', '-s', serial, 'shell', 'true'],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
output, _ = proc.communicate()
self.assertEqual(output.strip(), 'error: closed')
subprocess.check_call(['adb', '-s', serial, 'wait-for-device'])
output = subprocess.check_output(['adb', '-s', serial,
'get-state'])
self.assertEqual(output.strip(), 'device')
# Once we explicitly kick a device, it won't attempt to
# reconnect.
output = subprocess.check_output(['adb', 'disconnect', serial])
self.assertEqual(
output.strip(), 'disconnected {}'.format(serial))
try:
subprocess.check_output(['adb', '-s', serial, 'get-state'],
stderr=subprocess.STDOUT)
self.fail('Device should not be available')
except subprocess.CalledProcessError as err:
self.assertEqual(
err.output.strip(),
'error: device \'{}\' not found'.format(serial))
self.assertEqual(
output.strip(), 'already connected to localhost:{}'.format(port))
def main():
"""Main entrypoint."""
random.seed(0)
if len(adb.get_devices()) > 0:
suite = unittest.TestLoader().loadTestsFromName(__name__)
unittest.TextTestRunner(verbosity=3).run(suite)
else:
print('Test suite must be run with attached devices')
unittest.main(verbosity=3)
if __name__ == '__main__':

View file

@ -33,6 +33,7 @@
#include <deque>
#include <list>
#include <mutex>
#include <queue>
#include <thread>
#include <android-base/logging.h>
@ -50,7 +51,9 @@
#include "adb_utils.h"
#include "fdevent.h"
static void transport_unref(atransport *t);
static void register_transport(atransport* transport);
static void remove_transport(atransport* transport);
static void transport_unref(atransport* transport);
// TODO: unordered_map<TransportId, atransport*>
static auto& transport_list = *new std::list<atransport*>();
@ -77,6 +80,130 @@ class SCOPED_CAPABILITY ScopedAssumeLocked {
~ScopedAssumeLocked() RELEASE() {}
};
// Tracks and handles atransport*s that are attempting reconnection.
class ReconnectHandler {
public:
ReconnectHandler() = default;
~ReconnectHandler() = default;
// Starts the ReconnectHandler thread.
void Start();
// Requests the ReconnectHandler thread to stop.
void Stop();
// Adds the atransport* to the queue of reconnect attempts.
void TrackTransport(atransport* transport);
private:
// The main thread loop.
void Run();
// Tracks a reconnection attempt.
struct ReconnectAttempt {
atransport* transport;
std::chrono::system_clock::time_point deadline;
size_t attempts_left;
};
// Only retry for up to one minute.
static constexpr const std::chrono::seconds kDefaultTimeout = std::chrono::seconds(10);
static constexpr const size_t kMaxAttempts = 6;
// Protects all members.
std::mutex reconnect_mutex_;
bool running_ GUARDED_BY(reconnect_mutex_) = true;
std::thread handler_thread_;
std::condition_variable reconnect_cv_;
std::queue<ReconnectAttempt> reconnect_queue_ GUARDED_BY(reconnect_mutex_);
DISALLOW_COPY_AND_ASSIGN(ReconnectHandler);
};
void ReconnectHandler::Start() {
check_main_thread();
handler_thread_ = std::thread(&ReconnectHandler::Run, this);
}
void ReconnectHandler::Stop() {
check_main_thread();
{
std::lock_guard<std::mutex> lock(reconnect_mutex_);
running_ = false;
}
reconnect_cv_.notify_one();
handler_thread_.join();
// Drain the queue to free all resources.
std::lock_guard<std::mutex> lock(reconnect_mutex_);
while (!reconnect_queue_.empty()) {
ReconnectAttempt attempt = reconnect_queue_.front();
reconnect_queue_.pop();
remove_transport(attempt.transport);
}
}
void ReconnectHandler::TrackTransport(atransport* transport) {
check_main_thread();
{
std::lock_guard<std::mutex> lock(reconnect_mutex_);
if (!running_) return;
reconnect_queue_.emplace(ReconnectAttempt{
transport, std::chrono::system_clock::now() + ReconnectHandler::kDefaultTimeout,
ReconnectHandler::kMaxAttempts});
}
reconnect_cv_.notify_one();
}
void ReconnectHandler::Run() {
while (true) {
ReconnectAttempt attempt;
{
std::unique_lock<std::mutex> lock(reconnect_mutex_);
ScopedAssumeLocked assume_lock(reconnect_mutex_);
auto deadline = std::chrono::time_point<std::chrono::system_clock>::max();
if (!reconnect_queue_.empty()) deadline = reconnect_queue_.front().deadline;
reconnect_cv_.wait_until(lock, deadline, [&]() REQUIRES(reconnect_mutex_) {
return !running_ ||
(!reconnect_queue_.empty() && reconnect_queue_.front().deadline < deadline);
});
if (!running_) return;
attempt = reconnect_queue_.front();
reconnect_queue_.pop();
if (attempt.transport->kicked()) {
D("transport %s was kicked. giving up on it.", attempt.transport->serial);
remove_transport(attempt.transport);
continue;
}
}
D("attempting to reconnect %s", attempt.transport->serial);
if (!attempt.transport->Reconnect()) {
D("attempting to reconnect %s failed.", attempt.transport->serial);
if (attempt.attempts_left == 0) {
D("transport %s exceeded the number of retry attempts. giving up on it.",
attempt.transport->serial);
remove_transport(attempt.transport);
continue;
}
std::lock_guard<std::mutex> lock(reconnect_mutex_);
reconnect_queue_.emplace(ReconnectAttempt{
attempt.transport,
std::chrono::system_clock::now() + ReconnectHandler::kDefaultTimeout,
attempt.attempts_left - 1});
continue;
}
D("reconnection to %s succeeded.", attempt.transport->serial);
register_transport(attempt.transport);
}
}
static auto& reconnect_handler = *new ReconnectHandler();
} // namespace
TransportId NextTransportId() {
@ -477,8 +604,6 @@ static int transport_write_action(int fd, struct tmsg* m) {
return 0;
}
static void remove_transport(atransport*);
static void transport_registration_func(int _fd, unsigned ev, void*) {
tmsg m;
atransport* t;
@ -515,8 +640,9 @@ static void transport_registration_func(int _fd, unsigned ev, void*) {
/* don't create transport threads for inaccessible devices */
if (t->GetConnectionState() != kCsNoPerm) {
/* initial references are the two threads */
t->ref_count = 1;
// The connection gets a reference to the atransport. It will release it
// upon a read/write error.
t->ref_count++;
t->connection()->SetTransportName(t->serial_name());
t->connection()->SetReadCallback([t](Connection*, std::unique_ptr<apacket> p) {
if (!check_header(p.get(), t)) {
@ -547,13 +673,20 @@ static void transport_registration_func(int _fd, unsigned ev, void*) {
{
std::lock_guard<std::recursive_mutex> lock(transport_lock);
pending_list.remove(t);
transport_list.push_front(t);
auto it = std::find(pending_list.begin(), pending_list.end(), t);
if (it != pending_list.end()) {
pending_list.remove(t);
transport_list.push_front(t);
}
}
update_transports();
}
void init_reconnect_handler(void) {
reconnect_handler.Start();
}
void init_transport_registration(void) {
int s[2];
@ -572,6 +705,7 @@ void init_transport_registration(void) {
}
void kick_all_transports() {
reconnect_handler.Stop();
// To avoid only writing part of a packet to a transport after exit, kick all transports.
std::lock_guard<std::recursive_mutex> lock(transport_lock);
for (auto t : transport_list) {
@ -601,15 +735,21 @@ static void remove_transport(atransport* transport) {
}
static void transport_unref(atransport* t) {
check_main_thread();
CHECK(t != nullptr);
std::lock_guard<std::recursive_mutex> lock(transport_lock);
CHECK_GT(t->ref_count, 0u);
t->ref_count--;
if (t->ref_count == 0) {
D("transport: %s unref (kicking and closing)", t->serial);
t->connection()->Stop();
remove_transport(t);
if (t->IsTcpDevice() && !t->kicked()) {
D("transport: %s unref (attempting reconnection) %d", t->serial, t->kicked());
reconnect_handler.TrackTransport(t);
} else {
D("transport: %s unref (kicking and closing)", t->serial);
remove_transport(t);
}
} else {
D("transport: %s unref (count=%zu)", t->serial, t->ref_count);
}
@ -781,9 +921,8 @@ int atransport::Write(apacket* p) {
}
void atransport::Kick() {
if (!kicked_) {
D("kicking transport %s", this->serial);
kicked_ = true;
if (!kicked_.exchange(true)) {
D("kicking transport %p %s", this, this->serial);
this->connection()->Stop();
}
}
@ -941,6 +1080,10 @@ void atransport::SetConnectionEstablished(bool success) {
connection_waitable_->SetConnectionEstablished(success);
}
bool atransport::Reconnect() {
return reconnect_(this);
}
#if ADB_HOST
// We use newline as our delimiter, make sure to never output it.
@ -1021,8 +1164,9 @@ void close_usb_devices() {
}
#endif // ADB_HOST
int register_socket_transport(int s, const char* serial, int port, int local) {
atransport* t = new atransport();
int register_socket_transport(int s, const char* serial, int port, int local,
atransport::ReconnectCallback reconnect) {
atransport* t = new atransport(std::move(reconnect), kCsOffline);
if (!serial) {
char buf[32];
@ -1103,7 +1247,7 @@ void kick_all_tcp_devices() {
void register_usb_transport(usb_handle* usb, const char* serial, const char* devpath,
unsigned writeable) {
atransport* t = new atransport((writeable ? kCsConnecting : kCsNoPerm));
atransport* t = new atransport(writeable ? kCsOffline : kCsNoPerm);
D("transport: %p init'ing for usb_handle %p (sn='%s')", t, usb, serial ? serial : "");
init_usb_transport(t, usb);

View file

@ -198,20 +198,27 @@ class atransport {
// class in one go is a very large change. Given how bad our testing is,
// it's better to do this piece by piece.
atransport(ConnectionState state = kCsConnecting)
using ReconnectCallback = std::function<bool(atransport*)>;
atransport(ReconnectCallback reconnect, ConnectionState state)
: id(NextTransportId()),
kicked_(false),
connection_state_(state),
connection_waitable_(std::make_shared<ConnectionWaitable>()),
connection_(nullptr) {
connection_(nullptr),
reconnect_(std::move(reconnect)) {
// Initialize protocol to min version for compatibility with older versions.
// Version will be updated post-connect.
protocol_version = A_VERSION_MIN;
max_payload = MAX_PAYLOAD;
}
atransport(ConnectionState state = kCsOffline)
: atransport([](atransport*) { return false; }, state) {}
virtual ~atransport();
int Write(apacket* p);
void Kick();
bool kicked() const { return kicked_; }
// ConnectionState can be read by all threads, but can only be written in the main thread.
ConnectionState GetConnectionState() const;
@ -286,8 +293,12 @@ class atransport {
// Gets a shared reference to the ConnectionWaitable.
std::shared_ptr<ConnectionWaitable> connection_waitable() { return connection_waitable_; }
// Attempts to reconnect with the underlying Connection. Returns true if the
// reconnection attempt succeeded.
bool Reconnect();
private:
bool kicked_ = false;
std::atomic<bool> kicked_;
// A set of features transmitted in the banner with the initial connection.
// This is stored in the banner as 'features=feature0,feature1,etc'.
@ -310,6 +321,9 @@ class atransport {
// The underlying connection object.
std::shared_ptr<Connection> connection_ GUARDED_BY(mutex_);
// A callback that will be invoked when the atransport needs to reconnect.
ReconnectCallback reconnect_;
std::mutex mutex_;
DISALLOW_COPY_AND_ASSIGN(atransport);
@ -333,6 +347,7 @@ void update_transports(void);
// Stops iteration and returns false if fn returns false, otherwise returns true.
bool iterate_transports(std::function<bool(const atransport*)> fn);
void init_reconnect_handler(void);
void init_transport_registration(void);
void init_mdns_transport_discovery(void);
std::string list_transports(bool long_listing);
@ -347,7 +362,8 @@ void register_usb_transport(usb_handle* h, const char* serial,
void connect_device(const std::string& address, std::string* response);
/* cause new transports to be init'd and added to the list */
int register_socket_transport(int s, const char* serial, int port, int local);
int register_socket_transport(int s, const char* serial, int port, int local,
atransport::ReconnectCallback reconnect);
// This should only be used for transports with connection_state == kCsNoPerm.
void unregister_usb_transport(usb_handle* usb);

View file

@ -68,28 +68,24 @@ bool local_connect(int port) {
return local_connect_arbitrary_ports(port - 1, port, &dummy) == 0;
}
void connect_device(const std::string& address, std::string* response) {
if (address.empty()) {
*response = "empty address";
return;
}
std::tuple<unique_fd, int, std::string> tcp_connect(const std::string& address,
std::string* response) {
std::string serial;
std::string host;
int port = DEFAULT_ADB_LOCAL_TRANSPORT_PORT;
if (!android::base::ParseNetAddress(address, &host, &port, &serial, response)) {
return;
return std::make_tuple(unique_fd(), port, serial);
}
std::string error;
int fd = network_connect(host.c_str(), port, SOCK_STREAM, 10, &error);
unique_fd fd(network_connect(host.c_str(), port, SOCK_STREAM, 10, &error));
if (fd == -1) {
*response = android::base::StringPrintf("unable to connect to %s: %s",
serial.c_str(), error.c_str());
return;
return std::make_tuple(std::move(fd), port, serial);
}
D("client: connected %s remote on fd %d", serial.c_str(), fd);
D("client: connected %s remote on fd %d", serial.c_str(), fd.get());
close_on_exec(fd);
disable_tcp_nagle(fd);
@ -98,7 +94,38 @@ void connect_device(const std::string& address, std::string* response) {
D("warning: failed to configure TCP keepalives (%s)", strerror(errno));
}
int ret = register_socket_transport(fd, serial.c_str(), port, 0);
return std::make_tuple(std::move(fd), port, serial);
}
void connect_device(const std::string& address, std::string* response) {
if (address.empty()) {
*response = "empty address";
return;
}
unique_fd fd;
int port;
std::string serial;
std::tie(fd, port, serial) = tcp_connect(address, response);
auto reconnect = [address](atransport* t) {
std::string response;
unique_fd fd;
int port;
std::string serial;
std::tie(fd, port, serial) = tcp_connect(address, &response);
if (fd == -1) {
D("reconnect failed: %s", response.c_str());
return false;
}
// This invokes the part of register_socket_transport() that needs to be
// invoked if the atransport* has already been setup. This eventually
// calls atransport->SetConnection() with a newly created Connection*
// that will in turn send the CNXN packet.
return init_socket_transport(t, fd.release(), port, 0) >= 0;
};
int ret = register_socket_transport(fd.release(), serial.c_str(), port, 0, std::move(reconnect));
if (ret < 0) {
adb_close(fd);
if (ret == -EALREADY) {
@ -135,7 +162,8 @@ int local_connect_arbitrary_ports(int console_port, int adb_port, std::string* e
close_on_exec(fd);
disable_tcp_nagle(fd);
std::string serial = getEmulatorSerialString(console_port);
if (register_socket_transport(fd, serial.c_str(), adb_port, 1) == 0) {
if (register_socket_transport(fd, serial.c_str(), adb_port, 1,
[](atransport*) { return false; }) == 0) {
return 0;
}
adb_close(fd);
@ -239,7 +267,8 @@ static void server_socket_thread(int port) {
close_on_exec(fd);
disable_tcp_nagle(fd);
std::string serial = android::base::StringPrintf("host-%d", fd);
if (register_socket_transport(fd, serial.c_str(), port, 1) != 0) {
if (register_socket_transport(fd, serial.c_str(), port, 1,
[](atransport*) { return false; }) != 0) {
adb_close(fd);
}
}
@ -338,7 +367,8 @@ static void qemu_socket_thread(int port) {
/* Host is connected. Register the transport, and start the
* exchange. */
std::string serial = android::base::StringPrintf("host-%d", fd);
if (register_socket_transport(fd, serial.c_str(), port, 1) != 0 ||
if (register_socket_transport(fd, serial.c_str(), port, 1,
[](atransport*) { return false; }) != 0 ||
!WriteFdExactly(fd, _start_req, strlen(_start_req))) {
adb_close(fd);
}