audiohal: Make sure audio data transfer related commands go via FMQ

When outputting audio, the framework issues several HAL calls
from the same thread that writes into data FMQ. These calls
also need to be served on the same thread that writes audio data
to HAL. The same thing happens when audio input is commenced.

Add a command FMQ for passing different commands to the HAL thread.
This way, depending on the calling thread, the same call may go
either via hwbinder or via the command queue.

This dramatically reduces jitter in RTT measurements (although
doesn't improve the latency).

Bug: 30222631
Test: scripted RTT app
Change-Id: I04c826e2479d8210fd9c99756241156cda3143b6
This commit is contained in:
Mikhail Naganov 2017-01-31 13:56:02 -08:00
parent bbd6adda5a
commit a468fa84d1
6 changed files with 226 additions and 84 deletions

View file

@ -40,6 +40,26 @@ interface IStreamIn extends IStream {
*/ */
setGain(float gain) generates (Result retval); setGain(float gain) generates (Result retval);
/*
* Commands that can be executed on the driver reader thread.
*/
enum ReadCommand : int32_t {
READ,
GET_CAPTURE_POSITION
};
/*
* Data structure passed to the driver for executing commands
* on the driver reader thread.
*/
struct ReadParameters {
ReadCommand command; // discriminator
union Params {
uint64_t read; // READ command, amount of bytes to read, >= 0.
// No parameters for GET_CAPTURE_POSITION.
} params;
};
/* /*
* Data structure passed back to the client via status message queue * Data structure passed back to the client via status message queue
* of 'read' operation. * of 'read' operation.
@ -51,24 +71,36 @@ interface IStreamIn extends IStream {
*/ */
struct ReadStatus { struct ReadStatus {
Result retval; Result retval;
uint64_t read; ReadCommand replyTo; // discriminator
union Reply {
uint64_t read; // READ command, amount of bytes read, >= 0.
struct CapturePosition { // same as generated by getCapturePosition.
uint64_t frames;
uint64_t time;
} capturePosition;
} reply;
}; };
/* /*
* Set up required transports for receiving audio buffers from the driver. * Set up required transports for receiving audio buffers from the driver.
* *
* The transport consists of two message queues: one is used for passing * The transport consists of three message queues:
* audio data from the driver to the client, another is used for reporting * -- command queue is used to instruct the reader thread what operation
* read operation status (amount of bytes actually read or error code), * to perform;
* see ReadStatus structure definition. * -- data queue is used for passing audio data from the driver
* to the client;
* -- status queue is used for reporting operation status
* (e.g. amount of bytes actually read or error code).
* The driver operates on a dedicated thread.
* *
* @param frameSize the size of a single frame, in bytes. * @param frameSize the size of a single frame, in bytes.
* @param framesCount the number of frames in a buffer. * @param framesCount the number of frames in a buffer.
* @param threadPriority priority of the thread that performs reads. * @param threadPriority priority of the driver thread.
* @return retval OK if both message queues were created successfully. * @return retval OK if both message queues were created successfully.
* INVALID_STATE if the method was already called. * INVALID_STATE if the method was already called.
* INVALID_ARGUMENTS if there was a problem setting up * INVALID_ARGUMENTS if there was a problem setting up
* the queues. * the queues.
* @return commandMQ a message queue used for passing commands.
* @return dataMQ a message queue used for passing audio data in the format * @return dataMQ a message queue used for passing audio data in the format
* specified at the stream opening. * specified at the stream opening.
* @return statusMQ a message queue used for passing status from the driver * @return statusMQ a message queue used for passing status from the driver
@ -79,7 +111,9 @@ interface IStreamIn extends IStream {
ThreadPriority threadPriority) ThreadPriority threadPriority)
generates ( generates (
Result retval, Result retval,
fmq_sync<uint8_t> dataMQ, fmq_sync<ReadStatus> statusMQ); fmq_sync<ReadParameters> commandMQ,
fmq_sync<uint8_t> dataMQ,
fmq_sync<ReadStatus> statusMQ);
/* /*
* Return the amount of input frames lost in the audio driver since the last * Return the amount of input frames lost in the audio driver since the last

View file

@ -43,45 +43,58 @@ interface IStreamOut extends IStream {
*/ */
setVolume(float left, float right) generates (Result retval); setVolume(float left, float right) generates (Result retval);
/*
* Commands that can be executed on the driver writer thread.
*/
enum WriteCommand : int32_t {
WRITE,
GET_PRESENTATION_POSITION,
GET_LATENCY
};
/* /*
* Data structure passed back to the client via status message queue * Data structure passed back to the client via status message queue
* of 'write' operation. * of 'write' operation.
* *
* Possible values of 'writeRetval' field: * Possible values of 'retval' field:
* - OK, write operation was successful; * - OK, write operation was successful;
* - INVALID_ARGUMENTS, stream was not configured properly; * - INVALID_ARGUMENTS, stream was not configured properly;
* - INVALID_STATE, stream is in a state that doesn't allow writes. * - INVALID_STATE, stream is in a state that doesn't allow writes;
* * - INVALID_OPERATION, retrieving presentation position isn't supported.
* Possible values of 'presentationPositionRetval' field (must only
* be considered if 'writeRetval' field is set to 'OK'):
* - OK, presentation position retrieved successfully;
* - INVALID_ARGUMENTS, indicates that the position can't be retrieved;
* - INVALID_OPERATION, retrieving presentation position isn't supported;
*/ */
struct WriteStatus { struct WriteStatus {
Result writeRetval; Result retval;
uint64_t written; WriteCommand replyTo; // discriminator
Result presentationPositionRetval; union Reply {
uint64_t frames; // presentation position uint64_t written; // WRITE command, amount of bytes written, >= 0.
TimeSpec timeStamp; // presentation position struct PresentationPosition { // same as generated by
uint64_t frames; // getPresentationPosition.
TimeSpec timeStamp;
} presentationPosition;
uint32_t latencyMs; // Same as generated by getLatency.
} reply;
}; };
/* /*
* Set up required transports for passing audio buffers to the driver. * Set up required transports for passing audio buffers to the driver.
* *
* The transport consists of two message queues: one is used for passing * The transport consists of three message queues:
* audio data from the client to the driver, another is used for reporting * -- command queue is used to instruct the writer thread what operation
* write operation status (amount of bytes actually written or error code), * to perform;
* and the presentation position immediately after the write, see * -- data queue is used for passing audio data from the client
* WriteStatus structure definition. * to the driver;
* -- status queue is used for reporting operation status
* (e.g. amount of bytes actually written or error code).
* The driver operates on a dedicated thread.
* *
* @param frameSize the size of a single frame, in bytes. * @param frameSize the size of a single frame, in bytes.
* @param framesCount the number of frames in a buffer. * @param framesCount the number of frames in a buffer.
* @param threadPriority priority of the thread that performs writes. * @param threadPriority priority of the driver thread.
* @return retval OK if both message queues were created successfully. * @return retval OK if both message queues were created successfully.
* INVALID_STATE if the method was already called. * INVALID_STATE if the method was already called.
* INVALID_ARGUMENTS if there was a problem setting up * INVALID_ARGUMENTS if there was a problem setting up
* the queues. * the queues.
* @return commandMQ a message queue used for passing commands.
* @return dataMQ a message queue used for passing audio data in the format * @return dataMQ a message queue used for passing audio data in the format
* specified at the stream opening. * specified at the stream opening.
* @return statusMQ a message queue used for passing status from the driver * @return statusMQ a message queue used for passing status from the driver
@ -92,7 +105,9 @@ interface IStreamOut extends IStream {
ThreadPriority threadPriority) ThreadPriority threadPriority)
generates ( generates (
Result retval, Result retval,
fmq_sync<uint8_t> dataMQ, fmq_sync<WriteStatus> statusMQ); fmq_sync<WriteCommand> commandMQ,
fmq_sync<uint8_t> dataMQ,
fmq_sync<WriteStatus> statusMQ);
/* /*
* Return the number of audio frames written by the audio DSP to DAC since * Return the number of audio frames written by the audio DSP to DAC since

View file

@ -38,6 +38,7 @@ class ReadThread : public Thread {
// ReadThread's lifespan never exceeds StreamIn's lifespan. // ReadThread's lifespan never exceeds StreamIn's lifespan.
ReadThread(std::atomic<bool>* stop, ReadThread(std::atomic<bool>* stop,
audio_stream_in_t* stream, audio_stream_in_t* stream,
StreamIn::CommandMQ* commandMQ,
StreamIn::DataMQ* dataMQ, StreamIn::DataMQ* dataMQ,
StreamIn::StatusMQ* statusMQ, StreamIn::StatusMQ* statusMQ,
EventFlag* efGroup, EventFlag* efGroup,
@ -45,6 +46,7 @@ class ReadThread : public Thread {
: Thread(false /*canCallJava*/), : Thread(false /*canCallJava*/),
mStop(stop), mStop(stop),
mStream(stream), mStream(stream),
mCommandMQ(commandMQ),
mDataMQ(dataMQ), mDataMQ(dataMQ),
mStatusMQ(statusMQ), mStatusMQ(statusMQ),
mEfGroup(efGroup), mEfGroup(efGroup),
@ -58,13 +60,19 @@ class ReadThread : public Thread {
private: private:
std::atomic<bool>* mStop; std::atomic<bool>* mStop;
audio_stream_in_t* mStream; audio_stream_in_t* mStream;
StreamIn::CommandMQ* mCommandMQ;
StreamIn::DataMQ* mDataMQ; StreamIn::DataMQ* mDataMQ;
StreamIn::StatusMQ* mStatusMQ; StreamIn::StatusMQ* mStatusMQ;
EventFlag* mEfGroup; EventFlag* mEfGroup;
ThreadPriority mThreadPriority; ThreadPriority mThreadPriority;
std::unique_ptr<uint8_t[]> mBuffer; std::unique_ptr<uint8_t[]> mBuffer;
IStreamIn::ReadParameters mParameters;
IStreamIn::ReadStatus mStatus;
bool threadLoop() override; bool threadLoop() override;
void doGetCapturePosition();
void doRead();
}; };
status_t ReadThread::readyToRun() { status_t ReadThread::readyToRun() {
@ -77,6 +85,32 @@ status_t ReadThread::readyToRun() {
return OK; return OK;
} }
void ReadThread::doRead() {
size_t availableToWrite = mDataMQ->availableToWrite();
size_t requestedToRead = mParameters.params.read;
if (requestedToRead > availableToWrite) {
ALOGW("truncating read data from %d to %d due to insufficient data queue space",
(int32_t)requestedToRead, (int32_t)availableToWrite);
requestedToRead = availableToWrite;
}
ssize_t readResult = mStream->read(mStream, &mBuffer[0], requestedToRead);
mStatus.retval = Result::OK;
uint64_t read = 0;
if (readResult >= 0) {
mStatus.reply.read = readResult;
if (!mDataMQ->write(&mBuffer[0], readResult)) {
ALOGW("data message queue write failed");
}
} else {
mStatus.retval = Stream::analyzeStatus("read", readResult);
}
}
void ReadThread::doGetCapturePosition() {
mStatus.retval = StreamIn::getCapturePositionImpl(
mStream, &mStatus.reply.capturePosition.frames, &mStatus.reply.capturePosition.time);
}
bool ReadThread::threadLoop() { bool ReadThread::threadLoop() {
// This implementation doesn't return control back to the Thread until it decides to stop, // This implementation doesn't return control back to the Thread until it decides to stop,
// as the Thread uses mutexes, and this can lead to priority inversion. // as the Thread uses mutexes, and this can lead to priority inversion.
@ -87,21 +121,23 @@ bool ReadThread::threadLoop() {
if (!(efState & static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL))) { if (!(efState & static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL))) {
continue; // Nothing to do. continue; // Nothing to do.
} }
if (!mCommandMQ->read(&mParameters)) {
const size_t availToWrite = mDataMQ->availableToWrite(); continue; // Nothing to do.
ssize_t readResult = mStream->read(mStream, &mBuffer[0], availToWrite);
Result retval = Result::OK;
uint64_t read = 0;
if (readResult >= 0) {
read = readResult;
if (!mDataMQ->write(&mBuffer[0], readResult)) {
ALOGW("data message queue write failed");
}
} else {
retval = Stream::analyzeStatus("read", readResult);
} }
IStreamIn::ReadStatus status = { retval, read }; mStatus.replyTo = mParameters.command;
if (!mStatusMQ->write(&status)) { switch (mParameters.command) {
case IStreamIn::ReadCommand::READ:
doRead();
break;
case IStreamIn::ReadCommand::GET_CAPTURE_POSITION:
doGetCapturePosition();
break;
default:
ALOGE("Unknown read thread command code %d", mParameters.command);
mStatus.retval = Result::NOT_SUPPORTED;
break;
}
if (!mStatusMQ->write(&mStatus)) {
ALOGW("status message queue write failed"); ALOGW("status message queue write failed");
} }
mEfGroup->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY)); mEfGroup->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY));
@ -275,17 +311,19 @@ Return<void> StreamIn::prepareForReading(
if (mDataMQ) { if (mDataMQ) {
ALOGE("the client attempts to call prepareForReading twice"); ALOGE("the client attempts to call prepareForReading twice");
_hidl_cb(Result::INVALID_STATE, _hidl_cb(Result::INVALID_STATE,
DataMQ::Descriptor(), StatusMQ::Descriptor()); CommandMQ::Descriptor(), DataMQ::Descriptor(), StatusMQ::Descriptor());
return Void(); return Void();
} }
std::unique_ptr<CommandMQ> tempCommandMQ(new CommandMQ(1));
std::unique_ptr<DataMQ> tempDataMQ( std::unique_ptr<DataMQ> tempDataMQ(
new DataMQ(frameSize * framesCount, true /* EventFlag */)); new DataMQ(frameSize * framesCount, true /* EventFlag */));
std::unique_ptr<StatusMQ> tempStatusMQ(new StatusMQ(1)); std::unique_ptr<StatusMQ> tempStatusMQ(new StatusMQ(1));
if (!tempDataMQ->isValid() || !tempStatusMQ->isValid()) { if (!tempCommandMQ->isValid() || !tempDataMQ->isValid() || !tempStatusMQ->isValid()) {
ALOGE_IF(!tempCommandMQ->isValid(), "command MQ is invalid");
ALOGE_IF(!tempDataMQ->isValid(), "data MQ is invalid"); ALOGE_IF(!tempDataMQ->isValid(), "data MQ is invalid");
ALOGE_IF(!tempStatusMQ->isValid(), "status MQ is invalid"); ALOGE_IF(!tempStatusMQ->isValid(), "status MQ is invalid");
_hidl_cb(Result::INVALID_ARGUMENTS, _hidl_cb(Result::INVALID_ARGUMENTS,
DataMQ::Descriptor(), StatusMQ::Descriptor()); CommandMQ::Descriptor(), DataMQ::Descriptor(), StatusMQ::Descriptor());
return Void(); return Void();
} }
// TODO: Remove event flag management once blocking MQ is implemented. b/33815422 // TODO: Remove event flag management once blocking MQ is implemented. b/33815422
@ -293,7 +331,7 @@ Return<void> StreamIn::prepareForReading(
if (status != OK || !mEfGroup) { if (status != OK || !mEfGroup) {
ALOGE("failed creating event flag for data MQ: %s", strerror(-status)); ALOGE("failed creating event flag for data MQ: %s", strerror(-status));
_hidl_cb(Result::INVALID_ARGUMENTS, _hidl_cb(Result::INVALID_ARGUMENTS,
DataMQ::Descriptor(), StatusMQ::Descriptor()); CommandMQ::Descriptor(), DataMQ::Descriptor(), StatusMQ::Descriptor());
return Void(); return Void();
} }
@ -301,6 +339,7 @@ Return<void> StreamIn::prepareForReading(
mReadThread = new ReadThread( mReadThread = new ReadThread(
&mStopReadThread, &mStopReadThread,
mStream, mStream,
tempCommandMQ.get(),
tempDataMQ.get(), tempDataMQ.get(),
tempStatusMQ.get(), tempStatusMQ.get(),
mEfGroup, mEfGroup,
@ -309,13 +348,14 @@ Return<void> StreamIn::prepareForReading(
if (status != OK) { if (status != OK) {
ALOGW("failed to start reader thread: %s", strerror(-status)); ALOGW("failed to start reader thread: %s", strerror(-status));
_hidl_cb(Result::INVALID_ARGUMENTS, _hidl_cb(Result::INVALID_ARGUMENTS,
DataMQ::Descriptor(), StatusMQ::Descriptor()); CommandMQ::Descriptor(), DataMQ::Descriptor(), StatusMQ::Descriptor());
return Void(); return Void();
} }
mCommandMQ = std::move(tempCommandMQ);
mDataMQ = std::move(tempDataMQ); mDataMQ = std::move(tempDataMQ);
mStatusMQ = std::move(tempStatusMQ); mStatusMQ = std::move(tempStatusMQ);
_hidl_cb(Result::OK, *mDataMQ->getDesc(), *mStatusMQ->getDesc()); _hidl_cb(Result::OK, *mCommandMQ->getDesc(), *mDataMQ->getDesc(), *mStatusMQ->getDesc());
return Void(); return Void();
} }
@ -323,22 +363,28 @@ Return<uint32_t> StreamIn::getInputFramesLost() {
return mStream->get_input_frames_lost(mStream); return mStream->get_input_frames_lost(mStream);
} }
Return<void> StreamIn::getCapturePosition(getCapturePosition_cb _hidl_cb) { // static
Result StreamIn::getCapturePositionImpl(
audio_stream_in_t *stream, uint64_t *frames, uint64_t *time) {
Result retval(Result::NOT_SUPPORTED); Result retval(Result::NOT_SUPPORTED);
uint64_t frames = 0, time = 0; if (stream->get_capture_position != NULL) return retval;
if (mStream->get_capture_position != NULL) { int64_t halFrames, halTime;
int64_t halFrames, halTime; retval = Stream::analyzeStatus(
retval = Stream::analyzeStatus( "get_capture_position",
"get_capture_position", stream->get_capture_position(stream, &halFrames, &halTime),
mStream->get_capture_position(mStream, &halFrames, &halTime), // HAL may have a stub function, always returning ENOSYS, don't
// HAL may have a stub function, always returning ENOSYS, don't // spam the log in this case.
// spam the log in this case. ENOSYS);
ENOSYS); if (retval == Result::OK) {
if (retval == Result::OK) { *frames = halFrames;
frames = halFrames; *time = halTime;
time = halTime;
}
} }
return retval;
};
Return<void> StreamIn::getCapturePosition(getCapturePosition_cb _hidl_cb) {
uint64_t frames = 0, time = 0;
Result retval = getCapturePositionImpl(mStream, &frames, &time);
_hidl_cb(retval, frames, time); _hidl_cb(retval, frames, time);
return Void(); return Void();
} }

View file

@ -52,6 +52,7 @@ using ::android::hardware::hidl_string;
using ::android::sp; using ::android::sp;
struct StreamIn : public IStreamIn { struct StreamIn : public IStreamIn {
typedef MessageQueue<ReadParameters, kSynchronizedReadWrite> CommandMQ;
typedef MessageQueue<uint8_t, kSynchronizedReadWrite> DataMQ; typedef MessageQueue<uint8_t, kSynchronizedReadWrite> DataMQ;
typedef MessageQueue<ReadStatus, kSynchronizedReadWrite> StatusMQ; typedef MessageQueue<ReadStatus, kSynchronizedReadWrite> StatusMQ;
@ -97,12 +98,16 @@ struct StreamIn : public IStreamIn {
Return<void> createMmapBuffer(int32_t minSizeFrames, createMmapBuffer_cb _hidl_cb) override; Return<void> createMmapBuffer(int32_t minSizeFrames, createMmapBuffer_cb _hidl_cb) override;
Return<void> getMmapPosition(getMmapPosition_cb _hidl_cb) override; Return<void> getMmapPosition(getMmapPosition_cb _hidl_cb) override;
static Result getCapturePositionImpl(
audio_stream_in_t *stream, uint64_t *frames, uint64_t *time);
private: private:
bool mIsClosed; bool mIsClosed;
audio_hw_device_t *mDevice; audio_hw_device_t *mDevice;
audio_stream_in_t *mStream; audio_stream_in_t *mStream;
sp<Stream> mStreamCommon; sp<Stream> mStreamCommon;
sp<StreamMmap<audio_stream_in_t>> mStreamMmap; sp<StreamMmap<audio_stream_in_t>> mStreamMmap;
std::unique_ptr<CommandMQ> mCommandMQ;
std::unique_ptr<DataMQ> mDataMQ; std::unique_ptr<DataMQ> mDataMQ;
std::unique_ptr<StatusMQ> mStatusMQ; std::unique_ptr<StatusMQ> mStatusMQ;
EventFlag* mEfGroup; EventFlag* mEfGroup;

View file

@ -36,6 +36,7 @@ class WriteThread : public Thread {
// WriteThread's lifespan never exceeds StreamOut's lifespan. // WriteThread's lifespan never exceeds StreamOut's lifespan.
WriteThread(std::atomic<bool>* stop, WriteThread(std::atomic<bool>* stop,
audio_stream_out_t* stream, audio_stream_out_t* stream,
StreamOut::CommandMQ* commandMQ,
StreamOut::DataMQ* dataMQ, StreamOut::DataMQ* dataMQ,
StreamOut::StatusMQ* statusMQ, StreamOut::StatusMQ* statusMQ,
EventFlag* efGroup, EventFlag* efGroup,
@ -43,6 +44,7 @@ class WriteThread : public Thread {
: Thread(false /*canCallJava*/), : Thread(false /*canCallJava*/),
mStop(stop), mStop(stop),
mStream(stream), mStream(stream),
mCommandMQ(commandMQ),
mDataMQ(dataMQ), mDataMQ(dataMQ),
mStatusMQ(statusMQ), mStatusMQ(statusMQ),
mEfGroup(efGroup), mEfGroup(efGroup),
@ -56,13 +58,19 @@ class WriteThread : public Thread {
private: private:
std::atomic<bool>* mStop; std::atomic<bool>* mStop;
audio_stream_out_t* mStream; audio_stream_out_t* mStream;
StreamOut::CommandMQ* mCommandMQ;
StreamOut::DataMQ* mDataMQ; StreamOut::DataMQ* mDataMQ;
StreamOut::StatusMQ* mStatusMQ; StreamOut::StatusMQ* mStatusMQ;
EventFlag* mEfGroup; EventFlag* mEfGroup;
ThreadPriority mThreadPriority; ThreadPriority mThreadPriority;
std::unique_ptr<uint8_t[]> mBuffer; std::unique_ptr<uint8_t[]> mBuffer;
IStreamOut::WriteStatus mStatus;
bool threadLoop() override; bool threadLoop() override;
void doGetLatency();
void doGetPresentationPosition();
void doWrite();
}; };
status_t WriteThread::readyToRun() { status_t WriteThread::readyToRun() {
@ -75,6 +83,32 @@ status_t WriteThread::readyToRun() {
return OK; return OK;
} }
void WriteThread::doWrite() {
const size_t availToRead = mDataMQ->availableToRead();
mStatus.retval = Result::OK;
mStatus.reply.written = 0;
if (mDataMQ->read(&mBuffer[0], availToRead)) {
ssize_t writeResult = mStream->write(mStream, &mBuffer[0], availToRead);
if (writeResult >= 0) {
mStatus.reply.written = writeResult;
} else {
mStatus.retval = Stream::analyzeStatus("write", writeResult);
}
}
}
void WriteThread::doGetPresentationPosition() {
mStatus.retval = StreamOut::getPresentationPositionImpl(
mStream,
&mStatus.reply.presentationPosition.frames,
&mStatus.reply.presentationPosition.timeStamp);
}
void WriteThread::doGetLatency() {
mStatus.retval = Result::OK;
mStatus.reply.latencyMs = mStream->get_latency(mStream);
}
bool WriteThread::threadLoop() { bool WriteThread::threadLoop() {
// This implementation doesn't return control back to the Thread until it decides to stop, // This implementation doesn't return control back to the Thread until it decides to stop,
// as the Thread uses mutexes, and this can lead to priority inversion. // as the Thread uses mutexes, and this can lead to priority inversion.
@ -86,24 +120,26 @@ bool WriteThread::threadLoop() {
if (!(efState & static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY))) { if (!(efState & static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY))) {
continue; // Nothing to do. continue; // Nothing to do.
} }
if (!mCommandMQ->read(&mStatus.replyTo)) {
const size_t availToRead = mDataMQ->availableToRead(); continue; // Nothing to do.
IStreamOut::WriteStatus status;
status.writeRetval = Result::OK;
status.written = 0;
if (mDataMQ->read(&mBuffer[0], availToRead)) {
ssize_t writeResult = mStream->write(mStream, &mBuffer[0], availToRead);
if (writeResult >= 0) {
status.written = writeResult;
} else {
status.writeRetval = Stream::analyzeStatus("write", writeResult);
}
} }
status.presentationPositionRetval = status.writeRetval == Result::OK ? switch (mStatus.replyTo) {
StreamOut::getPresentationPositionImpl(mStream, &status.frames, &status.timeStamp) : case IStreamOut::WriteCommand::WRITE:
Result::OK; doWrite();
if (!mStatusMQ->write(&status)) { break;
ALOGW("status message queue write failed"); case IStreamOut::WriteCommand::GET_PRESENTATION_POSITION:
doGetPresentationPosition();
break;
case IStreamOut::WriteCommand::GET_LATENCY:
doGetLatency();
break;
default:
ALOGE("Unknown write thread command code %d", mStatus.replyTo);
mStatus.retval = Result::NOT_SUPPORTED;
break;
}
if (!mStatusMQ->write(&mStatus)) {
ALOGE("status message queue write failed");
} }
mEfGroup->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL)); mEfGroup->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL));
} }
@ -259,17 +295,19 @@ Return<void> StreamOut::prepareForWriting(
if (mDataMQ) { if (mDataMQ) {
ALOGE("the client attempts to call prepareForWriting twice"); ALOGE("the client attempts to call prepareForWriting twice");
_hidl_cb(Result::INVALID_STATE, _hidl_cb(Result::INVALID_STATE,
DataMQ::Descriptor(), StatusMQ::Descriptor()); CommandMQ::Descriptor(), DataMQ::Descriptor(), StatusMQ::Descriptor());
return Void(); return Void();
} }
std::unique_ptr<CommandMQ> tempCommandMQ(new CommandMQ(1));
std::unique_ptr<DataMQ> tempDataMQ( std::unique_ptr<DataMQ> tempDataMQ(
new DataMQ(frameSize * framesCount, true /* EventFlag */)); new DataMQ(frameSize * framesCount, true /* EventFlag */));
std::unique_ptr<StatusMQ> tempStatusMQ(new StatusMQ(1)); std::unique_ptr<StatusMQ> tempStatusMQ(new StatusMQ(1));
if (!tempDataMQ->isValid() || !tempStatusMQ->isValid()) { if (!tempCommandMQ->isValid() || !tempDataMQ->isValid() || !tempStatusMQ->isValid()) {
ALOGE_IF(!tempCommandMQ->isValid(), "command MQ is invalid");
ALOGE_IF(!tempDataMQ->isValid(), "data MQ is invalid"); ALOGE_IF(!tempDataMQ->isValid(), "data MQ is invalid");
ALOGE_IF(!tempStatusMQ->isValid(), "status MQ is invalid"); ALOGE_IF(!tempStatusMQ->isValid(), "status MQ is invalid");
_hidl_cb(Result::INVALID_ARGUMENTS, _hidl_cb(Result::INVALID_ARGUMENTS,
DataMQ::Descriptor(), StatusMQ::Descriptor()); CommandMQ::Descriptor(), DataMQ::Descriptor(), StatusMQ::Descriptor());
return Void(); return Void();
} }
// TODO: Remove event flag management once blocking MQ is implemented. b/33815422 // TODO: Remove event flag management once blocking MQ is implemented. b/33815422
@ -277,7 +315,7 @@ Return<void> StreamOut::prepareForWriting(
if (status != OK || !mEfGroup) { if (status != OK || !mEfGroup) {
ALOGE("failed creating event flag for data MQ: %s", strerror(-status)); ALOGE("failed creating event flag for data MQ: %s", strerror(-status));
_hidl_cb(Result::INVALID_ARGUMENTS, _hidl_cb(Result::INVALID_ARGUMENTS,
DataMQ::Descriptor(), StatusMQ::Descriptor()); CommandMQ::Descriptor(), DataMQ::Descriptor(), StatusMQ::Descriptor());
return Void(); return Void();
} }
@ -285,6 +323,7 @@ Return<void> StreamOut::prepareForWriting(
mWriteThread = new WriteThread( mWriteThread = new WriteThread(
&mStopWriteThread, &mStopWriteThread,
mStream, mStream,
tempCommandMQ.get(),
tempDataMQ.get(), tempDataMQ.get(),
tempStatusMQ.get(), tempStatusMQ.get(),
mEfGroup, mEfGroup,
@ -293,13 +332,14 @@ Return<void> StreamOut::prepareForWriting(
if (status != OK) { if (status != OK) {
ALOGW("failed to start writer thread: %s", strerror(-status)); ALOGW("failed to start writer thread: %s", strerror(-status));
_hidl_cb(Result::INVALID_ARGUMENTS, _hidl_cb(Result::INVALID_ARGUMENTS,
DataMQ::Descriptor(), StatusMQ::Descriptor()); CommandMQ::Descriptor(), DataMQ::Descriptor(), StatusMQ::Descriptor());
return Void(); return Void();
} }
mCommandMQ = std::move(tempCommandMQ);
mDataMQ = std::move(tempDataMQ); mDataMQ = std::move(tempDataMQ);
mStatusMQ = std::move(tempStatusMQ); mStatusMQ = std::move(tempStatusMQ);
_hidl_cb(Result::OK, *mDataMQ->getDesc(), *mStatusMQ->getDesc()); _hidl_cb(Result::OK, *mCommandMQ->getDesc(), *mDataMQ->getDesc(), *mStatusMQ->getDesc());
return Void(); return Void();
} }

View file

@ -54,6 +54,7 @@ using ::android::hardware::hidl_string;
using ::android::sp; using ::android::sp;
struct StreamOut : public IStreamOut { struct StreamOut : public IStreamOut {
typedef MessageQueue<WriteCommand, kSynchronizedReadWrite> CommandMQ;
typedef MessageQueue<uint8_t, kSynchronizedReadWrite> DataMQ; typedef MessageQueue<uint8_t, kSynchronizedReadWrite> DataMQ;
typedef MessageQueue<WriteStatus, kSynchronizedReadWrite> StatusMQ; typedef MessageQueue<WriteStatus, kSynchronizedReadWrite> StatusMQ;
@ -118,6 +119,7 @@ struct StreamOut : public IStreamOut {
sp<Stream> mStreamCommon; sp<Stream> mStreamCommon;
sp<StreamMmap<audio_stream_out_t>> mStreamMmap; sp<StreamMmap<audio_stream_out_t>> mStreamMmap;
sp<IStreamOutCallback> mCallback; sp<IStreamOutCallback> mCallback;
std::unique_ptr<CommandMQ> mCommandMQ;
std::unique_ptr<DataMQ> mDataMQ; std::unique_ptr<DataMQ> mDataMQ;
std::unique_ptr<StatusMQ> mStatusMQ; std::unique_ptr<StatusMQ> mStatusMQ;
EventFlag* mEfGroup; EventFlag* mEfGroup;