Merge "logd: rework logic for LogTimeEntry"

This commit is contained in:
Tom Cherry 2018-10-15 17:57:06 +00:00 committed by Gerrit Code Review
commit 6e52c23a3e
6 changed files with 71 additions and 192 deletions

View file

@ -42,7 +42,7 @@ void FlushCommand::runSocketCommand(SocketClient* client) {
LogTimeEntry::wrlock();
LastLogTimes::iterator it = times.begin();
while (it != times.end()) {
entry = (*it);
entry = it->get();
if (entry->mClient == client) {
if (!entry->isWatchingMultiple(mLogMask)) {
LogTimeEntry::unlock();
@ -63,31 +63,12 @@ void FlushCommand::runSocketCommand(SocketClient* client) {
}
}
entry->triggerReader_Locked();
if (entry->runningReader_Locked()) {
LogTimeEntry::unlock();
return;
}
entry->incRef_Locked();
break;
LogTimeEntry::unlock();
return;
}
it++;
}
if (it == times.end()) {
// Create LogTimeEntry in notifyNewLog() ?
if (mTail == (unsigned long)-1) {
LogTimeEntry::unlock();
return;
}
entry = new LogTimeEntry(mReader, client, mNonBlock, mTail, mLogMask,
mPid, mStart, mTimeout);
times.push_front(entry);
}
client->incRef();
// release client and entry reference counts once done
entry->startReader_Locked();
LogTimeEntry::unlock();
}

View file

