Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add three new ZMQ publishers for TP results / Retrieve TP results #2230

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
11 changes: 11 additions & 0 deletions Packages/MIES/MIES_Constants.ipf
Original file line number Diff line number Diff line change
Expand Up @@ -1857,6 +1857,10 @@ StrConstant PRESSURE_STATE_FILTER = "pressure:state"
StrConstant PRESSURE_SEALED_FILTER = "pressure:sealed"
StrConstant PRESSURE_BREAKIN_FILTER = "pressure:break in"
StrConstant AUTO_TP_FILTER = "testpulse:autotune result"
StrConstant ZMQ_FILTER_TPRESULT_NOW = "testpulse:results live"
StrConstant ZMQ_FILTER_TPRESULT_1S = "testpulse:results 1s update"
StrConstant ZMQ_FILTER_TPRESULT_5S = "testpulse:results 5s update"
StrConstant ZMQ_FILTER_TPRESULT_10S = "testpulse:results 10s update"
StrConstant AMPLIFIER_CLAMP_MODE_FILTER = "amplifier:clamp mode"
StrConstant AMPLIFIER_AUTO_BRIDGE_BALANCE = "amplifier:auto bridge balance"
StrConstant ANALYSIS_FUNCTION_PB = "analysis function:pipette in bath"
Expand Down Expand Up @@ -2317,3 +2321,10 @@ Constant ABORTCODE_ABORT = -3
Constant ABORTCODE_STACKOVERFLOW = -2
Constant ABORTCODE_USERABORT = -1
///@}

// If this constant with dimLabels is changed the following functions should be verified:
//
// TP_TSAnalysis
// GetTPResultAsyncBuffer
// GetTPResults (reuses same dimlabels partially)
StrConstant TP_ANALYSIS_DATA_LABELS = "BASELINE;STEADYSTATERES;INSTANTRES;ELEVATED_SS;ELEVATED_INST;NOW;HEADSTAGE;MARKER;NUMBER_OF_TP_CHANNELS;TIMESTAMP;TIMESTAMPUTC;CLAMPMODE;CLAMPAMP;BASELINEFRAC;CYCLEID;TPLENGTHPOINTSADC;PULSELENGTHPOINTSADC;PULSESTARTPOINTSADC;SAMPLINGINTERVALADC;TPLENGTHPOINTSDAC;PULSELENGTHPOINTSDAC;PULSESTARTPOINTSDAC;SAMPLINGINTERVALDAC;"
3 changes: 2 additions & 1 deletion Packages/MIES/MIES_ForeignFunctionInterface.ipf
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ Function/WAVE FFI_GetAvailableMessageFilters()
PRESSURE_BREAKIN_FILTER, AUTO_TP_FILTER, AMPLIFIER_CLAMP_MODE_FILTER, \
AMPLIFIER_AUTO_BRIDGE_BALANCE, ANALYSIS_FUNCTION_PB, ANALYSIS_FUNCTION_SE, \
ANALYSIS_FUNCTION_VM, DAQ_TP_STATE_CHANGE_FILTER, \
ANALYSIS_FUNCTION_AR}
ANALYSIS_FUNCTION_AR, ZMQ_FILTER_TPRESULT_NOW, ZMQ_FILTER_TPRESULT_1S, \
ZMQ_FILTER_TPRESULT_5S, ZMQ_FILTER_TPRESULT_10S}

Note/K wv, "Heartbeat is sent every 5 seconds."

Expand Down
12 changes: 12 additions & 0 deletions Packages/MIES/MIES_MiesUtilities_Conversion.ipf
Original file line number Diff line number Diff line change
Expand Up @@ -270,3 +270,15 @@ Function MapAnaFuncToConstant(anaFunc)
#endif
endswitch
End

/// @brief returns the unit string for the AD channel depending on clampmode
threadsafe Function/S GetADChannelUnit(variable clampMode)

