Merge "logd: remove min heap in SerializedFlushToState" am: b92ac53aeb
am: eb2b5408b6
am: 0a01b6da1b
Original change: https://android-review.googlesource.com/c/platform/system/core/+/1431987 Change-Id: Ic5a1f173bcae1248cb71bd7e7510277339022895
This commit is contained in:
commit
009660b5b4
5 changed files with 69 additions and 57 deletions
|
@ -16,6 +16,8 @@
|
|||
|
||||
#include "SerializedFlushToState.h"
|
||||
|
||||
#include <limits>
|
||||
|
||||
#include <android-base/logging.h>
|
||||
|
||||
SerializedFlushToState::SerializedFlushToState(uint64_t start, LogMask log_mask)
|
||||
|
@ -63,14 +65,13 @@ void SerializedFlushToState::CreateLogPosition(log_id_t log_id) {
|
|||
log_positions_[log_id].emplace(log_position);
|
||||
}
|
||||
|
||||
void SerializedFlushToState::AddMinHeapEntry(log_id_t log_id) {
|
||||
void SerializedFlushToState::UpdateLogsNeeded(log_id_t log_id) {
|
||||
auto& buffer_it = log_positions_[log_id]->buffer_it;
|
||||
auto read_offset = log_positions_[log_id]->read_offset;
|
||||
|
||||
// If there is another log to read in this buffer, add it to the min heap.
|
||||
// If there is another log to read in this buffer, let it be read.
|
||||
if (read_offset < buffer_it->write_offset()) {
|
||||
auto* entry = buffer_it->log_entry(read_offset);
|
||||
min_heap_.emplace(log_id, entry);
|
||||
logs_needed_from_next_position_[log_id] = false;
|
||||
} else if (read_offset == buffer_it->write_offset()) {
|
||||
// If there are no more logs to read in this buffer and it's the last buffer, then
|
||||
// set logs_needed_from_next_position_ to wait until more logs get logged.
|
||||
|
@ -85,8 +86,7 @@ void SerializedFlushToState::AddMinHeapEntry(log_id_t log_id) {
|
|||
if (buffer_it->write_offset() == 0) {
|
||||
logs_needed_from_next_position_[log_id] = true;
|
||||
} else {
|
||||
auto* entry = buffer_it->log_entry(0);
|
||||
min_heap_.emplace(log_id, entry);
|
||||
logs_needed_from_next_position_[log_id] = false;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
@ -106,24 +106,41 @@ void SerializedFlushToState::CheckForNewLogs() {
|
|||
}
|
||||
CreateLogPosition(i);
|
||||
}
|
||||
logs_needed_from_next_position_[i] = false;
|
||||
// If it wasn't possible to insert, logs_needed_from_next_position will be set back to true.
|
||||
AddMinHeapEntry(i);
|
||||
UpdateLogsNeeded(i);
|
||||
}
|
||||
}
|
||||
|
||||
MinHeapElement SerializedFlushToState::PopNextUnreadLog() {
|
||||
auto top = min_heap_.top();
|
||||
min_heap_.pop();
|
||||
bool SerializedFlushToState::HasUnreadLogs() {
|
||||
CheckForNewLogs();
|
||||
log_id_for_each(i) {
|
||||
if (log_positions_[i] && !logs_needed_from_next_position_[i]) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
auto* entry = top.entry;
|
||||
auto log_id = top.log_id;
|
||||
LogWithId SerializedFlushToState::PopNextUnreadLog() {
|
||||
uint64_t min_sequence = std::numeric_limits<uint64_t>::max();
|
||||
log_id_t log_id;
|
||||
const SerializedLogEntry* entry = nullptr;
|
||||
log_id_for_each(i) {
|
||||
if (!log_positions_[i] || logs_needed_from_next_position_[i]) {
|
||||
continue;
|
||||
}
|
||||
if (log_positions_[i]->log_entry()->sequence() < min_sequence) {
|
||||
log_id = i;
|
||||
entry = log_positions_[i]->log_entry();
|
||||
min_sequence = entry->sequence();
|
||||
}
|
||||
}
|
||||
CHECK_NE(nullptr, entry);
|
||||
|
||||
log_positions_[log_id]->read_offset += entry->total_len();
|
||||
|
||||
logs_needed_from_next_position_[log_id] = true;
|
||||
|
||||
return top;
|
||||
return {log_id, entry};
|
||||
}
|
||||
|
||||
void SerializedFlushToState::Prune(log_id_t log_id,
|
||||
|
@ -133,25 +150,12 @@ void SerializedFlushToState::Prune(log_id_t log_id,
|
|||
return;
|
||||
}
|
||||
|
||||
// // Decrease the ref count since we're deleting our reference.
|
||||
// Decrease the ref count since we're deleting our reference.
|
||||
buffer_it->DecReaderRefCount();
|
||||
|
||||
// Delete in the reference.
|
||||
log_positions_[log_id].reset();
|
||||
|
||||
// Remove the MinHeapElement referencing log_id, if it exists, but retain the others.
|
||||
std::vector<MinHeapElement> old_elements;
|
||||
while (!min_heap_.empty()) {
|
||||
auto& element = min_heap_.top();
|
||||
if (element.log_id != log_id) {
|
||||
old_elements.emplace_back(element);
|
||||
}
|
||||
min_heap_.pop();
|
||||
}
|
||||
for (auto&& element : old_elements) {
|
||||
min_heap_.emplace(element);
|
||||
}
|
||||
|
||||
// Finally set logs_needed_from_next_position_, so CheckForNewLogs() will re-create the
|
||||
// log_position_ object during the next read.
|
||||
logs_needed_from_next_position_[log_id] = true;
|
||||
|
|
|
@ -27,26 +27,19 @@
|
|||
struct LogPosition {
|
||||
std::list<SerializedLogChunk>::iterator buffer_it;
|
||||
int read_offset;
|
||||
|
||||
const SerializedLogEntry* log_entry() const { return buffer_it->log_entry(read_offset); }
|
||||
};
|
||||
|
||||
struct MinHeapElement {
|
||||
MinHeapElement(log_id_t log_id, const SerializedLogEntry* entry)
|
||||
: log_id(log_id), entry(entry) {}
|
||||
struct LogWithId {
|
||||
log_id_t log_id;
|
||||
const SerializedLogEntry* entry;
|
||||
// The change of comparison operators is intentional, std::priority_queue uses operator<() to
|
||||
// compare but creates a max heap. Since we want a min heap, we return the opposite result.
|
||||
bool operator<(const MinHeapElement& rhs) const {
|
||||
return entry->sequence() > rhs.entry->sequence();
|
||||
}
|
||||
};
|
||||
|
||||
// This class tracks the specific point where a FlushTo client has read through the logs. It
|
||||
// directly references the std::list<> iterators from the parent SerializedLogBuffer and the offset
|
||||
// into each log chunk where it has last read. All interactions with this class, except for its
|
||||
// construction, must be done with SerializedLogBuffer::lock_ held. No log chunks that it
|
||||
// references may be pruned, which is handled by ensuring prune does not touch any log chunk with
|
||||
// highest sequence number greater or equal to start().
|
||||
// construction, must be done with SerializedLogBuffer::lock_ held.
|
||||
class SerializedFlushToState : public FlushToState {
|
||||
public:
|
||||
// Initializes this state object. For each log buffer set in log_mask, this sets
|
||||
|
@ -61,31 +54,29 @@ class SerializedFlushToState : public FlushToState {
|
|||
if (logs_ == nullptr) logs_ = logs;
|
||||
}
|
||||
|
||||
bool HasUnreadLogs() {
|
||||
CheckForNewLogs();
|
||||
return !min_heap_.empty();
|
||||
}
|
||||
// Updates the state of log_positions_ and logs_needed_from_next_position_ then returns true if
|
||||
// there are any unread logs, false otherwise.
|
||||
bool HasUnreadLogs();
|
||||
|
||||
// Pops the next unread log from the min heap and sets logs_needed_from_next_position_ to
|
||||
// indicate that we're waiting for more logs from the associated log buffer.
|
||||
MinHeapElement PopNextUnreadLog();
|
||||
// Returns the next unread log and sets logs_needed_from_next_position_ to indicate that we're
|
||||
// waiting for more logs from the associated log buffer.
|
||||
LogWithId PopNextUnreadLog();
|
||||
|
||||
// If the parent log buffer prunes logs, the reference that this class contains may become
|
||||
// invalid, so this must be called first to drop the reference to buffer_it, if any.
|
||||
void Prune(log_id_t log_id, const std::list<SerializedLogChunk>::iterator& buffer_it);
|
||||
|
||||
private:
|
||||
// If there is a log in the serialized log buffer for `log_id` at the read_offset, add it to the
|
||||
// min heap for reading, otherwise set logs_needed_from_next_position_ to indicate that we're
|
||||
// waiting for the next log.
|
||||
void AddMinHeapEntry(log_id_t log_id);
|
||||
// Set logs_needed_from_next_position_[i] to indicate if log_positions_[i] points to an unread
|
||||
// log or to the point at which the next log will appear.
|
||||
void UpdateLogsNeeded(log_id_t log_id);
|
||||
|
||||
// Create a LogPosition object for the given log_id by searching through the log chunks for the
|
||||
// first chunk and then first log entry within that chunk that is greater or equal to start().
|
||||
void CreateLogPosition(log_id_t log_id);
|
||||
|
||||
// Checks to see if any log buffers set in logs_needed_from_next_position_ have new logs and
|
||||
// calls AddMinHeapEntry() if so.
|
||||
// calls UpdateLogsNeeded() if so.
|
||||
void CheckForNewLogs();
|
||||
|
||||
std::list<SerializedLogChunk>* logs_ = nullptr;
|
||||
|
@ -97,7 +88,4 @@ class SerializedFlushToState : public FlushToState {
|
|||
// next_log_position == logs_write_position_)`. These will be re-checked in each
|
||||
// loop in case new logs came in.
|
||||
std::bitset<LOG_ID_MAX> logs_needed_from_next_position_ = {};
|
||||
// A min heap that has up to one entry per log buffer, sorted by sequence number, of the next
|
||||
// element that this reader should read.
|
||||
std::priority_queue<MinHeapElement> min_heap_;
|
||||
};
|
||||
|
|
|
@ -287,4 +287,21 @@ TEST_F(SerializedFlushToStateTest, no_dangling_references) {
|
|||
EXPECT_EQ(second_chunk->reader_ref_count(), 1U);
|
||||
|
||||
EXPECT_FALSE(state.HasUnreadLogs());
|
||||
}
|
||||
}
|
||||
|
||||
TEST(SerializedFlushToState, Prune) {
|
||||
auto chunk = SerializedLogChunk{kChunkSize};
|
||||
chunk.Log(1, log_time(), 0, 1, 1, "abc", 3);
|
||||
chunk.Log(2, log_time(), 0, 1, 1, "abc", 3);
|
||||
chunk.Log(3, log_time(), 0, 1, 1, "abc", 3);
|
||||
chunk.FinishWriting();
|
||||
|
||||
std::list<SerializedLogChunk> log_chunks[LOG_ID_MAX];
|
||||
log_chunks[LOG_ID_MAIN].emplace_back(std::move(chunk));
|
||||
|
||||
auto state = SerializedFlushToState{1, kLogMaskAll};
|
||||
state.InitializeLogs(log_chunks);
|
||||
ASSERT_TRUE(state.HasUnreadLogs());
|
||||
|
||||
state.Prune(LOG_ID_MAIN, log_chunks[LOG_ID_MAIN].begin());
|
||||
}
|
||||
|
|
|
@ -211,7 +211,7 @@ bool SerializedLogBuffer::FlushTo(
|
|||
state.InitializeLogs(logs_);
|
||||
|
||||
while (state.HasUnreadLogs()) {
|
||||
MinHeapElement top = state.PopNextUnreadLog();
|
||||
LogWithId top = state.PopNextUnreadLog();
|
||||
auto* entry = top.entry;
|
||||
auto log_id = top.log_id;
|
||||
|
||||
|
|
|
@ -18,6 +18,8 @@
|
|||
|
||||
#include <sys/types.h>
|
||||
|
||||
#include <android-base/logging.h>
|
||||
|
||||
#include "LogWriter.h"
|
||||
#include "SerializedData.h"
|
||||
#include "SerializedLogEntry.h"
|
||||
|
@ -55,6 +57,7 @@ class SerializedLogChunk {
|
|||
}
|
||||
|
||||
const SerializedLogEntry* log_entry(int offset) const {
|
||||
CHECK(writer_active_ || reader_ref_count_ > 0);
|
||||
return reinterpret_cast<const SerializedLogEntry*>(data() + offset);
|
||||
}
|
||||
const uint8_t* data() const { return contents_.data(); }
|
||||
|
|
Loading…
Reference in a new issue