Merge "logd: single std::mutex for locking log buffers and tracking readers"

This commit is contained in:
Tom Cherry 2020-10-08 19:58:27 +00:00 committed by Gerrit Code Review
commit 8582aa21df
23 changed files with 251 additions and 261 deletions

View file

@ -58,12 +58,13 @@ cc_library_static {
srcs: [
"ChattyLogBuffer.cpp",
"CompressionEngine.cpp",
"LogBufferElement.cpp",
"LogReaderList.cpp",
"LogReaderThread.cpp",
"LogBufferElement.cpp",
"LogSize.cpp",
"LogStatistics.cpp",
"LogTags.cpp",
"LogdLock.cpp",
"PruneList.cpp",
"SerializedFlushToState.cpp",
"SerializedLogBuffer.cpp",
@ -138,6 +139,7 @@ cc_defaults {
"-fstack-protector-all",
"-g",
"-Wall",
"-Wthread-safety",
"-Wextra",
"-Werror",
"-fno-builtin",

View file

@ -333,8 +333,6 @@ bool ChattyLogBuffer::Prune(log_id_t id, unsigned long pruneRows, uid_t caller_u
LogReaderThread* oldest = nullptr;
bool clearAll = pruneRows == ULONG_MAX;
auto reader_threads_lock = std::lock_guard{reader_list()->reader_threads_lock()};
// Region locked?
for (const auto& reader_thread : reader_list()->reader_threads()) {
if (!reader_thread->IsWatching(id)) {

View file

@ -33,19 +33,19 @@
#include "LogStatistics.h"
#include "LogTags.h"
#include "LogWriter.h"
#include "LogdLock.h"
#include "PruneList.h"
#include "SimpleLogBuffer.h"
#include "rwlock.h"
typedef std::list<LogBufferElement> LogBufferElementCollection;
class ChattyLogBuffer : public SimpleLogBuffer {
// watermark of any worst/chatty uid processing
typedef std::unordered_map<uid_t, LogBufferElementCollection::iterator> LogBufferIteratorMap;
LogBufferIteratorMap mLastWorst[LOG_ID_MAX] GUARDED_BY(lock_);
LogBufferIteratorMap mLastWorst[LOG_ID_MAX] GUARDED_BY(logd_lock);
// watermark of any worst/chatty pid of system processing
typedef std::unordered_map<pid_t, LogBufferElementCollection::iterator> LogBufferPidIteratorMap;
LogBufferPidIteratorMap mLastWorstPidOfSystem[LOG_ID_MAX] GUARDED_BY(lock_);
LogBufferPidIteratorMap mLastWorstPidOfSystem[LOG_ID_MAX] GUARDED_BY(logd_lock);
public:
ChattyLogBuffer(LogReaderList* reader_list, LogTags* tags, PruneList* prune,
@ -53,18 +53,18 @@ class ChattyLogBuffer : public SimpleLogBuffer {
~ChattyLogBuffer();
protected:
bool Prune(log_id_t id, unsigned long pruneRows, uid_t uid) REQUIRES(lock_) override;
void LogInternal(LogBufferElement&& elem) REQUIRES(lock_) override;
bool Prune(log_id_t id, unsigned long pruneRows, uid_t uid) REQUIRES(logd_lock) override;
void LogInternal(LogBufferElement&& elem) REQUIRES(logd_lock) override;
private:
LogBufferElementCollection::iterator Erase(LogBufferElementCollection::iterator it,
bool coalesce = false) REQUIRES(lock_);
bool coalesce = false) REQUIRES(logd_lock);
PruneList* prune_;
// This always contains a copy of the last message logged, for deduplication.
std::optional<LogBufferElement> last_logged_elements_[LOG_ID_MAX] GUARDED_BY(lock_);
std::optional<LogBufferElement> last_logged_elements_[LOG_ID_MAX] GUARDED_BY(logd_lock);
// This contains an element if duplicate messages are seen.
// Its `dropped` count is `duplicates seen - 1`.
std::optional<LogBufferElement> duplicate_elements_[LOG_ID_MAX] GUARDED_BY(lock_);
std::optional<LogBufferElement> duplicate_elements_[LOG_ID_MAX] GUARDED_BY(logd_lock);
};

View file

@ -60,9 +60,14 @@ TEST_P(ChattyLogBufferTest, deduplication_simple) {
LogMessages(log_messages);
std::vector<LogMessage> read_log_messages;
std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages, nullptr));
std::unique_ptr<FlushToState> flush_to_state = log_buffer_->CreateFlushToState(1, kLogMaskAll);
EXPECT_TRUE(log_buffer_->FlushTo(test_writer.get(), *flush_to_state, nullptr));
{
auto lock = std::lock_guard{logd_lock};
std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages, nullptr));
std::unique_ptr<FlushToState> flush_to_state =
log_buffer_->CreateFlushToState(1, kLogMaskAll);
EXPECT_TRUE(log_buffer_->FlushTo(test_writer.get(), *flush_to_state, nullptr));
}
std::vector<LogMessage> expected_log_messages = {
make_message(0, "test_tag", "duplicate"),
@ -117,9 +122,13 @@ TEST_P(ChattyLogBufferTest, deduplication_overflow) {
LogMessages(log_messages);
std::vector<LogMessage> read_log_messages;
std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages, nullptr));
std::unique_ptr<FlushToState> flush_to_state = log_buffer_->CreateFlushToState(1, kLogMaskAll);
EXPECT_TRUE(log_buffer_->FlushTo(test_writer.get(), *flush_to_state, nullptr));
{
auto lock = std::lock_guard{logd_lock};
std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages, nullptr));
std::unique_ptr<FlushToState> flush_to_state =
log_buffer_->CreateFlushToState(1, kLogMaskAll);
EXPECT_TRUE(log_buffer_->FlushTo(test_writer.get(), *flush_to_state, nullptr));
}
std::vector<LogMessage> expected_log_messages = {
make_message(0, "test_tag", "normal"),
@ -173,9 +182,13 @@ TEST_P(ChattyLogBufferTest, deduplication_liblog) {
LogMessages(log_messages);
std::vector<LogMessage> read_log_messages;
std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages, nullptr));
std::unique_ptr<FlushToState> flush_to_state = log_buffer_->CreateFlushToState(1, kLogMaskAll);
EXPECT_TRUE(log_buffer_->FlushTo(test_writer.get(), *flush_to_state, nullptr));
{
auto lock = std::lock_guard{logd_lock};
std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages, nullptr));
std::unique_ptr<FlushToState> flush_to_state =
log_buffer_->CreateFlushToState(1, kLogMaskAll);
EXPECT_TRUE(log_buffer_->FlushTo(test_writer.get(), *flush_to_state, nullptr));
}
std::vector<LogMessage> expected_log_messages = {
make_message(0, 1234, 1),
@ -257,7 +270,7 @@ TEST_P(ChattyLogBufferTest, no_leading_chatty_simple) {
std::vector<LogMessage> read_log_messages;
bool released = false;
{
auto lock = std::unique_lock{reader_list_.reader_threads_lock()};
auto lock = std::lock_guard{logd_lock};
std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages, &released));
std::unique_ptr<LogReaderThread> log_reader(
new LogReaderThread(log_buffer_.get(), &reader_list_, std::move(test_writer), true,
@ -317,7 +330,7 @@ TEST_P(ChattyLogBufferTest, no_leading_chatty_tail) {
std::vector<LogMessage> read_log_messages;
bool released = false;
{
auto lock = std::unique_lock{reader_list_.reader_threads_lock()};
auto lock = std::lock_guard{logd_lock};
std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages, &released));
std::unique_ptr<LogReaderThread> log_reader(
new LogReaderThread(log_buffer_.get(), &reader_list_, std::move(test_writer), true,

View file

@ -21,10 +21,12 @@
#include <functional>
#include <memory>
#include <android-base/thread_annotations.h>
#include <log/log.h>
#include <log/log_read.h>
#include "LogWriter.h"
#include "LogdLock.h"
// A mask to represent which log buffers a reader is watching, values are (1 << LOG_ID_MAIN), etc.
using LogMask = uint32_t;
@ -62,12 +64,12 @@ class LogBuffer {
virtual int Log(log_id_t log_id, log_time realtime, uid_t uid, pid_t pid, pid_t tid,
const char* msg, uint16_t len) = 0;
virtual std::unique_ptr<FlushToState> CreateFlushToState(uint64_t start, LogMask log_mask) = 0;
virtual void DeleteFlushToState(std::unique_ptr<FlushToState>) {}
virtual std::unique_ptr<FlushToState> CreateFlushToState(uint64_t start, LogMask log_mask)
REQUIRES(logd_lock) = 0;
virtual bool FlushTo(
LogWriter* writer, FlushToState& state,
const std::function<FilterResult(log_id_t log_id, pid_t pid, uint64_t sequence,
log_time realtime)>& filter) = 0;
log_time realtime)>& filter) REQUIRES(logd_lock) = 0;
virtual bool Clear(log_id_t id, uid_t uid) = 0;
virtual size_t GetSize(log_id_t id) = 0;

View file

@ -190,10 +190,14 @@ TEST_P(LogBufferTest, smoke) {
LogMessages(log_messages);
std::vector<LogMessage> read_log_messages;
std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages, nullptr));
std::unique_ptr<FlushToState> flush_to_state = log_buffer_->CreateFlushToState(1, kLogMaskAll);
EXPECT_TRUE(log_buffer_->FlushTo(test_writer.get(), *flush_to_state, nullptr));
EXPECT_EQ(2ULL, flush_to_state->start());
{
auto lock = std::lock_guard{logd_lock};
std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages, nullptr));
std::unique_ptr<FlushToState> flush_to_state =
log_buffer_->CreateFlushToState(1, kLogMaskAll);
EXPECT_TRUE(log_buffer_->FlushTo(test_writer.get(), *flush_to_state, nullptr));
EXPECT_EQ(2ULL, flush_to_state->start());
}
CompareLogMessages(log_messages, read_log_messages);
}
@ -227,7 +231,7 @@ TEST_P(LogBufferTest, smoke_with_reader_thread) {
bool released = false;
{
auto lock = std::unique_lock{reader_list_.reader_threads_lock()};
auto lock = std::lock_guard{logd_lock};
std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages, &released));
std::unique_ptr<LogReaderThread> log_reader(
new LogReaderThread(log_buffer_.get(), &reader_list_, std::move(test_writer), true,
@ -239,7 +243,7 @@ TEST_P(LogBufferTest, smoke_with_reader_thread) {
usleep(5000);
}
{
auto lock = std::unique_lock{reader_list_.reader_threads_lock()};
auto lock = std::lock_guard{logd_lock};
EXPECT_EQ(0U, reader_list_.reader_threads().size());
}
CompareLogMessages(log_messages, read_log_messages);
@ -301,7 +305,7 @@ TEST_P(LogBufferTest, random_messages) {
bool released = false;
{
auto lock = std::unique_lock{reader_list_.reader_threads_lock()};
auto lock = std::lock_guard{logd_lock};
std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages, &released));
std::unique_ptr<LogReaderThread> log_reader(
new LogReaderThread(log_buffer_.get(), &reader_list_, std::move(test_writer), true,
@ -313,7 +317,7 @@ TEST_P(LogBufferTest, random_messages) {
usleep(5000);
}
{
auto lock = std::unique_lock{reader_list_.reader_threads_lock()};
auto lock = std::lock_guard{logd_lock};
EXPECT_EQ(0U, reader_list_.reader_threads().size());
}
CompareLogMessages(log_messages, read_log_messages);
@ -335,7 +339,7 @@ TEST_P(LogBufferTest, read_last_sequence) {
bool released = false;
{
auto lock = std::unique_lock{reader_list_.reader_threads_lock()};
auto lock = std::lock_guard{logd_lock};
std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages, &released));
std::unique_ptr<LogReaderThread> log_reader(
new LogReaderThread(log_buffer_.get(), &reader_list_, std::move(test_writer), true,
@ -347,7 +351,7 @@ TEST_P(LogBufferTest, read_last_sequence) {
usleep(5000);
}
{
auto lock = std::unique_lock{reader_list_.reader_threads_lock()};
auto lock = std::lock_guard{logd_lock};
EXPECT_EQ(0U, reader_list_.reader_threads().size());
}
std::vector<LogMessage> expected_log_messages = {log_messages.back()};
@ -372,7 +376,7 @@ TEST_P(LogBufferTest, clear_logs) {
// Connect a blocking reader.
{
auto lock = std::unique_lock{reader_list_.reader_threads_lock()};
auto lock = std::lock_guard{logd_lock};
std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages, &released));
std::unique_ptr<LogReaderThread> log_reader(
new LogReaderThread(log_buffer_.get(), &reader_list_, std::move(test_writer), false,
@ -385,7 +389,7 @@ TEST_P(LogBufferTest, clear_logs) {
int count = 0;
for (; count < kMaxRetryCount; ++count) {
usleep(5000);
auto lock = std::unique_lock{reader_list_.reader_threads_lock()};
auto lock = std::lock_guard{logd_lock};
if (reader_list_.reader_threads().back()->start() == 4) {
break;
}
@ -410,7 +414,7 @@ TEST_P(LogBufferTest, clear_logs) {
// Wait up to 250ms for the reader to read the 3 additional logs.
for (count = 0; count < kMaxRetryCount; ++count) {
usleep(5000);
auto lock = std::unique_lock{reader_list_.reader_threads_lock()};
auto lock = std::lock_guard{logd_lock};
if (reader_list_.reader_threads().back()->start() == 7) {
break;
}
@ -419,14 +423,14 @@ TEST_P(LogBufferTest, clear_logs) {
// Release the reader, wait for it to get the signal then check that it has been deleted.
{
auto lock = std::unique_lock{reader_list_.reader_threads_lock()};
reader_list_.reader_threads().back()->release_Locked();
auto lock = std::lock_guard{logd_lock};
reader_list_.reader_threads().back()->Release();
}
while (!released) {
usleep(5000);
}
{
auto lock = std::unique_lock{reader_list_.reader_threads_lock()};
auto lock = std::lock_guard{logd_lock};
EXPECT_EQ(0U, reader_list_.reader_threads().size());
}
@ -438,10 +442,15 @@ TEST_P(LogBufferTest, clear_logs) {
// Finally, call FlushTo and ensure that only the 3 logs after the clear remain in the buffer.
std::vector<LogMessage> read_log_messages_after_clear;
std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages_after_clear, nullptr));
std::unique_ptr<FlushToState> flush_to_state = log_buffer_->CreateFlushToState(1, kLogMaskAll);
EXPECT_TRUE(log_buffer_->FlushTo(test_writer.get(), *flush_to_state, nullptr));
EXPECT_EQ(7ULL, flush_to_state->start());
{
auto lock = std::lock_guard{logd_lock};
std::unique_ptr<LogWriter> test_writer(
new TestWriter(&read_log_messages_after_clear, nullptr));
std::unique_ptr<FlushToState> flush_to_state =
log_buffer_->CreateFlushToState(1, kLogMaskAll);
EXPECT_TRUE(log_buffer_->FlushTo(test_writer.get(), *flush_to_state, nullptr));
EXPECT_EQ(7ULL, flush_to_state->start());
}
CompareLogMessages(after_clear_messages, read_log_messages_after_clear);
}

View file

@ -17,6 +17,7 @@
#include <ctype.h>
#include <inttypes.h>
#include <poll.h>
#include <sched.h>
#include <sys/prctl.h>
#include <sys/socket.h>
#include <sys/types.h>
@ -152,8 +153,8 @@ bool LogReader::onDataAvailable(SocketClient* cli) {
if (!fastcmp<strncmp>(buffer, "dumpAndClose", 12)) {
// Allow writer to get some cycles, and wait for pending notifications
sched_yield();
reader_list_->reader_threads_lock().lock();
reader_list_->reader_threads_lock().unlock();
logd_lock.lock();
logd_lock.unlock();
sched_yield();
nonBlock = true;
}
@ -191,6 +192,7 @@ bool LogReader::onDataAvailable(SocketClient* cli) {
}
return FilterResult::kSkip;
};
auto lock = std::lock_guard{logd_lock};
auto flush_to_state = log_buffer_->CreateFlushToState(sequence, logMask);
log_buffer_->FlushTo(socket_log_writer.get(), *flush_to_state, log_find_start);
@ -212,7 +214,7 @@ bool LogReader::onDataAvailable(SocketClient* cli) {
deadline = {};
}
auto lock = std::lock_guard{reader_list_->reader_threads_lock()};
auto lock = std::lock_guard{logd_lock};
auto entry = std::make_unique<LogReaderThread>(log_buffer_, reader_list_,
std::move(socket_log_writer), nonBlock, tail,
logMask, pid, start, sequence, deadline);
@ -230,10 +232,10 @@ bool LogReader::onDataAvailable(SocketClient* cli) {
bool LogReader::DoSocketDelete(SocketClient* cli) {
auto cli_name = SocketClientToName(cli);
auto lock = std::lock_guard{reader_list_->reader_threads_lock()};
auto lock = std::lock_guard{logd_lock};
for (const auto& reader : reader_list_->reader_threads()) {
if (reader->name() == cli_name) {
reader->release_Locked();
reader->Release();
return true;
}
}

View file

@ -19,8 +19,6 @@
// When we are notified a new log entry is available, inform
// listening sockets who are watching this entry's log id.
void LogReaderList::NotifyNewLog(LogMask log_mask) const {
auto lock = std::lock_guard{reader_threads_lock_};
for (const auto& entry : reader_threads_) {
if (!entry->IsWatchingMultiple(log_mask)) {
continue;
@ -28,6 +26,6 @@ void LogReaderList::NotifyNewLog(LogMask log_mask) const {
if (entry->deadline().time_since_epoch().count() != 0) {
continue;
}
entry->triggerReader_Locked();
entry->TriggerReader();
}
}

View file

@ -22,15 +22,16 @@
#include "LogBuffer.h"
#include "LogReaderThread.h"
#include "LogdLock.h"
class LogReaderList {
public:
void NotifyNewLog(LogMask log_mask) const;
void NotifyNewLog(LogMask log_mask) const REQUIRES(logd_lock);
std::list<std::unique_ptr<LogReaderThread>>& reader_threads() { return reader_threads_; }
std::mutex& reader_threads_lock() { return reader_threads_lock_; }
std::list<std::unique_ptr<LogReaderThread>>& reader_threads() REQUIRES(logd_lock) {
return reader_threads_;
}
private:
std::list<std::unique_ptr<LogReaderThread>> reader_threads_;
mutable std::mutex reader_threads_lock_;
std::list<std::unique_ptr<LogReaderThread>> reader_threads_ GUARDED_BY(logd_lock);
};

View file

@ -24,6 +24,7 @@
#include "LogBuffer.h"
#include "LogReaderList.h"
#include "SerializedFlushToState.h"
LogReaderThread::LogReaderThread(LogBuffer* log_buffer, LogReaderList* reader_list,
std::unique_ptr<LogWriter> writer, bool non_block,
@ -40,7 +41,7 @@ LogReaderThread::LogReaderThread(LogBuffer* log_buffer, LogReaderList* reader_li
start_time_(start_time),
deadline_(deadline),
non_block_(non_block) {
cleanSkip_Locked();
CleanSkip();
flush_to_state_ = log_buffer_->CreateFlushToState(start, log_mask);
auto thread = std::thread{&LogReaderThread::ThreadFunction, this};
thread.detach();
@ -49,7 +50,8 @@ LogReaderThread::LogReaderThread(LogBuffer* log_buffer, LogReaderList* reader_li
void LogReaderThread::ThreadFunction() {
prctl(PR_SET_NAME, "logd.reader.per");
auto lock = std::unique_lock{reader_list_->reader_threads_lock()};
auto lock = std::unique_lock{logd_lock};
auto lock_assertion = android::base::ScopedLockAssertion{logd_lock};
while (!release_) {
if (deadline_.time_since_epoch().count() != 0) {
@ -62,23 +64,19 @@ void LogReaderThread::ThreadFunction() {
}
}
lock.unlock();
if (tail_) {
auto first_pass_state = log_buffer_->CreateFlushToState(flush_to_state_->start(),
flush_to_state_->log_mask());
log_buffer_->FlushTo(
writer_.get(), *first_pass_state,
[this](log_id_t log_id, pid_t pid, uint64_t sequence, log_time realtime) {
return FilterFirstPass(log_id, pid, sequence, realtime);
});
log_buffer_->DeleteFlushToState(std::move(first_pass_state));
log_buffer_->FlushTo(writer_.get(), *first_pass_state,
[this](log_id_t log_id, pid_t pid, uint64_t sequence,
log_time realtime) REQUIRES(logd_lock) {
return FilterFirstPass(log_id, pid, sequence, realtime);
});
}
bool flush_success = log_buffer_->FlushTo(
writer_.get(), *flush_to_state_,
[this](log_id_t log_id, pid_t pid, uint64_t sequence, log_time realtime) {
return FilterSecondPass(log_id, pid, sequence, realtime);
});
[this](log_id_t log_id, pid_t pid, uint64_t sequence, log_time realtime) REQUIRES(
logd_lock) { return FilterSecondPass(log_id, pid, sequence, realtime); });
// We only ignore entries before the original start time for the first flushTo(), if we
// get entries after this first flush before the original start time, then the client
@ -89,8 +87,6 @@ void LogReaderThread::ThreadFunction() {
start_time_.tv_sec = 0;
start_time_.tv_nsec = 0;
lock.lock();
if (!flush_success) {
break;
}
@ -99,17 +95,13 @@ void LogReaderThread::ThreadFunction() {
break;
}
cleanSkip_Locked();
CleanSkip();
if (deadline_.time_since_epoch().count() == 0) {
thread_triggered_condition_.wait(lock);
}
}
lock.unlock();
log_buffer_->DeleteFlushToState(std::move(flush_to_state_));
lock.lock();
writer_->Release();
auto& log_reader_threads = reader_list_->reader_threads();
@ -123,8 +115,6 @@ void LogReaderThread::ThreadFunction() {
// A first pass to count the number of elements
FilterResult LogReaderThread::FilterFirstPass(log_id_t, pid_t pid, uint64_t, log_time realtime) {
auto lock = std::lock_guard{reader_list_->reader_threads_lock()};
if ((!pid_ || pid_ == pid) && (start_time_ == log_time::EPOCH || start_time_ <= realtime)) {
++count_;
}
@ -135,8 +125,6 @@ FilterResult LogReaderThread::FilterFirstPass(log_id_t, pid_t pid, uint64_t, log
// A second pass to send the selected elements
FilterResult LogReaderThread::FilterSecondPass(log_id_t log_id, pid_t pid, uint64_t,
log_time realtime) {
auto lock = std::lock_guard{reader_list_->reader_threads_lock()};
if (skip_ahead_[log_id]) {
skip_ahead_[log_id]--;
return FilterResult::kSkip;
@ -179,7 +167,3 @@ ok:
}
return FilterResult::kSkip;
}
void LogReaderThread::cleanSkip_Locked(void) {
memset(skip_ahead_, 0, sizeof(skip_ahead_));
}

View file

@ -26,10 +26,12 @@
#include <list>
#include <memory>
#include <android-base/thread_annotations.h>
#include <log/log.h>
#include "LogBuffer.h"
#include "LogWriter.h"
#include "LogdLock.h"
class LogReaderList;
@ -39,50 +41,54 @@ class LogReaderThread {
std::unique_ptr<LogWriter> writer, bool non_block, unsigned long tail,
LogMask log_mask, pid_t pid, log_time start_time, uint64_t sequence,
std::chrono::steady_clock::time_point deadline);
void triggerReader_Locked() { thread_triggered_condition_.notify_all(); }
void TriggerReader() REQUIRES(logd_lock) { thread_triggered_condition_.notify_all(); }
void triggerSkip_Locked(log_id_t id, unsigned int skip) { skip_ahead_[id] = skip; }
void cleanSkip_Locked();
void TriggerSkip(log_id_t id, unsigned int skip) REQUIRES(logd_lock) { skip_ahead_[id] = skip; }
void CleanSkip() REQUIRES(logd_lock) { memset(skip_ahead_, 0, sizeof(skip_ahead_)); }
void release_Locked() {
void Release() REQUIRES(logd_lock) {
// gracefully shut down the socket.
writer_->Shutdown();
release_ = true;
thread_triggered_condition_.notify_all();
}
bool IsWatching(log_id_t id) const { return flush_to_state_->log_mask() & (1 << id); }
bool IsWatchingMultiple(LogMask log_mask) const {
return flush_to_state_ && flush_to_state_->log_mask() & log_mask;
bool IsWatching(log_id_t id) const REQUIRES(logd_lock) {
return flush_to_state_->log_mask() & (1 << id);
}
bool IsWatchingMultiple(LogMask log_mask) const REQUIRES(logd_lock) {
return flush_to_state_->log_mask() & log_mask;
}
std::string name() const { return writer_->name(); }
uint64_t start() const { return flush_to_state_->start(); }
std::chrono::steady_clock::time_point deadline() const { return deadline_; }
FlushToState& flush_to_state() { return *flush_to_state_; }
std::string name() const REQUIRES(logd_lock) { return writer_->name(); }
uint64_t start() const REQUIRES(logd_lock) { return flush_to_state_->start(); }
std::chrono::steady_clock::time_point deadline() const REQUIRES(logd_lock) { return deadline_; }
FlushToState& flush_to_state() REQUIRES(logd_lock) { return *flush_to_state_; }
private:
void ThreadFunction();
// flushTo filter callbacks
FilterResult FilterFirstPass(log_id_t log_id, pid_t pid, uint64_t sequence, log_time realtime);
FilterResult FilterSecondPass(log_id_t log_id, pid_t pid, uint64_t sequence, log_time realtime);
FilterResult FilterFirstPass(log_id_t log_id, pid_t pid, uint64_t sequence, log_time realtime)
REQUIRES(logd_lock);
FilterResult FilterSecondPass(log_id_t log_id, pid_t pid, uint64_t sequence, log_time realtime)
REQUIRES(logd_lock);
std::condition_variable thread_triggered_condition_;
LogBuffer* log_buffer_;
LogReaderList* reader_list_;
std::unique_ptr<LogWriter> writer_;
std::unique_ptr<LogWriter> writer_ GUARDED_BY(logd_lock);
// Set to true to cause the thread to end and the LogReaderThread to delete itself.
bool release_ = false;
bool release_ GUARDED_BY(logd_lock) = false;
// If set to non-zero, only pids equal to this are read by the reader.
const pid_t pid_;
// When a reader is referencing (via start_) old elements in the log buffer, and the log
// buffer's size grows past its memory limit, the log buffer may request the reader to skip
// ahead a specified number of logs.
unsigned int skip_ahead_[LOG_ID_MAX];
unsigned int skip_ahead_[LOG_ID_MAX] GUARDED_BY(logd_lock);
// LogBuffer::FlushTo() needs to store state across subsequent calls.
std::unique_ptr<FlushToState> flush_to_state_;
std::unique_ptr<FlushToState> flush_to_state_ GUARDED_BY(logd_lock);
// These next three variables are used for reading only the most recent lines aka `adb logcat
// -t` / `adb logcat -T`.
@ -100,7 +106,7 @@ class LogReaderThread {
log_time start_time_;
// CLOCK_MONOTONIC based deadline used for log wrapping. If this deadline expires before logs
// wrap, then wake up and send the logs to the reader anyway.
std::chrono::steady_clock::time_point deadline_;
std::chrono::steady_clock::time_point deadline_ GUARDED_BY(logd_lock);
// If this reader is 'dumpAndClose' and will disconnect once it has read its intended logs.
const bool non_block_;
};

19
logd/LogdLock.cpp Normal file
View file

@ -0,0 +1,19 @@
/*
* Copyright (C) 2020 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "LogdLock.h"
std::mutex logd_lock;

21
logd/LogdLock.h Normal file
View file

@ -0,0 +1,21 @@
/*
* Copyright (C) 2020 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <mutex>
extern std::mutex logd_lock;

View file

@ -321,6 +321,7 @@ class PrintLogs : public SingleBufferOperation {
}
void End() override {
auto lock = std::lock_guard{logd_lock};
std::unique_ptr<LogWriter> test_writer(new StdoutWriter());
std::unique_ptr<FlushToState> flush_to_state = log_buffer_->CreateFlushToState(1, mask_);
log_buffer_->FlushTo(test_writer.get(), *flush_to_state, nullptr);
@ -372,7 +373,7 @@ class PrintAllLogs : public SingleBufferOperation {
PrintAllLogs(log_time first_log_timestamp, const char* buffer, const char* buffers)
: SingleBufferOperation(first_log_timestamp, buffer) {
LogMask mask = BuffersToLogMask(buffers);
auto lock = std::unique_lock{reader_list_.reader_threads_lock()};
auto lock = std::unique_lock{logd_lock};
std::unique_ptr<LogWriter> stdout_writer(new StdoutWriter());
std::unique_ptr<LogReaderThread> log_reader(
new LogReaderThread(log_buffer_.get(), &reader_list_, std::move(stdout_writer),

View file

@ -20,8 +20,9 @@
#include <android-base/logging.h>
SerializedFlushToState::SerializedFlushToState(uint64_t start, LogMask log_mask)
: FlushToState(start, log_mask) {
SerializedFlushToState::SerializedFlushToState(uint64_t start, LogMask log_mask,
std::list<SerializedLogChunk>* logs)
: FlushToState(start, log_mask), logs_(logs) {
log_id_for_each(i) {
if (((1 << i) & log_mask) == 0) {
continue;

View file

@ -21,6 +21,7 @@
#include <queue>
#include "LogBuffer.h"
#include "LogdLock.h"
#include "SerializedLogChunk.h"
#include "SerializedLogEntry.h"
@ -44,48 +45,45 @@ class SerializedFlushToState : public FlushToState {
public:
// Initializes this state object. For each log buffer set in log_mask, this sets
// logs_needed_from_next_position_.
SerializedFlushToState(uint64_t start, LogMask log_mask);
SerializedFlushToState(uint64_t start, LogMask log_mask, std::list<SerializedLogChunk>* logs)
REQUIRES(logd_lock);
// Decrease the reference of all referenced logs. This happens when a reader is disconnected.
~SerializedFlushToState() override;
// We can't hold SerializedLogBuffer::lock_ in the constructor, so we must initialize logs here.
void InitializeLogs(std::list<SerializedLogChunk>* logs) {
if (logs_ == nullptr) logs_ = logs;
}
// Updates the state of log_positions_ and logs_needed_from_next_position_ then returns true if
// there are any unread logs, false otherwise.
bool HasUnreadLogs();
bool HasUnreadLogs() REQUIRES(logd_lock);
// Returns the next unread log and sets logs_needed_from_next_position_ to indicate that we're
// waiting for more logs from the associated log buffer.
LogWithId PopNextUnreadLog();
LogWithId PopNextUnreadLog() REQUIRES(logd_lock);
// If the parent log buffer prunes logs, the reference that this class contains may become
// invalid, so this must be called first to drop the reference to buffer_it, if any.
void Prune(log_id_t log_id, const std::list<SerializedLogChunk>::iterator& buffer_it);
void Prune(log_id_t log_id, const std::list<SerializedLogChunk>::iterator& buffer_it)
REQUIRES(logd_lock);
private:
// Set logs_needed_from_next_position_[i] to indicate if log_positions_[i] points to an unread
// log or to the point at which the next log will appear.
void UpdateLogsNeeded(log_id_t log_id);
void UpdateLogsNeeded(log_id_t log_id) REQUIRES(logd_lock);
// Create a LogPosition object for the given log_id by searching through the log chunks for the
// first chunk and then first log entry within that chunk that is greater or equal to start().
void CreateLogPosition(log_id_t log_id);
void CreateLogPosition(log_id_t log_id) REQUIRES(logd_lock);
// Checks to see if any log buffers set in logs_needed_from_next_position_ have new logs and
// calls UpdateLogsNeeded() if so.
void CheckForNewLogs();
void CheckForNewLogs() REQUIRES(logd_lock);
std::list<SerializedLogChunk>* logs_ = nullptr;
std::list<SerializedLogChunk>* logs_ GUARDED_BY(logd_lock) = nullptr;
// An optional structure that contains an iterator to the serialized log buffer and offset into
// it that this logger should handle next.
std::optional<LogPosition> log_positions_[LOG_ID_MAX];
std::optional<LogPosition> log_positions_[LOG_ID_MAX] GUARDED_BY(logd_lock);
// A bit for each log that is set if a given log_id has no logs or if this client has read all
// of its logs. In order words: `logs_[i].empty() || (buffer_it == std::prev(logs_.end) &&
// next_log_position == logs_write_position_)`. These will be re-checked in each
// loop in case new logs came in.
std::bitset<LOG_ID_MAX> logs_needed_from_next_position_ = {};
std::bitset<LOG_ID_MAX> logs_needed_from_next_position_ GUARDED_BY(logd_lock) = {};
};

View file

@ -36,8 +36,8 @@ class SerializedFlushToStateTest : public testing::Test {
}
void TearDown() override { android::base::SetMinimumLogSeverity(old_log_severity_); }
std::string TestReport(const std::vector<uint64_t>& expected,
const std::vector<uint64_t>& read) {
std::string TestReport(const std::vector<uint64_t>& expected, const std::vector<uint64_t>& read)
REQUIRES(logd_lock) {
auto sequence_to_log_id = [&](uint64_t sequence) -> int {
for (const auto& [log_id, sequences] : sequence_numbers_per_buffer_) {
if (std::find(sequences.begin(), sequences.end(), sequence) != sequences.end()) {
@ -82,13 +82,12 @@ class SerializedFlushToStateTest : public testing::Test {
// Read sequence numbers in order from SerializedFlushToState for every mask combination and all
// sequence numbers from 0 through the highest logged sequence number + 1.
// This assumes that all of the logs have already been written.
void TestAllReading() {
void TestAllReading() REQUIRES(logd_lock) {
uint64_t max_sequence = sequence_ + 1;
uint32_t max_mask = (1 << LOG_ID_MAX) - 1;
for (uint64_t sequence = 0; sequence < max_sequence; ++sequence) {
for (uint32_t mask = 0; mask < max_mask; ++mask) {
auto state = SerializedFlushToState{sequence, mask};
state.InitializeLogs(log_chunks_);
auto state = SerializedFlushToState{sequence, mask, log_chunks_};
TestReading(sequence, mask, state);
}
}
@ -98,14 +97,14 @@ class SerializedFlushToStateTest : public testing::Test {
// it calls write_logs() in a loop for sequence/mask combination. It clears log_chunks_ and
// sequence_numbers_per_buffer_ between calls, such that only the sequence numbers written in
// the previous call to write_logs() are expected.
void TestAllReadingWithFutureMessages(const std::function<bool(int)>& write_logs) {
void TestAllReadingWithFutureMessages(const std::function<bool(int)>& write_logs)
REQUIRES(logd_lock) {
uint64_t max_sequence = sequence_ + 1;
uint32_t max_mask = (1 << LOG_ID_MAX) - 1;
for (uint64_t sequence = 1; sequence < max_sequence; ++sequence) {
for (uint32_t mask = 1; mask < max_mask; ++mask) {
log_id_for_each(i) { log_chunks_[i].clear(); }
auto state = SerializedFlushToState{sequence, mask};
state.InitializeLogs(log_chunks_);
auto state = SerializedFlushToState{sequence, mask, log_chunks_};
int loop_count = 0;
while (write_logs(loop_count++)) {
TestReading(sequence, mask, state);
@ -115,7 +114,8 @@ class SerializedFlushToStateTest : public testing::Test {
}
}
void TestReading(uint64_t start, LogMask log_mask, SerializedFlushToState& state) {
void TestReading(uint64_t start, LogMask log_mask, SerializedFlushToState& state)
REQUIRES(logd_lock) {
std::vector<uint64_t> expected_sequence;
log_id_for_each(i) {
if (((1 << i) & log_mask) == 0) {
@ -148,7 +148,7 @@ class SerializedFlushToStateTest : public testing::Test {
// Add a chunk with the given messages to the a given log buffer. Keep track of the sequence
// numbers for future validation. Optionally mark the block as having finished writing.
void AddChunkWithMessages(bool finish_writing, int buffer,
const std::vector<std::string>& messages) {
const std::vector<std::string>& messages) REQUIRES(logd_lock) {
auto chunk = SerializedLogChunk{kChunkSize};
for (const auto& message : messages) {
auto sequence = sequence_++;
@ -175,6 +175,7 @@ class SerializedFlushToStateTest : public testing::Test {
// 4: 1 chunk with 0 logs and finished writing (impossible, but SerializedFlushToState handles it)
// 5-7: 0 chunks
TEST_F(SerializedFlushToStateTest, smoke) {
auto lock = std::lock_guard{logd_lock};
AddChunkWithMessages(true, 0, {"1st", "2nd"});
AddChunkWithMessages(true, 1, {"3rd"});
AddChunkWithMessages(false, 0, {"4th"});
@ -188,6 +189,7 @@ TEST_F(SerializedFlushToStateTest, smoke) {
}
TEST_F(SerializedFlushToStateTest, random) {
auto lock = std::lock_guard{logd_lock};
srand(1);
for (int count = 0; count < 20; ++count) {
unsigned int num_messages = 1 + rand() % 15;
@ -204,7 +206,8 @@ TEST_F(SerializedFlushToStateTest, random) {
// Same start as smoke, but we selectively write logs to the buffers and ensure they're read.
TEST_F(SerializedFlushToStateTest, future_writes) {
auto write_logs = [&](int loop_count) {
auto lock = std::lock_guard{logd_lock};
auto write_logs = [&](int loop_count) REQUIRES(logd_lock) {
switch (loop_count) {
case 0:
// Initial writes.
@ -252,11 +255,11 @@ TEST_F(SerializedFlushToStateTest, future_writes) {
}
TEST_F(SerializedFlushToStateTest, no_dangling_references) {
auto lock = std::lock_guard{logd_lock};
AddChunkWithMessages(true, 0, {"1st", "2nd"});
AddChunkWithMessages(true, 0, {"3rd", "4th"});
auto state = SerializedFlushToState{1, kLogMaskAll};
state.InitializeLogs(log_chunks_);
auto state = SerializedFlushToState{1, kLogMaskAll, log_chunks_};
ASSERT_EQ(log_chunks_[0].size(), 2U);
auto first_chunk = log_chunks_[0].begin();
@ -290,6 +293,7 @@ TEST_F(SerializedFlushToStateTest, no_dangling_references) {
}
TEST(SerializedFlushToState, Prune) {
auto lock = std::lock_guard{logd_lock};
auto chunk = SerializedLogChunk{kChunkSize};
chunk.Log(1, log_time(), 0, 1, 1, "abc", 3);
chunk.Log(2, log_time(), 0, 1, 1, "abc", 3);
@ -299,8 +303,7 @@ TEST(SerializedFlushToState, Prune) {
std::list<SerializedLogChunk> log_chunks[LOG_ID_MAX];
log_chunks[LOG_ID_MAIN].emplace_back(std::move(chunk));
auto state = SerializedFlushToState{1, kLogMaskAll};
state.InitializeLogs(log_chunks);
auto state = SerializedFlushToState{1, kLogMaskAll, log_chunks};
ASSERT_TRUE(state.HasUnreadLogs());
state.Prune(LOG_ID_MAIN, log_chunks[LOG_ID_MAIN].begin());

View file

@ -41,9 +41,9 @@ void SerializedLogBuffer::Init() {
}
// Release any sleeping reader threads to dump their current content.
auto reader_threads_lock = std::lock_guard{reader_list_->reader_threads_lock()};
auto lock = std::lock_guard{logd_lock};
for (const auto& reader_thread : reader_list_->reader_threads()) {
reader_thread->triggerReader_Locked();
reader_thread->TriggerReader();
}
}
@ -86,7 +86,7 @@ int SerializedLogBuffer::Log(log_id_t log_id, log_time realtime, uid_t uid, pid_
auto sequence = sequence_.fetch_add(1, std::memory_order_relaxed);
auto lock = std::lock_guard{lock_};
auto lock = std::lock_guard{logd_lock};
if (logs_[log_id].empty()) {
logs_[log_id].push_back(SerializedLogChunk(max_size_[log_id] / 4));
@ -140,8 +140,6 @@ void SerializedLogBuffer::NotifyReadersOfPrune(
}
void SerializedLogBuffer::Prune(log_id_t log_id, size_t bytes_to_free, uid_t uid) {
auto reader_threads_lock = std::lock_guard{reader_list_->reader_threads_lock()};
auto& log_buffer = logs_[log_id];
auto it = log_buffer.begin();
while (it != log_buffer.end()) {
@ -158,7 +156,7 @@ void SerializedLogBuffer::Prune(log_id_t log_id, size_t bytes_to_free, uid_t uid
// fast enough to not back-up logd. Instead, we can achieve an nearly-as-efficient
// but not error-prune batching effect by waking the reader whenever any chunk is
// about to be pruned.
reader_thread->triggerReader_Locked();
reader_thread->TriggerReader();
}
// Some readers may be still reading from this log chunk, log a warning that they are
@ -198,22 +196,14 @@ void SerializedLogBuffer::Prune(log_id_t log_id, size_t bytes_to_free, uid_t uid
std::unique_ptr<FlushToState> SerializedLogBuffer::CreateFlushToState(uint64_t start,
LogMask log_mask) {
return std::make_unique<SerializedFlushToState>(start, log_mask);
}
void SerializedLogBuffer::DeleteFlushToState(std::unique_ptr<FlushToState> state) {
auto lock = std::unique_lock{lock_};
state.reset();
return std::make_unique<SerializedFlushToState>(start, log_mask, logs_);
}
bool SerializedLogBuffer::FlushTo(
LogWriter* writer, FlushToState& abstract_state,
const std::function<FilterResult(log_id_t log_id, pid_t pid, uint64_t sequence,
log_time realtime)>& filter) {
auto lock = std::unique_lock{lock_};
auto& state = reinterpret_cast<SerializedFlushToState&>(abstract_state);
state.InitializeLogs(logs_);
while (state.HasUnreadLogs()) {
LogWithId top = state.PopNextUnreadLog();
@ -245,13 +235,14 @@ bool SerializedLogBuffer::FlushTo(
unsigned char entry_copy[kMaxEntrySize] __attribute__((uninitialized));
CHECK_LT(entry->msg_len(), LOGGER_ENTRY_MAX_PAYLOAD + 1);
memcpy(entry_copy, entry, sizeof(*entry) + entry->msg_len());
lock.unlock();
logd_lock.unlock();
if (!reinterpret_cast<SerializedLogEntry*>(entry_copy)->Flush(writer, log_id)) {
logd_lock.lock();
return false;
}
lock.lock();
logd_lock.lock();
}
state.set_start(state.start() + 1);
@ -259,7 +250,7 @@ bool SerializedLogBuffer::FlushTo(
}
bool SerializedLogBuffer::Clear(log_id_t id, uid_t uid) {
auto lock = std::lock_guard{lock_};
auto lock = std::lock_guard{logd_lock};
Prune(id, ULONG_MAX, uid);
// Clearing SerializedLogBuffer never waits for readers and therefore is always successful.
@ -275,7 +266,7 @@ size_t SerializedLogBuffer::GetSizeUsed(log_id_t id) {
}
size_t SerializedLogBuffer::GetSize(log_id_t id) {
auto lock = std::lock_guard{lock_};
auto lock = std::lock_guard{logd_lock};
return max_size_[id];
}
@ -288,7 +279,7 @@ bool SerializedLogBuffer::SetSize(log_id_t id, size_t size) {
return false;
}
auto lock = std::lock_guard{lock_};
auto lock = std::lock_guard{logd_lock};
max_size_[id] = size;
MaybePrune(id);

View file

@ -30,9 +30,9 @@
#include "LogReaderList.h"
#include "LogStatistics.h"
#include "LogTags.h"
#include "LogdLock.h"
#include "SerializedLogChunk.h"
#include "SerializedLogEntry.h"
#include "rwlock.h"
class SerializedLogBuffer final : public LogBuffer {
public:
@ -41,11 +41,12 @@ class SerializedLogBuffer final : public LogBuffer {
int Log(log_id_t log_id, log_time realtime, uid_t uid, pid_t pid, pid_t tid, const char* msg,
uint16_t len) override;
std::unique_ptr<FlushToState> CreateFlushToState(uint64_t start, LogMask log_mask) override;
void DeleteFlushToState(std::unique_ptr<FlushToState> state) override;
std::unique_ptr<FlushToState> CreateFlushToState(uint64_t start, LogMask log_mask)
REQUIRES(logd_lock) override;
bool FlushTo(LogWriter* writer, FlushToState& state,
const std::function<FilterResult(log_id_t log_id, pid_t pid, uint64_t sequence,
log_time realtime)>& filter) override;
log_time realtime)>& filter)
REQUIRES(logd_lock) override;
bool Clear(log_id_t id, uid_t uid) override;
size_t GetSize(log_id_t id) override;
@ -55,20 +56,19 @@ class SerializedLogBuffer final : public LogBuffer {
private:
bool ShouldLog(log_id_t log_id, const char* msg, uint16_t len);
void MaybePrune(log_id_t log_id) REQUIRES(lock_);
void Prune(log_id_t log_id, size_t bytes_to_free, uid_t uid) REQUIRES(lock_);
void MaybePrune(log_id_t log_id) REQUIRES(logd_lock);
void Prune(log_id_t log_id, size_t bytes_to_free, uid_t uid) REQUIRES(logd_lock);
void NotifyReadersOfPrune(log_id_t log_id, const std::list<SerializedLogChunk>::iterator& chunk)
REQUIRES(reader_list_->reader_threads_lock());
REQUIRES(logd_lock);
void RemoveChunkFromStats(log_id_t log_id, SerializedLogChunk& chunk);
size_t GetSizeUsed(log_id_t id) REQUIRES(lock_);
size_t GetSizeUsed(log_id_t id) REQUIRES(logd_lock);
LogReaderList* reader_list_;
LogTags* tags_;
LogStatistics* stats_;
size_t max_size_[LOG_ID_MAX] GUARDED_BY(lock_) = {};
std::list<SerializedLogChunk> logs_[LOG_ID_MAX] GUARDED_BY(lock_);
RwLock lock_;
size_t max_size_[LOG_ID_MAX] GUARDED_BY(logd_lock) = {};
std::list<SerializedLogChunk> logs_[LOG_ID_MAX] GUARDED_BY(logd_lock);
std::atomic<uint64_t> sequence_ = 1;
};

View file

@ -36,9 +36,9 @@ void SimpleLogBuffer::Init() {
}
// Release any sleeping reader threads to dump their current content.
auto reader_threads_lock = std::lock_guard{reader_list_->reader_threads_lock()};
auto lock = std::lock_guard{logd_lock};
for (const auto& reader_thread : reader_list_->reader_threads()) {
reader_thread->triggerReader_Locked();
reader_thread->TriggerReader();
}
}
@ -95,7 +95,7 @@ int SimpleLogBuffer::Log(log_id_t log_id, log_time realtime, uid_t uid, pid_t pi
// exact entry with time specified in ms or us precision.
if ((realtime.tv_nsec % 1000) == 0) ++realtime.tv_nsec;
auto lock = std::lock_guard{lock_};
auto lock = std::lock_guard{logd_lock};
auto sequence = sequence_.fetch_add(1, std::memory_order_relaxed);
LogInternal(LogBufferElement(log_id, realtime, uid, pid, tid, sequence, msg, len));
return len;
@ -136,8 +136,6 @@ bool SimpleLogBuffer::FlushTo(
LogWriter* writer, FlushToState& abstract_state,
const std::function<FilterResult(log_id_t log_id, pid_t pid, uint64_t sequence,
log_time realtime)>& filter) {
auto shared_lock = SharedLock{lock_};
auto& state = reinterpret_cast<ChattyFlushToState&>(abstract_state);
std::list<LogBufferElement>::iterator it;
@ -200,13 +198,14 @@ bool SimpleLogBuffer::FlushTo(
state.last_tid()[element.log_id()] =
(element.dropped_count() && !same_tid) ? 0 : element.tid();
shared_lock.unlock();
logd_lock.unlock();
// We never prune logs equal to or newer than any LogReaderThreads' `start` value, so the
// `element` pointer is safe here without the lock
if (!element.FlushTo(writer, stats_, same_tid)) {
logd_lock.lock();
return false;
}
shared_lock.lock_shared();
logd_lock.lock();
}
state.set_start(state.start() + 1);
@ -217,7 +216,7 @@ bool SimpleLogBuffer::Clear(log_id_t id, uid_t uid) {
// Try three times to clear, then disconnect the readers and try one final time.
for (int retry = 0; retry < 3; ++retry) {
{
auto lock = std::lock_guard{lock_};
auto lock = std::lock_guard{logd_lock};
if (Prune(id, ULONG_MAX, uid)) {
return true;
}
@ -229,27 +228,27 @@ bool SimpleLogBuffer::Clear(log_id_t id, uid_t uid) {
// _blocked_ reader.
bool busy = false;
{
auto lock = std::lock_guard{lock_};
auto lock = std::lock_guard{logd_lock};
busy = !Prune(id, 1, uid);
}
// It is still busy, disconnect all readers.
if (busy) {
auto reader_threads_lock = std::lock_guard{reader_list_->reader_threads_lock()};
auto lock = std::lock_guard{logd_lock};
for (const auto& reader_thread : reader_list_->reader_threads()) {
if (reader_thread->IsWatching(id)) {
LOG(WARNING) << "Kicking blocked reader, " << reader_thread->name()
<< ", from LogBuffer::clear()";
reader_thread->release_Locked();
reader_thread->Release();
}
}
}
auto lock = std::lock_guard{lock_};
auto lock = std::lock_guard{logd_lock};
return Prune(id, ULONG_MAX, uid);
}
// get the total space allocated to "id"
size_t SimpleLogBuffer::GetSize(log_id_t id) {
auto lock = SharedLock{lock_};
auto lock = std::lock_guard{logd_lock};
size_t retval = max_size_[id];
return retval;
}
@ -261,7 +260,7 @@ bool SimpleLogBuffer::SetSize(log_id_t id, size_t size) {
return false;
}
auto lock = std::lock_guard{lock_};
auto lock = std::lock_guard{logd_lock};
max_size_[id] = size;
return true;
}
@ -274,8 +273,6 @@ void SimpleLogBuffer::MaybePrune(log_id_t id) {
}
bool SimpleLogBuffer::Prune(log_id_t id, unsigned long prune_rows, uid_t caller_uid) {
auto reader_threads_lock = std::lock_guard{reader_list_->reader_threads_lock()};
// Don't prune logs that are newer than the point at which any reader threads are reading from.
LogReaderThread* oldest = nullptr;
for (const auto& reader_thread : reader_list_->reader_threads()) {
@ -347,14 +344,14 @@ void SimpleLogBuffer::KickReader(LogReaderThread* reader, log_id_t id, unsigned
// dropped if we hit too much memory pressure.
LOG(WARNING) << "Kicking blocked reader, " << reader->name()
<< ", from LogBuffer::kickMe()";
reader->release_Locked();
reader->Release();
} else if (reader->deadline().time_since_epoch().count() != 0) {
// Allow a blocked WRAP deadline reader to trigger and start reporting the log data.
reader->triggerReader_Locked();
reader->TriggerReader();
} else {
// tell slow reader to skip entries to catch up
LOG(WARNING) << "Skipping " << prune_rows << " entries from slow reader, " << reader->name()
<< ", from LogBuffer::kickMe()";
reader->triggerSkip_Locked(id, prune_rows);
reader->TriggerSkip(id, prune_rows);
}
}

View file

@ -25,7 +25,7 @@
#include "LogReaderList.h"
#include "LogStatistics.h"
#include "LogTags.h"
#include "rwlock.h"
#include "LogdLock.h"
class SimpleLogBuffer : public LogBuffer {
public:
@ -35,10 +35,12 @@ class SimpleLogBuffer : public LogBuffer {
int Log(log_id_t log_id, log_time realtime, uid_t uid, pid_t pid, pid_t tid, const char* msg,
uint16_t len) override;
std::unique_ptr<FlushToState> CreateFlushToState(uint64_t start, LogMask log_mask) override;
std::unique_ptr<FlushToState> CreateFlushToState(uint64_t start, LogMask log_mask)
REQUIRES(logd_lock) override;
bool FlushTo(LogWriter* writer, FlushToState& state,
const std::function<FilterResult(log_id_t log_id, pid_t pid, uint64_t sequence,
log_time realtime)>& filter) override;
log_time realtime)>& filter)
REQUIRES(logd_lock) override;
bool Clear(log_id_t id, uid_t uid) override;
size_t GetSize(log_id_t id) override;
@ -47,27 +49,25 @@ class SimpleLogBuffer : public LogBuffer {
uint64_t sequence() const override { return sequence_.load(std::memory_order_relaxed); }
protected:
virtual bool Prune(log_id_t id, unsigned long prune_rows, uid_t uid) REQUIRES(lock_);
virtual void LogInternal(LogBufferElement&& elem) REQUIRES(lock_);
virtual bool Prune(log_id_t id, unsigned long prune_rows, uid_t uid) REQUIRES(logd_lock);
virtual void LogInternal(LogBufferElement&& elem) REQUIRES(logd_lock);
// Returns an iterator to the oldest element for a given log type, or logs_.end() if
// there are no logs for the given log type. Requires logs_lock_ to be held.
std::list<LogBufferElement>::iterator GetOldest(log_id_t log_id) REQUIRES(lock_);
// there are no logs for the given log type. Requires logs_logd_lock to be held.
std::list<LogBufferElement>::iterator GetOldest(log_id_t log_id) REQUIRES(logd_lock);
std::list<LogBufferElement>::iterator Erase(std::list<LogBufferElement>::iterator it)
REQUIRES(lock_);
REQUIRES(logd_lock);
void KickReader(LogReaderThread* reader, log_id_t id, unsigned long prune_rows)
REQUIRES_SHARED(lock_);
REQUIRES(logd_lock);
LogStatistics* stats() { return stats_; }
LogReaderList* reader_list() { return reader_list_; }
size_t max_size(log_id_t id) REQUIRES_SHARED(lock_) { return max_size_[id]; }
size_t max_size(log_id_t id) REQUIRES_SHARED(logd_lock) { return max_size_[id]; }
std::list<LogBufferElement>& logs() { return logs_; }
RwLock lock_;
private:
bool ShouldLog(log_id_t log_id, const char* msg, uint16_t len);
void MaybePrune(log_id_t id) REQUIRES(lock_);
void MaybePrune(log_id_t id) REQUIRES(logd_lock);
LogReaderList* reader_list_;
LogTags* tags_;
@ -75,9 +75,9 @@ class SimpleLogBuffer : public LogBuffer {
std::atomic<uint64_t> sequence_ = 1;
size_t max_size_[LOG_ID_MAX] GUARDED_BY(lock_);
std::list<LogBufferElement> logs_ GUARDED_BY(lock_);
size_t max_size_[LOG_ID_MAX] GUARDED_BY(logd_lock);
std::list<LogBufferElement> logs_ GUARDED_BY(logd_lock);
// Keeps track of the iterator to the oldest log message of a given log type, as an
// optimization when pruning logs. Use GetOldest() to retrieve.
std::optional<std::list<LogBufferElement>::iterator> oldest_[LOG_ID_MAX] GUARDED_BY(lock_);
std::optional<std::list<LogBufferElement>::iterator> oldest_[LOG_ID_MAX] GUARDED_BY(logd_lock);
};

View file

@ -126,7 +126,7 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
// Read out all of the logs.
{
auto lock = std::unique_lock{reader_list.reader_threads_lock()};
auto lock = std::unique_lock{logd_lock};
std::unique_ptr<LogWriter> test_writer(new NoopWriter());
std::unique_ptr<LogReaderThread> log_reader(
new LogReaderThread(log_buffer.get(), &reader_list, std::move(test_writer), true, 0,
@ -137,7 +137,7 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
// Wait until the reader has finished.
while (true) {
usleep(50);
auto lock = std::unique_lock{reader_list.reader_threads_lock()};
auto lock = std::unique_lock{logd_lock};
if (reader_list.reader_threads().size() == 0) {
break;
}

View file

@ -1,56 +0,0 @@
/*
* Copyright (C) 2020 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <pthread.h>
#include <android-base/macros.h>
#include <android-base/thread_annotations.h>
// As of the end of May 2020, std::shared_mutex is *not* simply a pthread_rwlock, but rather a
// combination of std::mutex and std::condition variable, which is obviously less efficient. This
// immitates what std::shared_mutex should be doing and is compatible with RAII thread wrappers.
class SHARED_CAPABILITY("mutex") RwLock {
public:
RwLock() {}
~RwLock() {}
void lock() ACQUIRE() { pthread_rwlock_wrlock(&rwlock_); }
void lock_shared() ACQUIRE_SHARED() { pthread_rwlock_rdlock(&rwlock_); }
void unlock() RELEASE() { pthread_rwlock_unlock(&rwlock_); }
private:
pthread_rwlock_t rwlock_ = PTHREAD_RWLOCK_INITIALIZER;
};
// std::shared_lock does not have thread annotations, so we need our own.
class SCOPED_CAPABILITY SharedLock {
public:
explicit SharedLock(RwLock& lock) ACQUIRE_SHARED(lock) : lock_(lock) { lock_.lock_shared(); }
~SharedLock() RELEASE() { lock_.unlock(); }
void lock_shared() ACQUIRE_SHARED() { lock_.lock_shared(); }
void unlock() RELEASE() { lock_.unlock(); }
DISALLOW_IMPLICIT_CONSTRUCTORS(SharedLock);
private:
RwLock& lock_;
};