audio: Allow stopping a StreamWorker from the looping thread am: 48d3115614

Original change: https://android-review.googlesource.com/c/platform/hardware/interfaces/+/2199344

Change-Id: Ie0c5c6ee63b0e017dd909d7c894beacbddb8ce18
Signed-off-by: Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
This commit is contained in:
Mikhail Naganov 2022-08-30 17:25:02 +00:00 committed by Automerger Merge Worker
commit 33b5a6d8b4
2 changed files with 78 additions and 34 deletions

View file

@ -29,11 +29,15 @@
#include <android-base/thread_annotations.h>
#include <system/thread_defs.h>
namespace android::hardware::audio::common {
template <typename Impl>
class StreamWorker {
enum class WorkerState { STOPPED, RUNNING, PAUSE_REQUESTED, PAUSED, RESUME_REQUESTED };
public:
enum class WorkerStatus { ABORT, CONTINUE, EXIT };
StreamWorker() = default;
~StreamWorker() { stop(); }
// Note that 'priority' here is what is known as the 'nice number' in *nix systems.
@ -66,8 +70,7 @@ class StreamWorker {
void stop() {
{
std::lock_guard<std::mutex> lock(mWorkerLock);
if (mError.empty()) {
if (mWorkerState == WorkerState::STOPPED) return;
if (mWorkerState != WorkerState::STOPPED) {
mWorkerState = WorkerState::STOPPED;
mWorkerStateChangeRequest = true;
}
@ -91,18 +94,22 @@ class StreamWorker {
// Methods that need to be provided by subclasses:
//
// Called once at the beginning of the thread loop. Must return
// an empty string to enter the thread loop, otherwise the thread loop
// exits and the worker switches into the 'error' state, setting
// the error to the returned value.
// /* Called once at the beginning of the thread loop. Must return
// * an empty string to enter the thread loop, otherwise the thread loop
// * exits and the worker switches into the 'error' state, setting
// * the error to the returned value.
// */
// std::string workerInit();
//
// Called for each thread loop unless the thread is in 'paused' state.
// Must return 'true' to continue running, otherwise the thread loop
// exits and the worker switches into the 'error' state with a generic
// error message. It is recommended that the subclass reports any
// problems via logging facilities.
// bool workerCycle();
// /* Called for each thread loop unless the thread is in 'paused' state.
// * Must return 'CONTINUE' to continue running, otherwise the thread loop
// * exits. If the result from worker cycle is 'ABORT' then the worker switches
// * into the 'error' state with a generic error message. It is recommended that
// * the subclass reports any problems via logging facilities. Returning the 'EXIT'
// * status is equivalent to calling 'stop()' method. This is just a way of
// * of stopping the worker by its own initiative.
// */
// WorkerStatus workerCycle();
private:
void switchWorkerStateSync(WorkerState oldState, WorkerState newState,
@ -146,8 +153,10 @@ class StreamWorker {
for (WorkerState state = WorkerState::RUNNING; state != WorkerState::STOPPED;) {
bool needToNotify = false;
if (state != WorkerState::PAUSED ? static_cast<Impl*>(this)->workerCycle()
: (sched_yield(), true)) {
if (WorkerStatus status = state != WorkerState::PAUSED
? static_cast<Impl*>(this)->workerCycle()
: (sched_yield(), WorkerStatus::CONTINUE);
status == WorkerStatus::CONTINUE) {
{
// See https://developer.android.com/training/articles/smp#nonracing
android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
@ -188,7 +197,9 @@ class StreamWorker {
needToNotify = true;
}
state = mWorkerState = WorkerState::STOPPED;
mError = "workerCycle failed";
if (status == WorkerStatus::ABORT) {
mError = "workerCycle aborted";
}
}
if (needToNotify) {
{
@ -218,3 +229,5 @@ class StreamWorker {
static_assert(std::atomic<bool>::is_always_lock_free);
std::atomic<bool> mWorkerStateChangeRequest GUARDED_BY(mWorkerLock) = false;
};
} // namespace android::hardware::audio::common

View file

@ -26,14 +26,18 @@
#define LOG_TAG "StreamWorker_Test"
#include <log/log.h>
struct TestStream {
std::atomic<bool> error = false;
};
using android::hardware::audio::common::StreamWorker;
class TestWorker : public StreamWorker<TestWorker> {
public:
struct Stream {
void setErrorStatus() { status = WorkerStatus::ABORT; }
void setStopStatus() { status = WorkerStatus::EXIT; }
std::atomic<WorkerStatus> status = WorkerStatus::CONTINUE;
};
// Use nullptr to test error reporting from the worker thread.
explicit TestWorker(TestStream* stream) : mStream(stream) {}
explicit TestWorker(Stream* stream) : mStream(stream) {}
size_t getWorkerCycles() const { return mWorkerCycles; }
int getPriority() const { return mPriority; }
@ -45,16 +49,16 @@ class TestWorker : public StreamWorker<TestWorker> {
}
std::string workerInit() { return mStream != nullptr ? "" : "Expected error"; }
bool workerCycle() {
WorkerStatus workerCycle() {
mPriority = getpriority(PRIO_PROCESS, 0);
do {
mWorkerCycles++;
} while (mWorkerCycles == 0);
return !mStream->error;
return mStream->status;
}
private:
TestStream* const mStream;
Stream* const mStream;
std::atomic<size_t> mWorkerCycles = 0;
std::atomic<int> mPriority = ANDROID_PRIORITY_DEFAULT;
};
@ -70,7 +74,8 @@ class StreamWorkerInvalidTest : public testing::TestWithParam<bool> {
}
protected:
StreamWorkerInvalidTest(TestStream* stream) : testing::TestWithParam<bool>(), worker(stream) {}
StreamWorkerInvalidTest(TestWorker::Stream* stream)
: testing::TestWithParam<bool>(), worker(stream) {}
TestWorker worker;
};
@ -118,7 +123,7 @@ class StreamWorkerTest : public StreamWorkerInvalidTest {
StreamWorkerTest() : StreamWorkerInvalidTest(&stream) {}
protected:
TestStream stream;
TestWorker::Stream stream;
};
static constexpr unsigned kWorkerIdleCheckTime = 50 * 1000;
@ -130,21 +135,47 @@ TEST_P(StreamWorkerTest, Uninitialized) {
TEST_P(StreamWorkerTest, Start) {
ASSERT_TRUE(worker.start());
EXPECT_TRUE(worker.waitForAtLeastOneCycle());
EXPECT_FALSE(worker.hasError());
}
TEST_P(StreamWorkerTest, StartStop) {
ASSERT_TRUE(worker.start());
EXPECT_TRUE(worker.waitForAtLeastOneCycle());
EXPECT_FALSE(worker.hasError());
worker.stop();
EXPECT_FALSE(worker.hasError());
}
TEST_P(StreamWorkerTest, WorkerExit) {
ASSERT_TRUE(worker.start());
stream.setStopStatus();
worker.waitForAtLeastOneCycle();
EXPECT_FALSE(worker.hasError());
EXPECT_TRUE(worker.hasNoWorkerCycleCalled(kWorkerIdleCheckTime));
}
TEST_P(StreamWorkerTest, WorkerError) {
ASSERT_TRUE(worker.start());
stream.error = true;
stream.setErrorStatus();
worker.waitForAtLeastOneCycle();
EXPECT_TRUE(worker.hasError());
EXPECT_TRUE(worker.hasNoWorkerCycleCalled(kWorkerIdleCheckTime));
}
TEST_P(StreamWorkerTest, StopAfterError) {
ASSERT_TRUE(worker.start());
stream.setErrorStatus();
worker.waitForAtLeastOneCycle();
EXPECT_TRUE(worker.hasError());
EXPECT_TRUE(worker.hasNoWorkerCycleCalled(kWorkerIdleCheckTime));
worker.stop();
EXPECT_TRUE(worker.hasError());
}
TEST_P(StreamWorkerTest, PauseResume) {
ASSERT_TRUE(worker.start());
worker.waitForAtLeastOneCycle();
EXPECT_TRUE(worker.waitForAtLeastOneCycle());
EXPECT_FALSE(worker.hasError());
worker.pause();
EXPECT_TRUE(worker.hasNoWorkerCycleCalled(kWorkerIdleCheckTime));
@ -158,7 +189,7 @@ TEST_P(StreamWorkerTest, PauseResume) {
TEST_P(StreamWorkerTest, StopPaused) {
ASSERT_TRUE(worker.start());
worker.waitForAtLeastOneCycle();
EXPECT_TRUE(worker.waitForAtLeastOneCycle());
EXPECT_FALSE(worker.hasError());
worker.pause();
worker.stop();
@ -167,7 +198,7 @@ TEST_P(StreamWorkerTest, StopPaused) {
TEST_P(StreamWorkerTest, PauseAfterErrorIgnored) {
ASSERT_TRUE(worker.start());
stream.error = true;
stream.setErrorStatus();
worker.waitForAtLeastOneCycle();
EXPECT_TRUE(worker.hasError());
worker.pause();
@ -177,7 +208,7 @@ TEST_P(StreamWorkerTest, PauseAfterErrorIgnored) {
TEST_P(StreamWorkerTest, ResumeAfterErrorIgnored) {
ASSERT_TRUE(worker.start());
stream.error = true;
stream.setErrorStatus();
worker.waitForAtLeastOneCycle();
EXPECT_TRUE(worker.hasError());
worker.resume();
@ -187,11 +218,11 @@ TEST_P(StreamWorkerTest, ResumeAfterErrorIgnored) {
TEST_P(StreamWorkerTest, WorkerErrorOnResume) {
ASSERT_TRUE(worker.start());
worker.waitForAtLeastOneCycle();
EXPECT_TRUE(worker.waitForAtLeastOneCycle());
EXPECT_FALSE(worker.hasError());
worker.pause();
EXPECT_FALSE(worker.hasError());
stream.error = true;
stream.setErrorStatus();
EXPECT_FALSE(worker.hasError());
worker.resume();
worker.waitForAtLeastOneCycle();
@ -208,7 +239,7 @@ TEST_P(StreamWorkerTest, WaitForAtLeastOneCycle) {
TEST_P(StreamWorkerTest, WaitForAtLeastOneCycleError) {
ASSERT_TRUE(worker.start());
stream.error = true;
stream.setErrorStatus();
EXPECT_FALSE(worker.waitForAtLeastOneCycle());
}
@ -220,7 +251,7 @@ TEST_P(StreamWorkerTest, MutexDoesNotBlockWorker) {
usleep(kWorkerIdleCheckTime);
}
worker.testLockUnlockMutex(false);
worker.waitForAtLeastOneCycle();
EXPECT_TRUE(worker.waitForAtLeastOneCycle());
EXPECT_FALSE(worker.hasError());
}
@ -235,7 +266,7 @@ TEST_P(StreamWorkerTest, ThreadName) {
TEST_P(StreamWorkerTest, ThreadPriority) {
const int priority = ANDROID_PRIORITY_LOWEST;
ASSERT_TRUE(worker.start("", priority)) << worker.getError();
worker.waitForAtLeastOneCycle();
EXPECT_TRUE(worker.waitForAtLeastOneCycle());
EXPECT_EQ(priority, worker.getPriority());
}