@ -27,36 +27,11 @@ class LogReader;
class FlushCommand : public SocketClientCommand {
LogReader& mReader;
bool mNonBlock;
unsigned long mTail;
log_mask_t mLogMask;
pid_t mPid;
log_time mStart;
uint64_t mTimeout;
public:
// for opening a reader
explicit FlushCommand(LogReader& reader, bool nonBlock, unsigned long tail,
log_mask_t logMask, pid_t pid, log_time start,
uint64_t timeout)
: mReader(reader),
mNonBlock(nonBlock),
mTail(tail),
mLogMask(logMask),
mPid(pid),
mStart(start),
mTimeout((start != log_time::EPOCH) ? timeout : 0) {
}
// for notification of an update
explicit FlushCommand(LogReader& reader, log_mask_t logMask)
: mReader(reader),
mNonBlock(false),
mTail(-1),
mLogMask(logMask),
mPid(0),
mStart(log_time::EPOCH),
mTimeout(0) {
: mReader(reader), mLogMask(logMask) {
}
virtual void runSocketCommand(SocketClient* client);

View file

@ -105,10 +105,8 @@ void LogBuffer::init() {
LastLogTimes::iterator times = mTimes.begin();
while (times != mTimes.end()) {
LogTimeEntry* entry = (*times);
if (entry->owned_Locked()) {
entry->triggerReader_Locked();
}
LogTimeEntry* entry = times->get();
entry->triggerReader_Locked();
times++;
}
@ -409,17 +407,15 @@ void LogBuffer::log(LogBufferElement* elem) {
LastLogTimes::iterator times = mTimes.begin();
while (times != mTimes.end()) {
LogTimeEntry* entry = (*times);
if (entry->owned_Locked()) {
if (!entry->mNonBlock) {
end_always = true;
break;
}
// it passing mEnd is blocked by the following checks.
if (!end_set || (end <= entry->mEnd)) {
end = entry->mEnd;
end_set = true;
}
LogTimeEntry* entry = times->get();
if (!entry->mNonBlock) {
end_always = true;
break;
}
// it passing mEnd is blocked by the following checks.
if (!end_set || (end <= entry->mEnd)) {
end = entry->mEnd;
end_set = true;
}
times++;
}
@ -710,8 +706,8 @@ bool LogBuffer::prune(log_id_t id, unsigned long pruneRows, uid_t caller_uid) {
// Region locked?
LastLogTimes::iterator times = mTimes.begin();
while (times != mTimes.end()) {
LogTimeEntry* entry = (*times);
if (entry->owned_Locked() && entry->isWatching(id) &&
LogTimeEntry* entry = times->get();
if (entry->isWatching(id) &&
(!oldest || (oldest->mStart > entry->mStart) ||
((oldest->mStart == entry->mStart) &&
(entry->mTimeout.tv_sec || entry->mTimeout.tv_nsec)))) {
@ -1052,9 +1048,9 @@ bool LogBuffer::clear(log_id_t id, uid_t uid) {
LogTimeEntry::wrlock();
LastLogTimes::iterator times = mTimes.begin();
while (times != mTimes.end()) {
LogTimeEntry* entry = (*times);
LogTimeEntry* entry = times->get();
// Killer punch
if (entry->owned_Locked() && entry->isWatching(id)) {
if (entry->isWatching(id)) {
entry->release_Locked();
}
times++;

View file

@ -41,6 +41,7 @@ void LogReader::notifyNewLog(log_mask_t logMask) {
runOnEachSocket(&command);
}
// Note returning false will release the SocketClient instance.
bool LogReader::onDataAvailable(SocketClient* cli) {
static bool name_set;
if (!name_set) {
@ -57,6 +58,18 @@ bool LogReader::onDataAvailable(SocketClient* cli) {
}
buffer[len] = '\0';
// Clients are only allowed to send one command, disconnect them if they
// send another.
LogTimeEntry::wrlock();
for (const auto& entry : mLogbuf.mTimes) {
if (entry->mClient == cli) {
entry->release_Locked();
LogTimeEntry::unlock();
return false;
}
}
LogTimeEntry::unlock();
unsigned long tail = 0;
static const char _tail[] = " tail=";
char* cp = strstr(buffer, _tail);
@ -199,14 +212,25 @@ bool LogReader::onDataAvailable(SocketClient* cli) {
cli->getUid(), cli->getGid(), cli->getPid(), nonBlock ? 'n' : 'b', tail,
logMask, (int)pid, sequence.nsec(), timeout);
FlushCommand command(*this, nonBlock, tail, logMask, pid, sequence, timeout);
LogTimeEntry::wrlock();
auto entry = std::make_unique<LogTimeEntry>(
*this, cli, nonBlock, tail, logMask, pid, sequence, timeout);
if (!entry->startReader_Locked()) {
LogTimeEntry::unlock();
return false;
}
// release client and entry reference counts once done
cli->incRef();
mLogbuf.mTimes.emplace_front(std::move(entry));
// Set acceptable upper limit to wait for slow reader processing b/27242723
struct timeval t = { LOGD_SNDTIMEO, 0 };
setsockopt(cli->getSocket(), SOL_SOCKET, SO_SNDTIMEO, (const char*)&t,
sizeof(t));
command.runSocketCommand(cli);
LogTimeEntry::unlock();
return true;
}
@ -215,9 +239,8 @@ void LogReader::doSocketDelete(SocketClient* cli) {
LogTimeEntry::wrlock();
LastLogTimes::iterator it = times.begin();
while (it != times.end()) {
LogTimeEntry* entry = (*it);
LogTimeEntry* entry = it->get();
if (entry->mClient == cli) {
times.erase(it);
entry->release_Locked();
break;
}

View file

@ -30,11 +30,7 @@ pthread_mutex_t LogTimeEntry::timesLock = PTHREAD_MUTEX_INITIALIZER;
LogTimeEntry::LogTimeEntry(LogReader& reader, SocketClient* client,
bool nonBlock, unsigned long tail, log_mask_t logMask,
pid_t pid, log_time start, uint64_t timeout)
: mRefCount(1),
mRelease(false),
mError(false),
threadRunning(false),
leadingDropped(false),
: leadingDropped(false),
mReader(reader),
mLogMask(logMask),
mPid(pid),
@ -52,65 +48,21 @@ LogTimeEntry::LogTimeEntry(LogReader& reader, SocketClient* client,
cleanSkip_Locked();
}
void LogTimeEntry::startReader_Locked(void) {
bool LogTimeEntry::startReader_Locked() {
pthread_attr_t attr;
threadRunning = true;
if (!pthread_attr_init(&attr)) {
if (!pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED)) {
if (!pthread_create(&mThread, &attr, LogTimeEntry::threadStart,
this)) {
pthread_attr_destroy(&attr);
return;
return true;
}
}
pthread_attr_destroy(&attr);
}
threadRunning = false;
if (mClient) {
mClient->decRef();
}
decRef_Locked();
}
void LogTimeEntry::threadStop(void* obj) {
LogTimeEntry* me = reinterpret_cast<LogTimeEntry*>(obj);
wrlock();
if (me->mNonBlock) {
me->error_Locked();
}
SocketClient* client = me->mClient;
if (me->isError_Locked()) {
LogReader& reader = me->mReader;
LastLogTimes& times = reader.logbuf().mTimes;
LastLogTimes::iterator it = times.begin();
while (it != times.end()) {
if (*it == me) {
times.erase(it);
me->release_nodelete_Locked();
break;
}
it++;
}
me->mClient = nullptr;
reader.release(client);
}
if (client) {
client->decRef();
}
me->threadRunning = false;
me->decRef_Locked();
unlock();
return false;
}
void* LogTimeEntry::threadStart(void* obj) {
@ -118,13 +70,7 @@ void* LogTimeEntry::threadStart(void* obj) {
LogTimeEntry* me = reinterpret_cast<LogTimeEntry*>(obj);
pthread_cleanup_push(threadStop, obj);
SocketClient* client = me->mClient;
if (!client) {
me->error();
return nullptr;
}
LogBuffer& logbuf = me->mReader.logbuf();
@ -137,14 +83,14 @@ void* LogTimeEntry::threadStart(void* obj) {
log_time start = me->mStart;
while (me->threadRunning && !me->isError_Locked()) {
while (!me->mRelease) {
if (me->mTimeout.tv_sec || me->mTimeout.tv_nsec) {
if (pthread_cond_timedwait(&me->threadTriggeredCondition,
&timesLock, &me->mTimeout) == ETIMEDOUT) {
me->mTimeout.tv_sec = 0;
me->mTimeout.tv_nsec = 0;
}
if (!me->threadRunning || me->isError_Locked()) {
if (me->mRelease) {
break;
}
}
@ -162,13 +108,12 @@ void* LogTimeEntry::threadStart(void* obj) {
wrlock();
if (start == LogBufferElement::FLUSH_ERROR) {
me->error_Locked();
break;
}
me->mStart = start + log_time(0, 1);
if (me->mNonBlock || !me->threadRunning || me->isError_Locked()) {
if (me->mNonBlock || me->mRelease) {
break;
}
@ -179,9 +124,21 @@ void* LogTimeEntry::threadStart(void* obj) {
}
}
unlock();
LogReader& reader = me->mReader;
reader.release(client);
pthread_cleanup_pop(true);
client->decRef();
LastLogTimes& times = reader.logbuf().mTimes;
auto it =
std::find_if(times.begin(), times.end(),
[&me](const auto& other) { return other.get() == me; });
if (it != times.end()) {
times.erase(it);
}
unlock();
return nullptr;
}
@ -247,10 +204,6 @@ int LogTimeEntry::FilterSecondPass(const LogBufferElement* element, void* obj) {
goto skip;
}
if (me->isError_Locked()) {
goto stop;
}
if (!me->mTail) {
goto ok;
}

View file

@ -22,6 +22,7 @@
#include <time.h>
#include <list>
#include <memory>
#include <log/log.h>
#include <sysutils/SocketClient.h>
@ -33,16 +34,12 @@ class LogBufferElement;
class LogTimeEntry {
static pthread_mutex_t timesLock;
unsigned int mRefCount;
bool mRelease;
bool mError;
bool threadRunning;
bool mRelease = false;
bool leadingDropped;
pthread_cond_t threadTriggeredCondition;
pthread_t mThread;
LogReader& mReader;
static void* threadStart(void* me);
static void threadStop(void* me);
const log_mask_t mLogMask;
const pid_t mPid;
unsigned int skipAhead[LOG_ID_MAX];
@ -73,11 +70,8 @@ class LogTimeEntry {
pthread_mutex_unlock(&timesLock);
}
void startReader_Locked(void);
bool startReader_Locked();
bool runningReader_Locked(void) const {
return threadRunning || mRelease || mError || mNonBlock;
}
void triggerReader_Locked(void) {
pthread_cond_signal(&threadTriggeredCondition);
}
@ -87,54 +81,11 @@ class LogTimeEntry {
}
void cleanSkip_Locked(void);
// These called after LogTimeEntry removed from list, lock implicitly held
void release_nodelete_Locked(void) {
mRelease = true;
pthread_cond_signal(&threadTriggeredCondition);
// assumes caller code path will call decRef_Locked()
}
void release_Locked(void) {
mRelease = true;
pthread_cond_signal(&threadTriggeredCondition);
if (mRefCount || threadRunning) {
return;
}
// No one else is holding a reference to this
delete this;
}
// Called to mark socket in jeopardy
void error_Locked(void) {
mError = true;
}
void error(void) {
wrlock();
error_Locked();
unlock();
}
bool isError_Locked(void) const {
return mRelease || mError;
}
// Mark Used
// Locking implied, grabbed when protection around loop iteration
void incRef_Locked(void) {
++mRefCount;
}
bool owned_Locked(void) const {
return mRefCount != 0;
}
void decRef_Locked(void) {
if ((mRefCount && --mRefCount) || !mRelease || threadRunning) {
return;
}
// No one else is holding a reference to this
delete this;
}
bool isWatching(log_id_t id) const {
return mLogMask & (1 << id);
}
@ -146,6 +97,6 @@ class LogTimeEntry {
static int FilterSecondPass(const LogBufferElement* element, void* me);
};
typedef std::list<LogTimeEntry*> LastLogTimes;
typedef std::list<std::unique_ptr<LogTimeEntry>> LastLogTimes;
#endif // _LOGD_LOG_TIMES_H__