mirror of
https://github.com/mountain-loop/yaak.git
synced 2026-05-17 21:27:16 +02:00
Cargo fmt
This commit is contained in:
@@ -79,10 +79,9 @@ where
|
||||
let len = data.len();
|
||||
self.bytes_count += len as u64;
|
||||
self.chunks.push(data.clone());
|
||||
let _ = self.event_tx.send(ProxyEvent::ResponseBodyChunk {
|
||||
id: self.request_id,
|
||||
bytes: len,
|
||||
});
|
||||
let _ = self
|
||||
.event_tx
|
||||
.send(ProxyEvent::ResponseBodyChunk { id: self.request_id, bytes: len });
|
||||
}
|
||||
Poll::Ready(Some(Ok(frame)))
|
||||
}
|
||||
|
||||
@@ -18,23 +18,14 @@ impl CertificateAuthority {
|
||||
params.is_ca = IsCa::Ca(BasicConstraints::Unconstrained);
|
||||
params.key_usages.push(KeyUsagePurpose::KeyCertSign);
|
||||
params.key_usages.push(KeyUsagePurpose::CrlSign);
|
||||
params
|
||||
.distinguished_name
|
||||
.push(rcgen::DnType::CommonName, "Debug Proxy CA");
|
||||
params
|
||||
.distinguished_name
|
||||
.push(rcgen::DnType::OrganizationName, "Debug Proxy");
|
||||
params.distinguished_name.push(rcgen::DnType::CommonName, "Debug Proxy CA");
|
||||
params.distinguished_name.push(rcgen::DnType::OrganizationName, "Debug Proxy");
|
||||
|
||||
let key = KeyPair::generate()?;
|
||||
let ca_cert = params.self_signed(&key)?;
|
||||
let ca_cert_der = ca_cert.der().clone();
|
||||
|
||||
Ok(Self {
|
||||
ca_cert,
|
||||
ca_cert_der,
|
||||
ca_key: key,
|
||||
cache: Mutex::new(HashMap::new()),
|
||||
})
|
||||
Ok(Self { ca_cert, ca_cert_der, ca_key: key, cache: Mutex::new(HashMap::new()) })
|
||||
}
|
||||
|
||||
pub fn ca_pem(&self) -> String {
|
||||
@@ -53,9 +44,7 @@ impl CertificateAuthority {
|
||||
}
|
||||
|
||||
let mut params = CertificateParams::new(vec![domain.to_string()])?;
|
||||
params
|
||||
.distinguished_name
|
||||
.push(rcgen::DnType::CommonName, domain);
|
||||
params.distinguished_name.push(rcgen::DnType::CommonName, domain);
|
||||
|
||||
let leaf_key = KeyPair::generate()?;
|
||||
let leaf_cert = params.signed_by(&leaf_key, &self.ca_cert, &self.ca_key)?;
|
||||
@@ -63,20 +52,18 @@ impl CertificateAuthority {
|
||||
let cert_der = leaf_cert.der().clone();
|
||||
let key_der = leaf_key.serialize_der();
|
||||
|
||||
let mut config = ServerConfig::builder_with_provider(Arc::new(rustls::crypto::ring::default_provider()))
|
||||
.with_safe_default_protocol_versions()?
|
||||
.with_no_client_auth()
|
||||
.with_single_cert(
|
||||
vec![cert_der, self.ca_cert_der.clone()],
|
||||
PrivateKeyDer::Pkcs8(PrivatePkcs8KeyDer::from(key_der)),
|
||||
)?;
|
||||
let mut config =
|
||||
ServerConfig::builder_with_provider(Arc::new(rustls::crypto::ring::default_provider()))
|
||||
.with_safe_default_protocol_versions()?
|
||||
.with_no_client_auth()
|
||||
.with_single_cert(
|
||||
vec![cert_der, self.ca_cert_der.clone()],
|
||||
PrivateKeyDer::Pkcs8(PrivatePkcs8KeyDer::from(key_der)),
|
||||
)?;
|
||||
config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
|
||||
|
||||
let config = Arc::new(config);
|
||||
self.cache
|
||||
.lock()
|
||||
.unwrap()
|
||||
.insert(domain.to_string(), config.clone());
|
||||
self.cache.lock().unwrap().insert(domain.to_string(), config.clone());
|
||||
Ok(config)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use std::sync::mpsc as std_mpsc;
|
||||
use std::sync::Arc;
|
||||
use std::sync::mpsc as std_mpsc;
|
||||
|
||||
use hyper::server::conn::http1;
|
||||
use hyper::service::service_fn;
|
||||
|
||||
@@ -4,9 +4,9 @@ mod connection;
|
||||
mod request;
|
||||
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::sync::mpsc as std_mpsc;
|
||||
use std::sync::Arc;
|
||||
|
||||
use cert::CertificateAuthority;
|
||||
use tokio::net::TcpListener;
|
||||
@@ -27,7 +27,11 @@ pub enum ProxyEvent {
|
||||
http_version: String,
|
||||
},
|
||||
/// A request header sent to the upstream server.
|
||||
RequestHeader { id: u64, name: String, value: String },
|
||||
RequestHeader {
|
||||
id: u64,
|
||||
name: String,
|
||||
value: String,
|
||||
},
|
||||
/// The full request body (buffered before forwarding).
|
||||
RequestBody { id: u64, body: Vec<u8> },
|
||||
/// Response headers received from upstream.
|
||||
@@ -38,7 +42,11 @@ pub enum ProxyEvent {
|
||||
elapsed_ms: u64,
|
||||
},
|
||||
/// A response header received from the upstream server.
|
||||
ResponseHeader { id: u64, name: String, value: String },
|
||||
ResponseHeader {
|
||||
id: u64,
|
||||
name: String,
|
||||
value: String,
|
||||
},
|
||||
/// A chunk of the response body was received (emitted per-frame).
|
||||
ResponseBodyChunk { id: u64, bytes: usize },
|
||||
/// The response body stream has completed.
|
||||
|
||||
@@ -63,10 +63,7 @@ fn emit_request_events(
|
||||
});
|
||||
}
|
||||
if let Some(body) = body {
|
||||
let _ = tx.send(ProxyEvent::RequestBody {
|
||||
id,
|
||||
body: body.clone(),
|
||||
});
|
||||
let _ = tx.send(ProxyEvent::RequestBody { id, body: body.clone() });
|
||||
}
|
||||
}
|
||||
|
||||
@@ -123,22 +120,13 @@ async fn handle_http(
|
||||
let http_version = version_str(req.version());
|
||||
let start = Instant::now();
|
||||
|
||||
let _ = event_tx.send(ProxyEvent::RequestStart {
|
||||
id,
|
||||
method,
|
||||
url: uri.clone(),
|
||||
http_version,
|
||||
});
|
||||
let _ = event_tx.send(ProxyEvent::RequestStart { id, method, url: uri.clone(), http_version });
|
||||
|
||||
let client: Client<_, Full<Bytes>> = Client::builder(TokioExecutor::new()).build_http();
|
||||
|
||||
let (parts, body) = req.into_parts();
|
||||
let body_bytes = body.collect().await?.to_bytes();
|
||||
let request_body = if body_bytes.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(body_bytes.to_vec())
|
||||
};
|
||||
let request_body = if body_bytes.is_empty() { None } else { Some(body_bytes.to_vec()) };
|
||||
emit_request_events(&event_tx, id, &parts.headers, &request_body);
|
||||
|
||||
let outgoing_req = Request::from_parts(parts, Full::new(body_bytes));
|
||||
@@ -148,16 +136,10 @@ async fn handle_http(
|
||||
emit_response_events(&event_tx, id, &resp, &start);
|
||||
|
||||
let (parts, body) = resp.into_parts();
|
||||
Ok(Response::from_parts(
|
||||
parts,
|
||||
measured_incoming(body, id, start, event_tx),
|
||||
))
|
||||
Ok(Response::from_parts(parts, measured_incoming(body, id, start, event_tx)))
|
||||
}
|
||||
Err(e) => {
|
||||
let _ = event_tx.send(ProxyEvent::Error {
|
||||
id,
|
||||
error: e.to_string(),
|
||||
});
|
||||
let _ = event_tx.send(ProxyEvent::Error { id, error: e.to_string() });
|
||||
Err(Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
|
||||
}
|
||||
}
|
||||
@@ -168,11 +150,7 @@ async fn handle_connect(
|
||||
event_tx: std_mpsc::Sender<ProxyEvent>,
|
||||
ca: Arc<CertificateAuthority>,
|
||||
) -> Result<Response<BoxBody>, Box<dyn std::error::Error + Send + Sync>> {
|
||||
let authority = req
|
||||
.uri()
|
||||
.authority()
|
||||
.map(|a| a.to_string())
|
||||
.unwrap_or_default();
|
||||
let authority = req.uri().authority().map(|a| a.to_string()).unwrap_or_default();
|
||||
let (host, port) = parse_host_port(&authority);
|
||||
|
||||
let server_config = ca.server_config(&host)?;
|
||||
@@ -189,10 +167,7 @@ async fn handle_connect(
|
||||
}
|
||||
};
|
||||
|
||||
let tls_stream = match acceptor
|
||||
.accept(hyper_util::rt::TokioIo::new(upgraded))
|
||||
.await
|
||||
{
|
||||
let tls_stream = match acceptor.accept(hyper_util::rt::TokioIo::new(upgraded)).await {
|
||||
Ok(s) => s,
|
||||
Err(e) => {
|
||||
eprintln!("TLS accept failed for {host}: {e}");
|
||||
@@ -203,10 +178,7 @@ async fn handle_connect(
|
||||
let tx = event_tx.clone();
|
||||
let host_for_requests = host.clone();
|
||||
let mut builder = auto::Builder::new(TokioExecutor::new());
|
||||
builder
|
||||
.http1()
|
||||
.preserve_header_case(true)
|
||||
.title_case_headers(true);
|
||||
builder.http1().preserve_header_case(true).title_case_headers(true);
|
||||
if let Err(e) = builder
|
||||
.serve_connection_with_upgrades(
|
||||
hyper_util::rt::TokioIo::new(tls_stream),
|
||||
@@ -271,20 +243,12 @@ async fn forward_https(
|
||||
let id = REQUEST_ID.fetch_add(1, Ordering::Relaxed);
|
||||
let method = req.method().to_string();
|
||||
let http_version = version_str(req.version());
|
||||
let path = req
|
||||
.uri()
|
||||
.path_and_query()
|
||||
.map(|pq| pq.to_string())
|
||||
.unwrap_or_else(|| "/".into());
|
||||
let path = req.uri().path_and_query().map(|pq| pq.to_string()).unwrap_or_else(|| "/".into());
|
||||
let uri_str = format!("https://{host}{path}");
|
||||
let start = Instant::now();
|
||||
|
||||
let _ = event_tx.send(ProxyEvent::RequestStart {
|
||||
id,
|
||||
method,
|
||||
url: uri_str.clone(),
|
||||
http_version,
|
||||
});
|
||||
let _ =
|
||||
event_tx.send(ProxyEvent::RequestStart { id, method, url: uri_str.clone(), http_version });
|
||||
|
||||
// Connect to upstream with TLS
|
||||
let tcp_stream = TcpStream::connect(target_addr).await?;
|
||||
@@ -305,18 +269,13 @@ async fn forward_https(
|
||||
let server_name = ServerName::try_from(host.to_string())?;
|
||||
let tls_stream = connector.connect(server_name, tcp_stream).await?;
|
||||
|
||||
let negotiated_h2 = tls_stream
|
||||
.get_ref()
|
||||
.1
|
||||
.alpn_protocol()
|
||||
.map_or(false, |p| p == b"h2");
|
||||
let negotiated_h2 = tls_stream.get_ref().1.alpn_protocol().map_or(false, |p| p == b"h2");
|
||||
|
||||
let io = hyper_util::rt::TokioIo::new(tls_stream);
|
||||
|
||||
let mut sender = if negotiated_h2 {
|
||||
let (sender, conn) = hyper::client::conn::http2::Builder::new(TokioExecutor::new())
|
||||
.handshake(io)
|
||||
.await?;
|
||||
let (sender, conn) =
|
||||
hyper::client::conn::http2::Builder::new(TokioExecutor::new()).handshake(io).await?;
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = conn.await {
|
||||
eprintln!("Upstream h2 connection error: {e}");
|
||||
@@ -340,11 +299,7 @@ async fn forward_https(
|
||||
// Capture request metadata
|
||||
let (mut parts, body) = req.into_parts();
|
||||
let body_bytes = body.collect().await?.to_bytes();
|
||||
let request_body = if body_bytes.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(body_bytes.to_vec())
|
||||
};
|
||||
let request_body = if body_bytes.is_empty() { None } else { Some(body_bytes.to_vec()) };
|
||||
emit_request_events(&event_tx, id, &parts.headers, &request_body);
|
||||
|
||||
if negotiated_h2 {
|
||||
@@ -365,16 +320,10 @@ async fn forward_https(
|
||||
emit_response_events(&event_tx, id, &resp, &start);
|
||||
|
||||
let (parts, body) = resp.into_parts();
|
||||
Ok(Response::from_parts(
|
||||
parts,
|
||||
measured_incoming(body, id, start, event_tx),
|
||||
))
|
||||
Ok(Response::from_parts(parts, measured_incoming(body, id, start, event_tx)))
|
||||
}
|
||||
Err(e) => {
|
||||
let _ = event_tx.send(ProxyEvent::Error {
|
||||
id,
|
||||
error: e.to_string(),
|
||||
});
|
||||
let _ = event_tx.send(ProxyEvent::Error { id, error: e.to_string() });
|
||||
Err(Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user