Tuner HAL Demux Playback interface implementation
Test: manual Bug: 135709325 Change-Id: I0b673159b667c5bde47e9ed285cfa1bdc6c668c6
This commit is contained in:
parent
2226b070fb
commit
a4885299c2
2 changed files with 529 additions and 190 deletions
|
@ -73,34 +73,6 @@ Demux::Demux(uint32_t demuxId) {
|
|||
|
||||
Demux::~Demux() {}
|
||||
|
||||
bool Demux::createAndSaveMQ(uint32_t bufferSize, uint32_t filterId) {
|
||||
ALOGV("%s", __FUNCTION__);
|
||||
|
||||
// Create a synchronized FMQ that supports blocking read/write
|
||||
std::unique_ptr<FilterMQ> tmpFilterMQ =
|
||||
std::unique_ptr<FilterMQ>(new (std::nothrow) FilterMQ(bufferSize, true));
|
||||
if (!tmpFilterMQ->isValid()) {
|
||||
ALOGW("Failed to create FMQ of filter with id: %d", filterId);
|
||||
return false;
|
||||
}
|
||||
|
||||
mFilterMQs.resize(filterId + 1);
|
||||
mFilterMQs[filterId] = std::move(tmpFilterMQ);
|
||||
|
||||
EventFlag* mFilterEventFlag;
|
||||
if (EventFlag::createEventFlag(mFilterMQs[filterId]->getEventFlagWord(), &mFilterEventFlag) !=
|
||||
OK) {
|
||||
return false;
|
||||
}
|
||||
mFilterEventFlags.resize(filterId + 1);
|
||||
mFilterEventFlags[filterId] = mFilterEventFlag;
|
||||
mFilterWriteCount.resize(filterId + 1);
|
||||
mFilterWriteCount[filterId] = 0;
|
||||
mThreadRunning.resize(filterId + 1);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
Return<Result> Demux::setFrontendDataSource(uint32_t frontendId) {
|
||||
ALOGV("%s", __FUNCTION__);
|
||||
|
||||
|
@ -113,23 +85,42 @@ Return<void> Demux::addFilter(DemuxFilterType type, uint32_t bufferSize,
|
|||
const sp<IDemuxCallback>& cb, addFilter_cb _hidl_cb) {
|
||||
ALOGV("%s", __FUNCTION__);
|
||||
|
||||
uint32_t filterId = mLastUsedFilterId + 1;
|
||||
mLastUsedFilterId += 1;
|
||||
uint32_t filterId;
|
||||
|
||||
if (!mUnusedFilterIds.empty()) {
|
||||
filterId = *mUnusedFilterIds.begin();
|
||||
|
||||
mUnusedFilterIds.erase(filterId);
|
||||
} else {
|
||||
filterId = ++mLastUsedFilterId;
|
||||
|
||||
mDemuxCallbacks.resize(filterId + 1);
|
||||
mFilterMQs.resize(filterId + 1);
|
||||
mFilterEvents.resize(filterId + 1);
|
||||
mFilterEventFlags.resize(filterId + 1);
|
||||
mFilterThreadRunning.resize(filterId + 1);
|
||||
mFilterThreads.resize(filterId + 1);
|
||||
}
|
||||
|
||||
mUsedFilterIds.insert(filterId);
|
||||
|
||||
if ((type != DemuxFilterType::PCR || type != DemuxFilterType::TS) && cb == nullptr) {
|
||||
ALOGW("callback can't be null");
|
||||
_hidl_cb(Result::INVALID_ARGUMENT, filterId);
|
||||
return Void();
|
||||
}
|
||||
|
||||
// Add callback
|
||||
mDemuxCallbacks.resize(filterId + 1);
|
||||
mDemuxCallbacks[filterId] = cb;
|
||||
|
||||
// Mapping from the filter ID to the filter type
|
||||
mFilterTypes.resize(filterId + 1);
|
||||
mFilterTypes[filterId] = type;
|
||||
// Mapping from the filter ID to the filter event
|
||||
DemuxFilterEvent event{
|
||||
.filterId = filterId,
|
||||
.filterType = type,
|
||||
};
|
||||
mFilterEvents[filterId] = event;
|
||||
|
||||
if (!createAndSaveMQ(bufferSize, filterId)) {
|
||||
if (!createFilterMQ(bufferSize, filterId)) {
|
||||
_hidl_cb(Result::UNKNOWN_ERROR, -1);
|
||||
return Void();
|
||||
}
|
||||
|
@ -141,8 +132,8 @@ Return<void> Demux::addFilter(DemuxFilterType type, uint32_t bufferSize,
|
|||
Return<void> Demux::getFilterQueueDesc(uint32_t filterId, getFilterQueueDesc_cb _hidl_cb) {
|
||||
ALOGV("%s", __FUNCTION__);
|
||||
|
||||
if (filterId < 0 || filterId > mLastUsedFilterId) {
|
||||
ALOGW("No filter with id: %d exists", filterId);
|
||||
if (mUsedFilterIds.find(filterId) == mUsedFilterIds.end()) {
|
||||
ALOGW("No filter with id: %d exists to get desc", filterId);
|
||||
_hidl_cb(Result::INVALID_ARGUMENT, FilterMQ::Descriptor());
|
||||
return Void();
|
||||
}
|
||||
|
@ -160,35 +151,29 @@ Return<Result> Demux::configureFilter(uint32_t /* filterId */,
|
|||
|
||||
Return<Result> Demux::startFilter(uint32_t filterId) {
|
||||
ALOGV("%s", __FUNCTION__);
|
||||
Result result;
|
||||
|
||||
if (filterId < 0 || filterId > mLastUsedFilterId) {
|
||||
ALOGW("No filter with id: %d exists", filterId);
|
||||
if (mUsedFilterIds.find(filterId) == mUsedFilterIds.end()) {
|
||||
ALOGW("No filter with id: %d exists to start filter", filterId);
|
||||
return Result::INVALID_ARGUMENT;
|
||||
}
|
||||
|
||||
DemuxFilterType filterType = mFilterTypes[filterId];
|
||||
Result result;
|
||||
DemuxFilterEvent event{
|
||||
.filterId = filterId,
|
||||
.filterType = filterType,
|
||||
};
|
||||
|
||||
switch (filterType) {
|
||||
switch (mFilterEvents[filterId].filterType) {
|
||||
case DemuxFilterType::SECTION:
|
||||
result = startSectionFilterHandler(event);
|
||||
result = startFilterLoop(filterId);
|
||||
break;
|
||||
case DemuxFilterType::PES:
|
||||
result = startPesFilterHandler(event);
|
||||
result = startPesFilterHandler(filterId);
|
||||
break;
|
||||
case DemuxFilterType::TS:
|
||||
result = startTsFilterHandler();
|
||||
return Result::SUCCESS;
|
||||
case DemuxFilterType::AUDIO:
|
||||
case DemuxFilterType::VIDEO:
|
||||
result = startMediaFilterHandler(event);
|
||||
result = startMediaFilterHandler(filterId);
|
||||
break;
|
||||
case DemuxFilterType::RECORD:
|
||||
result = startRecordFilterHandler(event);
|
||||
result = startRecordFilterHandler(filterId);
|
||||
break;
|
||||
case DemuxFilterType::PCR:
|
||||
result = startPcrFilterHandler();
|
||||
|
@ -212,9 +197,13 @@ Return<Result> Demux::flushFilter(uint32_t /* filterId */) {
|
|||
return Result::SUCCESS;
|
||||
}
|
||||
|
||||
Return<Result> Demux::removeFilter(uint32_t /* filterId */) {
|
||||
Return<Result> Demux::removeFilter(uint32_t filterId) {
|
||||
ALOGV("%s", __FUNCTION__);
|
||||
|
||||
// resetFilterRecords(filterId);
|
||||
mUsedFilterIds.erase(filterId);
|
||||
mUnusedFilterIds.insert(filterId);
|
||||
|
||||
return Result::SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -239,25 +228,291 @@ Return<void> Demux::getAvSyncTime(AvSyncHwId /* avSyncHwId */, getAvSyncTime_cb
|
|||
Return<Result> Demux::close() {
|
||||
ALOGV("%s", __FUNCTION__);
|
||||
|
||||
set<uint32_t>::iterator it;
|
||||
mInputThread = 0;
|
||||
mOutputThread = 0;
|
||||
mFilterThreads.clear();
|
||||
mUnusedFilterIds.clear();
|
||||
mUsedFilterIds.clear();
|
||||
mDemuxCallbacks.clear();
|
||||
mFilterMQs.clear();
|
||||
mFilterEvents.clear();
|
||||
mFilterEventFlags.clear();
|
||||
mLastUsedFilterId = -1;
|
||||
|
||||
return Result::SUCCESS;
|
||||
}
|
||||
|
||||
bool Demux::writeSectionsAndCreateEvent(DemuxFilterEvent& event, uint32_t sectionNum) {
|
||||
event.events.resize(sectionNum);
|
||||
for (int i = 0; i < sectionNum; i++) {
|
||||
DemuxFilterSectionEvent secEvent;
|
||||
secEvent = {
|
||||
// temp dump meta data
|
||||
.tableId = 0,
|
||||
.version = 1,
|
||||
.sectionNum = 1,
|
||||
.dataLength = 530,
|
||||
};
|
||||
event.events[i].section(secEvent);
|
||||
if (!writeDataToFilterMQ(fakeDataInputBuffer, event.filterId)) {
|
||||
return false;
|
||||
}
|
||||
Return<Result> Demux::addOutput(uint32_t bufferSize, const sp<IDemuxCallback>& cb) {
|
||||
ALOGV("%s", __FUNCTION__);
|
||||
|
||||
// Create a synchronized FMQ that supports blocking read/write
|
||||
std::unique_ptr<FilterMQ> tmpFilterMQ =
|
||||
std::unique_ptr<FilterMQ>(new (std::nothrow) FilterMQ(bufferSize, true));
|
||||
if (!tmpFilterMQ->isValid()) {
|
||||
ALOGW("Failed to create output FMQ");
|
||||
return Result::UNKNOWN_ERROR;
|
||||
}
|
||||
|
||||
mOutputMQ = std::move(tmpFilterMQ);
|
||||
|
||||
if (EventFlag::createEventFlag(mOutputMQ->getEventFlagWord(), &mOutputEventFlag) != OK) {
|
||||
return Result::UNKNOWN_ERROR;
|
||||
}
|
||||
|
||||
mOutputCallback = cb;
|
||||
|
||||
return Result::SUCCESS;
|
||||
}
|
||||
|
||||
Return<void> Demux::getOutputQueueDesc(getOutputQueueDesc_cb _hidl_cb) {
|
||||
ALOGV("%s", __FUNCTION__);
|
||||
|
||||
if (!mOutputMQ) {
|
||||
_hidl_cb(Result::NOT_INITIALIZED, FilterMQ::Descriptor());
|
||||
return Void();
|
||||
}
|
||||
|
||||
_hidl_cb(Result::SUCCESS, *mOutputMQ->getDesc());
|
||||
return Void();
|
||||
}
|
||||
|
||||
Return<Result> Demux::configureOutput(const DemuxOutputSettings& /* settings */) {
|
||||
ALOGV("%s", __FUNCTION__);
|
||||
|
||||
return Result::SUCCESS;
|
||||
}
|
||||
|
||||
Return<Result> Demux::attachOutputTsFilter(uint32_t /*filterId*/) {
|
||||
ALOGV("%s", __FUNCTION__);
|
||||
|
||||
return Result::SUCCESS;
|
||||
}
|
||||
|
||||
Return<Result> Demux::detachOutputTsFilter(uint32_t /* filterId */) {
|
||||
ALOGV("%s", __FUNCTION__);
|
||||
|
||||
return Result::SUCCESS;
|
||||
}
|
||||
|
||||
Return<Result> Demux::startOutput() {
|
||||
ALOGV("%s", __FUNCTION__);
|
||||
|
||||
return Result::SUCCESS;
|
||||
}
|
||||
|
||||
Return<Result> Demux::stopOutput() {
|
||||
ALOGV("%s", __FUNCTION__);
|
||||
|
||||
return Result::SUCCESS;
|
||||
}
|
||||
|
||||
Return<Result> Demux::flushOutput() {
|
||||
ALOGV("%s", __FUNCTION__);
|
||||
|
||||
return Result::SUCCESS;
|
||||
}
|
||||
|
||||
Return<Result> Demux::removeOutput() {
|
||||
ALOGV("%s", __FUNCTION__);
|
||||
|
||||
return Result::SUCCESS;
|
||||
}
|
||||
|
||||
Return<Result> Demux::addInput(uint32_t bufferSize, const sp<IDemuxCallback>& cb) {
|
||||
ALOGV("%s", __FUNCTION__);
|
||||
|
||||
// Create a synchronized FMQ that supports blocking read/write
|
||||
std::unique_ptr<FilterMQ> tmpInputMQ =
|
||||
std::unique_ptr<FilterMQ>(new (std::nothrow) FilterMQ(bufferSize, true));
|
||||
if (!tmpInputMQ->isValid()) {
|
||||
ALOGW("Failed to create input FMQ");
|
||||
return Result::UNKNOWN_ERROR;
|
||||
}
|
||||
|
||||
mInputMQ = std::move(tmpInputMQ);
|
||||
|
||||
if (EventFlag::createEventFlag(mInputMQ->getEventFlagWord(), &mInputEventFlag) != OK) {
|
||||
return Result::UNKNOWN_ERROR;
|
||||
}
|
||||
|
||||
mInputCallback = cb;
|
||||
|
||||
return Result::SUCCESS;
|
||||
}
|
||||
|
||||
Return<void> Demux::getInputQueueDesc(getInputQueueDesc_cb _hidl_cb) {
|
||||
ALOGV("%s", __FUNCTION__);
|
||||
|
||||
if (!mInputMQ) {
|
||||
_hidl_cb(Result::NOT_INITIALIZED, FilterMQ::Descriptor());
|
||||
return Void();
|
||||
}
|
||||
|
||||
_hidl_cb(Result::SUCCESS, *mInputMQ->getDesc());
|
||||
return Void();
|
||||
}
|
||||
|
||||
Return<Result> Demux::configureInput(const DemuxInputSettings& /* settings */) {
|
||||
ALOGV("%s", __FUNCTION__);
|
||||
|
||||
return Result::SUCCESS;
|
||||
}
|
||||
|
||||
Return<Result> Demux::startInput() {
|
||||
ALOGV("%s", __FUNCTION__);
|
||||
|
||||
pthread_create(&mInputThread, NULL, __threadLoopInput, this);
|
||||
pthread_setname_np(mInputThread, "demux_input_waiting_loop");
|
||||
|
||||
// TODO start another thread to send filter status callback to the framework
|
||||
|
||||
return Result::SUCCESS;
|
||||
}
|
||||
|
||||
Return<Result> Demux::stopInput() {
|
||||
ALOGV("%s", __FUNCTION__);
|
||||
|
||||
return Result::SUCCESS;
|
||||
}
|
||||
|
||||
Return<Result> Demux::flushInput() {
|
||||
ALOGV("%s", __FUNCTION__);
|
||||
|
||||
return Result::SUCCESS;
|
||||
}
|
||||
|
||||
Return<Result> Demux::removeInput() {
|
||||
ALOGV("%s", __FUNCTION__);
|
||||
|
||||
mInputMQ = nullptr;
|
||||
|
||||
return Result::SUCCESS;
|
||||
}
|
||||
|
||||
Result Demux::startFilterLoop(uint32_t filterId) {
|
||||
struct ThreadArgs* threadArgs = (struct ThreadArgs*)malloc(sizeof(struct ThreadArgs));
|
||||
threadArgs->user = this;
|
||||
threadArgs->filterId = filterId;
|
||||
|
||||
pthread_t mFilterThread;
|
||||
pthread_create(&mFilterThread, NULL, __threadLoopFilter, (void*)threadArgs);
|
||||
mFilterThreads[filterId] = mFilterThread;
|
||||
pthread_setname_np(mFilterThread, "demux_filter_waiting_loop");
|
||||
|
||||
return Result::SUCCESS;
|
||||
}
|
||||
|
||||
Result Demux::startSectionFilterHandler(uint32_t filterId, vector<uint8_t> data) {
|
||||
if (!writeSectionsAndCreateEvent(filterId, data)) {
|
||||
ALOGD("[Demux] filter %d fails to write into FMQ. Ending thread", filterId);
|
||||
return Result::UNKNOWN_ERROR;
|
||||
}
|
||||
|
||||
return Result::SUCCESS;
|
||||
}
|
||||
|
||||
Result Demux::startPesFilterHandler(uint32_t filterId) {
|
||||
// TODO generate multiple events in one event callback
|
||||
DemuxFilterPesEvent pesEvent;
|
||||
pesEvent = {
|
||||
// temp dump meta data
|
||||
.streamId = 0,
|
||||
.dataLength = 530,
|
||||
};
|
||||
mFilterEvents[filterId].events.resize(1);
|
||||
mFilterEvents[filterId].events[0].pes(pesEvent);
|
||||
/*pthread_create(&mThreadId, NULL, __threadLoop, this);
|
||||
pthread_setname_np(mThreadId, "demux_section_filter_waiting_loop");*/
|
||||
if (!writeDataToFilterMQ(fakeDataInputBuffer, filterId)) {
|
||||
return Result::INVALID_STATE;
|
||||
}
|
||||
|
||||
if (mDemuxCallbacks[filterId] == nullptr) {
|
||||
return Result::NOT_INITIALIZED;
|
||||
}
|
||||
|
||||
mDemuxCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]);
|
||||
return Result::SUCCESS;
|
||||
}
|
||||
|
||||
Result Demux::startTsFilterHandler() {
|
||||
// TODO handle starting TS filter
|
||||
return Result::SUCCESS;
|
||||
}
|
||||
|
||||
Result Demux::startMediaFilterHandler(uint32_t filterId) {
|
||||
DemuxFilterMediaEvent mediaEvent;
|
||||
mediaEvent = {
|
||||
// temp dump meta data
|
||||
.pts = 0,
|
||||
.dataLength = 530,
|
||||
.secureMemory = nullptr,
|
||||
};
|
||||
mFilterEvents[filterId].events.resize(1);
|
||||
mFilterEvents[filterId].events[0].media() = mediaEvent;
|
||||
// TODO handle write FQM for media stream
|
||||
return Result::SUCCESS;
|
||||
}
|
||||
|
||||
Result Demux::startRecordFilterHandler(uint32_t filterId) {
|
||||
DemuxFilterRecordEvent recordEvent;
|
||||
recordEvent = {
|
||||
// temp dump meta data
|
||||
.tpid = 0,
|
||||
.packetNum = 0,
|
||||
};
|
||||
recordEvent.indexMask.tsIndexMask() = 0x01;
|
||||
mFilterEvents[filterId].events.resize(1);
|
||||
mFilterEvents[filterId].events[0].ts() = recordEvent;
|
||||
return Result::SUCCESS;
|
||||
}
|
||||
|
||||
Result Demux::startPcrFilterHandler() {
|
||||
// TODO handle starting PCR filter
|
||||
return Result::SUCCESS;
|
||||
}
|
||||
|
||||
bool Demux::createFilterMQ(uint32_t bufferSize, uint32_t filterId) {
|
||||
ALOGV("%s", __FUNCTION__);
|
||||
|
||||
// Create a synchronized FMQ that supports blocking read/write
|
||||
std::unique_ptr<FilterMQ> tmpFilterMQ =
|
||||
std::unique_ptr<FilterMQ>(new (std::nothrow) FilterMQ(bufferSize, true));
|
||||
if (!tmpFilterMQ->isValid()) {
|
||||
ALOGW("Failed to create FMQ of filter with id: %d", filterId);
|
||||
return false;
|
||||
}
|
||||
|
||||
mFilterMQs[filterId] = std::move(tmpFilterMQ);
|
||||
|
||||
EventFlag* filterEventFlag;
|
||||
if (EventFlag::createEventFlag(mFilterMQs[filterId]->getEventFlagWord(), &filterEventFlag) !=
|
||||
OK) {
|
||||
return false;
|
||||
}
|
||||
mFilterEventFlags[filterId] = filterEventFlag;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool Demux::writeSectionsAndCreateEvent(uint32_t filterId, vector<uint8_t> data) {
|
||||
// TODO check how many sections has been read
|
||||
std::lock_guard<std::mutex> lock(mFilterEventLock);
|
||||
int size = mFilterEvents[filterId].events.size();
|
||||
mFilterEvents[filterId].events.resize(size + 1);
|
||||
if (!writeDataToFilterMQ(data, filterId)) {
|
||||
return false;
|
||||
}
|
||||
DemuxFilterSectionEvent secEvent;
|
||||
secEvent = {
|
||||
// temp dump meta data
|
||||
.tableId = 0,
|
||||
.version = 1,
|
||||
.sectionNum = 1,
|
||||
.dataLength = 530,
|
||||
};
|
||||
mFilterEvents[filterId].events[size].section(secEvent);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -269,116 +524,82 @@ bool Demux::writeDataToFilterMQ(const std::vector<uint8_t>& data, uint32_t filte
|
|||
return false;
|
||||
}
|
||||
|
||||
Result Demux::startSectionFilterHandler(DemuxFilterEvent event) {
|
||||
struct ThreadArgs* threadArgs = (struct ThreadArgs*)malloc(sizeof(struct ThreadArgs));
|
||||
threadArgs->user = this;
|
||||
threadArgs->event = &event;
|
||||
bool Demux::filterAndOutputData() {
|
||||
ALOGD("[Demux] start to dispatch data to filters");
|
||||
// Read input data from the input FMQ
|
||||
int size = mInputMQ->availableToRead();
|
||||
vector<uint8_t> dataOutputBuffer;
|
||||
dataOutputBuffer.resize(size);
|
||||
mInputMQ->read(dataOutputBuffer.data(), size);
|
||||
|
||||
pthread_create(&mThreadId, NULL, __threadLoop, (void*)threadArgs);
|
||||
pthread_setname_np(mThreadId, "demux_filter_waiting_loop");
|
||||
|
||||
return Result::SUCCESS;
|
||||
}
|
||||
|
||||
Result Demux::startPesFilterHandler(DemuxFilterEvent& event) {
|
||||
// TODO generate multiple events in one event callback
|
||||
DemuxFilterPesEvent pesEvent;
|
||||
pesEvent = {
|
||||
// temp dump meta data
|
||||
.streamId = 0,
|
||||
.dataLength = 530,
|
||||
};
|
||||
event.events.resize(1);
|
||||
event.events[0].pes(pesEvent);
|
||||
/*pthread_create(&mThreadId, NULL, __threadLoop, this);
|
||||
pthread_setname_np(mThreadId, "demux_section_filter_waiting_loop");*/
|
||||
if (!writeDataToFilterMQ(fakeDataInputBuffer, event.filterId)) {
|
||||
return Result::INVALID_STATE;
|
||||
Result result;
|
||||
// Filter the data and feed the output to each filter
|
||||
set<uint32_t>::iterator it;
|
||||
for (it = mUsedFilterIds.begin(); it != mUsedFilterIds.end(); it++) {
|
||||
switch (mFilterEvents[*it].filterType) {
|
||||
case DemuxFilterType::SECTION:
|
||||
result = startSectionFilterHandler(*it, dataOutputBuffer);
|
||||
break;
|
||||
case DemuxFilterType::PES:
|
||||
result = startPesFilterHandler(*it);
|
||||
break;
|
||||
case DemuxFilterType::TS:
|
||||
result = startTsFilterHandler();
|
||||
break;
|
||||
case DemuxFilterType::AUDIO:
|
||||
case DemuxFilterType::VIDEO:
|
||||
result = startMediaFilterHandler(*it);
|
||||
break;
|
||||
case DemuxFilterType::RECORD:
|
||||
result = startRecordFilterHandler(*it);
|
||||
break;
|
||||
case DemuxFilterType::PCR:
|
||||
result = startPcrFilterHandler();
|
||||
break;
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
if (mDemuxCallbacks[event.filterId] == nullptr) {
|
||||
return Result::NOT_INITIALIZED;
|
||||
}
|
||||
|
||||
mDemuxCallbacks[event.filterId]->onFilterEvent(event);
|
||||
return Result::SUCCESS;
|
||||
return result == Result::SUCCESS;
|
||||
}
|
||||
|
||||
Result Demux::startTsFilterHandler() {
|
||||
// TODO handle starting TS filter
|
||||
return Result::SUCCESS;
|
||||
}
|
||||
|
||||
Result Demux::startMediaFilterHandler(DemuxFilterEvent& event) {
|
||||
DemuxFilterMediaEvent mediaEvent;
|
||||
mediaEvent = {
|
||||
// temp dump meta data
|
||||
.pts = 0,
|
||||
.dataLength = 530,
|
||||
.secureMemory = nullptr,
|
||||
};
|
||||
event.events.resize(1);
|
||||
event.events[0].media() = mediaEvent;
|
||||
// TODO handle write FQM for media stream
|
||||
return Result::SUCCESS;
|
||||
}
|
||||
|
||||
Result Demux::startRecordFilterHandler(DemuxFilterEvent& event) {
|
||||
DemuxFilterRecordEvent recordEvent;
|
||||
recordEvent = {
|
||||
// temp dump meta data
|
||||
.tpid = 0,
|
||||
.packetNum = 0,
|
||||
};
|
||||
recordEvent.indexMask.tsIndexMask() = 0x01;
|
||||
event.events.resize(1);
|
||||
event.events[0].ts() = recordEvent;
|
||||
return Result::SUCCESS;
|
||||
}
|
||||
|
||||
Result Demux::startPcrFilterHandler() {
|
||||
// TODO handle starting PCR filter
|
||||
return Result::SUCCESS;
|
||||
}
|
||||
|
||||
void* Demux::__threadLoop(void* threadArg) {
|
||||
void* Demux::__threadLoopFilter(void* threadArg) {
|
||||
Demux* const self = static_cast<Demux*>(((struct ThreadArgs*)threadArg)->user);
|
||||
self->filterThreadLoop(((struct ThreadArgs*)threadArg)->event);
|
||||
self->filterThreadLoop(((struct ThreadArgs*)threadArg)->filterId);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void Demux::filterThreadLoop(DemuxFilterEvent* event) {
|
||||
uint32_t filterId = event->filterId;
|
||||
ALOGD("[Demux] filter %d threadLoop start.", filterId);
|
||||
mThreadRunning[filterId] = true;
|
||||
void* Demux::__threadLoopInput(void* user) {
|
||||
Demux* const self = static_cast<Demux*>(user);
|
||||
self->inputThreadLoop();
|
||||
return 0;
|
||||
}
|
||||
|
||||
while (mThreadRunning[filterId]) {
|
||||
void Demux::filterThreadLoop(uint32_t filterId) {
|
||||
ALOGD("[Demux] filter %d threadLoop start.", filterId);
|
||||
mFilterThreadRunning[filterId] = true;
|
||||
|
||||
// For the first time of filter output, implementation needs to send the filter
|
||||
// Event Callback without waiting for the DATA_CONSUMED to init the process.
|
||||
while (mFilterThreadRunning[filterId]) {
|
||||
if (mFilterEvents[filterId].events.size() == 0) {
|
||||
ALOGD("[Demux] wait for filter data output.");
|
||||
usleep(1000 * 1000);
|
||||
continue;
|
||||
}
|
||||
// After successfully write, send a callback and wait for the read to be done
|
||||
mDemuxCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]);
|
||||
mFilterEvents[filterId].events.resize(0);
|
||||
break;
|
||||
}
|
||||
|
||||
while (mFilterThreadRunning[filterId]) {
|
||||
uint32_t efState = 0;
|
||||
// We do not wait for the last round of writen data to be read to finish the thread
|
||||
// because the VTS can verify the reading itself.
|
||||
for (int i = 0; i < SECTION_WRITE_COUNT; i++) {
|
||||
DemuxFilterEvent filterEvent{
|
||||
.filterId = filterId,
|
||||
.filterType = event->filterType,
|
||||
};
|
||||
if (!writeSectionsAndCreateEvent(filterEvent, 2)) {
|
||||
ALOGD("[Demux] filter %d fails to write into FMQ. Ending thread", filterId);
|
||||
break;
|
||||
}
|
||||
mFilterWriteCount[filterId]++;
|
||||
if (mDemuxCallbacks[filterId] == nullptr) {
|
||||
ALOGD("[Demux] filter %d does not hava callback. Ending thread", filterId);
|
||||
break;
|
||||
}
|
||||
// After successfully write, send a callback and wait for the read to be done
|
||||
mDemuxCallbacks[filterId]->onFilterEvent(filterEvent);
|
||||
// We do not wait for the last read to be done
|
||||
// VTS can verify the read result itself.
|
||||
if (i == SECTION_WRITE_COUNT - 1) {
|
||||
ALOGD("[Demux] filter %d writing done. Ending thread", filterId);
|
||||
break;
|
||||
}
|
||||
while (mThreadRunning[filterId]) {
|
||||
while (mFilterThreadRunning[filterId]) {
|
||||
status_t status = mFilterEventFlags[filterId]->wait(
|
||||
static_cast<uint32_t>(DemuxQueueNotifyBits::DATA_CONSUMED), &efState,
|
||||
WAIT_TIMEOUT, true /* retry on spurious wake */);
|
||||
|
@ -388,15 +609,60 @@ void Demux::filterThreadLoop(DemuxFilterEvent* event) {
|
|||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
mFilterWriteCount[filterId] = 0;
|
||||
mThreadRunning[filterId] = false;
|
||||
if (mDemuxCallbacks[filterId] == nullptr) {
|
||||
ALOGD("[Demux] filter %d does not hava callback. Ending thread", filterId);
|
||||
break;
|
||||
}
|
||||
|
||||
while (mFilterThreadRunning[filterId]) {
|
||||
std::lock_guard<std::mutex> lock(mFilterEventLock);
|
||||
if (mFilterEvents[filterId].events.size() == 0) {
|
||||
continue;
|
||||
}
|
||||
// After successfully write, send a callback and wait for the read to be done
|
||||
mDemuxCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]);
|
||||
mFilterEvents[filterId].events.resize(0);
|
||||
break;
|
||||
}
|
||||
// We do not wait for the last read to be done
|
||||
// VTS can verify the read result itself.
|
||||
if (i == SECTION_WRITE_COUNT - 1) {
|
||||
ALOGD("[Demux] filter %d writing done. Ending thread", filterId);
|
||||
break;
|
||||
}
|
||||
}
|
||||
mFilterThreadRunning[filterId] = false;
|
||||
}
|
||||
|
||||
ALOGD("[Demux] filter thread ended.");
|
||||
}
|
||||
|
||||
void Demux::inputThreadLoop() {
|
||||
ALOGD("[Demux] input threadLoop start.");
|
||||
mInputThreadRunning = true;
|
||||
|
||||
while (mInputThreadRunning) {
|
||||
uint32_t efState = 0;
|
||||
status_t status =
|
||||
mInputEventFlag->wait(static_cast<uint32_t>(DemuxQueueNotifyBits::DATA_READY),
|
||||
&efState, WAIT_TIMEOUT, true /* retry on spurious wake */);
|
||||
if (status != OK) {
|
||||
ALOGD("[Demux] wait for data ready on the input FMQ");
|
||||
continue;
|
||||
}
|
||||
// Our current implementation filter the data and write it into the filter FMQ immedaitely
|
||||
// after the DATA_READY from the VTS/framework
|
||||
if (!filterAndOutputData()) {
|
||||
ALOGD("[Demux] input data failed to be filtered. Ending thread");
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
mInputThreadRunning = false;
|
||||
ALOGD("[Demux] input thread ended.");
|
||||
}
|
||||
|
||||
} // namespace implementation
|
||||
} // namespace V1_0
|
||||
} // namespace tuner
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
#include <android/hardware/tv/tuner/1.0/IDemux.h>
|
||||
#include <fmq/MessageQueue.h>
|
||||
#include <set>
|
||||
|
||||
using namespace std;
|
||||
|
||||
|
@ -43,6 +44,8 @@ class Demux : public IDemux {
|
|||
public:
|
||||
Demux(uint32_t demuxId);
|
||||
|
||||
~Demux();
|
||||
|
||||
virtual Return<Result> setFrontendDataSource(uint32_t frontendId) override;
|
||||
|
||||
virtual Return<Result> close() override;
|
||||
|
@ -68,8 +71,58 @@ class Demux : public IDemux {
|
|||
|
||||
virtual Return<void> getAvSyncTime(AvSyncHwId avSyncHwId, getAvSyncTime_cb _hidl_cb) override;
|
||||
|
||||
virtual Return<Result> addInput(uint32_t bufferSize, const sp<IDemuxCallback>& cb) override;
|
||||
|
||||
virtual Return<void> getInputQueueDesc(getInputQueueDesc_cb _hidl_cb) override;
|
||||
|
||||
virtual Return<Result> configureInput(const DemuxInputSettings& settings) override;
|
||||
|
||||
virtual Return<Result> startInput() override;
|
||||
|
||||
virtual Return<Result> stopInput() override;
|
||||
|
||||
virtual Return<Result> flushInput() override;
|
||||
|
||||
virtual Return<Result> removeInput() override;
|
||||
|
||||
virtual Return<Result> addOutput(uint32_t bufferSize, const sp<IDemuxCallback>& cb) override;
|
||||
|
||||
virtual Return<void> getOutputQueueDesc(getOutputQueueDesc_cb _hidl_cb) override;
|
||||
|
||||
virtual Return<Result> configureOutput(const DemuxOutputSettings& settings) override;
|
||||
|
||||
virtual Return<Result> attachOutputTsFilter(uint32_t filterId) override;
|
||||
|
||||
virtual Return<Result> detachOutputTsFilter(uint32_t filterId) override;
|
||||
|
||||
virtual Return<Result> startOutput() override;
|
||||
|
||||
virtual Return<Result> stopOutput() override;
|
||||
|
||||
virtual Return<Result> flushOutput() override;
|
||||
|
||||
virtual Return<Result> removeOutput() override;
|
||||
|
||||
private:
|
||||
virtual ~Demux();
|
||||
// A struct that passes the arguments to a newly created filter thread
|
||||
struct ThreadArgs {
|
||||
Demux* user;
|
||||
uint32_t filterId;
|
||||
};
|
||||
|
||||
/**
|
||||
* Filter handlers to handle the data filtering.
|
||||
* They are also responsible to write the filtered output into the filter FMQ
|
||||
* and update the filterEvent bound with the same filterId.
|
||||
*/
|
||||
Result startSectionFilterHandler(uint32_t filterId, vector<uint8_t> data);
|
||||
Result startPesFilterHandler(uint32_t filterId);
|
||||
Result startTsFilterHandler();
|
||||
Result startMediaFilterHandler(uint32_t filterId);
|
||||
Result startRecordFilterHandler(uint32_t filterId);
|
||||
Result startPcrFilterHandler();
|
||||
Result startFilterLoop(uint32_t filterId);
|
||||
|
||||
/**
|
||||
* To create a FilterMQ with the the next available Filter ID.
|
||||
* Creating Event Flag at the same time.
|
||||
|
@ -77,60 +130,80 @@ class Demux : public IDemux {
|
|||
*
|
||||
* Return false is any of the above processes fails.
|
||||
*/
|
||||
bool createAndSaveMQ(uint32_t bufferSize, uint32_t filterId);
|
||||
bool createFilterMQ(uint32_t bufferSize, uint32_t filterId);
|
||||
bool createMQ(FilterMQ* queue, EventFlag* eventFlag, uint32_t bufferSize);
|
||||
void deleteEventFlag();
|
||||
bool writeDataToFilterMQ(const std::vector<uint8_t>& data, uint32_t filterId);
|
||||
Result startSectionFilterHandler(DemuxFilterEvent event);
|
||||
Result startPesFilterHandler(DemuxFilterEvent& event);
|
||||
Result startTsFilterHandler();
|
||||
Result startMediaFilterHandler(DemuxFilterEvent& event);
|
||||
Result startRecordFilterHandler(DemuxFilterEvent& event);
|
||||
Result startPcrFilterHandler();
|
||||
bool writeSectionsAndCreateEvent(DemuxFilterEvent& event, uint32_t sectionNum);
|
||||
void filterThreadLoop(DemuxFilterEvent* event);
|
||||
static void* __threadLoop(void* data);
|
||||
bool readDataFromMQ();
|
||||
bool writeSectionsAndCreateEvent(uint32_t filterId, vector<uint8_t> data);
|
||||
/**
|
||||
* A dispatcher to read and dispatch input data to all the started filters.
|
||||
* Each filter handler handles the data filtering/output writing/filterEvent updating.
|
||||
*/
|
||||
bool filterAndOutputData();
|
||||
static void* __threadLoopFilter(void* data);
|
||||
static void* __threadLoopInput(void* user);
|
||||
void filterThreadLoop(uint32_t filterId);
|
||||
void inputThreadLoop();
|
||||
|
||||
uint32_t mDemuxId;
|
||||
uint32_t mSourceFrontendId;
|
||||
/**
|
||||
* Record the last used filer id. Initial value is -1.
|
||||
* Record the last used filter id. Initial value is -1.
|
||||
* Filter Id starts with 0.
|
||||
*/
|
||||
uint32_t mLastUsedFilterId = -1;
|
||||
/**
|
||||
* Record all the used filter Ids.
|
||||
* Any removed filter id should be removed from this set.
|
||||
*/
|
||||
set<uint32_t> mUsedFilterIds;
|
||||
/**
|
||||
* Record all the unused filter Ids within mLastUsedFilterId.
|
||||
* Removed filter Id should be added into this set.
|
||||
* When this set is not empty, ids here should be allocated first
|
||||
* and added into usedFilterIds.
|
||||
*/
|
||||
set<uint32_t> mUnusedFilterIds;
|
||||
/**
|
||||
* A list of created FilterMQ ptrs.
|
||||
* The array number is the filter ID.
|
||||
*/
|
||||
vector<unique_ptr<FilterMQ>> mFilterMQs;
|
||||
vector<DemuxFilterType> mFilterTypes;
|
||||
vector<EventFlag*> mFilterEventFlags;
|
||||
vector<DemuxFilterEvent> mFilterEvents;
|
||||
unique_ptr<FilterMQ> mInputMQ;
|
||||
unique_ptr<FilterMQ> mOutputMQ;
|
||||
EventFlag* mInputEventFlag;
|
||||
EventFlag* mOutputEventFlag;
|
||||
/**
|
||||
* Demux callbacks used on filter events or IO buffer status
|
||||
*/
|
||||
vector<sp<IDemuxCallback>> mDemuxCallbacks;
|
||||
/**
|
||||
* How many times a specific filter has written since started
|
||||
*/
|
||||
vector<uint16_t> mFilterWriteCount;
|
||||
pthread_t mThreadId = 0;
|
||||
sp<IDemuxCallback> mInputCallback;
|
||||
sp<IDemuxCallback> mOutputCallback;
|
||||
// Thread handlers
|
||||
pthread_t mInputThread;
|
||||
pthread_t mOutputThread;
|
||||
vector<pthread_t> mFilterThreads;
|
||||
/**
|
||||
* If a specific filter's writing loop is still running
|
||||
*/
|
||||
vector<bool> mThreadRunning;
|
||||
vector<bool> mFilterThreadRunning;
|
||||
bool mInputThreadRunning;
|
||||
/**
|
||||
* Lock to protect writes to the FMQs
|
||||
*/
|
||||
std::mutex mWriteLock;
|
||||
/**
|
||||
* Lock to protect writes to the filter event
|
||||
*/
|
||||
std::mutex mFilterEventLock;
|
||||
/**
|
||||
* How many times a filter should write
|
||||
* TODO make this dynamic/random/can take as a parameter
|
||||
*/
|
||||
const uint16_t SECTION_WRITE_COUNT = 10;
|
||||
// A struct that passes the arguments to a newly created filter thread
|
||||
struct ThreadArgs {
|
||||
Demux* user;
|
||||
DemuxFilterEvent* event;
|
||||
};
|
||||
};
|
||||
|
||||
} // namespace implementation
|
||||
|
|
Loading…
Reference in a new issue