-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
motors in a separate process working version #8
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,42 +1,116 @@ | ||
import pyvisa | ||
from multiprocessing import Process | ||
from queue import Empty | ||
from time import sleep | ||
|
||
class MotorMaster(Process): | ||
|
||
def __init__( | ||
self, | ||
motors, | ||
input_queues, | ||
output_queues, | ||
close_setup_event, | ||
): | ||
super().__init__() | ||
|
||
self.input_queues = input_queues | ||
self.output_queues = output_queues | ||
self.close_setup_event = close_setup_event | ||
self.motors = motors | ||
self.positions = dict.fromkeys(motors) | ||
self.motors_running = True | ||
self.get_positions(first=True) | ||
|
||
def run(self) -> None: | ||
for ax in self.motors.keys(): | ||
self.motors[ax].open_communication() | ||
while not self.close_setup_event.is_set(): | ||
self.get_positions() | ||
self.move_motors() | ||
self.close_setups() | ||
|
||
def close_setups(self): | ||
for axis in self.motors.keys(): | ||
self.motors[axis].end_session() | ||
|
||
def get_positions(self, first=False): | ||
for axis in self.motors.keys(): | ||
if not first: | ||
actual_pos = self.motors[axis].get_position() | ||
else: | ||
actual_pos = self.motors[axis].get_position(first=True) | ||
# print("actual pos mort", axis, actual_pos) | ||
if actual_pos is not None: | ||
self.positions[axis] = actual_pos | ||
self.output_queues[axis].put(actual_pos) | ||
|
||
def move_motors(self): | ||
|
||
for axis in self.motors.keys(): | ||
package = self.get_last_entry(self.input_queues[axis]) | ||
if package: | ||
mov_value = package[0] | ||
mov_type = package[1].name | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. again, this goes against the spirit of using |
||
empty_queue = False | ||
else: | ||
empty_queue = True | ||
|
||
if empty_queue is False: | ||
if mov_type == "relative": | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the point of using Enums is to avoid string comparisons |
||
print(mov_value) | ||
self.motors[axis].move_rel(mov_value) | ||
elif mov_type == "absolute": | ||
self.motors[axis].move_abs(mov_value) | ||
|
||
@staticmethod | ||
def get_last_entry(queue): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this appears in many places, I suggest to factor it out |
||
out = tuple() | ||
while True: | ||
try: | ||
out = queue.get(timeout=0.001) | ||
except Empty: | ||
break | ||
return out | ||
|
||
|
||
class MotorControl: | ||
|
||
def __init__( | ||
self, | ||
port, | ||
baudrate=921600, | ||
parity=pyvisa.constants.Parity.none, | ||
encoding="ascii", | ||
axes=None, | ||
self, | ||
port, | ||
baudrate=921600, | ||
parity=pyvisa.constants.Parity.none, | ||
encoding="ascii", | ||
axis=None, | ||
): | ||
self.baudrate = baudrate | ||
self.parity = parity | ||
self.encoding = encoding | ||
self.port = port | ||
axes = self.find_axis(axes) | ||
axes = self.find_axis(axis) | ||
self.axes = str(axes) | ||
self.x = 0 | ||
self.y = 0 | ||
self.home_pos = None | ||
rm = pyvisa.ResourceManager() | ||
self.motor = rm.open_resource( | ||
port, baud_rate=baudrate, parity=parity, encoding=encoding, timeout=10 | ||
) | ||
self.start_session() | ||
self.motor = None | ||
self.connection = True | ||
|
||
def get_position(self): | ||
input_m = self.axes + "TP" | ||
try: | ||
output = self.motor.query(input_m) | ||
def get_position(self, first=False): | ||
if not first: | ||
input_m = self.axes + "TP" | ||
try: | ||
output = [float(s) for s in output.split(",")] | ||
except ValueError: | ||
print("Got ", output, "from motor, what to do?") | ||
return output[0] | ||
|
||
except pyvisa.VisaIOError: | ||
print(f"Error get position axes number {self.axes} ") | ||
return None | ||
output = self.motor.query(input_m) | ||
try: | ||
output = [float(s) for s in output.split(",")] | ||
except ValueError: | ||
print("Got ", output, "from motor", self.axes, ",what to do?") | ||
except pyvisa.VisaIOError: | ||
print(f"Error get position axes number {self.axes} ") | ||
output = [None] | ||
else: | ||
output = [None] | ||
return output[0] | ||
|
||
def move_abs(self, coordinate): | ||
coordinate = str(coordinate) | ||
|
@@ -104,6 +178,14 @@ def end_session(self): | |
self.motor.close() | ||
self.connection = False | ||
|
||
def open_communication(self): | ||
rm = pyvisa.ResourceManager() | ||
self.motor = rm.open_resource( | ||
self.port, baud_rate=self.baudrate, parity=self.parity, | ||
encoding=self.encoding, timeout=10 | ||
) | ||
self.start_session() | ||
|
||
@staticmethod | ||
def find_axis(axes): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. axes can be an Enum and this function just a dictionary lookup |
||
if axes == "x": | ||
|
@@ -113,3 +195,9 @@ def find_axis(axes): | |
elif axes == "z": | ||
axes = 3 | ||
return axes | ||
|
||
|
||
class PreMotor: | ||
@staticmethod | ||
def query(self): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what is this? |
||
return "None" |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,23 +11,33 @@ | |
|
||
from PyQt5.QtGui import QColor | ||
from PyQt5.QtCore import Qt, pyqtSignal, QPointF, QTimer | ||
from enum import Enum | ||
from queue import Empty | ||
|
||
class MovementType(Enum): | ||
absolute = True | ||
relative = False | ||
|
||
|
||
class MotionControlXYZ(QWidget): | ||
def __init__(self, motors): | ||
def __init__(self, input_queues, output_queues): | ||
super().__init__() | ||
self.setLayout(QGridLayout()) | ||
|
||
for key, value in motors.items(): | ||
wid = MotorSlider(name=key, motor=value) | ||
for axis in input_queues.keys(): | ||
wid = MotorSlider(name=axis, | ||
input_queue=input_queues[axis], | ||
output_queue=output_queues[axis] | ||
) | ||
self.layout().addWidget(wid) | ||
|
||
|
||
class PrecisionSingleSliderMotorControl(PrecisionSingleSlider): | ||
def __init__(self, *args, motor=None, pos=None, **kwargs): | ||
def __init__(self, *args, input_queue=None, output_queue=None, pos=None, **kwargs): | ||
super().__init__(*args, **kwargs) | ||
self.pos = pos | ||
self.motor = motor | ||
self.input_queue = input_queue | ||
self.output_queue = output_queue | ||
self.axes_pos = 0 | ||
self.indicator_color = QColor(178, 0, 0) | ||
|
||
|
@@ -60,21 +70,24 @@ class MotorSlider(QWidget): | |
sig_changed = pyqtSignal(float) | ||
sig_end_session = pyqtSignal() | ||
|
||
def __init__(self, motor=None, move_limit_low=-3, move_limit_high=3, name=""): | ||
def __init__(self, input_queue=None, output_queue=None, move_limit_low=-3, move_limit_high=3, name=""): | ||
super().__init__() | ||
self.name = name | ||
self.grid_layout = QGridLayout() | ||
self.grid_layout.setSpacing(0) | ||
self.grid_layout.setContentsMargins(0, 0, 0, 0) | ||
self.spin_val_desired_pos = QDoubleSpinBox() | ||
self.spin_val_actual_pos = QDoubleSpinBox() | ||
|
||
value = motor.home_pos | ||
self.input_queue = input_queue | ||
self.output_queue = output_queue | ||
self.mov_type = MovementType(False) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you would rather use the enum as MovementType.relative (the point of using Enums is that this mapping true, false or integers is arbitrary) |
||
value = get_next_entry(self.output_queue) | ||
min_range = value + move_limit_low | ||
max_range = value + move_limit_high | ||
self.position = value | ||
|
||
self.slider = PrecisionSingleSliderMotorControl( | ||
default_value=value, min=min_range, max=max_range, pos=value, motor=motor | ||
default_value=value, min=min_range, max=max_range, pos=value, input_queue=None, output_queue=None, | ||
) | ||
for spin_val in [self.spin_val_actual_pos, self.spin_val_desired_pos]: | ||
spin_val.setRange(min_range, max_range) | ||
|
@@ -95,38 +108,53 @@ def __init__(self, motor=None, move_limit_low=-3, move_limit_high=3, name=""): | |
|
||
self.setLayout(self.grid_layout) | ||
self.slider.sig_changed.connect(self.update_values) | ||
self.sig_changed.connect(self.slider.motor.move_abs) | ||
|
||
self._timer_painter = QTimer(self) | ||
self._timer_painter.timeout.connect(self.update_actual_pos) | ||
self._timer_painter.start() | ||
|
||
def update_actual_pos(self): | ||
if self.slider.motor.connection is True: | ||
pos = self.slider.motor.get_position() | ||
if pos is not None: | ||
self.spin_val_actual_pos.setValue(pos) | ||
self.slider.axes_pos = pos | ||
|
||
def update_values(self, val): | ||
self.spin_val_desired_pos.setValue(val) | ||
self.sig_changed.emit(val) | ||
while True: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this again is the queue quering pattern that should be in a separate, common function |
||
try: | ||
pos = self.output_queue.get(timeout=0.001) | ||
except Empty: | ||
break | ||
self.spin_val_actual_pos.setValue(pos) | ||
self.slider.axes_pos = pos | ||
self.position = pos | ||
|
||
def update_values(self, new_val): | ||
self.spin_val_desired_pos.setValue(new_val) | ||
displacement = new_val - self.position | ||
print("update_values", displacement) | ||
self.input_queue.put((displacement, self.mov_type)) | ||
|
||
def update_slider(self, new_val): | ||
self.slider.pos = new_val | ||
self.slider.update() | ||
self.sig_changed.emit(new_val) | ||
displacement = new_val - self.position | ||
print("update_slider", displacement) | ||
self.input_queue.put((displacement, self.mov_type)) | ||
|
||
def update_external(self, new_val): | ||
self.slider.pos = new_val | ||
self.spin_val_actual_pos.setValue(new_val) | ||
self.slider.update() | ||
|
||
|
||
def get_next_entry(queue): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this can be factored out in thecolonel (which can also get a new culinary name) |
||
out = None | ||
while out is None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. again the pattern |
||
try: | ||
out = queue.get(timeout=0.001) | ||
except Empty: | ||
pass | ||
return out | ||
|
||
if __name__ == "__main__": | ||
app = QApplication([]) | ||
app.setStyleSheet(qdarkstyle.load_stylesheet_pyqt5()) | ||
layout = QHBoxLayout() | ||
win = MotionControlXYZ(None, None, None) | ||
win.show() | ||
app.exec_() | ||
app.exec_() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is a bit mysterious, I suggest that get_last_entry (in a factored-out version) throws the Empty exception and then you can have the much more compact: