Skip to content

Commit 4b77280

Browse files
add async feature
1 parent c616960 commit 4b77280

File tree

7 files changed

+808
-133
lines changed

7 files changed

+808
-133
lines changed

Cargo.toml

Lines changed: 55 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,20 +11,70 @@ categories = ["embedded", "no-std", "network-programming"]
1111
readme = "README.md"
1212

1313
[dependencies]
14-
sha1 = "0.10.1"
15-
heapless = "0.7.14"
16-
byteorder = { version = "1.4.3", default-features = false }
17-
httparse = { version = "1.7.1", default-features = false }
18-
rand_core = "0.6.3"
14+
anyhow = "1.0.58"
15+
async-std = { version = "1.12.0", optional = true }
1916
base64 = { version = "0.13.0", default-features = false }
2017
base64-simd = { version = "0.5.0", default-features = false, optional = true }
18+
byteorder = { version = "1.4.3", default-features = false }
2119
cfg-if = "1.0.0"
20+
core2 = "0.4.0"
21+
futures = { version = "0.3.21", optional = true }
22+
heapless = "0.7.14"
23+
httparse = { version = "1.7.1", default-features = false }
24+
rand_core = "0.6.3"
25+
sha1 = "0.10.1"
26+
smol = { version = "1.2.5", optional = true }
27+
tokio = { version = "1.19.2", features = ["io-util"], optional = true }
2228

2329
[dev-dependencies]
30+
async-std = { version = "1.12.0", features = ["attributes"] }
31+
async-trait = "0.1.56"
32+
cfg-if = "1.0.0"
33+
once_cell = "1.12.0"
2434
rand = "0.8.5"
35+
route-recognizer = "0.3.1"
36+
smol = "1.2.5"
37+
smol-potat = { version = "1.1.2", features = ["auto"] }
38+
tokio = { version = "1.19.2", features = ["macros", "net", "rt-multi-thread", "io-util"] }
39+
tokio-stream = { version = "0.1.9", features = ["net"] }
2540

2641
# see readme for no_std support
2742
[features]
2843
default = ["std"]
2944
# default = []
3045
std = []
46+
async = ["std"]
47+
tokio = ["dep:tokio", "async"]
48+
futures = ["dep:futures", "async"]
49+
smol = ["dep:smol", "async"]
50+
async-std = ["dep:async-std", "async"]
51+
52+
[[example]]
53+
name = "server_tokio"
54+
path = "examples/server_async/main.rs"
55+
required-features = ["tokio"]
56+
57+
[[example]]
58+
name = "server_smol"
59+
path = "examples/server_async/main.rs"
60+
required-features = ["smol"]
61+
62+
[[example]]
63+
name = "server_async_std"
64+
path = "examples/server_async/main.rs"
65+
required-features = ["async-std"]
66+
67+
[[example]]
68+
name = "client_tokio"
69+
path = "examples/client_async/main.rs"
70+
required-features = ["tokio"]
71+
72+
[[example]]
73+
name = "client_smol"
74+
path = "examples/client_async/main.rs"
75+
required-features = ["smol"]
76+
77+
[[example]]
78+
name = "client_async_std"
79+
path = "examples/client_async/main.rs"
80+
required-features = ["async-std"]

examples/client.rs renamed to examples/client/main.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,15 @@ use embedded_websocket::{
1313
framer::{Framer, FramerError, ReadResult},
1414
WebSocketClient, WebSocketCloseStatusCode, WebSocketOptions, WebSocketSendMessageType,
1515
};
16-
use std::{error::Error, net::TcpStream};
16+
use std::net::TcpStream;
1717

