Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add continuous subscription to subscribe method at a specific frequency #734

Open
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

rafaeling
Copy link
Contributor

@rafaeling rafaeling commented Feb 8, 2024

This PR solves #652

How can be tested?

  • Use this test.json file as the VSS input for the broker. It contains the necessary overlays modified to test it with the ChangeType values Continuous and OnChange to verify the expected behavior
    test.json

  • You can subscribe with databroker-clie to the signals Vehicle.Driver.HeartRate (Continuous update) and Vehicle.Body.Trunk.Front.IsLightOn (OnChange update).

  • Kuksa-client seems to have some bugs that need testing for this new feature.

What to expect?

  • The value for Vehicle.Driver.HeartRate should be set, as well as for Vehicle.Body.Trunk.Front.IsLightOn.
  • Subscribe to both paths, and the terminal should display the Continuous values every second and OnChange values only when updated.

Kuksa client to test it
eclipse-kuksa/kuksa-python-sdk#21

@rafaeling rafaeling force-pushed the add_continuous_subscription branch 2 times, most recently from 43bc1b0 to d45ba30 Compare February 13, 2024 11:12
@erikbosch
Copy link
Contributor

Are you happy with the content of this PR now @argerus ? I.e. is it ready for merge?

@argerus
Copy link
Contributor

argerus commented Feb 26, 2024

Are you happy with the content of this PR now @argerus ? I.e. is it ready for merge?

Probably not. As is currently stands, this PR sets a hard coded update frequency of 1 second for everything, with no way to set it to anything else.

This combined with the fact that we don't include a VSS db with the change types configured, and every signal now defaults to Continuous is probably not the behaviour we want. I do have a version of the VSS db with all change types configured, so perhaps that will make the out of the box behaviour better (if we merge that first).

I still think this PR should include the ability to change the update frequency through the API, otherwise I'm not sure much is gained by merging it.

@rafaeling
Copy link
Contributor Author

Will then add the frequency parameter to the subscribe request on this PR, was thinking to split it into different PRs but makes no sense.

@rafaeling rafaeling force-pushed the add_continuous_subscription branch from 3417778 to 960e057 Compare March 6, 2024 15:45
@rafaeling rafaeling changed the title Add continuous subscription to subscribe method Add continuous subscription to subscribe method at a specific frequency Mar 6, 2024
@@ -921,6 +961,117 @@ impl QuerySubscription {
}
}

