updater: Move RangeSinkWrite into RangeSinkState.

Then rename RangeSinkState to RangeSinkWriter. RangeSinkWriter reads
data from the given FD, and writes them to the desination RangeSet.

Test: Apply an incremental with the new updater.
Change-Id: I5e3ab6fc082efa1726562c55b56e2d418fe4acaf
This commit is contained in:
Tao Bao 2017-03-26 14:03:52 -07:00
parent 850f89f198
commit 60a70afc0a

View file

@ -231,125 +231,135 @@ static void allocate(size_t size, std::vector<uint8_t>& buffer) {
buffer.resize(size);
}
struct RangeSinkState {
explicit RangeSinkState(RangeSet& rs) : tgt(rs) { };
/**
* RangeSinkWriter reads data from the given FD, and writes them to the destination specified by the
* given RangeSet.
*/
class RangeSinkWriter {
public:
RangeSinkWriter(int fd, const RangeSet& tgt)
: fd_(fd), tgt_(tgt), next_range_(0), current_range_left_(0) {
CHECK_NE(tgt.count, static_cast<size_t>(0));
};
int fd;
const RangeSet& tgt;
size_t p_block;
size_t p_remain;
};
static size_t RangeSinkWrite(const uint8_t* data, size_t size, RangeSinkState* rss) {
if (rss->p_remain == 0) {
LOG(ERROR) << "range sink write overrun";
return 0;
bool Finished() const {
return next_range_ == tgt_.count && current_range_left_ == 0;
}
size_t written = 0;
while (size > 0) {
size_t write_now = size;
if (rss->p_remain < write_now) {
write_now = rss->p_remain;
size_t Write(const uint8_t* data, size_t size) {
if (Finished()) {
LOG(ERROR) << "range sink write overrun; can't write " << size << " bytes";
return 0;
}
if (write_all(rss->fd, data, write_now) == -1) {
break;
}
size_t written = 0;
while (size > 0) {
// Move to the next range as needed.
if (current_range_left_ == 0) {
if (next_range_ < tgt_.count) {
off64_t offset = static_cast<off64_t>(tgt_.pos[next_range_ * 2]) * BLOCKSIZE;
current_range_left_ =
(tgt_.pos[next_range_ * 2 + 1] - tgt_.pos[next_range_ * 2]) * BLOCKSIZE;
next_range_++;
if (!discard_blocks(fd_, offset, current_range_left_)) {
break;
}
data += write_now;
size -= write_now;
rss->p_remain -= write_now;
written += write_now;
if (rss->p_remain == 0) {
// Move to the next block.
++rss->p_block;
if (rss->p_block < rss->tgt.count) {
rss->p_remain =
(rss->tgt.pos[rss->p_block * 2 + 1] - rss->tgt.pos[rss->p_block * 2]) * BLOCKSIZE;
off64_t offset = static_cast<off64_t>(rss->tgt.pos[rss->p_block * 2]) * BLOCKSIZE;
if (!discard_blocks(rss->fd, offset, rss->p_remain)) {
if (!check_lseek(fd_, offset, SEEK_SET)) {
break;
}
} else {
// We can't write any more; return how many bytes have been written so far.
break;
}
}
if (!check_lseek(rss->fd, offset, SEEK_SET)) {
break;
}
size_t write_now = size;
if (current_range_left_ < write_now) {
write_now = current_range_left_;
}
} else {
// We can't write any more; return how many bytes have been written so far.
if (write_all(fd_, data, write_now) == -1) {
break;
}
data += write_now;
size -= write_now;
current_range_left_ -= write_now;
written += write_now;
}
return written;
}
return written;
}
// All of the data for all the 'new' transfers is contained in one
// file in the update package, concatenated together in the order in
// which transfers.list will need it. We want to stream it out of the
// archive (it's compressed) without writing it to a temp file, but we
// can't write each section until it's that transfer's turn to go.
//
// To achieve this, we expand the new data from the archive in a
// background thread, and block that threads 'receive uncompressed
// data' function until the main thread has reached a point where we
// want some new data to be written. We signal the background thread
// with the destination for the data and block the main thread,
// waiting for the background thread to complete writing that section.
// Then it signals the main thread to wake up and goes back to
// blocking waiting for a transfer.
//
// NewThreadInfo is the struct used to pass information back and forth
// between the two threads. When the main thread wants some data
// written, it sets rss to the destination location and signals the
// condition. When the background thread is done writing, it clears
// rss and signals the condition again.
private:
// The input data.
int fd_;
// The destination for the data.
const RangeSet& tgt_;
// The next range that we should write to.
size_t next_range_;
// The number of bytes to write before moving to the next range.
size_t current_range_left_;
};
/**
* All of the data for all the 'new' transfers is contained in one file in the update package,
* concatenated together in the order in which transfers.list will need it. We want to stream it out
* of the archive (it's compressed) without writing it to a temp file, but we can't write each
* section until it's that transfer's turn to go.
*
* To achieve this, we expand the new data from the archive in a background thread, and block that
* threads 'receive uncompressed data' function until the main thread has reached a point where we
* want some new data to be written. We signal the background thread with the destination for the
* data and block the main thread, waiting for the background thread to complete writing that
* section. Then it signals the main thread to wake up and goes back to blocking waiting for a
* transfer.
*
* NewThreadInfo is the struct used to pass information back and forth between the two threads. When
* the main thread wants some data written, it sets writer to the destination location and signals
* the condition. When the background thread is done writing, it clears writer and signals the
* condition again.
*/
struct NewThreadInfo {
ZipArchiveHandle za;
ZipEntry entry;
ZipArchiveHandle za;
ZipEntry entry;
RangeSinkState* rss;
RangeSinkWriter* writer;
pthread_mutex_t mu;
pthread_cond_t cv;
pthread_mutex_t mu;
pthread_cond_t cv;
};
static bool receive_new_data(const uint8_t* data, size_t size, void* cookie) {
NewThreadInfo* nti = reinterpret_cast<NewThreadInfo*>(cookie);
NewThreadInfo* nti = static_cast<NewThreadInfo*>(cookie);
while (size > 0) {
// Wait for nti->rss to be non-null, indicating some of this
// data is wanted.
pthread_mutex_lock(&nti->mu);
while (nti->rss == nullptr) {
pthread_cond_wait(&nti->cv, &nti->mu);
}
pthread_mutex_unlock(&nti->mu);
// At this point nti->rss is set, and we own it. The main
// thread is waiting for it to disappear from nti.
size_t written = RangeSinkWrite(data, size, nti->rss);
data += written;
size -= written;
if (nti->rss->p_block == nti->rss->tgt.count) {
// we have written all the bytes desired by this rss.
pthread_mutex_lock(&nti->mu);
nti->rss = nullptr;
pthread_cond_broadcast(&nti->cv);
pthread_mutex_unlock(&nti->mu);
}
while (size > 0) {
// Wait for nti->writer to be non-null, indicating some of this data is wanted.
pthread_mutex_lock(&nti->mu);
while (nti->writer == nullptr) {
pthread_cond_wait(&nti->cv, &nti->mu);
}
pthread_mutex_unlock(&nti->mu);
return true;
// At this point nti->writer is set, and we own it. The main thread is waiting for it to
// disappear from nti.
size_t written = nti->writer->Write(data, size);
data += written;
size -= written;
if (nti->writer->Finished()) {
// We have written all the bytes desired by this writer.
pthread_mutex_lock(&nti->mu);
nti->writer = nullptr;
pthread_cond_broadcast(&nti->cv);
pthread_mutex_unlock(&nti->mu);
}
}
return true;
}
static void* unzip_new_data(void* cookie) {
@ -380,28 +390,26 @@ static int ReadBlocks(const RangeSet& src, std::vector<uint8_t>& buffer, int fd)
}
static int WriteBlocks(const RangeSet& tgt, const std::vector<uint8_t>& buffer, int fd) {
const uint8_t* data = buffer.data();
size_t p = 0;
for (size_t i = 0; i < tgt.count; ++i) {
off64_t offset = static_cast<off64_t>(tgt.pos[i * 2]) * BLOCKSIZE;
size_t size = (tgt.pos[i * 2 + 1] - tgt.pos[i * 2]) * BLOCKSIZE;
if (!discard_blocks(fd, offset, size)) {
return -1;
}
if (!check_lseek(fd, offset, SEEK_SET)) {
return -1;
}
if (write_all(fd, data + p, size) == -1) {
return -1;
}
p += size;
size_t written = 0;
for (size_t i = 0; i < tgt.count; ++i) {
off64_t offset = static_cast<off64_t>(tgt.pos[i * 2]) * BLOCKSIZE;
size_t size = (tgt.pos[i * 2 + 1] - tgt.pos[i * 2]) * BLOCKSIZE;
if (!discard_blocks(fd, offset, size)) {
return -1;
}
return 0;
if (!check_lseek(fd, offset, SEEK_SET)) {
return -1;
}
if (write_all(fd, buffer.data() + written, size) == -1) {
return -1;
}
written += size;
}
return 0;
}
// Parameters for transfer list command functions
@ -1214,45 +1222,31 @@ static int PerformCommandZero(CommandParameters& params) {
}
static int PerformCommandNew(CommandParameters& params) {
if (params.cpos >= params.tokens.size()) {
LOG(ERROR) << "missing target blocks for new";
return -1;
}
if (params.cpos >= params.tokens.size()) {
LOG(ERROR) << "missing target blocks for new";
return -1;
RangeSet tgt = parse_range(params.tokens[params.cpos++]);
if (params.canwrite) {
LOG(INFO) << " writing " << tgt.size << " blocks of new data";
RangeSinkWriter writer(params.fd, tgt);
pthread_mutex_lock(&params.nti.mu);
params.nti.writer = &writer;
pthread_cond_broadcast(&params.nti.cv);
while (params.nti.writer != nullptr) {
pthread_cond_wait(&params.nti.cv, &params.nti.mu);
}
RangeSet tgt = parse_range(params.tokens[params.cpos++]);
pthread_mutex_unlock(&params.nti.mu);
}
if (params.canwrite) {
LOG(INFO) << " writing " << tgt.size << " blocks of new data";
params.written += tgt.size;
RangeSinkState rss(tgt);
rss.fd = params.fd;
rss.p_block = 0;
rss.p_remain = (tgt.pos[1] - tgt.pos[0]) * BLOCKSIZE;
off64_t offset = static_cast<off64_t>(tgt.pos[0]) * BLOCKSIZE;
if (!discard_blocks(params.fd, offset, tgt.size * BLOCKSIZE)) {
return -1;
}
if (!check_lseek(params.fd, offset, SEEK_SET)) {
return -1;
}
pthread_mutex_lock(&params.nti.mu);
params.nti.rss = &rss;
pthread_cond_broadcast(&params.nti.cv);
while (params.nti.rss) {
pthread_cond_wait(&params.nti.cv, &params.nti.mu);
}
pthread_mutex_unlock(&params.nti.mu);
}
params.written += tgt.size;
return 0;
return 0;
}
static int PerformCommandDiff(CommandParameters& params) {
@ -1295,40 +1289,28 @@ static int PerformCommandDiff(CommandParameters& params) {
LOG(INFO) << "patching " << blocks << " blocks to " << tgt.size;
Value patch_value(
VAL_BLOB, std::string(reinterpret_cast<const char*>(params.patch_start + offset), len));
RangeSinkState rss(tgt);
rss.fd = params.fd;
rss.p_block = 0;
rss.p_remain = (tgt.pos[1] - tgt.pos[0]) * BLOCKSIZE;
off64_t offset = static_cast<off64_t>(tgt.pos[0]) * BLOCKSIZE;
if (!discard_blocks(params.fd, offset, rss.p_remain)) {
return -1;
}
if (!check_lseek(params.fd, offset, SEEK_SET)) {
return -1;
}
RangeSinkWriter writer(params.fd, tgt);
if (params.cmdname[0] == 'i') { // imgdiff
if (ApplyImagePatch(
params.buffer.data(), blocks * BLOCKSIZE, &patch_value,
std::bind(&RangeSinkWrite, std::placeholders::_1, std::placeholders::_2, &rss),
nullptr, nullptr) != 0) {
if (ApplyImagePatch(params.buffer.data(), blocks * BLOCKSIZE, &patch_value,
std::bind(&RangeSinkWriter::Write, &writer, std::placeholders::_1,
std::placeholders::_2),
nullptr, nullptr) != 0) {
LOG(ERROR) << "Failed to apply image patch.";
return -1;
}
} else {
if (ApplyBSDiffPatch(
params.buffer.data(), blocks * BLOCKSIZE, &patch_value, 0,
std::bind(&RangeSinkWrite, std::placeholders::_1, std::placeholders::_2, &rss),
nullptr) != 0) {
if (ApplyBSDiffPatch(params.buffer.data(), blocks * BLOCKSIZE, &patch_value, 0,
std::bind(&RangeSinkWriter::Write, &writer, std::placeholders::_1,
std::placeholders::_2),
nullptr) != 0) {
LOG(ERROR) << "Failed to apply bsdiff patch.";
return -1;
}
}
// We expect the output of the patcher to fill the tgt ranges exactly.
if (rss.p_block != tgt.count || rss.p_remain != 0) {
if (!writer.Finished()) {
LOG(ERROR) << "range sink underrun?";
}
} else {