Merge changes from topic "pes"
* changes: Adding PES filter functionality to assemble PES data Adding filter status tracking logic into Demux default impl
This commit is contained in:
commit
04587692b1
2 changed files with 108 additions and 34 deletions
|
@ -106,7 +106,7 @@ Return<void> Demux::addFilter(DemuxFilterType type, uint32_t bufferSize,
|
|||
} else {
|
||||
filterId = ++mLastUsedFilterId;
|
||||
|
||||
mDemuxCallbacks.resize(filterId + 1);
|
||||
mFilterCallbacks.resize(filterId + 1);
|
||||
mFilterMQs.resize(filterId + 1);
|
||||
mFilterEvents.resize(filterId + 1);
|
||||
mFilterEventFlags.resize(filterId + 1);
|
||||
|
@ -114,6 +114,7 @@ Return<void> Demux::addFilter(DemuxFilterType type, uint32_t bufferSize,
|
|||
mFilterThreads.resize(filterId + 1);
|
||||
mFilterPids.resize(filterId + 1);
|
||||
mFilterOutputs.resize(filterId + 1);
|
||||
mFilterStatus.resize(filterId + 1);
|
||||
}
|
||||
|
||||
mUsedFilterIds.insert(filterId);
|
||||
|
@ -125,7 +126,7 @@ Return<void> Demux::addFilter(DemuxFilterType type, uint32_t bufferSize,
|
|||
}
|
||||
|
||||
// Add callback
|
||||
mDemuxCallbacks[filterId] = cb;
|
||||
mFilterCallbacks[filterId] = cb;
|
||||
|
||||
// Mapping from the filter ID to the filter event
|
||||
DemuxFilterEvent event{
|
||||
|
@ -211,9 +212,16 @@ Return<Result> Demux::stopFilter(uint32_t filterId) {
|
|||
return Result::SUCCESS;
|
||||
}
|
||||
|
||||
Return<Result> Demux::flushFilter(uint32_t /* filterId */) {
|
||||
Return<Result> Demux::flushFilter(uint32_t filterId) {
|
||||
ALOGV("%s", __FUNCTION__);
|
||||
|
||||
// temp implementation to flush the FMQ
|
||||
int size = mFilterMQs[filterId]->availableToRead();
|
||||
char* buffer = new char[size];
|
||||
mOutputMQ->read((unsigned char*)&buffer[0], size);
|
||||
delete[] buffer;
|
||||
mFilterStatus[filterId] = DemuxFilterStatus::DATA_READY;
|
||||
|
||||
return Result::SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -254,7 +262,7 @@ Return<Result> Demux::close() {
|
|||
mFilterThreads.clear();
|
||||
mUnusedFilterIds.clear();
|
||||
mUsedFilterIds.clear();
|
||||
mDemuxCallbacks.clear();
|
||||
mFilterCallbacks.clear();
|
||||
mFilterMQs.clear();
|
||||
mFilterEvents.clear();
|
||||
mFilterEventFlags.clear();
|
||||
|
@ -458,32 +466,54 @@ Result Demux::startSectionFilterHandler(uint32_t filterId) {
|
|||
|
||||
Result Demux::startPesFilterHandler(uint32_t filterId) {
|
||||
std::lock_guard<std::mutex> lock(mFilterEventLock);
|
||||
DemuxFilterPesEvent pesEvent;
|
||||
if (mFilterOutputs[filterId].empty()) {
|
||||
return Result::SUCCESS;
|
||||
}
|
||||
|
||||
for (int i = 0; i < mFilterOutputs[filterId].size(); i += 188) {
|
||||
uint8_t pusi = mFilterOutputs[filterId][i + 1] & 0x40;
|
||||
uint8_t adaptFieldControl = (mFilterOutputs[filterId][i + 3] & 0x30) >> 4;
|
||||
ALOGD("[Demux] pusi %d, adaptFieldControl %d", pusi, adaptFieldControl);
|
||||
if (pusi && (adaptFieldControl == 0x01)) {
|
||||
vector<uint8_t>::const_iterator first = mFilterOutputs[filterId].begin() + i + 4;
|
||||
vector<uint8_t>::const_iterator last = mFilterOutputs[filterId].begin() + i + 187;
|
||||
vector<uint8_t> filterOutData(first, last);
|
||||
if (!writeDataToFilterMQ(filterOutData, filterId)) {
|
||||
mFilterOutputs[filterId].clear();
|
||||
return Result::INVALID_STATE;
|
||||
if (mPesSizeLeft == 0) {
|
||||
uint32_t prefix = (mFilterOutputs[filterId][i + 4] << 16) |
|
||||
(mFilterOutputs[filterId][i + 5] << 8) |
|
||||
mFilterOutputs[filterId][i + 6];
|
||||
ALOGD("[Demux] prefix %d", prefix);
|
||||
if (prefix == 0x000001) {
|
||||
// TODO handle mulptiple Pes filters
|
||||
mPesSizeLeft =
|
||||
(mFilterOutputs[filterId][i + 7] << 8) | mFilterOutputs[filterId][i + 8];
|
||||
ALOGD("[Demux] pes data length %d", mPesSizeLeft);
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
pesEvent = {
|
||||
// temp dump meta data
|
||||
.streamId = filterOutData[3],
|
||||
.dataLength = static_cast<uint16_t>(filterOutData.size()),
|
||||
};
|
||||
int size = mFilterEvents[filterId].events.size();
|
||||
mFilterEvents[filterId].events.resize(size + 1);
|
||||
mFilterEvents[filterId].events[size].pes(pesEvent);
|
||||
}
|
||||
|
||||
int endPoint = min(184, mPesSizeLeft);
|
||||
// append data and check size
|
||||
vector<uint8_t>::const_iterator first = mFilterOutputs[filterId].begin() + i + 4;
|
||||
vector<uint8_t>::const_iterator last = mFilterOutputs[filterId].begin() + i + 3 + endPoint;
|
||||
mPesOutput.insert(mPesOutput.end(), first, last);
|
||||
// size does not match then continue
|
||||
mPesSizeLeft -= endPoint;
|
||||
if (mPesSizeLeft > 0) {
|
||||
continue;
|
||||
}
|
||||
// size match then create event
|
||||
if (!writeDataToFilterMQ(mPesOutput, filterId)) {
|
||||
mFilterOutputs[filterId].clear();
|
||||
return Result::INVALID_STATE;
|
||||
}
|
||||
maySendFilterStatusCallback(filterId);
|
||||
DemuxFilterPesEvent pesEvent;
|
||||
pesEvent = {
|
||||
// temp dump meta data
|
||||
.streamId = mPesOutput[3],
|
||||
.dataLength = static_cast<uint16_t>(mPesOutput.size()),
|
||||
};
|
||||
ALOGD("[Demux] assembled pes data length %d", pesEvent.dataLength);
|
||||
|
||||
int size = mFilterEvents[filterId].events.size();
|
||||
mFilterEvents[filterId].events.resize(size + 1);
|
||||
mFilterEvents[filterId].events[size].pes(pesEvent);
|
||||
mPesOutput.clear();
|
||||
}
|
||||
|
||||
mFilterOutputs[filterId].clear();
|
||||
|
@ -672,8 +702,10 @@ void Demux::filterThreadLoop(uint32_t filterId) {
|
|||
continue;
|
||||
}
|
||||
// After successfully write, send a callback and wait for the read to be done
|
||||
mDemuxCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]);
|
||||
mFilterCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]);
|
||||
mFilterEvents[filterId].events.resize(0);
|
||||
mFilterStatus[filterId] = DemuxFilterStatus::DATA_READY;
|
||||
mFilterCallbacks[filterId]->onFilterStatus(filterId, mFilterStatus[filterId]);
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -693,18 +725,20 @@ void Demux::filterThreadLoop(uint32_t filterId) {
|
|||
break;
|
||||
}
|
||||
|
||||
if (mDemuxCallbacks[filterId] == nullptr) {
|
||||
if (mFilterCallbacks[filterId] == nullptr) {
|
||||
ALOGD("[Demux] filter %d does not hava callback. Ending thread", filterId);
|
||||
break;
|
||||
}
|
||||
|
||||
maySendFilterStatusCallback(filterId);
|
||||
|
||||
while (mFilterThreadRunning[filterId]) {
|
||||
std::lock_guard<std::mutex> lock(mFilterEventLock);
|
||||
if (mFilterEvents[filterId].events.size() == 0) {
|
||||
continue;
|
||||
}
|
||||
// After successfully write, send a callback and wait for the read to be done
|
||||
mDemuxCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]);
|
||||
mFilterCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]);
|
||||
mFilterEvents[filterId].events.resize(0);
|
||||
break;
|
||||
}
|
||||
|
@ -755,16 +789,31 @@ void Demux::maySendInputStatusCallback() {
|
|||
int availableToWrite = mInputMQ->availableToWrite();
|
||||
|
||||
DemuxInputStatus newStatus =
|
||||
checkStatusChange(availableToWrite, availableToRead, mInputSettings.highThreshold,
|
||||
mInputSettings.lowThreshold);
|
||||
checkInputStatusChange(availableToWrite, availableToRead, mInputSettings.highThreshold,
|
||||
mInputSettings.lowThreshold);
|
||||
if (mIntputStatus != newStatus) {
|
||||
mInputCallback->onInputStatus(newStatus);
|
||||
mIntputStatus = newStatus;
|
||||
}
|
||||
}
|
||||
|
||||
DemuxInputStatus Demux::checkStatusChange(uint32_t availableToWrite, uint32_t availableToRead,
|
||||
uint32_t highThreshold, uint32_t lowThreshold) {
|
||||
void Demux::maySendFilterStatusCallback(uint32_t filterId) {
|
||||
std::lock_guard<std::mutex> lock(mFilterStatusLock);
|
||||
int availableToRead = mFilterMQs[filterId]->availableToRead();
|
||||
int availableToWrite = mInputMQ->availableToWrite();
|
||||
int fmqSize = mFilterMQs[filterId]->getQuantumCount();
|
||||
|
||||
DemuxFilterStatus newStatus =
|
||||
checkFilterStatusChange(filterId, availableToWrite, availableToRead,
|
||||
ceil(fmqSize * 0.75), ceil(fmqSize * 0.25));
|
||||
if (mFilterStatus[filterId] != newStatus) {
|
||||
mFilterCallbacks[filterId]->onFilterStatus(filterId, newStatus);
|
||||
mFilterStatus[filterId] = newStatus;
|
||||
}
|
||||
}
|
||||
|
||||
DemuxInputStatus Demux::checkInputStatusChange(uint32_t availableToWrite, uint32_t availableToRead,
|
||||
uint32_t highThreshold, uint32_t lowThreshold) {
|
||||
if (availableToWrite == 0) {
|
||||
return DemuxInputStatus::SPACE_FULL;
|
||||
} else if (availableToRead > highThreshold) {
|
||||
|
@ -777,6 +826,19 @@ DemuxInputStatus Demux::checkStatusChange(uint32_t availableToWrite, uint32_t av
|
|||
return mIntputStatus;
|
||||
}
|
||||
|
||||
DemuxFilterStatus Demux::checkFilterStatusChange(uint32_t filterId, uint32_t availableToWrite,
|
||||
uint32_t availableToRead, uint32_t highThreshold,
|
||||
uint32_t lowThreshold) {
|
||||
if (availableToWrite == 0) {
|
||||
return DemuxFilterStatus::OVERFLOW;
|
||||
} else if (availableToRead > highThreshold) {
|
||||
return DemuxFilterStatus::HIGH_WATER;
|
||||
} else if (availableToRead < lowThreshold) {
|
||||
return DemuxFilterStatus::LOW_WATER;
|
||||
}
|
||||
return mFilterStatus[filterId];
|
||||
}
|
||||
|
||||
Result Demux::startBroadcastInputLoop() {
|
||||
pthread_create(&mBroadcastInputThread, NULL, __threadLoopBroadcast, this);
|
||||
pthread_setname_np(mBroadcastInputThread, "broadcast_input_thread");
|
||||
|
@ -818,7 +880,7 @@ void Demux::broadcastInputThreadLoop() {
|
|||
}
|
||||
// filter and dispatch filter output
|
||||
vector<uint8_t> byteBuffer;
|
||||
byteBuffer.resize(sizeof(buffer));
|
||||
byteBuffer.resize(packetSize);
|
||||
for (int index = 0; index < byteBuffer.size(); index++) {
|
||||
byteBuffer[index] = static_cast<uint8_t>(buffer[index]);
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
#include <android/hardware/tv/tuner/1.0/IDemux.h>
|
||||
#include <fmq/MessageQueue.h>
|
||||
#include <math.h>
|
||||
#include <set>
|
||||
#include "Frontend.h"
|
||||
#include "Tuner.h"
|
||||
|
@ -153,8 +154,12 @@ class Demux : public IDemux {
|
|||
bool readDataFromMQ();
|
||||
bool writeSectionsAndCreateEvent(uint32_t filterId, vector<uint8_t> data);
|
||||
void maySendInputStatusCallback();
|
||||
DemuxInputStatus checkStatusChange(uint32_t availableToWrite, uint32_t availableToRead,
|
||||
uint32_t highThreshold, uint32_t lowThreshold);
|
||||
void maySendFilterStatusCallback(uint32_t filterId);
|
||||
DemuxInputStatus checkInputStatusChange(uint32_t availableToWrite, uint32_t availableToRead,
|
||||
uint32_t highThreshold, uint32_t lowThreshold);
|
||||
DemuxFilterStatus checkFilterStatusChange(uint32_t filterId, uint32_t availableToWrite,
|
||||
uint32_t availableToRead, uint32_t highThreshold,
|
||||
uint32_t lowThreshold);
|
||||
/**
|
||||
* A dispatcher to read and dispatch input data to all the started filters.
|
||||
* Each filter handler handles the data filtering/output writing/filterEvent updating.
|
||||
|
@ -203,7 +208,7 @@ class Demux : public IDemux {
|
|||
/**
|
||||
* Demux callbacks used on filter events or IO buffer status
|
||||
*/
|
||||
vector<sp<IDemuxCallback>> mDemuxCallbacks;
|
||||
vector<sp<IDemuxCallback>> mFilterCallbacks;
|
||||
sp<IDemuxCallback> mInputCallback;
|
||||
sp<IDemuxCallback> mOutputCallback;
|
||||
bool mInputConfigured = false;
|
||||
|
@ -219,6 +224,7 @@ class Demux : public IDemux {
|
|||
|
||||
// FMQ status local records
|
||||
DemuxInputStatus mIntputStatus;
|
||||
vector<DemuxFilterStatus> mFilterStatus;
|
||||
/**
|
||||
* If a specific filter's writing loop is still running
|
||||
*/
|
||||
|
@ -239,6 +245,7 @@ class Demux : public IDemux {
|
|||
* Lock to protect writes to the input status
|
||||
*/
|
||||
std::mutex mInputStatusLock;
|
||||
std::mutex mFilterStatusLock;
|
||||
std::mutex mBroadcastInputThreadLock;
|
||||
std::mutex mFilterThreadLock;
|
||||
std::mutex mInputThreadLock;
|
||||
|
@ -247,6 +254,11 @@ class Demux : public IDemux {
|
|||
* TODO make this dynamic/random/can take as a parameter
|
||||
*/
|
||||
const uint16_t SECTION_WRITE_COUNT = 10;
|
||||
|
||||
// temp handle single PES filter
|
||||
// TODO handle mulptiple Pes filters
|
||||
int mPesSizeLeft = 0;
|
||||
vector<uint8_t> mPesOutput;
|
||||
};
|
||||
|
||||
} // namespace implementation
|
||||
|
|
Loading…
Reference in a new issue