今回は実装編その1です。IAsyncReaderの動作部分を実装してみます。
宣言部については前回を参考に。
まずは読み取りパケット部のコードから
//非同期読み取りアイテム //コンストラクタ CDataRequestItem::CDataRequestItem() { } //コピーコンストラクタ CDataRequestItem::CDataRequestItem(const CDataRequestItem &item) : m_lpAsyncReader(item.m_lpAsyncReader), m_lpDataStream(item.m_lpDataStream), m_lpBuffer(item.m_lpBuffer), m_lpContext(item.m_lpContext), m_llReadPos(item.m_llReadPos), m_dwUser(item.m_dwUser), m_lLength(item.m_lLength), m_bIsAligned(item.m_bIsAligned), m_hr(item.m_hr) { } //コピー演算子 CDataRequestItem &CDataRequestItem::operator = (const CDataRequestItem &item) { if(&item != this){ memcpy(this,&item,sizeof(*this)); } return *this; } //アイテムを設定する BOOL CDataRequestItem::SetItem(CDataAsyncReader *lpAsyncReader,IDataStream *lpDataStream,LONGLONG llReadPos,BOOL bIsAligned,long lLength,void *lpBuffer,void *lpContext,DWORD_PTR dwUser) { m_lpAsyncReader = lpAsyncReader; m_lpDataStream = lpDataStream; m_llReadPos = llReadPos; m_bIsAligned = bIsAligned; m_lLength = lLength; m_lpBuffer = lpBuffer; m_lpContext = lpContext; m_dwUser = dwUser; m_hr = VFW_E_TIMEOUT; return TRUE; } //要求を完了する HRESULT CDataRequestItem::CompleteRequest(void) { DWORD dwReadSize; m_lpDataStream->Lock(); m_hr = m_lpDataStream->Seek(m_llReadPos); if(m_hr == S_OK){ m_hr = m_lpDataStream->Read(m_lpBuffer,m_lLength,&dwReadSize,m_bIsAligned); if(m_hr == OLE_S_FIRST){ if(m_lpContext != NULL){ ((IMediaSample *)m_lpContext)->SetDiscontinuity(TRUE); m_hr = S_OK; } } if(SUCCEEDED(m_hr)){ if(dwReadSize != (DWORD)m_lLength){ m_lLength = dwReadSize; m_hr = S_FALSE; } else m_hr = S_OK; } } m_lpDataStream->Unlock(); return m_hr; } //要求をキャンセルする HRESULT CDataRequestItem::CancelRequest(void) { return S_OK; }
実際に動作をしているのはCompleteRequestだけです。
それ以外は単にデータコピーを行っていたりパケットとして内部データを設定したりするだけです。
CompleteRequestも非同期にデータを読み取るためにLock=>Seek=>Read=>Unlockという流れをとっているいつものものですし。
失敗したときにエラーコードを設定する必要があるのでそれだけ気をつけて。
次にIAsyncReaderを支えるためのクラスの基礎部分
//非同期読み取りインターフェイス //コンストラクタ CDataAsyncReader::CDataAsyncReader(IDataStream *lpDataStream) : m_lpDataStream(lpDataStream), m_vEventInvalidThread(TRUE), m_vEventSetWorkItem(TRUE), m_vEventSetWorkDone(TRUE), m_vEventThreadWorkEnd(TRUE), m_bIsFlushing(FALSE), m_bIsWaitingWorkEnd(FALSE), m_nItemExecCount(0), m_hAsyncReadThread(NULL) { } //デストラクタ CDataAsyncReader::~CDataAsyncReader() { BeginFlush(); DisableAsyncRead(); } //非同期動作を有効にする BOOL CDataAsyncReader::EnableAsyncRead(void) { unsigned int uiThreadID; if(m_hAsyncReadThread != NULL) return TRUE; m_vEventInvalidThread.Reset(); m_hAsyncReadThread = (HANDLE)_beginthreadex(NULL,0,&CDataAsyncReader::StartAsyncReadThread,this,0,&uiThreadID); if(m_hAsyncReadThread == NULL) return FALSE; return TRUE; } //非同期動作を無効にする BOOL CDataAsyncReader::DisableAsyncRead(void) { if(m_hAsyncReadThread == NULL) return TRUE; m_vEventInvalidThread.Set(); WaitForSingleObject(m_hAsyncReadThread,INFINITE); CloseHandle(m_hAsyncReadThread); m_hAsyncReadThread = NULL; return TRUE; }
コンストラクタ、デストラクタそのものはとても簡単です。データを初期化しているだけですし。
なお、止めるときにはFlushの開始(残っているパケットの破棄)=>非同期読み取りの停止という手順をとります。
非同期読み取りの開始と終了はスレッドの作成とスレッドの終了で表すことができます。
このあたりから各同期イベントが出てきますが、それぞれの簡単な役割説明は宣言部で行っていますので参考に。
IAsyncReaderの動作実装部
といっても継承ではなく同じ関数プロトタイプであり、機能が同じ、というだけですが・・・。
//要求を投げる HRESULT CDataAsyncReader::Request(LONGLONG llPos,long lLength,BOOL bIsAligned,void *lpBuffer,void *lpContext,DWORD_PTR dwUser) { CDataRequestItem item; BOOL bRet; if(bIsAligned){ if(!IsAligned(llPos) || !IsAligned((LONG_PTR)lLength) || !IsAligned((LONG_PTR)lpBuffer)) return VFW_E_BADALIGN; } bRet = item.SetItem(this,m_lpDataStream,llPos,bIsAligned,lLength,lpBuffer,lpContext,dwUser); if(bRet) return PutWorkItem(item); else return E_FAIL; } //次の要求が終わるまで待つ HRESULT CDataAsyncReader::WaitForNext(DWORD dwTimeOut,void **lplpContext,DWORD_PTR *lpdwUser,long *lplActualSize) { CDataRequestItem item; LONGLONG llTotalSize; HRESULT hr; CheckPointer(lplpContext,E_POINTER); CheckPointer(lpdwUser,E_POINTER); CheckPointer(lplActualSize,E_POINTER); *lplpContext = NULL; while(1){ if(!m_vEventSetWorkDone.Wait(dwTimeOut)) return VFW_E_TIMEOUT; hr = GetDoneItem(item); if(hr == S_OK){ hr = item.GetResult(); if(hr == S_FALSE){ m_lpDataStream->GetSize(&llTotalSize); if(item.GetStart() + item.GetLength() == llTotalSize){ hr = S_OK; } else hr = E_FAIL; } *lplpContext = item.GetContextData(); *lpdwUser = item.GetUserData(); *lplActualSize = item.GetLength(); return hr; } else{ CLockObject lock(this); if(m_bIsFlushing && !m_bIsWaitingWorkEnd) return VFW_E_WRONG_STATE; } } return E_FAIL; } //アライメントを合わせて同期で読み込む HRESULT CDataAsyncReader::SyncReadAligned(LONGLONG llPos,long lLength,void *lpBuffer,long *lplActualSize,void *lpContext) { CDataRequestItem item; HRESULT hr; if(!IsAligned(llPos) || !IsAligned((LONG_PTR)lLength) || !IsAligned((LONG_PTR)lpBuffer)) return VFW_E_BADALIGN; if(!item.SetItem(this,m_lpDataStream,llPos,TRUE,lLength,lpBuffer,lpContext,0)) return E_FAIL; hr = item.CompleteRequest(); *lplActualSize = item.GetLength(); return hr; } //同期で読み込む HRESULT CDataAsyncReader::SyncRead(LONGLONG llPos,long lLength,void *lpBuffer) { CDataRequestItem item; long lTemp; if(!IsAligned(llPos) && !IsAligned((LONG_PTR)lLength) && !IsAligned((LONG_PTR)lpBuffer)) return SyncReadAligned(llPos,lLength,lpBuffer,&lTemp,NULL); if(!item.SetItem(this,m_lpDataStream,llPos,FALSE,lLength,lpBuffer,NULL,0)) return E_FAIL; return item.CompleteRequest(); } //サイズを取得する HRESULT CDataAsyncReader::Length(LONGLONG *lpllTotalLength,LONGLONG *lpllAvailableLength) { CheckPointer(lpllTotalLength,E_POINTER); return m_lpDataStream->GetSize(lpllTotalLength,lpllAvailableLength) ? S_OK : E_FAIL; } //アライメントを合わせる HRESULT CDataAsyncReader::Alignment(long *lplAlignment) { CheckPointer(lplAlignment,E_POINTER); *lplAlignment = m_lpDataStream->GetAlignment(); return S_OK; } //フラッシュ処理を開始する HRESULT CDataAsyncReader::BeginFlush(void) { CDataRequestItem item; { CLockObject lock(this); m_bIsFlushing = TRUE; while(!m_dequeWorkItem.empty()){ GetWorkItem(item); item.CancelRequest(); PutDoneItem(item); } if(m_nItemExecCount > 0) m_bIsWaitingWorkEnd = TRUE; else{ m_vEventSetWorkDone.Set(); return S_OK; } } while(1){ m_vEventThreadWorkEnd.Wait(INFINITE); { CLockObject lock(this); if(m_nItemExecCount == 0){ m_bIsWaitingWorkEnd = FALSE; m_vEventSetWorkDone.Set(); break; } } } return S_OK; } //フラッシュ処理を終了する HRESULT CDataAsyncReader::EndFlush(void) { m_bIsFlushing = FALSE; if(!m_dequeDoneItem.empty()) m_vEventSetWorkDone.Set(); else m_vEventSetWorkDone.Reset(); return S_OK; } //アライメントがあっているかどうか BOOL CDataAsyncReader::IsAligned(LONG_PTR lSize) { return (lSize & (m_lpDataStream->GetAlignment() - 1)) == 0; } #ifndef _WIN64 //アライメントがあっているかどうか(LONGLONGバージョン) BOOL CDataAsyncReader::IsAligned(LONGLONG llSize) { return IsAligned((LONG_PTR)llSize); } #endif //_WIN64
基本的には作業スレッドに読み取り処理を渡しているだけです。
なお、同期読み取り要求に関してもこちらで読み取りを書くのが面倒なので一時的に状態保持パケットを生成してそちらに命令を投げているのがまた。
処理完了待ちについては完了するまで待機してアイテムを返すだけですし、フラッシュ処理についてはフラッシュフラグを入れてキューからアイテムを破棄する動作を行います。
非同期処理本体
unsigned int __stdcall CDataAsyncReader::StartAsyncReadThread(void *lpContext) { unsigned long ulRet; ulRet = ((CDataAsyncReader *)lpContext)->ThreadProc(); return (unsigned int)ulRet; } //スレッドルーチン unsigned long CDataAsyncReader::ThreadProc(void) { HANDLE ahWaitObject[2]; DWORD dwWaitRet; ahWaitObject[0] = m_vEventInvalidThread.GetHandle(); ahWaitObject[1] = m_vEventSetWorkItem.GetHandle(); while(1){ dwWaitRet = WaitForMultipleObjects(2,ahWaitObject,FALSE,INFINITE); if(dwWaitRet == WAIT_OBJECT_0 + 1) ProcessWork(); else break; } return 0; } BOOL CDataAsyncReader::ProcessWork(void) { CDataRequestItem item; { CLockObject lock(this); if(GetWorkItem(item) != S_OK) return TRUE; m_nItemExecCount++; } item.CompleteRequest(); { CLockObject lock(this); PutDoneItem(item); if(--m_nItemExecCount == 0 && m_bIsWaitingWorkEnd){ m_vEventThreadWorkEnd.Set(); } } return TRUE; } //作業待ちアイテムを設定する HRESULT CDataAsyncReader::PutWorkItem(const CDataRequestItem &item) { HRESULT hr; CLockObject lock(this); if(m_bIsFlushing) hr = VFW_E_WRONG_STATE; else{ m_dequeWorkItem.push_back(item); m_vEventSetWorkItem.Set(); EnableAsyncRead(); hr = S_OK; } return hr; } //作業終了アイテムを設定する HRESULT CDataAsyncReader::PutDoneItem(const CDataRequestItem &item) { CLockObject lock(this); m_dequeDoneItem.push_back(item); m_vEventSetWorkDone.Set(); return S_OK; } //作業待ちアイテムを取得する HRESULT CDataAsyncReader::GetWorkItem(CDataRequestItem &item) { CLockObject lock(this); if(m_dequeWorkItem.empty()) return E_FAIL; item = m_dequeWorkItem.front(); m_dequeWorkItem.pop_front(); if(m_dequeWorkItem.empty()) m_vEventSetWorkItem.Reset(); return S_OK; } //作業終了アイテムを取得する HRESULT CDataAsyncReader::GetDoneItem(CDataRequestItem &item) { CLockObject lock(this); if(m_dequeDoneItem.empty()) return E_FAIL; item = m_dequeDoneItem.front(); m_dequeDoneItem.pop_front(); if(m_dequeWorkItem.empty() && (!m_bIsFlushing || m_bIsWaitingWorkEnd)) m_vEventSetWorkDone.Reset(); return S_OK; }
作業用スレッドの典型的なパターンです。イベント発行を待ってアイテムを処理しています。
イベントについては二種類(何か作業が存在するorスレッドを終了する必要がある)で待機して・・・というものですね。
キュー処理部分はそれぞれPut~ItemとGet~Itemで管理してますし、キューの同期が必要なときは自身をロックしています。
なお、作業用スレッドはこのクラスの作成時ではなくはじめて非同期処理を要求されたときに作成されています。
あとはDirectShowのフィルタ実装で面倒なピンとフィルタ本体の実装
といっても、baseclassesを使う限りはCBasePinやらCBaseFilterやらが使えるので非常に楽なんですよね。
これが自分で実装、とか言うと実装しなければ行けないインターフェイスやらが多すぎていやになってきます。