impl ContinuousSubscription {
async fn notify(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that this is very similar to the ChangeSubscription notify. Couldn't it be reused somehow? What I mean is just use different subscribe function but maybe one notify? If this is not doable let me know :)

Copy link
Contributor Author

@rafaeling rafaeling Mar 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, let me see if I can use traits -> https://doc.rust-lang.org/book/ch10-02-traits.html

@@ -98,6 +98,7 @@ message SubscribeEntry {
// Subscribe to changes in datapoints.
message SubscribeRequest {
repeated SubscribeEntry entries = 1;
optional uint64 frequency_hertz = 2;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some thoughts

  • Do we see Hertz and uint64 as a reasonable representation. That makes it impossible to support intervals longer than 1 second. An alternative solution could be to for example use interval_ms, i.e. wanted interval in milliseconds. That would allow us to represent both 100 Hz as well as 0.1 Hz
  • Should we possible already in this file describe semantics? That would be good if we later intend to promote this API to a "standardized" API. I.e. document topics like
    • What is default if this field/member is not given? Max frequency? On change? Other?
    • What shall happen if 0 as given as frequency? Does that imply "on change" or does it just mean "as fast as possible, changed or not"
    • What is the semantic meaning of specifying frequency - is it getting latest value at that frequency regardless of whether the value has been updated (new timestamp) or changed (changed value)
    • What shall happen if frequency/interval cannot be supported. Like if you request 2000 Hz - should you get an error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have completely missed that frequency could range from 0 to 1 as well, so uint64 is totally wrong, better to use float type (will push the fix). I would still consider to use frequency hertz representation better than interval_ms since hertz is also used by VHAL interface for Continuous properties -> https://source.android.com/docs/automotive/vhal/vhal-interface

Regarding your questions:

  1. What is default if this field/member is not given? Max frequency? On change? Other?
  • The initial default behaviour implemented works as follow:

        1. if the Subscribe request contains any continuous signal without a frequency parameter it returns an InvalidArgument error.
        2. Same in case the request doesn't contain any continuous signal and a frequency parameter is given it must return an InvalidArgument error.

  1. What shall happen if 0 as given as frequency? Does that imply "on change" or does it just mean "as fast as possible, changed or not"
  • No behaviour specified, I would suggest to return an InvalidArgument error as well since it would mean to divide some number by 0. What do you think?
  1. What is the semantic meaning of specifying frequency - is it getting latest value at that frequency regardless of whether the value has been updated (new timestamp) or changed (changed value)
  • The semantic meaning is to get latest value at that frequency regardless of whether the value has been updated or changed with its latest update or change timestamp.
  1. What shall happen if frequency/interval cannot be supported. Like if you request 2000 Hz - should you get an error?
  • We should discuss this again, we can suggest the customer to specific its own frequency limit rate at broker initialization (should to be implemented) depending on its hardware capabilities.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it could be good to have a configurable "max frequency", similar to the "max number of signals" topic we discussed as part of the Audit findings. Then one could possible let the frequency 0 mean "as fast as possible" rather than considering it as an illegal value.

Anyway, maybe we should document this with comments so we could generate documentation with frameworks like https://github.com/pseudomuto/protoc-gen-doc so that expected behavior is clearly defined

Copy link

@BjoernAtBosch BjoernAtBosch Apr 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Findings:

  • Regardless, if we use "frequency" or "interval": It should be already clear from the parameter name, what it means. Currently, I cannot see (without reading the documentation) if it is minimum, typical, or maximum frequency/interval.
  • Don't we add detailed documentation about the behaviour in the proto files? I agree with Erik: We should also describe the semantics in the interface definition. Imho, the "outside visible behaviour" always belongs to the contract description of an interface definition.
  • In the current definition this parameter is related to the overall subscription, but not to certain signals - which is fine for me if the parameter defines the max update frequency. But the behaviour should be clearly defined if the the subscription comprises signals with different update frequencies.
  • What is the behaviour, if signals are updated faster than the max frequency requested by the client:
    • Does it get "just" some sample value at the moment the update interval is reached,
    • do we send some mean value of the last samples (how would that be calculated?),
    • do we accumulated values and send them as a batch,
    • ...?

If we are unsure about the behaviour to be achieved by this PR, we should rethink merging it ...

while !local_subscription.sender.is_closed() {
let _ = local_subscription.notify(None).await;
// Simulate some asynchronous work
tokio::time::sleep(Duration::from_millis(1 / frequency * 1000)).await;
Copy link
Contributor

@erikbosch erikbosch Mar 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does Rust manage type conversion? from_millis expects a u64. You use 1 and 1000, will it really be a float value? Or do you rather need something like:

Duration::from_millis((1000.0/ frequency).round() as u64)

(I actually do not know what resolution/performance we can expect from the system, like if we request 1000 Hz, will we really receive signals with 1000 Hz?

I have also no experience from Rust unit-tests, here a unit test would be a good way to prove that duration becomes as expected , like that 600 Hz gives 2 ms duration

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would be really interesting e.g. how fast is databroker able to notify subscribers. Does it change if we have 100 subscribers at the same time. What is the limit e.g. what Erik wrote. But I guess not that much related to this PR.

Copy link
Contributor Author

@rafaeling rafaeling Mar 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've used your line code to fix it. I will also try to implement some unit tests to test it.
We could also open some performance report topic to measure its capabilities.

@rafaeling
Copy link
Contributor Author

After community call was agreed to avoid changes in VSS and to add the "ChangeType" directly to the subscription request implicitly with the optional frequency parameter.

@boschglobal boschglobal closed this by deleting the head repository Mar 21, 2024
@erikbosch
Copy link
Contributor

Reopening all PRs closed by accident by boschglobal maintenance

Copy link

@BjoernAtBosch BjoernAtBosch left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just looked at the proto changes. (My Rust knowledge is too poor to rate the rest, currently)

@@ -98,6 +98,7 @@ message SubscribeEntry {
// Subscribe to changes in datapoints.
message SubscribeRequest {
repeated SubscribeEntry entries = 1;
optional uint64 frequency_hertz = 2;
Copy link

@BjoernAtBosch BjoernAtBosch Apr 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Findings:

  • Regardless, if we use "frequency" or "interval": It should be already clear from the parameter name, what it means. Currently, I cannot see (without reading the documentation) if it is minimum, typical, or maximum frequency/interval.
  • Don't we add detailed documentation about the behaviour in the proto files? I agree with Erik: We should also describe the semantics in the interface definition. Imho, the "outside visible behaviour" always belongs to the contract description of an interface definition.
  • In the current definition this parameter is related to the overall subscription, but not to certain signals - which is fine for me if the parameter defines the max update frequency. But the behaviour should be clearly defined if the the subscription comprises signals with different update frequencies.
  • What is the behaviour, if signals are updated faster than the max frequency requested by the client:
    • Does it get "just" some sample value at the moment the update interval is reached,
    • do we send some mean value of the last samples (how would that be calculated?),
    • do we accumulated values and send them as a batch,
    • ...?

If we are unsure about the behaviour to be achieved by this PR, we should rethink merging it ...

@erikbosch
Copy link
Contributor

Databroker has been migrated to https://github.com/eclipse-kuksa/kuksa-databroker.
Please open a new pull request in that repo.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants