Skip to content

Update to std::future #389

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Aug 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ addons:
matrix:
include:
- rust: nightly
- rust: stable
# - rust: stable
before_deploy: cargo doc --no-deps
allow_failures:
- rust: nightly
# allow_failures:
# - rust: nightly

before_script:
- cargo clean
Expand All @@ -39,8 +39,8 @@ script:
# Run integration tests
- cargo test -p h2-tests

# Run h2spec on stable
- if [ "${TRAVIS_RUST_VERSION}" = "stable" ]; then ./ci/h2spec.sh; fi
# Run h2spec on nightly for the time being. TODO: Change it to stable after Rust 1.38 release
- if [ "${TRAVIS_RUST_VERSION}" = "nightly" ]; then ./ci/h2spec.sh; fi

# Check minimal versions
- if [ "${TRAVIS_RUST_VERSION}" = "nightly" ]; then cargo clean; cargo check -Z minimal-versions; fi
Expand Down
7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ members = [
]

[dependencies]
futures = "0.1"
tokio-io = "0.1.4"
futures-preview = "0.3.0-alpha.17"
tokio-io = "0.2.0-alpha.1"
tokio-codec = "0.2.0-alpha.1"
bytes = "0.4.7"
http = "0.1.8"
log = "0.4.1"
Expand All @@ -64,7 +65,7 @@ serde = "1.0.0"
serde_json = "1.0.0"

# Akamai example
tokio = "0.1.8"
tokio = "0.2.0-alpha.1"
env_logger = { version = "0.5.3", default-features = false }
rustls = "0.12"
tokio-rustls = "0.5.0"
Expand Down
70 changes: 30 additions & 40 deletions examples/akamai.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
fn main() {
// Enable the below code once tokio_rustls moves to std::future
}

/*
#![feature(async_await)]

use h2::client;

use futures::*;
Expand All @@ -10,10 +17,12 @@ use tokio_rustls::ClientConfigExt;
use webpki::DNSNameRef;

use std::net::ToSocketAddrs;
use std::error::Error;

const ALPN_H2: &str = "h2";

pub fn main() {
#[tokio::main]
pub async fn main() -> Result<(), Box<dyn Error>> {
let _ = env_logger::try_init();

let tls_client_config = std::sync::Arc::new({
Expand All @@ -33,49 +42,30 @@ pub fn main() {

println!("ADDR: {:?}", addr);

let tcp = TcpStream::connect(&addr);
let tcp = TcpStream::connect(&addr).await?;
let dns_name = DNSNameRef::try_from_ascii_str("http2.akamai.com").unwrap();

let tcp = tcp.then(move |res| {
let tcp = res.unwrap();
tls_client_config
.connect_async(dns_name, tcp)
.then(|res| {
let tls = res.unwrap();
{
let (_, session) = tls.get_ref();
let negotiated_protocol = session.get_alpn_protocol();
assert_eq!(Some(ALPN_H2), negotiated_protocol.as_ref().map(|x| &**x));
}

println!("Starting client handshake");
client::handshake(tls)
})
.then(|res| {
let (mut client, h2) = res.unwrap();

let request = Request::builder()
let res = tls_client_config.connect_async(dns_name, tcp).await;
let tls = res.unwrap();
{
let (_, session) = tls.get_ref();
let negotiated_protocol = session.get_alpn_protocol();
assert_eq!(Some(ALPN_H2), negotiated_protocol.as_ref().map(|x| &**x));
}

println!("Starting client handshake");
let (mut client, h2) = client::handshake(tls).await?;

let request = Request::builder()
.method(Method::GET)
.uri("https://http2.akamai.com/")
.body(())
.unwrap();

let (response, _) = client.send_request(request, true).unwrap();

let stream = response.and_then(|response| {
let (_, body) = response.into_parts();

body.for_each(|chunk| {
println!("RX: {:?}", chunk);
Ok(())
})
});

h2.join(stream)
})
})
.map_err(|e| eprintln!("ERROR: {:?}", e))
.map(|((), ())| ());

tokio::run(tcp);
let (response, _) = client.send_request(request, true).unwrap();
let (_, mut body) = response.await?.into_parts();
while let Some(chunk) = body.next().await {
println!("RX: {:?}", chunk?);
}
Ok(())
}
*/
105 changes: 37 additions & 68 deletions examples/client.rs
Original file line number Diff line number Diff line change
@@ -1,87 +1,56 @@
#![feature(async_await)]

use futures::future::poll_fn;
use futures::StreamExt;
use h2::client;
use h2::RecvStream;
use http::{HeaderMap, Request};

use futures::*;
use http::*;
use std::error::Error;

use tokio::net::TcpStream;

struct Process {
body: RecvStream,
trailers: bool,
}

impl Future for Process {
type Item = ();
type Error = h2::Error;

fn poll(&mut self) -> Poll<(), h2::Error> {
loop {
if self.trailers {
let trailers = try_ready!(self.body.poll_trailers());

println!("GOT TRAILERS: {:?}", trailers);

return Ok(().into());
} else {
match try_ready!(self.body.poll()) {
Some(chunk) => {
println!("GOT CHUNK = {:?}", chunk);
},
None => {
self.trailers = true;
},
}
}
}
}
}

