na_mpeg2_decoder/demux/
mod.rs

1//! Minimal MPEG demux helpers.
2//!
//! This module is intentionally small: it only extracts video and audio
//! elementary stream payload bytes (and optional PTS) from common MPEG containers.
5//!
6//! Supported:
7//! - Raw ES (start-code byte stream)
//! - MPEG-TS (188-byte packets; video/audio PIDs auto-sniffed from PES stream_id 0xE0..0xEF / 0xC0..0xDF)
//! - MPEG-PS (pack/system headers + PES; extracts video and audio PES payload)
10
/// Kind of elementary stream a [`Packet`] carries.
///
/// The default value is [`StreamType::Unknown`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum StreamType {
    /// Video elementary stream (PES stream ids `0xE0..=0xEF` in this module).
    MpegVideo,
    /// Audio elementary stream (PES stream ids `0xC0..=0xDF` in this module).
    MpegAudio,
    /// Stream type has not been determined.
    #[default]
    Unknown,
}
24
/// One chunk of elementary-stream bytes produced by [`Demuxer`].
#[derive(Clone, Debug)]
pub struct Packet {
    /// Which elementary stream the bytes belong to.
    pub stream_type: StreamType,
    /// Presentation timestamp, if one is known for this chunk.
    ///
    /// For TS/PS input this is the PTS parsed from the PES header (absent on
    /// continuation chunks); for raw ES input it is the value the caller
    /// passed to `push` / `push_into`.
    /// NOTE(review): the `_90k` suffix suggests 90 kHz clock ticks — confirm with callers.
    pub pts_90k: Option<i64>,
    /// Elementary-stream payload bytes.
    pub data: Vec<u8>,
}
31
/// Container format the demuxer is operating on.
///
/// The default value is [`ContainerKind::Auto`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
enum ContainerKind {
    /// Not decided yet; the format is probed from buffered input.
    #[default]
    Auto,
    /// Raw elementary stream: input bytes are forwarded as-is.
    Es,
    /// MPEG transport stream (188-byte packets).
    MpegTs,
    /// MPEG program stream (pack headers + PES packets).
    MpegPs,
}
46
/// Incremental demuxer: bytes are fed in with `push` / `push_into` and
/// elementary-stream [`Packet`]s come out.
///
/// The derived `Default` starts with `ContainerKind::Auto`,
/// `StreamType::Unknown`, an empty buffer and no known TS PIDs.
#[derive(Debug, Default)]
pub struct Demuxer {
    // Container format in use; `Auto` until the input has been probed.
    kind: ContainerKind,
    // Stream type stamped on packets emitted in raw-ES mode.
    stream_type: StreamType,

    // Input bytes received but not yet consumed by the active parser.
    buf: Vec<u8>,

