snapuserd: Wire up API's for Initiating and tracking Merge
Add new client API's for initiating and tracking merge. These API's will be used by libsnapshot. Track the merge completion in the server by walking through all the partitions. Each worker thread will update the merge completion as and when number of COW operations are completed. Server will gather all the completions of each partition and average it out. This is in sync with the current merge completion tracking for dm-snapshot. As a side effect, move the snapuserd_server.h/cpp files to dm-snapshot-merge directory as it will only be a maintaining code. Bug: 193863443 Test: Snapuserd_test Signed-off-by: Akilesh Kailash <akailash@google.com> Change-Id: I031eb1a11b0f426aafbed3d39d85b0c22b9030fb
This commit is contained in:
parent
8abe050eb5
commit
ff590a806c
11 changed files with 852 additions and 5 deletions
|
@ -56,7 +56,7 @@ cc_defaults {
|
|||
"fs_mgr_defaults",
|
||||
],
|
||||
srcs: [
|
||||
"snapuserd_server.cpp",
|
||||
"dm-snapshot-merge/snapuserd_server.cpp",
|
||||
"dm-snapshot-merge/snapuserd.cpp",
|
||||
"dm-snapshot-merge/snapuserd_worker.cpp",
|
||||
"dm-snapshot-merge/snapuserd_readahead.cpp",
|
||||
|
@ -67,6 +67,7 @@ cc_defaults {
|
|||
"user-space-merge/snapuserd_merge.cpp",
|
||||
"user-space-merge/snapuserd_readahead.cpp",
|
||||
"user-space-merge/snapuserd_transitions.cpp",
|
||||
"user-space-merge/snapuserd_server.cpp",
|
||||
],
|
||||
|
||||
cflags: [
|
||||
|
|
|
@ -31,6 +31,7 @@
|
|||
#include <android-base/scopeguard.h>
|
||||
#include <fs_mgr/file_wait.h>
|
||||
#include <snapuserd/snapuserd_client.h>
|
||||
|
||||
#include "snapuserd_server.h"
|
||||
|
||||
#define _REALLY_INCLUDE_SYS__SYSTEM_PROPERTIES_H_
|
|
@ -28,7 +28,7 @@
|
|||
#include <vector>
|
||||
|
||||
#include <android-base/unique_fd.h>
|
||||
#include "dm-snapshot-merge/snapuserd.h"
|
||||
#include "snapuserd.h"
|
||||
|
||||
namespace android {
|
||||
namespace snapshot {
|
|
@ -79,6 +79,12 @@ class SnapuserdClient {
|
|||
|
||||
// Returns true if the snapuserd instance supports bridging a socket to second-stage init.
|
||||
bool SupportsSecondStageSocketHandoff();
|
||||
|
||||
// Returns true if the merge is started(or resumed from crash).
|
||||
bool InitiateMerge(const std::string& misc_name);
|
||||
|
||||
// Returns Merge completion percentage
|
||||
double GetMergePercent();
|
||||
};
|
||||
|
||||
} // namespace snapshot
|
||||
|
|
|
@ -231,5 +231,26 @@ bool SnapuserdClient::DetachSnapuserd() {
|
|||
return true;
|
||||
}
|
||||
|
||||
bool SnapuserdClient::InitiateMerge(const std::string& misc_name) {
|
||||
std::string msg = "initiate_merge," + misc_name;
|
||||
if (!Sendmsg(msg)) {
|
||||
LOG(ERROR) << "Failed to send message " << msg << " to snapuserd";
|
||||
return false;
|
||||
}
|
||||
std::string response = Receivemsg();
|
||||
return response == "success";
|
||||
}
|
||||
|
||||
double SnapuserdClient::GetMergePercent() {
|
||||
std::string msg = "merge_percent";
|
||||
if (!Sendmsg(msg)) {
|
||||
LOG(ERROR) << "Failed to send message " << msg << " to snapuserd";
|
||||
return false;
|
||||
}
|
||||
std::string response = Receivemsg();
|
||||
|
||||
return std::stod(response);
|
||||
}
|
||||
|
||||
} // namespace snapshot
|
||||
} // namespace android
|
||||
|
|
|
@ -21,8 +21,6 @@
|
|||
#include <gflags/gflags.h>
|
||||
#include <snapuserd/snapuserd_client.h>
|
||||
|
||||
#include "snapuserd_server.h"
|
||||
|
||||
DEFINE_string(socket, android::snapshot::kSnapuserdSocket, "Named socket or socket path.");
|
||||
DEFINE_bool(no_socket, false,
|
||||
"If true, no socket is used. Each additional argument is an INIT message.");
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include "snapuserd_server.h"
|
||||
#include "dm-snapshot-merge/snapuserd_server.h"
|
||||
|
||||
namespace android {
|
||||
namespace snapshot {
|
||||
|
|
|
@ -59,6 +59,15 @@ std::unique_ptr<CowReader> SnapshotHandler::CloneReaderForWorker() {
|
|||
return reader_->CloneCowReader();
|
||||
}
|
||||
|
||||
void SnapshotHandler::UpdateMergeCompletionPercentage() {
|
||||
struct CowHeader* ch = reinterpret_cast<struct CowHeader*>(mapped_addr_);
|
||||
merge_completion_percentage_ = (ch->num_merge_ops * 100.0) / reader_->get_num_total_data_ops();
|
||||
|
||||
SNAP_LOG(DEBUG) << "Merge-complete %: " << merge_completion_percentage_
|
||||
<< " num_merge_ops: " << ch->num_merge_ops
|
||||
<< " total-ops: " << reader_->get_num_total_data_ops();
|
||||
}
|
||||
|
||||
bool SnapshotHandler::CommitMerge(int num_merge_ops) {
|
||||
struct CowHeader* ch = reinterpret_cast<struct CowHeader*>(mapped_addr_);
|
||||
ch->num_merge_ops += num_merge_ops;
|
||||
|
@ -95,6 +104,12 @@ bool SnapshotHandler::CommitMerge(int num_merge_ops) {
|
|||
}
|
||||
}
|
||||
|
||||
// Update the merge completion - this is used by update engine
|
||||
// to track the completion. No need to take a lock. It is ok
|
||||
// even if there is a miss on reading a latest updated value.
|
||||
// Subsequent polling will eventually converge to completion.
|
||||
UpdateMergeCompletionPercentage();
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -152,6 +167,8 @@ bool SnapshotHandler::ReadMetadata() {
|
|||
return false;
|
||||
}
|
||||
|
||||
UpdateMergeCompletionPercentage();
|
||||
|
||||
// Initialize the iterator for reading metadata
|
||||
std::unique_ptr<ICowOpIter> cowop_iter = reader_->GetMergeOpIter();
|
||||
|
||||
|
|
|
@ -280,6 +280,7 @@ class SnapshotHandler : public std::enable_shared_from_this<SnapshotHandler> {
|
|||
int GetTotalBlocksToMerge() { return total_ra_blocks_merged_; }
|
||||
void SetSocketPresent(bool socket) { is_socket_present_ = socket; }
|
||||
bool MergeInitiated() { return merge_initiated_; }
|
||||
double GetMergePercentage() { return merge_completion_percentage_; }
|
||||
|
||||
// Merge Block State Transitions
|
||||
void SetMergeCompleted(size_t block_index);
|
||||
|
@ -295,6 +296,7 @@ class SnapshotHandler : public std::enable_shared_from_this<SnapshotHandler> {
|
|||
chunk_t SectorToChunk(sector_t sector) { return sector >> CHUNK_SHIFT; }
|
||||
bool IsBlockAligned(int read_size) { return ((read_size & (BLOCK_SZ - 1)) == 0); }
|
||||
struct BufferState* GetBufferState();
|
||||
void UpdateMergeCompletionPercentage();
|
||||
|
||||
void ReadBlocks(const std::string partition_name, const std::string& dm_block_device);
|
||||
void ReadBlocksToCache(const std::string& dm_block_device, const std::string& partition_name,
|
||||
|
@ -342,6 +344,7 @@ class SnapshotHandler : public std::enable_shared_from_this<SnapshotHandler> {
|
|||
std::vector<std::unique_ptr<MergeGroupState>> merge_blk_state_;
|
||||
|
||||
std::unique_ptr<Worker> merge_thread_;
|
||||
double merge_completion_percentage_;
|
||||
|
||||
bool merge_initiated_ = false;
|
||||
bool attached_ = false;
|
||||
|
|
|
@ -0,0 +1,660 @@
|
|||
/*
|
||||
* Copyright (C) 2020 The Android Open Source Project
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include <arpa/inet.h>
|
||||
#include <cutils/sockets.h>
|
||||
#include <errno.h>
|
||||
#include <netinet/in.h>
|
||||
#include <stdint.h>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <sys/socket.h>
|
||||
#include <sys/types.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#include <android-base/cmsg.h>
|
||||
#include <android-base/logging.h>
|
||||
#include <android-base/properties.h>
|
||||
#include <android-base/scopeguard.h>
|
||||
#include <fs_mgr/file_wait.h>
|
||||
#include <snapuserd/snapuserd_client.h>
|
||||
#include "snapuserd_server.h"
|
||||
|
||||
#define _REALLY_INCLUDE_SYS__SYSTEM_PROPERTIES_H_
|
||||
#include <sys/_system_properties.h>
|
||||
|
||||
namespace android {
|
||||
namespace snapshot {
|
||||
|
||||
using namespace std::string_literals;
|
||||
|
||||
using android::base::borrowed_fd;
|
||||
using android::base::unique_fd;
|
||||
|
||||
DaemonOps SnapuserServer::Resolveop(std::string& input) {
|
||||
if (input == "init") return DaemonOps::INIT;
|
||||
if (input == "start") return DaemonOps::START;
|
||||
if (input == "stop") return DaemonOps::STOP;
|
||||
if (input == "query") return DaemonOps::QUERY;
|
||||
if (input == "delete") return DaemonOps::DELETE;
|
||||
if (input == "detach") return DaemonOps::DETACH;
|
||||
if (input == "supports") return DaemonOps::SUPPORTS;
|
||||
if (input == "initiate_merge") return DaemonOps::INITIATE;
|
||||
if (input == "merge_percent") return DaemonOps::PERCENTAGE;
|
||||
|
||||
return DaemonOps::INVALID;
|
||||
}
|
||||
|
||||
SnapuserServer::~SnapuserServer() {
|
||||
// Close any client sockets that were added via AcceptClient().
|
||||
for (size_t i = 1; i < watched_fds_.size(); i++) {
|
||||
close(watched_fds_[i].fd);
|
||||
}
|
||||
}
|
||||
|
||||
std::string SnapuserServer::GetDaemonStatus() {
|
||||
std::string msg = "";
|
||||
|
||||
if (IsTerminating())
|
||||
msg = "passive";
|
||||
else
|
||||
msg = "active";
|
||||
|
||||
return msg;
|
||||
}
|
||||
|
||||
void SnapuserServer::Parsemsg(std::string const& msg, const char delim,
|
||||
std::vector<std::string>& out) {
|
||||
std::stringstream ss(msg);
|
||||
std::string s;
|
||||
|
||||
while (std::getline(ss, s, delim)) {
|
||||
out.push_back(s);
|
||||
}
|
||||
}
|
||||
|
||||
void SnapuserServer::ShutdownThreads() {
|
||||
terminating_ = true;
|
||||
JoinAllThreads();
|
||||
}
|
||||
|
||||
DmUserHandler::DmUserHandler(std::shared_ptr<SnapshotHandler> snapuserd)
|
||||
: snapuserd_(snapuserd), misc_name_(snapuserd_->GetMiscName()) {}
|
||||
|
||||
bool SnapuserServer::Sendmsg(android::base::borrowed_fd fd, const std::string& msg) {
|
||||
ssize_t ret = TEMP_FAILURE_RETRY(send(fd.get(), msg.data(), msg.size(), MSG_NOSIGNAL));
|
||||
if (ret < 0) {
|
||||
PLOG(ERROR) << "Snapuserd:server: send() failed";
|
||||
return false;
|
||||
}
|
||||
|
||||
if (ret < msg.size()) {
|
||||
LOG(ERROR) << "Partial send; expected " << msg.size() << " bytes, sent " << ret;
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
bool SnapuserServer::Recv(android::base::borrowed_fd fd, std::string* data) {
|
||||
char msg[MAX_PACKET_SIZE];
|
||||
ssize_t rv = TEMP_FAILURE_RETRY(recv(fd.get(), msg, sizeof(msg), 0));
|
||||
if (rv < 0) {
|
||||
PLOG(ERROR) << "recv failed";
|
||||
return false;
|
||||
}
|
||||
*data = std::string(msg, rv);
|
||||
return true;
|
||||
}
|
||||
|
||||
bool SnapuserServer::Receivemsg(android::base::borrowed_fd fd, const std::string& str) {
|
||||
const char delim = ',';
|
||||
|
||||
std::vector<std::string> out;
|
||||
Parsemsg(str, delim, out);
|
||||
DaemonOps op = Resolveop(out[0]);
|
||||
|
||||
switch (op) {
|
||||
case DaemonOps::INIT: {
|
||||
// Message format:
|
||||
// init,<misc_name>,<cow_device_path>,<backing_device>,<base_path_merge>
|
||||
//
|
||||
// Reads the metadata and send the number of sectors
|
||||
if (out.size() != 5) {
|
||||
LOG(ERROR) << "Malformed init message, " << out.size() << " parts";
|
||||
return Sendmsg(fd, "fail");
|
||||
}
|
||||
|
||||
auto handler = AddHandler(out[1], out[2], out[3], out[4]);
|
||||
if (!handler) {
|
||||
return Sendmsg(fd, "fail");
|
||||
}
|
||||
|
||||
auto retval = "success," + std::to_string(handler->snapuserd()->GetNumSectors());
|
||||
return Sendmsg(fd, retval);
|
||||
}
|
||||
case DaemonOps::START: {
|
||||
// Message format:
|
||||
// start,<misc_name>
|
||||
//
|
||||
// Start the new thread which binds to dm-user misc device
|
||||
if (out.size() != 2) {
|
||||
LOG(ERROR) << "Malformed start message, " << out.size() << " parts";
|
||||
return Sendmsg(fd, "fail");
|
||||
}
|
||||
|
||||
std::lock_guard<std::mutex> lock(lock_);
|
||||
auto iter = FindHandler(&lock, out[1]);
|
||||
if (iter == dm_users_.end()) {
|
||||
LOG(ERROR) << "Could not find handler: " << out[1];
|
||||
return Sendmsg(fd, "fail");
|
||||
}
|
||||
if (!(*iter)->snapuserd() || (*iter)->snapuserd()->IsAttached()) {
|
||||
LOG(ERROR) << "Tried to re-attach control device: " << out[1];
|
||||
return Sendmsg(fd, "fail");
|
||||
}
|
||||
if (!StartHandler(*iter)) {
|
||||
return Sendmsg(fd, "fail");
|
||||
}
|
||||
return Sendmsg(fd, "success");
|
||||
}
|
||||
case DaemonOps::STOP: {
|
||||
// Message format: stop
|
||||
//
|
||||
// Stop all the threads gracefully and then shutdown the
|
||||
// main thread
|
||||
SetTerminating();
|
||||
ShutdownThreads();
|
||||
return true;
|
||||
}
|
||||
case DaemonOps::QUERY: {
|
||||
// Message format: query
|
||||
//
|
||||
// As part of transition, Second stage daemon will be
|
||||
// created before terminating the first stage daemon. Hence,
|
||||
// for a brief period client may have to distiguish between
|
||||
// first stage daemon and second stage daemon.
|
||||
//
|
||||
// Second stage daemon is marked as active and hence will
|
||||
// be ready to receive control message.
|
||||
return Sendmsg(fd, GetDaemonStatus());
|
||||
}
|
||||
case DaemonOps::DELETE: {
|
||||
// Message format:
|
||||
// delete,<misc_name>
|
||||
if (out.size() != 2) {
|
||||
LOG(ERROR) << "Malformed delete message, " << out.size() << " parts";
|
||||
return Sendmsg(fd, "fail");
|
||||
}
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(lock_);
|
||||
auto iter = FindHandler(&lock, out[1]);
|
||||
if (iter == dm_users_.end()) {
|
||||
// After merge is completed, we swap dm-user table with
|
||||
// the underlying dm-linear base device. Hence, worker
|
||||
// threads would have terminted and was removed from
|
||||
// the list.
|
||||
LOG(DEBUG) << "Could not find handler: " << out[1];
|
||||
return Sendmsg(fd, "success");
|
||||
}
|
||||
|
||||
if (!(*iter)->ThreadTerminated()) {
|
||||
(*iter)->snapuserd()->NotifyIOTerminated();
|
||||
}
|
||||
}
|
||||
if (!RemoveAndJoinHandler(out[1])) {
|
||||
return Sendmsg(fd, "fail");
|
||||
}
|
||||
return Sendmsg(fd, "success");
|
||||
}
|
||||
case DaemonOps::DETACH: {
|
||||
std::lock_guard<std::mutex> lock(lock_);
|
||||
TerminateMergeThreads(&lock);
|
||||
terminating_ = true;
|
||||
return true;
|
||||
}
|
||||
case DaemonOps::SUPPORTS: {
|
||||
if (out.size() != 2) {
|
||||
LOG(ERROR) << "Malformed supports message, " << out.size() << " parts";
|
||||
return Sendmsg(fd, "fail");
|
||||
}
|
||||
if (out[1] == "second_stage_socket_handoff") {
|
||||
return Sendmsg(fd, "success");
|
||||
}
|
||||
return Sendmsg(fd, "fail");
|
||||
}
|
||||
case DaemonOps::INITIATE: {
|
||||
if (out.size() != 2) {
|
||||
LOG(ERROR) << "Malformed initiate-merge message, " << out.size() << " parts";
|
||||
return Sendmsg(fd, "fail");
|
||||
}
|
||||
if (out[0] == "initiate_merge") {
|
||||
std::lock_guard<std::mutex> lock(lock_);
|
||||
auto iter = FindHandler(&lock, out[1]);
|
||||
if (iter == dm_users_.end()) {
|
||||
LOG(ERROR) << "Could not find handler: " << out[1];
|
||||
return Sendmsg(fd, "fail");
|
||||
}
|
||||
|
||||
if (!StartMerge(*iter)) {
|
||||
return Sendmsg(fd, "fail");
|
||||
}
|
||||
|
||||
return Sendmsg(fd, "success");
|
||||
}
|
||||
return Sendmsg(fd, "fail");
|
||||
}
|
||||
case DaemonOps::PERCENTAGE: {
|
||||
std::lock_guard<std::mutex> lock(lock_);
|
||||
double percentage = GetMergePercentage(&lock);
|
||||
|
||||
return Sendmsg(fd, std::to_string(percentage));
|
||||
}
|
||||
default: {
|
||||
LOG(ERROR) << "Received unknown message type from client";
|
||||
Sendmsg(fd, "fail");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void SnapuserServer::RunThread(std::shared_ptr<DmUserHandler> handler) {
|
||||
LOG(INFO) << "Entering thread for handler: " << handler->misc_name();
|
||||
|
||||
handler->snapuserd()->SetSocketPresent(is_socket_present_);
|
||||
if (!handler->snapuserd()->Start()) {
|
||||
LOG(ERROR) << " Failed to launch all worker threads";
|
||||
}
|
||||
|
||||
handler->snapuserd()->CloseFds();
|
||||
handler->snapuserd()->CheckMergeCompletionStatus();
|
||||
handler->snapuserd()->UnmapBufferRegion();
|
||||
|
||||
auto misc_name = handler->misc_name();
|
||||
LOG(INFO) << "Handler thread about to exit: " << misc_name;
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(lock_);
|
||||
num_partitions_merge_complete_ += 1;
|
||||
handler->SetThreadTerminated();
|
||||
auto iter = FindHandler(&lock, handler->misc_name());
|
||||
if (iter == dm_users_.end()) {
|
||||
// RemoveAndJoinHandler() already removed us from the list, and is
|
||||
// now waiting on a join(), so just return. Additionally, release
|
||||
// all the resources held by snapuserd object which are shared
|
||||
// by worker threads. This should be done when the last reference
|
||||
// of "handler" is released; but we will explicitly release here
|
||||
// to make sure snapuserd object is freed as it is the biggest
|
||||
// consumer of memory in the daemon.
|
||||
handler->FreeResources();
|
||||
LOG(INFO) << "Exiting handler thread to allow for join: " << misc_name;
|
||||
return;
|
||||
}
|
||||
|
||||
LOG(INFO) << "Exiting handler thread and freeing resources: " << misc_name;
|
||||
|
||||
if (handler->snapuserd()->IsAttached()) {
|
||||
handler->thread().detach();
|
||||
}
|
||||
|
||||
// Important: free resources within the lock. This ensures that if
|
||||
// WaitForDelete() is called, the handler is either in the list, or
|
||||
// it's not and its resources are guaranteed to be freed.
|
||||
handler->FreeResources();
|
||||
dm_users_.erase(iter);
|
||||
}
|
||||
}
|
||||
|
||||
bool SnapuserServer::Start(const std::string& socketname) {
|
||||
bool start_listening = true;
|
||||
|
||||
sockfd_.reset(android_get_control_socket(socketname.c_str()));
|
||||
if (sockfd_ < 0) {
|
||||
sockfd_.reset(socket_local_server(socketname.c_str(), ANDROID_SOCKET_NAMESPACE_RESERVED,
|
||||
SOCK_STREAM));
|
||||
if (sockfd_ < 0) {
|
||||
PLOG(ERROR) << "Failed to create server socket " << socketname;
|
||||
return false;
|
||||
}
|
||||
start_listening = false;
|
||||
}
|
||||
return StartWithSocket(start_listening);
|
||||
}
|
||||
|
||||
bool SnapuserServer::StartWithSocket(bool start_listening) {
|
||||
if (start_listening && listen(sockfd_.get(), 4) < 0) {
|
||||
PLOG(ERROR) << "listen socket failed";
|
||||
return false;
|
||||
}
|
||||
|
||||
AddWatchedFd(sockfd_, POLLIN);
|
||||
is_socket_present_ = true;
|
||||
|
||||
// If started in first-stage init, the property service won't be online.
|
||||
if (access("/dev/socket/property_service", F_OK) == 0) {
|
||||
if (!android::base::SetProperty("snapuserd.ready", "true")) {
|
||||
LOG(ERROR) << "Failed to set snapuserd.ready property";
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
LOG(DEBUG) << "Snapuserd server now accepting connections";
|
||||
return true;
|
||||
}
|
||||
|
||||
bool SnapuserServer::Run() {
|
||||
LOG(INFO) << "Now listening on snapuserd socket";
|
||||
|
||||
while (!IsTerminating()) {
|
||||
int rv = TEMP_FAILURE_RETRY(poll(watched_fds_.data(), watched_fds_.size(), -1));
|
||||
if (rv < 0) {
|
||||
PLOG(ERROR) << "poll failed";
|
||||
return false;
|
||||
}
|
||||
if (!rv) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (watched_fds_[0].revents) {
|
||||
AcceptClient();
|
||||
}
|
||||
|
||||
auto iter = watched_fds_.begin() + 1;
|
||||
while (iter != watched_fds_.end()) {
|
||||
if (iter->revents && !HandleClient(iter->fd, iter->revents)) {
|
||||
close(iter->fd);
|
||||
iter = watched_fds_.erase(iter);
|
||||
} else {
|
||||
iter++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
JoinAllThreads();
|
||||
return true;
|
||||
}
|
||||
|
||||
void SnapuserServer::JoinAllThreads() {
|
||||
// Acquire the thread list within the lock.
|
||||
std::vector<std::shared_ptr<DmUserHandler>> dm_users;
|
||||
{
|
||||
std::lock_guard<std::mutex> guard(lock_);
|
||||
dm_users = std::move(dm_users_);
|
||||
}
|
||||
|
||||
for (auto& client : dm_users) {
|
||||
auto& th = client->thread();
|
||||
|
||||
if (th.joinable()) th.join();
|
||||
}
|
||||
}
|
||||
|
||||
void SnapuserServer::AddWatchedFd(android::base::borrowed_fd fd, int events) {
|
||||
struct pollfd p = {};
|
||||
p.fd = fd.get();
|
||||
p.events = events;
|
||||
watched_fds_.emplace_back(std::move(p));
|
||||
}
|
||||
|
||||
void SnapuserServer::AcceptClient() {
|
||||
int fd = TEMP_FAILURE_RETRY(accept4(sockfd_.get(), nullptr, nullptr, SOCK_CLOEXEC));
|
||||
if (fd < 0) {
|
||||
PLOG(ERROR) << "accept4 failed";
|
||||
return;
|
||||
}
|
||||
|
||||
AddWatchedFd(fd, POLLIN);
|
||||
}
|
||||
|
||||
bool SnapuserServer::HandleClient(android::base::borrowed_fd fd, int revents) {
|
||||
if (revents & POLLHUP) {
|
||||
LOG(DEBUG) << "Snapuserd client disconnected";
|
||||
return false;
|
||||
}
|
||||
|
||||
std::string str;
|
||||
if (!Recv(fd, &str)) {
|
||||
return false;
|
||||
}
|
||||
if (!Receivemsg(fd, str)) {
|
||||
LOG(ERROR) << "Encountered error handling client message, revents: " << revents;
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
void SnapuserServer::Interrupt() {
|
||||
// Force close the socket so poll() fails.
|
||||
sockfd_ = {};
|
||||
SetTerminating();
|
||||
}
|
||||
|
||||
std::shared_ptr<DmUserHandler> SnapuserServer::AddHandler(const std::string& misc_name,
|
||||
const std::string& cow_device_path,
|
||||
const std::string& backing_device,
|
||||
const std::string& base_path_merge) {
|
||||
auto snapuserd = std::make_shared<SnapshotHandler>(misc_name, cow_device_path, backing_device,
|
||||
base_path_merge);
|
||||
if (!snapuserd->InitCowDevice()) {
|
||||
LOG(ERROR) << "Failed to initialize Snapuserd";
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
if (!snapuserd->InitializeWorkers()) {
|
||||
LOG(ERROR) << "Failed to initialize workers";
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
auto handler = std::make_shared<DmUserHandler>(snapuserd);
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(lock_);
|
||||
if (FindHandler(&lock, misc_name) != dm_users_.end()) {
|
||||
LOG(ERROR) << "Handler already exists: " << misc_name;
|
||||
return nullptr;
|
||||
}
|
||||
dm_users_.push_back(handler);
|
||||
}
|
||||
return handler;
|
||||
}
|
||||
|
||||
bool SnapuserServer::StartHandler(const std::shared_ptr<DmUserHandler>& handler) {
|
||||
if (handler->snapuserd()->IsAttached()) {
|
||||
LOG(ERROR) << "Handler already attached";
|
||||
return false;
|
||||
}
|
||||
|
||||
handler->snapuserd()->AttachControlDevice();
|
||||
|
||||
handler->thread() = std::thread(std::bind(&SnapuserServer::RunThread, this, handler));
|
||||
return true;
|
||||
}
|
||||
|
||||
bool SnapuserServer::StartMerge(const std::shared_ptr<DmUserHandler>& handler) {
|
||||
if (!handler->snapuserd()->IsAttached()) {
|
||||
LOG(ERROR) << "Handler not attached to dm-user - Merge thread cannot be started";
|
||||
return false;
|
||||
}
|
||||
|
||||
handler->snapuserd()->InitiateMerge();
|
||||
return true;
|
||||
}
|
||||
|
||||
auto SnapuserServer::FindHandler(std::lock_guard<std::mutex>* proof_of_lock,
|
||||
const std::string& misc_name) -> HandlerList::iterator {
|
||||
CHECK(proof_of_lock);
|
||||
|
||||
for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
|
||||
if ((*iter)->misc_name() == misc_name) {
|
||||
return iter;
|
||||
}
|
||||
}
|
||||
return dm_users_.end();
|
||||
}
|
||||
|
||||
void SnapuserServer::TerminateMergeThreads(std::lock_guard<std::mutex>* proof_of_lock) {
|
||||
CHECK(proof_of_lock);
|
||||
|
||||
for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
|
||||
if (!(*iter)->ThreadTerminated()) {
|
||||
(*iter)->snapuserd()->NotifyIOTerminated();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
double SnapuserServer::GetMergePercentage(std::lock_guard<std::mutex>* proof_of_lock) {
|
||||
CHECK(proof_of_lock);
|
||||
double percentage = 0.0;
|
||||
int n = 0;
|
||||
|
||||
for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
|
||||
auto& th = (*iter)->thread();
|
||||
if (th.joinable()) {
|
||||
// Merge percentage by individual partitions wherein merge is still
|
||||
// in-progress
|
||||
percentage += (*iter)->snapuserd()->GetMergePercentage();
|
||||
n += 1;
|
||||
}
|
||||
}
|
||||
|
||||
// Calculate final merge including those partitions where merge was already
|
||||
// completed - num_partitions_merge_complete_ will track them when each
|
||||
// thread exists in RunThread.
|
||||
int total_partitions = n + num_partitions_merge_complete_;
|
||||
|
||||
if (total_partitions) {
|
||||
percentage = ((num_partitions_merge_complete_ * 100.0) + percentage) / total_partitions;
|
||||
}
|
||||
|
||||
LOG(DEBUG) << "Merge %: " << percentage
|
||||
<< " num_partitions_merge_complete_: " << num_partitions_merge_complete_
|
||||
<< " total_partitions: " << total_partitions << " n: " << n;
|
||||
return percentage;
|
||||
}
|
||||
|
||||
bool SnapuserServer::RemoveAndJoinHandler(const std::string& misc_name) {
|
||||
std::shared_ptr<DmUserHandler> handler;
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(lock_);
|
||||
|
||||
auto iter = FindHandler(&lock, misc_name);
|
||||
if (iter == dm_users_.end()) {
|
||||
// Client already deleted.
|
||||
return true;
|
||||
}
|
||||
handler = std::move(*iter);
|
||||
dm_users_.erase(iter);
|
||||
}
|
||||
|
||||
auto& th = handler->thread();
|
||||
if (th.joinable()) {
|
||||
th.join();
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
bool SnapuserServer::WaitForSocket() {
|
||||
auto scope_guard = android::base::make_scope_guard([this]() -> void { JoinAllThreads(); });
|
||||
|
||||
auto socket_path = ANDROID_SOCKET_DIR "/"s + kSnapuserdSocketProxy;
|
||||
|
||||
if (!android::fs_mgr::WaitForFile(socket_path, std::chrono::milliseconds::max())) {
|
||||
LOG(ERROR)
|
||||
<< "Failed to wait for proxy socket, second-stage snapuserd will fail to connect";
|
||||
return false;
|
||||
}
|
||||
|
||||
// We must re-initialize property service access, since we launched before
|
||||
// second-stage init.
|
||||
__system_properties_init();
|
||||
|
||||
if (!android::base::WaitForProperty("snapuserd.proxy_ready", "true")) {
|
||||
LOG(ERROR)
|
||||
<< "Failed to wait for proxy property, second-stage snapuserd will fail to connect";
|
||||
return false;
|
||||
}
|
||||
|
||||
unique_fd fd(socket_local_client(kSnapuserdSocketProxy, ANDROID_SOCKET_NAMESPACE_RESERVED,
|
||||
SOCK_SEQPACKET));
|
||||
if (fd < 0) {
|
||||
PLOG(ERROR) << "Failed to connect to socket proxy";
|
||||
return false;
|
||||
}
|
||||
|
||||
char code[1];
|
||||
std::vector<unique_fd> fds;
|
||||
ssize_t rv = android::base::ReceiveFileDescriptorVector(fd, code, sizeof(code), 1, &fds);
|
||||
if (rv < 0) {
|
||||
PLOG(ERROR) << "Failed to receive server socket over proxy";
|
||||
return false;
|
||||
}
|
||||
if (fds.empty()) {
|
||||
LOG(ERROR) << "Expected at least one file descriptor from proxy";
|
||||
return false;
|
||||
}
|
||||
|
||||
// We don't care if the ACK is received.
|
||||
code[0] = 'a';
|
||||
if (TEMP_FAILURE_RETRY(send(fd, code, sizeof(code), MSG_NOSIGNAL) < 0)) {
|
||||
PLOG(ERROR) << "Failed to send ACK to proxy";
|
||||
return false;
|
||||
}
|
||||
|
||||
sockfd_ = std::move(fds[0]);
|
||||
if (!StartWithSocket(true)) {
|
||||
return false;
|
||||
}
|
||||
return Run();
|
||||
}
|
||||
|
||||
bool SnapuserServer::RunForSocketHandoff() {
|
||||
unique_fd proxy_fd(android_get_control_socket(kSnapuserdSocketProxy));
|
||||
if (proxy_fd < 0) {
|
||||
PLOG(FATAL) << "Proxy could not get android control socket " << kSnapuserdSocketProxy;
|
||||
}
|
||||
borrowed_fd server_fd(android_get_control_socket(kSnapuserdSocket));
|
||||
if (server_fd < 0) {
|
||||
PLOG(FATAL) << "Proxy could not get android control socket " << kSnapuserdSocket;
|
||||
}
|
||||
|
||||
if (listen(proxy_fd.get(), 4) < 0) {
|
||||
PLOG(FATAL) << "Proxy listen socket failed";
|
||||
}
|
||||
|
||||
if (!android::base::SetProperty("snapuserd.proxy_ready", "true")) {
|
||||
LOG(FATAL) << "Proxy failed to set ready property";
|
||||
}
|
||||
|
||||
unique_fd client_fd(
|
||||
TEMP_FAILURE_RETRY(accept4(proxy_fd.get(), nullptr, nullptr, SOCK_CLOEXEC)));
|
||||
if (client_fd < 0) {
|
||||
PLOG(FATAL) << "Proxy accept failed";
|
||||
}
|
||||
|
||||
char code[1] = {'a'};
|
||||
std::vector<int> fds = {server_fd.get()};
|
||||
ssize_t rv = android::base::SendFileDescriptorVector(client_fd, code, sizeof(code), fds);
|
||||
if (rv < 0) {
|
||||
PLOG(FATAL) << "Proxy could not send file descriptor to snapuserd";
|
||||
}
|
||||
// Wait for an ACK - results don't matter, we just don't want to risk closing
|
||||
// the proxy socket too early.
|
||||
if (recv(client_fd, code, sizeof(code), 0) < 0) {
|
||||
PLOG(FATAL) << "Proxy could not receive terminating code from snapuserd";
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
} // namespace snapshot
|
||||
} // namespace android
|
140
fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h
Normal file
140
fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h
Normal file
|
@ -0,0 +1,140 @@
|
|||
// Copyright (C) 2020 The Android Open Source Project
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <poll.h>
|
||||
|
||||
#include <cstdio>
|
||||
#include <cstring>
|
||||
#include <functional>
|
||||
#include <future>
|
||||
#include <iostream>
|
||||
#include <mutex>
|
||||
#include <sstream>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
|
||||
#include <android-base/unique_fd.h>
|
||||
#include "snapuserd_core.h"
|
||||
|
||||
namespace android {
|
||||
namespace snapshot {
|
||||
|
||||
static constexpr uint32_t MAX_PACKET_SIZE = 512;
|
||||
|
||||
enum class DaemonOps {
|
||||
INIT,
|
||||
START,
|
||||
QUERY,
|
||||
STOP,
|
||||
DELETE,
|
||||
DETACH,
|
||||
SUPPORTS,
|
||||
INITIATE,
|
||||
PERCENTAGE,
|
||||
INVALID,
|
||||
};
|
||||
|
||||
class DmUserHandler {
|
||||
public:
|
||||
explicit DmUserHandler(std::shared_ptr<SnapshotHandler> snapuserd);
|
||||
|
||||
void FreeResources() {
|
||||
// Each worker thread holds a reference to snapuserd.
|
||||
// Clear them so that all the resources
|
||||
// held by snapuserd is released
|
||||
if (snapuserd_) {
|
||||
snapuserd_->FreeResources();
|
||||
snapuserd_ = nullptr;
|
||||
}
|
||||
}
|
||||
const std::shared_ptr<SnapshotHandler>& snapuserd() const { return snapuserd_; }
|
||||
std::thread& thread() { return thread_; }
|
||||
|
||||
const std::string& misc_name() const { return misc_name_; }
|
||||
bool ThreadTerminated() { return thread_terminated_; }
|
||||
void SetThreadTerminated() { thread_terminated_ = true; }
|
||||
|
||||
private:
|
||||
std::thread thread_;
|
||||
std::shared_ptr<SnapshotHandler> snapuserd_;
|
||||
std::string misc_name_;
|
||||
bool thread_terminated_ = false;
|
||||
};
|
||||
|
||||
class SnapuserServer {
|
||||
private:
|
||||
android::base::unique_fd sockfd_;
|
||||
bool terminating_;
|
||||
volatile bool received_socket_signal_ = false;
|
||||
std::vector<struct pollfd> watched_fds_;
|
||||
bool is_socket_present_ = false;
|
||||
int num_partitions_merge_complete_ = 0;
|
||||
|
||||
std::mutex lock_;
|
||||
|
||||
using HandlerList = std::vector<std::shared_ptr<DmUserHandler>>;
|
||||
HandlerList dm_users_;
|
||||
|
||||
void AddWatchedFd(android::base::borrowed_fd fd, int events);
|
||||
void AcceptClient();
|
||||
bool HandleClient(android::base::borrowed_fd fd, int revents);
|
||||
bool Recv(android::base::borrowed_fd fd, std::string* data);
|
||||
bool Sendmsg(android::base::borrowed_fd fd, const std::string& msg);
|
||||
bool Receivemsg(android::base::borrowed_fd fd, const std::string& str);
|
||||
|
||||
void ShutdownThreads();
|
||||
bool RemoveAndJoinHandler(const std::string& control_device);
|
||||
DaemonOps Resolveop(std::string& input);
|
||||
std::string GetDaemonStatus();
|
||||
void Parsemsg(std::string const& msg, const char delim, std::vector<std::string>& out);
|
||||
|
||||
bool IsTerminating() { return terminating_; }
|
||||
|
||||
void RunThread(std::shared_ptr<DmUserHandler> handler);
|
||||
void JoinAllThreads();
|
||||
bool StartWithSocket(bool start_listening);
|
||||
|
||||
// Find a DmUserHandler within a lock.
|
||||
HandlerList::iterator FindHandler(std::lock_guard<std::mutex>* proof_of_lock,
|
||||
const std::string& misc_name);
|
||||
|
||||
double GetMergePercentage(std::lock_guard<std::mutex>* proof_of_lock);
|
||||
void TerminateMergeThreads(std::lock_guard<std::mutex>* proof_of_lock);
|
||||
|
||||
public:
|
||||
SnapuserServer() { terminating_ = false; }
|
||||
~SnapuserServer();
|
||||
|
||||
bool Start(const std::string& socketname);
|
||||
bool Run();
|
||||
void Interrupt();
|
||||
bool RunForSocketHandoff();
|
||||
bool WaitForSocket();
|
||||
|
||||
std::shared_ptr<DmUserHandler> AddHandler(const std::string& misc_name,
|
||||
const std::string& cow_device_path,
|
||||
const std::string& backing_device,
|
||||
const std::string& base_path_merge);
|
||||
bool StartHandler(const std::shared_ptr<DmUserHandler>& handler);
|
||||
bool StartMerge(const std::shared_ptr<DmUserHandler>& handler);
|
||||
|
||||
void SetTerminating() { terminating_ = true; }
|
||||
void ReceivedSocketSignal() { received_socket_signal_ = true; }
|
||||
};
|
||||
|
||||
} // namespace snapshot
|
||||
} // namespace android
|
Loading…
Reference in a new issue