Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add three new ZMQ publishers for TP results / Retrieve TP results #2230

Open
wants to merge 7 commits into
base: main
Choose a base branch
from

Conversation

MichaelHuth
Copy link
Collaborator

Publishers added:
testpulse results live
testpulse results 1s update
testpulse results 5s update
testpulse results 10s update

The TP results from TP analysis are packed in a JSON in TP_TSAnalysis and published to ZMQ.
The time information when the next publishing is due is stored in the TUFXOP.

  • PUB_Publish was changed to be threadsafe and added an optional argument to control if the JSON should be released.

close #2157

@MichaelHuth MichaelHuth self-assigned this Aug 15, 2024
@MichaelHuth MichaelHuth force-pushed the feature/2230-Add_zeromq_pub_filters_for_TP branch 4 times, most recently from 33d4401 to 716f021 Compare August 20, 2024 17:09
@MichaelHuth
Copy link
Collaborator Author

@t-b Feedback request for general design improvements.

@t-b
Copy link
Collaborator

t-b commented Aug 20, 2024

General approach looks good.

Two comments:

  • PUB_AddTPResultEntry: We only want a value object if it has a unit. See the other publishing functions for an example. The publishing docu can be generated with FetchAndParseMessage and OUTPUT_DOCUMENTATION_JSON_DUMP.

  • PUB_CheckPublishingTime this should use TSD_ReadVar/TSD_WriteVar.

@MichaelHuth MichaelHuth force-pushed the feature/2230-Add_zeromq_pub_filters_for_TP branch 2 times, most recently from 96f6592 to 9c33299 Compare August 22, 2024 11:51
@MichaelHuth MichaelHuth assigned t-b and unassigned MichaelHuth Aug 22, 2024
@t-b
Copy link
Collaborator

t-b commented Aug 22, 2024

Git review:

ef3f1d2 (Util: Add two conversion function for DA/AD unit string return, 2024-08-22)

Good idea!

  • s/in clampmode/on clampmode/

  • Can we tweak them to also accept a channel typ parameter (XopChannelConstants) so that they also return the DA units and use these functions in GetChanAmpAssignUnit? Use it GetChanAmpAssignUnit. In that way we only have a single place where we define the units depending on the clamp mode.

27dac75 (TP: Add more information that is transferred to the TP analysis thread, 2024-08-22)

Nice!

6368bbb (PUB: Add four zeromq publishers for TP data, 2024-08-22)

  • s/jst/just/ in the commit message.

  • Can you move the ZMQ_FILTER_* constants close to AUTO_TP_FILTER? And their names need to be something like "testpulse:", this is done so that people can easily subscribe to all messages from a certain area. See ZMQ_SUBSCRIBE in https://libzmq.readthedocs.io/en/latest/zmq_setsockopt.html. Maybe that needs a comment though.

  • PUB_Publish changes can be their own commit.

  • The test TestTPPublishing fails here with:

•runwithOpts(testcase = "TestTPPublishing")
  Start of test "MIES with HardwareBasic"
  Entering test suite "UTF_TestPulseAndTPDuringDAQ.ipf"
  Entering test case "TestPulseAndTPDuringDAQ#TestTPPublishing:ITC"
  Entering reentry "TestPulseAndTPDuringDAQ#TestTPPublishing_REENTRY"
  String mismatch (case sensitive):
str1: 0:0:0> h e a r t b e a t
str2: 0:0:0> t e s t p u l s e   r
: is false. Assertion "CHECK_EQUAL_STR(filter, expectedFilter)" failed in TestPulseAndTPDuringDAQ#TestTPPublishing_REENTRY➔FetchPublishedMessage (UTF_HelperFunctions.ipf, line 1426➔228)
  Encountered "AbortOnValue" Code 10009 in test case "TestPulseAndTPDuringDAQ#TestTPPublishing" (UTF_TestPulseAndTPDuringDAQ.ipf)
  Leaving test case "TestPulseAndTPDuringDAQ#TestTPPublishing:ITC"
  Failed with 2 errors
  Leaving test suite "UTF_TestPulseAndTPDuringDAQ.ipf"
  Test finished with 2 errors
    ▶ Assertion "CHECK_EQUAL_STR(filter, expectedFilter)" failed in TestPulseAndTPDuringDAQ#TestTPPublishing_REENTRY➔FetchPublishedMessage (UTF_HelperFunctions.ipf, line 1426➔228)
    ▶ Encountered "AbortOnValue" Code 10009 in test case "TestPulseAndTPDuringDAQ#TestTPPublishing" (UTF_TestPulseAndTPDuringDAQ.ipf)
  End of test "MIES with HardwareBasic"

