audio: Fix the lifetime of the StreamWorker's logic part am: 0b9c5feed1
am: 4b279d6a32
am: 231ca12ce8
Original change: https://android-review.googlesource.com/c/platform/hardware/interfaces/+/2199345 Change-Id: I3d2d99dbe43fc36cffb25811b2f2d79201943bea Signed-off-by: Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
This commit is contained in:
commit
63989892e3
6 changed files with 262 additions and 172 deletions
|
@ -23,7 +23,7 @@ package {
|
||||||
default_applicable_licenses: ["hardware_interfaces_license"],
|
default_applicable_licenses: ["hardware_interfaces_license"],
|
||||||
}
|
}
|
||||||
|
|
||||||
cc_library_headers {
|
cc_library {
|
||||||
name: "libaudioaidlcommon",
|
name: "libaudioaidlcommon",
|
||||||
host_supported: true,
|
host_supported: true,
|
||||||
vendor_available: true,
|
vendor_available: true,
|
||||||
|
@ -36,13 +36,16 @@ cc_library_headers {
|
||||||
"libbase_headers",
|
"libbase_headers",
|
||||||
"libsystem_headers",
|
"libsystem_headers",
|
||||||
],
|
],
|
||||||
|
srcs: [
|
||||||
|
"StreamWorker.cpp",
|
||||||
|
],
|
||||||
}
|
}
|
||||||
|
|
||||||
cc_test {
|
cc_test {
|
||||||
name: "libaudioaidlcommon_test",
|
name: "libaudioaidlcommon_test",
|
||||||
host_supported: true,
|
host_supported: true,
|
||||||
vendor_available: true,
|
vendor_available: true,
|
||||||
header_libs: [
|
static_libs: [
|
||||||
"libaudioaidlcommon",
|
"libaudioaidlcommon",
|
||||||
],
|
],
|
||||||
shared_libs: [
|
shared_libs: [
|
||||||
|
|
160
audio/aidl/common/StreamWorker.cpp
Normal file
160
audio/aidl/common/StreamWorker.cpp
Normal file
|
@ -0,0 +1,160 @@
|
||||||
|
/*
|
||||||
|
* Copyright (C) 2022 The Android Open Source Project
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include <pthread.h>
|
||||||
|
#include <sched.h>
|
||||||
|
#include <sys/resource.h>
|
||||||
|
|
||||||
|
#include "include/StreamWorker.h"
|
||||||
|
|
||||||
|
namespace android::hardware::audio::common::internal {
|
||||||
|
|
||||||
|
bool ThreadController::start(const std::string& name, int priority) {
|
||||||
|
mThreadName = name;
|
||||||
|
mThreadPriority = priority;
|
||||||
|
mWorker = std::thread(&ThreadController::workerThread, this);
|
||||||
|
std::unique_lock<std::mutex> lock(mWorkerLock);
|
||||||
|
android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
|
||||||
|
mWorkerCv.wait(lock, [&]() {
|
||||||
|
android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
|
||||||
|
return mWorkerState == WorkerState::RUNNING || !mError.empty();
|
||||||
|
});
|
||||||
|
mWorkerStateChangeRequest = false;
|
||||||
|
return mWorkerState == WorkerState::RUNNING;
|
||||||
|
}
|
||||||
|
|
||||||
|
void ThreadController::stop() {
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(mWorkerLock);
|
||||||
|
if (mWorkerState != WorkerState::STOPPED) {
|
||||||
|
mWorkerState = WorkerState::STOPPED;
|
||||||
|
mWorkerStateChangeRequest = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (mWorker.joinable()) {
|
||||||
|
mWorker.join();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool ThreadController::waitForAtLeastOneCycle() {
|
||||||
|
WorkerState newState;
|
||||||
|
switchWorkerStateSync(WorkerState::RUNNING, WorkerState::PAUSE_REQUESTED, &newState);
|
||||||
|
if (newState != WorkerState::PAUSED) return false;
|
||||||
|
switchWorkerStateSync(newState, WorkerState::RESUME_REQUESTED, &newState);
|
||||||
|
return newState == WorkerState::RUNNING;
|
||||||
|
}
|
||||||
|
|
||||||
|
void ThreadController::switchWorkerStateSync(WorkerState oldState, WorkerState newState,
|
||||||
|
WorkerState* finalState) {
|
||||||
|
std::unique_lock<std::mutex> lock(mWorkerLock);
|
||||||
|
android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
|
||||||
|
if (mWorkerState != oldState) {
|
||||||
|
if (finalState) *finalState = mWorkerState;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
mWorkerState = newState;
|
||||||
|
mWorkerStateChangeRequest = true;
|
||||||
|
mWorkerCv.wait(lock, [&]() {
|
||||||
|
android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
|
||||||
|
return mWorkerState != newState;
|
||||||
|
});
|
||||||
|
if (finalState) *finalState = mWorkerState;
|
||||||
|
}
|
||||||
|
|
||||||
|
void ThreadController::workerThread() {
|
||||||
|
using Status = StreamLogic::Status;
|
||||||
|
|
||||||
|
std::string error = mLogic->init();
|
||||||
|
if (error.empty() && !mThreadName.empty()) {
|
||||||
|
std::string compliantName(mThreadName.substr(0, 15));
|
||||||
|
if (int errCode = pthread_setname_np(pthread_self(), compliantName.c_str()); errCode != 0) {
|
||||||
|
error.append("Failed to set thread name: ").append(strerror(errCode));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (error.empty() && mThreadPriority != ANDROID_PRIORITY_DEFAULT) {
|
||||||
|
if (int result = setpriority(PRIO_PROCESS, 0, mThreadPriority); result != 0) {
|
||||||
|
int errCode = errno;
|
||||||
|
error.append("Failed to set thread priority: ").append(strerror(errCode));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(mWorkerLock);
|
||||||
|
mWorkerState = error.empty() ? WorkerState::RUNNING : WorkerState::STOPPED;
|
||||||
|
mError = error;
|
||||||
|
}
|
||||||
|
mWorkerCv.notify_one();
|
||||||
|
if (!error.empty()) return;
|
||||||
|
|
||||||
|
for (WorkerState state = WorkerState::RUNNING; state != WorkerState::STOPPED;) {
|
||||||
|
bool needToNotify = false;
|
||||||
|
if (Status status = state != WorkerState::PAUSED ? mLogic->cycle()
|
||||||
|
: (sched_yield(), Status::CONTINUE);
|
||||||
|
status == Status::CONTINUE) {
|
||||||
|
{
|
||||||
|
// See https://developer.android.com/training/articles/smp#nonracing
|
||||||
|
android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
|
||||||
|
if (!mWorkerStateChangeRequest.load(std::memory_order_relaxed)) continue;
|
||||||
|
}
|
||||||
|
//
|
||||||
|
// Pause and resume are synchronous. One worker cycle must complete
|
||||||
|
// before the worker indicates a state change. This is how 'mWorkerState' and
|
||||||
|
// 'state' interact:
|
||||||
|
//
|
||||||
|
// mWorkerState == RUNNING
|
||||||
|
// client sets mWorkerState := PAUSE_REQUESTED
|
||||||
|
// last workerCycle gets executed, state := mWorkerState := PAUSED by us
|
||||||
|
// (or the workers enters the 'error' state if workerCycle fails)
|
||||||
|
// client gets notified about state change in any case
|
||||||
|
// thread is doing a busy wait while 'state == PAUSED'
|
||||||
|
// client sets mWorkerState := RESUME_REQUESTED
|
||||||
|
// state := mWorkerState (RESUME_REQUESTED)
|
||||||
|
// mWorkerState := RUNNING, but we don't notify the client yet
|
||||||
|
// first workerCycle gets executed, the code below triggers a client notification
|
||||||
|
// (or if workerCycle fails, worker enters 'error' state and also notifies)
|
||||||
|
// state := mWorkerState (RUNNING)
|
||||||
|
std::lock_guard<std::mutex> lock(mWorkerLock);
|
||||||
|
if (state == WorkerState::RESUME_REQUESTED) {
|
||||||
|
needToNotify = true;
|
||||||
|
}
|
||||||
|
state = mWorkerState;
|
||||||
|
if (mWorkerState == WorkerState::PAUSE_REQUESTED) {
|
||||||
|
state = mWorkerState = WorkerState::PAUSED;
|
||||||
|
needToNotify = true;
|
||||||
|
} else if (mWorkerState == WorkerState::RESUME_REQUESTED) {
|
||||||
|
mWorkerState = WorkerState::RUNNING;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
std::lock_guard<std::mutex> lock(mWorkerLock);
|
||||||
|
if (state == WorkerState::RESUME_REQUESTED ||
|
||||||
|
mWorkerState == WorkerState::PAUSE_REQUESTED) {
|
||||||
|
needToNotify = true;
|
||||||
|
}
|
||||||
|
state = mWorkerState = WorkerState::STOPPED;
|
||||||
|
if (status == Status::ABORT) {
|
||||||
|
mError = "Received ABORT from the logic cycle";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (needToNotify) {
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(mWorkerLock);
|
||||||
|
mWorkerStateChangeRequest = false;
|
||||||
|
}
|
||||||
|
mWorkerCv.notify_one();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace android::hardware::audio::common::internal
|
|
@ -16,10 +16,6 @@
|
||||||
|
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include <pthread.h>
|
|
||||||
#include <sched.h>
|
|
||||||
#include <sys/resource.h>
|
|
||||||
|
|
||||||
#include <atomic>
|
#include <atomic>
|
||||||
#include <condition_variable>
|
#include <condition_variable>
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
|
@ -31,32 +27,18 @@
|
||||||
|
|
||||||
namespace android::hardware::audio::common {
|
namespace android::hardware::audio::common {
|
||||||
|
|
||||||
template <typename Impl>
|
class StreamLogic;
|
||||||
class StreamWorker {
|
|
||||||
|
namespace internal {
|
||||||
|
|
||||||
|
class ThreadController {
|
||||||
enum class WorkerState { STOPPED, RUNNING, PAUSE_REQUESTED, PAUSED, RESUME_REQUESTED };
|
enum class WorkerState { STOPPED, RUNNING, PAUSE_REQUESTED, PAUSED, RESUME_REQUESTED };
|
||||||
|
|
||||||
public:
|
public:
|
||||||
enum class WorkerStatus { ABORT, CONTINUE, EXIT };
|
explicit ThreadController(StreamLogic* logic) : mLogic(logic) {}
|
||||||
|
~ThreadController() { stop(); }
|
||||||
|
|
||||||
StreamWorker() = default;
|
bool start(const std::string& name, int priority);
|
||||||
~StreamWorker() { stop(); }
|
|
||||||
// Note that 'priority' here is what is known as the 'nice number' in *nix systems.
|
|
||||||
// The nice number is used with the default scheduler. For threads that
|
|
||||||
// need to use a specialized scheduler (e.g. SCHED_FIFO) and set the priority within it,
|
|
||||||
// it is recommended to implement an appropriate configuration sequence within `workerInit`.
|
|
||||||
bool start(const std::string& name = "", int priority = ANDROID_PRIORITY_DEFAULT) {
|
|
||||||
mThreadName = name;
|
|
||||||
mThreadPriority = priority;
|
|
||||||
mWorker = std::thread(&StreamWorker::workerThread, this);
|
|
||||||
std::unique_lock<std::mutex> lock(mWorkerLock);
|
|
||||||
android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
|
|
||||||
mWorkerCv.wait(lock, [&]() {
|
|
||||||
android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
|
|
||||||
return mWorkerState == WorkerState::RUNNING || !mError.empty();
|
|
||||||
});
|
|
||||||
mWorkerStateChangeRequest = false;
|
|
||||||
return mWorkerState == WorkerState::RUNNING;
|
|
||||||
}
|
|
||||||
void pause() { switchWorkerStateSync(WorkerState::RUNNING, WorkerState::PAUSE_REQUESTED); }
|
void pause() { switchWorkerStateSync(WorkerState::RUNNING, WorkerState::PAUSE_REQUESTED); }
|
||||||
void resume() { switchWorkerStateSync(WorkerState::PAUSED, WorkerState::RESUME_REQUESTED); }
|
void resume() { switchWorkerStateSync(WorkerState::PAUSED, WorkerState::RESUME_REQUESTED); }
|
||||||
bool hasError() {
|
bool hasError() {
|
||||||
|
@ -67,150 +49,21 @@ class StreamWorker {
|
||||||
std::lock_guard<std::mutex> lock(mWorkerLock);
|
std::lock_guard<std::mutex> lock(mWorkerLock);
|
||||||
return mError;
|
return mError;
|
||||||
}
|
}
|
||||||
void stop() {
|
void stop();
|
||||||
{
|
bool waitForAtLeastOneCycle();
|
||||||
std::lock_guard<std::mutex> lock(mWorkerLock);
|
|
||||||
if (mWorkerState != WorkerState::STOPPED) {
|
|
||||||
mWorkerState = WorkerState::STOPPED;
|
|
||||||
mWorkerStateChangeRequest = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (mWorker.joinable()) {
|
|
||||||
mWorker.join();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
bool waitForAtLeastOneCycle() {
|
|
||||||
WorkerState newState;
|
|
||||||
switchWorkerStateSync(WorkerState::RUNNING, WorkerState::PAUSE_REQUESTED, &newState);
|
|
||||||
if (newState != WorkerState::PAUSED) return false;
|
|
||||||
switchWorkerStateSync(newState, WorkerState::RESUME_REQUESTED, &newState);
|
|
||||||
return newState == WorkerState::RUNNING;
|
|
||||||
}
|
|
||||||
// Only used by unit tests.
|
// Only used by unit tests.
|
||||||
void testLockUnlockMutex(bool lock) NO_THREAD_SAFETY_ANALYSIS {
|
void lockUnlockMutex(bool lock) NO_THREAD_SAFETY_ANALYSIS {
|
||||||
lock ? mWorkerLock.lock() : mWorkerLock.unlock();
|
lock ? mWorkerLock.lock() : mWorkerLock.unlock();
|
||||||
}
|
}
|
||||||
std::thread::native_handle_type testGetThreadNativeHandle() { return mWorker.native_handle(); }
|
std::thread::native_handle_type getThreadNativeHandle() { return mWorker.native_handle(); }
|
||||||
|
|
||||||
// Methods that need to be provided by subclasses:
|
|
||||||
//
|
|
||||||
// /* Called once at the beginning of the thread loop. Must return
|
|
||||||
// * an empty string to enter the thread loop, otherwise the thread loop
|
|
||||||
// * exits and the worker switches into the 'error' state, setting
|
|
||||||
// * the error to the returned value.
|
|
||||||
// */
|
|
||||||
// std::string workerInit();
|
|
||||||
//
|
|
||||||
// /* Called for each thread loop unless the thread is in 'paused' state.
|
|
||||||
// * Must return 'CONTINUE' to continue running, otherwise the thread loop
|
|
||||||
// * exits. If the result from worker cycle is 'ABORT' then the worker switches
|
|
||||||
// * into the 'error' state with a generic error message. It is recommended that
|
|
||||||
// * the subclass reports any problems via logging facilities. Returning the 'EXIT'
|
|
||||||
// * status is equivalent to calling 'stop()' method. This is just a way of
|
|
||||||
// * of stopping the worker by its own initiative.
|
|
||||||
// */
|
|
||||||
// WorkerStatus workerCycle();
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void switchWorkerStateSync(WorkerState oldState, WorkerState newState,
|
void switchWorkerStateSync(WorkerState oldState, WorkerState newState,
|
||||||
WorkerState* finalState = nullptr) {
|
WorkerState* finalState = nullptr);
|
||||||
std::unique_lock<std::mutex> lock(mWorkerLock);
|
void workerThread();
|
||||||
android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
|
|
||||||
if (mWorkerState != oldState) {
|
|
||||||
if (finalState) *finalState = mWorkerState;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
mWorkerState = newState;
|
|
||||||
mWorkerStateChangeRequest = true;
|
|
||||||
mWorkerCv.wait(lock, [&]() {
|
|
||||||
android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
|
|
||||||
return mWorkerState != newState;
|
|
||||||
});
|
|
||||||
if (finalState) *finalState = mWorkerState;
|
|
||||||
}
|
|
||||||
void workerThread() {
|
|
||||||
std::string error = static_cast<Impl*>(this)->workerInit();
|
|
||||||
if (error.empty() && !mThreadName.empty()) {
|
|
||||||
std::string compliantName(mThreadName.substr(0, 15));
|
|
||||||
if (int errCode = pthread_setname_np(pthread_self(), compliantName.c_str());
|
|
||||||
errCode != 0) {
|
|
||||||
error.append("Failed to set thread name: ").append(strerror(errCode));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (error.empty() && mThreadPriority != ANDROID_PRIORITY_DEFAULT) {
|
|
||||||
if (int result = setpriority(PRIO_PROCESS, 0, mThreadPriority); result != 0) {
|
|
||||||
int errCode = errno;
|
|
||||||
error.append("Failed to set thread priority: ").append(strerror(errCode));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
{
|
|
||||||
std::lock_guard<std::mutex> lock(mWorkerLock);
|
|
||||||
mWorkerState = error.empty() ? WorkerState::RUNNING : WorkerState::STOPPED;
|
|
||||||
mError = error;
|
|
||||||
}
|
|
||||||
mWorkerCv.notify_one();
|
|
||||||
if (!error.empty()) return;
|
|
||||||
|
|
||||||
for (WorkerState state = WorkerState::RUNNING; state != WorkerState::STOPPED;) {
|
|
||||||
bool needToNotify = false;
|
|
||||||
if (WorkerStatus status = state != WorkerState::PAUSED
|
|
||||||
? static_cast<Impl*>(this)->workerCycle()
|
|
||||||
: (sched_yield(), WorkerStatus::CONTINUE);
|
|
||||||
status == WorkerStatus::CONTINUE) {
|
|
||||||
{
|
|
||||||
// See https://developer.android.com/training/articles/smp#nonracing
|
|
||||||
android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
|
|
||||||
if (!mWorkerStateChangeRequest.load(std::memory_order_relaxed)) continue;
|
|
||||||
}
|
|
||||||
//
|
|
||||||
// Pause and resume are synchronous. One worker cycle must complete
|
|
||||||
// before the worker indicates a state change. This is how 'mWorkerState' and
|
|
||||||
// 'state' interact:
|
|
||||||
//
|
|
||||||
// mWorkerState == RUNNING
|
|
||||||
// client sets mWorkerState := PAUSE_REQUESTED
|
|
||||||
// last workerCycle gets executed, state := mWorkerState := PAUSED by us
|
|
||||||
// (or the workers enters the 'error' state if workerCycle fails)
|
|
||||||
// client gets notified about state change in any case
|
|
||||||
// thread is doing a busy wait while 'state == PAUSED'
|
|
||||||
// client sets mWorkerState := RESUME_REQUESTED
|
|
||||||
// state := mWorkerState (RESUME_REQUESTED)
|
|
||||||
// mWorkerState := RUNNING, but we don't notify the client yet
|
|
||||||
// first workerCycle gets executed, the code below triggers a client notification
|
|
||||||
// (or if workerCycle fails, worker enters 'error' state and also notifies)
|
|
||||||
// state := mWorkerState (RUNNING)
|
|
||||||
std::lock_guard<std::mutex> lock(mWorkerLock);
|
|
||||||
if (state == WorkerState::RESUME_REQUESTED) {
|
|
||||||
needToNotify = true;
|
|
||||||
}
|
|
||||||
state = mWorkerState;
|
|
||||||
if (mWorkerState == WorkerState::PAUSE_REQUESTED) {
|
|
||||||
state = mWorkerState = WorkerState::PAUSED;
|
|
||||||
needToNotify = true;
|
|
||||||
} else if (mWorkerState == WorkerState::RESUME_REQUESTED) {
|
|
||||||
mWorkerState = WorkerState::RUNNING;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
std::lock_guard<std::mutex> lock(mWorkerLock);
|
|
||||||
if (state == WorkerState::RESUME_REQUESTED ||
|
|
||||||
mWorkerState == WorkerState::PAUSE_REQUESTED) {
|
|
||||||
needToNotify = true;
|
|
||||||
}
|
|
||||||
state = mWorkerState = WorkerState::STOPPED;
|
|
||||||
if (status == WorkerStatus::ABORT) {
|
|
||||||
mError = "workerCycle aborted";
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (needToNotify) {
|
|
||||||
{
|
|
||||||
std::lock_guard<std::mutex> lock(mWorkerLock);
|
|
||||||
mWorkerStateChangeRequest = false;
|
|
||||||
}
|
|
||||||
mWorkerCv.notify_one();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
StreamLogic* const mLogic;
|
||||||
std::string mThreadName;
|
std::string mThreadName;
|
||||||
int mThreadPriority = ANDROID_PRIORITY_DEFAULT;
|
int mThreadPriority = ANDROID_PRIORITY_DEFAULT;
|
||||||
std::thread mWorker;
|
std::thread mWorker;
|
||||||
|
@ -230,4 +83,71 @@ class StreamWorker {
|
||||||
std::atomic<bool> mWorkerStateChangeRequest GUARDED_BY(mWorkerLock) = false;
|
std::atomic<bool> mWorkerStateChangeRequest GUARDED_BY(mWorkerLock) = false;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
} // namespace internal
|
||||||
|
|
||||||
|
class StreamLogic {
|
||||||
|
public:
|
||||||
|
friend class internal::ThreadController;
|
||||||
|
|
||||||
|
virtual ~StreamLogic() = default;
|
||||||
|
|
||||||
|
protected:
|
||||||
|
enum class Status { ABORT, CONTINUE, EXIT };
|
||||||
|
|
||||||
|
/* Called once at the beginning of the thread loop. Must return
|
||||||
|
* an empty string to enter the thread loop, otherwise the thread loop
|
||||||
|
* exits and the worker switches into the 'error' state, setting
|
||||||
|
* the error to the returned value.
|
||||||
|
*/
|
||||||
|
virtual std::string init() = 0;
|
||||||
|
|
||||||
|
/* Called for each thread loop unless the thread is in 'paused' state.
|
||||||
|
* Must return 'CONTINUE' to continue running, otherwise the thread loop
|
||||||
|
* exits. If the result from worker cycle is 'ABORT' then the worker switches
|
||||||
|
* into the 'error' state with a generic error message. It is recommended that
|
||||||
|
* the subclass reports any problems via logging facilities. Returning the 'EXIT'
|
||||||
|
* status is equivalent to calling 'stop()' method. This is just a way of
|
||||||
|
* of stopping the worker by its own initiative.
|
||||||
|
*/
|
||||||
|
virtual Status cycle() = 0;
|
||||||
|
};
|
||||||
|
|
||||||
|
template <class LogicImpl>
|
||||||
|
class StreamWorker : public LogicImpl {
|
||||||
|
public:
|
||||||
|
template <class... Args>
|
||||||
|
explicit StreamWorker(Args&&... args) : LogicImpl(std::forward<Args>(args)...), mThread(this) {}
|
||||||
|
|
||||||
|
// Methods of LogicImpl are available via inheritance.
|
||||||
|
// Forwarded methods of ThreadController follow.
|
||||||
|
|
||||||
|
// Note that 'priority' here is what is known as the 'nice number' in *nix systems.
|
||||||
|
// The nice number is used with the default scheduler. For threads that
|
||||||
|
// need to use a specialized scheduler (e.g. SCHED_FIFO) and set the priority within it,
|
||||||
|
// it is recommended to implement an appropriate configuration sequence within
|
||||||
|
// 'LogicImpl' or 'StreamLogic::init'.
|
||||||
|
bool start(const std::string& name = "", int priority = ANDROID_PRIORITY_DEFAULT) {
|
||||||
|
return mThread.start(name, priority);
|
||||||
|
}
|
||||||
|
void pause() { mThread.pause(); }
|
||||||
|
void resume() { mThread.resume(); }
|
||||||
|
bool hasError() { return mThread.hasError(); }
|
||||||
|
std::string getError() { return mThread.getError(); }
|
||||||
|
void stop() { return mThread.stop(); }
|
||||||
|
bool waitForAtLeastOneCycle() { return mThread.waitForAtLeastOneCycle(); }
|
||||||
|
|
||||||
|
// Only used by unit tests.
|
||||||
|
void testLockUnlockMutex(bool lock) { mThread.lockUnlockMutex(lock); }
|
||||||
|
std::thread::native_handle_type testGetThreadNativeHandle() {
|
||||||
|
return mThread.getThreadNativeHandle();
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
// The ThreadController gets destroyed before LogicImpl.
|
||||||
|
// After the controller has been destroyed, it is guaranteed that
|
||||||
|
// the thread was joined, thus the 'cycle' method of LogicImpl
|
||||||
|
// will not be called anymore, and it is safe to destroy LogicImpl.
|
||||||
|
internal::ThreadController mThread;
|
||||||
|
};
|
||||||
|
|
||||||
} // namespace android::hardware::audio::common
|
} // namespace android::hardware::audio::common
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
|
|
||||||
#include <pthread.h>
|
#include <pthread.h>
|
||||||
#include <sched.h>
|
#include <sched.h>
|
||||||
|
#include <sys/resource.h>
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
|
|
||||||
#include <atomic>
|
#include <atomic>
|
||||||
|
@ -26,18 +27,19 @@
|
||||||
#define LOG_TAG "StreamWorker_Test"
|
#define LOG_TAG "StreamWorker_Test"
|
||||||
#include <log/log.h>
|
#include <log/log.h>
|
||||||
|
|
||||||
|
using android::hardware::audio::common::StreamLogic;
|
||||||
using android::hardware::audio::common::StreamWorker;
|
using android::hardware::audio::common::StreamWorker;
|
||||||
|
|
||||||
class TestWorker : public StreamWorker<TestWorker> {
|
class TestWorkerLogic : public StreamLogic {
|
||||||
public:
|
public:
|
||||||
struct Stream {
|
struct Stream {
|
||||||
void setErrorStatus() { status = WorkerStatus::ABORT; }
|
void setErrorStatus() { status = Status::ABORT; }
|
||||||
void setStopStatus() { status = WorkerStatus::EXIT; }
|
void setStopStatus() { status = Status::EXIT; }
|
||||||
std::atomic<WorkerStatus> status = WorkerStatus::CONTINUE;
|
std::atomic<Status> status = Status::CONTINUE;
|
||||||
};
|
};
|
||||||
|
|
||||||
// Use nullptr to test error reporting from the worker thread.
|
// Use nullptr to test error reporting from the worker thread.
|
||||||
explicit TestWorker(Stream* stream) : mStream(stream) {}
|
explicit TestWorkerLogic(Stream* stream) : mStream(stream) {}
|
||||||
|
|
||||||
size_t getWorkerCycles() const { return mWorkerCycles; }
|
size_t getWorkerCycles() const { return mWorkerCycles; }
|
||||||
int getPriority() const { return mPriority; }
|
int getPriority() const { return mPriority; }
|
||||||
|
@ -48,8 +50,10 @@ class TestWorker : public StreamWorker<TestWorker> {
|
||||||
return mWorkerCycles == cyclesBefore;
|
return mWorkerCycles == cyclesBefore;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string workerInit() { return mStream != nullptr ? "" : "Expected error"; }
|
protected:
|
||||||
WorkerStatus workerCycle() {
|
// StreamLogic implementation
|
||||||
|
std::string init() override { return mStream != nullptr ? "" : "Expected error"; }
|
||||||
|
Status cycle() override {
|
||||||
mPriority = getpriority(PRIO_PROCESS, 0);
|
mPriority = getpriority(PRIO_PROCESS, 0);
|
||||||
do {
|
do {
|
||||||
mWorkerCycles++;
|
mWorkerCycles++;
|
||||||
|
@ -62,6 +66,7 @@ class TestWorker : public StreamWorker<TestWorker> {
|
||||||
std::atomic<size_t> mWorkerCycles = 0;
|
std::atomic<size_t> mWorkerCycles = 0;
|
||||||
std::atomic<int> mPriority = ANDROID_PRIORITY_DEFAULT;
|
std::atomic<int> mPriority = ANDROID_PRIORITY_DEFAULT;
|
||||||
};
|
};
|
||||||
|
using TestWorker = StreamWorker<TestWorkerLogic>;
|
||||||
|
|
||||||
// The parameter specifies whether an extra call to 'stop' is made at the end.
|
// The parameter specifies whether an extra call to 'stop' is made at the end.
|
||||||
class StreamWorkerInvalidTest : public testing::TestWithParam<bool> {
|
class StreamWorkerInvalidTest : public testing::TestWithParam<bool> {
|
||||||
|
|
|
@ -11,6 +11,7 @@ cc_library_static {
|
||||||
name: "libaudioserviceexampleimpl",
|
name: "libaudioserviceexampleimpl",
|
||||||
vendor: true,
|
vendor: true,
|
||||||
shared_libs: [
|
shared_libs: [
|
||||||
|
"libaudioaidlcommon",
|
||||||
"libbase",
|
"libbase",
|
||||||
"libbinder_ndk",
|
"libbinder_ndk",
|
||||||
"libstagefright_foundation",
|
"libstagefright_foundation",
|
||||||
|
|
|
@ -26,6 +26,7 @@ cc_test {
|
||||||
"android.hardware.common-V2-ndk",
|
"android.hardware.common-V2-ndk",
|
||||||
"android.hardware.common.fmq-V1-ndk",
|
"android.hardware.common.fmq-V1-ndk",
|
||||||
"android.media.audio.common.types-V1-ndk",
|
"android.media.audio.common.types-V1-ndk",
|
||||||
|
"libaudioaidlcommon",
|
||||||
],
|
],
|
||||||
test_suites: [
|
test_suites: [
|
||||||
"general-tests",
|
"general-tests",
|
||||||
|
|
Loading…
Reference in a new issue