1use super::{MessageData, MessageType, UnknownMessage};
4use crate::chunk::Chunk;
5use crate::command_messages::Command;
6use crate::protocol_control_messages::{
7 ProtocolControlMessageSetChunkSize, ProtocolControlMessageWindowAcknowledgementSize,
8};
9
10impl MessageData<'_> {
11 pub fn read(chunk: &Chunk) -> Result<Self, crate::error::RtmpError> {
13 match chunk.message_header.msg_type_id {
14 MessageType::SetChunkSize => {
16 let data = ProtocolControlMessageSetChunkSize::read(&chunk.payload)?;
17 Ok(Self::SetChunkSize(data))
18 }
19 MessageType::Abort => Ok(Self::Abort), MessageType::Acknowledgement => Ok(Self::Acknowledgement), MessageType::UserControlEvent => Ok(Self::UserControlEvent), MessageType::WindowAcknowledgementSize => {
23 let data = ProtocolControlMessageWindowAcknowledgementSize::read(&chunk.payload)?;
24 Ok(Self::SetAcknowledgementWindowSize(data))
25 }
26 MessageType::SetPeerBandwidth => Ok(Self::SetPeerBandwidth), MessageType::Audio => Ok(Self::AudioData {
29 data: chunk.payload.clone(),
30 }),
31 MessageType::Video => Ok(Self::VideoData {
32 data: chunk.payload.clone(),
33 }),
34 MessageType::DataAMF3 => Ok(Self::DataAmf3), MessageType::SharedObjAMF3 => Ok(Self::SharedObjAmf3), MessageType::CommandAMF3 => Ok(Self::CommandAmf3), MessageType::DataAMF0 => Ok(Self::DataAmf0 {
39 data: chunk.payload.clone(),
40 }),
41 MessageType::SharedObjAMF0 => Ok(Self::SharedObjAmf0), MessageType::CommandAMF0 => Ok(Self::Amf0Command(Command::read(chunk.payload.clone())?)),
43 MessageType::Aggregate => Ok(Self::Aggregate), msg_type_id => Ok(Self::Unknown(UnknownMessage {
45 msg_type_id,
46 data: chunk.payload.clone(),
47 })),
48 }
49 }
50}
51
#[cfg(test)]
#[cfg_attr(all(test, coverage_nightly), coverage(off))]
mod tests {
    use bytes::Bytes;
    use scuffle_amf0::encoder::Amf0Encoder;
    use scuffle_amf0::{Amf0Object, Amf0Value};

    use super::*;
    use crate::command_messages::CommandType;
    use crate::command_messages::netconnection::NetConnectionCommand;

    // An AMF0 `connect` command is parsed into a NetConnection connect command.
    #[test]
    fn test_parse_command() {
        let mut encoded = Vec::new();
        let mut writer = Amf0Encoder::new(&mut encoded);

        writer.encode_string("connect").unwrap();
        writer.encode_number(1.0).unwrap();
        let connect_args: Amf0Object = [("app".into(), Amf0Value::String("testapp".into()))].into_iter().collect();
        writer.encode_object(&connect_args).unwrap();

        let chunk = Chunk::new(0, 0, MessageType::CommandAMF0, 0, Bytes::from(encoded));

        let MessageData::Amf0Command(Command {
            transaction_id,
            command_type,
        }) = MessageData::read(&chunk).expect("no errors")
        else {
            unreachable!("wrong message type");
        };
        assert_eq!(transaction_id, 1.0);

        let CommandType::NetConnection(NetConnectionCommand::Connect(connect)) = command_type else {
            panic!("wrong command");
        };
        assert_eq!(connect.app, "testapp");
    }

    // Audio payloads are passed through unchanged.
    #[test]
    fn test_parse_audio_packet() {
        let chunk = Chunk::new(0, 0, MessageType::Audio, 0, vec![0x00, 0x00, 0x00, 0x00].into());

        let MessageData::AudioData { data } = MessageData::read(&chunk).expect("no errors") else {
            unreachable!("wrong message type");
        };
        assert_eq!(data, vec![0x00, 0x00, 0x00, 0x00]);
    }

    // Video payloads are passed through unchanged.
    #[test]
    fn test_parse_video_packet() {
        let chunk = Chunk::new(0, 0, MessageType::Video, 0, vec![0x00, 0x00, 0x00, 0x00].into());

        let MessageData::VideoData { data } = MessageData::read(&chunk).expect("no errors") else {
            unreachable!("wrong message type");
        };
        assert_eq!(data, vec![0x00, 0x00, 0x00, 0x00]);
    }

    // A four-byte set-chunk-size payload is decoded into its numeric value.
    #[test]
    fn test_parse_set_chunk_size() {
        let chunk = Chunk::new(0, 0, MessageType::SetChunkSize, 0, vec![0x00, 0xFF, 0xFF, 0xFF].into());

        let MessageData::SetChunkSize(ProtocolControlMessageSetChunkSize { chunk_size }) =
            MessageData::read(&chunk).expect("no errors")
        else {
            unreachable!("wrong message type");
        };
        assert_eq!(chunk_size, 0x00FFFFFF);
    }

    // A four-byte window-acknowledgement-size payload is decoded into its numeric value.
    #[test]
    fn test_parse_window_acknowledgement_size() {
        let payload = vec![0x00, 0xFF, 0xFF, 0xFF].into();
        let chunk = Chunk::new(0, 0, MessageType::WindowAcknowledgementSize, 0, payload);

        let MessageData::SetAcknowledgementWindowSize(ProtocolControlMessageWindowAcknowledgementSize {
            acknowledgement_window_size,
        }) = MessageData::read(&chunk).expect("no errors")
        else {
            unreachable!("wrong message type");
        };
        assert_eq!(acknowledgement_window_size, 0x00FFFFFF);
    }

    // AMF0 data messages keep the raw encoded bytes.
    #[test]
    fn test_parse_metadata() {
        let mut encoded = Vec::new();

        let mut writer = Amf0Encoder::new(&mut encoded);
        writer.encode_string("onMetaData").unwrap();
        let metadata: Amf0Object = [("duration".into(), Amf0Value::Number(0.0))].into_iter().collect();
        writer.encode_object(&metadata).unwrap();

        let amf_data = Bytes::from(encoded);
        let chunk = Chunk::new(0, 0, MessageType::DataAMF0, 0, amf_data.clone());

        let MessageData::DataAmf0 { data } = MessageData::read(&chunk).expect("no errors") else {
            unreachable!("wrong message type");
        };
        assert_eq!(data, amf_data);
    }

    // An unrecognised type id is reported as `Unknown` rather than as an error.
    #[test]
    fn test_unsupported_message_type() {
        let chunk = Chunk::new(0, 0, MessageType(42), 0, vec![0x00, 0x00, 0x00, 0x00].into());

        let message = MessageData::read(&chunk).expect("no errors");
        assert!(matches!(
            message,
            MessageData::Unknown(UnknownMessage {
                msg_type_id: MessageType(42),
                ..
            })
        ));
    }
}