Merge "audio: Fix remote submix module I/O timing and atomicity" into main

This commit is contained in:
Mikhail Naganov 2023-12-04 04:48:42 +00:00 committed by Gerrit Code Review
commit 5260337cbb
5 changed files with 85 additions and 121 deletions

View file

@ -320,9 +320,9 @@ std::unique_ptr<Configuration> getPrimaryConfiguration() {
// - no profiles specified
//
// Mix ports:
// * "r_submix output", maximum 20 opened streams, maximum 10 active streams
// * "r_submix output", maximum 10 opened streams, maximum 10 active streams
// - profile PCM 16-bit; MONO, STEREO; 8000, 11025, 16000, 32000, 44100, 48000
// * "r_submix input", maximum 20 opened streams, maximum 10 active streams
// * "r_submix input", maximum 10 opened streams, maximum 10 active streams
// - profile PCM 16-bit; MONO, STEREO; 8000, 11025, 16000, 32000, 44100, 48000
//
// Routes:
@ -355,12 +355,12 @@ std::unique_ptr<Configuration> getRSubmixConfiguration() {
// Mix ports
AudioPort rsubmixOutMix =
createPort(c.nextPortId++, "r_submix output", 0, false, createPortMixExt(20, 10));
createPort(c.nextPortId++, "r_submix output", 0, false, createPortMixExt(10, 10));
rsubmixOutMix.profiles = remoteSubmixPcmAudioProfiles;
c.ports.push_back(rsubmixOutMix);
AudioPort rsubmixInMix =
createPort(c.nextPortId++, "r_submix input", 0, true, createPortMixExt(20, 10));
createPort(c.nextPortId++, "r_submix input", 0, true, createPortMixExt(10, 10));
rsubmixInMix.profiles = remoteSubmixPcmAudioProfiles;
c.ports.push_back(rsubmixInMix);

View file

@ -71,6 +71,10 @@ class StreamRemoteSubmix : public StreamCommonImpl {
static constexpr int kMaxReadFailureAttempts = 3;
// 5ms between two read attempts when pipe is empty
static constexpr int kReadAttemptSleepUs = 5000;
long mStartTimeNs = 0;
long mFramesSinceStart = 0;
int mReadErrorCount = 0;
};
class StreamInRemoteSubmix final : public StreamIn, public StreamSwitcher {

View file

@ -16,6 +16,9 @@
#define LOG_TAG "AHAL_StreamRemoteSubmix"
#include <android-base/logging.h>
#include <audio_utils/clock.h>
#include <error/Result.h>
#include <error/expected_utils.h>
#include "core-impl/StreamRemoteSubmix.h"
@ -50,37 +53,33 @@ std::map<AudioDeviceAddress, std::shared_ptr<SubmixRoute>> StreamRemoteSubmix::s
if (routeItr != sSubmixRoutes.end()) {
mCurrentRoute = routeItr->second;
}
}
// If route is not available for this port, add it.
if (mCurrentRoute == nullptr) {
// Initialize the pipe.
mCurrentRoute = std::make_shared<SubmixRoute>();
if (::android::OK != mCurrentRoute->createPipe(mStreamConfig)) {
LOG(ERROR) << __func__ << ": create pipe failed";
return ::android::NO_INIT;
}
{
std::lock_guard guard(sSubmixRoutesLock);
sSubmixRoutes.emplace(mDeviceAddress, mCurrentRoute);
}
} else {
if (!mCurrentRoute->isStreamConfigValid(mIsInput, mStreamConfig)) {
LOG(ERROR) << __func__ << ": invalid stream config";
return ::android::NO_INIT;
}
sp<MonoPipe> sink = mCurrentRoute->getSink();
if (sink == nullptr) {
LOG(ERROR) << __func__ << ": nullptr sink when opening stream";
return ::android::NO_INIT;
}
// If the sink has been shutdown or pipe recreation is forced, delete the pipe and
// recreate it.
if (sink->isShutdown()) {
LOG(DEBUG) << __func__ << ": Non-nullptr shut down sink when opening stream";
if (::android::OK != mCurrentRoute->resetPipe()) {
LOG(ERROR) << __func__ << ": reset pipe failed";
// If route is not available for this port, add it.
if (mCurrentRoute == nullptr) {
// Initialize the pipe.
mCurrentRoute = std::make_shared<SubmixRoute>();
if (::android::OK != mCurrentRoute->createPipe(mStreamConfig)) {
LOG(ERROR) << __func__ << ": create pipe failed";
return ::android::NO_INIT;
}
sSubmixRoutes.emplace(mDeviceAddress, mCurrentRoute);
}
}
if (!mCurrentRoute->isStreamConfigValid(mIsInput, mStreamConfig)) {
LOG(ERROR) << __func__ << ": invalid stream config";
return ::android::NO_INIT;
}
sp<MonoPipe> sink = mCurrentRoute->getSink();
if (sink == nullptr) {
LOG(ERROR) << __func__ << ": nullptr sink when opening stream";
return ::android::NO_INIT;
}
// If the sink has been shutdown or pipe recreation is forced, delete the pipe and
// recreate it.
if (sink->isShutdown()) {
LOG(DEBUG) << __func__ << ": Non-nullptr shut down sink when opening stream";
if (::android::OK != mCurrentRoute->resetPipe()) {
LOG(ERROR) << __func__ << ": reset pipe failed";
return ::android::NO_INIT;
}
}
@ -110,6 +109,8 @@ std::map<AudioDeviceAddress, std::shared_ptr<SubmixRoute>> StreamRemoteSubmix::s
::android::status_t StreamRemoteSubmix::start() {
mCurrentRoute->exitStandby(mIsInput);
mStartTimeNs = ::android::uptimeNanos();
mFramesSinceStart = 0;
return ::android::OK;
}
@ -161,8 +162,21 @@ void StreamRemoteSubmix::shutdown() {
*latencyMs = getDelayInUsForFrameCount(getStreamPipeSizeInFrames()) / 1000;
LOG(VERBOSE) << __func__ << ": Latency " << *latencyMs << "ms";
mCurrentRoute->exitStandby(mIsInput);
return (mIsInput ? inRead(buffer, frameCount, actualFrameCount)
: outWrite(buffer, frameCount, actualFrameCount));
RETURN_STATUS_IF_ERROR(mIsInput ? inRead(buffer, frameCount, actualFrameCount)
: outWrite(buffer, frameCount, actualFrameCount));
const long bufferDurationUs =
(*actualFrameCount) * MICROS_PER_SECOND / mContext.getSampleRate();
const long totalDurationUs = (::android::uptimeNanos() - mStartTimeNs) / NANOS_PER_MICROSECOND;
mFramesSinceStart += *actualFrameCount;
const long totalOffsetUs =
mFramesSinceStart * MICROS_PER_SECOND / mContext.getSampleRate() - totalDurationUs;
LOG(VERBOSE) << __func__ << ": totalOffsetUs " << totalOffsetUs;
if (totalOffsetUs > 0) {
const long sleepTimeUs = std::min(totalOffsetUs, bufferDurationUs);
LOG(VERBOSE) << __func__ << ": sleeping for " << sleepTimeUs << " us";
usleep(sleepTimeUs);
}
return ::android::OK;
}
::android::status_t StreamRemoteSubmix::refinePosition(StreamDescriptor::Position* position) {
@ -200,12 +214,7 @@ size_t StreamRemoteSubmix::getStreamPipeSizeInFrames() {
if (sink != nullptr) {
if (sink->isShutdown()) {
sink.clear();
const auto delayUs = getDelayInUsForFrameCount(frameCount);
LOG(DEBUG) << __func__ << ": pipe shutdown, ignoring the write, sleeping for "
<< delayUs << " us";
// the pipe has already been shutdown, this buffer will be lost but we must
// simulate timing so we don't drain the output faster than realtime
usleep(delayUs);
LOG(DEBUG) << __func__ << ": pipe shutdown, ignoring the write";
*actualFrameCount = frameCount;
return ::android::OK;
}
@ -214,6 +223,9 @@ size_t StreamRemoteSubmix::getStreamPipeSizeInFrames() {
return ::android::UNKNOWN_ERROR;
}
LOG(VERBOSE) << __func__ << ": " << mDeviceAddress.toString() << ", " << frameCount
<< " frames";
const bool shouldBlockWrite = mCurrentRoute->shouldBlockWrite();
size_t availableToWrite = sink->availableToWrite();
// NOTE: sink has been checked above and sink and source life cycles are synchronized
@ -236,6 +248,8 @@ size_t StreamRemoteSubmix::getStreamPipeSizeInFrames() {
availableToWrite = sink->availableToWrite();
if (!shouldBlockWrite && frameCount > availableToWrite) {
LOG(WARNING) << __func__ << ": writing " << availableToWrite << " vs. requested "
<< frameCount;
// Truncate the request to avoid blocking.
frameCount = availableToWrite;
}
@ -258,92 +272,59 @@ size_t StreamRemoteSubmix::getStreamPipeSizeInFrames() {
*actualFrameCount = 0;
return ::android::UNKNOWN_ERROR;
}
LOG(VERBOSE) << __func__ << ": wrote " << writtenFrames << "frames";
if (writtenFrames > 0 && frameCount > (size_t)writtenFrames) {
LOG(WARNING) << __func__ << ": wrote " << writtenFrames << " vs. requested " << frameCount;
}
*actualFrameCount = writtenFrames;
return ::android::OK;
}
::android::status_t StreamRemoteSubmix::inRead(void* buffer, size_t frameCount,
size_t* actualFrameCount) {
// in any case, it is emulated that data for the entire buffer was available
memset(buffer, 0, mStreamConfig.frameSize * frameCount);
*actualFrameCount = frameCount;
// about to read from audio source
sp<MonoPipeReader> source = mCurrentRoute->getSource();
if (source == nullptr) {
int readErrorCount = mCurrentRoute->notifyReadError();
if (readErrorCount < kMaxReadErrorLogs) {
if (++mReadErrorCount < kMaxReadErrorLogs) {
LOG(ERROR) << __func__
<< ": no audio pipe yet we're trying to read! (not all errors will be "
"logged)";
} else {
LOG(ERROR) << __func__ << ": Read errors " << readErrorCount;
}
const auto delayUs = getDelayInUsForFrameCount(frameCount);
LOG(DEBUG) << __func__ << ": no source, ignoring the read, sleeping for " << delayUs
<< " us";
usleep(delayUs);
memset(buffer, 0, mStreamConfig.frameSize * frameCount);
*actualFrameCount = frameCount;
return ::android::OK;
}
LOG(VERBOSE) << __func__ << ": " << mDeviceAddress.toString() << ", " << frameCount
<< " frames";
// read the data from the pipe
int attempts = 0;
const long delayUs = kReadAttemptSleepUs;
char* buff = (char*)buffer;
size_t remainingFrames = frameCount;
int availableToRead = source->availableToRead();
while ((remainingFrames > 0) && (availableToRead > 0) && (attempts < kMaxReadFailureAttempts)) {
LOG(VERBOSE) << __func__ << ": frames available to read " << availableToRead;
size_t actuallyRead = 0;
long remainingFrames = frameCount;
const long deadlineTimeNs = ::android::uptimeNanos() +
getDelayInUsForFrameCount(frameCount) * NANOS_PER_MICROSECOND;
while (remainingFrames > 0) {
ssize_t framesRead = source->read(buff, remainingFrames);
LOG(VERBOSE) << __func__ << ": frames read " << framesRead;
if (framesRead > 0) {
remainingFrames -= framesRead;
buff += framesRead * mStreamConfig.frameSize;
availableToRead -= framesRead;
LOG(VERBOSE) << __func__ << ": (attempts = " << attempts << ") got " << framesRead
LOG(VERBOSE) << __func__ << ": got " << framesRead
<< " frames, remaining =" << remainingFrames;
} else {
attempts++;
LOG(WARNING) << __func__ << ": read returned " << framesRead
<< " , read failure attempts = " << attempts << ", sleeping for "
<< delayUs << " us";
usleep(delayUs);
actuallyRead += framesRead;
}
if (::android::uptimeNanos() >= deadlineTimeNs) break;
if (framesRead <= 0) {
LOG(VERBOSE) << __func__ << ": read returned " << framesRead
<< ", read failure, sleeping for " << kReadAttemptSleepUs << " us";
usleep(kReadAttemptSleepUs);
}
}
// done using the source
source.clear();
if (remainingFrames > 0) {
const size_t remainingBytes = remainingFrames * mStreamConfig.frameSize;
LOG(VERBOSE) << __func__ << ": clearing remaining_frames = " << remainingFrames;
memset(((char*)buffer) + (mStreamConfig.frameSize * frameCount) - remainingBytes, 0,
remainingBytes);
}
long readCounterFrames = mCurrentRoute->updateReadCounterFrames(frameCount);
*actualFrameCount = frameCount;
// compute how much we need to sleep after reading the data by comparing the wall clock with
// the projected time at which we should return.
// wall clock after reading from the pipe
auto recordDurationUs = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now() - mCurrentRoute->getRecordStartTime());
// readCounterFrames contains the number of frames that have been read since the beginning of
// recording (including this call): it's converted to usec and compared to how long we've been
// recording for, which gives us how long we must wait to sync the projected recording time, and
// the observed recording time.
const long projectedVsObservedOffsetUs =
getDelayInUsForFrameCount(readCounterFrames) - recordDurationUs.count();
LOG(VERBOSE) << __func__ << ": record duration " << recordDurationUs.count()
<< " us, will wait: " << projectedVsObservedOffsetUs << " us";
if (projectedVsObservedOffsetUs > 0) {
usleep(projectedVsObservedOffsetUs);
if (actuallyRead < frameCount) {
LOG(WARNING) << __func__ << ": read " << actuallyRead << " vs. requested " << frameCount;
}
mCurrentRoute->updateReadCounterFrames(*actualFrameCount);
return ::android::OK;
}

View file

@ -81,11 +81,6 @@ bool SubmixRoute::shouldBlockWrite() {
return (mStreamInOpen || (mStreamInStandby && (mReadCounterFrames != 0)));
}
int SubmixRoute::notifyReadError() {
std::lock_guard guard(mLock);
return ++mReadErrorCount;
}
long SubmixRoute::updateReadCounterFrames(size_t frameCount) {
std::lock_guard guard(mLock);
mReadCounterFrames += frameCount;
@ -103,7 +98,6 @@ void SubmixRoute::openStream(bool isInput) {
}
mStreamInStandby = true;
mReadCounterFrames = 0;
mReadErrorCount = 0;
} else {
mStreamOutOpen = true;
}
@ -214,9 +208,6 @@ void SubmixRoute::exitStandby(bool isInput) {
if (mStreamInStandby || mStreamOutStandbyTransition) {
mStreamInStandby = false;
mStreamOutStandbyTransition = false;
// keep track of when we exit input standby (== first read == start "real recording")
// or when we start recording silence, and reset projected time
mRecordStartTime = std::chrono::steady_clock::now();
mReadCounterFrames = 0;
}
} else {

View file

@ -16,7 +16,6 @@
#pragma once
#include <chrono>
#include <mutex>
#include <android-base/thread_annotations.h>
@ -83,14 +82,6 @@ class SubmixRoute {
std::lock_guard guard(mLock);
return mReadCounterFrames;
}
int getReadErrorCount() {
std::lock_guard guard(mLock);
return mReadErrorCount;
}
std::chrono::time_point<std::chrono::steady_clock> getRecordStartTime() {
std::lock_guard guard(mLock);
return mRecordStartTime;
}
sp<MonoPipe> getSink() {
std::lock_guard guard(mLock);
return mSink;
@ -126,9 +117,6 @@ class SubmixRoute {
bool mStreamOutStandby GUARDED_BY(mLock) = true;
// how many frames have been requested to be read since standby
long mReadCounterFrames GUARDED_BY(mLock) = 0;
int mReadErrorCount GUARDED_BY(mLock) = 0;
// wall clock when recording starts
std::chrono::time_point<std::chrono::steady_clock> mRecordStartTime GUARDED_BY(mLock);
// Pipe variables: they handle the ring buffer that "pipes" audio:
// - from the submix virtual audio output == what needs to be played