audio: Allow stopping a StreamWorker from the looping thread am: 48d3115614
am: 33b5a6d8b4
am: 8883713e20
Original change: https://android-review.googlesource.com/c/platform/hardware/interfaces/+/2199344 Change-Id: I6367520a55e69da3f866d657d115b04d9e86b0f6 Signed-off-by: Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
This commit is contained in:
commit
783ebc4737
2 changed files with 78 additions and 34 deletions
|
@ -29,11 +29,15 @@
|
|||
#include <android-base/thread_annotations.h>
|
||||
#include <system/thread_defs.h>
|
||||
|
||||
namespace android::hardware::audio::common {
|
||||
|
||||
template <typename Impl>
|
||||
class StreamWorker {
|
||||
enum class WorkerState { STOPPED, RUNNING, PAUSE_REQUESTED, PAUSED, RESUME_REQUESTED };
|
||||
|
||||
public:
|
||||
enum class WorkerStatus { ABORT, CONTINUE, EXIT };
|
||||
|
||||
StreamWorker() = default;
|
||||
~StreamWorker() { stop(); }
|
||||
// Note that 'priority' here is what is known as the 'nice number' in *nix systems.
|
||||
|
@ -66,8 +70,7 @@ class StreamWorker {
|
|||
void stop() {
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mWorkerLock);
|
||||
if (mError.empty()) {
|
||||
if (mWorkerState == WorkerState::STOPPED) return;
|
||||
if (mWorkerState != WorkerState::STOPPED) {
|
||||
mWorkerState = WorkerState::STOPPED;
|
||||
mWorkerStateChangeRequest = true;
|
||||
}
|
||||
|
@ -91,18 +94,22 @@ class StreamWorker {
|
|||
|
||||
// Methods that need to be provided by subclasses:
|
||||
//
|
||||
// Called once at the beginning of the thread loop. Must return
|
||||
// an empty string to enter the thread loop, otherwise the thread loop
|
||||
// exits and the worker switches into the 'error' state, setting
|
||||
// the error to the returned value.
|
||||
// /* Called once at the beginning of the thread loop. Must return
|
||||
// * an empty string to enter the thread loop, otherwise the thread loop
|
||||
// * exits and the worker switches into the 'error' state, setting
|
||||
// * the error to the returned value.
|
||||
// */
|
||||
// std::string workerInit();
|
||||
//
|
||||
// Called for each thread loop unless the thread is in 'paused' state.
|
||||
// Must return 'true' to continue running, otherwise the thread loop
|
||||
// exits and the worker switches into the 'error' state with a generic
|
||||
// error message. It is recommended that the subclass reports any
|
||||
// problems via logging facilities.
|
||||
// bool workerCycle();
|
||||
// /* Called for each thread loop unless the thread is in 'paused' state.
|
||||
// * Must return 'CONTINUE' to continue running, otherwise the thread loop
|
||||
// * exits. If the result from worker cycle is 'ABORT' then the worker switches
|
||||
// * into the 'error' state with a generic error message. It is recommended that
|
||||
// * the subclass reports any problems via logging facilities. Returning the 'EXIT'
|
||||
// * status is equivalent to calling 'stop()' method. This is just a way of
|
||||
// * of stopping the worker by its own initiative.
|
||||
// */
|
||||
// WorkerStatus workerCycle();
|
||||
|
||||
private:
|
||||
void switchWorkerStateSync(WorkerState oldState, WorkerState newState,
|
||||
|
@ -146,8 +153,10 @@ class StreamWorker {
|
|||
|
||||
for (WorkerState state = WorkerState::RUNNING; state != WorkerState::STOPPED;) {
|
||||
bool needToNotify = false;
|
||||
if (state != WorkerState::PAUSED ? static_cast<Impl*>(this)->workerCycle()
|
||||
: (sched_yield(), true)) {
|
||||
if (WorkerStatus status = state != WorkerState::PAUSED
|
||||
? static_cast<Impl*>(this)->workerCycle()
|
||||
: (sched_yield(), WorkerStatus::CONTINUE);
|
||||
status == WorkerStatus::CONTINUE) {
|
||||
{
|
||||
// See https://developer.android.com/training/articles/smp#nonracing
|
||||
android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
|
||||
|
@ -188,7 +197,9 @@ class StreamWorker {
|
|||
needToNotify = true;
|
||||
}
|
||||
state = mWorkerState = WorkerState::STOPPED;
|
||||
mError = "workerCycle failed";
|
||||
if (status == WorkerStatus::ABORT) {
|
||||
mError = "workerCycle aborted";
|
||||
}
|
||||
}
|
||||
if (needToNotify) {
|
||||
{
|
||||
|
@ -218,3 +229,5 @@ class StreamWorker {
|
|||
static_assert(std::atomic<bool>::is_always_lock_free);
|
||||
std::atomic<bool> mWorkerStateChangeRequest GUARDED_BY(mWorkerLock) = false;
|
||||
};
|
||||
|
||||
} // namespace android::hardware::audio::common
|
||||
|
|
|
@ -26,14 +26,18 @@
|
|||
#define LOG_TAG "StreamWorker_Test"
|
||||
#include <log/log.h>
|
||||
|
||||
struct TestStream {
|
||||
std::atomic<bool> error = false;
|
||||
};
|
||||
using android::hardware::audio::common::StreamWorker;
|
||||
|
||||
class TestWorker : public StreamWorker<TestWorker> {
|
||||
public:
|
||||
struct Stream {
|
||||
void setErrorStatus() { status = WorkerStatus::ABORT; }
|
||||
void setStopStatus() { status = WorkerStatus::EXIT; }
|
||||
std::atomic<WorkerStatus> status = WorkerStatus::CONTINUE;
|
||||
};
|
||||
|
||||
// Use nullptr to test error reporting from the worker thread.
|
||||
explicit TestWorker(TestStream* stream) : mStream(stream) {}
|
||||
explicit TestWorker(Stream* stream) : mStream(stream) {}
|
||||
|
||||
size_t getWorkerCycles() const { return mWorkerCycles; }
|
||||
int getPriority() const { return mPriority; }
|
||||
|
@ -45,16 +49,16 @@ class TestWorker : public StreamWorker<TestWorker> {
|
|||
}
|
||||
|
||||
std::string workerInit() { return mStream != nullptr ? "" : "Expected error"; }
|
||||
bool workerCycle() {
|
||||
WorkerStatus workerCycle() {
|
||||
mPriority = getpriority(PRIO_PROCESS, 0);
|
||||
do {
|
||||
mWorkerCycles++;
|
||||
} while (mWorkerCycles == 0);
|
||||
return !mStream->error;
|
||||
return mStream->status;
|
||||
}
|
||||
|
||||
private:
|
||||
TestStream* const mStream;
|
||||
Stream* const mStream;
|
||||
std::atomic<size_t> mWorkerCycles = 0;
|
||||
std::atomic<int> mPriority = ANDROID_PRIORITY_DEFAULT;
|
||||
};
|
||||
|
@ -70,7 +74,8 @@ class StreamWorkerInvalidTest : public testing::TestWithParam<bool> {
|
|||
}
|
||||
|
||||
protected:
|
||||
StreamWorkerInvalidTest(TestStream* stream) : testing::TestWithParam<bool>(), worker(stream) {}
|
||||
StreamWorkerInvalidTest(TestWorker::Stream* stream)
|
||||
: testing::TestWithParam<bool>(), worker(stream) {}
|
||||
TestWorker worker;
|
||||
};
|
||||
|
||||
|
@ -118,7 +123,7 @@ class StreamWorkerTest : public StreamWorkerInvalidTest {
|
|||
StreamWorkerTest() : StreamWorkerInvalidTest(&stream) {}
|
||||
|
||||
protected:
|
||||
TestStream stream;
|
||||
TestWorker::Stream stream;
|
||||
};
|
||||
|
||||
static constexpr unsigned kWorkerIdleCheckTime = 50 * 1000;
|
||||
|
@ -130,21 +135,47 @@ TEST_P(StreamWorkerTest, Uninitialized) {
|
|||
|
||||
TEST_P(StreamWorkerTest, Start) {
|
||||
ASSERT_TRUE(worker.start());
|
||||
EXPECT_TRUE(worker.waitForAtLeastOneCycle());
|
||||
EXPECT_FALSE(worker.hasError());
|
||||
}
|
||||
|
||||
TEST_P(StreamWorkerTest, StartStop) {
|
||||
ASSERT_TRUE(worker.start());
|
||||
EXPECT_TRUE(worker.waitForAtLeastOneCycle());
|
||||
EXPECT_FALSE(worker.hasError());
|
||||
worker.stop();
|
||||
EXPECT_FALSE(worker.hasError());
|
||||
}
|
||||
|
||||
TEST_P(StreamWorkerTest, WorkerExit) {
|
||||
ASSERT_TRUE(worker.start());
|
||||
stream.setStopStatus();
|
||||
worker.waitForAtLeastOneCycle();
|
||||
EXPECT_FALSE(worker.hasError());
|
||||
EXPECT_TRUE(worker.hasNoWorkerCycleCalled(kWorkerIdleCheckTime));
|
||||
}
|
||||
|
||||
TEST_P(StreamWorkerTest, WorkerError) {
|
||||
ASSERT_TRUE(worker.start());
|
||||
stream.error = true;
|
||||
stream.setErrorStatus();
|
||||
worker.waitForAtLeastOneCycle();
|
||||
EXPECT_TRUE(worker.hasError());
|
||||
EXPECT_TRUE(worker.hasNoWorkerCycleCalled(kWorkerIdleCheckTime));
|
||||
}
|
||||
|
||||
TEST_P(StreamWorkerTest, StopAfterError) {
|
||||
ASSERT_TRUE(worker.start());
|
||||
stream.setErrorStatus();
|
||||
worker.waitForAtLeastOneCycle();
|
||||
EXPECT_TRUE(worker.hasError());
|
||||
EXPECT_TRUE(worker.hasNoWorkerCycleCalled(kWorkerIdleCheckTime));
|
||||
worker.stop();
|
||||
EXPECT_TRUE(worker.hasError());
|
||||
}
|
||||
|
||||
TEST_P(StreamWorkerTest, PauseResume) {
|
||||
ASSERT_TRUE(worker.start());
|
||||
worker.waitForAtLeastOneCycle();
|
||||
EXPECT_TRUE(worker.waitForAtLeastOneCycle());
|
||||
EXPECT_FALSE(worker.hasError());
|
||||
worker.pause();
|
||||
EXPECT_TRUE(worker.hasNoWorkerCycleCalled(kWorkerIdleCheckTime));
|
||||
|
@ -158,7 +189,7 @@ TEST_P(StreamWorkerTest, PauseResume) {
|
|||
|
||||
TEST_P(StreamWorkerTest, StopPaused) {
|
||||
ASSERT_TRUE(worker.start());
|
||||
worker.waitForAtLeastOneCycle();
|
||||
EXPECT_TRUE(worker.waitForAtLeastOneCycle());
|
||||
EXPECT_FALSE(worker.hasError());
|
||||
worker.pause();
|
||||
worker.stop();
|
||||
|
@ -167,7 +198,7 @@ TEST_P(StreamWorkerTest, StopPaused) {
|
|||
|
||||
TEST_P(StreamWorkerTest, PauseAfterErrorIgnored) {
|
||||
ASSERT_TRUE(worker.start());
|
||||
stream.error = true;
|
||||
stream.setErrorStatus();
|
||||
worker.waitForAtLeastOneCycle();
|
||||
EXPECT_TRUE(worker.hasError());
|
||||
worker.pause();
|
||||
|
@ -177,7 +208,7 @@ TEST_P(StreamWorkerTest, PauseAfterErrorIgnored) {
|
|||
|
||||
TEST_P(StreamWorkerTest, ResumeAfterErrorIgnored) {
|
||||
ASSERT_TRUE(worker.start());
|
||||
stream.error = true;
|
||||
stream.setErrorStatus();
|
||||
worker.waitForAtLeastOneCycle();
|
||||
EXPECT_TRUE(worker.hasError());
|
||||
worker.resume();
|
||||
|
@ -187,11 +218,11 @@ TEST_P(StreamWorkerTest, ResumeAfterErrorIgnored) {
|
|||
|
||||
TEST_P(StreamWorkerTest, WorkerErrorOnResume) {
|
||||
ASSERT_TRUE(worker.start());
|
||||
worker.waitForAtLeastOneCycle();
|
||||
EXPECT_TRUE(worker.waitForAtLeastOneCycle());
|
||||
EXPECT_FALSE(worker.hasError());
|
||||
worker.pause();
|
||||
EXPECT_FALSE(worker.hasError());
|
||||
stream.error = true;
|
||||
stream.setErrorStatus();
|
||||
EXPECT_FALSE(worker.hasError());
|
||||
worker.resume();
|
||||
worker.waitForAtLeastOneCycle();
|
||||
|
@ -208,7 +239,7 @@ TEST_P(StreamWorkerTest, WaitForAtLeastOneCycle) {
|
|||
|
||||
TEST_P(StreamWorkerTest, WaitForAtLeastOneCycleError) {
|
||||
ASSERT_TRUE(worker.start());
|
||||
stream.error = true;
|
||||
stream.setErrorStatus();
|
||||
EXPECT_FALSE(worker.waitForAtLeastOneCycle());
|
||||
}
|
||||
|
||||
|
@ -220,7 +251,7 @@ TEST_P(StreamWorkerTest, MutexDoesNotBlockWorker) {
|
|||
usleep(kWorkerIdleCheckTime);
|
||||
}
|
||||
worker.testLockUnlockMutex(false);
|
||||
worker.waitForAtLeastOneCycle();
|
||||
EXPECT_TRUE(worker.waitForAtLeastOneCycle());
|
||||
EXPECT_FALSE(worker.hasError());
|
||||
}
|
||||
|
||||
|
@ -235,7 +266,7 @@ TEST_P(StreamWorkerTest, ThreadName) {
|
|||
TEST_P(StreamWorkerTest, ThreadPriority) {
|
||||
const int priority = ANDROID_PRIORITY_LOWEST;
|
||||
ASSERT_TRUE(worker.start("", priority)) << worker.getError();
|
||||
worker.waitForAtLeastOneCycle();
|
||||
EXPECT_TRUE(worker.waitForAtLeastOneCycle());
|
||||
EXPECT_EQ(priority, worker.getPriority());
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue