diff --git a/tv/tuner/1.0/default/Demux.cpp b/tv/tuner/1.0/default/Demux.cpp index 04382b0f99..15e8aaf669 100644 --- a/tv/tuner/1.0/default/Demux.cpp +++ b/tv/tuner/1.0/default/Demux.cpp @@ -106,7 +106,7 @@ Return Demux::addFilter(DemuxFilterType type, uint32_t bufferSize, } else { filterId = ++mLastUsedFilterId; - mDemuxCallbacks.resize(filterId + 1); + mFilterCallbacks.resize(filterId + 1); mFilterMQs.resize(filterId + 1); mFilterEvents.resize(filterId + 1); mFilterEventFlags.resize(filterId + 1); @@ -114,6 +114,7 @@ Return Demux::addFilter(DemuxFilterType type, uint32_t bufferSize, mFilterThreads.resize(filterId + 1); mFilterPids.resize(filterId + 1); mFilterOutputs.resize(filterId + 1); + mFilterStatus.resize(filterId + 1); } mUsedFilterIds.insert(filterId); @@ -125,7 +126,7 @@ Return Demux::addFilter(DemuxFilterType type, uint32_t bufferSize, } // Add callback - mDemuxCallbacks[filterId] = cb; + mFilterCallbacks[filterId] = cb; // Mapping from the filter ID to the filter event DemuxFilterEvent event{ @@ -211,9 +212,16 @@ Return Demux::stopFilter(uint32_t filterId) { return Result::SUCCESS; } -Return Demux::flushFilter(uint32_t /* filterId */) { +Return Demux::flushFilter(uint32_t filterId) { ALOGV("%s", __FUNCTION__); + // temp implementation to flush the FMQ + int size = mFilterMQs[filterId]->availableToRead(); + char* buffer = new char[size]; + mOutputMQ->read((unsigned char*)&buffer[0], size); + delete[] buffer; + mFilterStatus[filterId] = DemuxFilterStatus::DATA_READY; + return Result::SUCCESS; } @@ -254,7 +262,7 @@ Return Demux::close() { mFilterThreads.clear(); mUnusedFilterIds.clear(); mUsedFilterIds.clear(); - mDemuxCallbacks.clear(); + mFilterCallbacks.clear(); mFilterMQs.clear(); mFilterEvents.clear(); mFilterEventFlags.clear(); @@ -458,32 +466,54 @@ Result Demux::startSectionFilterHandler(uint32_t filterId) { Result Demux::startPesFilterHandler(uint32_t filterId) { std::lock_guard lock(mFilterEventLock); - DemuxFilterPesEvent pesEvent; if (mFilterOutputs[filterId].empty()) { return Result::SUCCESS; } for (int i = 0; i < mFilterOutputs[filterId].size(); i += 188) { - uint8_t pusi = mFilterOutputs[filterId][i + 1] & 0x40; - uint8_t adaptFieldControl = (mFilterOutputs[filterId][i + 3] & 0x30) >> 4; - ALOGD("[Demux] pusi %d, adaptFieldControl %d", pusi, adaptFieldControl); - if (pusi && (adaptFieldControl == 0x01)) { - vector::const_iterator first = mFilterOutputs[filterId].begin() + i + 4; - vector::const_iterator last = mFilterOutputs[filterId].begin() + i + 187; - vector filterOutData(first, last); - if (!writeDataToFilterMQ(filterOutData, filterId)) { - mFilterOutputs[filterId].clear(); - return Result::INVALID_STATE; + if (mPesSizeLeft == 0) { + uint32_t prefix = (mFilterOutputs[filterId][i + 4] << 16) | + (mFilterOutputs[filterId][i + 5] << 8) | + mFilterOutputs[filterId][i + 6]; + ALOGD("[Demux] prefix %d", prefix); + if (prefix == 0x000001) { + // TODO handle mulptiple Pes filters + mPesSizeLeft = + (mFilterOutputs[filterId][i + 7] << 8) | mFilterOutputs[filterId][i + 8]; + ALOGD("[Demux] pes data length %d", mPesSizeLeft); + } else { + continue; } - pesEvent = { - // temp dump meta data - .streamId = filterOutData[3], - .dataLength = static_cast(filterOutData.size()), - }; - int size = mFilterEvents[filterId].events.size(); - mFilterEvents[filterId].events.resize(size + 1); - mFilterEvents[filterId].events[size].pes(pesEvent); } + + int endPoint = min(184, mPesSizeLeft); + // append data and check size + vector::const_iterator first = mFilterOutputs[filterId].begin() + i + 4; + vector::const_iterator last = mFilterOutputs[filterId].begin() + i + 3 + endPoint; + mPesOutput.insert(mPesOutput.end(), first, last); + // size does not match then continue + mPesSizeLeft -= endPoint; + if (mPesSizeLeft > 0) { + continue; + } + // size match then create event + if (!writeDataToFilterMQ(mPesOutput, filterId)) { + mFilterOutputs[filterId].clear(); + return Result::INVALID_STATE; + } + maySendFilterStatusCallback(filterId); + DemuxFilterPesEvent pesEvent; + pesEvent = { + // temp dump meta data + .streamId = mPesOutput[3], + .dataLength = static_cast(mPesOutput.size()), + }; + ALOGD("[Demux] assembled pes data length %d", pesEvent.dataLength); + + int size = mFilterEvents[filterId].events.size(); + mFilterEvents[filterId].events.resize(size + 1); + mFilterEvents[filterId].events[size].pes(pesEvent); + mPesOutput.clear(); } mFilterOutputs[filterId].clear(); @@ -672,8 +702,10 @@ void Demux::filterThreadLoop(uint32_t filterId) { continue; } // After successfully write, send a callback and wait for the read to be done - mDemuxCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]); + mFilterCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]); mFilterEvents[filterId].events.resize(0); + mFilterStatus[filterId] = DemuxFilterStatus::DATA_READY; + mFilterCallbacks[filterId]->onFilterStatus(filterId, mFilterStatus[filterId]); break; } @@ -693,18 +725,20 @@ void Demux::filterThreadLoop(uint32_t filterId) { break; } - if (mDemuxCallbacks[filterId] == nullptr) { + if (mFilterCallbacks[filterId] == nullptr) { ALOGD("[Demux] filter %d does not hava callback. Ending thread", filterId); break; } + maySendFilterStatusCallback(filterId); + while (mFilterThreadRunning[filterId]) { std::lock_guard lock(mFilterEventLock); if (mFilterEvents[filterId].events.size() == 0) { continue; } // After successfully write, send a callback and wait for the read to be done - mDemuxCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]); + mFilterCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]); mFilterEvents[filterId].events.resize(0); break; } @@ -755,16 +789,31 @@ void Demux::maySendInputStatusCallback() { int availableToWrite = mInputMQ->availableToWrite(); DemuxInputStatus newStatus = - checkStatusChange(availableToWrite, availableToRead, mInputSettings.highThreshold, - mInputSettings.lowThreshold); + checkInputStatusChange(availableToWrite, availableToRead, mInputSettings.highThreshold, + mInputSettings.lowThreshold); if (mIntputStatus != newStatus) { mInputCallback->onInputStatus(newStatus); mIntputStatus = newStatus; } } -DemuxInputStatus Demux::checkStatusChange(uint32_t availableToWrite, uint32_t availableToRead, - uint32_t highThreshold, uint32_t lowThreshold) { +void Demux::maySendFilterStatusCallback(uint32_t filterId) { + std::lock_guard lock(mFilterStatusLock); + int availableToRead = mFilterMQs[filterId]->availableToRead(); + int availableToWrite = mInputMQ->availableToWrite(); + int fmqSize = mFilterMQs[filterId]->getQuantumCount(); + + DemuxFilterStatus newStatus = + checkFilterStatusChange(filterId, availableToWrite, availableToRead, + ceil(fmqSize * 0.75), ceil(fmqSize * 0.25)); + if (mFilterStatus[filterId] != newStatus) { + mFilterCallbacks[filterId]->onFilterStatus(filterId, newStatus); + mFilterStatus[filterId] = newStatus; + } +} + +DemuxInputStatus Demux::checkInputStatusChange(uint32_t availableToWrite, uint32_t availableToRead, + uint32_t highThreshold, uint32_t lowThreshold) { if (availableToWrite == 0) { return DemuxInputStatus::SPACE_FULL; } else if (availableToRead > highThreshold) { @@ -777,6 +826,19 @@ DemuxInputStatus Demux::checkStatusChange(uint32_t availableToWrite, uint32_t av return mIntputStatus; } +DemuxFilterStatus Demux::checkFilterStatusChange(uint32_t filterId, uint32_t availableToWrite, + uint32_t availableToRead, uint32_t highThreshold, + uint32_t lowThreshold) { + if (availableToWrite == 0) { + return DemuxFilterStatus::OVERFLOW; + } else if (availableToRead > highThreshold) { + return DemuxFilterStatus::HIGH_WATER; + } else if (availableToRead < lowThreshold) { + return DemuxFilterStatus::LOW_WATER; + } + return mFilterStatus[filterId]; +} + Result Demux::startBroadcastInputLoop() { pthread_create(&mBroadcastInputThread, NULL, __threadLoopBroadcast, this); pthread_setname_np(mBroadcastInputThread, "broadcast_input_thread"); @@ -818,7 +880,7 @@ void Demux::broadcastInputThreadLoop() { } // filter and dispatch filter output vector byteBuffer; - byteBuffer.resize(sizeof(buffer)); + byteBuffer.resize(packetSize); for (int index = 0; index < byteBuffer.size(); index++) { byteBuffer[index] = static_cast(buffer[index]); } diff --git a/tv/tuner/1.0/default/Demux.h b/tv/tuner/1.0/default/Demux.h index e4a4e2bdae..ba0b9b099a 100644 --- a/tv/tuner/1.0/default/Demux.h +++ b/tv/tuner/1.0/default/Demux.h @@ -19,6 +19,7 @@ #include #include +#include #include #include "Frontend.h" #include "Tuner.h" @@ -153,8 +154,12 @@ class Demux : public IDemux { bool readDataFromMQ(); bool writeSectionsAndCreateEvent(uint32_t filterId, vector data); void maySendInputStatusCallback(); - DemuxInputStatus checkStatusChange(uint32_t availableToWrite, uint32_t availableToRead, - uint32_t highThreshold, uint32_t lowThreshold); + void maySendFilterStatusCallback(uint32_t filterId); + DemuxInputStatus checkInputStatusChange(uint32_t availableToWrite, uint32_t availableToRead, + uint32_t highThreshold, uint32_t lowThreshold); + DemuxFilterStatus checkFilterStatusChange(uint32_t filterId, uint32_t availableToWrite, + uint32_t availableToRead, uint32_t highThreshold, + uint32_t lowThreshold); /** * A dispatcher to read and dispatch input data to all the started filters. * Each filter handler handles the data filtering/output writing/filterEvent updating. @@ -203,7 +208,7 @@ class Demux : public IDemux { /** * Demux callbacks used on filter events or IO buffer status */ - vector> mDemuxCallbacks; + vector> mFilterCallbacks; sp mInputCallback; sp mOutputCallback; bool mInputConfigured = false; @@ -219,6 +224,7 @@ class Demux : public IDemux { // FMQ status local records DemuxInputStatus mIntputStatus; + vector mFilterStatus; /** * If a specific filter's writing loop is still running */ @@ -239,6 +245,7 @@ class Demux : public IDemux { * Lock to protect writes to the input status */ std::mutex mInputStatusLock; + std::mutex mFilterStatusLock; std::mutex mBroadcastInputThreadLock; std::mutex mFilterThreadLock; std::mutex mInputThreadLock; @@ -247,6 +254,11 @@ class Demux : public IDemux { * TODO make this dynamic/random/can take as a parameter */ const uint16_t SECTION_WRITE_COUNT = 10; + + // temp handle single PES filter + // TODO handle mulptiple Pes filters + int mPesSizeLeft = 0; + vector mPesOutput; }; } // namespace implementation