| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
 100
 101
 102
 103
 104
 105
 
 | #pragma once
 #include <thread>
 #include <iostream>
 #include <mutex>
 #include <list>
 
 struct AVPacket;
 struct AVCodecParameters;
 struct AVRational;
 struct AVFrame;
 
 /// Log severities, ordered from least to most severe.
 /// Deliberately a plain (unscoped) enum: the XLOG macro streams the level with
 /// operator<< and compares it numerically against LOG_MIN_LEVEL.
 enum XLogLevel
 {
     XLOG_TYPE_DEBUG = 0,
     XLOG_TYPE_INFO  = 1,
     XLOG_TYPE_ERROR = 2,
     XLOG_TYPE_FATAL = 3
 };
 /// Lowest severity that XLOG actually prints.
 #define LOG_MIN_LEVEL XLOG_TYPE_DEBUG

 /// Writes "<level> : <file> : <line> : <message>" plus std::endl to std::cout
 /// when `level` is at least LOG_MIN_LEVEL.
 ///
 /// The body is wrapped in do { ... } while (0) so the macro expands to exactly
 /// one statement that needs the caller's trailing ';'. A bare `if` would break
 /// (or silently capture the `else` of) an unbraced if/else at the call site.
 /// `s` is intentionally not parenthesised so callers may pass a chained stream
 /// expression such as "x=" << x.
 #define XLOG(s, level) \
     do \
     { \
         if ((level) >= LOG_MIN_LEVEL) \
             std::cout << (level) << " : " << __FILE__ << " : " << __LINE__ << " : " << s << std::endl; \
     } while (0)

 /// Convenience wrappers that fix the severity argument of XLOG.
 #define LOGDEBUG(s) XLOG(s, XLOG_TYPE_DEBUG)
 #define LOGINFO(s) XLOG(s, XLOG_TYPE_INFO)
 #define LOGERROR(s) XLOG(s, XLOG_TYPE_ERROR)
 #define LOGFATAL(s) XLOG(s, XLOG_TYPE_FATAL)
 
 
 // Suspends the calling thread for roughly `ms` milliseconds.
 void MSleep(unsigned int ms);
 
 // Returns a millisecond tick count. The reference point is implementation-defined,
 // so only differences between two values returned by this function are meaningful.
 long long NowMs();
 
 // Frees *frame with av_frame_free(); does nothing when frame or *frame is null.
 void XFreeFrame(AVFrame **frame);
 
 // Writes the FFmpeg error text for `err` (obtained via av_strerror) to std::cerr.
 void PrintErr(int err);
 
 /// Base class for a worker that runs Main() on its own std::thread and can
 /// forward packets to a successor node registered with set_next().
 ///
 /// Instances are not copyable (std::thread and std::mutex members).
 class XThread
 {
 public:
     /// Virtual because the class is used polymorphically: deleting a derived
     /// worker through an XThread* must run the derived destructor.
     /// The destructor does not join the worker. If th_ is still joinable when
     /// the object dies, std::thread's destructor terminates the program, so
     /// call Stop() first.
     virtual ~XThread() = default;

     /// Starts the worker thread (defined out of line).
     virtual void Start();

     /// Requests exit and waits for the worker thread (defined out of line).
     virtual void Stop();

     /// Receives a packet handed over by the previous node in the chain.
     /// The default implementation ignores it.
     virtual void Do(AVPacket *pkt) {}

     /// Hands pkt to the successor's Do() if a successor is registered.
     /// m_ stays locked for the whole call, so set_next() cannot swap the
     /// successor while Do() is running.
     virtual void Next(AVPacket *pkt)
     {
         std::unique_lock<std::mutex> lock(m_);
         if (next_)
             next_->Do(pkt);
     }

     /// Registers the successor node (nullptr clears it). This class only
     /// stores the pointer; it never deletes xt.
     void set_next(XThread *xt)
     {
         std::unique_lock<std::mutex> lock(m_);
         next_ = xt;
     }

 protected:
     /// Thread entry point; every concrete worker must implement it.
     virtual void Main() = 0;

     /// Exit request flag: cleared by Start(), set by Stop().
     /// NOTE(review): plain bool written by Stop() and presumably polled by
     /// Main() on the worker thread without synchronization; consider
     /// std::atomic<bool>.
     bool is_exit_ = false;

     /// Sequence number assigned by Start(); appears in the log output.
     int index_ = 0;

 private:
     std::thread th_;            ///< worker thread created by Start()
     std::mutex m_;              ///< guards next_ (also taken by Start())
     XThread *next_ = nullptr;   ///< successor node, not owned
 };
 
 // Empty placeholder type: it declares no members in this header.
 class XTools
 {
 };
 
 
 /// Heap-only holder for a codec-parameter block and a time base.
 ///
 /// The private constructor allocates both pointers and the destructor
 /// releases them, so the object owns them. Copying is disabled: an implicit
 /// member-wise copy would leave two objects freeing the same pointers.
 class XPara
 {
 public:
     AVCodecParameters *para = nullptr;  ///< allocated in the constructor, freed in the destructor
     AVRational *time_base = nullptr;    ///< allocated with new in the constructor, deleted in the destructor

     /// Creates a new instance on the heap. This is the only way to construct
     /// an XPara because the constructor is private.
     static XPara *Create();
     ~XPara();

     XPara(const XPara &) = delete;
     XPara &operator=(const XPara &) = delete;

 private:
     XPara();
 };
 
 
 // Mutex-guarded FIFO of AVPacket pointers: Push() appends at the back,
 // Pop() removes from the front.
 // NOTE(review): no destructor is declared, so packets still queued when the
 // list is destroyed are not freed by this class.
 class XAVPacketList
 {
 public:
 // Removes and returns the front packet, or nullptr when the list is empty.
 // NOTE(review): the returned packet is not freed by this class, so the caller
 // presumably releases it with av_packet_free(); confirm against callers.
 AVPacket *Pop();
 // Appends a newly allocated packet that references pkt. The caller's pkt
 // pointer itself is not stored.
 void Push(AVPacket *pkt);
 
 private:
 std::list<AVPacket*> pkts_;
 int max_packets_ = 100; // size threshold that Push() checks after each insert
 std::mutex mux_; // locked by both Pop() and Push()
 };
 
 
 | 
