Fix OAuth plugin transport errors and ad-hoc response persistence

This commit is contained in:
Gregory Schier
2026-02-19 13:53:06 -08:00
parent 9e177136af
commit e7d01e9050
3 changed files with 93 additions and 76 deletions

View File

@@ -24,7 +24,7 @@ use yaak_models::models::{
HttpResponseEvent, HttpResponseHeader, HttpResponseState, ProxySetting, ProxySettingAuth, HttpResponseEvent, HttpResponseHeader, HttpResponseState, ProxySetting, ProxySettingAuth,
}; };
use yaak_models::query_manager::QueryManager; use yaak_models::query_manager::QueryManager;
use yaak_models::util::UpdateSource; use yaak_models::util::{UpdateSource, generate_prefixed_id};
use yaak_plugins::events::{ use yaak_plugins::events::{
CallHttpAuthenticationRequest, HttpHeader, PluginContext, RenderPurpose, CallHttpAuthenticationRequest, HttpHeader, PluginContext, RenderPurpose,
}; };
@@ -488,11 +488,16 @@ pub async fn send_http_request<T: TemplateCallback>(
response.elapsed = 0; response.elapsed = 0;
response.elapsed_headers = 0; response.elapsed_headers = 0;
response.elapsed_dns = 0; response.elapsed_dns = 0;
response = params let persist_response = !response.request_id.is_empty();
.query_manager if persist_response {
.connect() response = params
.upsert_http_response(&response, &params.update_source, params.blob_manager) .query_manager
.map_err(SendHttpRequestError::PersistResponse)?; .connect()
.upsert_http_response(&response, &params.update_source, params.blob_manager)
.map_err(SendHttpRequestError::PersistResponse)?;
} else if response.id.is_empty() {
response.id = generate_prefixed_id("rs");
}
let (event_tx, mut event_rx) = let (event_tx, mut event_rx) =
mpsc::channel::<SenderHttpResponseEvent>(HTTP_EVENT_CHANNEL_CAPACITY); mpsc::channel::<SenderHttpResponseEvent>(HTTP_EVENT_CHANNEL_CAPACITY);
@@ -503,16 +508,18 @@ pub async fn send_http_request<T: TemplateCallback>(
let emit_events_to = params.emit_events_to.clone(); let emit_events_to = params.emit_events_to.clone();
let event_handle = tokio::spawn(async move { let event_handle = tokio::spawn(async move {
while let Some(event) = event_rx.recv().await { while let Some(event) = event_rx.recv().await {
let db_event = HttpResponseEvent::new( if persist_response {
&event_response_id, let db_event = HttpResponseEvent::new(
&event_workspace_id, &event_response_id,
event.clone().into(), &event_workspace_id,
); event.clone().into(),
if let Err(err) = event_query_manager );
.connect() if let Err(err) = event_query_manager
.upsert_http_response_event(&db_event, &event_update_source) .connect()
{ .upsert_http_response_event(&db_event, &event_update_source)
warn!("Failed to persist HTTP response event: {}", err); {
warn!("Failed to persist HTTP response event: {}", err);
}
} }
if let Some(tx) = emit_events_to.as_ref() { if let Some(tx) = emit_events_to.as_ref() {
@@ -531,15 +538,17 @@ pub async fn send_http_request<T: TemplateCallback>(
Ok(response) => response, Ok(response) => response,
Err(err) => { Err(err) => {
persist_cookie_jar(params.query_manager, cookie_jar.as_mut(), cookie_store.as_ref())?; persist_cookie_jar(params.query_manager, cookie_jar.as_mut(), cookie_store.as_ref())?;
let _ = persist_response_error( if persist_response {
params.query_manager, let _ = persist_response_error(
params.blob_manager, params.query_manager,
&params.update_source, params.blob_manager,
&response, &params.update_source,
started_at, &response,
err.to_string(), started_at,
request_started_url, err.to_string(),
); request_started_url,
);
}
if let Err(join_err) = event_handle.await { if let Err(join_err) = event_handle.await {
warn!("Failed to join response event task: {}", join_err); warn!("Failed to join response event task: {}", join_err);
} }
@@ -548,40 +557,39 @@ pub async fn send_http_request<T: TemplateCallback>(
}; };
let headers_elapsed = duration_to_i32(started_at.elapsed()); let headers_elapsed = duration_to_i32(started_at.elapsed());
response = params let connected_response = HttpResponse {
.query_manager state: HttpResponseState::Connected,
.connect() elapsed_headers: headers_elapsed,
.upsert_http_response( status: i32::from(http_response.status),
&HttpResponse { status_reason: http_response.status_reason.clone(),
state: HttpResponseState::Connected, url: http_response.url.clone(),
elapsed_headers: headers_elapsed, remote_addr: http_response.remote_addr.clone(),
status: i32::from(http_response.status), version: http_response.version.clone(),
status_reason: http_response.status_reason.clone(), headers: http_response
url: http_response.url.clone(), .headers
remote_addr: http_response.remote_addr.clone(), .iter()
version: http_response.version.clone(), .map(|(name, value)| HttpResponseHeader { name: name.clone(), value: value.clone() })
headers: http_response .collect(),
.headers request_headers: http_response
.iter() .request_headers
.map(|(name, value)| HttpResponseHeader { .iter()
name: name.clone(), .map(|(name, value)| HttpResponseHeader { name: name.clone(), value: value.clone() })
value: value.clone(), .collect(),
}) ..response
.collect(), };
request_headers: http_response if persist_response {
.request_headers response = params
.iter() .query_manager
.map(|(name, value)| HttpResponseHeader { .connect()
name: name.clone(), .upsert_http_response(
value: value.clone(), &connected_response,
}) &params.update_source,
.collect(), params.blob_manager,
..response )
}, .map_err(SendHttpRequestError::PersistResponse)?;
&params.update_source, } else {
params.blob_manager, response = connected_response;
) }
.map_err(SendHttpRequestError::PersistResponse)?;
let (response_body, body_stats) = let (response_body, body_stats) =
http_response.bytes().await.map_err(SendHttpRequestError::ReadResponseBody)?; http_response.bytes().await.map_err(SendHttpRequestError::ReadResponseBody)?;
@@ -598,23 +606,24 @@ pub async fn send_http_request<T: TemplateCallback>(
SendHttpRequestError::WriteResponseBody { path: body_path.clone(), source } SendHttpRequestError::WriteResponseBody { path: body_path.clone(), source }
})?; })?;
response = params let final_response = HttpResponse {
.query_manager body_path: Some(body_path.to_string_lossy().to_string()),
.connect() content_length: Some(usize_to_i32(response_body.len())),
.upsert_http_response( content_length_compressed: Some(u64_to_i32(body_stats.size_compressed)),
&HttpResponse { elapsed: duration_to_i32(started_at.elapsed()),
body_path: Some(body_path.to_string_lossy().to_string()), elapsed_headers: headers_elapsed,
content_length: Some(usize_to_i32(response_body.len())), state: HttpResponseState::Closed,
content_length_compressed: Some(u64_to_i32(body_stats.size_compressed)), ..response
elapsed: duration_to_i32(started_at.elapsed()), };
elapsed_headers: headers_elapsed, if persist_response {
state: HttpResponseState::Closed, response = params
..response .query_manager
}, .connect()
&params.update_source, .upsert_http_response(&final_response, &params.update_source, params.blob_manager)
params.blob_manager, .map_err(SendHttpRequestError::PersistResponse)?;
) } else {
.map_err(SendHttpRequestError::PersistResponse)?; response = final_response;
}
if let Err(join_err) = event_handle.await { if let Err(join_err) = event_handle.await {
warn!("Failed to join response event task: {}", join_err); warn!("Failed to join response event task: {}", join_err);

View File

@@ -61,6 +61,10 @@ export async function fetchAccessToken(
console.log('[oauth2] Got access token response', resp.status); console.log('[oauth2] Got access token response', resp.status);
if (resp.error) {
throw new Error(`Failed to fetch access token: ${resp.error}`);
}
const body = resp.bodyPath ? readFileSync(resp.bodyPath, 'utf8') : ''; const body = resp.bodyPath ? readFileSync(resp.bodyPath, 'utf8') : '';
if (resp.status < 200 || resp.status >= 300) { if (resp.status < 200 || resp.status >= 300) {

View File

@@ -71,6 +71,10 @@ export async function getOrRefreshAccessToken(
httpRequest.authenticationType = 'none'; // Don't inherit workspace auth httpRequest.authenticationType = 'none'; // Don't inherit workspace auth
const resp = await ctx.httpRequest.send({ httpRequest }); const resp = await ctx.httpRequest.send({ httpRequest });
if (resp.error) {
throw new Error(`Failed to refresh access token: ${resp.error}`);
}
if (resp.status >= 400 && resp.status < 500) { if (resp.status >= 400 && resp.status < 500) {
// Client errors (4xx) indicate the refresh token is invalid, expired, or revoked // Client errors (4xx) indicate the refresh token is invalid, expired, or revoked
// Delete the token and return null to trigger a fresh authorization flow // Delete the token and return null to trigger a fresh authorization flow