Merge "Checkpoints: Support validation and roll forward on fail"
This commit is contained in:
commit
9ff07121ff
1 changed files with 101 additions and 43 deletions
144
Checkpoint.cpp
144
Checkpoint.cpp
|
@ -229,6 +229,7 @@ struct log_sector {
|
|||
uint32_t magic;
|
||||
uint32_t count;
|
||||
uint32_t sequence;
|
||||
uint64_t sector0;
|
||||
struct log_entry entries[];
|
||||
} __attribute__((packed));
|
||||
|
||||
|
@ -289,62 +290,119 @@ void crc32(const void* data, size_t n_bytes, uint32_t* crc) {
|
|||
|
||||
} // namespace
|
||||
|
||||
static void read(std::fstream& device, std::vector<log_entry> const& logs, sector_t sector,
|
||||
char* buffer) {
|
||||
for (auto l = logs.rbegin(); l != logs.rend(); l++)
|
||||
if (sector >= l->source && (sector - l->source) * kSectorSize < l->size)
|
||||
sector = sector - l->source + l->dest;
|
||||
|
||||
device.seekg(sector * kSectorSize);
|
||||
device.read(buffer, kBlockSize);
|
||||
}
|
||||
|
||||
static std::vector<char> read(std::fstream& device, std::vector<log_entry> const& logs,
|
||||
bool validating, sector_t sector, uint32_t size) {
|
||||
if (!validating) {
|
||||
std::vector<char> buffer(size);
|
||||
device.seekg(sector * kSectorSize);
|
||||
device.read(&buffer[0], size);
|
||||
return buffer;
|
||||
}
|
||||
|
||||
// Crude approach at first where we do this sector by sector and just scan
|
||||
// the entire logs for remappings each time
|
||||
std::vector<char> buffer(size);
|
||||
|
||||
for (uint32_t i = 0; i < size; i += kBlockSize, sector += kBlockSize / kSectorSize)
|
||||
read(device, logs, sector, &buffer[i]);
|
||||
|
||||
return buffer;
|
||||
}
|
||||
|
||||
Status cp_restoreCheckpoint(const std::string& blockDevice) {
|
||||
LOG(INFO) << "Restoring checkpoint on " << blockDevice;
|
||||
std::fstream device(blockDevice, std::ios::binary | std::ios::in | std::ios::out);
|
||||
if (!device) {
|
||||
PLOG(ERROR) << "Cannot open " << blockDevice;
|
||||
return Status::fromExceptionCode(errno, ("Cannot open " + blockDevice).c_str());
|
||||
}
|
||||
alignas(alignof(log_sector)) char ls_buffer[kBlockSize];
|
||||
device.read(ls_buffer, kBlockSize);
|
||||
log_sector& ls = *reinterpret_cast<log_sector*>(ls_buffer);
|
||||
if (ls.magic != kMagic) {
|
||||
LOG(ERROR) << "No magic";
|
||||
return Status::fromExceptionCode(EINVAL, "No magic");
|
||||
}
|
||||
bool validating = true;
|
||||
std::string action = "Validating";
|
||||
|
||||
LOG(INFO) << "Restoring " << ls.sequence << " log sectors";
|
||||
for (;;) {
|
||||
std::vector<log_entry> logs;
|
||||
Status status = Status::ok();
|
||||
|
||||
for (int sequence = ls.sequence; sequence >= 0; sequence--) {
|
||||
device.seekg(0);
|
||||
device.read(ls_buffer, kBlockSize);
|
||||
ls = *reinterpret_cast<log_sector*>(ls_buffer);
|
||||
LOG(INFO) << action << " checkpoint on " << blockDevice;
|
||||
std::fstream device(blockDevice, std::ios::binary | std::ios::in | std::ios::out);
|
||||
if (!device) {
|
||||
PLOG(ERROR) << "Cannot open " << blockDevice;
|
||||
return Status::fromExceptionCode(errno, ("Cannot open " + blockDevice).c_str());
|
||||
}
|
||||
auto buffer = read(device, logs, validating, 0, kBlockSize);
|
||||
log_sector& ls = *reinterpret_cast<log_sector*>(&buffer[0]);
|
||||
if (ls.magic != kMagic) {
|
||||
LOG(ERROR) << "No magic!";
|
||||
LOG(ERROR) << "No magic";
|
||||
return Status::fromExceptionCode(EINVAL, "No magic");
|
||||
}
|
||||
|
||||
if ((int)ls.sequence != sequence) {
|
||||
LOG(ERROR) << "Expecting log sector " << sequence << " but got " << ls.sequence;
|
||||
return Status::fromExceptionCode(
|
||||
EINVAL, ("Expecting log sector " + std::to_string(sequence) + " but got " +
|
||||
std::to_string(ls.sequence))
|
||||
.c_str());
|
||||
}
|
||||
LOG(INFO) << action << " " << ls.sequence << " log sectors";
|
||||
|
||||
LOG(INFO) << "Restoring from log sector " << ls.sequence;
|
||||
|
||||
for (log_entry* le = &ls.entries[ls.count - 1]; le >= ls.entries; --le) {
|
||||
LOG(INFO) << "Restoring " << le->size << " bytes from sector " << le->dest << " to "
|
||||
<< le->source << " with checksum " << std::hex << le->checksum;
|
||||
std::vector<char> buffer(le->size);
|
||||
device.seekg(le->dest * kSectorSize);
|
||||
device.read(&buffer[0], le->size);
|
||||
|
||||
uint32_t checksum = le->source / (kBlockSize / kSectorSize);
|
||||
for (size_t i = 0; i < le->size; i += kBlockSize) {
|
||||
crc32(&buffer[i], kBlockSize, &checksum);
|
||||
for (int sequence = ls.sequence; sequence >= 0 && status.isOk(); sequence--) {
|
||||
auto buffer = read(device, logs, validating, 0, kBlockSize);
|
||||
log_sector& ls = *reinterpret_cast<log_sector*>(&buffer[0]);
|
||||
if (ls.magic != kMagic) {
|
||||
LOG(ERROR) << "No magic!";
|
||||
status = Status::fromExceptionCode(EINVAL, "No magic");
|
||||
break;
|
||||
}
|
||||
|
||||
if (le->checksum && checksum != le->checksum) {
|
||||
LOG(ERROR) << "Checksums don't match " << std::hex << checksum;
|
||||
return Status::fromExceptionCode(EINVAL, "Checksums don't match");
|
||||
if ((int)ls.sequence != sequence) {
|
||||
LOG(ERROR) << "Expecting log sector " << sequence << " but got " << ls.sequence;
|
||||
status = Status::fromExceptionCode(
|
||||
EINVAL, ("Expecting log sector " + std::to_string(sequence) + " but got " +
|
||||
std::to_string(ls.sequence))
|
||||
.c_str());
|
||||
break;
|
||||
}
|
||||
|
||||
device.seekg(le->source * kSectorSize);
|
||||
device.write(&buffer[0], le->size);
|
||||
LOG(INFO) << action << " from log sector " << ls.sequence;
|
||||
|
||||
for (log_entry* le = &ls.entries[ls.count - 1]; le >= ls.entries; --le) {
|
||||
LOG(INFO) << action << " " << le->size << " bytes from sector " << le->dest
|
||||
<< " to " << le->source << " with checksum " << std::hex << le->checksum;
|
||||
auto buffer = read(device, logs, validating, le->dest, le->size);
|
||||
uint32_t checksum = le->source / (kBlockSize / kSectorSize);
|
||||
for (size_t i = 0; i < le->size; i += kBlockSize) {
|
||||
crc32(&buffer[i], kBlockSize, &checksum);
|
||||
}
|
||||
|
||||
if (le->checksum && checksum != le->checksum) {
|
||||
LOG(ERROR) << "Checksums don't match " << std::hex << checksum;
|
||||
status = Status::fromExceptionCode(EINVAL, "Checksums don't match");
|
||||
break;
|
||||
}
|
||||
|
||||
logs.push_back(*le);
|
||||
|
||||
if (!validating) {
|
||||
device.seekg(le->source * kSectorSize);
|
||||
device.write(&buffer[0], le->size);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!status.isOk()) {
|
||||
if (!validating) {
|
||||
LOG(ERROR) << "Checkpoint restore failed even though checkpoint validation passed";
|
||||
return status;
|
||||
}
|
||||
|
||||
LOG(WARNING) << "Checkpoint validation failed - attempting to roll forward";
|
||||
auto buffer = read(device, logs, false, ls.sector0, kBlockSize);
|
||||
device.seekg(0);
|
||||
device.write(&buffer[0], kBlockSize);
|
||||
return Status::ok();
|
||||
}
|
||||
|
||||
if (!validating) break;
|
||||
|
||||
validating = false;
|
||||
action = "Restoring";
|
||||
}
|
||||
|
||||
return Status::ok();
|
||||
|
|
Loading…
Reference in a new issue