参数说明:
普通函数(非成员函数)
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 
 | enum XLogLevel // Log severities in ascending order; enumerators take the implicit values 0..3.
 {
 XLOG_TYPE_DEBUG,
 XLOG_TYPE_INFO,
 XLOG_TYPE_ERROR,
 XLOG_TYPE_FATAL
 };
 // Lowest severity that XLOG prints.
 #define LOG_MIN_LEVEL XLOG_TYPE_DEBUG
 // Prints "<level> : <file> : <line> : <message>" followed by std::endl to
 // std::cout when level >= LOG_MIN_LEVEL.
 // NOTE(review): this expands to a bare `if` statement with its own trailing ';'.
 // Used as the unbraced body of an if/else, the extra ';' detaches the `else`.
 #define XLOG(s, level) \
 if(level >= LOG_MIN_LEVEL) \
 std::cout << level << " : " << __FILE__ << " : " << __LINE__ << " : " << s << std::endl;
 
 // Convenience wrappers that fix the severity argument of XLOG.
 #define LOGDEBUG(s) XLOG(s, XLOG_TYPE_DEBUG)
 #define LOGINFO(s) XLOG(s, XLOG_TYPE_INFO)
 #define LOGERROR(s) XLOG(s, XLOG_TYPE_ERROR)
 #define LOGFATAL(s) XLOG(s, XLOG_TYPE_FATAL)
 
 
 // Suspends the calling thread for roughly `ms` milliseconds.
 void MSleep(unsigned int ms);
 
 // Returns a millisecond tick count; only differences between two calls are meaningful.
 long long NowMs();
 // Frees *frame with av_frame_free(); does nothing when frame or *frame is null.
 void XFreeFrame(AVFrame **frame);
 // Writes the FFmpeg error text for `err` (obtained via av_strerror) to std::cerr.
 void PrintErr(int err);
 
 | 