pub fn main() {
#[tokio::main]
pub async fn main() -> Result<(), Box<dyn Error>> {
let _ = env_logger::try_init();

let tcp = TcpStream::connect(&"127.0.0.1:5928".parse().unwrap());
let tcp = TcpStream::connect(&"127.0.0.1:5928".parse().unwrap()).await?;
let (mut client, h2) = client::handshake(tcp).await?;

let tcp = tcp.then(|res| {
let tcp = res.unwrap();
client::handshake(tcp)
}).then(|res| {
let (mut client, h2) = res.unwrap();
println!("sending request");

println!("sending request");
let request = Request::builder()
.uri("https://http2.akamai.com/")
.body(())
.unwrap();

let request = Request::builder()
.uri("https://http2.akamai.com/")
.body(())
.unwrap();
let mut trailers = HeaderMap::new();
trailers.insert("zomg", "hello".parse().unwrap());

let mut trailers = HeaderMap::new();
trailers.insert("zomg", "hello".parse().unwrap());
let (response, mut stream) = client.send_request(request, false).unwrap();

let (response, mut stream) = client.send_request(request, false).unwrap();
// send trailers
stream.send_trailers(trailers).unwrap();

// send trailers
stream.send_trailers(trailers).unwrap();
// Spawn a task to run the conn...
tokio::spawn(async move {
if let Err(e) = h2.await {
println!("GOT ERR={:?}", e);
}
});

// Spawn a task to run the conn...
tokio::spawn(h2.map_err(|e| println!("GOT ERR={:?}", e)));
let response = response.await?;
println!("GOT RESPONSE: {:?}", response);

response
.and_then(|response| {
println!("GOT RESPONSE: {:?}", response);
// Get the body
let (_, mut body) = response.into_parts();

// Get the body
let (_, body) = response.into_parts();
while let Some(chunk) = body.next().await {
println!("GOT CHUNK = {:?}", chunk?);
}

Process {
body,
trailers: false,
}
})
.map_err(|e| {
println!("GOT ERR={:?}", e);
})
});
if let Some(trailers) = poll_fn(|cx| body.poll_trailers(cx)).await {
println!("GOT TRAILERS: {:?}", trailers?);
}

tokio::run(tcp);
Ok(())
}
86 changes: 37 additions & 49 deletions examples/server.rs
Original file line number Diff line number Diff line change
@@ -1,62 +1,50 @@
#![feature(async_await)]

use h2::server;

use bytes::*;
use futures::*;
use http::*;
use http::{Response, StatusCode};

use tokio::net::TcpListener;
use tokio::net::{TcpListener, TcpStream};
use std::error::Error;

pub fn main() {
#[tokio::main]
pub async fn main() -> Result<(), Box<dyn Error>> {
let _ = env_logger::try_init();

let listener = TcpListener::bind(&"127.0.0.1:5928".parse().unwrap()).unwrap();

println!("listening on {:?}", listener.local_addr());
let mut incoming = listener.incoming();

while let Some(socket) = incoming.next().await {
tokio::spawn(async move {
if let Err(e) = handle(socket).await {
println!(" -> err={:?}", e);
}
});
}

let server = listener.incoming().for_each(move |socket| {
// let socket = io_dump::Dump::to_stdout(socket);

let connection = server::handshake(socket)
.and_then(|conn| {
println!("H2 connection bound");

conn.for_each(|(request, mut respond)| {
println!("GOT request: {:?}", request);

let response = Response::builder().status(StatusCode::OK).body(()).unwrap();

let mut send = match respond.send_response(response, false) {
Ok(send) => send,
Err(e) => {
println!(" error respond; err={:?}", e);
return Ok(());
}
};

println!(">>>> sending data");
if let Err(e) = send.send_data(Bytes::from_static(b"hello world"), true) {
println!(" -> err={:?}", e);
}

Ok(())
})
})
.and_then(|_| {
println!("~~~~~~~~~~~~~~~~~~~~~~~~~~~ H2 connection CLOSE !!!!!! ~~~~~~~~~~~");
Ok(())
})
.then(|res| {
if let Err(e) = res {
println!(" -> err={:?}", e);
}

Ok(())
});

tokio::spawn(Box::new(connection));
Ok(())
})
.map_err(|e| eprintln!("accept error: {}", e));

tokio::run(server);
Ok(())
}

async fn handle(socket: io::Result<TcpStream>) -> Result<(), Box<dyn Error>> {
let mut connection = server::handshake(socket?).await?;
println!("H2 connection bound");

while let Some(result) = connection.next().await {
let (request, mut respond) = result?;
println!("GOT request: {:?}", request);
let response = Response::builder().status(StatusCode::OK).body(()).unwrap();

let mut send = respond.send_response(response, false)?;

println!(">>>> sending data");
send.send_data(Bytes::from_static(b"hello world"), true)?;
}

println!("~~~~~~~~~~~~~~~~~~~~~~~~~~~ H2 connection CLOSE !!!!!! ~~~~~~~~~~~");

Ok(())
}
Loading