logd: remove LogBufferElement dependency of LogReaderThread
In the future, not all log buffers will be implemented in terms of LogBufferElement. Test: build Change-Id: I5cf0d01414857b1bfa08c92a4f8035b43ef2aad7
This commit is contained in:
parent
3e61a1368a
commit
70fadea36f
6 changed files with 55 additions and 40 deletions
|
@ -21,11 +21,9 @@
|
|||
#include <functional>
|
||||
|
||||
#include <log/log.h>
|
||||
#include <sysutils/SocketClient.h>
|
||||
#include <log/log_read.h>
|
||||
|
||||
#include "LogBufferElement.h"
|
||||
|
||||
class LogWriter;
|
||||
#include "LogWriter.h"
|
||||
|
||||
enum class FilterResult {
|
||||
kSkip,
|
||||
|
@ -45,10 +43,11 @@ class LogBuffer {
|
|||
// valid message was from the same source so we can differentiate chatty
|
||||
// filter types (identical or expired)
|
||||
static const uint64_t FLUSH_ERROR = 0;
|
||||
virtual uint64_t FlushTo(
|
||||
LogWriter* writer, uint64_t start,
|
||||
pid_t* last_tid, // nullable
|
||||
const std::function<FilterResult(const LogBufferElement* element)>& filter) = 0;
|
||||
virtual uint64_t FlushTo(LogWriter* writer, uint64_t start,
|
||||
pid_t* last_tid, // nullable
|
||||
const std::function<FilterResult(log_id_t log_id, pid_t pid,
|
||||
uint64_t sequence, log_time realtime,
|
||||
uint16_t dropped_count)>& filter) = 0;
|
||||
|
||||
virtual bool Clear(log_id_t id, uid_t uid) = 0;
|
||||
virtual unsigned long GetSize(log_id_t id) = 0;
|
||||
|
|
|
@ -171,25 +171,27 @@ bool LogReader::onDataAvailable(SocketClient* cli) {
|
|||
if (start != log_time::EPOCH) {
|
||||
bool start_time_set = false;
|
||||
uint64_t last = sequence;
|
||||
auto log_find_start = [pid, logMask, start, &sequence, &start_time_set,
|
||||
&last](const LogBufferElement* element) -> FilterResult {
|
||||
if (pid && pid != element->getPid()) {
|
||||
auto log_find_start = [pid, logMask, start, &sequence, &start_time_set, &last](
|
||||
log_id_t element_log_id, pid_t element_pid,
|
||||
uint64_t element_sequence, log_time element_realtime,
|
||||
uint16_t) -> FilterResult {
|
||||
if (pid && pid != element_pid) {
|
||||
return FilterResult::kSkip;
|
||||
}
|
||||
if ((logMask & (1 << element->getLogId())) == 0) {
|
||||
if ((logMask & (1 << element_log_id)) == 0) {
|
||||
return FilterResult::kSkip;
|
||||
}
|
||||
if (start == element->getRealTime()) {
|
||||
sequence = element->getSequence();
|
||||
if (start == element_realtime) {
|
||||
sequence = element_sequence;
|
||||
start_time_set = true;
|
||||
return FilterResult::kStop;
|
||||
} else {
|
||||
if (start < element->getRealTime()) {
|
||||
if (start < element_realtime) {
|
||||
sequence = last;
|
||||
start_time_set = true;
|
||||
return FilterResult::kStop;
|
||||
}
|
||||
last = element->getSequence();
|
||||
last = element_sequence;
|
||||
}
|
||||
return FilterResult::kSkip;
|
||||
};
|
||||
|
|
|
@ -75,13 +75,21 @@ void LogReaderThread::ThreadFunction() {
|
|||
|
||||
if (tail_) {
|
||||
log_buffer_->FlushTo(writer_.get(), start, nullptr,
|
||||
std::bind(&LogReaderThread::FilterFirstPass, this, _1));
|
||||
[this](log_id_t log_id, pid_t pid, uint64_t sequence,
|
||||
log_time realtime, uint16_t dropped_count) {
|
||||
return FilterFirstPass(log_id, pid, sequence, realtime,
|
||||
dropped_count);
|
||||
});
|
||||
leading_dropped_ =
|
||||
true; // TODO: Likely a bug, if leading_dropped_ was not true before calling
|
||||
// flushTo(), then it should not be reset to true after.
|
||||
}
|
||||
start = log_buffer_->FlushTo(writer_.get(), start, last_tid_,
|
||||
std::bind(&LogReaderThread::FilterSecondPass, this, _1));
|
||||
[this](log_id_t log_id, pid_t pid, uint64_t sequence,
|
||||
log_time realtime, uint16_t dropped_count) {
|
||||
return FilterSecondPass(log_id, pid, sequence, realtime,
|
||||
dropped_count);
|
||||
});
|
||||
|
||||
// We only ignore entries before the original start time for the first flushTo(), if we
|
||||
// get entries after this first flush before the original start time, then the client
|
||||
|
@ -123,22 +131,23 @@ void LogReaderThread::ThreadFunction() {
|
|||
}
|
||||
|
||||
// A first pass to count the number of elements
|
||||
FilterResult LogReaderThread::FilterFirstPass(const LogBufferElement* element) {
|
||||
FilterResult LogReaderThread::FilterFirstPass(log_id_t log_id, pid_t pid, uint64_t sequence,
|
||||
log_time realtime, uint16_t dropped_count) {
|
||||
auto lock = std::lock_guard{reader_list_->reader_threads_lock()};
|
||||
|
||||
if (leading_dropped_) {
|
||||
if (element->getDropped()) {
|
||||
if (dropped_count) {
|
||||
return FilterResult::kSkip;
|
||||
}
|
||||
leading_dropped_ = false;
|
||||
}
|
||||
|
||||
if (count_ == 0) {
|
||||
start_ = element->getSequence();
|
||||
start_ = sequence;
|
||||
}
|
||||
|
||||
if ((!pid_ || pid_ == element->getPid()) && IsWatching(element->getLogId()) &&
|
||||
(start_time_ == log_time::EPOCH || start_time_ <= element->getRealTime())) {
|
||||
if ((!pid_ || pid_ == pid) && IsWatching(log_id) &&
|
||||
(start_time_ == log_time::EPOCH || start_time_ <= realtime)) {
|
||||
++count_;
|
||||
}
|
||||
|
||||
|
@ -146,18 +155,19 @@ FilterResult LogReaderThread::FilterFirstPass(const LogBufferElement* element) {
|
|||
}
|
||||
|
||||
// A second pass to send the selected elements
|
||||
FilterResult LogReaderThread::FilterSecondPass(const LogBufferElement* element) {
|
||||
FilterResult LogReaderThread::FilterSecondPass(log_id_t log_id, pid_t pid, uint64_t sequence,
|
||||
log_time realtime, uint16_t dropped_count) {
|
||||
auto lock = std::lock_guard{reader_list_->reader_threads_lock()};
|
||||
|
||||
start_ = element->getSequence();
|
||||
start_ = sequence;
|
||||
|
||||
if (skip_ahead_[element->getLogId()]) {
|
||||
skip_ahead_[element->getLogId()]--;
|
||||
if (skip_ahead_[log_id]) {
|
||||
skip_ahead_[log_id]--;
|
||||
return FilterResult::kSkip;
|
||||
}
|
||||
|
||||
if (leading_dropped_) {
|
||||
if (element->getDropped()) {
|
||||
if (dropped_count) {
|
||||
return FilterResult::kSkip;
|
||||
}
|
||||
leading_dropped_ = false;
|
||||
|
@ -168,15 +178,15 @@ FilterResult LogReaderThread::FilterSecondPass(const LogBufferElement* element)
|
|||
return FilterResult::kStop;
|
||||
}
|
||||
|
||||
if (!IsWatching(element->getLogId())) {
|
||||
if (!IsWatching(log_id)) {
|
||||
return FilterResult::kSkip;
|
||||
}
|
||||
|
||||
if (pid_ && pid_ != element->getPid()) {
|
||||
if (pid_ && pid_ != pid) {
|
||||
return FilterResult::kSkip;
|
||||
}
|
||||
|
||||
if (start_time_ != log_time::EPOCH && element->getRealTime() <= start_time_) {
|
||||
if (start_time_ != log_time::EPOCH && realtime <= start_time_) {
|
||||
return FilterResult::kSkip;
|
||||
}
|
||||
|
||||
|
@ -199,7 +209,7 @@ FilterResult LogReaderThread::FilterSecondPass(const LogBufferElement* element)
|
|||
}
|
||||
|
||||
ok:
|
||||
if (!skip_ahead_[element->getLogId()]) {
|
||||
if (!skip_ahead_[log_id]) {
|
||||
return FilterResult::kWrite;
|
||||
}
|
||||
return FilterResult::kSkip;
|
||||
|
|
|
@ -30,7 +30,6 @@
|
|||
#include <sysutils/SocketClient.h>
|
||||
|
||||
#include "LogBuffer.h"
|
||||
#include "LogBufferElement.h"
|
||||
#include "LogWriter.h"
|
||||
|
||||
class LogReaderList;
|
||||
|
@ -63,8 +62,10 @@ class LogReaderThread {
|
|||
private:
|
||||
void ThreadFunction();
|
||||
// flushTo filter callbacks
|
||||
FilterResult FilterFirstPass(const LogBufferElement* element);
|
||||
FilterResult FilterSecondPass(const LogBufferElement* element);
|
||||
FilterResult FilterFirstPass(log_id_t log_id, pid_t pid, uint64_t sequence, log_time realtime,
|
||||
uint16_t dropped_count);
|
||||
FilterResult FilterSecondPass(log_id_t log_id, pid_t pid, uint64_t sequence, log_time realtime,
|
||||
uint16_t dropped_count);
|
||||
|
||||
std::condition_variable thread_triggered_condition_;
|
||||
LogBuffer* log_buffer_;
|
||||
|
|
|
@ -112,7 +112,8 @@ void SimpleLogBuffer::LogInternal(LogBufferElement&& elem) {
|
|||
|
||||
uint64_t SimpleLogBuffer::FlushTo(
|
||||
LogWriter* writer, uint64_t start, pid_t* last_tid,
|
||||
const std::function<FilterResult(const LogBufferElement* element)>& filter) {
|
||||
const std::function<FilterResult(log_id_t log_id, pid_t pid, uint64_t sequence,
|
||||
log_time realtime, uint16_t dropped_count)>& filter) {
|
||||
auto shared_lock = SharedLock{lock_};
|
||||
|
||||
std::list<LogBufferElement>::iterator it;
|
||||
|
@ -146,7 +147,8 @@ uint64_t SimpleLogBuffer::FlushTo(
|
|||
}
|
||||
|
||||
if (filter) {
|
||||
FilterResult ret = filter(&element);
|
||||
FilterResult ret = filter(element.getLogId(), element.getPid(), element.getSequence(),
|
||||
element.getRealTime(), element.getDropped());
|
||||
if (ret == FilterResult::kSkip) {
|
||||
continue;
|
||||
}
|
||||
|
|
|
@ -35,9 +35,10 @@ class SimpleLogBuffer : public LogBuffer {
|
|||
|
||||
int Log(log_id_t log_id, log_time realtime, uid_t uid, pid_t pid, pid_t tid, const char* msg,
|
||||
uint16_t len) override;
|
||||
uint64_t FlushTo(
|
||||
LogWriter* writer, uint64_t start, pid_t* lastTid,
|
||||
const std::function<FilterResult(const LogBufferElement* element)>& filter) override;
|
||||
uint64_t FlushTo(LogWriter* writer, uint64_t start, pid_t* lastTid,
|
||||
const std::function<FilterResult(log_id_t log_id, pid_t pid, uint64_t sequence,
|
||||
log_time realtime, uint16_t dropped_count)>&
|
||||
filter) override;
|
||||
|
||||
bool Clear(log_id_t id, uid_t uid) override;
|
||||
unsigned long GetSize(log_id_t id) override;
|
||||
|
|
Loading…
Reference in a new issue