XThread
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 
 | class XThread{ // Base class for a worker thread that can forward packets to a successor XThread.
 // NOTE(review): the class has virtual functions but declares no virtual destructor.
 public:
 // Starts the worker thread (defined out of line).
 virtual void Start();
 
 // Requests exit and waits for the worker thread (defined out of line).
 virtual void Stop();
 
 // Receives a packet from the previous node; the default implementation does nothing.
 virtual void Do(AVPacket *pkt) {};
 
 // Hands pkt to the successor's Do() if one is registered; m_ is held for the whole call.
 virtual void Next(AVPacket *pkt)
 {
 std::unique_lock<std::mutex> lock(m_);
 if (next_)
 next_->Do(pkt);
 }
 
 // Registers the successor node under m_; the pointer is only stored here.
 void set_next(XThread *xt)
 {
 std::unique_lock<std::mutex> lock(m_);
 next_ = xt;
 }
 
 protected:
 
 // Thread entry point; concrete workers must implement it.
 virtual void Main() = 0;
 
 // Exit request flag: cleared by Start(), set by Stop().
 bool is_exit_ = false;
 
 // Sequence number assigned by Start(); appears in the log output.
 int index_ = 0;
 
 
 private:
 std::thread th_; // worker thread created by Start()
 std::mutex m_; // guards next_ (also taken by Start())
 XThread *next_ = nullptr; // successor node
 };
 
 | 
XPara
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 
 | class XPara // Heap-only holder for a codec-parameter block and a time base.
 {
 public:
 AVCodecParameters *para = nullptr; // allocated in the constructor, freed in the destructor
 AVRational *time_base = nullptr; // allocated with new in the constructor, deleted in the destructor
 
 // Creates a new instance on the heap; the only way to construct one, since the constructor is private.
 static XPara *Create();
 ~XPara();
 
 private:
 
 // Private so that objects can only be obtained through Create().
 // NOTE(review): copy operations are not deleted, so copying an XPara would
 // leave two objects freeing the same pointers.
 XPara();
 };
 
 | 
