logd: rework logic for LogTimeEntry

LogTimeEntry's lifecycle is spread out in various locations.  It
further seems incomplete as there is logic that assumes that its
associated thread can exit while the underlying LogTimeEntry remains
valid, however it doesn't appear that that is actually a supported
situation.

This change simplifies this logic to have only one valid state for a
LogTimeEntry: it must have its thread running and be present in
LastLogTimes.  A LogTimeEntry will never be placed into LastLogTimes
unless its thread is running and its thread will remove its associated
LogTimeEntry from LastLogTimes before it has exited.

This admittedly breaks situations where a blocking socket gets issued
multiple commands with different pid filters, tail lines, etc,
however, I'm reasonably sure that these situations were already
broken.  A check is added to close the socket in this case.

Test: multiple logcat instances work, logd.reader.per's are cleaned up
Change-Id: Ibe8651e7d530c5e9a8d6ce3150cd247982887cbe
This commit is contained in:
Tom Cherry 2018-10-08 17:33:50 -07:00
parent b3bc842750
commit 4f22786cc9
6 changed files with 71 additions and 192 deletions

View file

@ -42,7 +42,7 @@ void FlushCommand::runSocketCommand(SocketClient* client) {
LogTimeEntry::wrlock();
LastLogTimes::iterator it = times.begin();
while (it != times.end()) {
entry = (*it);
entry = it->get();
if (entry->mClient == client) {
if (!entry->isWatchingMultiple(mLogMask)) {
LogTimeEntry::unlock();
@ -63,31 +63,12 @@ void FlushCommand::runSocketCommand(SocketClient* client) {
}
}
entry->triggerReader_Locked();
if (entry->runningReader_Locked()) {
LogTimeEntry::unlock();
return;
}
entry->incRef_Locked();
break;
LogTimeEntry::unlock();
return;
}
it++;
}
if (it == times.end()) {
// Create LogTimeEntry in notifyNewLog() ?
if (mTail == (unsigned long)-1) {
LogTimeEntry::unlock();
return;
}
entry = new LogTimeEntry(mReader, client, mNonBlock, mTail, mLogMask,
mPid, mStart, mTimeout);
times.push_front(entry);
}
client->incRef();
// release client and entry reference counts once done
entry->startReader_Locked();
LogTimeEntry::unlock();
}

View file