return SelectString(clampMode == V_CLAMP_MODE, "mV", "pA")
End

/// @brief returns the unit string for the DA channel depending on clampmode
threadsafe Function/S GetDAChannelUnit(variable clampMode)

return SelectString(clampMode == V_CLAMP_MODE, "pA", "mV")
End
45 changes: 28 additions & 17 deletions Packages/MIES/MIES_Oscilloscope.ipf
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ Function SCOPE_UpdateOscilloscopeData(device, dataAcqOrTP, [chunk, fifoPos, devi

STRUCT TPAnalysisInput tpInput
variable i, j
variable tpChannels, numADCs, numDACs, tpLengthPoints, tpStart, tpEnd, tpStartPos
variable tpChannels, numADCs, numDACs, tpLengthPointsADC, tpStart, tpEnd, tpStartPos
variable TPChanIndex, saveTP, clampAmp
variable headstage, fifoLatest, channelIndex
string hsList
Expand Down Expand Up @@ -530,10 +530,10 @@ Function SCOPE_UpdateOscilloscopeData(device, dataAcqOrTP, [chunk, fifoPos, devi
WAVE TPSettings = GetTPSettings(device)
WAVE TPSettingsCalc = GetTPSettingsCalculated(device)

tpLengthPoints = (dataAcqOrTP == TEST_PULSE_MODE) ? TPSettingsCalc[%totalLengthPointsTP_ADC] : TPSettingsCalc[%totalLengthPointsDAQ_ADC]
tpLengthPointsADC = (dataAcqOrTP == TEST_PULSE_MODE) ? TPSettingsCalc[%totalLengthPointsTP_ADC] : TPSettingsCalc[%totalLengthPointsDAQ_ADC]

// use a 'virtual' end position for fifoLatest for TP Mode since the input data contains one TP only
fifoLatest = (dataAcqOrTP == TEST_PULSE_MODE) ? tpLengthPoints : fifoPos
fifoLatest = (dataAcqOrTP == TEST_PULSE_MODE) ? tpLengthPointsADC : fifoPos

WAVE ADCs = GetADCListFromConfig(config)
WAVE DACs = GetDACListFromConfig(config)
Expand All @@ -544,33 +544,40 @@ Function SCOPE_UpdateOscilloscopeData(device, dataAcqOrTP, [chunk, fifoPos, devi
numADCs = DimSize(ADCs, ROWS)

// note: currently this works for multiplier = 1 only, see DC_PlaceDataInDAQDataWave
Make/FREE/N=(tpLengthPoints) channelData
Make/FREE/N=(tpLengthPointsADC) channelData
WAVE tpInput.data = channelData

tpInput.device = device
tpInput.duration = (dataAcqOrTP == TEST_PULSE_MODE) ? TPSettingsCalc[%pulseLengthPointsTP_ADC] : TPSettingsCalc[%pulseLengthPointsDAQ_ADC]
tpInput.baselineFrac = TPSettingsCalc[%baselineFrac]
tpInput.tpLengthPoints = tpLengthPoints
tpInput.readTimeStamp = ticks * TICKS_TO_SECONDS
tpInput.activeADCs = tpChannels

tpStart = trunc(fifoPosGlobal / tpLengthPoints)
tpEnd = trunc(fifoLatest / tpLengthPoints)
tpInput.device = device
tpInput.tpLengthPointsADC = tpLengthPointsADC
tpInput.pulseLengthPointsADC = (dataAcqOrTP == TEST_PULSE_MODE) ? TPSettingsCalc[%pulseLengthPointsTP_ADC] : TPSettingsCalc[%pulseLengthPointsDAQ_ADC]
tpInput.pulseStartPointsADC = (dataAcqOrTP == TEST_PULSE_MODE) ? TPSettingsCalc[%pulseStartPointsTP_ADC] : TPSettingsCalc[%pulseStartPointsDAQ_ADC]
tpInput.samplingIntervalADC = DimDelta(scaledDataWave[numDACs], ROWS)
tpInput.tpLengthPointsDAC = (dataAcqOrTP == TEST_PULSE_MODE) ? TPSettingsCalc[%totalLengthPointsTP] : TPSettingsCalc[%totalLengthPointsDAQ]
tpInput.pulseLengthPointsDAC = (dataAcqOrTP == TEST_PULSE_MODE) ? TPSettingsCalc[%pulseLengthPointsTP] : TPSettingsCalc[%pulseLengthPointsDAQ]
tpInput.pulseStartPointsDAC = (dataAcqOrTP == TEST_PULSE_MODE) ? TPSettingsCalc[%pulseStartPointsTP] : TPSettingsCalc[%pulseStartPointsDAQ]
tpInput.samplingIntervalDAC = DimDelta(scaledDataWave[0], ROWS)
tpInput.baselineFrac = TPSettingsCalc[%baselineFrac]
tpInput.readTimeStamp = ticks * TICKS_TO_SECONDS
tpInput.activeADCs = tpChannels
tpInput.cycleId = ROVAR(GetTestpulseCycleID(device))

tpStart = trunc(fifoPosGlobal / tpLengthPointsADC)
tpEnd = trunc(fifoLatest / tpLengthPointsADC)
ASSERT(tpStart <= tpEnd, "New fifopos is smaller than previous fifopos")
Make/FREE/D/N=(tpEnd - tpStart) tpMarker
NewRandomSeed()
tpMarker[] = GetUniqueInteger()

DEBUGPRINT("tpChannels: ", var = tpChannels)
DEBUGPRINT("tpLength: ", var = tpLengthPoints)
DEBUGPRINT("tpLength: ", var = tpLengthPointsADC)

for(i = tpStart; i < tpEnd; i += 1)

tpInput.measurementMarker = tpMarker[i - tpStart]
tpStartPos = i * tpLengthPoints
tpStartPos = i * tpLengthPointsADC

if(saveTP)
Make/FREE/N=(tpLengthPoints, tpChannels) StoreTPWave
Make/FREE/N=(tpLengthPointsADC, tpChannels) StoreTPWave
for(j = 0; j < tpChannels; j += 1)
WAVE scaledChannel = scaledDataWave[numDACs + j]
Multithread StoreTPWave[][j] = scaledChannel[tpStartPos + p]
Expand All @@ -580,6 +587,10 @@ Function SCOPE_UpdateOscilloscopeData(device, dataAcqOrTP, [chunk, fifoPos, devi
hsList = ""
endif

// Use same time for all headstages
tpInput.timeStamp = DateTime
tpInput.timeStampUTC = DateTimeInUTC()

for(j = 0; j < numADCs; j += 1)
if(ADCmode[j] == DAQ_CHANNEL_TYPE_TP)

Expand Down Expand Up @@ -622,7 +633,7 @@ Function SCOPE_UpdateOscilloscopeData(device, dataAcqOrTP, [chunk, fifoPos, devi
endfor

if(dataAcqOrTP == DATA_ACQUISITION_MODE && tpEnd > tpStart)
tpStartPos = (tpEnd - 1) * tpLengthPoints
tpStartPos = (tpEnd - 1) * tpLengthPointsADC
if(DAG_GetNumericalValue(device, "check_settings_show_power"))
WAVE tpOsciForPowerSpectrum = GetScaledTPTempWave(device)
Make/FREE/D/N=(numADCs) tpColumns
Expand Down
185 changes: 181 additions & 4 deletions Packages/MIES/MIES_Publish.ipf
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,22 @@ static Function PUB_GetJSONTemplate(string device, variable headstage)
End

/// @brief Publish the given message as given by the JSON and the filter
static Function PUB_Publish(variable jsonID, string messageFilter)
threadsafe Function PUB_Publish(variable jsonID, string messageFilter, [variable releaseJSON])
variable err
string payload

payload = JSON_Dump(jsonID)
JSON_Release(jsonID)
releaseJSON = ParamIsDefault(releaseJSON) ? 1 : !!releaseJSON
payload = JSON_Dump(jsonID)
if(releaseJSON)
JSON_Release(jsonID)
endif

AssertOnAndClearRTError()
try
zeromq_pub_send(messageFilter, payload); AbortOnRTE
catch
err = ClearRTError()
BUG("Could not publish " + messageFilter + " due to: " + num2str(err))
BUG_TS("Could not publish " + messageFilter + " due to: " + num2str(err))
endtry
End

Expand Down Expand Up @@ -642,3 +645,177 @@ Function PUB_AccessResistanceSmoke(string device, variable sweepNo, variable hea

PUB_Publish(jsonID, ANALYSIS_FUNCTION_AR)
End

threadsafe static Function PUB_AddTPResultEntry(variable jsonId, string path, variable value, string unit)

if(IsEmpty(unit))
JSON_AddVariable(jsonID, path, value)
else
JSON_AddTreeObject(jsonID, path)
JSON_AddVariable(jsonID, path + "/value", value)
JSON_AddString(jsonID, path + "/unit", unit)
endif
End

/// Filter: #ZMQ_FILTER_TPRESULT_NOW
/// Filter: #ZMQ_FILTER_TPRESULT_1S
/// Filter: #ZMQ_FILTER_TPRESULT_5S
/// Filter: #ZMQ_FILTER_TPRESULT_10S
///
/// Example:
///
/// \rst
/// .. code-block:: json
///
/// {
/// "properties": {
/// "baseline fraction": {
/// "unit": "%",
/// "value": 35
/// },
/// "clamp amplitude": {
/// "unit": "mV",
/// "value": 10
/// },
/// "clamp mode": 0,
/// "device": "TestDevice",
/// "headstage": 1,
/// "pulse duration ADC": {
/// "unit": "points",
/// "value": 500
/// },
/// "pulse duration DAC": {
/// "unit": "points",
/// "value": 600
/// },
/// "pulse start point ADC": {
/// "unit": "point",
/// "value": 500
/// },
/// "pulse start point DAC": {
/// "unit": "point",
/// "value": 600
/// },
/// "sample interval ADC": {
/// "unit": "ms",
/// "value": 0.002
/// },
/// "sample interval DAC": {
/// "unit": "ms",
/// "value": 0.002
/// },
/// "time of tp acquisition": {
/// "unit": "s",
/// "value": 1000000
/// },
/// "timestamp": {
/// "unit": "s",
/// "value": 2000000
/// },
/// "timestampUTC": {
/// "unit": "s",
/// "value": 3000000
/// },
/// "tp cycle id": 456,
/// "tp length ADC": {
/// "unit": "points",
/// "value": 1500
/// },
/// "tp length DAC": {
/// "unit": "points",
/// "value": 1800
/// },
/// "tp marker": 1234
/// },
/// "results": {
/// "average baseline steady state": {
/// "unit": "pA",
/// "value": 2
/// },
/// "average tp steady state": {
/// "unit": "pA",
/// "value": 10
/// },
/// "instantaneous": {
/// "unit": "pA",
/// "value": 11
/// },
/// "instantaneous resistance": {
/// "unit": "MΩ",
/// "value": 2345
/// },
/// "steady state resistance": {
/// "unit": "MΩ",
/// "value": 1234
/// }
/// }
/// }
///
/// \endrst
threadsafe Function PUB_TPResult(string device, WAVE tpData)

string path
variable jsonId = JSON_New()
string adUnit = GetADChannelUnit(tpData[%CLAMPMODE])
string daUnit = GetDAChannelUnit(tpData[%CLAMPMODE])

path = "properties"
JSON_AddTreeObject(jsonID, path)
JSON_AddString(jsonID, path + "/device", device)
JSON_AddVariable(jsonID, path + "/tp marker", tpData[%MARKER])
JSON_AddVariable(jsonID, path + "/headstage", tpData[%HEADSTAGE])
JSON_AddVariable(jsonID, path + "/clamp mode", tpData[%CLAMPMODE])

PUB_AddTPResultEntry(jsonId, path + "/time of tp acquisition", tpData[%NOW], "s")
PUB_AddTPResultEntry(jsonId, path + "/clamp amplitude", tpData[%CLAMPAMP], daUnit)
PUB_AddTPResultEntry(jsonId, path + "/tp length ADC", tpData[%TPLENGTHPOINTSADC], "points")
PUB_AddTPResultEntry(jsonId, path + "/pulse duration ADC", tpData[%PULSELENGTHPOINTSADC], "points")
PUB_AddTPResultEntry(jsonId, path + "/pulse start point ADC", tpData[%PULSESTARTPOINTSADC], "point")
PUB_AddTPResultEntry(jsonId, path + "/sample interval ADC", tpData[%SAMPLINGINTERVALADC], "ms")
PUB_AddTPResultEntry(jsonId, path + "/tp length DAC", tpData[%TPLENGTHPOINTSDAC], "points")
PUB_AddTPResultEntry(jsonId, path + "/pulse duration DAC", tpData[%PULSELENGTHPOINTSDAC], "points")
PUB_AddTPResultEntry(jsonId, path + "/pulse start point DAC", tpData[%PULSESTARTPOINTSDAC], "point")
PUB_AddTPResultEntry(jsonId, path + "/sample interval DAC", tpData[%SAMPLINGINTERVALDAC], "ms")
PUB_AddTPResultEntry(jsonId, path + "/baseline fraction", tpData[%BASELINEFRAC] * ONE_TO_PERCENT, "%")
PUB_AddTPResultEntry(jsonId, path + "/timestamp", tpData[%TIMESTAMP], "s")
PUB_AddTPResultEntry(jsonId, path + "/timestampUTC", tpData[%TIMESTAMPUTC], "s")
PUB_AddTPResultEntry(jsonId, path + "/tp cycle id", tpData[%CYCLEID], "")

path = "results"
JSON_AddTreeObject(jsonID, path)
PUB_AddTPResultEntry(jsonId, path + "/average baseline steady state", tpData[%BASELINE], adUnit)
PUB_AddTPResultEntry(jsonId, path + "/average tp steady state", tpData[%ELEVATED_SS], adUnit)
PUB_AddTPResultEntry(jsonId, path + "/instantaneous", tpData[%ELEVATED_INST], adUnit)
PUB_AddTPResultEntry(jsonId, path + "/steady state resistance", tpData[%STEADYSTATERES], "MΩ")
PUB_AddTPResultEntry(jsonId, path + "/instantaneous resistance", tpData[%INSTANTRES], "MΩ")

PUB_Publish(jsonID, ZMQ_FILTER_TPRESULT_NOW, releaseJSON = 0)
if(PUB_CheckPublishingTime(ZMQ_FILTER_TPRESULT_1S, 1))
PUB_Publish(jsonID, ZMQ_FILTER_TPRESULT_1S, releaseJSON = 0)
endif
if(PUB_CheckPublishingTime(ZMQ_FILTER_TPRESULT_5S, 5))
PUB_Publish(jsonID, ZMQ_FILTER_TPRESULT_5S, releaseJSON = 0)
endif
if(PUB_CheckPublishingTime(ZMQ_FILTER_TPRESULT_10S, 10))
PUB_Publish(jsonID, ZMQ_FILTER_TPRESULT_10S, releaseJSON = 0)
endif
JSON_Release(jsonID)
End

/// @brief Updates the publishing timestamp in the TUFXOP storage and returns 1 if an update is due (0 otherwise)
threadsafe static Function PUB_CheckPublishingTime(string pubFilter, variable period)

variable lastTime
variable curTime = DateTime

TUFXOP_AcquireLock/N=(pubFilter)
lastTime = TSDS_ReadVar(pubFilter, defValue = 0, create = 1)
if(lastTime + period < curTime)
TSDS_Write(pubFilter, var = curTime + period)
TUFXOP_ReleaseLock/N=(pubFilter)
return 1
endif
TUFXOP_ReleaseLock/N=(pubFilter)

return 0
End
13 changes: 11 additions & 2 deletions Packages/MIES/MIES_Structures.ipf
Original file line number Diff line number Diff line change
Expand Up @@ -306,14 +306,23 @@ Structure TPAnalysisInput
WAVE data
variable clampAmp
variable clampMode
variable duration // [points]
variable tpLengthPointsADC
variable pulseLengthPointsADC
variable pulseStartPointsADC
variable samplingIntervalADC
variable tpLengthPointsDAC
variable pulseLengthPointsDAC
variable pulseStartPointsDAC
variable samplingIntervalDAC
variable baselineFrac
variable tpLengthPoints
variable readTimeStamp
variable hsIndex
string device
variable measurementMarker
variable activeADCs
variable timeStamp
variable timeStampUTC
variable cycleId
EndStructure

/// @brief Helper structure for GetPlotArea()
Expand Down
9 changes: 5 additions & 4 deletions Packages/MIES/MIES_SweepFormula.ipf
Original file line number Diff line number Diff line change
Expand Up @@ -3027,9 +3027,10 @@ static Function/WAVE SF_OperationTPImpl(string graph, WAVE/WAVE mode, WAVE/Z sel

// Assemble TP data
WAVE tpInput.data = SF_AverageTPFromSweep(epochMatches, sweepData)
tpInput.tpLengthPoints = DimSize(tpInput.data, ROWS)
tpInput.duration = (str2num(epochTPPulse[0][EPOCH_COL_ENDTIME]) - str2num(epochTPPulse[0][EPOCH_COL_STARTTIME])) * ONE_TO_MILLI / DimDelta(sweepData, ROWS)
tpInput.baselineFrac = TP_CalculateBaselineFraction(tpInput.duration, tpInput.duration + 2 * tpBaseLinePoints)
tpInput.tpLengthPointsADC = DimSize(tpInput.data, ROWS)
tpInput.samplingIntervalADC = DimDelta(tpInput.data, ROWS)
tpInput.pulseLengthPointsADC = (str2num(epochTPPulse[0][EPOCH_COL_ENDTIME]) - str2num(epochTPPulse[0][EPOCH_COL_STARTTIME])) * ONE_TO_MILLI / DimDelta(sweepData, ROWS)
tpInput.baselineFrac = TP_CalculateBaselineFraction(tpInput.pulseLengthPointsADC, tpInput.pulseLengthPointsADC + 2 * tpBaseLinePoints)

[WAVE settings, settingsIndex] = GetLastSettingChannel(numericalValues, textualValues, sweepNo, CLAMPMODE_ENTRY_KEY, dacChannelNr, XOP_CHANNEL_TYPE_DAC, DATA_ACQUISITION_MODE)
SFH_ASSERT(WaveExists(settings), "Failed to retrieve TP Clamp Mode from LBN")
Expand All @@ -3044,7 +3045,7 @@ static Function/WAVE SF_OperationTPImpl(string graph, WAVE/WAVE mode, WAVE/Z sel
DFREF dfrTPAnalysis = TP_PrepareAnalysisDF(graph, tpInput)
DFREF dfrTPAnalysisInput = dfrTPAnalysis:input
DFREF dfr = TP_TSAnalysis(dfrTPAnalysisInput)
WAVE tpOutData = dfr:outData
WAVE tpOutData = dfr:tpData

// handle waves sent out when TP_ANALYSIS_DEBUGGING is defined
if(WaveExists(dfr:data) && WaveExists(dfr:colors))
Expand Down
Loading