diff --git a/Cargo.lock b/Cargo.lock index 3828b29a..59a9f1f2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -38,7 +38,7 @@ dependencies = [ "actix-rt", "actix_derive", "bitflags", - "bytes 1.2.1", + "bytes 1.3.0", "crossbeam-channel", "futures-core", "futures-sink", @@ -81,7 +81,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "57a7559404a7f3573127aab53c08ce37a6c6a315c374a31070f3c91cd1b4a7fe" dependencies = [ "bitflags", - "bytes 1.2.1", + "bytes 1.3.0", "futures-core", "futures-sink", "log", @@ -118,7 +118,7 @@ dependencies = [ "actix-web", "askama_escape", "bitflags", - "bytes 1.2.1", + "bytes 1.3.0", "derive_more", "futures-core", "http-range", @@ -144,7 +144,7 @@ dependencies = [ "base64", "bitflags", "brotli", - "bytes 1.2.1", + "bytes 1.3.0", "bytestring", "derive_more", "encoding_rs", @@ -181,7 +181,7 @@ dependencies = [ "actix-utils", "awc", "base64", - "bytes 1.2.1", + "bytes 1.3.0", "futures-core", "http", "log", @@ -227,7 +227,7 @@ checksum = "c9edfb0e7663d7fe18c8d5b668c9c1bcf79176b1dcc9d4da9592503209a6bfb0" dependencies = [ "actix-utils", "actix-web", - "bytes 1.2.1", + "bytes 1.3.0", "derive_more", "futures-core", "httparse", @@ -415,7 +415,7 @@ dependencies = [ "actix-utils", "actix-web-codegen", "ahash 0.7.6", - "bytes 1.2.1", + "bytes 1.3.0", "bytestring", "cfg-if 1.0.0", "cookie 0.16.1", @@ -450,7 +450,7 @@ dependencies = [ "actix-codec", "actix-http", "actix-web", - "bytes 1.2.1", + "bytes 1.3.0", "bytestring", "futures-core", "pin-project-lite 0.2.9", @@ -496,7 +496,7 @@ dependencies = [ "ahash 0.8.0", "arc-swap", "async-trait", - "bytes 1.2.1", + "bytes 1.3.0", "bytestring", "csv", "derive_more", @@ -778,7 +778,7 @@ dependencies = [ "async-stream", "async-trait", "base64", - "bytes 1.2.1", + "bytes 1.3.0", "fast_chemail", "fnv", "futures-util", @@ -865,7 +865,7 @@ version = "4.0.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a941b499fead4a3fb5392cabf42446566d18c86313f69f2deab69560394d65f" dependencies = [ - "bytes 1.2.1", + "bytes 1.3.0", "indexmap", "serde", "serde_json", @@ -973,7 +973,7 @@ dependencies = [ "actix-utils", "ahash 0.7.6", "base64", - "bytes 1.2.1", + "bytes 1.3.0", "cfg-if 1.0.0", "cookie 0.16.1", "derive_more", @@ -1024,7 +1024,7 @@ dependencies = [ "aws-smithy-json", "aws-smithy-types", "aws-types", - "bytes 1.2.1", + "bytes 1.3.0", "hex", "http", "hyper 0.14.20", @@ -1059,7 +1059,7 @@ dependencies = [ "aws-smithy-http", "aws-smithy-types", "aws-types", - "bytes 1.2.1", + "bytes 1.3.0", "http", "http-body 0.4.5", "lazy_static", @@ -1087,7 +1087,7 @@ dependencies = [ "aws-smithy-types", "aws-smithy-xml", "aws-types", - "bytes 1.2.1", + "bytes 1.3.0", "bytes-utils", "http", "http-body 0.4.5", @@ -1112,7 +1112,7 @@ dependencies = [ "aws-smithy-json", "aws-smithy-types", "aws-types", - "bytes 1.2.1", + "bytes 1.3.0", "http", "tokio-stream", "tower", @@ -1135,7 +1135,7 @@ dependencies = [ "aws-smithy-types", "aws-smithy-xml", "aws-types", - "bytes 1.2.1", + "bytes 1.3.0", "http", "tower", ] @@ -1162,7 +1162,7 @@ checksum = "8d33790cecae42b999d197074c8a19e9b96b9e346284a6f93989e7489c9fa0f5" dependencies = [ "aws-smithy-eventstream", "aws-smithy-http", - "bytes 1.2.1", + "bytes 1.3.0", "form_urlencoded", "hex", "http", @@ -1194,7 +1194,7 @@ checksum = "4b402da39bc5aae618b70a9b8d828acad21fe4a3a73b82c0205b89db55d71ce8" dependencies = [ "aws-smithy-http", "aws-smithy-types", - "bytes 1.2.1", + "bytes 1.3.0", "crc32c", "crc32fast", "hex", @@ -1217,7 +1217,7 @@ dependencies = [ "aws-smithy-http", "aws-smithy-http-tower", "aws-smithy-types", - "bytes 1.2.1", + "bytes 1.3.0", "fastrand", "http", "http-body 0.4.5", @@ -1237,7 +1237,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "98c2a7b9490fd2bc7af3a1c486ae921102d7234d1fa5e7d91039068e7af48a01" dependencies = [ "aws-smithy-types", - "bytes 1.2.1", + "bytes 1.3.0", "crc32fast", ] @@ -1249,7 +1249,7 @@ checksum = "014a0ef5c4508fc2f6a9d3925c214725af19f020ea388db48e20196cc4cc9d6d" dependencies = [ "aws-smithy-eventstream", "aws-smithy-types", - "bytes 1.2.1", + "bytes 1.3.0", "bytes-utils", "futures-core", "http", @@ -1270,7 +1270,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "deecb478dc3cc40203e0e97ac0fb92947e0719754bbafd0026bdc49318e2fd03" dependencies = [ "aws-smithy-http", - "bytes 1.2.1", + "bytes 1.3.0", "http", "http-body 0.4.5", "pin-project-lite 0.2.9", @@ -1538,9 +1538,9 @@ checksum = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38" [[package]] name = "bytes" -version = "1.2.1" +version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec8a7b6a70fde80372154c65702f00a0f56f3e1c36abbc6c440484be248856db" +checksum = "dfb24e866b15a1af2a1b663f10c6b6b8f397a84aadb828f12e5b289ec23a3a3c" dependencies = [ "serde", ] @@ -1551,7 +1551,7 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e47d3a8076e283f3acd27400535992edb3ba4b5bb72f8891ad8fbe7932a7d4b9" dependencies = [ - "bytes 1.2.1", + "bytes 1.3.0", "either", ] @@ -1561,7 +1561,7 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "86b6a75fd3048808ef06af5cd79712be8111960adaf89d90250974b38fc3928a" dependencies = [ - "bytes 1.2.1", + "bytes 1.3.0", ] [[package]] @@ -1768,7 +1768,7 @@ version = "4.6.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "35ed6e9d84f0b51a7f52daf1c7d71dd136fd7a3f41a8462b8cdb8c78d920fad4" dependencies = [ - "bytes 1.2.1", + "bytes 1.3.0", "futures-core", "memchr", "pin-project-lite 0.2.9", @@ -2961,7 +2961,7 @@ version = "0.3.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f9f29bc9dda355256b2916cf526ab02ce0aeaaaf2bad60d65ef3f12f11dd0f4" dependencies = [ - "bytes 1.2.1", + "bytes 1.3.0", "fnv", "futures-core", "futures-sink", @@ -3101,7 +3101,7 @@ version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "75f43d41e26995c17e71ee126451dd3941010b0514a81a9d11f3b341debc2399" dependencies = [ - "bytes 1.2.1", + "bytes 1.3.0", "fnv", "itoa 1.0.4", ] @@ -3122,7 +3122,7 @@ version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" dependencies = [ - "bytes 1.2.1", + "bytes 1.3.0", "http", "pin-project-lite 0.2.9", ] @@ -3205,7 +3205,7 @@ version = "0.14.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "02c929dc5c39e335a03c405292728118860721b10190d98c2a0f0efd5baafbac" dependencies = [ - "bytes 1.2.1", + "bytes 1.3.0", "futures-channel", "futures-core", "futures-util", @@ -3259,7 +3259,7 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" dependencies = [ - "bytes 1.2.1", + "bytes 1.3.0", "hyper 0.14.20", "native-tls", "tokio 1.21.2", @@ -3563,7 +3563,7 @@ name = "jsonrpc-example" version = "1.0.0" dependencies = [ "actix-web", - "bytes 1.2.1", + "bytes 1.3.0", "env_logger", "futures-util", "log", @@ -4131,7 +4131,7 @@ version = "2.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6ed4198ce7a4cbd2a57af78d28c6fbb57d81ac5f1d6ad79ac6c5587419cbdf22" dependencies = [ - "bytes 1.2.1", + "bytes 1.3.0", "encoding_rs", "futures-util", "http", @@ -4180,7 +4180,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9d8136c78f78cda5c1a4eee4ce555281b71e3e6db715817bc50e186e623b36f" dependencies = [ "bufstream", - "bytes 1.2.1", + "bytes 1.3.0", "crossbeam", "flate2", "io-enum", @@ -4211,7 +4211,7 @@ dependencies = [ "bitflags", "bitvec", "byteorder", - "bytes 1.2.1", + "bytes 1.3.0", "cc", "cmake", "crc32fast", @@ -4756,7 +4756,7 @@ checksum = "878c6cbf956e03af9aa8204b407b9cbf47c072164800aa918c516cd4b056c50c" dependencies = [ "base64", "byteorder", - "bytes 1.2.1", + "bytes 1.3.0", "fallible-iterator", "hmac", "md-5", @@ -4772,7 +4772,7 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73d946ec7d256b04dfadc4e6a3292324e6f417124750fc5c0950f981b703a0f1" dependencies = [ - "bytes 1.2.1", + "bytes 1.3.0", "fallible-iterator", "postgres-protocol", ] @@ -4864,7 +4864,7 @@ version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "399c3c31cdec40583bb68f0b18403400d01ec4289c383aa047560439952c4dd7" dependencies = [ - "bytes 1.2.1", + "bytes 1.3.0", "prost-derive", ] @@ -5043,7 +5043,7 @@ checksum = "513b3649f1a111c17954296e4a3b9eecb108b766c803e2b99f179ebe27005985" dependencies = [ "arc-swap", "async-trait", - "bytes 1.2.1", + "bytes 1.3.0", "combine 4.6.6", "futures", "futures-util", @@ -5062,7 +5062,7 @@ version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2777130e406c74c28b6cddc0194fcdc2553b5a8795eef9f6384bd3b70a07ba3f" dependencies = [ - "bytes 1.2.1", + "bytes 1.3.0", "futures-channel", "futures-sink", "futures-util", @@ -5169,7 +5169,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "431949c384f4e2ae07605ccaa56d1d9d2ecdb5cadd4f9577ccfab29f2e5149fc" dependencies = [ "base64", - "bytes 1.2.1", + "bytes 1.3.0", "encoding_rs", "futures-core", "futures-util", @@ -5974,7 +5974,7 @@ dependencies = [ "atoi", "bitflags", "byteorder", - "bytes 1.2.1", + "bytes 1.3.0", "crc", "crossbeam-queue", "dotenvy", @@ -6531,7 +6531,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a9e03c497dc955702ba729190dc4aac6f2a0ce97f913e5b1b5912fc5039d9099" dependencies = [ "autocfg", - "bytes 1.2.1", + "bytes 1.3.0", "libc", "memchr", "mio 0.8.5", @@ -6605,7 +6605,7 @@ checksum = "29a12c1b3e0704ae7dfc25562629798b29c72e6b1d0a681b6f29ab4ae5e7f7bf" dependencies = [ "async-trait", "byteorder", - "bytes 1.2.1", + "bytes 1.3.0", "fallible-iterator", "futures-channel", "futures-util", @@ -6684,7 +6684,7 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0bb2e075f03b3d66d8d8785356224ba688d2906a371015e225beeb65ca92c740" dependencies = [ - "bytes 1.2.1", + "bytes 1.3.0", "futures-core", "futures-sink", "pin-project-lite 0.2.9", @@ -7389,8 +7389,11 @@ name = "websocket-autobahn" version = "1.0.0" dependencies = [ "actix", + "actix-http", "actix-web", "actix-web-actors", + "bytes 1.3.0", + "bytestring", "env_logger", "log", ] @@ -7464,7 +7467,7 @@ dependencies = [ "actix-web", "actix-web-actors", "byteorder", - "bytes 1.2.1", + "bytes 1.3.0", "env_logger", "futures-util", "log", diff --git a/websockets/autobahn/Cargo.toml b/websockets/autobahn/Cargo.toml index 6524db3c..cbd36578 100644 --- a/websockets/autobahn/Cargo.toml +++ b/websockets/autobahn/Cargo.toml @@ -11,6 +11,9 @@ path = "src/main.rs" actix = "0.13" actix-web = "4" actix-web-actors = "4.1" +actix-http = "3" +bytes = "1.3.0" +bytestring = "1" env_logger = "0.9" log = "0.4" diff --git a/websockets/autobahn/README.md b/websockets/autobahn/README.md index 73843b96..05b386a8 100644 --- a/websockets/autobahn/README.md +++ b/websockets/autobahn/README.md @@ -11,11 +11,13 @@ cd websockets/autobahn cargo run ``` -### Running Autobahn Test Suite +### Running autobahn test suite Running the autobahn test suite is easiest using the docker image as explained on the [autobahn test suite repo](https://github.com/crossbario/autobahn-testsuite#using-the-testsuite-docker-image). -After starting the server, in the same directory, run the test suite in "fuzzing client" mode: +After starting the server, in the same directory, run the test suite in "fuzzing client" mode. + +#### Docker ```sh docker run -it --rm \ @@ -29,4 +31,30 @@ docker run -it --rm \ --mode fuzzingclient ``` -Results are written to the `reports/servers` directory for viewing. +#### Podman + +```sh +podman run -it --rm \ + -v "${PWD}/config":/config \ + -v "${PWD}/reports":/reports \ + --network host \ + --name autobahn \ + crossbario/autobahn-testsuite \ + wstest \ + --spec /config/fuzzingclient-podman.json \ + --mode fuzzingclient +``` + +If you run it with `selinux` enabled, then + +```sh +podman run -it --rm \ + -v "${PWD}/config":/config:z \ + -v "${PWD}/reports":/reports:z \ + --network host \ + --name autobahn \ + crossbario/autobahn-testsuite \ + wstest \ + --spec /config/fuzzingclient-podman.json \ + --mode fuzzingclient +``` diff --git a/websockets/autobahn/config/fuzzingclient-podman.json b/websockets/autobahn/config/fuzzingclient-podman.json new file mode 100644 index 00000000..bb50ac50 --- /dev/null +++ b/websockets/autobahn/config/fuzzingclient-podman.json @@ -0,0 +1,15 @@ +{ + "outdir": "./reports/servers", + "servers": [ + { + "agent": "actix-web-actors", + "url": "ws://127.0.0.1:9001" + } + ], + "cases": ["*"], + "exclude-cases": [ + "12.*", + "13.*" + ], + "exclude-agent-cases": {} +} diff --git a/websockets/autobahn/src/main.rs b/websockets/autobahn/src/main.rs index 670b4974..e73ee51d 100644 --- a/websockets/autobahn/src/main.rs +++ b/websockets/autobahn/src/main.rs @@ -1,36 +1,7 @@ -use actix::prelude::*; -use actix_web::{middleware, web, App, Error, HttpRequest, HttpResponse, HttpServer}; -use actix_web_actors::ws; +mod websocket; +mod utf8; -async fn ws_index(r: HttpRequest, stream: web::Payload) -> Result { - ws::start(AutobahnWebSocket::default(), &r, stream) -} - -#[derive(Debug, Clone, Default)] -struct AutobahnWebSocket; - -impl Actor for AutobahnWebSocket { - type Context = ws::WebsocketContext; -} - -impl StreamHandler> for AutobahnWebSocket { - fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { - if let Ok(msg) = msg { - match msg { - ws::Message::Text(text) => ctx.text(text), - ws::Message::Binary(bin) => ctx.binary(bin), - ws::Message::Ping(bytes) => ctx.pong(&bytes), - ws::Message::Close(reason) => { - ctx.close(reason); - ctx.stop(); - } - _ => {} - } - } else { - ctx.stop(); - } - } -} +use actix_web::{middleware, web, App, HttpServer}; #[actix_web::main] async fn main() -> std::io::Result<()> { @@ -41,7 +12,7 @@ async fn main() -> std::io::Result<()> { HttpServer::new(|| { App::new() .wrap(middleware::Logger::default()) - .service(web::resource("/").route(web::get().to(ws_index))) + .service(web::resource("/").route(web::get().to(websocket::index))) }) .workers(2) .bind(("127.0.0.1", 9001))? diff --git a/websockets/autobahn/src/utf8/mod.rs b/websockets/autobahn/src/utf8/mod.rs new file mode 100644 index 00000000..ccc88216 --- /dev/null +++ b/websockets/autobahn/src/utf8/mod.rs @@ -0,0 +1,215 @@ +//! Module contains code related to handling utf8 codepoints split across multiple continuation frames +//! +//! Websocket standard allows sending continuation text frames which are not valid utf8 by themselves. +//! +//! Example: +//! > `♩` is `e2 99 a9` +//! > +//! > The first frame can end up with (e2) `0b11100010u8` which is a first byte of three byte utf8 sequence and the second +//! > continuation frame can start with (99) `0b10011001u8` followed by (a9) `0b10101001u8` which only after combining together +//! > will give the proper utf8 sequence +//! +//! What's more strict + +#[cfg(test)] +mod tests; + +use actix_http::ws::ProtocolError; + +use bytes::BufMut; +use bytes::Bytes; +use bytes::BytesMut; + +#[derive(Debug, PartialEq, Eq)] +pub struct ValidUtf8 { + pub valid: Bytes, + pub overflow: Option, +} + +const UTF8_START_2_BYTE_SEQ_MASK: u8 = 0b1110_0000u8; +const UFT8_START_3_BYTE_SEQ_MASK: u8 = 0b1111_0000u8; +const UTF8_START_4_BYTE_SEQ_MASK: u8 = 0b1111_1000u8; + +const UTF8_2_BYTE_SEQ: u8 = 0b11000000u8; +const UTF8_3_BYTE_SEQ: u8 = 0b11100000u8; +const UTF8_4_BYTE_SEQ: u8 = 0b11110000u8; + +const MAX_ASCII_VALUE: u8 = 0x7Fu8; +const MIN_CONTINUATION: u8 = 0x80u8; +const MAX_CONTINUATION: u8 = 0xBFu8; + +const ERROR_INVALID_UTF8_SEQUENCE_MESSAGE: &str = "invalid utf-8 sequence"; + +#[derive(Debug, Eq, PartialEq)] +pub enum ByteResult { + Continuation, + First(usize), + Ok, + Invalid, +} + +fn protocol_error(error: String, kind: std::io::ErrorKind) -> Result { + Err(ProtocolError::Io(std::io::Error::new(kind, error))) +} + +fn protocol_other_error(error: String) -> Result { + protocol_error(error, std::io::ErrorKind::Other) +} + +fn protocol_data_error(error: String) -> Result { + protocol_error(error, std::io::ErrorKind::InvalidData) +} + +/// This method rebuilds the code point up to the given point +/// +/// You can invoke this method only for the overflowed ("unfinished") code point. +/// As the consequence: +/// +/// 1. `data[0]` is always valid +/// 2. We don't need to check the last byte since it's not there +/// +/// +/// > From Unicode 13 spec +/// > +/// > | Code Points | First Byte | Second Byte | Third Byte | Fourth Byte | +/// > |:----------------------|:----------------|:----------------|:----------------|:----------------| +/// > | U+0000 ..= U+007f | `0x00 ..= 0x7f` | | | | +/// > | U+0080 ..= U+07FF | `0xC2 ..= 0xDF | `0x80 ..= 0xBF` | | | +/// > | U+0800 ..= U+0FFF | `0xE0` | `0xA0 ..= 0xBF` | `0x80 ..= 0xBF` | | +/// > | U+1000 ..= U+CFFF | `0xE1 ..= 0xEC | `0x80 ..= 0xBF` | `0x80 ..= 0xBF` | | +/// > | U+D000 ..= U+D7FF | `0xED` | `0x80 ..= 0x9F` | `0x80 ..= 0xBF` | | +/// > | U+E000 ..= U+FFFF | `0xEE ..= 0xEF` | `0x80 ..= 0xBF` | `0x80 ..= 0xBF` | | +/// > | U+10000 ..= U+3FFFF | `0xF0` | `0x90 ..= 0xBF` | `0x80 ..= 0xBF` | `0x80 ..= 0xBF` | +/// > | U+40000 ..= U+FFFFF | `0xF1 ..= 0xF3` | `0x80 ..= 0xBF` | `0x80 ..= 0xBF` | `0x80 ..= 0xBF` | +/// > | U+100000 ..= U+10FFFF | `0xF4` | `0x80 ..= 0x8F` | `0x80 ..= 0xBF` | `0x80 ..= 0xBF` | +fn check_overflow(data: &[u8], expected_size: usize) -> bool { + let len = data.len(); + + let raw_1 = data[0]; + if expected_size == 2 { + (0xC2u8..=0xDFu8).contains(&raw_1) + } else if expected_size == 3 { + let raw_2: u8 = if len == 2 { + data[1] + } else if raw_1 == 0xE0 { + 0xA0 + } else { + 0x80 + }; + + matches!((raw_1, raw_2), (0xE0, 0xA0..=0xBF) | (0xE1..=0xEC, 0x80..=0xBF) | (0xED, 0x80..=0x9F)) + } else { + let raw_2: u8 = if len >= 2 { + data[1] + } else if raw_1 == 0xF0 { + 0x90 + } else { + 0x80 + }; + let raw_3: u8 = if len == 3 { data[2] } else { 0x80 }; + + matches!((raw_1, raw_2, raw_3), + (0xF0, 0x90..=0xBF, 0x80..=0xBF) | + (0xF1..=0xF3, 0x80..=0xBF, 0x80..=0xBF) | + (0xf4, 0x80..=0x8F, 0x80..=0xBF) + ) + } +} + +fn check_byte(byte: u8) -> ByteResult { + if byte <= MAX_ASCII_VALUE { + ByteResult::Ok + } else if (MIN_CONTINUATION..=MAX_CONTINUATION).contains(&byte) { + ByteResult::Continuation + } else if byte & UTF8_START_2_BYTE_SEQ_MASK == UTF8_2_BYTE_SEQ { + ByteResult::First(2) + } else if byte & UFT8_START_3_BYTE_SEQ_MASK == UTF8_3_BYTE_SEQ { + ByteResult::First(3) + } else if byte & UTF8_START_4_BYTE_SEQ_MASK == UTF8_4_BYTE_SEQ { + ByteResult::First(4) + } else { + ByteResult::Invalid + } +} + +pub fn validate_utf8_bytes(data: Bytes) -> Result { + let len: usize = data.len(); + let mut overflow_size: usize = 0; + let mut checked: ByteResult; + + if len == 0 { + Ok(ValidUtf8 { + valid: data, + overflow: None, + }) + } else { + let mut index = len; + let mut expected_overflow_size = 0; + + while index > 0 { + index -= 1; + let current = match data.get(index) { + Some(b) => b, + None => return protocol_other_error(ERROR_INVALID_UTF8_SEQUENCE_MESSAGE.to_owned()), + }; + + checked = check_byte(*current); + + match checked { + ByteResult::Continuation => { + overflow_size += 1; + continue; + } + ByteResult::First(seq_size) => { + overflow_size += 1; + + if overflow_size == seq_size { + index = len; + overflow_size = 0; + expected_overflow_size = 0; + break; + // we've just checked that whole code point is inside this data frame, so no overflow is required + } + if overflow_size > seq_size { + return protocol_data_error(ERROR_INVALID_UTF8_SEQUENCE_MESSAGE.to_owned()); + } + + expected_overflow_size = seq_size; + break; + } + ByteResult::Ok => { + index += 1; + break; + } + ByteResult::Invalid => { + return protocol_data_error(ERROR_INVALID_UTF8_SEQUENCE_MESSAGE.to_owned()) + } + } + } + + // index points at first "overflowed" byte + if overflow_size > 0 { + let (data, overflow) = data.split_at(index); + + if !check_overflow(overflow, expected_overflow_size) { + return protocol_data_error(ERROR_INVALID_UTF8_SEQUENCE_MESSAGE.to_owned()); + } + + let mut bytes_data = BytesMut::with_capacity(data.len()); + bytes_data.put(data); + + let mut bytes_overflow = BytesMut::with_capacity(overflow.len()); + bytes_overflow.put(overflow); + + Ok(ValidUtf8 { + valid: bytes_data.freeze(), + overflow: Some(bytes_overflow.freeze()), + }) + } else { + Ok(ValidUtf8 { + valid: data, + overflow: None, + }) + } + } +} diff --git a/websockets/autobahn/src/utf8/tests/mod.rs b/websockets/autobahn/src/utf8/tests/mod.rs new file mode 100644 index 00000000..e2a3865b --- /dev/null +++ b/websockets/autobahn/src/utf8/tests/mod.rs @@ -0,0 +1,3 @@ +mod test6_4_2; + +use super::*; diff --git a/websockets/autobahn/src/utf8/tests/test6_4_2.rs b/websockets/autobahn/src/utf8/tests/test6_4_2.rs new file mode 100644 index 00000000..cfa17105 --- /dev/null +++ b/websockets/autobahn/src/utf8/tests/test6_4_2.rs @@ -0,0 +1,41 @@ +use super::*; + +#[test] +fn invalid_sequence() { + let data: &[u8] = b"\xce\xba\xe1\xbd\xb9\xcf\x83\xce\xbc\xce\xb5\xf4\x90\x80\x80edited"; + let bytes = BytesMut::from(data).freeze(); + let tested_bytes = bytes.slice(11..14); + + let result = validate_utf8_bytes(tested_bytes); + + match result { + Err(ProtocolError::Io(err)) => { + assert_eq!(err.kind(), std::io::ErrorKind::InvalidData, "Error kind should be `Other`"); + assert_eq!(err.to_string(), ERROR_INVALID_UTF8_SEQUENCE_MESSAGE.to_owned()); + }, + Err(_) => assert!(false, "Result should be ProtocolError::Io"), + Ok(_) => assert!(false, "Result should be an error") + } +} + +#[test] +fn first_byte_type() { + let byte: u8 = 0xf4u8; + + let expected = ByteResult::First(4); + + let result = check_byte(byte); + + assert_eq!(result, expected); +} + +#[test] +fn second_byte_type() { + let byte: u8 = 0x90u8; + + let expected = ByteResult::Continuation; + + let result = check_byte(byte); + + assert_eq!(result, expected); +} diff --git a/websockets/autobahn/src/websocket.rs b/websockets/autobahn/src/websocket.rs new file mode 100644 index 00000000..61c5992d --- /dev/null +++ b/websockets/autobahn/src/websocket.rs @@ -0,0 +1,321 @@ +use super::utf8; + +use actix::Actor; +use actix::ActorContext; +use actix::StreamHandler; +use actix_http::ws::CloseCode; +use actix_http::ws::CloseReason; +use actix_http::ws::Item; +use actix_http::ws::ProtocolError; +use actix_web::web; +use actix_web::Error as WebError; +use actix_web::HttpRequest; +use actix_web::HttpResponse; +use actix_web_actors::ws; +use actix_web_actors::ws::WebsocketContext; + +use actix_web_actors::ws::WsResponseBuilder; +use bytes::BufMut; +use bytes::Bytes; +use bytes::BytesMut; + +use bytestring::ByteString; +use utf8::validate_utf8_bytes; +use utf8::ValidUtf8; + +enum ContinuationBuffer { + Text { + data: Vec, + overflow: Option, + }, + Binary(Vec), + Empty, +} + +impl ContinuationBuffer { + fn is_empty(&self) -> bool { + match self { + Self::Text { + data: _, + overflow: _, + } => false, + Self::Binary(_) => false, + Self::Empty => true, + } + } + + fn buffer_size(&self) -> usize { + match self { + Self::Text { data, overflow: _ } => data + .iter() + .fold(0, |accumulator, element| accumulator + element.len()), + Self::Binary(buffer) => buffer + .iter() + .fold(0, |accumulator, element| accumulator + element.len()), + Self::Empty => 0, + } + } + + fn append(&mut self, data: Bytes) -> Result<(), ws::ProtocolError> { + match self { + Self::Binary(buffer) => { + buffer.push(data); + Ok(()) + } + Self::Text { + data: buffer, + overflow, + } => { + let data = match overflow { + Some(overflow) => { + let new_data_len = data.len() + overflow.len(); + let mut new_data = BytesMut::with_capacity(new_data_len); + new_data.put(overflow); + new_data.put(data); + new_data.freeze() + } + None => data, + }; + + let ValidUtf8 { + valid, + overflow: message_overflow, + } = validate_utf8_bytes(data)?; + + ByteString::try_from(valid.clone()).map_err(|e| { + ProtocolError::Io(std::io::Error::new( + std::io::ErrorKind::Other, + format!("{}", e), + )) + })?; + + buffer.push(valid); + + if let Some(message_overflow) = message_overflow { + _ = overflow.insert(message_overflow); + } + + Ok(()) + } + Self::Empty => Err(ws::ProtocolError::ContinuationNotStarted), + } + } +} + +enum ContinuationMessage { + Text(ByteString), + Binary(Bytes), + Unfinished, +} + +struct WebsocketActor { + continuation_buffer: ContinuationBuffer, +} + +impl WebsocketActor { + fn continuation_handler( + &mut self, + item: Item, + ) -> Result { + match item { + Item::FirstBinary(data) => { + if self.continuation_buffer.is_empty() { + self.continuation_buffer = ContinuationBuffer::Binary(vec![data]); + Ok(ContinuationMessage::Unfinished) + } else { + Err(ws::ProtocolError::ContinuationStarted) + } + } + Item::FirstText(data) => { + if self.continuation_buffer.is_empty() { + let ValidUtf8 { valid, overflow } = validate_utf8_bytes(data)?; + + ByteString::try_from(valid.clone()).map_err(|e| { + ProtocolError::Io(std::io::Error::new( + std::io::ErrorKind::Other, + format!("{}", e), + )) + })?; + + self.continuation_buffer = ContinuationBuffer::Text { + data: vec![valid], + overflow, + }; + Ok(ContinuationMessage::Unfinished) + } else { + Err(ws::ProtocolError::ContinuationStarted) + } + } + Item::Continue(data) => { + self.continuation_buffer.append(data)?; + Ok(ContinuationMessage::Unfinished) + } + Item::Last(data) => { + let size = self.continuation_buffer.buffer_size() + data.len(); + let mut message_data = BytesMut::with_capacity(size); + match &mut self.continuation_buffer { + ContinuationBuffer::Text { + data: buffer, + overflow, + } => { + let data = match overflow { + Some(overflow) => { + let new_data_len = data.len() + overflow.len(); + let mut new_data = BytesMut::with_capacity(new_data_len); + new_data.put(overflow); + new_data.put(data); + new_data.freeze() + } + None => data, + }; + + let ValidUtf8 { + valid, + overflow: message_overflow, + } = validate_utf8_bytes(data)?; + + if let Some(bytes) = message_overflow { + return Err(ProtocolError::Io(std::io::Error::new( + std::io::ErrorKind::Other, + format!( + "invalid utf-8 sequence of {} bytes from index {}", + bytes.len(), + valid.len() + ), + ))); + } + + ByteString::try_from(valid.clone()).map_err(|e| { + ProtocolError::Io(std::io::Error::new( + std::io::ErrorKind::Other, + format!("{}", e), + )) + })?; + + for b in buffer { + message_data.put(b); + } + message_data.put(valid); + + let text = ByteString::try_from(message_data.freeze()).map_err(|e| { + ProtocolError::Io(std::io::Error::new( + std::io::ErrorKind::Other, + format!("{}", e), + )) + })?; + + Ok(ContinuationMessage::Text(text)) + } + ContinuationBuffer::Binary(buffer) => { + for b in buffer { + message_data.put(b); + } + message_data.put(data); + + Ok(ContinuationMessage::Binary(message_data.freeze())) + } + ContinuationBuffer::Empty => Err(ws::ProtocolError::ContinuationNotStarted), + } + } + } + } + + fn binary(&mut self, bin: Bytes, ctx: &mut ::Context) { + ctx.binary(bin); + } + + fn close(&mut self, reason: Option, ctx: &mut ::Context) { + match reason { + Some(CloseReason { + code: CloseCode::Other(code), + description: _, + }) => { + if (3000u16..5000u16).contains(&code) { + ctx.close(reason); + } else { + ctx.close(Some(CloseReason::from(CloseCode::Protocol))); + } + } + Some(CloseReason { + code: CloseCode::Abnormal, + description: _, + }) => { + ctx.close(Some(CloseReason::from(CloseCode::Protocol))); + } + reason => ctx.close(reason), + } + + ctx.stop(); + } + + fn text(&mut self, text: ByteString, ctx: &mut ::Context) { + ctx.text(text); + } + + fn protocol_error(&mut self, ctx: &mut ::Context) { + ctx.stop(); + } + + fn ping(&mut self, data: Bytes, ctx: &mut ::Context) { + ctx.pong(data.as_ref()); + } +} + +impl Default for WebsocketActor { + fn default() -> Self { + Self { + continuation_buffer: ContinuationBuffer::Empty, + } + } +} + +impl Actor for WebsocketActor { + type Context = WebsocketContext; +} + +impl StreamHandler> for WebsocketActor { + fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { + match msg { + Ok(ws::Message::Binary(bin)) => { + if self.continuation_buffer.is_empty() { + self.binary(bin, ctx) + } else { + self.protocol_error(ctx); + } + } + Ok(ws::Message::Close(reason)) => self.close(reason, ctx), + Ok(ws::Message::Continuation(item)) => { + let result = self.continuation_handler(item); + + match result { + Err(_) => self.protocol_error(ctx), + Ok(ContinuationMessage::Binary(bin)) => self.binary(bin, ctx), + Ok(ContinuationMessage::Text(text)) => self.text(text, ctx), + Ok(ContinuationMessage::Unfinished) => {} + } + } + Ok(ws::Message::Text(text)) => { + if self.continuation_buffer.is_empty() { + self.text(text, ctx); + } else { + self.protocol_error(ctx); + } + } + Ok(ws::Message::Ping(data)) =>{ + self.ping(data, ctx) + } + Ok(_) => (), + Err(_) => self.protocol_error(ctx), + } + } +} + +pub async fn index(req: HttpRequest, stream: web::Payload) -> Result { + WsResponseBuilder::new( + WebsocketActor::default(), + &req, stream + ) + // allow 16MB of data to be sent in single message (biggest frame in autobahn test) + .frame_size(16_777_216) + .start() +}