Merge changes I5cf0d014,Ib46f0646 am: 609630e7ba

Change-Id: I66754d24b58cde35284b4e0b0621e3f40b587948
This commit is contained in:
Tom Cherry 2020-05-28 15:29:37 +00:00 committed by Automerger Merge Worker
commit d090042c44
6 changed files with 75 additions and 60 deletions

View file

@ -21,13 +21,11 @@
#include <functional>
#include <log/log.h>
#include <sysutils/SocketClient.h>
#include <log/log_read.h>
#include "LogBufferElement.h"
#include "LogWriter.h"
class LogWriter;
enum class FlushToResult {
enum class FilterResult {
kSkip,
kStop,
kWrite,
@ -45,10 +43,11 @@ class LogBuffer {
// valid message was from the same source so we can differentiate chatty
// filter types (identical or expired)
static const uint64_t FLUSH_ERROR = 0;
virtual uint64_t FlushTo(
LogWriter* writer, uint64_t start,
pid_t* last_tid, // nullable
const std::function<FlushToResult(const LogBufferElement* element)>& filter) = 0;
virtual uint64_t FlushTo(LogWriter* writer, uint64_t start,
pid_t* last_tid, // nullable
const std::function<FilterResult(log_id_t log_id, pid_t pid,
uint64_t sequence, log_time realtime,
uint16_t dropped_count)>& filter) = 0;
virtual bool Clear(log_id_t id, uid_t uid) = 0;
virtual unsigned long GetSize(log_id_t id) = 0;

View file

@ -171,27 +171,29 @@ bool LogReader::onDataAvailable(SocketClient* cli) {
if (start != log_time::EPOCH) {
bool start_time_set = false;
uint64_t last = sequence;
auto log_find_start = [pid, logMask, start, &sequence, &start_time_set,
&last](const LogBufferElement* element) -> FlushToResult {
if (pid && pid != element->getPid()) {
return FlushToResult::kSkip;
auto log_find_start = [pid, logMask, start, &sequence, &start_time_set, &last](
log_id_t element_log_id, pid_t element_pid,
uint64_t element_sequence, log_time element_realtime,
uint16_t) -> FilterResult {
if (pid && pid != element_pid) {
return FilterResult::kSkip;
}
if ((logMask & (1 << element->getLogId())) == 0) {
return FlushToResult::kSkip;
if ((logMask & (1 << element_log_id)) == 0) {
return FilterResult::kSkip;
}
if (start == element->getRealTime()) {
sequence = element->getSequence();
if (start == element_realtime) {
sequence = element_sequence;
start_time_set = true;
return FlushToResult::kStop;
return FilterResult::kStop;
} else {
if (start < element->getRealTime()) {
if (start < element_realtime) {
sequence = last;
start_time_set = true;
return FlushToResult::kStop;
return FilterResult::kStop;
}
last = element->getSequence();
last = element_sequence;
}
return FlushToResult::kSkip;
return FilterResult::kSkip;
};
log_buffer_->FlushTo(socket_log_writer.get(), sequence, nullptr, log_find_start);

View file

@ -75,13 +75,21 @@ void LogReaderThread::ThreadFunction() {
if (tail_) {
log_buffer_->FlushTo(writer_.get(), start, nullptr,
std::bind(&LogReaderThread::FilterFirstPass, this, _1));
[this](log_id_t log_id, pid_t pid, uint64_t sequence,
log_time realtime, uint16_t dropped_count) {
return FilterFirstPass(log_id, pid, sequence, realtime,
dropped_count);
});
leading_dropped_ =
true; // TODO: Likely a bug, if leading_dropped_ was not true before calling
// flushTo(), then it should not be reset to true after.
}
start = log_buffer_->FlushTo(writer_.get(), start, last_tid_,
std::bind(&LogReaderThread::FilterSecondPass, this, _1));
[this](log_id_t log_id, pid_t pid, uint64_t sequence,
log_time realtime, uint16_t dropped_count) {
return FilterSecondPass(log_id, pid, sequence, realtime,
dropped_count);
});
// We only ignore entries before the original start time for the first flushTo(), if we
// get entries after this first flush before the original start time, then the client
@ -123,65 +131,67 @@ void LogReaderThread::ThreadFunction() {
}
// A first pass to count the number of elements
FlushToResult LogReaderThread::FilterFirstPass(const LogBufferElement* element) {
FilterResult LogReaderThread::FilterFirstPass(log_id_t log_id, pid_t pid, uint64_t sequence,
log_time realtime, uint16_t dropped_count) {
auto lock = std::lock_guard{reader_list_->reader_threads_lock()};
if (leading_dropped_) {
if (element->getDropped()) {
return FlushToResult::kSkip;
if (dropped_count) {
return FilterResult::kSkip;
}
leading_dropped_ = false;
}
if (count_ == 0) {
start_ = element->getSequence();
start_ = sequence;
}
if ((!pid_ || pid_ == element->getPid()) && IsWatching(element->getLogId()) &&
(start_time_ == log_time::EPOCH || start_time_ <= element->getRealTime())) {
if ((!pid_ || pid_ == pid) && IsWatching(log_id) &&
(start_time_ == log_time::EPOCH || start_time_ <= realtime)) {
++count_;
}
return FlushToResult::kSkip;
return FilterResult::kSkip;
}
// A second pass to send the selected elements
FlushToResult LogReaderThread::FilterSecondPass(const LogBufferElement* element) {
FilterResult LogReaderThread::FilterSecondPass(log_id_t log_id, pid_t pid, uint64_t sequence,
log_time realtime, uint16_t dropped_count) {
auto lock = std::lock_guard{reader_list_->reader_threads_lock()};
start_ = element->getSequence();
start_ = sequence;
if (skip_ahead_[element->getLogId()]) {
skip_ahead_[element->getLogId()]--;
return FlushToResult::kSkip;
if (skip_ahead_[log_id]) {
skip_ahead_[log_id]--;
return FilterResult::kSkip;
}
if (leading_dropped_) {
if (element->getDropped()) {
return FlushToResult::kSkip;
if (dropped_count) {
return FilterResult::kSkip;
}
leading_dropped_ = false;
}
// Truncate to close race between first and second pass
if (non_block_ && tail_ && index_ >= count_) {
return FlushToResult::kStop;
return FilterResult::kStop;
}
if (!IsWatching(element->getLogId())) {
return FlushToResult::kSkip;
if (!IsWatching(log_id)) {
return FilterResult::kSkip;
}
if (pid_ && pid_ != element->getPid()) {
return FlushToResult::kSkip;
if (pid_ && pid_ != pid) {
return FilterResult::kSkip;
}
if (start_time_ != log_time::EPOCH && element->getRealTime() <= start_time_) {
return FlushToResult::kSkip;
if (start_time_ != log_time::EPOCH && realtime <= start_time_) {
return FilterResult::kSkip;
}
if (release_) {
return FlushToResult::kStop;
return FilterResult::kStop;
}
if (!tail_) {
@ -191,7 +201,7 @@ FlushToResult LogReaderThread::FilterSecondPass(const LogBufferElement* element)
++index_;
if (count_ > tail_ && index_ <= (count_ - tail_)) {
return FlushToResult::kSkip;
return FilterResult::kSkip;
}
if (!non_block_) {
@ -199,10 +209,10 @@ FlushToResult LogReaderThread::FilterSecondPass(const LogBufferElement* element)
}
ok:
if (!skip_ahead_[element->getLogId()]) {
return FlushToResult::kWrite;
if (!skip_ahead_[log_id]) {
return FilterResult::kWrite;
}
return FlushToResult::kSkip;
return FilterResult::kSkip;
}
void LogReaderThread::cleanSkip_Locked(void) {

View file

@ -30,7 +30,6 @@
#include <sysutils/SocketClient.h>
#include "LogBuffer.h"
#include "LogBufferElement.h"
#include "LogWriter.h"
class LogReaderList;
@ -63,8 +62,10 @@ class LogReaderThread {
private:
void ThreadFunction();
// flushTo filter callbacks
FlushToResult FilterFirstPass(const LogBufferElement* element);
FlushToResult FilterSecondPass(const LogBufferElement* element);
FilterResult FilterFirstPass(log_id_t log_id, pid_t pid, uint64_t sequence, log_time realtime,
uint16_t dropped_count);
FilterResult FilterSecondPass(log_id_t log_id, pid_t pid, uint64_t sequence, log_time realtime,
uint16_t dropped_count);
std::condition_variable thread_triggered_condition_;
LogBuffer* log_buffer_;

View file

@ -112,7 +112,8 @@ void SimpleLogBuffer::LogInternal(LogBufferElement&& elem) {
uint64_t SimpleLogBuffer::FlushTo(
LogWriter* writer, uint64_t start, pid_t* last_tid,
const std::function<FlushToResult(const LogBufferElement* element)>& filter) {
const std::function<FilterResult(log_id_t log_id, pid_t pid, uint64_t sequence,
log_time realtime, uint16_t dropped_count)>& filter) {
auto shared_lock = SharedLock{lock_};
std::list<LogBufferElement>::iterator it;
@ -146,11 +147,12 @@ uint64_t SimpleLogBuffer::FlushTo(
}
if (filter) {
FlushToResult ret = filter(&element);
if (ret == FlushToResult::kSkip) {
FilterResult ret = filter(element.getLogId(), element.getPid(), element.getSequence(),
element.getRealTime(), element.getDropped());
if (ret == FilterResult::kSkip) {
continue;
}
if (ret == FlushToResult::kStop) {
if (ret == FilterResult::kStop) {
break;
}
}

View file

@ -35,9 +35,10 @@ class SimpleLogBuffer : public LogBuffer {
int Log(log_id_t log_id, log_time realtime, uid_t uid, pid_t pid, pid_t tid, const char* msg,
uint16_t len) override;
uint64_t FlushTo(
LogWriter* writer, uint64_t start, pid_t* lastTid,
const std::function<FlushToResult(const LogBufferElement* element)>& filter) override;
uint64_t FlushTo(LogWriter* writer, uint64_t start, pid_t* lastTid,
const std::function<FilterResult(log_id_t log_id, pid_t pid, uint64_t sequence,
log_time realtime, uint16_t dropped_count)>&
filter) override;
bool Clear(log_id_t id, uid_t uid) override;
unsigned long GetSize(log_id_t id) override;