    // TS state: PIDs learned by sniffing PES stream ids on payload-unit starts.
    ts_video_pid: Option<u16>,
    ts_audio_pid: Option<u16>,
}
58
59impl Demuxer {
60    /// Create an auto-detecting demuxer.
61    pub fn new_auto() -> Self {
62        Self { kind: ContainerKind::Auto, stream_type: StreamType::MpegVideo, buf: Vec::new(), ts_video_pid: None, ts_audio_pid: None }
63    }
64
65    /// Create a demuxer with explicit container kind.
66    pub fn new(kind: StreamType) -> Self {
67        Self { kind: ContainerKind::Es, stream_type: kind, buf: Vec::new(), ts_video_pid: None, ts_audio_pid: None }
68    }
69
70    /// Feed bytes and return extracted video ES chunks.
71    pub fn push(&mut self, data: &[u8], pts_90k: Option<i64>) -> Vec<Packet> {
72        let mut out = Vec::new();
73        self.push_into(data, pts_90k, &mut out);
74        out
75    }
76
77    /// Feed bytes and append extracted video ES chunks into `out`.
78    ///
79    /// This is the preferred API for memory-sensitive callers because it allows
80    /// reusing `out` capacity across calls.
81    pub fn push_into(&mut self, data: &[u8], pts_90k: Option<i64>, out: &mut Vec<Packet>) {
82        self.buf.extend_from_slice(data);
83
84        if self.kind == ContainerKind::Auto {
85            self.kind = detect_kind(&self.buf);
86        }
87
88        match self.kind {
89            ContainerKind::Es => {
90                if !self.buf.is_empty() {
91                    out.push(Packet { stream_type: self.stream_type, pts_90k, data: std::mem::take(&mut self.buf) });
92                }
93            }
94            ContainerKind::MpegTs => self.push_ts_into(out),
95            ContainerKind::MpegPs => self.push_ps_into(out),
96            ContainerKind::Auto => {}
97        }
98    }
99
100    fn push_ts_into(&mut self, out: &mut Vec<Packet>) {
101        const TS_SIZE: usize = 188;
102
103        // Try to resync to TS packet boundary.
104        while self.buf.len() >= TS_SIZE {
105            if self.buf[0] != 0x47 {
106                if let Some(pos) = self.buf.iter().position(|&b| b == 0x47) {
107                    self.buf.drain(0..pos);
108                } else {
109                    self.buf.clear();
110                    break;
111                }
112                if self.buf.len() < TS_SIZE {
113                    break;
114                }
115            }
116
117            // Parse directly from the buffered TS packet without allocating.
118            // Important: always consume exactly one TS packet per iteration.
119            if self.buf[0] != 0x47 {
120                self.buf.drain(0..1);
121                continue;
122            }
123
124            let pkt = &self.buf[..TS_SIZE];
125
126            let pusi = (pkt[1] & 0x40) != 0;
127            let pid: u16 = (((pkt[1] & 0x1F) as u16) << 8) | (pkt[2] as u16);
128            let afc = (pkt[3] >> 4) & 0x3;
129
130            let mut idx = 4usize;
131            if afc == 2 || afc == 3 {
132                if idx >= TS_SIZE {
133                    self.buf.drain(0..TS_SIZE);
134                    continue;
135                }
136                let afl = pkt[idx] as usize;
137                idx += 1 + afl;
138                if idx > TS_SIZE {
139                    self.buf.drain(0..TS_SIZE);
140                    continue;
141                }
142            }
143
144            if afc == 0 || afc == 2 {
145                self.buf.drain(0..TS_SIZE);
146                continue; // no payload
147            }
148            if idx >= TS_SIZE {
149                self.buf.drain(0..TS_SIZE);
150                continue;
151            }
152            let payload = &pkt[idx..TS_SIZE];
153            if payload.is_empty() {
154                self.buf.drain(0..TS_SIZE);
155                continue;
156            }
157
158            // Auto-sniff video/audio PID from PES headers.
159            if pusi && (self.ts_video_pid.is_none() || self.ts_audio_pid.is_none()) {
160                if payload.len() >= 4 && payload[0] == 0 && payload[1] == 0 && payload[2] == 1 {
161                    let sid = payload[3];
162                    if self.ts_video_pid.is_none() && (0xE0..=0xEF).contains(&sid) {
163                        self.ts_video_pid = Some(pid);
164                    }
165                    if self.ts_audio_pid.is_none() && (0xC0..=0xDF).contains(&sid) {
166                        self.ts_audio_pid = Some(pid);
167                    }
168                }
169            }
170
171            let mut st: Option<StreamType> = None;
172            if let Some(vpid) = self.ts_video_pid {
173                if pid == vpid {
174                    st = Some(StreamType::MpegVideo);
175                }
176            }
177            if let Some(apid) = self.ts_audio_pid {
178                if pid == apid {
179                    st = Some(StreamType::MpegAudio);
180                }
181            }
182            let Some(stream_type) = st else {
183                self.buf.drain(0..TS_SIZE);
184                continue;
185            };
186
187            if pusi {
188                // PES start.
189                if let Some((pts, off)) = parse_pes_header(payload) {
190                    if off <= payload.len() {
191                        let es = &payload[off..];
192                        if !es.is_empty() {
193                            out.push(Packet { stream_type, pts_90k: pts, data: es.to_vec() });
194                        }
195                    }
196                } else {
197                    // No PES header; forward payload.
198                    out.push(Packet { stream_type, pts_90k: None, data: payload.to_vec() });
199                }
200            } else {
201                // PES continuation: payload is pure ES bytes.
202                out.push(Packet { stream_type, pts_90k: None, data: payload.to_vec() });
203            }
204
205            // Drop processed TS packet bytes.
206            self.buf.drain(0..TS_SIZE);
207        }
208    }
209
210    fn push_ps_into(&mut self, out: &mut Vec<Packet>) {
211
212        // Scan for PES start codes; keep the last partial chunk.
213        let mut pos = 0usize;
214        while let Some((sc_pos, sid)) = find_start_code(&self.buf, pos) {
215            if sc_pos + 4 > self.buf.len() {
216                break;
217            }
218            let stream_type = if (0xE0..=0xEF).contains(&sid) {
219                StreamType::MpegVideo
220            } else if (0xC0..=0xDF).contains(&sid) {
221                StreamType::MpegAudio
222            } else {
223                pos = sc_pos + 4;
224                continue;
225            };
226            if sc_pos + 6 > self.buf.len() {
227                break;
228            }
229
230            let pes_len = u16::from_be_bytes([self.buf[sc_pos + 4], self.buf[sc_pos + 5]]) as usize;
231            let pes_end = if pes_len != 0 {
232                sc_pos + 6 + pes_len
233            } else {
234                // Unbounded PES: end at the next *system-layer* start code.
235                // Important: video ES itself contains 00 00 01 xx start codes
236                // (e.g., 0x00, 0xB3, 0xB5, 0x01..0xAF). We must not cut on those.
237                let mut search = sc_pos + 6;
238                let mut end_opt: Option<usize> = None;
239                while let Some((next_sc, next_id)) = find_start_code(&self.buf, search) {
240                    // System / PES start codes are >= 0xB9 in program streams.
241                    if next_id >= 0xB9 {
242                        end_opt = Some(next_sc);
243                        break;
244                    }
245                    search = next_sc + 4;
246                }
247                let Some(end_pos) = end_opt else {
248                    break;
249                };
250                end_pos
251            };
252            if pes_end > self.buf.len() {
253                break;
254            }
255
256            let pes = &self.buf[sc_pos..pes_end];
257            if let Some((pts, off)) = parse_pes_header(pes) {
258                if off < pes.len() {
259                    out.push(Packet { stream_type, pts_90k: pts, data: pes[off..].to_vec() });
260                }
261            } else {
262                // Could not parse; forward raw PES bytes after start code.
263                out.push(Packet { stream_type, pts_90k: None, data: pes[4..].to_vec() });
264            }
265
266            pos = pes_end;
267        }
268
269        // Keep tail for next push.
270        if pos > 0 {
271            self.buf.drain(0..pos);
272        }
273    }
274}
275
276fn detect_kind(buf: &[u8]) -> ContainerKind {
277    // TS: sync byte 0x47 with 188-byte periodicity.
278    if buf.len() >= 188 * 3 {
279        if buf[0] == 0x47 && buf[188] == 0x47 && buf[376] == 0x47 {
280            return ContainerKind::MpegTs;
281        }
282    }
283    // PS: pack start code 00 00 01 BA.
284    if buf.windows(4).take(4096).any(|w| w == [0x00, 0x00, 0x01, 0xBA]) {
285        return ContainerKind::MpegPs;
286    }
287    ContainerKind::Es
288}
289
/// Find the first `00 00 01 xx` start code at or after byte offset `from`.
///
/// Returns the offset of the first `00` byte together with the byte `xx` that
/// follows the prefix, or `None` if no complete 4-byte start code lies within
/// `buf[from..]`. Any `from` value is accepted; offsets past the end of `buf`
/// (including values near `usize::MAX`) simply yield `None` instead of
/// overflowing.
fn find_start_code(buf: &[u8], from: usize) -> Option<(usize, u8)> {
    // `get` rejects an out-of-range `from` without any offset arithmetic.
    let tail = buf.get(from..)?;
    tail.windows(4)
        .position(|w| matches!(w, [0, 0, 1, _]))
        .map(|rel| (from + rel, tail[rel + 3]))
}
303
304/// Parse PES header and return (PTS, payload_offset).
305/// The returned offset is relative to the provided `buf`.
306fn parse_pes_header(buf: &[u8]) -> Option<(Option<i64>, usize)> {
307    if buf.len() < 9 {
308        return None;
309    }
310    if !(buf[0] == 0 && buf[1] == 0 && buf[2] == 1) {
311        return None;
312    }
313    let _sid = buf[3];
314    // buf[4..6] is PES_packet_length.
315
316    // Prefer MPEG-2 PES header syntax: '10' in buf[6] bits 7..6.
317    if (buf[6] & 0xC0) == 0x80 {
318        let flags = buf[7];
319        let hdr_len = buf[8] as usize;
320        let hdr_start = 9usize;
321        let payload_off = hdr_start + hdr_len;
322        if payload_off > buf.len() {
323            return None;
324        }
325        let pts_dts = (flags >> 6) & 0x3;
326        let mut pts: Option<i64> = None;
327        if (pts_dts == 2 || pts_dts == 3) && hdr_len >= 5 && hdr_start + 5 <= buf.len() {
328            pts = Some(parse_pts_90k(&buf[hdr_start..hdr_start + 5]));
329        }
330        return Some((pts, payload_off));
331    }
332
333    // MPEG-1 PES: skip stuffing and parse optional PTS.
334    // Reference: ISO/IEC 11172-1.
335    let mut idx = 6usize;
336    while idx < buf.len() && buf[idx] == 0xFF {
337        idx += 1;
338    }
339    if idx + 1 < buf.len() && (buf[idx] & 0xC0) == 0x40 {
340        idx += 2; // STD_buffer_scale/size
341    }
342    if idx >= buf.len() {
343        return None;
344    }
345    let mut pts: Option<i64> = None;
346    if (buf[idx] & 0xF0) == 0x20 {
347        // PTS only
348        if idx + 5 <= buf.len() {
349            pts = Some(parse_pts_90k(&buf[idx..idx + 5]));
350            idx += 5;
351        }
352    } else if (buf[idx] & 0xF0) == 0x30 {
353        // PTS + DTS, ignore DTS
354        if idx + 10 <= buf.len() {
355            pts = Some(parse_pts_90k(&buf[idx..idx + 5]));
356            idx += 10;
357        }
358    } else if buf[idx] == 0x0F {
359        idx += 1; // no pts
360    }
361    Some((pts, idx))
362}
363
/// Decode the 33-bit timestamp packed into the first five bytes of `p`.
///
/// The value is spread over the bytes as 3 + 8 + 7 + 8 + 7 bits; the lowest
/// bit of bytes 0, 2 and 4 and the upper nibble of byte 0 are not part of the
/// value and are masked off. Returns 0 when `p` holds fewer than five bytes.
fn parse_pts_90k(p: &[u8]) -> i64 {
    let &[b0, b1, b2, b3, b4, ..] = p else {
        return 0;
    };

    let bits_32_30 = i64::from(b0 & 0x0E) << 29;
    let bits_29_22 = i64::from(b1) << 22;
    let bits_21_15 = i64::from(b2 & 0xFE) << 14;
    let bits_14_7 = i64::from(b3) << 7;
    let bits_6_0 = i64::from(b4 & 0xFE) >> 1;

    bits_32_30 | bits_29_22 | bits_21_15 | bits_14_7 | bits_6_0
}