Skip to content

Commit 5e5aca5

Browse files
authored
Add support for seanmonstar/warp web server framework (cloudevents#97)
Initial take on cloud events support for `seanmonstar/warp` Signed-off-by: Marko Milenković <[email protected]>
1 parent d69ab90 commit 5e5aca5

File tree

14 files changed

+788
-1
lines changed

14 files changed

+788
-1
lines changed

.github/workflows/rust_tests.yml

+8
Original file line numberDiff line numberDiff line change
@@ -124,3 +124,11 @@ jobs:
124124
command: build
125125
toolchain: ${{ matrix.toolchain }}
126126
args: --target ${{ matrix.target }} --manifest-path ./example-projects/actix-web-example/Cargo.toml
127+
128+
- uses: actions-rs/cargo@v1
129+
name: "Build warp-example"
130+
if: matrix.target == 'x86_64-unknown-linux-gnu' && matrix.toolchain == 'stable'
131+
with:
132+
command: build
133+
toolchain: ${{ matrix.toolchain }}
134+
args: --target ${{ matrix.target }} --manifest-path ./example-projects/warp-example/Cargo.toml

Cargo.toml

+3-1
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,12 @@ members = [
4444
".",
4545
"cloudevents-sdk-actix-web",
4646
"cloudevents-sdk-reqwest",
47-
"cloudevents-sdk-rdkafka"
47+
"cloudevents-sdk-rdkafka",
48+
"cloudevents-sdk-warp"
4849
]
4950
exclude = [
5051
"example-projects/actix-web-example",
5152
"example-projects/reqwest-wasm-example",
5253
"example-projects/rdkafka-example",
54+
"example-projects/warp-example",
5355
]

README.md

+2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ Note: This project is WIP under active development, hence all APIs are considere
2424
* `cloudevents-sdk-actix-web`: Integration with [Actix Web](https://github.com/actix/actix-web).
2525
* `cloudevents-sdk-reqwest`: Integration with [reqwest](https://github.com/seanmonstar/reqwest).
2626
* `cloudevents-sdk-rdkafka`: Integration with [rust-rdkafka](https://fede1024.github.io/rust-rdkafka).
27+
* `cloudevents-sdk-warp`: Integration with [rust-rdkafka](https://github.com/seanmonstar/warp/).
2728

2829
## Get Started
2930

@@ -52,6 +53,7 @@ Checkout the examples using our integrations to learn how to send and receive ev
5253
* [Actix Web Example](example-projects/actix-web-example)
5354
* [Reqwest/WASM Example](example-projects/reqwest-wasm-example)
5455
* [Kafka Example](example-projects/rdkafka-example)
56+
* [Warp Example](example-projects/warp-example)
5557

5658
## Development & Contributing
5759

cloudevents-sdk-warp/Cargo.toml

+23
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
[package]
2+
name = "cloudevents-sdk-warp"
3+
version = "0.3.0"
4+
authors = ["Marko Milenković <[email protected]>"]
5+
edition = "2018"
6+
categories = ["web-programming", "encoding"]
7+
license-file = "../LICENSE"
8+
9+
[dependencies]
10+
cloudevents-sdk = { path = ".." }
11+
lazy_static = "1.4.0"
12+
bytes = "^0.5"
13+
warp = "0.2"
14+
http = "0.2"
15+
hyper = "0.13"
16+
17+
[dev-dependencies]
18+
tokio = { version = "^0.2", features = ["full"] }
19+
url = { version = "^2.1" }
20+
serde_json = "^1.0"
21+
chrono = { version = "^0.4", features = ["serde"] }
22+
mime = "0.3"
23+
version-sync = "^0.9"

cloudevents-sdk-warp/README.md

+110
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
# CloudEvents SDK Rust - WARP [![Crates badge]][crates.io] [![Docs badge]][docs.rs]
2+
3+
Integration of [CloudEvents SDK](https://github.com/cloudevents/sdk-rust/) with [Warp - Web Server Framework](https://github.com/seanmonstar/warp/).
4+
5+
Look at [CloudEvents SDK README](https://github.com/cloudevents/sdk-rust/) for more info.
6+
7+
Using this crate you can extract CloudEvent from requests and write CloudEvents to responses.
8+
9+
To echo events:
10+
11+
```rust
12+
use cloudevents_sdk_warp::{filter, reply};
13+
use warp::Filter;
14+
15+
#[tokio::main]
16+
async fn main() {
17+
let routes = warp::any()
18+
// extracting event from request
19+
.and(filter::to_event())
20+
// returning event back
21+
.map(|event| reply::from_event(event));
22+
23+
warp::serve(routes).run(([127, 0, 0, 1], 3030)).await;
24+
}
25+
```
26+
27+
Executing `http` request:
28+
29+
```bash
30+
curl -v \
31+
-H "ce-specversion: 1.0" \
32+
-H "ce-id: 2" \
33+
-H "ce-type: example.event" \
34+
-H "ce-source: url://example_response/" \
35+
-H "content-type: application/json" \
36+
-X POST -d '{ "age": 43, "name": "John Doe", "phones": ["+44 1234567","+44 2345678"] }' \
37+
http://localhost:3030/
38+
```
39+
40+
Should produce response similar to:
41+
42+
```
43+
* TCP_NODELAY set
44+
* Connected to localhost (127.0.0.1) port 3030 (#0)
45+
> POST / HTTP/1.1
46+
> Host: localhost:3030
47+
> User-Agent: curl/7.64.1
48+
> Accept: */*
49+
> ce-specversion: 1.0
50+
> ce-id: 2
51+
> ce-type: example.event
52+
> ce-source: url://example_response/
53+
> content-type: application/json
54+
> Content-Length: 74
55+
>
56+
* upload completely sent off: 74 out of 74 bytes
57+
< HTTP/1.1 200 OK
58+
< ce-specversion: 1.0
59+
< ce-id: 2
60+
< ce-type: example.event
61+
< ce-source: url://example_response/
62+
< content-type: application/json
63+
< content-length: 74
64+
< date: Mon, 02 Nov 2020 13:33:40 GMT
65+
<
66+
* Connection #0 to host localhost left intact
67+
{ "age": 43, "name": "John Doe", "phones": ["+44 1234567","+44 2345678"] }
68+
```
69+
70+
To create event inside request handlers and send them as responses:
71+
72+
```rust
73+
#[tokio::main]
74+
async fn main() {
75+
let routes = warp::any().map(|| {
76+
let event = EventBuilderV10::new()
77+
.id("1")
78+
.source(url::Url::parse("url://example_response/").unwrap())
79+
.ty("example.ce")
80+
.data(
81+
mime::APPLICATION_JSON.to_string(),
82+
json!({
83+
"name": "John Doe",
84+
"age": 43,
85+
"phones": [
86+
"+44 1234567",
87+
"+44 2345678"
88+
]
89+
}),
90+
)
91+
.build();
92+
93+
match event {
94+
Ok(event) => Ok(reply::from_event(event)),
95+
Err(e) => Ok(warp::reply::with_status(
96+
e.to_string(),
97+
StatusCode::INTERNAL_SERVER_ERROR,
98+
)
99+
.into_response()),
100+
}
101+
});
102+
103+
warp::serve(routes).run(([127, 0, 0, 1], 3030)).await;
104+
}
105+
```
106+
107+
[Crates badge]: https://img.shields.io/crates/v/cloudevents-sdk-warp.svg
108+
[crates.io]: https://crates.io/crates/cloudevents-sdk-warp
109+
[Docs badge]: https://docs.rs/cloudevents-sdk-warp/badge.svg
110+
[docs.rs]: https://docs.rs/cloudevents-sdk-warp

cloudevents-sdk-warp/src/filter.rs

+136
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
use crate::server_request::request_to_event;
2+
3+
use cloudevents::Event;
4+
use warp::http::HeaderMap;
5+
use warp::Filter;
6+
use warp::Rejection;
7+
8+
#[derive(Debug)]
9+
pub struct EventFilterError {
10+
error: cloudevents::message::Error,
11+
}
12+
13+
impl warp::reject::Reject for EventFilterError {}
14+
15+
///
16+
/// # Extracts [`cloudevents::Event`] from incoming request
17+
///
18+
/// ```
19+
/// use cloudevents_sdk_warp::filter::to_event;
20+
/// use warp::Filter;
21+
/// use warp::Reply;
22+
///
23+
/// let routes = warp::any()
24+
/// .and(to_event())
25+
/// .map(|event| {
26+
/// // do something with the event
27+
/// }
28+
/// );
29+
/// ```
30+
///
31+
pub fn to_event() -> impl Filter<Extract = (Event,), Error = Rejection> + Copy {
32+
warp::header::headers_cloned()
33+
.and(warp::body::bytes())
34+
.and_then(create_event)
35+
}
36+
37+
async fn create_event(headers: HeaderMap, body: bytes::Bytes) -> Result<Event, Rejection> {
38+
request_to_event(headers, body)
39+
.map_err(|error| warp::reject::custom(EventFilterError { error }))
40+
}
41+
42+
#[cfg(test)]
43+
mod tests {
44+
use super::to_event;
45+
use url::Url;
46+
use warp::test;
47+
48+
use chrono::Utc;
49+
use cloudevents::{EventBuilder, EventBuilderV10};
50+
use serde_json::json;
51+
use std::str::FromStr;
52+
53+
#[tokio::test]
54+
async fn test_request() {
55+
let time = Utc::now();
56+
let expected = EventBuilderV10::new()
57+
.id("0001")
58+
.ty("example.test")
59+
.source("http://localhost/")
60+
.time(time)
61+
.extension("someint", "10")
62+
.build()
63+
.unwrap();
64+
65+
let result = test::request()
66+
.method("POST")
67+
.header("ce-specversion", "1.0")
68+
.header("ce-id", "0001")
69+
.header("ce-type", "example.test")
70+
.header("ce-source", "http://localhost/")
71+
.header("ce-someint", "10")
72+
.header("ce-time", time.to_rfc3339())
73+
.filter(&to_event())
74+
.await
75+
.unwrap();
76+
77+
assert_eq!(expected, result);
78+
}
79+
80+
#[tokio::test]
81+
async fn test_bad_request() {
82+
let time = Utc::now();
83+
84+
let result = test::request()
85+
.method("POST")
86+
.header("ce-specversion", "BAD SPECIFICATION")
87+
.header("ce-id", "0001")
88+
.header("ce-type", "example.test")
89+
.header("ce-source", "http://localhost/")
90+
.header("ce-someint", "10")
91+
.header("ce-time", time.to_rfc3339())
92+
.filter(&to_event())
93+
.await;
94+
95+
assert!(result.is_err());
96+
let rejection = result.unwrap_err();
97+
98+
let reason = rejection.find::<super::EventFilterError>().unwrap();
99+
assert_eq!(
100+
reason.error.to_string(),
101+
"Invalid specversion BAD SPECIFICATION"
102+
)
103+
}
104+
105+
#[tokio::test]
106+
async fn test_request_with_full_data() {
107+
let time = Utc::now();
108+
let j = json!({"hello": "world"});
109+
110+
let expected = EventBuilderV10::new()
111+
.id("0001")
112+
.ty("example.test")
113+
.source(Url::from_str("http://localhost").unwrap())
114+
.time(time)
115+
.data("application/json", j.to_string().into_bytes())
116+
.extension("someint", "10")
117+
.build()
118+
.unwrap();
119+
120+
let result = test::request()
121+
.method("POST")
122+
.header("ce-specversion", "1.0")
123+
.header("ce-id", "0001")
124+
.header("ce-type", "example.test")
125+
.header("ce-source", "http://localhost")
126+
.header("ce-someint", "10")
127+
.header("ce-time", time.to_rfc3339())
128+
.header("content-type", "application/json")
129+
.json(&j)
130+
.filter(&to_event())
131+
.await
132+
.unwrap();
133+
134+
assert_eq!(expected, result);
135+
}
136+
}

cloudevents-sdk-warp/src/headers.rs

+61
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
use cloudevents::event::SpecVersion;
2+
use http::header::HeaderName;
3+
use lazy_static::lazy_static;
4+
use warp::http::HeaderValue;
5+
6+
use std::collections::HashMap;
7+
use std::str::FromStr;
8+
9+
macro_rules! unwrap_optional_header {
10+
($headers:expr, $name:expr) => {
11+
$headers
12+
.get::<&'static HeaderName>(&$name)
13+
.map(|a| header_value_to_str!(a))
14+
};
15+
}
16+
17+
macro_rules! header_value_to_str {
18+
($header_value:expr) => {
19+
$header_value
20+
.to_str()
21+
.map_err(|e| cloudevents::message::Error::Other {
22+
source: Box::new(e),
23+
})
24+
};
25+
}
26+
27+
macro_rules! str_name_to_header {
28+
($attribute:expr) => {
29+
HeaderName::from_str($attribute).map_err(|e| cloudevents::message::Error::Other {
30+
source: Box::new(e),
31+
})
32+
};
33+
}
34+
35+
macro_rules! attribute_name_to_header {
36+
($attribute:expr) => {
37+
str_name_to_header!(&["ce-", $attribute].concat())
38+
};
39+
}
40+
41+
fn attributes_to_headers(
42+
it: impl Iterator<Item = &'static str>,
43+
) -> HashMap<&'static str, HeaderName> {
44+
it.map(|s| {
45+
if s == "datacontenttype" {
46+
(s, http::header::CONTENT_TYPE)
47+
} else {
48+
(s, attribute_name_to_header!(s).unwrap())
49+
}
50+
})
51+
.collect()
52+
}
53+
54+
lazy_static! {
55+
pub(crate) static ref ATTRIBUTES_TO_HEADERS: HashMap<&'static str, HeaderName> =
56+
attributes_to_headers(SpecVersion::all_attribute_names());
57+
pub(crate) static ref SPEC_VERSION_HEADER: HeaderName =
58+
HeaderName::from_static("ce-specversion");
59+
pub(crate) static ref CLOUDEVENTS_JSON_HEADER: HeaderValue =
60+
HeaderValue::from_static("application/cloudevents+json");
61+
}

0 commit comments

Comments
 (0)