The problem is likely that FetchPublishedMessage only works with a single filter. So we might need to teach FetchPublishedMessage to check fo multiple filters and return then multiple messages.

  • Can we check the exact numeric values of the published data (with some tolerance) instead of using IsInteger or != NaN? The data is in the labnotebook or TPStorage (and really only these two).

  • Test in UTF_ZeroMQPublishing.ipf missing.

  • This test is also required to fill in the documentation for PUB_TPResult like for the other PUB_XXX functions.

  • Can we name the parameters in TP_PrepareAnalysisDF, so that the code in TP_TSAnalysis is easier to grasp? See NWB_ASYNC_SerializeStruct/NWB_ASYNC_DeserializeStruct.

  • GetTPResultAsyncBuffer: numCols should only be calculated if required. The normal case it that the wave exists with the correct size.

  • Can we NaN the wave in the upgrade case? Just to be sure that we don't have bogus data lying around?

  • The wave upgrade cleanup in GetTPStorage is welcome, but should be its own commit.

  • GetTPStorage: The number of layers is different in upgrade compared to creation.

  • GetTPStorage: Are the label names in layer 31 to 39 from TP_ANALYSIS_DATA_LABELS? If yes we need a comment to say so.

9c33299 (TP: Added two functions that allow to retrieve info about TPs, 2024-08-22)

commit message:

  • s/both function/both functions/

  • s/meta information/metadata/

  • Should be its own commit:

the former function TP_GetStoredTPs was renamed to TP_GetConsecutiveTPsUptoMarker
to prevent misleading naming.

  • The code to recreate the DAC data should be its own function and not duplicated.

  • TP_GetStoredTPsFromCycle/TP_GetStoredTP: I would expect the returned metadata (tpresult, or the waves in tpsResult) to be 1D.

  • Why do both functions not require a headstage parameter? If I understand the code in SCOPE_UpdateOscilloscopeData correctly a tpmarker is constant for all headstages of a given TP. Maybe make the test use two headstages with different clamp modes to verify that.

@t-b t-b assigned MichaelHuth and unassigned t-b Aug 22, 2024
@MichaelHuth
Copy link
Collaborator Author

MichaelHuth commented Aug 22, 2024

  • Change TP return functions to have an headstage argument and slice also HS.
    Then return tpADC, tpDAC, tpStorage slice as 1d wave.

  • Add test that DimLabels of layers are correctly transferred to ROWs (if Redimension does that itself)

  • Use outData wave directly for PUB_TP* argument, scrap tpzmq struct

  • add wave getter for outData, use MoveWave in TP_TSAnalysis with named free wave

@MichaelHuth MichaelHuth force-pushed the feature/2230-Add_zeromq_pub_filters_for_TP branch from ef494e7 to e82cd88 Compare August 25, 2024 18:11
@MichaelHuth MichaelHuth assigned t-b and unassigned MichaelHuth Aug 25, 2024
@t-b
Copy link
Collaborator

t-b commented Sep 3, 2024

@MichaelHuth Please rebase

@MichaelHuth MichaelHuth force-pushed the feature/2230-Add_zeromq_pub_filters_for_TP branch from e82cd88 to 82d748d Compare September 3, 2024 12:50
@t-b t-b assigned MichaelHuth and unassigned t-b Sep 3, 2024
@MichaelHuth MichaelHuth force-pushed the feature/2230-Add_zeromq_pub_filters_for_TP branch 2 times, most recently from 28dcb99 to e86dda8 Compare September 3, 2024 22:39
@MichaelHuth MichaelHuth assigned t-b and unassigned MichaelHuth Sep 3, 2024
@t-b
Copy link
Collaborator

