Skip to content

Commit

Permalink
Adapt to changes of API v2 that came recently in
Browse files Browse the repository at this point in the history
  • Loading branch information
lukasmittag committed Sep 4, 2024
1 parent c8a2aba commit 97f123e
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 56 deletions.
31 changes: 31 additions & 0 deletions databroker/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,37 @@ pub enum UpdateError {
PermissionExpired,
}

impl UpdateError {
pub fn to_status_with_code(&self, id: &i32) -> tonic::Status {
match self {
UpdateError::NotFound => tonic::Status::new(
tonic::Code::NotFound,
format!("Signal not found (id: {})", id),
),
UpdateError::WrongType => tonic::Status::new(
tonic::Code::InvalidArgument,
format!("Wrong type provided (id: {})", id),
),
UpdateError::OutOfBounds => tonic::Status::new(
tonic::Code::OutOfRange,
format!("Index out of bounds (id: {})", id),
),
UpdateError::UnsupportedType => tonic::Status::new(
tonic::Code::Unimplemented,
format!("Unsupported type (id: {})", id),
),
UpdateError::PermissionDenied => tonic::Status::new(
tonic::Code::PermissionDenied,
format!("Permission denied (id: {})", id),
),
UpdateError::PermissionExpired => tonic::Status::new(
tonic::Code::Unauthenticated,
format!("Permission expired (id: {})", id),
),
}
}
}

#[derive(Debug, Clone)]
pub enum ReadError {
NotFound,
Expand Down
110 changes: 54 additions & 56 deletions databroker/src/grpc/kuksa_val_v2/val.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,21 @@ impl proto::val_server::Val for broker::DataBroker {

let request = request.into_inner();

let signal_ids = request.signal_ids;
let size = signal_ids.len();
let signal_paths = request.signal_paths;
let size = signal_paths.len();

let mut valid_requests: HashMap<i32, HashSet<broker::Field>> = HashMap::with_capacity(size);

for signal_id in signal_ids {
for path in signal_paths {
valid_requests.insert(
match get_signal_id(Some(signal_id), &broker).await {
match get_signal(
Some(proto::SignalId {
signal: Some(proto::signal_id::Signal::Path(path)),
}),
&broker,
)
.await
{
Ok(signal_id) => signal_id,
Err(err) => return Err(err),
},
Expand All @@ -126,6 +133,25 @@ impl proto::val_server::Val for broker::DataBroker {
}
}

type SubscribeIdStream = Pin<
Box<
dyn Stream<Item = Result<proto::SubscribeResponseId, tonic::Status>>
+ Send
+ Sync
+ 'static,
>,
>;

async fn subscribe_id(
&self,
_request: tonic::Request<proto::SubscribeRequestId>,
) -> Result<tonic::Response<Self::SubscribeIdStream>, tonic::Status> {
Err(tonic::Status::new(
tonic::Code::Unimplemented,
"Unimplemented",
))
}

async fn actuate(
&self,
_request: tonic::Request<proto::ActuateRequest>,
Expand Down Expand Up @@ -278,7 +304,7 @@ impl proto::val_server::Val for broker::DataBroker {
let mut updates: HashMap<i32, broker::EntryUpdate> = HashMap::with_capacity(1);

updates.insert(
match get_signal_id(request.signal_id, &broker).await {
match get_signal(request.signal_id, &broker).await {
Ok(signal_id) => signal_id,
Err(err) => return Err(err),
},
Expand All @@ -295,18 +321,12 @@ impl proto::val_server::Val for broker::DataBroker {
);

match broker.update_entries(updates).await {
Ok(()) => Ok(tonic::Response::new(proto::PublishValueResponse {
error: None,
})),
Ok(()) => Ok(tonic::Response::new(proto::PublishValueResponse {})),
Err(errors) => {
if errors.is_empty() {
Ok(tonic::Response::new(proto::PublishValueResponse {
error: None,
}))
} else if let Some((_, err)) = errors.first() {
Ok(tonic::Response::new(proto::PublishValueResponse {
error: Some(err.into()),
}))
Ok(tonic::Response::new(proto::PublishValueResponse {}))
} else if let Some((id, err)) = errors.first() {
Err(err.to_status_with_code(id))
} else {
Err(tonic::Status::internal(
"There is no error provided for the entry",
Expand Down Expand Up @@ -573,7 +593,7 @@ async fn publish_values(
}
}

async fn get_signal_id(
async fn get_signal(
signal_id: Option<proto::SignalId>,
broker: &AuthorizedAccess<'_, '_>,
) -> Result<i32, tonic::Status> {
Expand Down Expand Up @@ -724,9 +744,7 @@ mod tests {
Ok(response) => {
// Handle the successful response
let publish_response = response.into_inner();

// Check if there is an error in the response
assert_eq!(publish_response.error, None);
assert_eq!(publish_response, proto::PublishValueResponse {})
}
Err(status) => {
// Handle the error from the publish_value function
Expand Down Expand Up @@ -799,7 +817,8 @@ mod tests {
let f = false;
let broker = DataBroker::default();
let authorized_access = broker.authorized_access(&permissions::ALLOW_ALL);
let entry_id_1 = authorized_access

let entry_id = authorized_access
.add_entry(
"test.datapoint1".to_owned(),
broker::DataType::Bool,
Expand All @@ -812,30 +831,8 @@ mod tests {
.await
.unwrap();

let entry_id_2 = authorized_access
.add_entry(
"test.datapoint2".to_owned(),
broker::DataType::Bool,
broker::ChangeType::OnChange,
broker::EntryType::Sensor,
"Test datapoint 2".to_owned(),
None,
None,
)
.await
.unwrap();

let mut request = tonic::Request::new(proto::SubscribeRequest {
signal_ids: vec![
proto::SignalId {
signal: Some(proto::signal_id::Signal::Path(
"test.datapoint1".to_string(),
)),
},
proto::SignalId {
signal: Some(proto::signal_id::Signal::Id(entry_id_2)),
},
],
signal_paths: vec!["test.datapoint1".to_string()],
});

request
Expand All @@ -849,9 +846,9 @@ mod tests {
rt.block_on(broker.subscribe(request))
});

let mut request_1 = tonic::Request::new(proto::PublishValueRequest {
let mut request = tonic::Request::new(proto::PublishValueRequest {
signal_id: Some(proto::SignalId {
signal: Some(proto::signal_id::Signal::Id(entry_id_1)),
signal: Some(proto::signal_id::Signal::Id(entry_id)),
}),
data_point: Some(proto::Datapoint {
timestamp: None,
Expand All @@ -860,43 +857,44 @@ mod tests {
})),
}),
});
request_1
request
.extensions_mut()
.insert(permissions::ALLOW_ALL.clone());
match broker.publish_value(request_1).await {
match broker.publish_value(request).await {
Ok(response) => {
// Handle the successful response
let publish_response = response.into_inner();

assert_eq!(publish_response.error, None);
// Check if there is an error in the response
assert_eq!(publish_response, proto::PublishValueResponse {});
}
Err(status) => {
// Handle the error from the publish_value function
assert!(f, "Publish failed with status: {:?}", status);
}
}

let mut request_2 = tonic::Request::new(proto::PublishValueRequest {
let mut request_false = tonic::Request::new(proto::PublishValueRequest {
signal_id: Some(proto::SignalId {
signal: Some(proto::signal_id::Signal::Id(entry_id_2)),
signal: Some(proto::signal_id::Signal::Id(entry_id)),
}),
data_point: Some(proto::Datapoint {
timestamp: None,
value_state: Some(proto::datapoint::ValueState::Value(proto::Value {
typed_value: Some(proto::value::TypedValue::Bool(true)),
typed_value: Some(proto::value::TypedValue::Bool(false)),
})),
}),
});
request_2
request_false
.extensions_mut()
.insert(permissions::ALLOW_ALL.clone());
match broker.publish_value(request_2).await {
match broker.publish_value(request_false).await {
Ok(response) => {
// Handle the successful response
let publish_response = response.into_inner();

// Check if there is an error in the response
assert_eq!(publish_response.error, None);
assert_eq!(publish_response, proto::PublishValueResponse {});
}
Err(status) => {
// Handle the error from the publish_value function
Expand Down Expand Up @@ -931,12 +929,12 @@ mod tests {
check_stream_next(&item, expected_entries.clone()).await;
expected_entries.clear();
expected_entries.insert(
"test.datapoint2".to_string(),
"test.datapoint1".to_string(),
proto::Datapoint {
timestamp: None,
value_state: Some(proto::datapoint::ValueState::Value(
proto::Value {
typed_value: Some(proto::value::TypedValue::Bool(true)),
typed_value: Some(proto::value::TypedValue::Bool(false)),
},
)),
},
Expand Down

0 comments on commit 97f123e

Please sign in to comment.