Effect AIDL: Move EffectThread process into mutex am: b49631f4c2

Original change: https://android-review.googlesource.com/c/platform/hardware/interfaces/+/2419710

Change-Id: I6122fbd8cf344b1b7e889fae56524ec089ebe1e3
Signed-off-by: Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
This commit is contained in:
Shunkai Yao 2023-02-08 22:40:53 +00:00 committed by Automerger Merge Worker
commit 5cf8828110
4 changed files with 100 additions and 105 deletions

View file

@ -70,26 +70,14 @@ RetCode EffectThread::destroyThread() {
}
RetCode EffectThread::startThread() {
if (!mThread.joinable()) {
LOG(ERROR) << __func__ << " thread already destroyed";
return RetCode::ERROR_THREAD;
}
{
std::lock_guard lg(mThreadMutex);
if (!mStop) {
LOG(WARNING) << __func__ << " already start";
return RetCode::SUCCESS;
}
mStop = false;
}
mCv.notify_one();
LOG(DEBUG) << __func__ << " done";
return RetCode::SUCCESS;
return handleStartStop(false /* stop */);
}
RetCode EffectThread::stopThread() {
return handleStartStop(true /* stop */);
}
RetCode EffectThread::handleStartStop(bool stop) {
if (!mThread.joinable()) {
LOG(ERROR) << __func__ << " thread already destroyed";
return RetCode::ERROR_THREAD;
@ -97,13 +85,15 @@ RetCode EffectThread::stopThread() {
{
std::lock_guard lg(mThreadMutex);
if (mStop) {
LOG(WARNING) << __func__ << " already stop";
if (stop == mStop) {
LOG(WARNING) << __func__ << " already " << stop ? "stop" : "start";
return RetCode::SUCCESS;
}
mStop = true;
mStop = stop;
}
LOG(DEBUG) << __func__ << " done";
mCv.notify_one();
LOG(DEBUG) << stop ? "stop done" : "start done";
return RetCode::SUCCESS;
}
@ -111,34 +101,23 @@ void EffectThread::threadLoop() {
pthread_setname_np(pthread_self(), mName.substr(0, kMaxTaskNameLen - 1).c_str());
setpriority(PRIO_PROCESS, 0, mPriority);
while (true) {
bool needExit = false;
{
std::unique_lock l(mThreadMutex);
mCv.wait(l, [&]() REQUIRES(mThreadMutex) {
needExit = mExit;
return mExit || !mStop;
});
}
if (needExit) {
std::unique_lock l(mThreadMutex);
::android::base::ScopedLockAssertion lock_assertion(mThreadMutex);
mCv.wait(l, [&]() REQUIRES(mThreadMutex) { return mExit || !mStop; });
if (mExit) {
LOG(WARNING) << __func__ << " EXIT!";
return;
}
process();
process_l();
}
}
void EffectThread::process() {
std::shared_ptr<EffectContext> context;
{
std::lock_guard lg(mThreadMutex);
context = mThreadContext;
RETURN_VALUE_IF(!context, void(), "nullContext");
}
std::shared_ptr<EffectContext::StatusMQ> statusMQ = context->getStatusFmq();
std::shared_ptr<EffectContext::DataMQ> inputMQ = context->getInputDataFmq();
std::shared_ptr<EffectContext::DataMQ> outputMQ = context->getOutputDataFmq();
auto buffer = context->getWorkBuffer();
void EffectThread::process_l() {
RETURN_VALUE_IF(!mThreadContext, void(), "nullContext");
std::shared_ptr<EffectContext::StatusMQ> statusMQ = mThreadContext->getStatusFmq();
std::shared_ptr<EffectContext::DataMQ> inputMQ = mThreadContext->getInputDataFmq();
std::shared_ptr<EffectContext::DataMQ> outputMQ = mThreadContext->getOutputDataFmq();
auto buffer = mThreadContext->getWorkBuffer();
// Only this worker will read from input data MQ and write to output data MQ.
auto readSamples = inputMQ->availableToRead(), writeSamples = outputMQ->availableToWrite();
@ -149,7 +128,6 @@ void EffectThread::process() {
inputMQ->read(buffer, processSamples);
// call effectProcessImpl without lock
IEffect::Status status = effectProcessImpl(buffer, buffer, processSamples);
outputMQ->write(buffer, status.fmqProduced);
statusMQ->writeBlocking(&status, 1);

View file

@ -54,6 +54,9 @@ class EffectThread {
* EffectThread will make sure effectProcessImpl only be called after startThread() successful
* and before stopThread() successful.
*
* effectProcessImpl implementation must not call any EffectThread interface, otherwise it will
* cause deadlock.
*
* @param in address of input float buffer.
* @param out address of output float buffer.
* @param samples number of samples to process.
@ -62,16 +65,11 @@ class EffectThread {
virtual IEffect::Status effectProcessImpl(float* in, float* out, int samples) = 0;
/**
* The default EffectThread::process() implementation doesn't need to lock. It will only
* access the FMQ and mWorkBuffer in EffectContext, since they will only be changed in
* EffectImpl IEffect::open() (in this case EffectThread just created and not running yet) and
* IEffect::command(CommandId::RESET) (in this case EffectThread already stopped).
*
* process() call effectProcessImpl for effect processing, and because effectProcessImpl is
* implemented by effects, process() must not hold lock before call into effectProcessImpl to
* avoid deadlock.
* process() call effectProcessImpl() for effect data processing, it is necessary for the
* processing to be called under Effect thread mutex mThreadMutex, to avoid the effect state
* change before/during data processing, and keep the thread and effect state consistent.
*/
virtual void process();
virtual void process_l() REQUIRES(mThreadMutex);
private:
const int kMaxTaskNameLen = 15;
@ -83,5 +81,7 @@ class EffectThread {
std::thread mThread;
int mPriority;
std::string mName;
RetCode handleStartStop(bool stop);
};
} // namespace aidl::android::hardware::audio::effect

View file

@ -148,16 +148,22 @@ class EffectHelper {
}
static void readFromFmq(std::unique_ptr<StatusMQ>& statusMq, size_t statusNum,
std::unique_ptr<DataMQ>& dataMq, size_t expectFloats,
std::vector<float>& buffer) {
std::vector<float>& buffer,
std::optional<int> expectStatus = STATUS_OK) {
if (0 == statusNum) {
ASSERT_EQ(0ul, statusMq->availableToRead());
return;
}
IEffect::Status status{};
ASSERT_TRUE(statusMq->readBlocking(&status, statusNum));
ASSERT_EQ(STATUS_OK, status.status);
if (statusNum != 0) {
ASSERT_EQ(expectFloats, (unsigned)status.fmqProduced);
ASSERT_EQ(expectFloats, dataMq->availableToRead());
if (expectFloats != 0) {
ASSERT_TRUE(dataMq->read(buffer.data(), expectFloats));
}
if (expectStatus.has_value()) {
ASSERT_EQ(expectStatus.value(), status.status);
}
ASSERT_EQ(expectFloats, (unsigned)status.fmqProduced);
ASSERT_EQ(expectFloats, dataMq->availableToRead());
if (expectFloats != 0) {
ASSERT_TRUE(dataMq->read(buffer.data(), expectFloats));
}
}
static Parameter::Common createParamCommon(

View file

@ -590,12 +590,14 @@ TEST_P(AudioEffectTest, ConsumeDataInProcessingState) {
ASSERT_NO_FATAL_FAILURE(expectState(mEffect, State::PROCESSING));
std::vector<float> buffer;
EffectHelper::allocateInputData(common, inputMQ, buffer);
EffectHelper::writeToFmq(inputMQ, buffer);
EffectHelper::readFromFmq(statusMQ, 1, outputMQ, buffer.size(), buffer);
EXPECT_NO_FATAL_FAILURE(EffectHelper::allocateInputData(common, inputMQ, buffer));
EXPECT_NO_FATAL_FAILURE(EffectHelper::writeToFmq(inputMQ, buffer));
EXPECT_NO_FATAL_FAILURE(
EffectHelper::readFromFmq(statusMQ, 1, outputMQ, buffer.size(), buffer));
ASSERT_NO_FATAL_FAILURE(command(mEffect, CommandId::STOP));
ASSERT_NO_FATAL_FAILURE(expectState(mEffect, State::IDLE));
EXPECT_NO_FATAL_FAILURE(EffectHelper::readFromFmq(statusMQ, 0, outputMQ, 0, buffer));
ASSERT_NO_FATAL_FAILURE(close(mEffect));
ASSERT_NO_FATAL_FAILURE(destroy(mFactory, mEffect));
@ -617,20 +619,24 @@ TEST_P(AudioEffectTest, ConsumeDataAfterRestart) {
auto outputMQ = std::make_unique<EffectHelper::DataMQ>(ret.outputDataMQ);
ASSERT_TRUE(outputMQ->isValid());
ASSERT_NO_FATAL_FAILURE(command(mEffect, CommandId::START));
ASSERT_NO_FATAL_FAILURE(expectState(mEffect, State::PROCESSING));
ASSERT_NO_FATAL_FAILURE(command(mEffect, CommandId::STOP));
ASSERT_NO_FATAL_FAILURE(expectState(mEffect, State::IDLE));
ASSERT_NO_FATAL_FAILURE(command(mEffect, CommandId::START));
ASSERT_NO_FATAL_FAILURE(expectState(mEffect, State::PROCESSING));
std::vector<float> buffer;
EffectHelper::allocateInputData(common, inputMQ, buffer);
EffectHelper::writeToFmq(inputMQ, buffer);
EffectHelper::readFromFmq(statusMQ, 1, outputMQ, buffer.size(), buffer);
ASSERT_NO_FATAL_FAILURE(command(mEffect, CommandId::START));
ASSERT_NO_FATAL_FAILURE(expectState(mEffect, State::PROCESSING));
ASSERT_NO_FATAL_FAILURE(command(mEffect, CommandId::STOP));
ASSERT_NO_FATAL_FAILURE(expectState(mEffect, State::IDLE));
EXPECT_NO_FATAL_FAILURE(
EffectHelper::readFromFmq(statusMQ, 0, outputMQ, buffer.size(), buffer));
ASSERT_NO_FATAL_FAILURE(command(mEffect, CommandId::START));
ASSERT_NO_FATAL_FAILURE(expectState(mEffect, State::PROCESSING));
EXPECT_NO_FATAL_FAILURE(EffectHelper::allocateInputData(common, inputMQ, buffer));
EXPECT_NO_FATAL_FAILURE(EffectHelper::writeToFmq(inputMQ, buffer));
EXPECT_NO_FATAL_FAILURE(
EffectHelper::readFromFmq(statusMQ, 1, outputMQ, buffer.size(), buffer));
ASSERT_NO_FATAL_FAILURE(command(mEffect, CommandId::STOP));
ASSERT_NO_FATAL_FAILURE(expectState(mEffect, State::IDLE));
EXPECT_NO_FATAL_FAILURE(EffectHelper::readFromFmq(statusMQ, 0, outputMQ, 0, buffer));
ASSERT_NO_FATAL_FAILURE(close(mEffect));
ASSERT_NO_FATAL_FAILURE(destroy(mFactory, mEffect));
@ -653,14 +659,14 @@ TEST_P(AudioEffectTest, SendDataAtIdleAndConsumeDataInProcessing) {
ASSERT_TRUE(outputMQ->isValid());
std::vector<float> buffer;
EffectHelper::allocateInputData(common, inputMQ, buffer);
EffectHelper::writeToFmq(inputMQ, buffer);
EffectHelper::readFromFmq(statusMQ, 0, outputMQ, 0, buffer);
EXPECT_NO_FATAL_FAILURE(EffectHelper::allocateInputData(common, inputMQ, buffer));
EXPECT_NO_FATAL_FAILURE(EffectHelper::writeToFmq(inputMQ, buffer));
ASSERT_NO_FATAL_FAILURE(command(mEffect, CommandId::START));
ASSERT_NO_FATAL_FAILURE(expectState(mEffect, State::PROCESSING));
EffectHelper::readFromFmq(statusMQ, 1, outputMQ, buffer.size(), buffer);
EXPECT_NO_FATAL_FAILURE(
EffectHelper::readFromFmq(statusMQ, 1, outputMQ, buffer.size(), buffer));
ASSERT_NO_FATAL_FAILURE(command(mEffect, CommandId::STOP));
ASSERT_NO_FATAL_FAILURE(expectState(mEffect, State::IDLE));
@ -686,31 +692,30 @@ TEST_P(AudioEffectTest, ProcessDataMultipleTimes) {
ASSERT_TRUE(outputMQ->isValid());
std::vector<float> buffer;
EffectHelper::allocateInputData(common, inputMQ, buffer);
EffectHelper::writeToFmq(inputMQ, buffer);
EffectHelper::readFromFmq(statusMQ, 0, outputMQ, 0, buffer);
EXPECT_NO_FATAL_FAILURE(EffectHelper::allocateInputData(common, inputMQ, buffer));
EXPECT_NO_FATAL_FAILURE(EffectHelper::writeToFmq(inputMQ, buffer));
EXPECT_NO_FATAL_FAILURE(EffectHelper::readFromFmq(statusMQ, 0, outputMQ, 0, buffer));
ASSERT_NO_FATAL_FAILURE(command(mEffect, CommandId::START));
ASSERT_NO_FATAL_FAILURE(expectState(mEffect, State::PROCESSING));
EffectHelper::readFromFmq(statusMQ, 1, outputMQ, buffer.size(), buffer);
// expect no status and data after consume
EffectHelper::readFromFmq(statusMQ, 0, outputMQ, 0, buffer);
EXPECT_NO_FATAL_FAILURE(
EffectHelper::readFromFmq(statusMQ, 1, outputMQ, buffer.size(), buffer));
EffectHelper::writeToFmq(inputMQ, buffer);
EffectHelper::readFromFmq(statusMQ, 1, outputMQ, buffer.size(), buffer);
// expect no status and data after consume
EffectHelper::readFromFmq(statusMQ, 0, outputMQ, 0, buffer);
EXPECT_NO_FATAL_FAILURE(EffectHelper::writeToFmq(inputMQ, buffer));
EXPECT_NO_FATAL_FAILURE(
EffectHelper::readFromFmq(statusMQ, 1, outputMQ, buffer.size(), buffer));
ASSERT_NO_FATAL_FAILURE(command(mEffect, CommandId::STOP));
ASSERT_NO_FATAL_FAILURE(expectState(mEffect, State::IDLE));
EXPECT_NO_FATAL_FAILURE(EffectHelper::readFromFmq(statusMQ, 0, outputMQ, 0, buffer));
ASSERT_NO_FATAL_FAILURE(close(mEffect));
ASSERT_NO_FATAL_FAILURE(destroy(mFactory, mEffect));
}
// Send data to IDLE state effects and expect it not be consumed.
TEST_P(AudioEffectTest, NotConsumeDataInIdleState) {
// Send data to processing state effects, stop, and restart.
TEST_P(AudioEffectTest, ConsumeDataAndRestart) {
ASSERT_NO_FATAL_FAILURE(create(mFactory, mEffect, mDescriptor));
Parameter::Common common = EffectHelper::createParamCommon(
@ -727,17 +732,21 @@ TEST_P(AudioEffectTest, NotConsumeDataInIdleState) {
ASSERT_NO_FATAL_FAILURE(command(mEffect, CommandId::START));
ASSERT_NO_FATAL_FAILURE(expectState(mEffect, State::PROCESSING));
std::vector<float> buffer;
EXPECT_NO_FATAL_FAILURE(EffectHelper::allocateInputData(common, inputMQ, buffer));
EXPECT_NO_FATAL_FAILURE(EffectHelper::writeToFmq(inputMQ, buffer));
EXPECT_NO_FATAL_FAILURE(
EffectHelper::readFromFmq(statusMQ, 1, outputMQ, buffer.size(), buffer));
ASSERT_NO_FATAL_FAILURE(command(mEffect, CommandId::STOP));
ASSERT_NO_FATAL_FAILURE(expectState(mEffect, State::IDLE));
std::vector<float> buffer;
EffectHelper::allocateInputData(common, inputMQ, buffer);
EffectHelper::writeToFmq(inputMQ, buffer);
EffectHelper::readFromFmq(statusMQ, 0, outputMQ, 0, buffer);
EXPECT_NO_FATAL_FAILURE(EffectHelper::writeToFmq(inputMQ, buffer));
EXPECT_NO_FATAL_FAILURE(EffectHelper::readFromFmq(statusMQ, 0, outputMQ, 0, buffer));
ASSERT_NO_FATAL_FAILURE(command(mEffect, CommandId::START));
ASSERT_NO_FATAL_FAILURE(expectState(mEffect, State::PROCESSING));
EffectHelper::readFromFmq(statusMQ, 1, outputMQ, buffer.size(), buffer);
EXPECT_NO_FATAL_FAILURE(
EffectHelper::readFromFmq(statusMQ, 1, outputMQ, buffer.size(), buffer));
ASSERT_NO_FATAL_FAILURE(command(mEffect, CommandId::STOP));
ASSERT_NO_FATAL_FAILURE(expectState(mEffect, State::IDLE));
@ -765,9 +774,9 @@ TEST_P(AudioEffectTest, NotConsumeDataByClosedEffect) {
ASSERT_TRUE(outputMQ->isValid());
std::vector<float> buffer;
EffectHelper::allocateInputData(common, inputMQ, buffer);
EffectHelper::writeToFmq(inputMQ, buffer);
EffectHelper::readFromFmq(statusMQ, 0, outputMQ, 0, buffer);
EXPECT_NO_FATAL_FAILURE(EffectHelper::allocateInputData(common, inputMQ, buffer));
EXPECT_NO_FATAL_FAILURE(EffectHelper::writeToFmq(inputMQ, buffer));
EXPECT_NO_FATAL_FAILURE(EffectHelper::readFromFmq(statusMQ, 0, outputMQ, 0, buffer));
ASSERT_NO_FATAL_FAILURE(destroy(mFactory, mEffect));
}
@ -800,9 +809,10 @@ TEST_P(AudioEffectTest, ConsumeDataMultipleEffects) {
ASSERT_TRUE(outputMQ1->isValid());
std::vector<float> buffer1, buffer2;
EffectHelper::allocateInputData(common1, inputMQ1, buffer1);
EffectHelper::writeToFmq(inputMQ1, buffer1);
EffectHelper::readFromFmq(statusMQ1, 1, outputMQ1, buffer1.size(), buffer1);
EXPECT_NO_FATAL_FAILURE(EffectHelper::allocateInputData(common1, inputMQ1, buffer1));
EXPECT_NO_FATAL_FAILURE(EffectHelper::writeToFmq(inputMQ1, buffer1));
EXPECT_NO_FATAL_FAILURE(
EffectHelper::readFromFmq(statusMQ1, 1, outputMQ1, buffer1.size(), buffer1));
auto statusMQ2 = std::make_unique<EffectHelper::StatusMQ>(ret2.statusMQ);
ASSERT_TRUE(statusMQ2->isValid());
@ -810,9 +820,10 @@ TEST_P(AudioEffectTest, ConsumeDataMultipleEffects) {
ASSERT_TRUE(inputMQ2->isValid());
auto outputMQ2 = std::make_unique<EffectHelper::DataMQ>(ret2.outputDataMQ);
ASSERT_TRUE(outputMQ2->isValid());
EffectHelper::allocateInputData(common2, inputMQ2, buffer2);
EffectHelper::writeToFmq(inputMQ2, buffer2);
EffectHelper::readFromFmq(statusMQ2, 1, outputMQ2, buffer2.size(), buffer2);
EXPECT_NO_FATAL_FAILURE(EffectHelper::allocateInputData(common2, inputMQ2, buffer2));
EXPECT_NO_FATAL_FAILURE(EffectHelper::writeToFmq(inputMQ2, buffer2));
EXPECT_NO_FATAL_FAILURE(
EffectHelper::readFromFmq(statusMQ2, 1, outputMQ2, buffer2.size(), buffer2));
ASSERT_NO_FATAL_FAILURE(command(effect1, CommandId::STOP));
ASSERT_NO_FATAL_FAILURE(expectState(effect1, State::IDLE));