diff --git a/fs_mgr/libsnapshot/snapuserd/Android.bp b/fs_mgr/libsnapshot/snapuserd/Android.bp index 837f33a77..c9b05124b 100644 --- a/fs_mgr/libsnapshot/snapuserd/Android.bp +++ b/fs_mgr/libsnapshot/snapuserd/Android.bp @@ -56,7 +56,7 @@ cc_defaults { "fs_mgr_defaults", ], srcs: [ - "snapuserd_server.cpp", + "dm-snapshot-merge/snapuserd_server.cpp", "dm-snapshot-merge/snapuserd.cpp", "dm-snapshot-merge/snapuserd_worker.cpp", "dm-snapshot-merge/snapuserd_readahead.cpp", @@ -67,6 +67,7 @@ cc_defaults { "user-space-merge/snapuserd_merge.cpp", "user-space-merge/snapuserd_readahead.cpp", "user-space-merge/snapuserd_transitions.cpp", + "user-space-merge/snapuserd_server.cpp", ], cflags: [ diff --git a/fs_mgr/libsnapshot/snapuserd/snapuserd_server.cpp b/fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/snapuserd_server.cpp similarity index 99% rename from fs_mgr/libsnapshot/snapuserd/snapuserd_server.cpp rename to fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/snapuserd_server.cpp index 91b41908b..9ddc96389 100644 --- a/fs_mgr/libsnapshot/snapuserd/snapuserd_server.cpp +++ b/fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/snapuserd_server.cpp @@ -31,6 +31,7 @@ #include #include #include + #include "snapuserd_server.h" #define _REALLY_INCLUDE_SYS__SYSTEM_PROPERTIES_H_ diff --git a/fs_mgr/libsnapshot/snapuserd/snapuserd_server.h b/fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/snapuserd_server.h similarity index 99% rename from fs_mgr/libsnapshot/snapuserd/snapuserd_server.h rename to fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/snapuserd_server.h index 14e5de6e7..3b6ff1583 100644 --- a/fs_mgr/libsnapshot/snapuserd/snapuserd_server.h +++ b/fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/snapuserd_server.h @@ -28,7 +28,7 @@ #include #include -#include "dm-snapshot-merge/snapuserd.h" +#include "snapuserd.h" namespace android { namespace snapshot { diff --git a/fs_mgr/libsnapshot/snapuserd/include/snapuserd/snapuserd_client.h b/fs_mgr/libsnapshot/snapuserd/include/snapuserd/snapuserd_client.h index aeecf410e..4fa4330b7 100644 --- a/fs_mgr/libsnapshot/snapuserd/include/snapuserd/snapuserd_client.h +++ b/fs_mgr/libsnapshot/snapuserd/include/snapuserd/snapuserd_client.h @@ -79,6 +79,12 @@ class SnapuserdClient { // Returns true if the snapuserd instance supports bridging a socket to second-stage init. bool SupportsSecondStageSocketHandoff(); + + // Returns true if the merge is started(or resumed from crash). + bool InitiateMerge(const std::string& misc_name); + + // Returns Merge completion percentage + double GetMergePercent(); }; } // namespace snapshot diff --git a/fs_mgr/libsnapshot/snapuserd/snapuserd_client.cpp b/fs_mgr/libsnapshot/snapuserd/snapuserd_client.cpp index 1ea05a3b0..87c0ce493 100644 --- a/fs_mgr/libsnapshot/snapuserd/snapuserd_client.cpp +++ b/fs_mgr/libsnapshot/snapuserd/snapuserd_client.cpp @@ -231,5 +231,26 @@ bool SnapuserdClient::DetachSnapuserd() { return true; } +bool SnapuserdClient::InitiateMerge(const std::string& misc_name) { + std::string msg = "initiate_merge," + misc_name; + if (!Sendmsg(msg)) { + LOG(ERROR) << "Failed to send message " << msg << " to snapuserd"; + return false; + } + std::string response = Receivemsg(); + return response == "success"; +} + +double SnapuserdClient::GetMergePercent() { + std::string msg = "merge_percent"; + if (!Sendmsg(msg)) { + LOG(ERROR) << "Failed to send message " << msg << " to snapuserd"; + return false; + } + std::string response = Receivemsg(); + + return std::stod(response); +} + } // namespace snapshot } // namespace android diff --git a/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.cpp b/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.cpp index e05822ed7..912884fd3 100644 --- a/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.cpp +++ b/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.cpp @@ -21,8 +21,6 @@ #include #include -#include "snapuserd_server.h" - DEFINE_string(socket, android::snapshot::kSnapuserdSocket, "Named socket or socket path."); DEFINE_bool(no_socket, false, "If true, no socket is used. Each additional argument is an INIT message."); diff --git a/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.h b/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.h index b660ba2ef..fbf57d943 100644 --- a/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.h +++ b/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.h @@ -19,7 +19,7 @@ #include #include -#include "snapuserd_server.h" +#include "dm-snapshot-merge/snapuserd_server.h" namespace android { namespace snapshot { diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp index 5730f488f..4f32f696d 100644 --- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp +++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp @@ -59,6 +59,15 @@ std::unique_ptr SnapshotHandler::CloneReaderForWorker() { return reader_->CloneCowReader(); } +void SnapshotHandler::UpdateMergeCompletionPercentage() { + struct CowHeader* ch = reinterpret_cast(mapped_addr_); + merge_completion_percentage_ = (ch->num_merge_ops * 100.0) / reader_->get_num_total_data_ops(); + + SNAP_LOG(DEBUG) << "Merge-complete %: " << merge_completion_percentage_ + << " num_merge_ops: " << ch->num_merge_ops + << " total-ops: " << reader_->get_num_total_data_ops(); +} + bool SnapshotHandler::CommitMerge(int num_merge_ops) { struct CowHeader* ch = reinterpret_cast(mapped_addr_); ch->num_merge_ops += num_merge_ops; @@ -95,6 +104,12 @@ bool SnapshotHandler::CommitMerge(int num_merge_ops) { } } + // Update the merge completion - this is used by update engine + // to track the completion. No need to take a lock. It is ok + // even if there is a miss on reading a latest updated value. + // Subsequent polling will eventually converge to completion. + UpdateMergeCompletionPercentage(); + return true; } @@ -152,6 +167,8 @@ bool SnapshotHandler::ReadMetadata() { return false; } + UpdateMergeCompletionPercentage(); + // Initialize the iterator for reading metadata std::unique_ptr cowop_iter = reader_->GetMergeOpIter(); diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h index d56195242..5e9c7bf14 100644 --- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h +++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h @@ -280,6 +280,7 @@ class SnapshotHandler : public std::enable_shared_from_this { int GetTotalBlocksToMerge() { return total_ra_blocks_merged_; } void SetSocketPresent(bool socket) { is_socket_present_ = socket; } bool MergeInitiated() { return merge_initiated_; } + double GetMergePercentage() { return merge_completion_percentage_; } // Merge Block State Transitions void SetMergeCompleted(size_t block_index); @@ -295,6 +296,7 @@ class SnapshotHandler : public std::enable_shared_from_this { chunk_t SectorToChunk(sector_t sector) { return sector >> CHUNK_SHIFT; } bool IsBlockAligned(int read_size) { return ((read_size & (BLOCK_SZ - 1)) == 0); } struct BufferState* GetBufferState(); + void UpdateMergeCompletionPercentage(); void ReadBlocks(const std::string partition_name, const std::string& dm_block_device); void ReadBlocksToCache(const std::string& dm_block_device, const std::string& partition_name, @@ -342,6 +344,7 @@ class SnapshotHandler : public std::enable_shared_from_this { std::vector> merge_blk_state_; std::unique_ptr merge_thread_; + double merge_completion_percentage_; bool merge_initiated_ = false; bool attached_ = false; diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp new file mode 100644 index 000000000..a9b1d17ab --- /dev/null +++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp @@ -0,0 +1,660 @@ +/* + * Copyright (C) 2020 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include "snapuserd_server.h" + +#define _REALLY_INCLUDE_SYS__SYSTEM_PROPERTIES_H_ +#include + +namespace android { +namespace snapshot { + +using namespace std::string_literals; + +using android::base::borrowed_fd; +using android::base::unique_fd; + +DaemonOps SnapuserServer::Resolveop(std::string& input) { + if (input == "init") return DaemonOps::INIT; + if (input == "start") return DaemonOps::START; + if (input == "stop") return DaemonOps::STOP; + if (input == "query") return DaemonOps::QUERY; + if (input == "delete") return DaemonOps::DELETE; + if (input == "detach") return DaemonOps::DETACH; + if (input == "supports") return DaemonOps::SUPPORTS; + if (input == "initiate_merge") return DaemonOps::INITIATE; + if (input == "merge_percent") return DaemonOps::PERCENTAGE; + + return DaemonOps::INVALID; +} + +SnapuserServer::~SnapuserServer() { + // Close any client sockets that were added via AcceptClient(). + for (size_t i = 1; i < watched_fds_.size(); i++) { + close(watched_fds_[i].fd); + } +} + +std::string SnapuserServer::GetDaemonStatus() { + std::string msg = ""; + + if (IsTerminating()) + msg = "passive"; + else + msg = "active"; + + return msg; +} + +void SnapuserServer::Parsemsg(std::string const& msg, const char delim, + std::vector& out) { + std::stringstream ss(msg); + std::string s; + + while (std::getline(ss, s, delim)) { + out.push_back(s); + } +} + +void SnapuserServer::ShutdownThreads() { + terminating_ = true; + JoinAllThreads(); +} + +DmUserHandler::DmUserHandler(std::shared_ptr snapuserd) + : snapuserd_(snapuserd), misc_name_(snapuserd_->GetMiscName()) {} + +bool SnapuserServer::Sendmsg(android::base::borrowed_fd fd, const std::string& msg) { + ssize_t ret = TEMP_FAILURE_RETRY(send(fd.get(), msg.data(), msg.size(), MSG_NOSIGNAL)); + if (ret < 0) { + PLOG(ERROR) << "Snapuserd:server: send() failed"; + return false; + } + + if (ret < msg.size()) { + LOG(ERROR) << "Partial send; expected " << msg.size() << " bytes, sent " << ret; + return false; + } + return true; +} + +bool SnapuserServer::Recv(android::base::borrowed_fd fd, std::string* data) { + char msg[MAX_PACKET_SIZE]; + ssize_t rv = TEMP_FAILURE_RETRY(recv(fd.get(), msg, sizeof(msg), 0)); + if (rv < 0) { + PLOG(ERROR) << "recv failed"; + return false; + } + *data = std::string(msg, rv); + return true; +} + +bool SnapuserServer::Receivemsg(android::base::borrowed_fd fd, const std::string& str) { + const char delim = ','; + + std::vector out; + Parsemsg(str, delim, out); + DaemonOps op = Resolveop(out[0]); + + switch (op) { + case DaemonOps::INIT: { + // Message format: + // init,,,, + // + // Reads the metadata and send the number of sectors + if (out.size() != 5) { + LOG(ERROR) << "Malformed init message, " << out.size() << " parts"; + return Sendmsg(fd, "fail"); + } + + auto handler = AddHandler(out[1], out[2], out[3], out[4]); + if (!handler) { + return Sendmsg(fd, "fail"); + } + + auto retval = "success," + std::to_string(handler->snapuserd()->GetNumSectors()); + return Sendmsg(fd, retval); + } + case DaemonOps::START: { + // Message format: + // start, + // + // Start the new thread which binds to dm-user misc device + if (out.size() != 2) { + LOG(ERROR) << "Malformed start message, " << out.size() << " parts"; + return Sendmsg(fd, "fail"); + } + + std::lock_guard lock(lock_); + auto iter = FindHandler(&lock, out[1]); + if (iter == dm_users_.end()) { + LOG(ERROR) << "Could not find handler: " << out[1]; + return Sendmsg(fd, "fail"); + } + if (!(*iter)->snapuserd() || (*iter)->snapuserd()->IsAttached()) { + LOG(ERROR) << "Tried to re-attach control device: " << out[1]; + return Sendmsg(fd, "fail"); + } + if (!StartHandler(*iter)) { + return Sendmsg(fd, "fail"); + } + return Sendmsg(fd, "success"); + } + case DaemonOps::STOP: { + // Message format: stop + // + // Stop all the threads gracefully and then shutdown the + // main thread + SetTerminating(); + ShutdownThreads(); + return true; + } + case DaemonOps::QUERY: { + // Message format: query + // + // As part of transition, Second stage daemon will be + // created before terminating the first stage daemon. Hence, + // for a brief period client may have to distiguish between + // first stage daemon and second stage daemon. + // + // Second stage daemon is marked as active and hence will + // be ready to receive control message. + return Sendmsg(fd, GetDaemonStatus()); + } + case DaemonOps::DELETE: { + // Message format: + // delete, + if (out.size() != 2) { + LOG(ERROR) << "Malformed delete message, " << out.size() << " parts"; + return Sendmsg(fd, "fail"); + } + { + std::lock_guard lock(lock_); + auto iter = FindHandler(&lock, out[1]); + if (iter == dm_users_.end()) { + // After merge is completed, we swap dm-user table with + // the underlying dm-linear base device. Hence, worker + // threads would have terminted and was removed from + // the list. + LOG(DEBUG) << "Could not find handler: " << out[1]; + return Sendmsg(fd, "success"); + } + + if (!(*iter)->ThreadTerminated()) { + (*iter)->snapuserd()->NotifyIOTerminated(); + } + } + if (!RemoveAndJoinHandler(out[1])) { + return Sendmsg(fd, "fail"); + } + return Sendmsg(fd, "success"); + } + case DaemonOps::DETACH: { + std::lock_guard lock(lock_); + TerminateMergeThreads(&lock); + terminating_ = true; + return true; + } + case DaemonOps::SUPPORTS: { + if (out.size() != 2) { + LOG(ERROR) << "Malformed supports message, " << out.size() << " parts"; + return Sendmsg(fd, "fail"); + } + if (out[1] == "second_stage_socket_handoff") { + return Sendmsg(fd, "success"); + } + return Sendmsg(fd, "fail"); + } + case DaemonOps::INITIATE: { + if (out.size() != 2) { + LOG(ERROR) << "Malformed initiate-merge message, " << out.size() << " parts"; + return Sendmsg(fd, "fail"); + } + if (out[0] == "initiate_merge") { + std::lock_guard lock(lock_); + auto iter = FindHandler(&lock, out[1]); + if (iter == dm_users_.end()) { + LOG(ERROR) << "Could not find handler: " << out[1]; + return Sendmsg(fd, "fail"); + } + + if (!StartMerge(*iter)) { + return Sendmsg(fd, "fail"); + } + + return Sendmsg(fd, "success"); + } + return Sendmsg(fd, "fail"); + } + case DaemonOps::PERCENTAGE: { + std::lock_guard lock(lock_); + double percentage = GetMergePercentage(&lock); + + return Sendmsg(fd, std::to_string(percentage)); + } + default: { + LOG(ERROR) << "Received unknown message type from client"; + Sendmsg(fd, "fail"); + return false; + } + } +} + +void SnapuserServer::RunThread(std::shared_ptr handler) { + LOG(INFO) << "Entering thread for handler: " << handler->misc_name(); + + handler->snapuserd()->SetSocketPresent(is_socket_present_); + if (!handler->snapuserd()->Start()) { + LOG(ERROR) << " Failed to launch all worker threads"; + } + + handler->snapuserd()->CloseFds(); + handler->snapuserd()->CheckMergeCompletionStatus(); + handler->snapuserd()->UnmapBufferRegion(); + + auto misc_name = handler->misc_name(); + LOG(INFO) << "Handler thread about to exit: " << misc_name; + + { + std::lock_guard lock(lock_); + num_partitions_merge_complete_ += 1; + handler->SetThreadTerminated(); + auto iter = FindHandler(&lock, handler->misc_name()); + if (iter == dm_users_.end()) { + // RemoveAndJoinHandler() already removed us from the list, and is + // now waiting on a join(), so just return. Additionally, release + // all the resources held by snapuserd object which are shared + // by worker threads. This should be done when the last reference + // of "handler" is released; but we will explicitly release here + // to make sure snapuserd object is freed as it is the biggest + // consumer of memory in the daemon. + handler->FreeResources(); + LOG(INFO) << "Exiting handler thread to allow for join: " << misc_name; + return; + } + + LOG(INFO) << "Exiting handler thread and freeing resources: " << misc_name; + + if (handler->snapuserd()->IsAttached()) { + handler->thread().detach(); + } + + // Important: free resources within the lock. This ensures that if + // WaitForDelete() is called, the handler is either in the list, or + // it's not and its resources are guaranteed to be freed. + handler->FreeResources(); + dm_users_.erase(iter); + } +} + +bool SnapuserServer::Start(const std::string& socketname) { + bool start_listening = true; + + sockfd_.reset(android_get_control_socket(socketname.c_str())); + if (sockfd_ < 0) { + sockfd_.reset(socket_local_server(socketname.c_str(), ANDROID_SOCKET_NAMESPACE_RESERVED, + SOCK_STREAM)); + if (sockfd_ < 0) { + PLOG(ERROR) << "Failed to create server socket " << socketname; + return false; + } + start_listening = false; + } + return StartWithSocket(start_listening); +} + +bool SnapuserServer::StartWithSocket(bool start_listening) { + if (start_listening && listen(sockfd_.get(), 4) < 0) { + PLOG(ERROR) << "listen socket failed"; + return false; + } + + AddWatchedFd(sockfd_, POLLIN); + is_socket_present_ = true; + + // If started in first-stage init, the property service won't be online. + if (access("/dev/socket/property_service", F_OK) == 0) { + if (!android::base::SetProperty("snapuserd.ready", "true")) { + LOG(ERROR) << "Failed to set snapuserd.ready property"; + return false; + } + } + + LOG(DEBUG) << "Snapuserd server now accepting connections"; + return true; +} + +bool SnapuserServer::Run() { + LOG(INFO) << "Now listening on snapuserd socket"; + + while (!IsTerminating()) { + int rv = TEMP_FAILURE_RETRY(poll(watched_fds_.data(), watched_fds_.size(), -1)); + if (rv < 0) { + PLOG(ERROR) << "poll failed"; + return false; + } + if (!rv) { + continue; + } + + if (watched_fds_[0].revents) { + AcceptClient(); + } + + auto iter = watched_fds_.begin() + 1; + while (iter != watched_fds_.end()) { + if (iter->revents && !HandleClient(iter->fd, iter->revents)) { + close(iter->fd); + iter = watched_fds_.erase(iter); + } else { + iter++; + } + } + } + + JoinAllThreads(); + return true; +} + +void SnapuserServer::JoinAllThreads() { + // Acquire the thread list within the lock. + std::vector> dm_users; + { + std::lock_guard guard(lock_); + dm_users = std::move(dm_users_); + } + + for (auto& client : dm_users) { + auto& th = client->thread(); + + if (th.joinable()) th.join(); + } +} + +void SnapuserServer::AddWatchedFd(android::base::borrowed_fd fd, int events) { + struct pollfd p = {}; + p.fd = fd.get(); + p.events = events; + watched_fds_.emplace_back(std::move(p)); +} + +void SnapuserServer::AcceptClient() { + int fd = TEMP_FAILURE_RETRY(accept4(sockfd_.get(), nullptr, nullptr, SOCK_CLOEXEC)); + if (fd < 0) { + PLOG(ERROR) << "accept4 failed"; + return; + } + + AddWatchedFd(fd, POLLIN); +} + +bool SnapuserServer::HandleClient(android::base::borrowed_fd fd, int revents) { + if (revents & POLLHUP) { + LOG(DEBUG) << "Snapuserd client disconnected"; + return false; + } + + std::string str; + if (!Recv(fd, &str)) { + return false; + } + if (!Receivemsg(fd, str)) { + LOG(ERROR) << "Encountered error handling client message, revents: " << revents; + return false; + } + return true; +} + +void SnapuserServer::Interrupt() { + // Force close the socket so poll() fails. + sockfd_ = {}; + SetTerminating(); +} + +std::shared_ptr SnapuserServer::AddHandler(const std::string& misc_name, + const std::string& cow_device_path, + const std::string& backing_device, + const std::string& base_path_merge) { + auto snapuserd = std::make_shared(misc_name, cow_device_path, backing_device, + base_path_merge); + if (!snapuserd->InitCowDevice()) { + LOG(ERROR) << "Failed to initialize Snapuserd"; + return nullptr; + } + + if (!snapuserd->InitializeWorkers()) { + LOG(ERROR) << "Failed to initialize workers"; + return nullptr; + } + + auto handler = std::make_shared(snapuserd); + { + std::lock_guard lock(lock_); + if (FindHandler(&lock, misc_name) != dm_users_.end()) { + LOG(ERROR) << "Handler already exists: " << misc_name; + return nullptr; + } + dm_users_.push_back(handler); + } + return handler; +} + +bool SnapuserServer::StartHandler(const std::shared_ptr& handler) { + if (handler->snapuserd()->IsAttached()) { + LOG(ERROR) << "Handler already attached"; + return false; + } + + handler->snapuserd()->AttachControlDevice(); + + handler->thread() = std::thread(std::bind(&SnapuserServer::RunThread, this, handler)); + return true; +} + +bool SnapuserServer::StartMerge(const std::shared_ptr& handler) { + if (!handler->snapuserd()->IsAttached()) { + LOG(ERROR) << "Handler not attached to dm-user - Merge thread cannot be started"; + return false; + } + + handler->snapuserd()->InitiateMerge(); + return true; +} + +auto SnapuserServer::FindHandler(std::lock_guard* proof_of_lock, + const std::string& misc_name) -> HandlerList::iterator { + CHECK(proof_of_lock); + + for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) { + if ((*iter)->misc_name() == misc_name) { + return iter; + } + } + return dm_users_.end(); +} + +void SnapuserServer::TerminateMergeThreads(std::lock_guard* proof_of_lock) { + CHECK(proof_of_lock); + + for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) { + if (!(*iter)->ThreadTerminated()) { + (*iter)->snapuserd()->NotifyIOTerminated(); + } + } +} + +double SnapuserServer::GetMergePercentage(std::lock_guard* proof_of_lock) { + CHECK(proof_of_lock); + double percentage = 0.0; + int n = 0; + + for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) { + auto& th = (*iter)->thread(); + if (th.joinable()) { + // Merge percentage by individual partitions wherein merge is still + // in-progress + percentage += (*iter)->snapuserd()->GetMergePercentage(); + n += 1; + } + } + + // Calculate final merge including those partitions where merge was already + // completed - num_partitions_merge_complete_ will track them when each + // thread exists in RunThread. + int total_partitions = n + num_partitions_merge_complete_; + + if (total_partitions) { + percentage = ((num_partitions_merge_complete_ * 100.0) + percentage) / total_partitions; + } + + LOG(DEBUG) << "Merge %: " << percentage + << " num_partitions_merge_complete_: " << num_partitions_merge_complete_ + << " total_partitions: " << total_partitions << " n: " << n; + return percentage; +} + +bool SnapuserServer::RemoveAndJoinHandler(const std::string& misc_name) { + std::shared_ptr handler; + { + std::lock_guard lock(lock_); + + auto iter = FindHandler(&lock, misc_name); + if (iter == dm_users_.end()) { + // Client already deleted. + return true; + } + handler = std::move(*iter); + dm_users_.erase(iter); + } + + auto& th = handler->thread(); + if (th.joinable()) { + th.join(); + } + return true; +} + +bool SnapuserServer::WaitForSocket() { + auto scope_guard = android::base::make_scope_guard([this]() -> void { JoinAllThreads(); }); + + auto socket_path = ANDROID_SOCKET_DIR "/"s + kSnapuserdSocketProxy; + + if (!android::fs_mgr::WaitForFile(socket_path, std::chrono::milliseconds::max())) { + LOG(ERROR) + << "Failed to wait for proxy socket, second-stage snapuserd will fail to connect"; + return false; + } + + // We must re-initialize property service access, since we launched before + // second-stage init. + __system_properties_init(); + + if (!android::base::WaitForProperty("snapuserd.proxy_ready", "true")) { + LOG(ERROR) + << "Failed to wait for proxy property, second-stage snapuserd will fail to connect"; + return false; + } + + unique_fd fd(socket_local_client(kSnapuserdSocketProxy, ANDROID_SOCKET_NAMESPACE_RESERVED, + SOCK_SEQPACKET)); + if (fd < 0) { + PLOG(ERROR) << "Failed to connect to socket proxy"; + return false; + } + + char code[1]; + std::vector fds; + ssize_t rv = android::base::ReceiveFileDescriptorVector(fd, code, sizeof(code), 1, &fds); + if (rv < 0) { + PLOG(ERROR) << "Failed to receive server socket over proxy"; + return false; + } + if (fds.empty()) { + LOG(ERROR) << "Expected at least one file descriptor from proxy"; + return false; + } + + // We don't care if the ACK is received. + code[0] = 'a'; + if (TEMP_FAILURE_RETRY(send(fd, code, sizeof(code), MSG_NOSIGNAL) < 0)) { + PLOG(ERROR) << "Failed to send ACK to proxy"; + return false; + } + + sockfd_ = std::move(fds[0]); + if (!StartWithSocket(true)) { + return false; + } + return Run(); +} + +bool SnapuserServer::RunForSocketHandoff() { + unique_fd proxy_fd(android_get_control_socket(kSnapuserdSocketProxy)); + if (proxy_fd < 0) { + PLOG(FATAL) << "Proxy could not get android control socket " << kSnapuserdSocketProxy; + } + borrowed_fd server_fd(android_get_control_socket(kSnapuserdSocket)); + if (server_fd < 0) { + PLOG(FATAL) << "Proxy could not get android control socket " << kSnapuserdSocket; + } + + if (listen(proxy_fd.get(), 4) < 0) { + PLOG(FATAL) << "Proxy listen socket failed"; + } + + if (!android::base::SetProperty("snapuserd.proxy_ready", "true")) { + LOG(FATAL) << "Proxy failed to set ready property"; + } + + unique_fd client_fd( + TEMP_FAILURE_RETRY(accept4(proxy_fd.get(), nullptr, nullptr, SOCK_CLOEXEC))); + if (client_fd < 0) { + PLOG(FATAL) << "Proxy accept failed"; + } + + char code[1] = {'a'}; + std::vector fds = {server_fd.get()}; + ssize_t rv = android::base::SendFileDescriptorVector(client_fd, code, sizeof(code), fds); + if (rv < 0) { + PLOG(FATAL) << "Proxy could not send file descriptor to snapuserd"; + } + // Wait for an ACK - results don't matter, we just don't want to risk closing + // the proxy socket too early. + if (recv(client_fd, code, sizeof(code), 0) < 0) { + PLOG(FATAL) << "Proxy could not receive terminating code from snapuserd"; + } + return true; +} + +} // namespace snapshot +} // namespace android diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h new file mode 100644 index 000000000..6fc3a9d97 --- /dev/null +++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h @@ -0,0 +1,140 @@ +// Copyright (C) 2020 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include "snapuserd_core.h" + +namespace android { +namespace snapshot { + +static constexpr uint32_t MAX_PACKET_SIZE = 512; + +enum class DaemonOps { + INIT, + START, + QUERY, + STOP, + DELETE, + DETACH, + SUPPORTS, + INITIATE, + PERCENTAGE, + INVALID, +}; + +class DmUserHandler { + public: + explicit DmUserHandler(std::shared_ptr snapuserd); + + void FreeResources() { + // Each worker thread holds a reference to snapuserd. + // Clear them so that all the resources + // held by snapuserd is released + if (snapuserd_) { + snapuserd_->FreeResources(); + snapuserd_ = nullptr; + } + } + const std::shared_ptr& snapuserd() const { return snapuserd_; } + std::thread& thread() { return thread_; } + + const std::string& misc_name() const { return misc_name_; } + bool ThreadTerminated() { return thread_terminated_; } + void SetThreadTerminated() { thread_terminated_ = true; } + + private: + std::thread thread_; + std::shared_ptr snapuserd_; + std::string misc_name_; + bool thread_terminated_ = false; +}; + +class SnapuserServer { + private: + android::base::unique_fd sockfd_; + bool terminating_; + volatile bool received_socket_signal_ = false; + std::vector watched_fds_; + bool is_socket_present_ = false; + int num_partitions_merge_complete_ = 0; + + std::mutex lock_; + + using HandlerList = std::vector>; + HandlerList dm_users_; + + void AddWatchedFd(android::base::borrowed_fd fd, int events); + void AcceptClient(); + bool HandleClient(android::base::borrowed_fd fd, int revents); + bool Recv(android::base::borrowed_fd fd, std::string* data); + bool Sendmsg(android::base::borrowed_fd fd, const std::string& msg); + bool Receivemsg(android::base::borrowed_fd fd, const std::string& str); + + void ShutdownThreads(); + bool RemoveAndJoinHandler(const std::string& control_device); + DaemonOps Resolveop(std::string& input); + std::string GetDaemonStatus(); + void Parsemsg(std::string const& msg, const char delim, std::vector& out); + + bool IsTerminating() { return terminating_; } + + void RunThread(std::shared_ptr handler); + void JoinAllThreads(); + bool StartWithSocket(bool start_listening); + + // Find a DmUserHandler within a lock. + HandlerList::iterator FindHandler(std::lock_guard* proof_of_lock, + const std::string& misc_name); + + double GetMergePercentage(std::lock_guard* proof_of_lock); + void TerminateMergeThreads(std::lock_guard* proof_of_lock); + + public: + SnapuserServer() { terminating_ = false; } + ~SnapuserServer(); + + bool Start(const std::string& socketname); + bool Run(); + void Interrupt(); + bool RunForSocketHandoff(); + bool WaitForSocket(); + + std::shared_ptr AddHandler(const std::string& misc_name, + const std::string& cow_device_path, + const std::string& backing_device, + const std::string& base_path_merge); + bool StartHandler(const std::shared_ptr& handler); + bool StartMerge(const std::shared_ptr& handler); + + void SetTerminating() { terminating_ = true; } + void ReceivedSocketSignal() { received_socket_signal_ = true; } +}; + +} // namespace snapshot +} // namespace android