@ -27,36 +27,11 @@ class LogReader;
class FlushCommand : public SocketClientCommand {
LogReader& mReader;
bool mNonBlock;
unsigned long mTail;
log_mask_t mLogMask;
pid_t mPid;
log_time mStart;
uint64_t mTimeout;
public:
// for opening a reader
explicit FlushCommand(LogReader& reader, bool nonBlock, unsigned long tail,
log_mask_t logMask, pid_t pid, log_time start,
uint64_t timeout)
: mReader(reader),
mNonBlock(nonBlock),
mTail(tail),
mLogMask(logMask),
mPid(pid),
mStart(start),
mTimeout((start != log_time::EPOCH) ? timeout : 0) {
}
// for notification of an update
explicit FlushCommand(LogReader& reader, log_mask_t logMask)
: mReader(reader),
mNonBlock(false),
mTail(-1),
mLogMask(logMask),
mPid(0),
mStart(log_time::EPOCH),
mTimeout(0) {
: mReader(reader), mLogMask(logMask) {
}
virtual void runSocketCommand(SocketClient* client);

View file

@ -105,10 +105,8 @@ void LogBuffer::init() {
LastLogTimes::iterator times = mTimes.begin();
while (times != mTimes.end()) {
LogTimeEntry* entry = (*times);
if (entry->owned_Locked()) {
entry->triggerReader_Locked();
}
LogTimeEntry* entry = times->get();
entry->triggerReader_Locked();
times++;
}
@ -409,17 +407,15 @@ void LogBuffer::log(LogBufferElement* elem) {
LastLogTimes::iterator times = mTimes.begin();
while (times != mTimes.end()) {
LogTimeEntry* entry = (*times);
if (entry->owned_Locked()) {
if (!entry->mNonBlock) {
end_always = true;
break;
}
// it passing mEnd is blocked by the following checks.
if (!end_set || (end <= entry->mEnd)) {
end = entry->mEnd;
end_set = true;
}
LogTimeEntry* entry = times->get();
if (!entry->mNonBlock) {
end_always = true;
break;
}
// it passing mEnd is blocked by the following checks.
if (!end_set || (end <= entry->mEnd)) {
end = entry->mEnd;
end_set = true;
}
times++;
}
@ -710,8 +706,8 @@ bool LogBuffer::prune(log_id_t id, unsigned long pruneRows, uid_t caller_uid) {
// Region locked?
LastLogTimes::iterator times = mTimes.begin();
while (times != mTimes.end()) {
LogTimeEntry* entry = (*times);
if (entry->owned_Locked() && entry->isWatching(id) &&
LogTimeEntry* entry = times->get();
if (entry->isWatching(id) &&
(!oldest || (oldest->mStart > entry->mStart) ||
((oldest->mStart == entry->mStart) &&
(entry->mTimeout.tv_sec || entry->mTimeout.tv_nsec)))) {
@ -1052,9 +1048,9 @@ bool LogBuffer::clear(log_id_t id, uid_t uid) {
LogTimeEntry::wrlock();
LastLogTimes::iterator times = mTimes.begin();
while (times != mTimes.end()) {
LogTimeEntry* entry = (*times);
LogTimeEntry* entry = times->get();
// Killer punch
if (entry->owned_Locked() && entry->isWatching(id)) {
if (entry->isWatching(id)) {
entry->release_Locked();
}
times++;

View file

@ -41,6 +41,7 @@ void LogReader::notifyNewLog(log_mask_t logMask) {
runOnEachSocket(&command);
}
// Note returning false will release the SocketClient instance.
bool LogReader::onDataAvailable(SocketClient* cli) {
static bool name_set;
if (!name_set) {
@ -57,6 +58,18 @@ bool LogReader::onDataAvailable(SocketClient* cli) {
}
buffer[len] = '\0';
// Clients are only allowed to send one command, disconnect them if they
// send another.
LogTimeEntry::wrlock();
for (const auto& entry : mLogbuf.mTimes) {
if (entry->mClient == cli) {
entry->release_Locked();
LogTimeEntry::unlock();
return false;
}
}
LogTimeEntry::unlock();
unsigned long tail = 0;
static const char _tail[] = " tail=";
char* cp = strstr(buffer, _tail);
@ -199,14 +212,25 @@ bool LogReader::onDataAvailable(SocketClient* cli) {
cli->getUid(), cli->getGid(), cli->getPid(), nonBlock ? 'n' : 'b', tail,
logMask, (int)pid, sequence.nsec(), timeout);
FlushCommand command(*this, nonBlock, tail, logMask, pid, sequence, timeout);
LogTimeEntry::wrlock();
auto entry = std::make_unique<LogTimeEntry>(
*this, cli, nonBlock, tail, logMask, pid, sequence, timeout);
if (!entry->startReader_Locked()) {
LogTimeEntry::unlock();
return false;
}
// release client and entry reference counts once done
cli->incRef();
mLogbuf.mTimes.emplace_front(std::move(entry));
// Set acceptable upper limit to wait for slow reader processing b/27242723
struct timeval t = { LOGD_SNDTIMEO, 0 };
setsockopt(cli->getSocket(), SOL_SOCKET, SO_SNDTIMEO, (const char*)&t,
sizeof(t));
command.runSocketCommand(cli);
LogTimeEntry::unlock();
return true;
}
@ -215,9 +239,8 @@ void LogReader::doSocketDelete(SocketClient* cli) {
LogTimeEntry::wrlock();
LastLogTimes::iterator it = times.begin();
while (it != times.end()) {
LogTimeEntry* entry = (*it);
LogTimeEntry* entry = it->get();
if (entry->mClient == cli) {
times.erase(it);
entry->release_Locked();
break;
}

View file

@ -30,11 +30,7 @@ pthread_mutex_t LogTimeEntry::timesLock = PTHREAD_MUTEX_INITIALIZER;
LogTimeEntry::LogTimeEntry(LogReader& reader, SocketClient* client,
bool nonBlock, unsigned long tail, log_mask_t logMask,
pid_t pid, log_time start, uint64_t timeout)
: mRefCount(1),
mRelease(false),
mError(false),
threadRunning(false),
leadingDropped(false),
: leadingDropped(false),
mReader(reader),
mLogMask(logMask),
mPid(pid),
@ -52,65 +48,21 @@ LogTimeEntry::LogTimeEntry(LogReader& reader, SocketClient* client,
cleanSkip_Locked();
}
void LogTimeEntry::startReader_Locked(void) {
bool LogTimeEntry::startReader_Locked() {
pthread_attr_t attr;
threadRunning = true;
if (!pthread_attr_init(&attr)) {
if (!pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED)) {
if (!pthread_create(&mThread, &attr, LogTimeEntry::threadStart,
this)) {
pthread_attr_destroy(&attr);
return;
return true;
}
}
pthread_attr_destroy(&attr);
}
threadRunning = false;
if (mClient) {
mClient->decRef();
}
decRef_Locked();
}
void LogTimeEntry::threadStop(void* obj) {
LogTimeEntry* me = reinterpret_cast<LogTimeEntry*>(obj);
wrlock();
if (me->mNonBlock) {
me->error_Locked();
}
SocketClient* client = me->mClient;
if (me->isError_Locked()) {
LogReader& reader = me->mReader;
LastLogTimes& times = reader.logbuf().mTimes;
LastLogTimes::iterator it = times.begin();
while (it != times.end()) {
if (*it == me) {
times.erase(it);
me->release_nodelete_Locked();
break;
}
it++;
}
me->mClient = nullptr;
reader.release(client);
}
if (client) {
client->decRef();
}
me->threadRunning = false;
me->decRef_Locked();
unlock();
return false;
}
void* LogTimeEntry::threadStart(void* obj) {
@ -118,13 +70,7 @@ void* LogTimeEntry::threadStart(void* obj) {
LogTimeEntry* me = reinterpret_cast<LogTimeEntry*>(obj);
pthread_cleanup_push(threadStop, obj);
SocketClient* client = me->mClient;
if (!client) {
me->error();
return nullptr;
}
LogBuffer& logbuf = me->mReader.logbuf();
@ -137,14 +83,14 @@ void* LogTimeEntry::threadStart(void* obj) {
log_time start = me->mStart;
while (me->threadRunning && !me->isError_Locked()) {
while (!me->mRelease) {
if (me->mTimeout.tv_sec || me->mTimeout.tv_nsec) {
if (pthread_cond_timedwait(&me->threadTriggeredCondition,
&timesLock, &me->mTimeout) == ETIMEDOUT) {
me->mTimeout.tv_sec = 0;
me->mTimeout.tv_nsec = 0;
}
if (!me->threadRunning || me->isError_Locked()) {
if (me->mRelease) {
break;
}
}
@ -162,13 +108,12 @@ void* LogTimeEntry::threadStart(void* obj) {
wrlock();
if (start == LogBufferElement::FLUSH_ERROR) {
me->error_Locked();
break;
}
me->mStart = start + log_time(0, 1);
if (me->mNonBlock || !me->threadRunning || me->isError_Locked()) {
if (me->mNonBlock || me->mRelease) {
break;
}
@ -179,9 +124,21 @@ void* LogTimeEntry::threadStart(void* obj) {
}
}
unlock();
LogReader& reader = me->mReader;
reader.release(client);
pthread_cleanup_pop(true);
client->decRef();
LastLogTimes& times = reader.logbuf().mTimes;
auto it =
std::find_if(times.begin(), times.end(),
[&me](const auto& other) { return other.get() == me; });
if (it != times.end()) {
times.erase(it);
}
unlock();
return nullptr;
}
@ -247,10 +204,6 @@ int LogTimeEntry::FilterSecondPass(const LogBufferElement* element, void* obj) {
goto skip;
}
if (me->isError_Locked()) {
goto stop;
}
if (!me->mTail) {
goto ok;
}

View file

@ -22,6 +22,7 @@
#include <time.h>
#include <list>
#include <memory>
#include <log/log.h>
#include <sysutils/SocketClient.h>
@ -33,16 +34,12 @@ class LogBufferElement;
class LogTimeEntry {
static pthread_mutex_t timesLock;
unsigned int mRefCount;
bool mRelease;
bool mError;
bool threadRunning;
bool mRelease = false;
bool leadingDropped;
pthread_cond_t threadTriggeredCondition;
pthread_t mThread;
LogReader& mReader;
static void* threadStart(void* me);
static void threadStop(void* me);
const log_mask_t mLogMask;
const pid_t mPid;
unsigned int skipAhead[LOG_ID_MAX];
@ -73,11 +70,8 @@ class LogTimeEntry {
pthread_mutex_unlock(&timesLock);
}
void startReader_Locked(void);
bool startReader_Locked();
bool runningReader_Locked(void) const {
return threadRunning || mRelease || mError || mNonBlock;
}
void triggerReader_Locked(void) {
pthread_cond_signal(&threadTriggeredCondition);
}
@ -87,54 +81,11 @@ class LogTimeEntry {
}
void cleanSkip_Locked(void);
// These called after LogTimeEntry removed from list, lock implicitly held
void release_nodelete_Locked(void) {
mRelease = true;
pthread_cond_signal(&threadTriggeredCondition);
// assumes caller code path will call decRef_Locked()
}
void release_Locked(void) {
mRelease = true;
pthread_cond_signal(&threadTriggeredCondition);
if (mRefCount || threadRunning) {
return;
}
// No one else is holding a reference to this
delete this;
}
// Called to mark socket in jeopardy
void error_Locked(void) {
mError = true;
}
void error(void) {
wrlock();
error_Locked();
unlock();
}
bool isError_Locked(void) const {
return mRelease || mError;
}
// Mark Used
// Locking implied, grabbed when protection around loop iteration
void incRef_Locked(void) {
++mRefCount;
}
bool owned_Locked(void) const {
return mRefCount != 0;
}
void decRef_Locked(void) {
if ((mRefCount && --mRefCount) || !mRelease || threadRunning) {
return;
}
// No one else is holding a reference to this
delete this;
}
bool isWatching(log_id_t id) const {
return mLogMask & (1 << id);
}
@ -146,6 +97,6 @@ class LogTimeEntry {
static int FilterSecondPass(const LogBufferElement* element, void* me);
};
typedef std::list<LogTimeEntry*> LastLogTimes;
typedef std::list<std::unique_ptr<LogTimeEntry>> LastLogTimes;
#endif // _LOGD_LOG_TIMES_H__