diff --git a/src/main.rs b/src/main.rs index 3901e1f..c84217f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -100,62 +100,67 @@ fn read_cache( }) } + #[actix_web::post("/{chain}")] async fn rpc_call( - path: web::Path<(String,)>, + path: web::Path<(String, )>, data: web::Data, body: web::Json, ) -> Result { - let (chain,) = path.into_inner(); + let (chain, ) = path.into_inner(); let chain_state = data .chains .get(&chain.to_uppercase()) .ok_or_else(|| error::ErrorNotFound("endpoint not supported"))?; - let requests = if let Some(requests) = body.as_array() { - requests.to_vec() - } else { - vec![body.0] + let requests = match body { + web::Json(Value::Array(requests)) => requests, + web::Json(Value::Object(obj)) => vec![Value::Object(obj)], + _ => return Err(error::ErrorBadRequest("invalid request body")), }; let mut request_result = HashMap::new(); let mut uncached_requests = HashMap::new(); - let mut ordered_id = vec![]; + let mut ids_in_original_order = vec![]; let mut redis_con = data.redis.get().map_err(|err| { log::error!("fail to get redis connection because: {}", err); error::ErrorInternalServerError("fail to get redis connection") })?; - for request in &requests { - let id = request["id"] - .as_u64() - .ok_or_else(|| error::ErrorBadRequest("id not found"))?; - let method = request["method"] - .as_str() - .ok_or_else(|| error::ErrorBadRequest("method not found"))?; - let params = &request["params"]; + for mut request in requests { + let id = match request["id"].take() { + Value::Number(n) if n.as_u64().is_some() => n.as_u64().unwrap(), + _ => return Err(error::ErrorBadRequest("invalid id")), + }; + + let method = match request["method"].take() { + Value::String(s) => s, + _ => return Err(error::ErrorBadRequest("invalid method")), + }; - ordered_id.push(id); + let params = request["params"].take(); - let cache_entry = match chain_state.cache_entries.get(method) { + ids_in_original_order.push(id); + + let cache_entry = match chain_state.cache_entries.get(&method) { Some(cache_entry) => cache_entry, None => { - uncached_requests.insert(id, (method.to_string(), params.clone(), None)); + uncached_requests.insert(id, (method, params, None)); continue; } }; - let result = read_cache(&mut redis_con, cache_entry.handler.as_ref(), method, params); + let result = read_cache(&mut redis_con, cache_entry.handler.as_ref(), &method, ¶ms); match result { Err(err) => { log::error!("fail to read cache because: {}", err); - uncached_requests.insert(id, (method.to_string(), params.clone(), None)); + uncached_requests.insert(id, (method, params, None)); } Ok(CacheStatus::NotAvailable) => { log::info!("cache not available for method {}", method); - uncached_requests.insert(id, (method.to_string(), params.clone(), None)); + uncached_requests.insert(id, (method, params, None)); } Ok(CacheStatus::Cached(cache_key, value)) => { log::info!("cache hit for method {} with key {}", method, cache_key); @@ -163,7 +168,7 @@ async fn rpc_call( } Ok(CacheStatus::Missed(cache_key)) => { log::info!("cache missed for method {} with key {}", method, cache_key); - uncached_requests.insert(id, (method.to_string(), params.clone(), Some(cache_key))); + uncached_requests.insert(id, (method, params, Some(cache_key))); } } } @@ -188,30 +193,33 @@ async fn rpc_call( chain_state.rpc_url.clone(), &request_body, ) - .await - .map_err(|err| { - log::error!("fail to make rpc request because: {}", err); - error::ErrorInternalServerError(format!("fail to make rpc request because: {}", err)) - })?; - - let rpc_result = rpc_result.as_array().ok_or_else(|| { - log::error!("invalid rpc response: {}", rpc_result.to_string()); - error::ErrorInternalServerError("invalid rpc response") - })?; - - for response in rpc_result { + .await + .map_err(|err| { + log::error!("fail to make rpc request because: {}", err); + error::ErrorInternalServerError(format!("fail to make rpc request because: {}", err)) + })?; + + let result_values = match rpc_result { + Value::Array(v) => v, + _ => { + log::error!("array is expected but we got invalid rpc response: {},", rpc_result.to_string()); + return Err(error::ErrorInternalServerError("invalid rpc response")); + } + }; + + for mut response in result_values { let id = response["id"] .as_u64() .ok_or_else(|| error::ErrorBadRequest("id not found"))?; let (method, _params, cache_key) = uncached_requests.get(&id).unwrap(); - let error = &response["error"]; + let error = response["error"].take(); if !error.is_null() { request_result.insert(id, ResultOrError::Error(error.clone())); continue; } - let result = &response["result"]; + let result = response["result"].take(); request_result.insert(id, ResultOrError::Result(result.clone())); let cache_key = match cache_key { @@ -223,7 +231,7 @@ async fn rpc_call( let (can_cache, extracted_value) = cache_entry .handler - .extract_cache_value(result) + .extract_cache_value(&result) .expect("fail to extract cache value"); if can_cache { @@ -233,19 +241,19 @@ async fn rpc_call( } } - let response = ordered_id + let response = ids_in_original_order .iter() .map(|id| { let result = request_result - .get(id) + .get_mut(id) .unwrap_or_else(|| panic!("result for id {} not found", id)); match result { ResultOrError::Error(error) => { - json!({ "jsonrpc": "2.0", "id": id, "error": error }) + json!({ "jsonrpc": "2.0", "id": id, "error": error.take() }) } ResultOrError::Result(result) => { - json!({ "jsonrpc": "2.0", "id": id, "result": result }) + json!({ "jsonrpc": "2.0", "id": id, "result": result.take() }) } } })