Skip to content

Commit 405cb7d

Browse files
authored
Merge pull request #2371 from AllenInstitute/feature/2371-update-zeromq-xop
Update ZeroMQ XOP and related house keeping
2 parents 13c5357 + 7630adb commit 405cb7d

17 files changed

+180
-108
lines changed

Packages/MIES/MIES_IVSCC.ipf

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
/// - Listening port for the REP/ROUTER socket starts at #ZEROMQ_BIND_REP_PORT.
1414
/// - Listening port for the PUBLISHER socket starts at #ZEROMQ_BIND_PUB_PORT
1515
/// - If one of those ports is already in use, the next larger port is tried.
16-
/// - The publisher socket does include an automatic heartbeat message every 5 seconds. Subscribe to #ZeroMQ_HEARTBEAT if
16+
/// - The publisher socket does include an automatic heartbeat message every 5 seconds. Subscribe to #ZMQ_HEARTBEAT if
1717
/// you want to receive that.
1818
/// - All available message filters can be queried via FFI_GetAvailableMessageFilters().
1919
/// - More information regarding the ZeroMQ-XOP is located [here](https://github.com/AllenInstitute/ZeroMQ-XOP/#readme)

Packages/MIES/MIES_MiesUtilities_Conversion.ipf

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,3 +263,15 @@ Function MapAnaFuncToConstant(string anaFunc)
263263
#endif // AUTOMATED_TESTING
264264
endswitch
265265
End
266+
267+
/// @brief returns the unit string for the AD channel depending on clampmode
268+
threadsafe Function/S GetADChannelUnit(variable clampMode)
269+
270+
return SelectString(clampMode == V_CLAMP_MODE, "mV", "pA")
271+
End
272+
273+
/// @brief returns the unit string for the DA channel depending on clampmode
274+
threadsafe Function/S GetDAChannelUnit(variable clampMode)
275+
276+
return SelectString(clampMode == V_CLAMP_MODE, "pA", "mV")
277+
End

Packages/MIES/MIES_MiesUtilities_ZeroMQ.ipf

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
Function GetZeroMQXOPFlags()
1313

14-
return ZeroMQ_SET_FLAGS_DEFAULT | ZeroMQ_SET_FLAGS_LOGGING | ZeroMQ_SET_FLAGS_NOBUSYWAITRECV
14+
return ZeroMQ_SET_FLAGS_DEFAULT | ZeroMQ_SET_FLAGS_NOBUSYWAITRECV
1515
End
1616

1717
/// @brief Start the ZeroMQ sockets and the message handler

