From 48d3115614a78ed3e52ab8118b9127cbabfd141c Mon Sep 17 00:00:00 2001 From: Mikhail Naganov Date: Sat, 30 Jul 2022 00:10:52 +0000 Subject: [PATCH] audio: Allow stopping a StreamWorker from the looping thread Enhance the return type of the 'workerCycle' to allow it exiting without inducing an error on the controller side. Also, put StreamWorker into a namespace. Bug: 205884982 Test: atest libaudioaidlcommon_test --iterations Merged-In: I3b27028b10f80f27985040cae8f8b0e6ab63ddad Change-Id: I3b27028b10f80f27985040cae8f8b0e6ab63ddad (cherry picked from commit 5021df71c713a01e619088f741ac88334fb0c25b) --- audio/aidl/common/include/StreamWorker.h | 43 ++++++++---- .../aidl/common/tests/streamworker_tests.cpp | 69 ++++++++++++++----- 2 files changed, 78 insertions(+), 34 deletions(-) diff --git a/audio/aidl/common/include/StreamWorker.h b/audio/aidl/common/include/StreamWorker.h index 776490493d..03685fcf29 100644 --- a/audio/aidl/common/include/StreamWorker.h +++ b/audio/aidl/common/include/StreamWorker.h @@ -29,11 +29,15 @@ #include #include +namespace android::hardware::audio::common { + template class StreamWorker { enum class WorkerState { STOPPED, RUNNING, PAUSE_REQUESTED, PAUSED, RESUME_REQUESTED }; public: + enum class WorkerStatus { ABORT, CONTINUE, EXIT }; + StreamWorker() = default; ~StreamWorker() { stop(); } // Note that 'priority' here is what is known as the 'nice number' in *nix systems. @@ -66,8 +70,7 @@ class StreamWorker { void stop() { { std::lock_guard lock(mWorkerLock); - if (mError.empty()) { - if (mWorkerState == WorkerState::STOPPED) return; + if (mWorkerState != WorkerState::STOPPED) { mWorkerState = WorkerState::STOPPED; mWorkerStateChangeRequest = true; } @@ -91,18 +94,22 @@ class StreamWorker { // Methods that need to be provided by subclasses: // - // Called once at the beginning of the thread loop. Must return - // an empty string to enter the thread loop, otherwise the thread loop - // exits and the worker switches into the 'error' state, setting - // the error to the returned value. + // /* Called once at the beginning of the thread loop. Must return + // * an empty string to enter the thread loop, otherwise the thread loop + // * exits and the worker switches into the 'error' state, setting + // * the error to the returned value. + // */ // std::string workerInit(); // - // Called for each thread loop unless the thread is in 'paused' state. - // Must return 'true' to continue running, otherwise the thread loop - // exits and the worker switches into the 'error' state with a generic - // error message. It is recommended that the subclass reports any - // problems via logging facilities. - // bool workerCycle(); + // /* Called for each thread loop unless the thread is in 'paused' state. + // * Must return 'CONTINUE' to continue running, otherwise the thread loop + // * exits. If the result from worker cycle is 'ABORT' then the worker switches + // * into the 'error' state with a generic error message. It is recommended that + // * the subclass reports any problems via logging facilities. Returning the 'EXIT' + // * status is equivalent to calling 'stop()' method. This is just a way of + // * of stopping the worker by its own initiative. + // */ + // WorkerStatus workerCycle(); private: void switchWorkerStateSync(WorkerState oldState, WorkerState newState, @@ -146,8 +153,10 @@ class StreamWorker { for (WorkerState state = WorkerState::RUNNING; state != WorkerState::STOPPED;) { bool needToNotify = false; - if (state != WorkerState::PAUSED ? static_cast(this)->workerCycle() - : (sched_yield(), true)) { + if (WorkerStatus status = state != WorkerState::PAUSED + ? static_cast(this)->workerCycle() + : (sched_yield(), WorkerStatus::CONTINUE); + status == WorkerStatus::CONTINUE) { { // See https://developer.android.com/training/articles/smp#nonracing android::base::ScopedLockAssertion lock_assertion(mWorkerLock); @@ -188,7 +197,9 @@ class StreamWorker { needToNotify = true; } state = mWorkerState = WorkerState::STOPPED; - mError = "workerCycle failed"; + if (status == WorkerStatus::ABORT) { + mError = "workerCycle aborted"; + } } if (needToNotify) { { @@ -218,3 +229,5 @@ class StreamWorker { static_assert(std::atomic::is_always_lock_free); std::atomic mWorkerStateChangeRequest GUARDED_BY(mWorkerLock) = false; }; + +} // namespace android::hardware::audio::common diff --git a/audio/aidl/common/tests/streamworker_tests.cpp b/audio/aidl/common/tests/streamworker_tests.cpp index 9fb1a8ee3f..df81c69716 100644 --- a/audio/aidl/common/tests/streamworker_tests.cpp +++ b/audio/aidl/common/tests/streamworker_tests.cpp @@ -26,14 +26,18 @@ #define LOG_TAG "StreamWorker_Test" #include -struct TestStream { - std::atomic error = false; -}; +using android::hardware::audio::common::StreamWorker; class TestWorker : public StreamWorker { public: + struct Stream { + void setErrorStatus() { status = WorkerStatus::ABORT; } + void setStopStatus() { status = WorkerStatus::EXIT; } + std::atomic status = WorkerStatus::CONTINUE; + }; + // Use nullptr to test error reporting from the worker thread. - explicit TestWorker(TestStream* stream) : mStream(stream) {} + explicit TestWorker(Stream* stream) : mStream(stream) {} size_t getWorkerCycles() const { return mWorkerCycles; } int getPriority() const { return mPriority; } @@ -45,16 +49,16 @@ class TestWorker : public StreamWorker { } std::string workerInit() { return mStream != nullptr ? "" : "Expected error"; } - bool workerCycle() { + WorkerStatus workerCycle() { mPriority = getpriority(PRIO_PROCESS, 0); do { mWorkerCycles++; } while (mWorkerCycles == 0); - return !mStream->error; + return mStream->status; } private: - TestStream* const mStream; + Stream* const mStream; std::atomic mWorkerCycles = 0; std::atomic mPriority = ANDROID_PRIORITY_DEFAULT; }; @@ -70,7 +74,8 @@ class StreamWorkerInvalidTest : public testing::TestWithParam { } protected: - StreamWorkerInvalidTest(TestStream* stream) : testing::TestWithParam(), worker(stream) {} + StreamWorkerInvalidTest(TestWorker::Stream* stream) + : testing::TestWithParam(), worker(stream) {} TestWorker worker; }; @@ -118,7 +123,7 @@ class StreamWorkerTest : public StreamWorkerInvalidTest { StreamWorkerTest() : StreamWorkerInvalidTest(&stream) {} protected: - TestStream stream; + TestWorker::Stream stream; }; static constexpr unsigned kWorkerIdleCheckTime = 50 * 1000; @@ -130,21 +135,47 @@ TEST_P(StreamWorkerTest, Uninitialized) { TEST_P(StreamWorkerTest, Start) { ASSERT_TRUE(worker.start()); + EXPECT_TRUE(worker.waitForAtLeastOneCycle()); + EXPECT_FALSE(worker.hasError()); +} + +TEST_P(StreamWorkerTest, StartStop) { + ASSERT_TRUE(worker.start()); + EXPECT_TRUE(worker.waitForAtLeastOneCycle()); + EXPECT_FALSE(worker.hasError()); + worker.stop(); + EXPECT_FALSE(worker.hasError()); +} + +TEST_P(StreamWorkerTest, WorkerExit) { + ASSERT_TRUE(worker.start()); + stream.setStopStatus(); worker.waitForAtLeastOneCycle(); EXPECT_FALSE(worker.hasError()); + EXPECT_TRUE(worker.hasNoWorkerCycleCalled(kWorkerIdleCheckTime)); } TEST_P(StreamWorkerTest, WorkerError) { ASSERT_TRUE(worker.start()); - stream.error = true; + stream.setErrorStatus(); worker.waitForAtLeastOneCycle(); EXPECT_TRUE(worker.hasError()); EXPECT_TRUE(worker.hasNoWorkerCycleCalled(kWorkerIdleCheckTime)); } +TEST_P(StreamWorkerTest, StopAfterError) { + ASSERT_TRUE(worker.start()); + stream.setErrorStatus(); + worker.waitForAtLeastOneCycle(); + EXPECT_TRUE(worker.hasError()); + EXPECT_TRUE(worker.hasNoWorkerCycleCalled(kWorkerIdleCheckTime)); + worker.stop(); + EXPECT_TRUE(worker.hasError()); +} + TEST_P(StreamWorkerTest, PauseResume) { ASSERT_TRUE(worker.start()); - worker.waitForAtLeastOneCycle(); + EXPECT_TRUE(worker.waitForAtLeastOneCycle()); EXPECT_FALSE(worker.hasError()); worker.pause(); EXPECT_TRUE(worker.hasNoWorkerCycleCalled(kWorkerIdleCheckTime)); @@ -158,7 +189,7 @@ TEST_P(StreamWorkerTest, PauseResume) { TEST_P(StreamWorkerTest, StopPaused) { ASSERT_TRUE(worker.start()); - worker.waitForAtLeastOneCycle(); + EXPECT_TRUE(worker.waitForAtLeastOneCycle()); EXPECT_FALSE(worker.hasError()); worker.pause(); worker.stop(); @@ -167,7 +198,7 @@ TEST_P(StreamWorkerTest, StopPaused) { TEST_P(StreamWorkerTest, PauseAfterErrorIgnored) { ASSERT_TRUE(worker.start()); - stream.error = true; + stream.setErrorStatus(); worker.waitForAtLeastOneCycle(); EXPECT_TRUE(worker.hasError()); worker.pause(); @@ -177,7 +208,7 @@ TEST_P(StreamWorkerTest, PauseAfterErrorIgnored) { TEST_P(StreamWorkerTest, ResumeAfterErrorIgnored) { ASSERT_TRUE(worker.start()); - stream.error = true; + stream.setErrorStatus(); worker.waitForAtLeastOneCycle(); EXPECT_TRUE(worker.hasError()); worker.resume(); @@ -187,11 +218,11 @@ TEST_P(StreamWorkerTest, ResumeAfterErrorIgnored) { TEST_P(StreamWorkerTest, WorkerErrorOnResume) { ASSERT_TRUE(worker.start()); - worker.waitForAtLeastOneCycle(); + EXPECT_TRUE(worker.waitForAtLeastOneCycle()); EXPECT_FALSE(worker.hasError()); worker.pause(); EXPECT_FALSE(worker.hasError()); - stream.error = true; + stream.setErrorStatus(); EXPECT_FALSE(worker.hasError()); worker.resume(); worker.waitForAtLeastOneCycle(); @@ -208,7 +239,7 @@ TEST_P(StreamWorkerTest, WaitForAtLeastOneCycle) { TEST_P(StreamWorkerTest, WaitForAtLeastOneCycleError) { ASSERT_TRUE(worker.start()); - stream.error = true; + stream.setErrorStatus(); EXPECT_FALSE(worker.waitForAtLeastOneCycle()); } @@ -220,7 +251,7 @@ TEST_P(StreamWorkerTest, MutexDoesNotBlockWorker) { usleep(kWorkerIdleCheckTime); } worker.testLockUnlockMutex(false); - worker.waitForAtLeastOneCycle(); + EXPECT_TRUE(worker.waitForAtLeastOneCycle()); EXPECT_FALSE(worker.hasError()); } @@ -235,7 +266,7 @@ TEST_P(StreamWorkerTest, ThreadName) { TEST_P(StreamWorkerTest, ThreadPriority) { const int priority = ANDROID_PRIORITY_LOWEST; ASSERT_TRUE(worker.start("", priority)) << worker.getError(); - worker.waitForAtLeastOneCycle(); + EXPECT_TRUE(worker.waitForAtLeastOneCycle()); EXPECT_EQ(priority, worker.getPriority()); }