XAVPacketList
作用:由于 std::list 本身不是线程安全的,因此封装了这个类,用互斥锁管理一个线程安全的 AVPacket 列表。这个类允许在多个线程之间安全地添加和移除 AVPacket 对象。
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 
 | class XAVPacketList // Mutex-guarded FIFO of AVPacket pointers.
 {
 public:
 // Removes and returns the front packet, or nullptr when the list is empty.
 AVPacket *Pop();
 
 // Appends a newly allocated packet that references pkt. The caller's pkt
 // pointer itself is not stored.
 void Push(AVPacket *pkt);
 
 private:
 std::list<AVPacket*> pkts_;
 int max_packets_ = 100; // size threshold that Push() checks after each insert
 std::mutex mux_; // locked by both Pop() and Push()
 };
 
 
 | 
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 
 | #include "xtools.h"

 #include <chrono>
 #include <sstream>
 #include <thread>

 using namespace std;

 extern "C"
 {
 #include <libavformat/avformat.h>
 #include <libavcodec/avcodec.h>
 }
 
 
 /// Blocks the calling thread for approximately `ms` milliseconds.
 ///
 /// Sleeps in 1 ms slices until a std::chrono::steady_clock deadline passes.
 /// steady_clock is used instead of clock() because clock() reports processor
 /// time, which does not advance while the thread is asleep on POSIX systems,
 /// so it cannot measure how long we have actually waited.
 ///
 /// @param ms  Duration in milliseconds; 0 returns immediately.
 void MSleep(unsigned int ms)
 {
     const auto deadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(ms);
     while (std::chrono::steady_clock::now() < deadline)
     {
         std::this_thread::sleep_for(std::chrono::milliseconds(1));
     }
 }
 
 
 /// Returns a monotonic timestamp in milliseconds.
 ///
 /// Based on std::chrono::steady_clock, so the value follows elapsed
 /// wall-clock time (clock() would report processor time, which stalls while
 /// the process sleeps on POSIX systems). The reference point is unspecified:
 /// only differences between two return values are meaningful.
 long long NowMs()
 {
     const auto since_epoch = std::chrono::steady_clock::now().time_since_epoch();
     return static_cast<long long>(
         std::chrono::duration_cast<std::chrono::milliseconds>(since_epoch).count());
 }
 
 
 /// Releases the frame that `frame` points to via av_frame_free().
 /// Does nothing when `frame` itself or the frame it points to is null.
 void XFreeFrame(AVFrame **frame)
 {
     const bool has_frame = (frame != nullptr) && (*frame != nullptr);
     if (has_frame)
     {
         av_frame_free(frame);
     }
 }
 
 
 /// Prints the FFmpeg description of error code `err` to std::cerr.
 void PrintErr(int err)
 {
     // Zero-filled buffer; the last byte is kept out of av_strerror's reach so
     // the text is always NUL-terminated.
     char message[1024] = {};
     av_strerror(err, message, sizeof(message) - 1);
     std::cerr << message << std::endl;
 }
 
 
 void XThread::Start()
 {
 unique_lock<mutex> lock(m_);
 static int i = 0;
 i++;
 index_ = i;
 
 is_exit_ = false;
 
 th_ = thread(&XThread::Main, this);
 stringstream ss;
 ss << "XThread::Start()  " << index_;
 
 LOGINFO(ss.str());
 }
 
 
 /// Stops the worker: logs, raises the exit flag, joins the thread if it is
 /// joinable, then logs again.
 void XThread::Stop()
 {
     {
         std::stringstream begin_msg;
         begin_msg << "XThread::Stop() begin  " << index_;
         LOGINFO(begin_msg.str());
     }

     // Main() is expected to observe this flag and return so join() can finish.
     is_exit_ = true;
     if (th_.joinable())
     {
         th_.join();
     }

     std::stringstream end_msg;
     end_msg << "XThread::Stop() end  " << index_;
     LOGINFO(end_msg.str());
 }
 
 
 /// Factory for XPara: builds a new instance with `new` and returns it.
 /// The constructor is private, so this is the only way to obtain an XPara.
 XPara *XPara::Create()
 {
     XPara *instance = new XPara();
     return instance;
 }
 
 
 /// Releases the codec parameters and the time base owned by this object.
 XPara::~XPara()
 {
     if (para)
     {
         // Takes the address of the member (AVCodecParameters **), as the
         // FFmpeg API requires.
         avcodec_parameters_free(&para);
     }
     // delete on a null pointer is a no-op, so no guard is needed.
     delete time_base;
     time_base = nullptr;
 }
 
 
 /// Allocates the codec-parameter block and a value-initialized AVRational.
 /// Private: instances are created through Create().
 XPara::XPara()
     : para(avcodec_parameters_alloc()),
       time_base(new AVRational())
 {
 }
 
 
 /// Removes the packet at the front of the list and returns it.
 /// @return the front packet, or nullptr when the list is empty.
 AVPacket *XAVPacketList::Pop()
 {
     std::lock_guard<std::mutex> guard(mux_);

     AVPacket *head = nullptr;
     if (!pkts_.empty())
     {
         head = pkts_.front();
         pkts_.pop_front();
     }
     return head;
 }
 
 
 /// Appends a new reference to `pkt` and trims the front of the list when it
 /// grows beyond max_packets_.
 ///
 /// The list stores its own AVPacket that references pkt's data; the caller
 /// keeps ownership of `pkt`. A null `pkt`, a failed allocation or a failed
 /// av_packet_ref() leaves the list unchanged.
 ///
 /// Trimming policy once the size exceeds max_packets_:
 ///   - if the oldest packet is a key frame, drop just that packet;
 ///   - otherwise drop packets from the front until a key frame is at the
 ///     front (or the list is empty).
 void XAVPacketList::Push(AVPacket *pkt)
 {
     if (!pkt)
         return;  // av_packet_ref() would dereference a null source

     std::unique_lock<std::mutex> lock(mux_);

     AVPacket *p = av_packet_alloc();
     if (!p)
     {
         LOGERROR("XAVPacketList::Push() av_packet_alloc failed");
         return;
     }

     const int ret = av_packet_ref(p, pkt);
     if (ret < 0)
     {
         PrintErr(ret);
         av_packet_free(&p);  // do not queue (or leak) an unreferenced packet
         return;
     }
     pkts_.push_back(p);

     if (pkts_.size() <= static_cast<std::size_t>(max_packets_))
         return;

     // Oldest packet is a key frame: dropping it alone is enough for this call.
     if (pkts_.front()->flags & AV_PKT_FLAG_KEY)
     {
         av_packet_free(&pkts_.front());
         pkts_.pop_front();
         return;
     }

     // Oldest packet is not a key frame: discard up to the next key frame.
     // The condition must be "not empty"; testing empty() would skip the loop
     // entirely because the list was just found to be over capacity.
     while (!pkts_.empty())
     {
         if (pkts_.front()->flags & AV_PKT_FLAG_KEY)
         {
             return;
         }
         av_packet_free(&pkts_.front());
         pkts_.pop_front();
     }
 }
 
 | 
