audio: Prevent priority inversions in aidl StreamWorker am: 0c174e9133 am: d6070e0abe am: e1c6613fc9 am: c69c80cdae

Original change: https://android-review.googlesource.com/c/platform/hardware/interfaces/+/2197181

Change-Id: I95f0ac43c1bd0242c062973ce12e6c6d3fe1e02e
Signed-off-by: Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
This commit is contained in:
Mikhail Naganov 2022-08-26 17:16:16 +00:00 committed by Automerger Merge Worker
commit 58186fa095
2 changed files with 40 additions and 1 deletions

View file

@ -18,6 +18,7 @@
#include <sched.h>
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>
@ -39,6 +40,7 @@ class StreamWorker {
android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
return mWorkerState != WorkerState::STOPPED;
});
mWorkerStateChangeRequest = false;
return mWorkerState == WorkerState::RUNNING;
}
void pause() { switchWorkerStateSync(WorkerState::RUNNING, WorkerState::PAUSE_REQUESTED); }
@ -52,6 +54,7 @@ class StreamWorker {
std::lock_guard<std::mutex> lock(mWorkerLock);
if (mWorkerState == WorkerState::STOPPED) return;
mWorkerState = WorkerState::STOPPED;
mWorkerStateChangeRequest = true;
}
if (mWorker.joinable()) {
mWorker.join();
@ -64,6 +67,10 @@ class StreamWorker {
switchWorkerStateSync(newState, WorkerState::RESUME_REQUESTED, &newState);
return newState == WorkerState::RUNNING;
}
// Only used by unit tests.
void testLockUnlockMutex(bool lock) NO_THREAD_SAFETY_ANALYSIS {
lock ? mWorkerLock.lock() : mWorkerLock.unlock();
}
// Methods that need to be provided by subclasses:
//
@ -87,6 +94,7 @@ class StreamWorker {
return;
}
mWorkerState = newState;
mWorkerStateChangeRequest = true;
mWorkerCv.wait(lock, [&]() {
android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
return mWorkerState != newState;
@ -106,6 +114,11 @@ class StreamWorker {
bool needToNotify = false;
if (state != WorkerState::PAUSED ? static_cast<Impl*>(this)->workerCycle()
: (sched_yield(), true)) {
{
// See https://developer.android.com/training/articles/smp#nonracing
android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
if (!mWorkerStateChangeRequest.load(std::memory_order_relaxed)) continue;
}
//
// Pause and resume are synchronous. One worker cycle must complete
// before the worker indicates a state change. This is how 'mWorkerState' and
@ -123,10 +136,10 @@ class StreamWorker {
// first workerCycle gets executed, the code below triggers a client notification
// (or if workerCycle fails, worker enters 'error' state and also notifies)
// state := mWorkerState (RUNNING)
std::lock_guard<std::mutex> lock(mWorkerLock);
if (state == WorkerState::RESUME_REQUESTED) {
needToNotify = true;
}
std::lock_guard<std::mutex> lock(mWorkerLock);
state = mWorkerState;
if (mWorkerState == WorkerState::PAUSE_REQUESTED) {
state = mWorkerState = WorkerState::PAUSED;
@ -144,6 +157,10 @@ class StreamWorker {
state = WorkerState::STOPPED;
}
if (needToNotify) {
{
std::lock_guard<std::mutex> lock(mWorkerLock);
mWorkerStateChangeRequest = false;
}
mWorkerCv.notify_one();
}
}
@ -153,4 +170,14 @@ class StreamWorker {
std::mutex mWorkerLock;
std::condition_variable mWorkerCv;
WorkerState mWorkerState GUARDED_BY(mWorkerLock) = WorkerState::STOPPED;
// The atomic lock-free variable is used to prevent priority inversions
// that can occur when a high priority worker tries to acquire the lock
// which has been taken by a lower priority control thread which in its turn
// got preempted. To prevent a PI under normal operating conditions, that is,
// when there are no errors or state changes, the worker does not attempt
// taking `mWorkerLock` unless `mWorkerStateChangeRequest` is set.
// To make sure that updates to `mWorkerState` and `mWorkerStateChangeRequest`
// are serialized, they are always made under a lock.
static_assert(std::atomic<bool>::is_always_lock_free);
std::atomic<bool> mWorkerStateChangeRequest GUARDED_BY(mWorkerLock) = false;
};

View file

@ -207,4 +207,16 @@ TEST_P(StreamWorkerTest, WaitForAtLeastOneCycleError) {
EXPECT_FALSE(worker.waitForAtLeastOneCycle());
}
TEST_P(StreamWorkerTest, MutexDoesNotBlockWorker) {
ASSERT_TRUE(worker.start());
const size_t workerCyclesBefore = worker.getWorkerCycles();
worker.testLockUnlockMutex(true);
while (worker.getWorkerCycles() == workerCyclesBefore) {
usleep(kWorkerIdleCheckTime);
}
worker.testLockUnlockMutex(false);
worker.waitForAtLeastOneCycle();
EXPECT_FALSE(worker.hasError());
}
INSTANTIATE_TEST_SUITE_P(StreamWorker, StreamWorkerTest, testing::Bool());