Packages/MIES/MIES_Oscilloscope.ipf

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -581,7 +581,7 @@ Function SCOPE_UpdateOscilloscopeData(string device, variable dataAcqOrTP, [vari
581581
endif
582582
tpInput.clampAmp = clampAmp
583583
tpInput.clampMode = hsProp[headstage][%ClampMode]
584-
tpInput.hsIndex = headstage
584+
tpInput.headstage = headstage
585585

586586
DEBUGPRINT("headstage: ", var = headstage)
587587
DEBUGPRINT("channel: ", var = numDACs + j)

Packages/MIES/MIES_Publish.ipf

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,20 +25,26 @@ static Function PUB_GetJSONTemplate(string device, variable headstage)
2525
End
2626

2727
/// @brief Publish the given message as given by the JSON and the filter
28-
static Function PUB_Publish(variable jsonID, string messageFilter)
28+
threadsafe Function PUB_Publish(variable jsonID, string messageFilter, [variable releaseJSON])
2929

3030
variable err
31-
string payload
3231

33-
payload = JSON_Dump(jsonID)
34-
JSON_Release(jsonID)
32+
releaseJSON = ParamIsDefault(releaseJSON) ? 1 : !!releaseJSON
33+
34+
Make/T/FREE filter = {messageFilter}
35+
Make/T/FREE payload = {JSON_Dump(jsonID)}
36+
Make/FREE/WAVE wv = {filter, payload}
37+
38+
if(releaseJSON)
39+
JSON_Release(jsonID)
40+
endif
3541

3642
AssertOnAndClearRTError()
3743
try
38-
zeromq_pub_send(messageFilter, payload); AbortOnRTE
44+
zeromq_pub_send_multi(wv); AbortOnRTE
3945
catch
4046
err = ClearRTError()
41-
BUG("Could not publish " + messageFilter + " due to: " + num2str(err))
47+
BUG_TS("Could not publish " + messageFilter + " due to: " + num2str(err))
4248
endtry
4349
End
4450

Packages/MIES/MIES_Structures.ipf

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,7 @@ Structure TPAnalysisInput
303303
variable baselineFrac
304304
variable tpLengthPoints
305305
variable readTimeStamp
306-
variable hsIndex
306+
variable headstage
307307
string device
308308
variable measurementMarker
309309
variable activeADCs

Packages/MIES/MIES_TestPulse.ipf

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ Function TP_ROAnalysis(STRUCT ASYNC_ReadOutStruct &ar)
192192

193193
WAVE/SDFR=dfr inData = outData
194194
NVAR/SDFR=dfr now = now
195-
NVAR/SDFR=dfr hsIndex = hsIndex
195+
NVAR/SDFR=dfr headstage = headstage
196196
SVAR/SDFR=dfr device = device
197197
NVAR/SDFR=dfr marker = marker
198198
NVAR/SDFR=dfr activeADCs = activeADCs
@@ -218,11 +218,11 @@ Function TP_ROAnalysis(STRUCT ASYNC_ReadOutStruct &ar)
218218
asyncBuffer[bufSize][posAsync][posMarker] = marker
219219
endif
220220

221-
asyncBuffer[i][posBaseline][hsIndex] = inData[%BASELINE]
222-
asyncBuffer[i][posSSRes][hsIndex] = inData[%STEADYSTATERES]
223-
asyncBuffer[i][posInstRes][hsIndex] = inData[%INSTANTRES]
224-
asyncBuffer[i][posElevSS][hsIndex] = inData[%ELEVATED_SS]
225-
asyncBuffer[i][posElevInst][hsIndex] = inData[%ELEVATED_INST]
221+
asyncBuffer[i][posBaseline][headstage] = inData[%BASELINE]
222+
asyncBuffer[i][posSSRes][headstage] = inData[%STEADYSTATERES]
223+
asyncBuffer[i][posInstRes][headstage] = inData[%INSTANTRES]
224+
asyncBuffer[i][posElevSS][headstage] = inData[%ELEVATED_SS]
225+
asyncBuffer[i][posElevInst][headstage] = inData[%ELEVATED_INST]
226226

227227
asyncBuffer[i][posAsync][%NOW] = now
228228
asyncBuffer[i][posAsync][%REC_CHANNELS] += 1
@@ -894,7 +894,7 @@ threadsafe Function/DF TP_TSAnalysis(DFREF dfrInp)
894894
NVAR/SDFR=dfrInp baselineFrac = param4
895895
NVAR/SDFR=dfrInp lengthTPInPoints = param5
896896
NVAR/SDFR=dfrInp now = param6
897-
NVAR/SDFR=dfrInp hsIndex = param7
897+
NVAR/SDFR=dfrInp headstage = param7
898898
SVAR/SDFR=dfrInp device = param8
899899
NVAR/SDFR=dfrInp marker = param9
900900
NVAR/SDFR=dfrInp activeADCs = param10
@@ -994,7 +994,7 @@ threadsafe Function/DF TP_TSAnalysis(DFREF dfrInp)
994994

995995
// additional data copy
996996
variable/G dfrOut:now = now
997-
variable/G dfrOut:hsIndex = hsIndex
997+
variable/G dfrOut:headstage = headstage
998998
string/G dfrOut:device = device
999999
variable/G dfrOut:marker = marker
10001000
variable/G dfrOut:activeADCs = activeADCs
@@ -1505,7 +1505,7 @@ Function/DF TP_PrepareAnalysisDF(string device, STRUCT TPAnalysisInput &tpInput)
15051505
ASYNC_AddParam(threadDF, var = tpInput.baselineFrac)
15061506
ASYNC_AddParam(threadDF, var = tpInput.tpLengthPoints)
15071507
ASYNC_AddParam(threadDF, var = tpInput.readTimeStamp)
1508-
ASYNC_AddParam(threadDF, var = tpInput.hsIndex)
1508+
ASYNC_AddParam(threadDF, var = tpInput.headstage)
15091509
ASYNC_AddParam(threadDF, str = tpInput.device)
15101510
ASYNC_AddParam(threadDF, var = tpInput.measurementMarker)
15111511
ASYNC_AddParam(threadDF, var = tpInput.activeADCs)

Packages/MIES/MIES_WaveDataFolderGetters.ipf

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -162,10 +162,10 @@ Function/WAVE GetChanAmpAssignUnit(string device)
162162
Make/T/N=(4, NUM_HEADSTAGES) dfr:ChanAmpAssignUnit/WAVE=wv
163163
wv = ""
164164

165-
wv[0][] = "mV"
166-
wv[1][] = "pA"
167-
wv[2][] = "pA"
168-
wv[3][] = "mV"
165+
wv[0][] = GetDAChannelUnit(V_CLAMP_MODE)
166+
wv[1][] = GetADChannelUnit(V_CLAMP_MODE)
167+
wv[2][] = GetDAChannelUnit(I_CLAMP_MODE)
168+
wv[3][] = GetADChannelUnit(I_CLAMP_MODE)
169169
endif
170170

171171
SetDimLabel ROWS, 0, VC_DAUnit, wv
@@ -494,7 +494,7 @@ End
494494
/// Column 8: average elevated level (instantaneous)
495495
///
496496
/// Layers:
497-
/// - NUM_HEADSTAGES positions with value entries at hsIndex
497+
/// - NUM_HEADSTAGES
498498
Function/WAVE GetTPResultAsyncBuffer(string device)
499499

500500
variable versionOfNewWave = 1

Packages/MIES/ZeroMQ_Interop.ipf

Lines changed: 58 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1-
#pragma TextEncoding = "UTF-8"
2-
#pragma rtGlobals=3 // Use modern global access method and strict wave access.
3-
#pragma IgorVersion=8.0
1+
#pragma TextEncoding="UTF-8"
2+
#pragma rtGlobals=3 // Use modern global access method and strict wave access.
3+
#pragma IgorVersion=6.37
44

55
// This file is part of the `ZeroMQ-XOP` project and licensed under BSD-3-Clause.
66

7+
/// ** Define ZeroMQ Constants (for back-compatibility) for Igor8 case. **
8+
#if igorVersion() >= 8
79
/// @name Flags for zeromq_set()
810
/// @anchor ZeroMQSetFlags
911
///@{
@@ -36,25 +38,62 @@ Constant ZeroMQ_INVALID_MESSAGE_FORMAT = 10009
3638
Constant ZeroMQ_INVALID_LOGGING_TEMPLATE = 10010
3739
Constant ZeroMQ_MESSAGE_FILTER_DUPLICATED = 10011
3840
Constant ZeroMQ_MESSAGE_FILTER_MISSING = 10012
41+
Constant ZeroMQ_MESSAGE_INVALID_TYPE = 10013
3942
///@}
43+
#endif
4044

41-
Constant REQ_SUCCESS = 0
42-
Constant REQ_UNKNOWN_ERROR = 1
43-
Constant REQ_INVALID_JSON_OBJECT = 3
44-
Constant REQ_INVALID_VERSION = 4
45-
Constant REQ_INVALID_OPERATION = 5
46-
Constant REQ_INVALID_OPERATION_FORMAT = 6
47-
Constant REQ_INVALID_MESSAGEID = 7
48-
Constant REQ_OUT_OF_MEMORY = 8
45+
/// @name Flags for zeromq_set()
46+
/// @anchor ZeroMQSetFlags
47+
///@{
48+
/// Sets the default flags (no debug, no ipv6, busy wait on receive)
49+
Constant ZMQ_SET_FLAGS_DEFAULT = 0x1
50+
/// Enable debug output
51+
Constant ZMQ_SET_FLAGS_DEBUG = 0x2
52+
/// Enable ipv6 support
53+
Constant ZMQ_SET_FLAGS_IPV6 = 0x4
54+
/// Don't do busy waiting on zeromq_server_recv() and zeromq_client_recv()
55+
/// instead immediately return if no messages are available.
56+
Constant ZMQ_SET_FLAGS_NOBUSYWAITRECV = 0x8
57+
/// Log incoming and outgoing messages
58+
Constant ZMQ_SET_FLAGS_LOGGING = 0x10
59+
60+
///@}
61+
62+
StrConstant ZMQ_HEARTBEAT = "heartbeat"
63+
64+
/// @name Error codes
65+
/// @anchor ZeroMQErrorCodes
66+
///@{
67+
Constant ZMQ_UNKNOWN_SET_FLAG = 10003
68+
Constant ZMQ_INTERNAL_ERROR = 10004
69+
Constant ZMQ_INVALID_ARG = 10005
70+
Constant ZMQ_HANDLER_ALREADY_RUNNING = 10006
71+
Constant ZMQ_HANDLER_NO_CONNECTION = 10007
72+
Constant ZMQ_MISSING_PROCEDURE_FILES = 10008
73+
Constant ZMQ_INVALID_MESSAGE_FORMAT = 10009
74+
Constant ZMQ_INVALID_LOGGING_TEMPLATE = 10010
75+
Constant ZMQ_MESSAGE_FILTER_DUPLICATED = 10011
76+
Constant ZMQ_MESSAGE_FILTER_MISSING = 10012
77+
Constant ZMQ_MESSAGE_INVALID_TYPE = 10013
78+
///@}
79+
80+
Constant REQ_SUCCESS = 0
81+
Constant REQ_UNKNOWN_ERROR = 1
82+
Constant REQ_INVALID_JSON_OBJECT = 3
83+
Constant REQ_INVALID_VERSION = 4
84+
Constant REQ_INVALID_OPERATION = 5
85+
Constant REQ_INVALID_OPERATION_FORMAT = 6
86+
Constant REQ_INVALID_MESSAGEID = 7
87+
Constant REQ_OUT_OF_MEMORY = 8
4988
// error codes for CallFunction class
50-
Constant REQ_PROC_NOT_COMPILED = 100
51-
Constant REQ_NON_EXISTING_FUNCTION = 101
52-
Constant REQ_TOO_FEW_FUNCTION_PARAMS = 102
53-
Constant REQ_TOO_MANY_FUNCTION_PARAMS = 103
54-
Constant REQ_UNSUPPORTED_FUNC_SIG = 104
55-
Constant REQ_UNSUPPORTED_FUNC_RET = 105
56-
Constant REQ_INVALID_PARAM_FORMAT = 106
57-
Constant REQ_FUNCTION_ABORTED = 107
89+
Constant REQ_PROC_NOT_COMPILED = 100
90+
Constant REQ_NON_EXISTING_FUNCTION = 101
91+
Constant REQ_TOO_FEW_FUNCTION_PARAMS = 102
92+
Constant REQ_TOO_MANY_FUNCTION_PARAMS = 103
93+
Constant REQ_UNSUPPORTED_FUNC_SIG = 104
94+
Constant REQ_UNSUPPORTED_FUNC_RET = 105
95+
Constant REQ_INVALID_PARAM_FORMAT = 106
96+
Constant REQ_FUNCTION_ABORTED = 107
5897

5998
/// @name Functions which might be useful for outside callers
6099
/// @anchor ZeroMQInterfaceFunctions

Packages/tests/Basic/UTF_ZeroMQPublishing.ipf

Lines changed: 2 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -3,49 +3,13 @@
33
#pragma rtFunctionErrors=1
44
#pragma ModuleName=ZeroMQPublishingTests
55

6-
// #define OUTPUT_DOCUMENTATION_JSON_DUMP
7-
86
static Function TEST_CASE_BEGIN_OVERRIDE(string testname)
97

108
TestCaseBeginCommon(testname)
119

1210
PrepareForPublishTest()
1311
End
1412

15-
static Function FetchAndParseMessage(string filter)
16-
17-
variable jsonID
18-
string msg
19-
20-
msg = FetchPublishedMessage(filter)
21-
22-
CHECK_PROPER_STR(msg)
23-
24-
jsonID = JSON_Parse(msg)
25-
CHECK_GE_VAR(jsonID, 0)
26-
27-
#ifdef OUTPUT_DOCUMENTATION_JSON_DUMP
28-
WAVE/T contents = ListToTextWave(JSON_Dump(jsonID, indent = 2), "\n")
29-
30-
contents[] = "/// " + contents[p]
31-
32-
print "/// Filter: #XXXX"
33-
print "///"
34-
print "/// Example:"
35-
print "///"
36-
print "/// \\rst"
37-
print "/// .. code-block:: json"
38-
print "///"
39-
for(s : contents)
40-
print s
41-
endfor
42-
print "///"
43-
print "/// \\endrst"
44-
#endif // OUTPUT_DOCUMENTATION_JSON_DUMP
45-
46-
return jsonID
47-
End
48-
4913
static Function CheckPressureState()
5014

5115
string device, expected, actual
@@ -413,7 +377,7 @@ End
413377

414378
static Function CheckAccessResSmoke()
415379

416-
string device, msg, expected, actual
380+
string device, expected, actual
417381
variable headstage, i, jsonID, value, sweepNo
418382

419383
device = "my_device"
@@ -439,9 +403,7 @@ static Function CheckAccessResSmoke()
439403

440404
MIES_PUB#PUB_AccessResistanceSmoke(device, sweepNo, headstage)
441405

442-
msg = FetchPublishedMessage(ANALYSIS_FUNCTION_AR)
443-
444-
jsonID = JSON_Parse(msg)
406+
jsonID = FetchAndParseMessage(ANALYSIS_FUNCTION_AR)
445407

446408
expected = LABNOTEBOOK_BINARY_UNIT
447409
actual = JSON_GetString(jsonID, "/results/USER_Access Res. Smoke Set QC/unit")

Packages/tests/UTF_HardwareHelperFunctions.ipf

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -837,7 +837,7 @@ End
837837

838838
Function CheckPublishedMessage(string device, variable type)
839839

840-
string expectedFilter, msg
840+
string expectedFilter
841841
variable jsonID
842842

843843
switch(type)
@@ -858,9 +858,7 @@ Function CheckPublishedMessage(string device, variable type)
858858
return NaN
859859
endswitch
860860

861-
msg = FetchPublishedMessage(expectedFilter)
862-
CHECK_PROPER_STR(msg)
863-
jsonID = JSON_Parse(msg)
861+
jsonID = FetchAndParseMessage(expectedFilter)
864862
CHECK_GE_VAR(jsonID, 0)
865863
JSON_Release(jsonID)
866864
End
@@ -1666,12 +1664,10 @@ End
16661664

16671665
Function CheckStartStopMessages(string mode, string state)
16681666

1669-
string msg, actual, expected
1667+
string actual, expected
16701668
variable jsonID
16711669

1672-
msg = FetchPublishedMessage(DAQ_TP_STATE_CHANGE_FILTER)
1673-
1674-
jsonID = JSON_Parse(msg)
1670+
jsonID = FetchAndParseMessage(DAQ_TP_STATE_CHANGE_FILTER)
16751671
actual = JSON_GetString(jsonID, "/" + mode)
16761672
expected = state
16771673
CHECK_EQUAL_STR(actual, expected)

0 commit comments

Comments
 (0)