t-b commented Sep 4, 2024

CI is failing.

Review:
92ad402 (Util: Add two conversion function for DA/AD unit string return, 2024-08-22)
5571774 (TP: Add more information that is transferred to the TP analysis thread, 2024-08-22)

Looks all good.

31d1771 (Getters: Cleanup of GetTPStorage wave upgrade, 2024-08-25)

Now looking at this change in isolation, I realize that this is not doing the right thing.

Consider a wave upgrade for version 13. The old code set layers 24 to 30. But
the new code sets layers 17 and 20 to 30. Overwriting valuable data!

2802321 (PUB: Preparation to add four zeromq publishers for TP data, 2024-08-25)
00941de (PUB: Add four publishers to publish TP data, 2024-08-23)

TestTPPublishing_REENTRY:

Instead of

Make/FREE/T filters = {ZMQ_FILTER_TPRESULT_NOW, ZMQ_FILTER_TPRESULT_1S, ZMQ_FILTER_TPRESULT_5S, ZMQ_FILTER_TPRESULT_10S}

you can just write

WAVE filters = DataGenerators#PUB_TPFilters()

0057895 (TP: Rename TP_GetStoredTPs to TP_GetConsecutiveTPsUptoMarker, 2024-08-25)

We nowadays have IsValidHeadstage.

e86dda8 (TP: Added two functions that allow to retrieve info about TPs, 2024-08-22)

Looks good!

@t-b t-b removed their assignment Sep 4, 2024
@MichaelHuth
Copy link
Collaborator Author

@t-b I diagnosed an issue with the JSON construction for publishing in this PR that requires an update of the JSON XOP to the current version where we fixed the handling of adding values that are large integers.
The current effect is that for certain calculated values from the TP analysis the JSON XOP returns an error. That can happen randomly (here about 1 in 100), depending on the noise.

@t-b
Copy link
Collaborator

t-b commented Sep 4, 2024

@t-b I diagnosed an issue with the JSON construction for publishing in this PR that requires an update of the JSON XOP to the current version where we fixed the handling of adding values that are large integers.

Let's update the JSON XOP then, 7a39926 (Update JSON-XOP, 2023-09-25) shows how this is done. We probably would like to update the MacOSX XOP as well.

@MichaelHuth MichaelHuth force-pushed the feature/2230-Add_zeromq_pub_filters_for_TP branch 2 times, most recently from 3b191f9 to c1fc4e9 Compare September 4, 2024 16:22
@t-b t-b force-pushed the feature/2230-Add_zeromq_pub_filters_for_TP branch from c1fc4e9 to 7030c77 Compare September 8, 2024 10:46
t-b
t-b previously approved these changes Sep 8, 2024
@t-b t-b assigned timjarsky and unassigned MichaelHuth Sep 8, 2024
@t-b
Copy link
Collaborator

t-b commented Sep 9, 2024

We return a wave reference wave from the get tp function and I was curious if we are handling recursive wave references correctly, yes we do.

Function Dostuff()

	make/o/WAVE/N=1 data
	make/o/WAVE/N=1 elem1
	make/o/WAVE/N=1 elem2

	data[0] = elem1
	elem1[0] = elem2
	
	print data, elem1, elem2

	print zeromq_test_serializewave(data)
End

gives

•dostuff()
  data[0]= {1.11244e+09}
elem1[0]= {1.11244e+09}
elem2[0]= {0}
  {
    "data": {
        "raw": [
            {
                "data": {
                    "raw": [
                        {
                            "data": {
                                "raw": [
                                    null
                                ]
                            },
                            "date": {
                                "modification": 1725915394
                            },
                            "dimension": {
                                "size": [
                                    1
                                ]
                            },
                            "type": "WAVE_TYPE"
                        }
                    ]
                },
                "date": {
                    "modification": 1725915394
                },
                "dimension": {
                    "size": [
                        1
                    ]
                },
                "type": "WAVE_TYPE"
            }
        ]
    },
    "date": {
        "modification": 1725915394
    },
    "dimension": {
        "size": [
            1
        ]
    },
    "type": "WAVE_TYPE"
}

