audio: Implement setting name and priority in StreamWorker
Audio threads need to be able to set their priority.
Also, traditionally these worker threads set a custom
thread name. Implement this functionality in StreamWorker.
Since initialization steps can fail, implement simple
error reporting via a string field `mError`. The state
of the string field replaces the dedicated `ERROR` worker
state.
Bug: 205884982
Test: atest libaudioaidlcommon_test --iterations
Merged-In: Ie9ab94922d47f277a4993a90b478a2fa76657923
Change-Id: Ie9ab94922d47f277a4993a90b478a2fa76657923
(cherry picked from commit e9e0f7c0f5
)
This commit is contained in:
parent
0c174e9133
commit
48e2e8fe49
3 changed files with 76 additions and 17 deletions
|
@ -30,9 +30,11 @@ cc_library_headers {
|
|||
export_include_dirs: ["include"],
|
||||
header_libs: [
|
||||
"libbase_headers",
|
||||
"libsystem_headers",
|
||||
],
|
||||
export_header_lib_headers: [
|
||||
"libbase_headers",
|
||||
"libsystem_headers",
|
||||
],
|
||||
}
|
||||
|
||||
|
|
|
@ -16,29 +16,39 @@
|
|||
|
||||
#pragma once
|
||||
|
||||
#include <pthread.h>
|
||||
#include <sched.h>
|
||||
#include <sys/resource.h>
|
||||
|
||||
#include <atomic>
|
||||
#include <condition_variable>
|
||||
#include <mutex>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
|
||||
#include <android-base/thread_annotations.h>
|
||||
#include <system/thread_defs.h>
|
||||
|
||||
template <typename Impl>
|
||||
class StreamWorker {
|
||||
enum class WorkerState { STOPPED, RUNNING, PAUSE_REQUESTED, PAUSED, RESUME_REQUESTED, ERROR };
|
||||
enum class WorkerState { STOPPED, RUNNING, PAUSE_REQUESTED, PAUSED, RESUME_REQUESTED };
|
||||
|
||||
public:
|
||||
StreamWorker() = default;
|
||||
~StreamWorker() { stop(); }
|
||||
bool start() {
|
||||
// Note that 'priority' here is what is known as the 'nice number' in *nix systems.
|
||||
// The nice number is used with the default scheduler. For threads that
|
||||
// need to use a specialized scheduler (e.g. SCHED_FIFO) and set the priority within it,
|
||||
// it is recommended to implement an appropriate configuration sequence within `workerInit`.
|
||||
bool start(const std::string& name = "", int priority = ANDROID_PRIORITY_DEFAULT) {
|
||||
mThreadName = name;
|
||||
mThreadPriority = priority;
|
||||
mWorker = std::thread(&StreamWorker::workerThread, this);
|
||||
std::unique_lock<std::mutex> lock(mWorkerLock);
|
||||
android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
|
||||
mWorkerCv.wait(lock, [&]() {
|
||||
android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
|
||||
return mWorkerState != WorkerState::STOPPED;
|
||||
return mWorkerState == WorkerState::RUNNING || !mError.empty();
|
||||
});
|
||||
mWorkerStateChangeRequest = false;
|
||||
return mWorkerState == WorkerState::RUNNING;
|
||||
|
@ -47,14 +57,20 @@ class StreamWorker {
|
|||
void resume() { switchWorkerStateSync(WorkerState::PAUSED, WorkerState::RESUME_REQUESTED); }
|
||||
bool hasError() {
|
||||
std::lock_guard<std::mutex> lock(mWorkerLock);
|
||||
return mWorkerState == WorkerState::ERROR;
|
||||
return !mError.empty();
|
||||
}
|
||||
std::string getError() {
|
||||
std::lock_guard<std::mutex> lock(mWorkerLock);
|
||||
return mError;
|
||||
}
|
||||
void stop() {
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mWorkerLock);
|
||||
if (mWorkerState == WorkerState::STOPPED) return;
|
||||
mWorkerState = WorkerState::STOPPED;
|
||||
mWorkerStateChangeRequest = true;
|
||||
if (mError.empty()) {
|
||||
if (mWorkerState == WorkerState::STOPPED) return;
|
||||
mWorkerState = WorkerState::STOPPED;
|
||||
mWorkerStateChangeRequest = true;
|
||||
}
|
||||
}
|
||||
if (mWorker.joinable()) {
|
||||
mWorker.join();
|
||||
|
@ -71,17 +87,21 @@ class StreamWorker {
|
|||
void testLockUnlockMutex(bool lock) NO_THREAD_SAFETY_ANALYSIS {
|
||||
lock ? mWorkerLock.lock() : mWorkerLock.unlock();
|
||||
}
|
||||
std::thread::native_handle_type testGetThreadNativeHandle() { return mWorker.native_handle(); }
|
||||
|
||||
// Methods that need to be provided by subclasses:
|
||||
//
|
||||
// Called once at the beginning of the thread loop. Must return
|
||||
// 'true' to enter the thread loop, otherwise the thread loop
|
||||
// exits and the worker switches into the 'error' state.
|
||||
// bool workerInit();
|
||||
// an empty string to enter the thread loop, otherwise the thread loop
|
||||
// exits and the worker switches into the 'error' state, setting
|
||||
// the error to the returned value.
|
||||
// std::string workerInit();
|
||||
//
|
||||
// Called for each thread loop unless the thread is in 'paused' state.
|
||||
// Must return 'true' to continue running, otherwise the thread loop
|
||||
// exits and the worker switches into the 'error' state.
|
||||
// exits and the worker switches into the 'error' state with a generic
|
||||
// error message. It is recommended that the subclass reports any
|
||||
// problems via logging facilities.
|
||||
// bool workerCycle();
|
||||
|
||||
private:
|
||||
|
@ -102,13 +122,27 @@ class StreamWorker {
|
|||
if (finalState) *finalState = mWorkerState;
|
||||
}
|
||||
void workerThread() {
|
||||
bool success = static_cast<Impl*>(this)->workerInit();
|
||||
std::string error = static_cast<Impl*>(this)->workerInit();
|
||||
if (error.empty() && !mThreadName.empty()) {
|
||||
std::string compliantName(mThreadName.substr(0, 15));
|
||||
if (int errCode = pthread_setname_np(pthread_self(), compliantName.c_str());
|
||||
errCode != 0) {
|
||||
error.append("Failed to set thread name: ").append(strerror(errCode));
|
||||
}
|
||||
}
|
||||
if (error.empty() && mThreadPriority != ANDROID_PRIORITY_DEFAULT) {
|
||||
if (int result = setpriority(PRIO_PROCESS, 0, mThreadPriority); result != 0) {
|
||||
int errCode = errno;
|
||||
error.append("Failed to set thread priority: ").append(strerror(errCode));
|
||||
}
|
||||
}
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mWorkerLock);
|
||||
mWorkerState = success ? WorkerState::RUNNING : WorkerState::ERROR;
|
||||
mWorkerState = error.empty() ? WorkerState::RUNNING : WorkerState::STOPPED;
|
||||
mError = error;
|
||||
}
|
||||
mWorkerCv.notify_one();
|
||||
if (!success) return;
|
||||
if (!error.empty()) return;
|
||||
|
||||
for (WorkerState state = WorkerState::RUNNING; state != WorkerState::STOPPED;) {
|
||||
bool needToNotify = false;
|
||||
|
@ -153,8 +187,8 @@ class StreamWorker {
|
|||
mWorkerState == WorkerState::PAUSE_REQUESTED) {
|
||||
needToNotify = true;
|
||||
}
|
||||
mWorkerState = WorkerState::ERROR;
|
||||
state = WorkerState::STOPPED;
|
||||
state = mWorkerState = WorkerState::STOPPED;
|
||||
mError = "workerCycle failed";
|
||||
}
|
||||
if (needToNotify) {
|
||||
{
|
||||
|
@ -166,10 +200,13 @@ class StreamWorker {
|
|||
}
|
||||
}
|
||||
|
||||
std::string mThreadName;
|
||||
int mThreadPriority = ANDROID_PRIORITY_DEFAULT;
|
||||
std::thread mWorker;
|
||||
std::mutex mWorkerLock;
|
||||
std::condition_variable mWorkerCv;
|
||||
WorkerState mWorkerState GUARDED_BY(mWorkerLock) = WorkerState::STOPPED;
|
||||
std::string mError GUARDED_BY(mWorkerLock);
|
||||
// The atomic lock-free variable is used to prevent priority inversions
|
||||
// that can occur when a high priority worker tries to acquire the lock
|
||||
// which has been taken by a lower priority control thread which in its turn
|
||||
|
|
|
@ -14,8 +14,10 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include <pthread.h>
|
||||
#include <sched.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#include <atomic>
|
||||
|
||||
#include <StreamWorker.h>
|
||||
|
@ -34,6 +36,7 @@ class TestWorker : public StreamWorker<TestWorker> {
|
|||
explicit TestWorker(TestStream* stream) : mStream(stream) {}
|
||||
|
||||
size_t getWorkerCycles() const { return mWorkerCycles; }
|
||||
int getPriority() const { return mPriority; }
|
||||
bool hasWorkerCycleCalled() const { return mWorkerCycles != 0; }
|
||||
bool hasNoWorkerCycleCalled(useconds_t usec) {
|
||||
const size_t cyclesBefore = mWorkerCycles;
|
||||
|
@ -41,8 +44,9 @@ class TestWorker : public StreamWorker<TestWorker> {
|
|||
return mWorkerCycles == cyclesBefore;
|
||||
}
|
||||
|
||||
bool workerInit() { return mStream; }
|
||||
std::string workerInit() { return mStream != nullptr ? "" : "Expected error"; }
|
||||
bool workerCycle() {
|
||||
mPriority = getpriority(PRIO_PROCESS, 0);
|
||||
do {
|
||||
mWorkerCycles++;
|
||||
} while (mWorkerCycles == 0);
|
||||
|
@ -52,6 +56,7 @@ class TestWorker : public StreamWorker<TestWorker> {
|
|||
private:
|
||||
TestStream* const mStream;
|
||||
std::atomic<size_t> mWorkerCycles = 0;
|
||||
std::atomic<int> mPriority = ANDROID_PRIORITY_DEFAULT;
|
||||
};
|
||||
|
||||
// The parameter specifies whether an extra call to 'stop' is made at the end.
|
||||
|
@ -219,4 +224,19 @@ TEST_P(StreamWorkerTest, MutexDoesNotBlockWorker) {
|
|||
EXPECT_FALSE(worker.hasError());
|
||||
}
|
||||
|
||||
TEST_P(StreamWorkerTest, ThreadName) {
|
||||
const std::string workerName = "TestWorker";
|
||||
ASSERT_TRUE(worker.start(workerName)) << worker.getError();
|
||||
char nameBuf[128];
|
||||
ASSERT_EQ(0, pthread_getname_np(worker.testGetThreadNativeHandle(), nameBuf, sizeof(nameBuf)));
|
||||
EXPECT_EQ(workerName, nameBuf);
|
||||
}
|
||||
|
||||
TEST_P(StreamWorkerTest, ThreadPriority) {
|
||||
const int priority = ANDROID_PRIORITY_LOWEST;
|
||||
ASSERT_TRUE(worker.start("", priority)) << worker.getError();
|
||||
worker.waitForAtLeastOneCycle();
|
||||
EXPECT_EQ(priority, worker.getPriority());
|
||||
}
|
||||
|
||||
INSTANTIATE_TEST_SUITE_P(StreamWorker, StreamWorkerTest, testing::Bool());
|
||||
|
|
Loading…
Reference in a new issue