Better message serialization

This commit is contained in:
Gregory Schier
2024-02-09 05:00:48 -08:00
parent 9c35f7c85c
commit bc4ef2d9f7
7 changed files with 49 additions and 22 deletions

View File

@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "\n DELETE FROM grpc_requests\n WHERE id = ?\n ",
"describe": {
"columns": [],
"parameters": {
"Right": 1
},
"nullable": []
},
"hash": "fe0652396bc30d926cf99083651c1cbd668bcf00ebe1a5f36616700c84972b39"
}

View File

@@ -143,12 +143,14 @@ pub fn message_to_json_schema(
schema.properties = Some(properties);
schema.required = Some(
message
.fields()
.map(|f| f.name().to_string())
.collect::<Vec<_>>(),
);
// All proto 3 fields are optional, so maybe we could
// make this a setting?
// schema.required = Some(
// message
// .fields()
// .map(|f| f.name().to_string())
// .collect::<Vec<_>>(),
// );
schema
}

View File

@@ -1,4 +1,4 @@
use prost_reflect::SerializeOptions;
use prost_reflect::{DynamicMessage, SerializeOptions};
use serde::{Deserialize, Serialize};
mod codec;
@@ -25,3 +25,16 @@ pub struct MethodDefinition {
pub client_streaming: bool,
pub server_streaming: bool,
}
static SERIALIZE_OPTIONS: &'static SerializeOptions = &SerializeOptions::new()
.skip_default_fields(false)
.stringify_64_bit_integers(false);
pub fn serialize_message(msg: &DynamicMessage) -> Result<String, String> {
let mut buf = Vec::new();
let mut se = serde_json::Serializer::pretty(&mut buf);
msg.serialize_with_options(&mut se, SERIALIZE_OPTIONS)
.map_err(|e| e.to_string())?;
let s = String::from_utf8(buf).expect("serde_json to emit valid utf8");
Ok(s)
}

View File

@@ -30,7 +30,7 @@ impl GrpcConnection {
service: &str,
method: &str,
message: &str,
) -> Result<String, String> {
) -> Result<DynamicMessage, String> {
let service = self.pool.get_service_by_name(service).unwrap();
let method = &service.methods().find(|m| m.name() == method).unwrap();
let input_message = method.input();
@@ -47,11 +47,11 @@ impl GrpcConnection {
let codec = DynamicCodec::new(method.clone());
client.ready().await.unwrap();
let resp = client.unary(req, path, codec).await.unwrap();
let msg = resp.into_inner();
let response_json = serde_json::to_string_pretty(&msg).expect("json to string");
Ok(response_json)
Ok(client
.unary(req, path, codec)
.await
.map_err(|e| e.to_string())?
.into_inner())
}
pub async fn streaming(

View File

@@ -45,6 +45,7 @@ pub async fn fill_pool_from_files(paths: Vec<PathBuf>) -> Result<DescriptorPool,
let parent = p.as_path().parent();
if let Some(parent_path) = parent {
cmd.arg("-I").arg(parent_path);
cmd.arg("-I").arg(parent_path.parent().unwrap());
} else {
debug!("ignoring {:?} since it does not exist.", parent)
}

View File

@@ -36,7 +36,7 @@ use tokio::time::sleep;
use window_shadows::set_shadow;
use grpc::manager::GrpcHandle;
use grpc::ServiceDefinition;
use grpc::{serialize_message, ServiceDefinition};
use window_ext::TrafficLightWindowExt;
use crate::analytics::{AnalyticsAction, AnalyticsResource};
@@ -193,7 +193,7 @@ async fn cmd_grpc_call_unary(
upsert_grpc_message(
&app_handle,
&GrpcMessage {
message: msg,
message: serialize_message(&msg)?,
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.clone().id,
@@ -365,7 +365,7 @@ async fn cmd_grpc_client_streaming(
.client_streaming(&service, &method, in_msg_stream)
.await
.unwrap();
let message = serde_json::to_string(&msg).unwrap();
let message = serialize_message(&msg).unwrap();
upsert_grpc_message(
&app_handle,
&GrpcMessage {
@@ -576,7 +576,7 @@ async fn cmd_grpc_streaming(
loop {
match stream.next().await {
Some(Ok(item)) => {
let item = serde_json::to_string_pretty(&item).unwrap();
let item = serialize_message(&item).unwrap();
let req = req.clone();
let conn = conn.clone();
upsert_grpc_message(
@@ -591,7 +591,6 @@ async fn cmd_grpc_streaming(
},
)
.await
.map_err(|e| e.to_string())
.unwrap();
}
Some(Err(e)) => {
@@ -727,7 +726,7 @@ async fn cmd_grpc_server_streaming(
.await?
.server_streaming(&service, &method, &req.message)
.await
.unwrap();
.expect("FAILED");
#[derive(serde::Deserialize)]
enum IncomingMsg {
@@ -768,7 +767,7 @@ async fn cmd_grpc_server_streaming(
let app_handle = app_handle.clone();
match stream.next().await {
Some(Ok(item)) => {
let item = serde_json::to_string_pretty(&item).unwrap();
let item = serialize_message(&item).unwrap();
upsert_grpc_message(
&app_handle,
&GrpcMessage {

View File

@@ -13,7 +13,7 @@ interface Props {
export const JsonAttributeTree = ({ depth = 0, attrKey, attrValue, attrKeyJsonPath }: Props) => {
attrKeyJsonPath = attrKeyJsonPath ?? `${attrKey}`;
const [isExpanded, setIsExpanded] = useState(depth === 0);
const [isExpanded, setIsExpanded] = useState(true);
const toggleExpanded = () => setIsExpanded((v) => !v);
const { isExpandable, children, label, labelClassName } = useMemo<{
@@ -78,7 +78,7 @@ export const JsonAttributeTree = ({ depth = 0, attrKey, attrValue, attrKeyJsonPa
</span>
);
return (
<div className={classNames(/*depth === 0 && '-ml-4',*/ 'font-mono text-xs')}>
<div className={classNames(/*depth === 0 && '-ml-4',*/ 'font-mono text-2xs')}>
<div className="flex items-center">
{isExpandable ? (
<button className="group relative flex items-center pl-4 w-full" onClick={toggleExpanded}>