-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathexperiment.py
executable file
·402 lines (335 loc) · 13.6 KB
/
experiment.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
__author__ = "jon mulle"
# Import required KLibs classes and functions
import klibs
from klibs.KLExceptions import TrialException
from klibs import P
from klibs.KLConstants import (EL_RIGHT_EYE, EL_LEFT_EYE, EL_BOTH_EYES, EL_SACCADE_END, EL_FALSE,
NA, RC_KEYPRESS, CIRCLE_BOUNDARY, TIMEOUT)
from klibs.KLUtilities import deg_to_px, flush, iterable, smart_sleep, boolean_to_logical, pump
from klibs.KLUtilities import line_segment_len as lsl
from klibs.KLKeyMap import KeyMap
from klibs.KLUserInterface import key_pressed
from klibs.KLGraphics import fill, flip, blit, clear
from klibs.KLGraphics.KLDraw import Rectangle, Circle, SquareAsterisk, FixationCross
from klibs.KLCommunication import any_key, ui_request, message, slack_message
# Import other required varaibles and functions
from math import sqrt, pi, cos, sin
from sdl2 import SDLK_SPACE
# Define some handy contants for the experiment
BLACK = (0, 0, 0, 255)
WHITE = (255, 255, 255, 255)
RED = (255, 0, 0, 255)
TOP = "top"
BOTTOM = "bottom"
LEFT = "left"
RIGHT = "right"
V_START_AXIS = "vertical"
H_START_AXIS = "horizontal"
ROT_CW = "clockwise"
ROT_CCW = "counterclockwise"
BOX_1 = "top_or_left"
BOX_2 = "bottom_or_right"
SACC_INSIDE = "inside"
SACC_OUTSIDE = "outside"
class MixedMotionCueingEffects(klibs.Experiment):
# trial data
saccades = []
target_acquired = False
def setup(self):
# Generate messages to be displayed during experiment
self.err_msgs = {}
if P.saccade_response_cond:
self.err_msgs['eye'] = "Moved eyes too soon!"
self.err_msgs['key'] = "Please respond with eye movements only."
self.err_msgs['early'] = self.err_msgs['key'] # for convenience in logic
else:
self.err_msgs['eye'] = "Moved eyes!"
self.err_msgs['key'] = "Please respond with the spacebar only."
self.err_msgs['early'] = "Responded too soon!"
# Stimulus sizes
self.target_width = deg_to_px(0.5, even=True) # diameter of target circle (0.5 degrees)
self.cue_size = deg_to_px(0.5, even=True) # size of asterisk/fixations (0.5 degrees)
self.box_size = deg_to_px(0.8, even=True) # size of placeholder boxes (0.8 degrees)
# Stimulus Drawbjects
self.box = Rectangle(self.box_size, stroke=(2, WHITE)).render()
self.cross_r = FixationCross(self.cue_size, 2, fill=RED).render()
self.cross_w = FixationCross(self.cue_size, 2, fill=WHITE).render()
self.circle = Circle(self.target_width, fill=WHITE).render()
self.asterisk = SquareAsterisk(self.cue_size, 2, fill=WHITE).render()
# Layout of stimuli
# offset between centre of boxes and centre of screen, in degrees
offset_size_deg = P.dm_offset_size if P.development_mode else 7.0
self.offset_size = deg_to_px(offset_size_deg)
self.target_locs = {
TOP: (P.screen_c[0], P.screen_c[1] - self.offset_size),
RIGHT: (P.screen_c[0] + self.offset_size, P.screen_c[1]),
BOTTOM: (P.screen_c[0], P.screen_c[1] + self.offset_size),
LEFT: (P.screen_c[0] - self.offset_size, P.screen_c[1])
}
# prepare all animation locations for both rotation directions and starting axes
self.animation_frames = 15
animation_duration = 300 # ms
self.frame_duration = animation_duration / self.animation_frames
rotation_increment = (pi / 2) / self.animation_frames
cx, cy = P.screen_c
self.frames = {
V_START_AXIS: {ROT_CW:[], ROT_CCW:[]},
H_START_AXIS: {ROT_CW:[], ROT_CCW:[]}
}
for i in range(0, self.animation_frames):
l_x_cw = -self.offset_size * cos(i * rotation_increment)
l_y_cw = self.offset_size * sin(i * rotation_increment)
l_x_ccw = self.offset_size * cos(i * rotation_increment)
l_y_ccw = -self.offset_size * sin(i * rotation_increment)
cw_locs = [(cx + l_x_cw, cy + l_y_cw), (cx - l_x_cw, cy - l_y_cw)]
ccw_locs = [(cx + l_x_ccw, cy - l_y_ccw), (cx - l_x_ccw, cy + l_y_ccw)]
self.frames[H_START_AXIS][ROT_CW].append(ccw_locs)
self.frames[H_START_AXIS][ROT_CCW].append(cw_locs)
self.frames[V_START_AXIS][ROT_CW].insert(0, cw_locs)
self.frames[V_START_AXIS][ROT_CCW].insert(0, ccw_locs)
# Define keymap for ResponseCollector
self.keymap = KeyMap(
"speeded response", # Name
["spacebar"], # UI Label
["spacebar"], # Data Label
[SDLK_SPACE] # SDL2 Keycode
)
# Boundaries for stimuli
self.fixation_boundary = deg_to_px(3.0) # radius of 3 degrees of visual angle
self.el.add_boundary("drift_correct", [P.screen_c, self.fixation_boundary], CIRCLE_BOUNDARY)
def block(self):
block_num = P.block_number
block_count = P.blocks_per_experiment
# Display progress messages at start of blocks
if block_num > 1:
flush()
fill()
block_msg = "Completed block {0} of {1}. Press any key to continue."
block_msg = block_msg.format(block_num - 1, block_count)
message(block_msg, registration=5, location=P.screen_c)
flip()
any_key()
# When running participants, notify researcher at halfway point & last block via Slack
if P.slack_messaging and not P.development_mode:
if block_num == ((block_count+1)/2)+1: # If participant is halfway done
slack_message("Halfway done ({0}/{1})".format(block_num, block_count))
elif block_num == block_count: # If participant is on last block
slack_message("On last block ({0}/{1})".format(block_num, block_count))
def setup_response_collector(self):
# Configure ResponseCollector to read spacebar presses as responses and display
# the target during the collection period
box_locs_during_rc = self.box_axis_during_target()
self.rc.uses(RC_KEYPRESS)
self.rc.end_collection_event = "task end"
self.rc.display_callback = self.display_refresh
self.rc.display_args = [box_locs_during_rc, self.circle, None, self.target_location]
self.rc.flip = False
self.rc.keypress_listener.key_map = self.keymap
self.rc.keypress_listener.interrupts = True
def trial_prep(self):
# Infer the cue location based on starting axis (ie. left and top boxes are 'box 1',
# bottom and right are 'box 2')
if self.cue_location == BOX_1:
self.cue_location = LEFT if self.start_axis is H_START_AXIS else TOP
else:
self.cue_location = RIGHT if self.start_axis is H_START_AXIS else BOTTOM
# Reset trial flags
self.before_target = True
self.target_acquired = False
self.moved_eyes_during_rc = False
# Add timecourse of events to EventManager
self.evm.register_tickets([
("cross fix end", 300),
("circle fix end", 1100), #800ms after cross fix end
("cue end", 1400), #300ms after circle fix end
("circle box end", 1600), #200ms after cue end
("animation end", 1900), #300ms after circle box end
("asterisk end", 2060), #160ms after animation end
("task end", 4560) #2500ms after asterisk end
])
# Perform drift correct with red fixation cross, changing to white upon
# completion
self.display_refresh(self.start_axis, self.cross_r)
self.el.drift_correct(fill_color=BLACK, el_draw_fixation=EL_FALSE)
self.display_refresh(self.start_axis, self.cross_w)
flush()
def trial(self):
while self.evm.before("cross fix end"):
self.jc_wait_time()
self.display_refresh(self.start_axis, self.cross_w)
while self.evm.before("circle fix end"):
self.jc_wait_time()
self.display_refresh(self.start_axis, self.circle)
while self.evm.before("cue end"):
self.jc_wait_time()
self.display_refresh(self.start_axis, self.circle, cue=self.cue_location)
while self.evm.before("circle box end"):
self.jc_wait_time()
self.display_refresh(self.start_axis, self.circle)
current_frame = 0
while self.evm.before("animation end"):
self.jc_wait_time()
if self.animation_trial:
if current_frame < self.animation_frames:
if self.evm.trial_time_ms > (current_frame * self.frame_duration + 1600):
box_locs = self.frames[self.start_axis][self.rotation_dir][current_frame]
self.display_refresh(box_locs, self.asterisk)
current_frame += 1
else:
self.display_refresh(self.start_axis, self.asterisk)
while self.evm.before("asterisk end"):
self.display_refresh(self.box_axis_during_target(), self.circle)
self.jc_wait_time()
flush()
self.display_refresh(self.box_axis_during_target(), self.circle, target=self.target_location)
if P.saccade_response_cond:
self.jc_saccade_data()
keypress_rt = NA
if P.keypress_response_cond:
self.rc.collect()
keypress_rt = self.rc.keypress_listener.response(rt=True, value=False)
clear()
smart_sleep(1000)
if P.keypress_response_cond:
if self.target_location == "none" and keypress_rt != TIMEOUT:
fill()
message(self.err_msgs['early'], registration=5, location=P.screen_c)
flip()
any_key()
elif self.moved_eyes_during_rc:
fill()
message("Moved eyes during response interval!", registration=5, location=P.screen_c)
flip()
any_key()
return {
"block_num": P.block_number,
"trial_num": P.trial_number,
"session_type": 'saccade' if P.saccade_response_cond else 'keypress',
"cue_location": self.cue_location,
"target_location": self.target_location,
"start_axis": self.start_axis,
"box_rotation": self.rotation_dir if self.animation_trial else NA,
"animation_trial": str(self.animation_trial).upper(),
"target_acquired": str(self.target_acquired).upper() if P.saccade_response_cond else NA,
"keypress_rt": keypress_rt,
"moved_eyes": str(self.moved_eyes_during_rc).upper() if P.keypress_response_cond else NA
}
def trial_clean_up(self):
if P.trial_id and P.saccade_response_cond: # won't exist if trial recycled
#print self.saccades
#print "\n\n"
for s in self.saccades:
s['trial_id'] = P.trial_number
s['participant_id'] = P.participant_id
label = 't_{0}_saccade_{1}'.format(P.trial_number, self.saccades.index(s))
self.db.init_entry('saccades', label)
for f in s:
if f == "end_time":
continue
self.db.log(f, s[f])
self.db.insert()
self.saccades = []
self.target_acquired = False
def clean_up(self):
pass
def display_refresh(self, boxes=None, fixation=None, cue=None, target=None):
# In keypress condition, after target presented, check that gaze
# is still within fixation bounds and print message at end if not
if P.keypress_response_cond and self.before_target == False:
if lsl(self.el.gaze(), P.screen_c) > self.fixation_boundary:
self.moved_eyes_during_rc = True
fill()
if boxes is not None:
if iterable(boxes):
box_l = boxes
if boxes == V_START_AXIS:
box_l = [self.target_locs[TOP], self.target_locs[BOTTOM]]
if boxes == H_START_AXIS:
box_l = [self.target_locs[LEFT], self.target_locs[RIGHT]]
for l in box_l:
blit(self.box, 5, l)
if fixation is not None:
blit(fixation, 5, P.screen_c)
if cue:
blit(self.asterisk, 5, self.target_locs[cue])
if target:
if target != "none": # if not catch trial, show target
blit(self.circle, 5, self.target_locs[target])
if self.before_target:
self.before_target = False
flip()
def log_and_recycle_trial(self, err_type):
"""
Renders an error message to the screen and wait for a response. When a
response is made, the incomplete trial data is logged to the trial_err
table and the trial is recycled.
"""
flush()
fill()
message(self.err_msgs[err_type], registration=5, location=P.screen_c)
flip()
any_key()
err_data = {
"participant_id": P.participant_id,
"block_num": P.block_number,
"trial_num": P.trial_number,
"session_type": 'saccade' if P.saccade_response_cond else 'keypress',
"cue_location": self.cue_location,
"target_location": self.target_location,
"start_axis": self.start_axis,
"box_rotation": self.rotation_dir if self.animation_trial else NA,
"animation_trial": boolean_to_logical(self.animation_trial),
"err_type": err_type
}
self.database.insert(data=err_data, table="trials_err")
raise TrialException(self.err_msgs[err_type])
def jc_wait_time(self):
if self.before_target:
if lsl(self.el.gaze(), P.screen_c) > self.fixation_boundary:
self.log_and_recycle_trial('eye')
q = pump(True)
if key_pressed(queue=q):
if key_pressed(SDLK_SPACE, queue=q):
self.log_and_recycle_trial('early')
else:
self.log_and_recycle_trial('key')
def jc_saccade_data(self):
# following code is tidied up but otherwise borrowed from John Christie's original code
target_onset = self.el.now()
self.el.write("TARGETON %d" % target_onset)
while self.el.now() - target_onset < 2500 and not self.target_acquired:
self.display_refresh(self.box_axis_during_target(), self.circle, target=self.target_location)
pump() # refreshes TryLink event queue if using
queue = self.el.get_event_queue([EL_SACCADE_END])
for saccade in queue:
gaze = saccade.getEndGaze()
if lsl(gaze, P.screen_c) > self.fixation_boundary:
dist_from_target = lsl(gaze, self.target_locs[self.target_location])
accuracy = SACC_OUTSIDE if dist_from_target > self.fixation_boundary else SACC_INSIDE
if len(self.saccades):
duration = saccade.getStartTime() + 4 - self.saccades[-1]['end_time']
else:
duration = saccade.getStartTime() + 4 - target_onset
if len(self.saccades) < 3:
self.saccades.append({
"rt": saccade.getStartTime() - target_onset,
"accuracy": accuracy,
"dist_from_target": dist_from_target,
"start_x": saccade.getStartGaze()[0],
"start_y": saccade.getStartGaze()[1],
"end_x": saccade.getEndGaze()[0],
"end_y": saccade.getEndGaze()[1],
"end_time": saccade.getEndTime(),
"duration": duration
})
if dist_from_target <= self.fixation_boundary:
self.target_acquired = True
break
def box_axis_during_target(self):
if self.animation_trial:
if self.start_axis == V_START_AXIS:
return H_START_AXIS
if self.start_axis == H_START_AXIS:
return V_START_AXIS
else:
return self.start_axis