普通函数(非成员函数)
MSleep()
作用:使当前线程暂停执行一段时间(以毫秒为单位)。
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 
 | void MSleep(unsigned int ms){ // Sleeps in 1 ms slices, at most `ms` iterations.
 // NOTE(review): clock() is defined by the C standard as processor time. Where
 // sleeping does not advance it, the early-exit test below never fires and the
 // total wait is `ms` separate 1 ms sleeps; confirm on the target platform.
 auto beg = clock();
 for (int i = 0; i < ms; i++) // NOTE(review): compares signed i with unsigned ms
 {
 this_thread::sleep_for(1ms);
 // Leave early once the clock()-measured elapsed milliseconds reach ms.
 if ((clock() - beg) / (CLOCKS_PER_SEC / 1000) >= ms)
 break;
 }
 }
 
 | 
NowMs()
| 12
 3
 4
 5
 
 | long long NowMs() // Returns clock() converted from clock ticks to milliseconds.
 {
 // NOTE(review): clock() is defined by the C standard as processor time, so
 // this may not track wall-clock time; confirm on the target platform.
 return clock() / (CLOCKS_PER_SEC / 1000);
 }
 
 | 
XFreeFrame()
作用:释放AVFrame资源
| 12
 3
 4
 5
 
 | void XFreeFrame(AVFrame **frame){ // Frees *frame via av_frame_free().
 // Nothing to free when the pointer or the frame it points to is null.
 if (!frame || !(*frame)) return;
 av_frame_free(frame);
 }
 
 | 
PrintErr()
作用:打印错误信息。
| 12
 3
 4
 5
 6
 
 | void PrintErr(int err){ // Prints the FFmpeg description of error code err to std::cerr.
 char buf[1024] = { 0 }; // zero-filled, so the text is always NUL-terminated
 av_strerror(err, buf, sizeof(buf) - 1); // the last byte is kept out of reach
 cerr << buf << endl;
 }
 
 | 
XThread
责任链模式
类的成员和方法
- 公共成员函数:
- Start():启动线程。
- Stop():停止线程,设置退出标志并等待线程退出。
- Do(AVPacket *pkt):执行责任链传递的任务,虚函数,需在派生类中重载。
- Next(AVPacket *pkt):传递任务到责任链的下一个节点。
- set_next(XThread *xt):设置责任链的下一个节点,线程安全。
 
- 保护成员:
- Main():线程入口函数,纯虚函数,需要在派生类中实现。
- is_exit_:标志线程是否退出。
- index_:线程索引号,用于标识不同的线程。
 
- 私有成员:
- th_:std::thread 对象,用于实际的线程操作。
- m_:std::mutex 对象,用于保护对 next_ 成员的访问,确保线程安全。
- next_:指向责任链中的下一个节点。
 
【应用场景】:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 
 | bool XCameraWidget::Open(const char *url) // Usage excerpt: wires a demux task to a decode task and starts both.
 {
 // Stop any tasks left over from a previous Open().
 if (demux_)
 demux_->Stop();
 if (decode_)
 decode_->Stop();
 
 // NOTE(review): the previous demux_/decode_ objects are overwritten below without a visible delete; the elided parts ("...") may handle this, so confirm.
 demux_ = new XDemuxTask();
 if (!demux_->Open(url))
 {
 return false; // NOTE(review): the new demux_ stays allocated on this path
 }
 
 
 decode_ = new XDecodeTask();
 
 ...
 
 // Chain the nodes: demux_ becomes the predecessor of decode_.
 demux_->set_next(decode_);
 
 ...
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 // Launch both worker threads once the chain is linked.
 demux_->Start();
 decode_->Start();
 
 return true;
 }
 
 | 
