scuffle_rtmp/protocol_control_messages/
writer.rs

1//! Writing protocol control messages.
2
3use std::io;
4
5use byteorder::{BigEndian, WriteBytesExt};
6use bytes::Bytes;
7
8use super::{
9    ProtocolControlMessageAcknowledgement, ProtocolControlMessageSetChunkSize, ProtocolControlMessageSetPeerBandwidth,
10    ProtocolControlMessageWindowAcknowledgementSize,
11};
12use crate::chunk::Chunk;
13use crate::chunk::writer::ChunkWriter;
14use crate::messages::MessageType;
15
16impl ProtocolControlMessageSetChunkSize {
17    /// Writes the [`ProtocolControlMessageSetChunkSize`] to the given writer.
18    pub fn write(&self, io: &mut impl io::Write, writer: &ChunkWriter) -> Result<(), crate::error::RtmpError> {
19        // According to spec the first bit must be 0.
20        let chunk_size = self.chunk_size & 0x7FFFFFFF; // 31 bits only
21
22        writer.write_chunk(
23            io,
24            Chunk::new(
25                2, // chunk stream must be 2
26                0, // timestamps are ignored
27                MessageType::SetChunkSize,
28                0, // message stream id is ignored
29                Bytes::from(chunk_size.to_be_bytes().to_vec()),
30            ),
31        )?;
32
33        Ok(())
34    }
35}
36
37impl ProtocolControlMessageAcknowledgement {
38    /// Writes the [`ProtocolControlMessageAcknowledgement`] to the given writer.
39    pub fn write(&self, io: &mut impl io::Write, writer: &ChunkWriter) -> Result<(), crate::error::RtmpError> {
40        writer.write_chunk(
41            io,
42            Chunk::new(
43                2, // chunk stream must be 2
44                0, // timestamps are ignored
45                MessageType::Acknowledgement,
46                0, // message stream id is ignored
47                Bytes::from(self.sequence_number.to_be_bytes().to_vec()),
48            ),
49        )?;
50
51        Ok(())
52    }
53}
54
55impl ProtocolControlMessageWindowAcknowledgementSize {
56    /// Writes the [`ProtocolControlMessageWindowAcknowledgementSize`] to the given writer.
57    pub fn write(&self, io: &mut impl io::Write, writer: &ChunkWriter) -> Result<(), crate::error::RtmpError> {
58        writer.write_chunk(
59            io,
60            Chunk::new(
61                2, // chunk stream must be 2
62                0, // timestamps are ignored
63                MessageType::WindowAcknowledgementSize,
64                0, // message stream id is ignored
65                Bytes::from(self.acknowledgement_window_size.to_be_bytes().to_vec()),
66            ),
67        )?;
68
69        Ok(())
70    }
71}
72
73impl ProtocolControlMessageSetPeerBandwidth {
74    /// Writes the [`ProtocolControlMessageSetPeerBandwidth`] to the given writer.
75    pub fn write(&self, io: &mut impl io::Write, writer: &ChunkWriter) -> Result<(), crate::error::RtmpError> {
76        let mut data = Vec::new();
77        data.write_u32::<BigEndian>(self.acknowledgement_window_size)
78            .expect("Failed to write window size");
79        data.write_u8(self.limit_type as u8).expect("Failed to write limit type");
80
81        writer.write_chunk(
82            io,
83            Chunk::new(
84                2, // chunk stream must be 2
85                0, // timestamps are ignored
86                MessageType::SetPeerBandwidth,
87                0, // message stream id is ignored
88                Bytes::from(data),
89            ),
90        )?;
91
92        Ok(())
93    }
94}
95
96#[cfg(test)]
97#[cfg_attr(all(test, coverage_nightly), coverage(off))]
98mod tests {
99    use bytes::{BufMut, BytesMut};
100
101    use super::*;
102    use crate::chunk::reader::ChunkReader;
103    use crate::protocol_control_messages::ProtocolControlMessageSetPeerBandwidthLimitType;
104
105    #[test]
106    fn write_set_chunk_size() {
107        let writer = ChunkWriter::default();
108        let mut buf = BytesMut::new();
109
110        ProtocolControlMessageSetChunkSize { chunk_size: 1 }
111            .write(&mut (&mut buf).writer(), &writer)
112            .unwrap();
113
114        let mut reader = ChunkReader::default();
115
116        let chunk = reader.read_chunk(&mut buf).expect("read chunk").expect("chunk");
117        assert_eq!(chunk.basic_header.chunk_stream_id, 0x02);
118        assert_eq!(chunk.message_header.msg_type_id.0, 0x01);
119        assert_eq!(chunk.message_header.msg_stream_id, 0);
120        assert_eq!(chunk.payload, vec![0x00, 0x00, 0x00, 0x01]);
121    }
122
123    #[test]
124    fn write_acknowledgement() {
125        let writer = ChunkWriter::default();
126        let mut buf = BytesMut::new();
127
128        ProtocolControlMessageAcknowledgement { sequence_number: 1 }
129            .write(&mut (&mut buf).writer(), &writer)
130            .unwrap();
131
132        let mut reader = ChunkReader::default();
133
134        let chunk = reader.read_chunk(&mut buf).expect("read chunk").expect("chunk");
135        assert_eq!(chunk.basic_header.chunk_stream_id, 0x02);
136        assert_eq!(chunk.message_header.msg_type_id.0, 0x03);
137        assert_eq!(chunk.message_header.msg_stream_id, 0);
138        assert_eq!(chunk.payload, vec![0x00, 0x00, 0x00, 0x01]);
139    }
140
141    #[test]
142    fn window_acknowledgement_size() {
143        let writer = ChunkWriter::default();
144        let mut buf = BytesMut::new();
145
146        ProtocolControlMessageWindowAcknowledgementSize {
147            acknowledgement_window_size: 1,
148        }
149        .write(&mut (&mut buf).writer(), &writer)
150        .unwrap();
151
152        let mut reader = ChunkReader::default();
153
154        let chunk = reader.read_chunk(&mut buf).expect("read chunk").expect("chunk");
155        assert_eq!(chunk.basic_header.chunk_stream_id, 0x02);
156        assert_eq!(chunk.message_header.msg_type_id.0, 0x05);
157        assert_eq!(chunk.message_header.msg_stream_id, 0);
158        assert_eq!(chunk.payload, vec![0x00, 0x00, 0x00, 0x01]);
159    }
160
161    #[test]
162    fn set_peer_bandwidth() {
163        let writer = ChunkWriter::default();
164        let mut buf = BytesMut::new();
165
166        ProtocolControlMessageSetPeerBandwidth {
167            acknowledgement_window_size: 1,
168            limit_type: ProtocolControlMessageSetPeerBandwidthLimitType::Dynamic,
169        }
170        .write(&mut (&mut buf).writer(), &writer)
171        .unwrap();
172
173        let mut reader = ChunkReader::default();
174
175        let chunk = reader.read_chunk(&mut buf).expect("read chunk").expect("chunk");
176        assert_eq!(chunk.basic_header.chunk_stream_id, 0x02);
177        assert_eq!(chunk.message_header.msg_type_id.0, 0x06);
178        assert_eq!(chunk.message_header.msg_stream_id, 0);
179        assert_eq!(chunk.payload, vec![0x00, 0x00, 0x00, 0x01, 0x02]);
180    }
181}