From 0b9c5feed1ebc5151acbb8146a4988584e34463f Mon Sep 17 00:00:00 2001 From: Mikhail Naganov Date: Mon, 8 Aug 2022 18:28:36 +0000 Subject: [PATCH] audio: Fix the lifetime of the StreamWorker's logic part Fix the mistake of making StreamWorker to inherit from the part which provides actual thread logic (Impl). The lifetime of the logic object must be longer than the lifetime of the StreamWorker's thread. Otherwise, the thread could still have running while the logic has already been destroyed (consider the order of destructors in C++ class inheritance). With this fix, the StreamWorker class does not have to be a template anymore, thus reorganize the code to move big methods into a .cpp file. Bug: 205884982 Test: atest libaudioaidlcommon_test --iterations Merged-In: I5bc2c8fd9d78a0fbc9fddab67456cc5214584045 Change-Id: I5bc2c8fd9d78a0fbc9fddab67456cc5214584045 (cherry picked from commmit 84024eccee71607c13bf13d7de80947efca4de50) Change-Id: I70958f437657b574cda6480c3216a0b1ea252433 --- audio/aidl/common/Android.bp | 7 +- audio/aidl/common/StreamWorker.cpp | 160 ++++++++++++ audio/aidl/common/include/StreamWorker.h | 246 ++++++------------ .../aidl/common/tests/streamworker_tests.cpp | 19 +- audio/aidl/default/Android.bp | 1 + audio/aidl/vts/Android.bp | 1 + 6 files changed, 262 insertions(+), 172 deletions(-) create mode 100644 audio/aidl/common/StreamWorker.cpp diff --git a/audio/aidl/common/Android.bp b/audio/aidl/common/Android.bp index 37da9d66ec..f2d8fc2a53 100644 --- a/audio/aidl/common/Android.bp +++ b/audio/aidl/common/Android.bp @@ -23,7 +23,7 @@ package { default_applicable_licenses: ["hardware_interfaces_license"], } -cc_library_headers { +cc_library { name: "libaudioaidlcommon", host_supported: true, vendor_available: true, @@ -36,13 +36,16 @@ cc_library_headers { "libbase_headers", "libsystem_headers", ], + srcs: [ + "StreamWorker.cpp", + ], } cc_test { name: "libaudioaidlcommon_test", host_supported: true, vendor_available: true, - header_libs: [ + static_libs: [ "libaudioaidlcommon", ], shared_libs: [ diff --git a/audio/aidl/common/StreamWorker.cpp b/audio/aidl/common/StreamWorker.cpp new file mode 100644 index 0000000000..9bca7609fd --- /dev/null +++ b/audio/aidl/common/StreamWorker.cpp @@ -0,0 +1,160 @@ +/* + * Copyright (C) 2022 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include "include/StreamWorker.h" + +namespace android::hardware::audio::common::internal { + +bool ThreadController::start(const std::string& name, int priority) { + mThreadName = name; + mThreadPriority = priority; + mWorker = std::thread(&ThreadController::workerThread, this); + std::unique_lock lock(mWorkerLock); + android::base::ScopedLockAssertion lock_assertion(mWorkerLock); + mWorkerCv.wait(lock, [&]() { + android::base::ScopedLockAssertion lock_assertion(mWorkerLock); + return mWorkerState == WorkerState::RUNNING || !mError.empty(); + }); + mWorkerStateChangeRequest = false; + return mWorkerState == WorkerState::RUNNING; +} + +void ThreadController::stop() { + { + std::lock_guard lock(mWorkerLock); + if (mWorkerState != WorkerState::STOPPED) { + mWorkerState = WorkerState::STOPPED; + mWorkerStateChangeRequest = true; + } + } + if (mWorker.joinable()) { + mWorker.join(); + } +} + +bool ThreadController::waitForAtLeastOneCycle() { + WorkerState newState; + switchWorkerStateSync(WorkerState::RUNNING, WorkerState::PAUSE_REQUESTED, &newState); + if (newState != WorkerState::PAUSED) return false; + switchWorkerStateSync(newState, WorkerState::RESUME_REQUESTED, &newState); + return newState == WorkerState::RUNNING; +} + +void ThreadController::switchWorkerStateSync(WorkerState oldState, WorkerState newState, + WorkerState* finalState) { + std::unique_lock lock(mWorkerLock); + android::base::ScopedLockAssertion lock_assertion(mWorkerLock); + if (mWorkerState != oldState) { + if (finalState) *finalState = mWorkerState; + return; + } + mWorkerState = newState; + mWorkerStateChangeRequest = true; + mWorkerCv.wait(lock, [&]() { + android::base::ScopedLockAssertion lock_assertion(mWorkerLock); + return mWorkerState != newState; + }); + if (finalState) *finalState = mWorkerState; +} + +void ThreadController::workerThread() { + using Status = StreamLogic::Status; + + std::string error = mLogic->init(); + if (error.empty() && !mThreadName.empty()) { + std::string compliantName(mThreadName.substr(0, 15)); + if (int errCode = pthread_setname_np(pthread_self(), compliantName.c_str()); errCode != 0) { + error.append("Failed to set thread name: ").append(strerror(errCode)); + } + } + if (error.empty() && mThreadPriority != ANDROID_PRIORITY_DEFAULT) { + if (int result = setpriority(PRIO_PROCESS, 0, mThreadPriority); result != 0) { + int errCode = errno; + error.append("Failed to set thread priority: ").append(strerror(errCode)); + } + } + { + std::lock_guard lock(mWorkerLock); + mWorkerState = error.empty() ? WorkerState::RUNNING : WorkerState::STOPPED; + mError = error; + } + mWorkerCv.notify_one(); + if (!error.empty()) return; + + for (WorkerState state = WorkerState::RUNNING; state != WorkerState::STOPPED;) { + bool needToNotify = false; + if (Status status = state != WorkerState::PAUSED ? mLogic->cycle() + : (sched_yield(), Status::CONTINUE); + status == Status::CONTINUE) { + { + // See https://developer.android.com/training/articles/smp#nonracing + android::base::ScopedLockAssertion lock_assertion(mWorkerLock); + if (!mWorkerStateChangeRequest.load(std::memory_order_relaxed)) continue; + } + // + // Pause and resume are synchronous. One worker cycle must complete + // before the worker indicates a state change. This is how 'mWorkerState' and + // 'state' interact: + // + // mWorkerState == RUNNING + // client sets mWorkerState := PAUSE_REQUESTED + // last workerCycle gets executed, state := mWorkerState := PAUSED by us + // (or the workers enters the 'error' state if workerCycle fails) + // client gets notified about state change in any case + // thread is doing a busy wait while 'state == PAUSED' + // client sets mWorkerState := RESUME_REQUESTED + // state := mWorkerState (RESUME_REQUESTED) + // mWorkerState := RUNNING, but we don't notify the client yet + // first workerCycle gets executed, the code below triggers a client notification + // (or if workerCycle fails, worker enters 'error' state and also notifies) + // state := mWorkerState (RUNNING) + std::lock_guard lock(mWorkerLock); + if (state == WorkerState::RESUME_REQUESTED) { + needToNotify = true; + } + state = mWorkerState; + if (mWorkerState == WorkerState::PAUSE_REQUESTED) { + state = mWorkerState = WorkerState::PAUSED; + needToNotify = true; + } else if (mWorkerState == WorkerState::RESUME_REQUESTED) { + mWorkerState = WorkerState::RUNNING; + } + } else { + std::lock_guard lock(mWorkerLock); + if (state == WorkerState::RESUME_REQUESTED || + mWorkerState == WorkerState::PAUSE_REQUESTED) { + needToNotify = true; + } + state = mWorkerState = WorkerState::STOPPED; + if (status == Status::ABORT) { + mError = "Received ABORT from the logic cycle"; + } + } + if (needToNotify) { + { + std::lock_guard lock(mWorkerLock); + mWorkerStateChangeRequest = false; + } + mWorkerCv.notify_one(); + } + } +} + +} // namespace android::hardware::audio::common::internal diff --git a/audio/aidl/common/include/StreamWorker.h b/audio/aidl/common/include/StreamWorker.h index 03685fcf29..6260eca49a 100644 --- a/audio/aidl/common/include/StreamWorker.h +++ b/audio/aidl/common/include/StreamWorker.h @@ -16,10 +16,6 @@ #pragma once -#include -#include -#include - #include #include #include @@ -31,32 +27,18 @@ namespace android::hardware::audio::common { -template -class StreamWorker { +class StreamLogic; + +namespace internal { + +class ThreadController { enum class WorkerState { STOPPED, RUNNING, PAUSE_REQUESTED, PAUSED, RESUME_REQUESTED }; public: - enum class WorkerStatus { ABORT, CONTINUE, EXIT }; + explicit ThreadController(StreamLogic* logic) : mLogic(logic) {} + ~ThreadController() { stop(); } - StreamWorker() = default; - ~StreamWorker() { stop(); } - // Note that 'priority' here is what is known as the 'nice number' in *nix systems. - // The nice number is used with the default scheduler. For threads that - // need to use a specialized scheduler (e.g. SCHED_FIFO) and set the priority within it, - // it is recommended to implement an appropriate configuration sequence within `workerInit`. - bool start(const std::string& name = "", int priority = ANDROID_PRIORITY_DEFAULT) { - mThreadName = name; - mThreadPriority = priority; - mWorker = std::thread(&StreamWorker::workerThread, this); - std::unique_lock lock(mWorkerLock); - android::base::ScopedLockAssertion lock_assertion(mWorkerLock); - mWorkerCv.wait(lock, [&]() { - android::base::ScopedLockAssertion lock_assertion(mWorkerLock); - return mWorkerState == WorkerState::RUNNING || !mError.empty(); - }); - mWorkerStateChangeRequest = false; - return mWorkerState == WorkerState::RUNNING; - } + bool start(const std::string& name, int priority); void pause() { switchWorkerStateSync(WorkerState::RUNNING, WorkerState::PAUSE_REQUESTED); } void resume() { switchWorkerStateSync(WorkerState::PAUSED, WorkerState::RESUME_REQUESTED); } bool hasError() { @@ -67,150 +49,21 @@ class StreamWorker { std::lock_guard lock(mWorkerLock); return mError; } - void stop() { - { - std::lock_guard lock(mWorkerLock); - if (mWorkerState != WorkerState::STOPPED) { - mWorkerState = WorkerState::STOPPED; - mWorkerStateChangeRequest = true; - } - } - if (mWorker.joinable()) { - mWorker.join(); - } - } - bool waitForAtLeastOneCycle() { - WorkerState newState; - switchWorkerStateSync(WorkerState::RUNNING, WorkerState::PAUSE_REQUESTED, &newState); - if (newState != WorkerState::PAUSED) return false; - switchWorkerStateSync(newState, WorkerState::RESUME_REQUESTED, &newState); - return newState == WorkerState::RUNNING; - } + void stop(); + bool waitForAtLeastOneCycle(); + // Only used by unit tests. - void testLockUnlockMutex(bool lock) NO_THREAD_SAFETY_ANALYSIS { + void lockUnlockMutex(bool lock) NO_THREAD_SAFETY_ANALYSIS { lock ? mWorkerLock.lock() : mWorkerLock.unlock(); } - std::thread::native_handle_type testGetThreadNativeHandle() { return mWorker.native_handle(); } - - // Methods that need to be provided by subclasses: - // - // /* Called once at the beginning of the thread loop. Must return - // * an empty string to enter the thread loop, otherwise the thread loop - // * exits and the worker switches into the 'error' state, setting - // * the error to the returned value. - // */ - // std::string workerInit(); - // - // /* Called for each thread loop unless the thread is in 'paused' state. - // * Must return 'CONTINUE' to continue running, otherwise the thread loop - // * exits. If the result from worker cycle is 'ABORT' then the worker switches - // * into the 'error' state with a generic error message. It is recommended that - // * the subclass reports any problems via logging facilities. Returning the 'EXIT' - // * status is equivalent to calling 'stop()' method. This is just a way of - // * of stopping the worker by its own initiative. - // */ - // WorkerStatus workerCycle(); + std::thread::native_handle_type getThreadNativeHandle() { return mWorker.native_handle(); } private: void switchWorkerStateSync(WorkerState oldState, WorkerState newState, - WorkerState* finalState = nullptr) { - std::unique_lock lock(mWorkerLock); - android::base::ScopedLockAssertion lock_assertion(mWorkerLock); - if (mWorkerState != oldState) { - if (finalState) *finalState = mWorkerState; - return; - } - mWorkerState = newState; - mWorkerStateChangeRequest = true; - mWorkerCv.wait(lock, [&]() { - android::base::ScopedLockAssertion lock_assertion(mWorkerLock); - return mWorkerState != newState; - }); - if (finalState) *finalState = mWorkerState; - } - void workerThread() { - std::string error = static_cast(this)->workerInit(); - if (error.empty() && !mThreadName.empty()) { - std::string compliantName(mThreadName.substr(0, 15)); - if (int errCode = pthread_setname_np(pthread_self(), compliantName.c_str()); - errCode != 0) { - error.append("Failed to set thread name: ").append(strerror(errCode)); - } - } - if (error.empty() && mThreadPriority != ANDROID_PRIORITY_DEFAULT) { - if (int result = setpriority(PRIO_PROCESS, 0, mThreadPriority); result != 0) { - int errCode = errno; - error.append("Failed to set thread priority: ").append(strerror(errCode)); - } - } - { - std::lock_guard lock(mWorkerLock); - mWorkerState = error.empty() ? WorkerState::RUNNING : WorkerState::STOPPED; - mError = error; - } - mWorkerCv.notify_one(); - if (!error.empty()) return; - - for (WorkerState state = WorkerState::RUNNING; state != WorkerState::STOPPED;) { - bool needToNotify = false; - if (WorkerStatus status = state != WorkerState::PAUSED - ? static_cast(this)->workerCycle() - : (sched_yield(), WorkerStatus::CONTINUE); - status == WorkerStatus::CONTINUE) { - { - // See https://developer.android.com/training/articles/smp#nonracing - android::base::ScopedLockAssertion lock_assertion(mWorkerLock); - if (!mWorkerStateChangeRequest.load(std::memory_order_relaxed)) continue; - } - // - // Pause and resume are synchronous. One worker cycle must complete - // before the worker indicates a state change. This is how 'mWorkerState' and - // 'state' interact: - // - // mWorkerState == RUNNING - // client sets mWorkerState := PAUSE_REQUESTED - // last workerCycle gets executed, state := mWorkerState := PAUSED by us - // (or the workers enters the 'error' state if workerCycle fails) - // client gets notified about state change in any case - // thread is doing a busy wait while 'state == PAUSED' - // client sets mWorkerState := RESUME_REQUESTED - // state := mWorkerState (RESUME_REQUESTED) - // mWorkerState := RUNNING, but we don't notify the client yet - // first workerCycle gets executed, the code below triggers a client notification - // (or if workerCycle fails, worker enters 'error' state and also notifies) - // state := mWorkerState (RUNNING) - std::lock_guard lock(mWorkerLock); - if (state == WorkerState::RESUME_REQUESTED) { - needToNotify = true; - } - state = mWorkerState; - if (mWorkerState == WorkerState::PAUSE_REQUESTED) { - state = mWorkerState = WorkerState::PAUSED; - needToNotify = true; - } else if (mWorkerState == WorkerState::RESUME_REQUESTED) { - mWorkerState = WorkerState::RUNNING; - } - } else { - std::lock_guard lock(mWorkerLock); - if (state == WorkerState::RESUME_REQUESTED || - mWorkerState == WorkerState::PAUSE_REQUESTED) { - needToNotify = true; - } - state = mWorkerState = WorkerState::STOPPED; - if (status == WorkerStatus::ABORT) { - mError = "workerCycle aborted"; - } - } - if (needToNotify) { - { - std::lock_guard lock(mWorkerLock); - mWorkerStateChangeRequest = false; - } - mWorkerCv.notify_one(); - } - } - } + WorkerState* finalState = nullptr); + void workerThread(); + StreamLogic* const mLogic; std::string mThreadName; int mThreadPriority = ANDROID_PRIORITY_DEFAULT; std::thread mWorker; @@ -230,4 +83,71 @@ class StreamWorker { std::atomic mWorkerStateChangeRequest GUARDED_BY(mWorkerLock) = false; }; +} // namespace internal + +class StreamLogic { + public: + friend class internal::ThreadController; + + virtual ~StreamLogic() = default; + + protected: + enum class Status { ABORT, CONTINUE, EXIT }; + + /* Called once at the beginning of the thread loop. Must return + * an empty string to enter the thread loop, otherwise the thread loop + * exits and the worker switches into the 'error' state, setting + * the error to the returned value. + */ + virtual std::string init() = 0; + + /* Called for each thread loop unless the thread is in 'paused' state. + * Must return 'CONTINUE' to continue running, otherwise the thread loop + * exits. If the result from worker cycle is 'ABORT' then the worker switches + * into the 'error' state with a generic error message. It is recommended that + * the subclass reports any problems via logging facilities. Returning the 'EXIT' + * status is equivalent to calling 'stop()' method. This is just a way of + * of stopping the worker by its own initiative. + */ + virtual Status cycle() = 0; +}; + +template +class StreamWorker : public LogicImpl { + public: + template + explicit StreamWorker(Args&&... args) : LogicImpl(std::forward(args)...), mThread(this) {} + + // Methods of LogicImpl are available via inheritance. + // Forwarded methods of ThreadController follow. + + // Note that 'priority' here is what is known as the 'nice number' in *nix systems. + // The nice number is used with the default scheduler. For threads that + // need to use a specialized scheduler (e.g. SCHED_FIFO) and set the priority within it, + // it is recommended to implement an appropriate configuration sequence within + // 'LogicImpl' or 'StreamLogic::init'. + bool start(const std::string& name = "", int priority = ANDROID_PRIORITY_DEFAULT) { + return mThread.start(name, priority); + } + void pause() { mThread.pause(); } + void resume() { mThread.resume(); } + bool hasError() { return mThread.hasError(); } + std::string getError() { return mThread.getError(); } + void stop() { return mThread.stop(); } + bool waitForAtLeastOneCycle() { return mThread.waitForAtLeastOneCycle(); } + + // Only used by unit tests. + void testLockUnlockMutex(bool lock) { mThread.lockUnlockMutex(lock); } + std::thread::native_handle_type testGetThreadNativeHandle() { + return mThread.getThreadNativeHandle(); + } + + private: + // The ThreadController gets destroyed before LogicImpl. + // After the controller has been destroyed, it is guaranteed that + // the thread was joined, thus the 'cycle' method of LogicImpl + // will not be called anymore, and it is safe to destroy LogicImpl. + internal::ThreadController mThread; +}; + } // namespace android::hardware::audio::common diff --git a/audio/aidl/common/tests/streamworker_tests.cpp b/audio/aidl/common/tests/streamworker_tests.cpp index df81c69716..e3e484d7e3 100644 --- a/audio/aidl/common/tests/streamworker_tests.cpp +++ b/audio/aidl/common/tests/streamworker_tests.cpp @@ -16,6 +16,7 @@ #include #include +#include #include #include @@ -26,18 +27,19 @@ #define LOG_TAG "StreamWorker_Test" #include +using android::hardware::audio::common::StreamLogic; using android::hardware::audio::common::StreamWorker; -class TestWorker : public StreamWorker { +class TestWorkerLogic : public StreamLogic { public: struct Stream { - void setErrorStatus() { status = WorkerStatus::ABORT; } - void setStopStatus() { status = WorkerStatus::EXIT; } - std::atomic status = WorkerStatus::CONTINUE; + void setErrorStatus() { status = Status::ABORT; } + void setStopStatus() { status = Status::EXIT; } + std::atomic status = Status::CONTINUE; }; // Use nullptr to test error reporting from the worker thread. - explicit TestWorker(Stream* stream) : mStream(stream) {} + explicit TestWorkerLogic(Stream* stream) : mStream(stream) {} size_t getWorkerCycles() const { return mWorkerCycles; } int getPriority() const { return mPriority; } @@ -48,8 +50,10 @@ class TestWorker : public StreamWorker { return mWorkerCycles == cyclesBefore; } - std::string workerInit() { return mStream != nullptr ? "" : "Expected error"; } - WorkerStatus workerCycle() { + protected: + // StreamLogic implementation + std::string init() override { return mStream != nullptr ? "" : "Expected error"; } + Status cycle() override { mPriority = getpriority(PRIO_PROCESS, 0); do { mWorkerCycles++; @@ -62,6 +66,7 @@ class TestWorker : public StreamWorker { std::atomic mWorkerCycles = 0; std::atomic mPriority = ANDROID_PRIORITY_DEFAULT; }; +using TestWorker = StreamWorker; // The parameter specifies whether an extra call to 'stop' is made at the end. class StreamWorkerInvalidTest : public testing::TestWithParam { diff --git a/audio/aidl/default/Android.bp b/audio/aidl/default/Android.bp index 027d92873f..07b10976f7 100644 --- a/audio/aidl/default/Android.bp +++ b/audio/aidl/default/Android.bp @@ -11,6 +11,7 @@ cc_library_static { name: "libaudioserviceexampleimpl", vendor: true, shared_libs: [ + "libaudioaidlcommon", "libbase", "libbinder_ndk", "libstagefright_foundation", diff --git a/audio/aidl/vts/Android.bp b/audio/aidl/vts/Android.bp index 75ff37f088..1d0ec7c3f6 100644 --- a/audio/aidl/vts/Android.bp +++ b/audio/aidl/vts/Android.bp @@ -26,6 +26,7 @@ cc_test { "android.hardware.common-V2-ndk", "android.hardware.common.fmq-V1-ndk", "android.media.audio.common.types-V1-ndk", + "libaudioaidlcommon", ], test_suites: [ "general-tests",