Added GetADChannelUnit, GetDAChannelUnit that return the unit string
depending on clamp mode.

Adapted GetChanAmpAssignUnit to use the new functions.
- extended the TPAnalysisInput structure

This is a preparation commit for adding zeromq publishers that include
some of that information.
- the data is published from the TP analysis thread including additional
information available in the thread through the previous commit.
- The additional values are also returned by the thread and collected in
the async buffer as well then in TPResult and in TPStorage.
- The involved waves and their respective getters were adapted with new
elements that the additional data can be stored.
- As most of the elements store the same information, thus a constant
was introduced with a dimension label list that is used as helper for
the wave creation in the getter functions.
- The four publishers publish the same json, just with a different period.
  There is a filter for live, 1s, 5s and 10s publishing interval.

- See PUB_TPResult for JSON description

- publisher is called from TP_TSAnalysis thread
This prevent misleading naming and it more fitting to the functionality
the function actually implements
Added TP_GetStoredTP and TP_GetStoredTPsFromCycle that allow to get
information about a TP by tpMarker or TPs by cycle id and headstage.

- both functions allow to recreate the DA wave for the TPs with the flag includeDAC
- the returned data includes the AD, DA data as well as metadata for
  each returned TP (from TPStorage).

- These TP functions use the same TP utility function.
With a running TP adding zeromq publishing messages for each TP
it appears that we have to look through more than the last 100 messages
after a test to find the requested.

- split logic into two parts: either read out upto 10000 existing messages
  or wait up to 10 seconds (100 trys with 100 ms sleep) if no message
  is available
@t-b t-b force-pushed the feature/2230-Add_zeromq_pub_filters_for_TP branch from 7030c77 to 9a5a3ff Compare September 10, 2024 21:09
@t-b
Copy link
Collaborator

t-b commented Sep 10, 2024

Fixed conflicts.

@timjarsky
Copy link
Collaborator

@campagnola, can you confirm that this PR has the necessary functionality to pass TP data from MIES to ACQ4?

@campagnola
Copy link
Member

I will work with Ben to get this tested. Neither of us is very familiar with Igor, though; is the protocol documented somewhere?

@t-b
Copy link
Collaborator

t-b commented Sep 17, 2024

To test this I would do:

  • Install the user installer on a rig
  • Start Igor Pro
  • Lock a device
  • Start the TP

In a python console:

  • Call the function FFI_GetAvailableMessageFilters via ZeroMQ, the JSON message should look like
{
  "version" : 1,
   "CallFunction" : {
     "name" : "FFI_GetAvailableMessageFilters"
   }
}

python example code is at https://github.com/AllenInstitute/ZeroMQ-XOP/tree/main/examples#python-client.

  • Create a zeromq sub socket and subscribe to one or more of the topics returned from FFI_GetAvailableMessageFilters. The new ones are:
StrConstant ZMQ_FILTER_TPRESULT_NOW       = "testpulse:results live"
StrConstant ZMQ_FILTER_TPRESULT_1S        = "testpulse:results 1s update"
StrConstant ZMQ_FILTER_TPRESULT_5S        = "testpulse:results 5s update"
StrConstant ZMQ_FILTER_TPRESULT_10S       = "testpulse:results 10s update"
  • Look at the sent messages, their layout is defined at the function comment for PUB_TPResult.

We already do have tests that things work like we think it should, so this is more a request to check that it works like you think it should. The idea of the PR is that programs interfacing with MIES (aka ACQ4) don't need to poll the TP values with FFI_ReturnTPValues but get a message with all the data in the requested interval.

Another thing we added is the possibility to query the stored TPs via TP_GetStoredTP.

@timjarsky timjarsky assigned campagnola and unassigned timjarsky Sep 23, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Extend zeromq PUB filters for TP updates
4 participants