18-
fn main() -> Result<(), FramerError<impl Error>> {
18+
fn main() -> Result<(), FramerError> {
1919
// open a TCP stream to localhost port 1337
2020
let address = "127.0.0.1:1337";
2121
println!("Connecting to: {}", address);
22-
let mut stream = TcpStream::connect(address).map_err(FramerError::Io)?;
22+
let mut stream = TcpStream::connect(address)
23+
.map_err(anyhow::Error::new)
24+
.map_err(FramerError::Io)?;
2325
println!("Connected.");
2426

2527
let mut read_buf = [0; 4000];

examples/client_async/main.rs

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
// The MIT License (MIT)
2+
// Copyright (c) 2019 David Haig
3+
4+
// Demo websocket client connecting to localhost port 1337.
5+
// This will initiate a websocket connection to path /chat. The demo sends a simple "Hello, World!"
6+
// message and expects an echo of the same message as a reply.
7+
// It will then initiate a close handshake, wait for a close response from the server,
8+
// and terminate the connection.
9+
// Note that we are using the standard library in the demo and making use of the framer helper module
10+
// but the websocket library remains no_std (see client_full for an example without the framer helper module)
11+
12+
use embedded_websocket::{
13+
framer::{Framer, FramerError, ReadResult},
14+
WebSocketClient, WebSocketCloseStatusCode, WebSocketOptions, WebSocketSendMessageType,
15+
};
16+
17+
cfg_if::cfg_if! {
18+
if #[cfg(feature = "tokio")] {
19+
use tokio::net::TcpStream;
20+
} else if #[cfg(feature = "smol")] {
21+
use smol::net::TcpStream;
22+
} else if #[cfg(feature = "async-std")] {
23+
use async_std::net::TcpStream;
24+
}
25+
}
26+
27+
#[cfg_attr(feature = "async-std", async_std::main)]
28+
#[cfg_attr(feature = "tokio", tokio::main)]
29+
#[cfg_attr(feature = "smol", smol_potat::main)]
30+
async fn main() -> Result<(), FramerError> {
31+
// open a TCP stream to localhost port 1337
32+
let address = "127.0.0.1:1337";
33+
println!("Connecting to: {}", address);
34+
let mut stream = TcpStream::connect(address)
35+
.await
36+
.map_err(anyhow::Error::new)
37+
.map_err(FramerError::Io)?;
38+
println!("Connected.");
39+
40+
let mut read_buf = [0; 4000];
41+
let mut read_cursor = 0;
42+
let mut write_buf = [0; 4000];
43+
let mut frame_buf = [0; 4000];
44+
let mut websocket = WebSocketClient::new_client(rand::thread_rng());
45+
46+
// initiate a websocket opening handshake
47+
let websocket_options = WebSocketOptions {
48+
path: "/chat",
49+
host: "localhost",
50+
origin: "http://localhost:1337",
51+
sub_protocols: None,
52+
additional_headers: None,
53+
};
54+
55+
let mut framer = Framer::new(
56+
&mut read_buf,
57+
&mut read_cursor,
58+
&mut write_buf,
59+
&mut websocket,
60+
);
61+
framer
62+
.connect_async(&mut stream, &websocket_options)
63+
.await?;
64+
65+
let message = "Hello, World!";
66+
framer
67+
.write_async(
68+
&mut stream,
69+
WebSocketSendMessageType::Text,
70+
true,
71+
message.as_bytes(),
72+
)
73+
.await?;
74+
75+
while let ReadResult::Text(s) = framer.read_async(&mut stream, &mut frame_buf).await? {
76+
println!("Received: {}", s);
77+
78+
// close the websocket after receiving the first reply
79+
framer
80+
.close_async(&mut stream, WebSocketCloseStatusCode::NormalClosure, None)
81+
.await?;
82+
println!("Sent close handshake");
83+
}
84+
85+
println!("Connection closed");
86+
Ok(())
87+
}

examples/server.rs renamed to examples/server/main.rs

Lines changed: 63 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -3,32 +3,33 @@
33

44
// Demo websocket server that listens on localhost port 1337.
55
// If accessed from a browser it will return a web page that will automatically attempt to
6-
// open a websocket connection to itself. Alternatively, the client.rs example can be used to
6+
// open a websocket connection to itself. Alternatively, the main example can be used to
77
// open a websocket connection directly. The server will echo all Text and Ping messages back to
88
// the client as well as responding to any opening and closing handshakes.
99
// Note that we are using the standard library in the demo but the websocket library remains no_std
1010

1111
use embedded_websocket as ws;
12+
use httparse::Request;
13+
use once_cell::sync::Lazy;
14+
use route_recognizer::Router;
15+
use std::io::{Read, Write};
1216
use std::net::{TcpListener, TcpStream};
1317
use std::str::Utf8Error;
1418
use std::thread;
15-
use std::{
16-
io::{Read, Write},
17-
usize,
18-
};
1919
use ws::framer::ReadResult;
2020
use ws::{
2121
framer::{Framer, FramerError},
22-
WebSocketContext, WebSocketSendMessageType, WebSocketServer,
22+
WebSocketSendMessageType, WebSocketServer,
2323
};
2424

2525
type Result<T> = std::result::Result<T, WebServerError>;
2626

2727
#[derive(Debug)]
2828
pub enum WebServerError {
2929
Io(std::io::Error),
30-
Framer(FramerError<std::io::Error>),
30+
Framer(FramerError),
3131
WebSocket(ws::Error),
32+
HttpError(String),
3233
Utf8Error,
3334
}
3435

@@ -38,8 +39,8 @@ impl From<std::io::Error> for WebServerError {
3839
}
3940
}
4041

41-
impl From<FramerError<std::io::Error>> for WebServerError {
42-
fn from(err: FramerError<std::io::Error>) -> WebServerError {
42+
impl From<FramerError> for WebServerError {
43+
fn from(err: FramerError) -> WebServerError {
4344
WebServerError::Framer(err)
4445
}
4546
}
@@ -77,13 +78,24 @@ fn main() -> std::io::Result<()> {
7778
Ok(())
7879
}
7980

80-
fn handle_client(mut stream: TcpStream) -> Result<()> {
81-
println!("Client connected {}", stream.peer_addr()?);
82-
let mut read_buf = [0; 4000];
83-
let mut read_cursor = 0;
81+
type Handler = Box<dyn Fn(&mut TcpStream, &Request) -> Result<()> + Send + Sync>;
82+
83+
static ROUTER: Lazy<Router<Handler>> = Lazy::new(|| {
84+
let mut router = Router::new();
85+
router.add("/chat", Box::new(handle_chat) as Handler);
86+
router.add("/", Box::new(handle_root) as Handler);
87+
router
88+
});
8489

85-
if let Some(websocket_context) = read_header(&mut stream, &mut read_buf, &mut read_cursor)? {
90+
fn handle_chat(stream: &mut TcpStream, req: &Request) -> Result<()> {
91+
println!("Received chat request: {:?}", req.path);
92+
93+
if let Some(websocket_context) =
94+
ws::read_http_header(req.headers.iter().map(|f| (f.name, f.value)))?
95+
{
8696
// this is a websocket upgrade HTTP request
97+
let mut read_buf = [0; 4000];
98+
let mut read_cursor = 0;
8799
let mut write_buf = [0; 4000];
88100
let mut frame_buf = [0; 4000];
89101
let mut websocket = WebSocketServer::new_server();
@@ -95,85 +107,65 @@ fn handle_client(mut stream: TcpStream) -> Result<()> {
95107
);
96108

97109
// complete the opening handshake with the client
98-
framer.accept(&mut stream, &websocket_context)?;
110+
framer.accept(stream, &websocket_context)?;
99111
println!("Websocket connection opened");
100112

101113
// read websocket frames
102-
while let ReadResult::Text(text) = framer.read(&mut stream, &mut frame_buf)? {
114+
while let ReadResult::Text(text) = framer.read(stream, &mut frame_buf)? {
103115
println!("Received: {}", text);
104116

105117
// send the text back to the client
106118
framer.write(
107-
&mut stream,
119+
stream,
108120
WebSocketSendMessageType::Text,
109121
true,
110-
text.as_bytes(),
122+
format!("shut the fuck up {}", text).as_bytes(),
111123
)?
112124
}
113125

114126
println!("Closing websocket connection");
115-
Ok(())
116-
} else {
117-
Ok(())
118127
}
128+
129+
Ok(())
119130
}
120131

121-
fn read_header(
122-
stream: &mut TcpStream,
123-
read_buf: &mut [u8],
124-
read_cursor: &mut usize,
125-
) -> Result<Option<WebSocketContext>> {
126-
loop {
127-
let mut headers = [httparse::EMPTY_HEADER; 16];
128-
let mut request = httparse::Request::new(&mut headers);
129-
130-
let received_size = stream.read(&mut read_buf[*read_cursor..])?;
131-
132-
match request
133-
.parse(&read_buf[..*read_cursor + received_size])
134-
.unwrap()
135-
{
136-
httparse::Status::Complete(len) => {
137-
// if we read exactly the right amount of bytes for the HTTP header then read_cursor would be 0
138-
*read_cursor += received_size - len;
139-
let headers = request.headers.iter().map(|f| (f.name, f.value));
140-
match ws::read_http_header(headers)? {
141-
Some(websocket_context) => match request.path {
142-
Some("/chat") => {
143-
return Ok(Some(websocket_context));
144-
}
145-
_ => return_404_not_found(stream, request.path)?,
146-
},
147-
None => {
148-
handle_non_websocket_http_request(stream, request.path)?;
149-
}
150-
}
151-
return Ok(None);
152-
}
153-
// keep reading while the HTTP header is incomplete
154-
httparse::Status::Partial => *read_cursor += received_size,
155-
}
156-
}
132+
fn handle_root(stream: &mut TcpStream, _req: &Request) -> Result<()> {
133+
stream.write_all(&ROOT_HTML.as_bytes())?;
134+
Ok(())
157135
}
158136

159-
fn handle_non_websocket_http_request(stream: &mut TcpStream, path: Option<&str>) -> Result<()> {
160-
println!("Received file request: {:?}", path);
137+
fn handle_client(mut stream: TcpStream) -> Result<()> {
138+
println!("Client connected {}", stream.peer_addr()?);
139+
let mut read_buf = [0; 4000];
140+
let mut read_cursor = 0;
161141

162-
match path {
163-
Some("/") => stream.write_all(&ROOT_HTML.as_bytes())?,
164-
unknown_path => {
165-
return_404_not_found(stream, unknown_path)?;
142+
let mut headers = vec![httparse::EMPTY_HEADER; 8];
143+
let received_size = stream.read(&mut read_buf[read_cursor..])?;
144+
let request = loop {
145+
let mut request = Request::new(&mut headers);
146+
match request.parse(&read_buf[..read_cursor + received_size]) {
147+
Ok(httparse::Status::Partial) => read_cursor += received_size,
148+
Ok(httparse::Status::Complete(_)) => break request,
149+
Err(httparse::Error::TooManyHeaders) => {
150+
headers.resize(headers.len() * 2, httparse::EMPTY_HEADER)
151+
}
152+
_ => panic!("http parser error"),
166153
}
167154
};
168155

169-
Ok(())
170-
}
171-
172-
fn return_404_not_found(stream: &mut TcpStream, unknown_path: Option<&str>) -> Result<()> {
173-
println!("Unknown path: {:?}", unknown_path);
174-
let html = "HTTP/1.1 404 Not Found\r\nContent-Length: 0\r\nConnection: close\r\n\r\n";
175-
stream.write_all(&html.as_bytes())?;
176-
Ok(())
156+
match ROUTER.recognize(request.path.unwrap_or("/")) {
157+
Ok(handler) => handler.handler()(&mut stream, &request),
158+
Err(e) => {
159+
println!("Unknown path: {:?}", request.path);
160+
let html = format!(
161+
"HTTP/1.1 404 Not Found\r\nContent-Length: {len}\r\nConnection: close\r\n\r\n{msg}",
162+
len = e.len(),
163+
msg = e
164+
);
165+
stream.write_all(&html.as_bytes())?;
166+
Err(WebServerError::HttpError(e))
167+
}
168+
}
177169
}
178170

179171
const ROOT_HTML : &str = "HTTP/1.1 200 OK\r\nContent-Type: text/html; charset=UTF-8\r\nContent-Length: 2590\r\nConnection: close\r\n\r\n<!doctype html>

0 commit comments

Comments
 (0)