diff --git a/audio/aidl/common/Android.bp b/audio/aidl/common/Android.bp index 6a1c4a4677..37da9d66ec 100644 --- a/audio/aidl/common/Android.bp +++ b/audio/aidl/common/Android.bp @@ -30,9 +30,11 @@ cc_library_headers { export_include_dirs: ["include"], header_libs: [ "libbase_headers", + "libsystem_headers", ], export_header_lib_headers: [ "libbase_headers", + "libsystem_headers", ], } diff --git a/audio/aidl/common/include/StreamWorker.h b/audio/aidl/common/include/StreamWorker.h index 74e99df4cd..776490493d 100644 --- a/audio/aidl/common/include/StreamWorker.h +++ b/audio/aidl/common/include/StreamWorker.h @@ -16,29 +16,39 @@ #pragma once +#include #include +#include #include #include #include +#include #include #include +#include template class StreamWorker { - enum class WorkerState { STOPPED, RUNNING, PAUSE_REQUESTED, PAUSED, RESUME_REQUESTED, ERROR }; + enum class WorkerState { STOPPED, RUNNING, PAUSE_REQUESTED, PAUSED, RESUME_REQUESTED }; public: StreamWorker() = default; ~StreamWorker() { stop(); } - bool start() { + // Note that 'priority' here is what is known as the 'nice number' in *nix systems. + // The nice number is used with the default scheduler. For threads that + // need to use a specialized scheduler (e.g. SCHED_FIFO) and set the priority within it, + // it is recommended to implement an appropriate configuration sequence within `workerInit`. + bool start(const std::string& name = "", int priority = ANDROID_PRIORITY_DEFAULT) { + mThreadName = name; + mThreadPriority = priority; mWorker = std::thread(&StreamWorker::workerThread, this); std::unique_lock lock(mWorkerLock); android::base::ScopedLockAssertion lock_assertion(mWorkerLock); mWorkerCv.wait(lock, [&]() { android::base::ScopedLockAssertion lock_assertion(mWorkerLock); - return mWorkerState != WorkerState::STOPPED; + return mWorkerState == WorkerState::RUNNING || !mError.empty(); }); mWorkerStateChangeRequest = false; return mWorkerState == WorkerState::RUNNING; @@ -47,14 +57,20 @@ class StreamWorker { void resume() { switchWorkerStateSync(WorkerState::PAUSED, WorkerState::RESUME_REQUESTED); } bool hasError() { std::lock_guard lock(mWorkerLock); - return mWorkerState == WorkerState::ERROR; + return !mError.empty(); + } + std::string getError() { + std::lock_guard lock(mWorkerLock); + return mError; } void stop() { { std::lock_guard lock(mWorkerLock); - if (mWorkerState == WorkerState::STOPPED) return; - mWorkerState = WorkerState::STOPPED; - mWorkerStateChangeRequest = true; + if (mError.empty()) { + if (mWorkerState == WorkerState::STOPPED) return; + mWorkerState = WorkerState::STOPPED; + mWorkerStateChangeRequest = true; + } } if (mWorker.joinable()) { mWorker.join(); @@ -71,17 +87,21 @@ class StreamWorker { void testLockUnlockMutex(bool lock) NO_THREAD_SAFETY_ANALYSIS { lock ? mWorkerLock.lock() : mWorkerLock.unlock(); } + std::thread::native_handle_type testGetThreadNativeHandle() { return mWorker.native_handle(); } // Methods that need to be provided by subclasses: // // Called once at the beginning of the thread loop. Must return - // 'true' to enter the thread loop, otherwise the thread loop - // exits and the worker switches into the 'error' state. - // bool workerInit(); + // an empty string to enter the thread loop, otherwise the thread loop + // exits and the worker switches into the 'error' state, setting + // the error to the returned value. + // std::string workerInit(); // // Called for each thread loop unless the thread is in 'paused' state. // Must return 'true' to continue running, otherwise the thread loop - // exits and the worker switches into the 'error' state. + // exits and the worker switches into the 'error' state with a generic + // error message. It is recommended that the subclass reports any + // problems via logging facilities. // bool workerCycle(); private: @@ -102,13 +122,27 @@ class StreamWorker { if (finalState) *finalState = mWorkerState; } void workerThread() { - bool success = static_cast(this)->workerInit(); + std::string error = static_cast(this)->workerInit(); + if (error.empty() && !mThreadName.empty()) { + std::string compliantName(mThreadName.substr(0, 15)); + if (int errCode = pthread_setname_np(pthread_self(), compliantName.c_str()); + errCode != 0) { + error.append("Failed to set thread name: ").append(strerror(errCode)); + } + } + if (error.empty() && mThreadPriority != ANDROID_PRIORITY_DEFAULT) { + if (int result = setpriority(PRIO_PROCESS, 0, mThreadPriority); result != 0) { + int errCode = errno; + error.append("Failed to set thread priority: ").append(strerror(errCode)); + } + } { std::lock_guard lock(mWorkerLock); - mWorkerState = success ? WorkerState::RUNNING : WorkerState::ERROR; + mWorkerState = error.empty() ? WorkerState::RUNNING : WorkerState::STOPPED; + mError = error; } mWorkerCv.notify_one(); - if (!success) return; + if (!error.empty()) return; for (WorkerState state = WorkerState::RUNNING; state != WorkerState::STOPPED;) { bool needToNotify = false; @@ -153,8 +187,8 @@ class StreamWorker { mWorkerState == WorkerState::PAUSE_REQUESTED) { needToNotify = true; } - mWorkerState = WorkerState::ERROR; - state = WorkerState::STOPPED; + state = mWorkerState = WorkerState::STOPPED; + mError = "workerCycle failed"; } if (needToNotify) { { @@ -166,10 +200,13 @@ class StreamWorker { } } + std::string mThreadName; + int mThreadPriority = ANDROID_PRIORITY_DEFAULT; std::thread mWorker; std::mutex mWorkerLock; std::condition_variable mWorkerCv; WorkerState mWorkerState GUARDED_BY(mWorkerLock) = WorkerState::STOPPED; + std::string mError GUARDED_BY(mWorkerLock); // The atomic lock-free variable is used to prevent priority inversions // that can occur when a high priority worker tries to acquire the lock // which has been taken by a lower priority control thread which in its turn diff --git a/audio/aidl/common/tests/streamworker_tests.cpp b/audio/aidl/common/tests/streamworker_tests.cpp index c9d3dbd5ee..9fb1a8ee3f 100644 --- a/audio/aidl/common/tests/streamworker_tests.cpp +++ b/audio/aidl/common/tests/streamworker_tests.cpp @@ -14,8 +14,10 @@ * limitations under the License. */ +#include #include #include + #include #include @@ -34,6 +36,7 @@ class TestWorker : public StreamWorker { explicit TestWorker(TestStream* stream) : mStream(stream) {} size_t getWorkerCycles() const { return mWorkerCycles; } + int getPriority() const { return mPriority; } bool hasWorkerCycleCalled() const { return mWorkerCycles != 0; } bool hasNoWorkerCycleCalled(useconds_t usec) { const size_t cyclesBefore = mWorkerCycles; @@ -41,8 +44,9 @@ class TestWorker : public StreamWorker { return mWorkerCycles == cyclesBefore; } - bool workerInit() { return mStream; } + std::string workerInit() { return mStream != nullptr ? "" : "Expected error"; } bool workerCycle() { + mPriority = getpriority(PRIO_PROCESS, 0); do { mWorkerCycles++; } while (mWorkerCycles == 0); @@ -52,6 +56,7 @@ class TestWorker : public StreamWorker { private: TestStream* const mStream; std::atomic mWorkerCycles = 0; + std::atomic mPriority = ANDROID_PRIORITY_DEFAULT; }; // The parameter specifies whether an extra call to 'stop' is made at the end. @@ -219,4 +224,19 @@ TEST_P(StreamWorkerTest, MutexDoesNotBlockWorker) { EXPECT_FALSE(worker.hasError()); } +TEST_P(StreamWorkerTest, ThreadName) { + const std::string workerName = "TestWorker"; + ASSERT_TRUE(worker.start(workerName)) << worker.getError(); + char nameBuf[128]; + ASSERT_EQ(0, pthread_getname_np(worker.testGetThreadNativeHandle(), nameBuf, sizeof(nameBuf))); + EXPECT_EQ(workerName, nameBuf); +} + +TEST_P(StreamWorkerTest, ThreadPriority) { + const int priority = ANDROID_PRIORITY_LOWEST; + ASSERT_TRUE(worker.start("", priority)) << worker.getError(); + worker.waitForAtLeastOneCycle(); + EXPECT_EQ(priority, worker.getPriority()); +} + INSTANTIATE_TEST_SUITE_P(StreamWorker, StreamWorkerTest, testing::Bool());