Start()
作用:启动线程
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 
 | void XThread::Start(){ // Assigns a sequence number, clears the exit flag and launches Main() on a new thread.
 unique_lock<mutex> lock(m_);
 
 
 
 // Function-static counter shared by all XThread objects; each Start() takes the next value.
 static int i = 0;
 i++;
 index_ = i;
 
 
 // Clear the exit request before the worker begins running.
 is_exit_ = false;
 
 
 
 // NOTE(review): if th_ is still joinable (Start() called twice without Stop()), this move-assignment calls std::terminate.
 th_ = thread(&XThread::Main, this);
 stringstream ss;
 ss << "XThread::Start()  " << index_;
 
 LOGINFO(ss.str());
 }
 
 | 
Stop()
作用:停止线程(设置退出标志,等待线程退出)
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 
 | void XThread::Stop(){ // Logs, raises the exit flag, joins the worker if it is joinable, then logs again.
 stringstream ss;
 ss << "XThread::Stop() begin  " << index_;
 LOGINFO(ss.str());
 
 
 // Ask the worker to exit; Main() is expected to observe this flag and return.
 is_exit_ = true;
 
 
 
 // Wait for the worker thread to finish (only if one was started).
 if (th_.joinable())
 th_.join();
 
 ss.str(""); // reuse the stream for the second message
 ss << "XThread::Stop() end  " << index_;
 LOGINFO(ss.str());
 }
 
 | 
XPara
Create()
作用:工厂函数,在堆上创建并返回 XPara 对象;构造函数是私有的,从而防止直接在栈上创建 XPara 对象。
| 12
 3
 4
 
 | XPara *XPara::Create(){ // Factory: the constructor is private, so instances are created here.
 return new XPara(); // heap allocation; the pointer is returned to the caller
 }
 
 | 
XAVPacketList
Pop()
作用:取出并返回列表头部(最先入队)的 AVPacket;列表为空时返回 nullptr。
| 12
 3
 4
 5
 6
 7
 8
 
 | AVPacket *XAVPacketList::Pop(){ // Removes and returns the front packet, or nullptr if the list is empty.
 unique_lock<mutex> lock(mux_);
 if (pkts_.empty()) return nullptr;
 auto pkt = pkts_.front(); // oldest queued packet
 pkts_.pop_front(); // only removes the pointer from the list; the packet is not freed here
 return pkt;
 }
 
 | 
Push()
作用:用于将 AVPacket 对象推入到一个线程安全的列表中,并在列表超出最大容量时进行清理。
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 
 | void XAVPacketList::Push(AVPacket *pkt){ // Queues a new reference to pkt, then tries to trim the front when over max_packets_.
 unique_lock<mutex> lock(mux_);
 // Allocate the list's own packet object.
 auto p = av_packet_alloc();
 
 
 // Make p reference pkt's data. NOTE(review): neither this return value nor the allocation above is checked.
 av_packet_ref(p, pkt);
 
 pkts_.push_back(p);
 
 
 
 
 
 // Over capacity: drop old packets from the front.
 if (pkts_.size() > max_packets_)
 {
 // Case 1: the oldest packet is a key frame, so free and remove only that packet.
 if (pkts_.front()->flags & AV_PKT_FLAG_KEY)
 {
 av_packet_free(&pkts_.front());
 pkts_.pop_front();
 return;
 }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 // Case 2: the loop body frees front packets until a key frame is at the front.
 // NOTE(review): the list cannot be empty at this point, so `pkts_.empty()` is false and the loop body never runs; the condition looks inverted (likely `!pkts_.empty()`).
 while (pkts_.empty())
 {
 if (pkts_.front()->flags & AV_PKT_FLAG_KEY)
 {
 return;
 }
 av_packet_free(&pkts_.front());
 pkts_.pop_front();
 }
 }
 }
 
 |