1use std::collections::VecDeque;
2use std::sync::Arc;
3
4use crate::audio::{MpaAudioChunk, MpaAudioDecoder};
5use crate::convert::frame_to_rgba_bt601_limited;
6use crate::demux::{Demuxer, Packet, StreamType};
7use crate::error::Result;
8use crate::video::{Decoder as VideoDecoder, Frame};
9
/// A decoded video frame converted to tightly-packed RGBA8 pixels.
#[derive(Debug, Clone)]
pub struct MpegRgbaFrame {
    /// Presentation timestamp in milliseconds (0 when the stream carried none).
    pub pts_ms: i64,
    /// Frame width in pixels.
    pub width: u32,
    /// Frame height in pixels.
    pub height: u32,
    /// Pixel bytes, row-major RGBA, `width * height * 4` bytes long.
    pub rgba: Vec<u8>,
}
17
/// A chunk of decoded audio as interleaved 32-bit float PCM samples.
#[derive(Debug, Clone)]
pub struct MpegAudioF32 {
    /// Presentation timestamp in milliseconds.
    pub pts_ms: i64,
    /// Sample rate in Hz.
    pub sample_rate: u32,
    /// Channel count; samples are interleaved per frame.
    pub channels: u16,
    /// Interleaved PCM samples.
    pub samples: Vec<f32>,
}
25
/// One decoded output event produced by [`MpegAvPipeline`].
#[derive(Clone)]
pub enum MpegAvEvent {
    /// A decoded, RGBA-converted video frame.
    Video(MpegRgbaFrame),
    /// A chunk of decoded f32 PCM audio.
    Audio(MpegAudioF32),
}
31
/// Combined MPEG demux + video/audio decode pipeline: feeds raw mux bytes in,
/// emits ready-to-present RGBA frames and f32 audio chunks.
///
/// NOTE(review): `Default` builds `Demuxer::default()`, while `new()` uses
/// `Demuxer::new_auto()` — confirm the two are meant to be equivalent.
#[derive(Default)]
pub struct MpegAvPipeline {
    demux: Demuxer,
    vdec: VideoDecoder,
    adec: MpaAudioDecoder,

    // Scratch packet buffer reused across `push_with` calls to avoid
    // reallocating on every push.
    pkts: Vec<Packet>,
    /// Events queued by `push`/`flush` for callers that poll instead of
    /// supplying a callback; drained by the caller.
    pub stash: VecDeque<MpegAvEvent>,
}
41
42impl MpegAvPipeline {
43 pub fn new() -> Self {
44 Self { demux: Demuxer::new_auto(), vdec: VideoDecoder::new(), adec: MpaAudioDecoder::new(), pkts: Vec::new(), stash: VecDeque::new() }
45 }
46
47 #[inline]
48 pub fn demuxer_mut(&mut self) -> &mut Demuxer {
49 &mut self.demux
50 }
51
52 #[inline]
53 pub fn video_decoder_mut(&mut self) -> &mut VideoDecoder {
54 &mut self.vdec
55 }
56
57 #[inline]
58 pub fn audio_decoder_mut(&mut self) -> &mut MpaAudioDecoder {
59 &mut self.adec
60 }
61
62 pub fn push_with<F>(&mut self, data: &[u8], pts_90k: Option<i64>, mut on_event: F) -> Result<()>
63 where
64 F: FnMut(MpegAvEvent),
65 {
66 self.pkts.clear();
67 self.demux.push_into(data, pts_90k, &mut self.pkts);
68
69 let mut local_pkts: Vec<Packet> = Vec::new();
71 std::mem::swap(&mut self.pkts, &mut local_pkts);
72
73 for pkt in local_pkts.drain(..) {
74 match pkt.stream_type {
75 StreamType::MpegVideo => self.handle_video_pkt(&pkt, &mut on_event)?,
76 StreamType::MpegAudio => self.handle_audio_pkt(&pkt, &mut on_event)?,
77 StreamType::Unknown => {}
78 }
79 }
80
81 std::mem::swap(&mut self.pkts, &mut local_pkts);
82 self.pkts.clear();
83
84 Ok(())
85 }
86
87 pub fn push(&mut self, data: &[u8], pts_90k: Option<i64>) -> Result<()> {
88 let mut tmp: Vec<MpegAvEvent> = Vec::new();
89 self.push_with(data, pts_90k, |ev| tmp.push(ev))?;
90 for ev in tmp {
91 self.stash.push_back(ev);
92 }
93 Ok(())
94 }
95
96 pub fn flush_with<F>(&mut self, mut on_event: F) -> Result<()>
97 where
98 F: FnMut(MpegAvEvent),
99 {
100 for f in self.vdec.flush_shared()? {
102 self.emit_video_frame(f, &mut on_event)?;
103 }
104 Ok(())
105 }
106
107 pub fn flush(&mut self) -> Result<()> {
108 let mut tmp: Vec<MpegAvEvent> = Vec::new();
109 self.flush_with(|ev| tmp.push(ev))?;
110 for ev in tmp {
111 self.stash.push_back(ev);
112 }
113 Ok(())
114 }
115
116 fn handle_video_pkt<F>(&mut self, pkt: &Packet, on_event: &mut F) -> Result<()>
117 where
118 F: FnMut(MpegAvEvent),
119 {
120 let decoded: Vec<Arc<Frame>> = self.vdec.decode_shared(&pkt.data, pkt.pts_90k)?;
121 for f in decoded {
122 self.emit_video_frame(f, on_event)?;
123 }
124 Ok(())
125 }
126
127 fn emit_video_frame<F>(&mut self, f: Arc<Frame>, on_event: &mut F) -> Result<()>
128 where
129 F: FnMut(MpegAvEvent),
130 {
131 let w = f.width as u32;
132 let h = f.height as u32;
133 let mut rgba = vec![0u8; (w as usize) * (h as usize) * 4];
134 frame_to_rgba_bt601_limited(&f, &mut rgba);
135
136 let pts_ms = pts90k_opt_to_ms(f.pts_90k);
137 on_event(MpegAvEvent::Video(MpegRgbaFrame { pts_ms, width: w, height: h, rgba }));
138 Ok(())
139 }
140
141 fn handle_audio_pkt<F>(&mut self, pkt: &Packet, on_event: &mut F) -> Result<()>
142 where
143 F: FnMut(MpegAvEvent),
144 {
145 let pts_ms_opt = pkt.pts_90k.map(pts90k_to_ms);
146 self.adec.push_with(&pkt.data, pts_ms_opt, |ch: MpaAudioChunk| {
147 on_event(MpegAvEvent::Audio(MpegAudioF32 {
148 pts_ms: ch.pts_ms,
149 sample_rate: ch.sample_rate,
150 channels: ch.channels,
151 samples: ch.samples,
152 }))
153 })?;
154 Ok(())
155 }
156}
157
/// Converts a 90 kHz MPEG clock timestamp to milliseconds, truncating toward
/// zero (90 000 ticks/s == 90 ticks/ms).
///
/// Dividing directly by 90 is mathematically identical to the previous
/// `(v * 1000) / 90000` form but cannot overflow for large `v`
/// (the old form overflowed for |v| > i64::MAX / 1000).
#[inline]
fn pts90k_to_ms(v: i64) -> i64 {
    v / 90
}
162
163#[inline]
164fn pts90k_opt_to_ms(v: Option<i64>) -> i64 {
165 v.map(pts90k_to_ms).unwrap_or(0)
166}