diff --git a/PyExpLabSys/drivers/keithley_2000.py b/PyExpLabSys/drivers/keithley_2000.py index e08dc797..ec4d6f96 100644 --- a/PyExpLabSys/drivers/keithley_2000.py +++ b/PyExpLabSys/drivers/keithley_2000.py @@ -1,34 +1,34 @@ """ Simple driver for Keithley 2000 DMM """ -from PyExpLabSys.drivers.scpi import SCPI +import time +import pyvisa -class Keithley2000(SCPI): + +class Keithley2000: """ Simple driver for Keithley 2000 DMM """ - def __init__( - self, interface, hostname='', device='', baudrate=9600, gpib_address=None - ): + def __init__(self, interface, device='', baudrate=9600, gpib_address=None): + rm = pyvisa.ResourceManager('@py') if interface == 'serial': - SCPI.__init__( - self, - interface=interface, - device=device, - baudrate=baudrate, - line_ending='\n', + conn_string = 'ASRL{}::INSTR'.format(device) + self.instr = rm.open_resource(conn_string) + self.instr.set_visa_attribute( + pyvisa.constants.VI_ATTR_ASRL_FLOW_CNTRL, + pyvisa.constants.VI_ASRL_FLOW_XON_XOFF, ) - self.comm_dev.timeout = 2 - self.comm_dev.rtscts = False - self.comm_dev.xonxoff = False + self.instr.read_termination = '\n' + self.instr.write_termination = '\n' + self.instr.baud_rate = baudrate if interface == 'gpib': - SCPI.__init__(self, interface=interface, gpib_address=gpib_address) + pass def set_bandwith(self, measurement='voltage:ac', bandwidth=None): scpi_cmd = 'SENSE:{}:DETector:BANDwidth'.format(measurement) if bandwidth is not None: - DMM.scpi_comm(scpi_cmd + ' {}'.format(bandwidth)) - value_raw = DMM.scpi_comm(scpi_cmd + '?') + self.instr.write(scpi_cmd + ' {}'.format(bandwidth)) + value_raw = self.instr.query(scpi_cmd + '?') value = float(value_raw) return value @@ -41,13 +41,15 @@ def set_range(self, value: float): if value < 0: value = 0 if value == 0: - self.scpi_comm(':SENSE:VOLT:DC:RANGE:AUTO ON') - self.scpi_comm(':SENSE:VOLT:AC:RANGE:AUTO ON') + self.instr.write(':SENSE:VOLT:DC:RANGE:AUTO ON') + self.instr.write(':SENSE:VOLT:AC:RANGE:AUTO ON') else: - self.scpi_comm(':SENSE:VOLT:DC:RANGE {:.5f}'.format(value)) - self.scpi_comm(':SENSE:VOLT:AC:RANGE {:.5f}'.format(value)) - - actual_range_raw = self.scpi_comm(':SENSE:VOLTAGE:AC:RANGE?') + self.instr.write(':SENSE:VOLT:DC:RANGE {:.5f}'.format(value)) + self.instr.write(':SENSE:VOLT:AC:RANGE {:.5f}'.format(value)) + # The instrument cannot process this infinitiely fast and might end up + # lacking behind and filling up buffers - wait a little while to prevent this + # time.sleep(0.4) + actual_range_raw = self.instr.query(':SENSE:VOLTAGE:AC:RANGE?') actual_range = float(actual_range_raw) return actual_range @@ -55,32 +57,57 @@ def set_integration_time(self, nplc: float = None): """ Set the measurement integration time """ + + if 'AC' in self.configure_measurement_type(): + set_msg = 'SENSE:VOLTAGE:AC:NPLCYCLES {}' + read_msg = 'SENSE:VOLTAGE:AC:NPLCYCLES?' + else: + set_msg = 'SENSE:VOLTAGE:DC:NPLCYCLES {}' + read_msg = 'SENSE:VOLTAGE:DC:NPLCYCLES?' + if nplc is not None: if nplc < 0.01: nplc = 0.01 if nplc > 60: nplc = 60 - self.scpi_comm('SENSE:VOLTAGE:AC:NPLCYCLES {}'.format(nplc)) - # self.scpi_comm('SENSE:VOLTAGE:DC:NPLCYCLES {}'.format(nplc)) - current_nplc = float(self.scpi_comm('SENSE:VOLTAGE:AC:NPLCYCLES?')) + self.instr.write(set_msg.format(nplc)) + current_nplc = float(self.instr.query(read_msg)) return current_nplc def configure_measurement_type(self, measurement_type=None): - """ Setup measurement type """ + """Setup measurement type""" if measurement_type is not None: # todo: Ensure type is an allow type!!!! - self.scpi_comm(':CONFIGURE:{}'.format(measurement_type)) - actual = self.scpi_comm(':CONFIGURE?') + self.instr.write(':CONFIGURE:{}'.format(measurement_type)) + actual = self.instr.query(':CONFIGURE?') return actual + def set_trigger_source(self, external): + """ + Set the trigger source either to external or immediate. + If external is true, trigger will be set accordingly + otherwise immediate triggering will be chosen. + """ + if external: + self.instr.write('TRIGGER:SOURCE External') + else: + self.instr.write('TRIGGER:SOURCE Immediate') + return external + + def read_dc_voltage(self): + """Read a voltage""" + raw = self.instr.query(':MEASURE:VOLTAGE:DC?') + voltage = float(raw) + return voltage + def read_ac_voltage(self): - """ Read a voltage """ - raw = self.scpi_comm(':MEASURE:VOLTAGE:AC?') + """Read a voltage""" + raw = self.instr.query(':MEASURE:VOLTAGE:AC?') voltage = float(raw) return voltage def next_reading(self): - """ Read next reading """ + """Read next reading""" t0 = time.time() while not self.measurement_available(): time.sleep(0.001) @@ -88,40 +115,44 @@ def next_reading(self): # Todo: This is not good enough print('Keithley 2000 TIMEOUT!') break - raw = self.scpi_comm(':DATA?') + raw = self.instr.query(':DATA?') voltage = float(raw) return voltage def measurement_available(self): - meas_event = int(self.scpi_comm('STATUS:MEASUREMENT:EVENT?')) + # todo: Check if pyvisa has some neat trick for this + meas_event = int(self.instr.query('STATUS:MEASUREMENT:EVENT?')) mav_bit = 5 mav = (meas_event & 2 ** mav_bit) == 2 ** mav_bit return mav if __name__ == '__main__': - import time - - GPIB = 16 - DMM = Keithley2000(interface='gpib', gpib_address=GPIB) + device = '/dev/serial/by-id/usb-FTDI_Chipi-X_FT6EYK1T-if00-port0' + DMM = Keithley2000( + interface='serial', + device=device, + baudrate=9600, + ) + print(repr(DMM.instr.query("*IDN?"))) + print(DMM.set_bandwith()) + + # Errors: + # +802 - RS232 overrun + # -113 - Undefined header + DMM.set_trigger_source(external=False) + + DMM.set_range(0) + for _ in range(0, 10): + # print() + # print(DMM.read_software_version()) + # DMM.set_trigger_source(external=False) + voltage = DMM.next_reading() + print(voltage) # TODO! Something changes with the configuration when this # command is called, measurement is much slower and # NPLC command fails?!?! # print(DMM.configure_measurement_type('volt:ac')) - - DMM.set_range(0.1) - print(DMM.set_integration_time(2)) - # print(DMM.set_bandwith()) - # print(DMM.set_integration_time(10)) - - for i in range(0, 20): - # time.sleep(0.05) - t = time.time() - # meas_event = DMM.scpi_comm('STATUS:MEASUREMENT:EVENT?') - # print(bin(int(meas_event))) - while not DMM.measurement_available(): - time.sleep(0.05) - reading = DMM.next_reading() - dt = time.time() - t - print('Time: {:.2f}ms. AC {:.3f}uV'.format(dt * 1e3, reading * 1e6)) + # print(DMM.read_dc_voltage()) + # print(DMM.read_dc_voltage()) diff --git a/PyExpLabSys/drivers/keithley_2182.py b/PyExpLabSys/drivers/keithley_2182.py index ad1939f4..49919689 100644 --- a/PyExpLabSys/drivers/keithley_2182.py +++ b/PyExpLabSys/drivers/keithley_2182.py @@ -1,58 +1,53 @@ """ Simple driver for Keithley 2182 Nanovolt Meter """ -from PyExpLabSys.drivers.scpi import SCPI +import time +import pyvisa -class Keithley2182(SCPI): + +class Keithley2182: """ Simple driver for Keithley 2182 Nanovolt Meter Actual implementation performed on a 2182a - please double check if you have a 2182. """ - def __init__( - self, interface, hostname='', device='', baudrate=9600, gpib_address=None - ): + def __init__(self, interface, hostname='', device='', baudrate=19200): + rm = pyvisa.ResourceManager('@py') if interface == 'serial': - SCPI.__init__( - self, - interface=interface, - device=device, - baudrate=baudrate, - line_ending='\n', - ) - self.comm_dev.timeout = 2 - self.comm_dev.rtscts = False - self.comm_dev.xonxoff = False - if interface == 'gpib': - SCPI.__init__(self, interface=interface, gpib_address=gpib_address) - + conn_string = 'ASRL{}::INSTR'.format(device) + self.instr = rm.open_resource(conn_string) + self.instr.read_termination = '\r' + self.instr.write_termination = '\r' + self.instr.baud_rate = baudrate # For now, turn off continous trigger - this might need reconsideration - self.scpi_comm('INIT:CONT OFF') + # self.scpi_comm('INIT:CONT OFF') def set_range(self, channel1: float = None, channel2: float = None): """ Set the measurement range of the device, 0 will indicate auto-range """ + print('Set range of channel 1') if channel1 is not None: if channel1 > 120: channel1 = 120 if channel1 == 0: - self.scpi_comm(':SENSE:VOLT:CHANNEL1:RANGE:AUTO ON') + self.instr.write(':SENSE:VOLT:CHANNEL1:RANGE:AUTO ON') else: - self.scpi_comm(':SENSE:VOLT:CHANNEL1:RANGE {:.2f}'.format(channel1)) + self.instr.write(':SENSE:VOLT:CHANNEL1:RANGE {:.2f}'.format(channel1)) if channel2 is not None: if channel2 > 12: channel2 = 12 if channel2 == 0: - self.scpi_comm(':SENSE:VOLTAGE:CHANNEL2:RANGE:AUTO ON') + self.instr.write(':SENSE:VOLTAGE:CHANNEL2:RANGE:AUTO ON') else: - self.scpi_comm(':SENSE:VOLT:CHANNEL2:RANGE {:.2f}'.format(channel2)) - - actual_channel1_raw = self.scpi_comm(':SENSE:VOLTAGE:CHANNEL1:RANGE?') - actual_channel2_raw = self.scpi_comm(':SENSE:VOLTAGE:CHANNEL2:RANGE?') + self.instr.write(':SENSE:VOLT:CHANNEL2:RANGE {:.2f}'.format(channel2)) + print('Check the actual range') + actual_channel1_raw = self.instr.query(':SENSE:VOLTAGE:CHANNEL1:RANGE?') + actual_channel2_raw = self.instr.query(':SENSE:VOLTAGE:CHANNEL2:RANGE?') range1 = float(actual_channel1_raw) range2 = float(actual_channel2_raw) + print('Value is: ', range1) return range1, range2 def set_integration_time(self, nplc: float = None): @@ -64,29 +59,61 @@ def set_integration_time(self, nplc: float = None): nplc = 0.01 if nplc > 60: nplc = 60 - self.scpi_comm('SENSE:VOLTAGE:NPLCYCLES {}'.format(nplc)) - current_nplc = float(self.scpi_comm('SENSE:VOLTAGE:NPLCYCLES?')) + self.instr.write('SENSE:VOLTAGE:NPLCYCLES {}'.format(nplc)) + # print('waiting....') + time.sleep(nplc * 0.25) + # print('done') + current_nplc = float(self.instr.query('SENSE:VOLTAGE:NPLCYCLES?')) return current_nplc + def set_trigger_source(self, external): + """ + Set the trigger source either to external or immediate. + If external is true, trigger will be set accordingly + otherwise immediate triggering will be chosen. + """ + if external: + self.instr.write(':TRIGGER:SOURCE External') + else: + self.instr.write(':TRIGGER:SOURCE Immediate') + return external + + def read_fresh(self): + """ + Read a single value from current channel. This will also be a new value + (or will fail if channel is not trigged. + """ + raw = self.instr.query(':DATA:FRESh?') # DF? also works + try: + voltage = float(raw) + except ValueError: + voltage = None + return voltage + def read_voltage(self, channel: int): - """ Read the measured voltage """ + """Read the measured voltage""" if channel not in (1, 2): return None - self.scpi_comm(":SENSE:FUNC 'VOLT:DC'") - self.scpi_comm(':SENSE:CHANNEL {}'.format(channel)) - raw = self.scpi_comm(':READ?') + self.instr.write(":SENSE:FUNC 'VOLT:DC'") + time.sleep(0.5) + self.instr.write(':SENSE:CHANNEL {}'.format(channel)) + time.sleep(0.5) + # raw = self.instr.query(':READ?') + raw = self.read_fresh() voltage = float(raw) return voltage if __name__ == '__main__': - GPIB = 7 - NVM = Keithley2182(interface='gpib', gpib_address=GPIB) - - print(NVM.set_range(1, 0.01)) - print(NVM.set_integration_time(10)) + # usb-1a86_USB2.0-Ser_-if00-port0 # Vxx + # usb-FTDI_Chipi-X_FT6EYK1T-if00-port0 # DMM + # usb-FTDI_Chipi-X_FT6F1A7R-if00-port0 # Old gate + # usb-Prolific_Technology_Inc._USB-Serial_Controller_D-if00-port0 # Vxy + # NVM = Keithley2182(interface='gpib', gpib_address=GPIB) + NVM = Keithley2182( + interface='serial', + device='/dev/serial/by-id/usb-1a86_USB2.0-Ser_-if00-port0', + ) - for i in range(0, 10): - print() - print('Channel 1: {:.3f}uV'.format(NVM.read_voltage(1) * 1e6)) - print('Channel 2: {:.3f}uV'.format(NVM.read_voltage(2) * 1e6)) + print(NVM.instr.query('*IDN?')) + print(NVM.set_range(1, 0.1)) diff --git a/PyExpLabSys/drivers/keithley_2400.py b/PyExpLabSys/drivers/keithley_2400.py index 52166ccb..af2066c7 100644 --- a/PyExpLabSys/drivers/keithley_2400.py +++ b/PyExpLabSys/drivers/keithley_2400.py @@ -1,42 +1,42 @@ """ Simple driver for Keithley 2400 SMU """ -from PyExpLabSys.drivers.scpi import SCPI +import pyvisa -class Keithley2400(SCPI): - """ Simple driver for Keithley 2400 SMU """ +class Keithley2400: + """Simple driver for Keithley 2400 SMU""" - def __init__( - self, interface, hostname='', device='', baudrate=9600, gpib_address=None - ): + def __init__(self, interface, device='', baudrate=9600, gpib_address=None): + rm = pyvisa.ResourceManager('@py') if interface == 'serial': - SCPI.__init__( - self, - interface=interface, - device=device, - baudrate=baudrate, - line_ending='\n', - ) - self.comm_dev.timeout = 2 - self.comm_dev.rtscts = False - self.comm_dev.xonxoff = False + conn_string = 'ASRL{}::INSTR'.format(device) + self.instr = rm.open_resource(conn_string) + # self.instr.set_visa_attribute( + # pyvisa.constants.VI_ATTR_ASRL_FLOW_CNTRL, + # pyvisa.constants.VI_ASRL_FLOW_XON_XOFF, + # ) + self.instr.read_termination = '\n' + self.instr.write_termination = '\n' + self.instr.baud_rate = baudrate if interface == 'lan': - SCPI.__init__(self, interface=interface, hostname=hostname) + # The 2400 actually no not have a LAN interface, but 2450 do + conn_string = 'TCPIP::{}::inst0::INSTR'.format(device) + self.instr = rm.open_resource(conn_string) if interface == 'gpib': - SCPI.__init__(self, interface=interface, gpib_address=gpib_address) + pass def output_state(self, output_state: bool = None): - """ Turn the output on or off """ + """Turn the output on or off""" if output_state is not None: if output_state: - self.scpi_comm('OUTPUT:STATE 1') + self.instr.write('OUTPUT:STATE 1') else: - self.scpi_comm('OUTPUT:STATE 0') - actual_state_raw = self.scpi_comm('OUTPUT:STATE?') + self.instr.write('OUTPUT:STATE 0') + actual_state_raw = self.instr.query('OUTPUT:STATE?') actual_state = actual_state_raw[0] == '1' return actual_state def set_current_measure_range(self, current_range=None): - """ Set the current measurement range """ + """Set the current measurement range""" # TODO! raise NotImplementedError @@ -51,12 +51,13 @@ def set_integration_time(self, nplc: float = None): nplc = 0.01 if nplc > 10: nplc = 10 - self.scpi_comm('SENSE:CURRENT:NPLCYCLES {}'.format(nplc)) - self.scpi_comm('SENSE:VOLTAGE:NPLCYCLES {}'.format(nplc)) - current_nplc = float(self.scpi_comm('SENSE:CURRENT:NPLCYCLES?')) + self.instr.write('SENSE:CURRENT:NPLCYCLES {}'.format(nplc)) + self.instr.write('SENSE:VOLTAGE:NPLCYCLES {}'.format(nplc)) + current_nplc = float(self.instr.query('SENSE:CURRENT:NPLCYCLES?')) return current_nplc - def _parse_status(self, status_string): + @staticmethod + def _parse_status(status_string): status_table = { 0: ('OFLO', 'Measurement was made while in over-range'), 1: ('Filter', 'Measurement was made with the filter enabled'), @@ -100,26 +101,29 @@ def read_current(self): Returns None if the output is off. """ if self.output_state(): - raw = self.scpi_comm('MEASURE:CURRENT?') + raw = self.instr.query('MEASURE:CURRENT?') else: raw = None if raw is None: return - # Values are: voltage, current, ohm, time, status - # Only the current is measured, voltage is either - # NaN or the source-setpoint. - values = raw.split(',') - current = float(values[1]) + if ',' in raw: + # Values are: voltage, current, ohm, time, status + # Only the current is measured, voltage is either + # NaN or the source-setpoint. + values = raw.split(',') + current = float(values[1]) + else: + current = float(raw) # timestamp = float(values[3]) # print(self._parse_status(values[4])) # Also return timestamp? return current def read_voltage(self): - """ Read the measured voltage """ + """Read the measured voltage""" if self.output_state(): - raw = self.scpi_comm('MEASURE:VOLTAGE?') + raw = self.instr.query('MEASURE:VOLTAGE?') else: raw = None if raw is None: @@ -135,58 +139,134 @@ def read_voltage(self): # Also return timestamp? return voltage - def set_source_function(self, function=None): + def set_source_function(self, function=None, source_range=None): if function in ('i', 'I'): - self.scpi_comm('SOURCE:FUNCTION CURRENT') + self.instr.write('SOURCE:FUNCTION CURRENT') + if source_range: + self.instr.write(':SOURCE:CURRENT:RANGE {}'.format(source_range)) if function in ('v', 'V'): - self.scpi_comm('SOURCE:FUNCTION VOLTAGE') - actual_function = self.scpi_comm('SOURCE:FUNCTION?') + self.instr.write('SOURCE:FUNCTION VOLTAGE') + if source_range: + self.instr.write(':SOURCE:VOLTAGE:RANGE {}'.format(source_range)) + + actual_function = self.instr.query('SOURCE:FUNCTION?') return actual_function + def set_sense_function(self, function, sense_range=None): + """ + Set the sense range, a value of None returns the current value without + changing the actual value. A range value of 0 indicates auto-range. + """ + if function.lower() in ('i' 'current'): + self.instr.write(':SENSE:FUNCTION:ON "CURRENT"') + if sense_range == 0: + self.instr.write(':SENSE:CURRENT:RANGE:AUTO ON') + else: + self.instr.write(':SENSE:CURRENT:RANGE {}'.format(sense_range)) + if function.lower() in ('v', 'voltage'): + # TODO: Configure read-back!!! + self.instr.write(':SENSE:FUNCTION:ON "VOLTAGE"') + if sense_range == 0: + self.instr.write(':SENSE:VOLTAGE:RANGE:AUTO ON') + else: + self.instr.write(':SENSE:VOLTAGE:RANGE {}'.format(sense_range)) + + # Double read was needed with the old scpi module, hopefully no wwith pyvisa + # self.scpi_comm(':SENSE:FUNCTION:ON?') + # self.clear_buffer() + raw = self.instr.query(':SENSE:FUNCTION:ON?') + return raw + def set_current_limit(self, current: float = None): - """ Set the desired current limit """ + """Set the desired current limit""" if current is not None: - self.scpi_comm('CURRENT:PROTECTION {:.9f}'.format(current)) - actual = self.scpi_comm('CURRENT:PROTECTION?') + self.instr.write('CURRENT:PROTECTION {:.9f}'.format(current)) + print('K2400: LIMIT IS NOW: ', current) + raw = self.instr.query('CURRENT:PROTECTION?') + actual = float(raw) return actual def set_voltage_limit(self, voltage: float = None): - """ Set the desired voltate limit """ + """Set the desired voltate limit""" if voltage is not None: - self.scpi_comm('VOLTAGE:PROTECTION {:.9f}'.format(voltage)) - actual = self.scpi_comm('VOLTAGE:PROTECTION?') + self.instr.write('VOLTAGE:PROTECTION {:.9f}'.format(voltage)) + raw = self.instr.query('VOLTAGE:PROTECTION?') + actual = float(raw) return actual def set_current(self, current: float): - """ Set the desired current """ - self.scpi_comm('SOURCE:CURRENT {:.9f}'.format(current)) + """Set the desired current""" + self.instr.write('SOURCE:CURRENT {:.9f}'.format(current)) return True + def set_voltage_range(self, voltage: float): + self.instr.write(':SOURCE:VOLTAGE:RANGE {:.9f}'.format(voltage)) + def set_voltage(self, voltage: float): - """ Set the desired current """ - self.scpi_comm('SOURCE:VOLT {:.9f}'.format(voltage)) + """Set the desired current""" + self.instr.write('SOURCE:VOLT {:.9f}'.format(voltage)) return True + def read_volt_and_current(self): + # TODO - CONFIGURE TO ENSURE RIGHT SYNTAX OF RETURN! + # Also try to configure to measure actual voltage + # self.scpi_comm(':INIT') + raw = self.instr.query(':READ?') + fields = raw.split(',') + voltage = float(fields[0]) + current = float(fields[1]) + return voltage, current + if __name__ == '__main__': - GPIB = 22 - SMU = Keithley2400(interface='gpib', gpib_address=GPIB) + import time + + device = '/dev/serial/by-id/usb-FTDI_Chipi-X_FT6F1A7R-if00-port0' + SMU = Keithley2400( + interface='serial', + device=device, + baudrate=19200, + ) + print(SMU.instr.query('*IDN?')) + exit() + + # GPIB = 22 + # SMU = Keithley2400(interface='gpib', gpib_address=GPIB) + SMU.set_source_function('v') SMU.output_state(True) - print(SMU.set_current_limit(100e-6)) + # SMU.set_voltage_range(voltage=90) + SMU.set_voltage(0.12) - # SMU.set_voltage_limit(1e-1) - SMU.set_voltage(0.0) - print(SMU.read_software_version()) - # SMU.output_state(True) + print(SMU.read_current()) + SMU.set_current_limit(1e-7) - # print(SMU.output_state()) + time.sleep(1) - current = SMU.read_current() - voltage = SMU.read_voltage() + cmd = 'READ?' + for i in range(0, 50): + print(SMU.read_volt_and_current()) - print( - 'Current: {:.1f}uA. Voltage: {:.2f}mV. Resistance: {:.1f}ohm'.format( - current * 1e6, voltage * 1000, voltage / current - ) - ) + # print(SMU.read_current()) + + exit() + + # # print(SMU.scpi_comm(':TRIGGER:OUTPUT SENSE')) + print(SMU.scpi_comm(':TRIGGER:OUTPUT SOURCE')) + print(SMU.scpi_comm(':TRIGGER:OUTPUT?')) + + print(SMU.scpi_comm(':TRIGGER:OLINE?')) + print(SMU.scpi_comm(':TRIGGER:OLINE 2')) + + # for i in range(0, 10): + # SMU.set_voltage(i / 10.0) + # time.sleep(0.5) + # print(SMU.scpi_comm(':TRIGGER:OUTPUT SENSE')) + # current = SMU.read_current() + # print(SMU.scpi_comm(':TRIGGER:OUTPUT NONE')) + # voltage = SMU.read_voltage() + # print( + # 'Current: {:.1f}uA. Voltage: {:.2f}mV. Resistance: {:.1f}ohm'.format( + # current * 1e6, voltage * 1000, voltage / current + # ) + # ) diff --git a/PyExpLabSys/drivers/keithley_2450.py b/PyExpLabSys/drivers/keithley_2450.py new file mode 100644 index 00000000..e3e998bf --- /dev/null +++ b/PyExpLabSys/drivers/keithley_2450.py @@ -0,0 +1,332 @@ +""" +Simple driver for Keithley 2450 SMU +""" +import time +from PyExpLabSys.drivers.keithley_2400 import Keithley2400 + +# Notice, SOURCE will report either setpoint or readback +# depending on configuration +# TODO: Allow setting of the READBACK parameter + +# Notice: The code is NOT abstract towards this list... changing it will +# result in values being intermixed +STANDARD_READ_PARAMS = [ + 'READING', + 'SOURCE', + 'RELATIVE', + 'SOURSTATUS', + 'STATUS', + 'SOURUNIT', + 'UNIT', +] + + +class Keithley2450(Keithley2400): + """Simple driver for Keithley 2450 SMU""" + + def __init__( + self, interface, hostname='', device='', baudrate=9600, gpib_address=None + ): + super().__init__( + interface=interface, + device=device, + ) + self.srp = ', '.join(STANDARD_READ_PARAMS) + self.latest_fetch_time = 0 # Keeps latest result from FETCh? + + # print(self.clear_comm_buffer()) + # error = self.scpi_comm("*ESR?") + # self.scpi_comm("*cls") + + @staticmethod + def _parse_raw_reading(reading): + reading_list = reading.split(',') + if len(reading_list) < len(STANDARD_READ_PARAMS): + # Reading is malformed + return None + + value = float(reading_list[0]) + source_value = float(reading_list[1]) + delta_time = float(reading_list[2]) + + source_status = reading_list[3] # This field seems to be undocumented? + status = reading_list[4] # TODO: Parse this! + + source_unit = reading_list[5] + value_unit = reading_list[6] + + reading = { + 'value': value, + 'source_value': source_value, + 'delta_time': delta_time, + 'source_status': source_status, + 'status': status, + 'source_unit': source_unit, + 'value_unit': value_unit, + } + return reading + + # *********************************** # + # Refactor these two + + # Notice: This function is actually different between 2400 and 2450 + # many of the other changes can be backported to the 2400 driver + def set_voltage_limit(self, voltage: float = None): + """Set the desired voltage limit""" + if voltage is not None: + self.instr.write(':SOURCE:CURRENT:VLIMIT {:.9f}'.format(voltage)) + actual = self.instr.query(':SOURCE:CURRENT:VLIMIT?') + return actual + + # Notice: This function is actually different between 2400 and 2450 + # many of the other changes can be backported to the 2400 driver + def set_current_limit(self, current: float = None): + """Set the desired voltage limit""" + if current is not None: + self.instr.write(':SOURCE:VOLTAGE:ILIMIT {:.9f}'.format(current)) + actual = self.instr.query(':SOURCE:VOLTAGE:ILIMIT?') + return actual + + # *********************************** # + + def set_auto_zero(self, function: str, action: bool = None): + """ + Set auto-zero behaviour for a given function (voltage or current). + Action can be 'on', 'off', or None + """ + if function.lower() == 'i': + scpi_function = 'CURRENT' + elif function.lower() == 'v': + scpi_function = 'VOLTAGE' + else: + raise Exception('Function not allowed: {}'.format(function)) + + if action is not None: + if action: + scpi_action = 'On' + else: + scpi_action = 'Off' + + if scpi_action is not None: + cmd = ':SENSE:{}:AZERO {}'.format(scpi_function, scpi_action) + self.instr.write(cmd) + + cmd = ':SENSE:{}:AZERO?'.format(scpi_function) + reply = self.instr.query(cmd) + return reply + + def auto_zero_now(self): + """ + Perform a single auto-zero + """ + cmd = ':SENSE:AZERO:ONCE' + self.instr.write(cmd) + return True + + def remote_sense(self, action: bool = None): + if action is not None: + if action: + scpi_action = 'On' + else: + scpi_action = 'Off' + cmd = 'SENSE:{}:RSENSE {}' + self.instr.write(cmd.format('VOLTAGE', scpi_action)) + self.instr.write(cmd.format('CURRENT', scpi_action)) + self.instr.write(cmd.format('RESISTANCE', scpi_action)) + cmd = 'SENSE:VOLTAGE:RSENSE?' + reply = self.instr.query(cmd) + return reply + + def trigger_measurement(self, buffer='defbuffer1'): + cmd = ':TRACE:TRIGGER "{}"; *TRG;'.format(buffer) + self.instr.write(cmd) + return True + + def make_buffer(self, buffer_name, size=10): + """ + Make a buffer of type FULL and fillmode continous + """ + if size < 10: + return False + cmd = ':TRACE:MAKE "{}", {}, FULL'.format(buffer_name, size) + self.instr.write(cmd) + + cmd = ':TRACE:FILL:MODE CONTINUOUS, "{}"'.format(buffer_name) + self.instr.write(cmd) + actual_readings = self.elements_in_buffer(buffer_name) + return True + + def elements_in_buffer(self, buffer='defbuffer1'): + raw = self.instr.query(':TRACE:ACTUAL? "{}"'.format(buffer)) + actual_readings = int(raw) + return actual_readings + + def clear_buffer(self, buffer='defbuffer1'): + actual_readings = self.elements_in_buffer(buffer) + if actual_readings > 0: + self.instr.write(':TRACE:CLEAR "{}"'.format(buffer)) + actual_readings = self.elements_in_buffer(buffer) + return actual_readings + + def read_from_buffer(self, start, stop, buffer='defbuffer1'): + data = [] + for i in range(start, stop + 1): + # cmd = 'TRACE:DATA? {}, {}, "{}", {}'.format(start, stop, buffer, self.srp) + cmd = 'TRACE:DATA? {}, {}, "{}", {}'.format(i, i, buffer, self.srp) + raw = self.instr.query(cmd) + reading = self._parse_raw_reading(raw) + data.append(reading) + return data + + def read_latest(self, buffer='defbuffer1'): + cmd = ':FETCH? "{}", {}'.format(buffer, self.srp) + + dt = 0 + iteration = 0 + while dt <= self.latest_fetch_time: + if iteration > 10: + print('Seems we have missed a trigger...') + self.trigger_measurement(buffer) + iteration = 0 + iteration += 1 + try: + t = time.time() + raw = self.instr.query(cmd) + # print('k2540 raw read', time.time() - t, ' ', self.latest_fetch_time) + reading = self._parse_raw_reading(raw) + # print(reading) + if reading is None: + continue + dt = reading['delta_time'] + except ValueError: + pass + self.latest_fetch_time = dt + return reading + + def configure_digital_port_as_triggers(self, channel_list=[]): + """ + Configure the DB9 digital output to be configured as + individual outputs, that can be (mis)used as triggers. + """ + # Manual trigger of lines can be achived like this: + # :DIGital:LINE{}:MODE DIGITAL, OUT' + # :DIGital:LINE{}:STATE 0 + # time.sleep(0.1) + # :DIGital:LINE{}:STATE 1 + # time.sleep(0.025) + # :DIGital:LINE{}:STATE 0 + if not channel_list: + channel_list = [1, 2, 3, 4, 5, 6] + for i in range(1, 7): + if i in channel_list: + cmd = ':DIGital:LINE{}:MODE TRIG, OPENdrain'.format(i) + self.instr.write(cmd) + cmd = ':TRIGger:DIGital{}:OUT:PULSewidth 1e-5'.format(i) + self.instr.write(cmd) + cmd = ':TRIGger:DIGital{}:OUT:STIMulus COMMAND'.format(i) + self.instr.write(cmd) + else: + cmd = ':DIGital:LINE{}:MODE DIGItal, IN'.format(i) + self.instr.write(cmd) + + +if __name__ == '__main__': + import time + + SMU = Keithley2450(interface='lan', device='192.168.0.30') + t = time.time() + SMU.trigger_measurement() + print(time.time() - t) + # SMU.instr.write('*TRG') + # print(time.time() - t) + exit() + + # SMU.configure_digital_port_as_triggers() + # SMU.clear_buffer() + for i in range(0, 100): + SMU.trigger_measurement() + SMU.instr.write('*TRG') + # SMU.instr.assert_trigger() + # time.sleep(0.1) + exit() + + while True: + time.sleep(1.5) + print('Trig') + SMU.instr.trigger_measurement() + # SMU.instr.write('*TRG') + # print(SMU.read_latest()) + exit() + + SMU.set_source_function('i', source_range=1e-2) + SMU.set_sense_function('v', sense_range=0) + # SMU.set_voltage_limit(2) + # SMU.set_sense_function('v', sense_range=2) + # SMU.source.set_current(0) + + # print(SMU.remote_sense(True)) + + # buffer = 'gate_data' + # buffer = 'iv_data' + + SMU.set_voltage(0) + SMU.output_state(True) + + SMU.set_integration_time(1) + + # print('NPLC') + # print(SMU.scpi_comm(':SENSE:CURRENT:NPLCYCLES?')) + # print(SMU.scpi_comm(':SENSE:VOLTAGE:NPLCYCLES?')) + # print(SMU.scpi_comm(':SENSE:CURRENT:NPLCYCLES 0.1')) + # print(SMU.scpi_comm(':SENSE:VOLTAGE:NPLCYCLES 0.1')) + + for i in range(0, 1000): + print() + SMU.trigger_measurement() + # time.sleep(0.15) + t = time.time() + latest = SMU.read_latest() + dt = time.time() - t + print(latest, dt) + exit() + + current = 1e-2 + print(SMU.set_sense_function(function='i', sense_range=0)) + print(SMU.set_source_function(function='v', source_range=current)) + print(SMU.set_current_limit(current=current)) + print(SMU.set_sense_function(function='i', sense_range=current)) + exit() + print(SMU.set_source_function(function='v', source_range=1.7)) + print(SMU.set_sense_function(function='i', sense_range=0.14)) + exit() + + print('klaf') + for i in range(0, 10): + print(i) + SMU.set_voltage(i * 0.1) + SMU.trigger_measurement('gate_data') + print(SMU.read_latest('gate_data')) + time.sleep(0.25) + exit() + # print(print(SMU.configure_measurement_function('i'))) + SMU.trigger_measurement('gate_data') + print(SMU.read_latest('gate_data')) + exit() + print(print(SMU.configure_measurement_function('i'))) + print() + for i in range(0, 10): + SMU.trigger_measurement('iv_data') + print(SMU.read_latest('iv_data')) + + exit() + + SMU.set_source_function('v') + SMU.set_current_limit(1e-3) + + SMU.set_voltage(0.001) + + for i in range(0, 20): + SMU.set_voltage(0.001 * i) + SMU.trigger_measurement('iv_data') + print(SMU.read_latest('iv_data')) + exit() diff --git a/PyExpLabSys/drivers/keithley_6220.py b/PyExpLabSys/drivers/keithley_6220.py index e770e49a..4b27907b 100644 --- a/PyExpLabSys/drivers/keithley_6220.py +++ b/PyExpLabSys/drivers/keithley_6220.py @@ -1,38 +1,34 @@ """ Simple driver for Keithley 6220 SMU """ import time -from PyExpLabSys.drivers.scpi import SCPI +import pyvisa -class Keithley6220(SCPI): + +class Keithley6220: """ Simple driver for Keithley 6200 SMU Actual implementation performed on a 6221 - please double check if you have a 6220. """ - def __init__( - self, interface, hostname='', device='', baudrate=19200, gpib_address=None - ): + def __init__(self, interface, path=''): + rm = pyvisa.ResourceManager('@py') if interface == 'serial': - SCPI.__init__( - self, - interface=interface, - device=device, - baudrate=baudrate, - line_ending='\r', - ) - self.comm_dev.timeout = 2 - # self.comm_dev.rtscts = False - self.comm_dev.xonxoff = False - if interface == 'lan': - SCPI.__init__(self, interface=interface, hostname=hostname) + pass if interface == 'gpib': - SCPI.__init__(self, interface=interface, gpib_address=gpib_address) + pass + if interface == 'lan': + conn_string = 'TCPIP0::{}::1394::SOCKET'.format(path) + # 6221 is apparantly not VXI compatible: no ::INSTR connection + self.instr = rm.open_resource(conn_string) + self.instr.read_termination = '\n' + self.instr.write_termination = '\n' self.latest_error = [] - # Set error format to binary: - self.scpi_comm('FORMAT:SREGISTER BINARY') - self.clear_error_queue() + # Set error format to binary and clear queue + self.instr.write('FORMAT:SREGISTER BINARY') + self.instr.query('*ESR?') + self.instr.write('*cls') def _check_register(self, command: str, msg=None): magic_messages = { @@ -45,7 +41,7 @@ def _check_register(self, command: str, msg=None): if msg is None: msg = command register_ok = True - status_string = self.scpi_comm(command) + status_string = self.instr.query(command) # Reverse string to count from 0 to N bit_string = status_string[::-1] for i in range(0, len(bit_string)): @@ -62,8 +58,12 @@ def read_error_queue(self): error_list = {} error = 1 while error > 0: - next_error = self.scpi_comm('STATUS:QUEUE:NEXT?') - error_raw, msg = next_error.split(',') + next_error = self.instr.query('STATUS:QUEUE:NEXT?') + try: + error_raw, msg = next_error.split(',') + except ValueError: + print('Error! ', next_error) + error_raw = 0 error = int(error_raw) if error > 0: if error in error_list: @@ -77,7 +77,7 @@ def read_status(self): registers = { '*ESR?': 'Standard Event Status', # 'STATUS:MEASUREMENT:CONDITION?': 'Measurement Condition Register', - '*STB?': 'Status Byte Register', + # '*STB?': 'Status Byte Register', 'STATUS:MEASUREMENT:EVENT?': 'Measurement Event Register', 'STATUS:QUESTIONABLE:EVENT?': 'Questionable Event Register', } @@ -100,14 +100,23 @@ def read_status(self): return status_ok def output_state(self, output_state: bool = None): - """ Turn the output on or off """ + """Turn the output on or off""" if output_state is not None: if output_state: - self.scpi_comm('OUTPUT ON') + self.instr.write('OUTPUT ON') else: - self.scpi_comm('OUTPUT OFF') - actual_state_raw = self.scpi_comm('OUTPUT?') - actual_state = int(actual_state_raw[0]) == 1 + self.instr.write('OUTPUT OFF') + + error = 0 + while -1 < error < 10: + try: + actual_state_raw = self.instr.query('OUTPUT?') + actual_state = int(actual_state_raw[0]) == 1 + error = -1 + except ValueError: + print('output state error ', repr(actual_state_raw)) + error = error + 1 + # This will fail if error exceeds 10, let's see if it happens return actual_state def set_current_range(self, current_range: float = None): @@ -116,58 +125,353 @@ def set_current_range(self, current_range: float = None): Currently we set both DC and AC range at the same time """ if current_range is not None: - self.scpi_comm('CURRENT:RANGE {}'.format(current_range)) - self.scpi_comm('SOURCE:WAVE:RANGING FIXED') - actual_range_raw = self.scpi_comm('CURRENT:RANGE?') - print(actual_range_raw) + self.instr.write('CURRENT:RANGE {}'.format(current_range)) + self.instr.write('SOURCE:WAVE:RANGING FIXED') + actual_range_raw = self.instr.query('CURRENT:RANGE?') actual_range = float(actual_range_raw) return actual_range def set_voltage_limit(self, voltage: float = None): - """ Set the desired voltate limit """ + """Set the desired voltate limit""" if voltage is not None: - self.scpi_comm('CURRENT:COMPLIANCE {:.9f}'.format(voltage)) - actual = self.scpi_comm('CURRENT:COMPLIANCE?') + self.instr.write('CURRENT:COMPLIANCE {:.9f}'.format(voltage)) + actual = self.instr.query('CURRENT:COMPLIANCE?') return actual def set_current(self, current: float): - """ Set the DC current, when not performing a waveform """ - self.scpi_comm('CURRENT {:.9f}'.format(current)) + """Set the DC current, when not performing a waveform""" + self.instr.write('CURRENT {:.12f}'.format(current)) return True def set_wave_amplitude(self, amplitude: float): cmd = 'SOUR:WAVE:AMPL {:.9f}'.format(amplitude) - self.scpi_comm(cmd) + self.instr.write(cmd) return True def source_sine_wave(self, frequency, amplitude): - self.scpi_comm('SOUR:WAVE:FUNC SIN') - self.scpi_comm('SOUR:WAVE:FREQ {}'.format(frequency)) - self.scpi_comm('SOUR:WAVE:AMPL {}'.format(1e-11)) - self.scpi_comm('SOUR:WAVE:OFFS 0') # Offset - self.scpi_comm('SOUR:WAVE:PMAR:STAT ON') - self.scpi_comm('SOUR:WAVE:PMAR 269') - self.scpi_comm('SOUR:WAVE:PMAR:OLIN 1') - self.scpi_comm('SOUR:WAVE:DUR:TIME INF') + self.instr.write('SOUR:WAVE:FUNC SIN') + self.instr.write('SOUR:WAVE:FREQ {}'.format(frequency)) + # self.scpi_comm('SOUR:WAVE:AMPL {}'.format(1e-11)) + self.instr.write('SOUR:WAVE:AMPL {}'.format(amplitude)) + self.instr.write('SOUR:WAVE:OFFS 0') # Offset + self.instr.write('SOUR:WAVE:PMAR:STAT ON') + self.instr.write('SOUR:WAVE:PMAR 0') + self.instr.write('SOUR:WAVE:PMAR:OLIN 6') + self.instr.write('SOUR:WAVE:DUR:TIME INF') # self.scpi_comm('SOUR:WAVE:RANG BEST') - self.scpi_comm('SOUR:WAVE:ARM') - self.scpi_comm('SOUR:WAVE:AMPL {}'.format(amplitude)) - self.scpi_comm('SOUR:WAVE:INIT') + self.instr.write('SOUR:WAVE:ARM') + self.instr.write('SOUR:WAVE:AMPL {}'.format(amplitude)) + self.instr.write('SOUR:WAVE:INIT') + + # TODO - CAN WE PROGRAMMATICALLY CHOOSE BETWEEN THESE + # FOR ONE COMMON FUNCTION? + def stop_and_unarm_wave(self): + self.instr.write('SOURCE:WAVE:ABORT') + + def stop_and_unarm_sweep(self): + # This now has two names - same as end_delta_measurement!!!!!!! + self.instr.write('SOURCE:SWEEP:ABORT') + + def read_diff_conduct_line(self): + try: + t = time.time() + buf_actual = int(self.instr.query('TRACe:POINTs:ACTual?').strip()) + print(' K6220: Read buffer: {}'.format(time.time() - t)) + + except ValueError: + buf_actual = 0 + print('Buf: ', buf_actual) + if buf_actual > 0: + t = time.time() + data = self.instr.query('SENS:DATA:FRESH?').strip() + print(' K6220: Read data: {}'.format(time.time() - t)) + else: + print('No data') + return {} + row = {} + if len(data) < 5: + return {} + + fields = data.split(',') + try: + row = { + 'reading': float(fields[0][:-3]), + 'time': float(fields[1][:-4]), + 'current': float(fields[2][:-3]), + 'avoltage': float(fields[3][:-3]), + 'compliance_ok': fields[4][0] == 'F', + 'reading_nr': int(fields[5][1:6]), + } + # print(row) + except ValueError: + # Could we do a fancy split based on the units even if errors exists in the + # field delimiters? + # +1.01194701E-05VDC+1.01928472E-05VDC,+1.471E+00SECS,+1.0000E-06ADC,+2.03093674E-04VDC,FCMPL,+22698RDNG# + print('Error in row: {}'.format(data)) + return row + + def perform_differential_conductance_measurement( + self, start, stop, steps, delta, v_limit=1.5, nplc=5 + ): + step_size = (stop - start) / steps + # print('Number of steps: {}'.format(steps)) + print('K6220 driver: steps size: {}'.format(step_size)) + + # 1) Configure 2182a? + # print('2182a present, ', self.scpi_comm('SOURCE:DELTA:NVPResent?')) + # self.reset_device() # Is this needed? + # time.sleep(1) + msg = 'SYST:COMM:SER:SEND "VOLT:NPLC {}"'.format(nplc) + print(msg) + self.instr.write(msg) + + self.set_voltage_limit(v_limit) + time.sleep(1) + self.instr.write('FORMat:ELEMENTS ALL') + time.sleep(0.5) + + self.instr.write('SOUR:DCON:STARt {}'.format(start)) + self.instr.write('SOUR:DCON:STEP {}'.format(step_size)) + self.instr.write('SOUR:DCON:STOP {}'.format(stop)) + self.instr.write('SOUR:DCON:DELTa {}'.format(delta)) + + self.instr.write('SOUR:DCON:DELay 5e-3') + self.instr.write('SOUR:DCON:CAB ON') # Enables Compliance Abort + self.instr.write('TRAC:POIN {}'.format(steps)) + time.sleep(0.2) + print('Prepare to arm') + self.instr.write('SOUR:DCON:ARM') + time.sleep(1) + print('Init') + self.instr.write('INIT:IMM', expect_return=True) + time.sleep(3) + return True + + def _2182a_comm(self, cmd=None): + if cmd is None: + cmd_6220 = 'SYST:COMM:SER:ENT?' + expect_return = True + else: + cmd_6220 = 'SYST:COMM:SER:SEND "{}"'.format(cmd) + print(cmd_6220) + expect_return = False + + # # TODO: REFACTOR THESE IF'S!!!!!!!!!!! + # i = 0 + # if expect_return: + # reply = self.instr.query(cmd_6220, delay=0.4).strip() + # print('keithley_6220.pyL262: ', repr(reply)) + # else: + # self.instr.write(cmd_6220) + # reply = '' + # if expect_return: + # while len(reply) < 2: + # if i > 10: + # print('Trying to get return ', i) + # reply = self.instr.query(cmd_6220, delay=0.4).strip() + # print('keithley_6220.pyL262: ', repr(reply)) + # i = i + 1 + # if i > 50: + # print('Reply apparantly never arrives?!?!?') + # break + + i = 0 + reply = '' + if not expect_return: + self.instr.write(cmd_6220) + else: + while len(reply) < 2: + if i > 10: + print('Trying to get return ', i) + reply = self.instr.query(cmd_6220, delay=0.2) + print(i, 'keithley_6220.pyL262: ', repr(reply)) + reply = reply.strip() + i = i + 1 + if i > 50: + print('Reply apparantly never arrives?!?!?') + break + + if len(reply) > 0: + while ord(reply[0]) == 19: + reply = reply[1:] + if len(reply) == 0: + break + return reply + + def read_2182a_channel_2(self, probe_current): + print(probe_current) + self.set_current(probe_current) + self.set_voltage_limit(1) + self.output_state(True) + time.sleep(0.2) + + self._2182a_comm(':CONF:VOLT') + self._2182a_comm(':SENSE:CHANNEL 2') + self._2182a_comm(':SAMP:COUNT 1') + self._2182a_comm(':READ?') + value_raw = self._2182a_comm() + self._2182a_comm(':READ?') + value_raw = self._2182a_comm() + # print(value_raw) + + # while ord(value_raw[0]) == 19: + # value_raw = value_raw[1:] + print('VALUE RAW!!', value_raw) + value = float(value_raw) + time.sleep(0.5) + # status = self.read_status() + # print('Status (check for compliance): ', status) + self.output_state(False) + return value + + def prepare_delta_measurement(self, probe_current, v_limit=1.0): + # Set range on nanovoltmeter' + self.instr.write('SYST:COMM:SER:SEND "VOLT:RANG 0.1"') + # self.instr.write('SYST:COMM:SER:SEND "VOLT:RANG {}"'.format(v_limit)) - def stop_and_unarm(self): - self.scpi_comm('SOUR:WAVE:ABOR') + # self.scpi_comm('SYST:COMM:SER:SEND ":SENSE:VOLT:CHANNEL1:RANGE:AUTO ON"') + time.sleep(1) + # self.scpi_comm('SYST:COMM:SER:SEND "VOLT:RANG?"') + # print('2181a range: ', self.scpi_comm('SYST:COMM:SER:ENT?')) + + self.instr.write('SYST:COMM:SER:SEND "VOLT:NPLC 10"') + # print('2181a NPLC: ', self.scpi_comm('SYST:COMM:SER:ENT?')) + + # self.scpi_comm('SYST:COMM:SER:SEND "VOLT:NPLC?"') + # print('2181a NPLC: ', self.scpi_comm('SYST:COMM:SER:ENT?')) + + # self.reset_device() + # TODO!!! SET RANGE ON 6221!!! + + time.sleep(3) + self.instr.write('FORMat:ELEMents READING, TSTAMP') + self.set_voltage_limit(v_limit) + # self.scpi_comm(':FORMAT:SREG BIN') + # print('High', self.scpi_comm('SOURCE:DELTA:HIGH 2e-6')) + # print('SOURCE:DELTA:HIGH {}'.format(probe_current)) + self.instr.write('SOURCE:DELTA:HIGH {}'.format(probe_current)) + self.instr.write('SOURCE:DELTA:DELAY 100e-3') # Apparantly strictly needed... + + # self.scpi_comm('SOURCE:DELTA:CAB ON') # Abort on compliance + time.sleep(3.0) + print('SOURCE:DELTA:ARM') + self.instr.write('SOURCE:DELTA:ARM') + time.sleep(3.0) + print('INIT:IMM') + self.instr.write('INIT:IMM') + + def end_delta_measurement(self): + self.instr.write('SOUR:SWE:ABOR') + + def read_delta_measurement(self): + reply = self.instr.query('SENS:DATA:FRESH?').strip() + print('delta reply', reply) + value_raw, dt_raw = reply.split(',') + value = float(value_raw) + dt = float(dt_raw) + return value, dt + + # def read_differential_conductance_measurement(self): + # reply = self.scpi_comm('SENS:DATA:FRESH?').strip() + # fields = reply.split(',') + # row = { + # 'reading': float(fields[row_nr + 0][:-3]), + # 'time': float(fields[row_nr + 1][:-4]), + # 'current': float(fields[row_nr + 2][:-3]), + # 'avoltage': float(fields[row_nr + 3][:-3]), + # 'compliance_ok': fields[row_nr + 4][0] == 'F', + # 'reading_nr': int(fields[row_nr + 5][1:6]), + # } + # # print('delta reply', reply) + # # value_raw, dt_raw = reply.split(',') + # # value = float(value_raw) + # # dt = float(dt_raw) + # # return value, dt + # return row if __name__ == '__main__': - PORT = '/dev/ttyUSB0' - SOURCE = Keithley6220(interface='serial', device=PORT) + SOURCE = Keithley6220(interface='lan', path='192.168.0.3') + for _ in range(0, 5): + print(SOURCE.instr.query('*IDN?')) - print(SOURCE.read_software_version()) + print() + + SOURCE._2182a_comm(':STATus:QUEue:NEXT?') + v_xx_error_queue = SOURCE._2182a_comm() + SOURCE._2182a_comm(':DATA:FRESh?') + value_raw = SOURCE._2182a_comm() + print('Value from 6221: ', value_raw) exit() - SOURCE.stop_and_unarm() - SOURCE.set_current_range(1e-4) - time.sleep(2) - print(SOURCE.read_status()) - SOURCE.source_sine_wave(1298, 1e-5) - SOURCE.set_wave_amplitude(2e-5) + SOURCE.source_sine_wave(741, 1.2e-8) + time.sleep(30) + exit() + + current = 5e-10 + + SOURCE.set_current_range(current) + SOURCE.set_current(current) + SOURCE.output_state(True) + + exit() + + # SOURCE.scpi_comm('SYST:COMM:SER:SEND "VOLT:NPLC 10"') + # SOURCE.scpi_comm('SYST:COMM:SER:SEND "VOLT:NPLC?"') + # print('2181a NPLC: ', SOURCE.scpi_comm('SYST:COMM:SER:ENT?')) + + SOURCE.prepare_delta_measurement(1e-5) + # SOURCE.end_delta_measurement() + # exit() + t = time.time() + rows = [] + SOURCE.perform_differential_conductance_measurement( + start=1e-6, stop=2e-5, step=2e-7, delta=1.0e-7 + ) + print('End peform start') + for i in range(0, 250): + data = SOURCE.scpi_comm('SENS:DATA:FRESH?').strip() + if len(data) > 2: + fields = data.split(',') + try: + row = { + 'reading': float(fields[0][:-3]), + 'time': float(fields[1][:-4]), + 'current': float(fields[2][:-3]), + 'avoltage': float(fields[3][:-3]), + 'compliance_ok': fields[4][0] == 'F', + 'reading_nr': int(fields[5][1:6]), + } + print(row) + rows.append(row) + except ValueError: + print('Error in row: {}'.format(data)) + if (i > 15) and (len(data) < 5): + break + print('End measurement') + nvz = SOURCE.scpi_comm('SOUR:DCON:NVZ?').strip() + print('NVZero', nvz) + + SOURCE.end_delta_measurement() + print('Number of lines: {}'.format(len(rows))) + print('First and last line:') + print(rows[0]) + print(rows[-1]) + + # print(SOURCE.read_2182a_channel_2(1e-5)) + + # SOURCE.scpi_comm('SYST:COMM:SER:SEND "VOLT:NPLC 10"') + # SOURCE.end_delta_measurement() + exit() + + print('Read software version:') + print(SOURCE.read_software_version()) + print() + # condition = SOURCE.scpi_comm('STATUS:MEASurement:CONDition?')[::-1] + # print(condition) + # exit() + + # for i in range(0, 2): + # print(SOURCE.perform_delta_measurement()) + + # SOURCE._end_delta_measurement() diff --git a/PyExpLabSys/drivers/keithley_6517b.py b/PyExpLabSys/drivers/keithley_6517b.py new file mode 100644 index 00000000..680661ff --- /dev/null +++ b/PyExpLabSys/drivers/keithley_6517b.py @@ -0,0 +1,113 @@ +""" +Simple driver for Keithley 6517B electrometer +""" + +import pyvisa + +# Notice: The code is NOT abstract towards this list... changing it will +# result in values being intermixed +STANDARD_READ_PARAMS = [ + 'READING', + 'SOURCE', + 'RELATIVE', + 'SOURSTATUS', + 'STATUS', + 'SOURUNIT', + 'UNIT', +] + + +class Keithley6517B(): + """Simple driver for Keithley 6517B electrometer""" + + def __init__(self, device, baudrate=19200): + rm = pyvisa.ResourceManager('@py') + conn_string = 'ASRL{}::INSTR'.format(device) + self.instr = rm.open_resource(conn_string) + self.instr.set_visa_attribute( + pyvisa.constants.VI_ATTR_ASRL_FLOW_CNTRL, + pyvisa.constants.VI_ASRL_FLOW_XON_XOFF, + ) + self.instr.read_termination = '\r' + self.instr.write_termination = '\r' + self.instr.baud_rate = baudrate + + def output_state(self, output_state: bool = None): + """Turn the output on or off""" + if output_state is not None: + if output_state: + self.instr.write('OUTPUT1 ON') + else: + self.instr.write('OUTPUT1 OFF') + + actual_state_raw = self.instr.query('OUTPUT1?') + actual_state = int(actual_state_raw[0]) == 1 + return actual_state + + def set_voltage(self, voltage=None): + """ Set the desired voltage """ + if voltage is not None: + cmd = ':SOURce:VOLTage:LEVel:IMMediate:AMPLitude {:.9f}'.format(voltage) + self.instr.write(cmd) + + cmd = ':SOURce:VOLTage:LEVel:IMMediate:AMPLitude?' + value_raw = self.instr.query(cmd) + value = float(value_raw) + return value + + def _parse_reading(self, raw_reading): + exp_pos = raw_reading.find('E+') + unit_end = raw_reading.find(',') + time_end = raw_reading.find(',', unit_end + 1) + reading_end = raw_reading.find('RDNG#') + # Status: + # N: Normal, Z: Zero check enabled, O: Overflow, # U: Underflow + # R: Reference (relative offset), L: Out of limit + reading = { + 'value': float(raw_reading[0:exp_pos + 4]), + 'status': raw_reading[exp_pos + 4:exp_pos + 5], + 'unit': raw_reading[exp_pos + 5:unit_end], + 'reading_nr': int(raw_reading[time_end + 2:reading_end]), + } + return reading + + def read_voltage(self): + """Read the measured voltage""" + value_raw = self.instr.query('MEASURE:VOLTAGE?') + value = self._parse_reading(value_raw)['value'] + return value + + def read_current(self): + """Read the measured current""" + value_raw = self.instr.query('MEASURE:CURRENT?') + value = self._parse_reading(value_raw)['value'] + return raw + + def read_again(self): + value_raw = self.instr.query(':SENSe:DATA:FRESh?') + value = self._parse_reading(value_raw)['value'] + return value + +if __name__ == '__main__': + import time + + EM = Keithley6517B(device='/dev/ttyUSB1') + + print(EM.instr.query('*IDN?')) + + print('Output state: ', EM.output_state(True)) + print('Output voltage: ', EM.set_voltage(0.0)) + + print() + + print(EM.read_voltage()) + exit() + print(EM.read_again()) + + print(EM.read_current()) + print(EM.read_again()) + + # for _ in range(0, 10): + # EM.instr.write('*TRG') + # time.sleep(0.2) + # print(EM.instr.query('FETCH?')) diff --git a/PyExpLabSys/drivers/oxford_mercury.py b/PyExpLabSys/drivers/oxford_mercury.py new file mode 100644 index 00000000..59d05279 --- /dev/null +++ b/PyExpLabSys/drivers/oxford_mercury.py @@ -0,0 +1,308 @@ +""" +Driver for Oxford Mercury controllers +To a large extend this is implemented agains a specic device, and will +take some amount of work to generalize. Unfortunately I have access to +only a single cryostat. +""" + +import time +import pyvisa + + +class OxfordMercury(): + def __init__(self, hostname: str) -> None: + conn_string = 'TCPIP::{}::7020::SOCKET'.format(hostname) + rm = pyvisa.ResourceManager('@py') + print(conn_string) + self.instr = rm.open_resource(conn_string) + self.instr.read_termination = '\n' + self.instr.write_termination = '\n' + # encoding='latin-1', + self.switch_heater_turn_on_time = 0 + + def _comm(self, cmd): + error = 0 + while error > -1: + try: + raw_reply = self.instr.query(cmd) + error = -1 + except ValueError: + error += 1 + time.sleep(0.1) + if error > 3: + print('Oxford read error. Try again') + raw_reply = raw_reply.strip() + return raw_reply + + def _read_value( + self, uid: str, meas_type: str, keyword: str = None + ) -> (float, str): + if keyword is None: + keyword = meas_type + fields = [] + cmd = 'READ:DEV:{}:{}:SIG:{}?'.format(uid, meas_type, keyword) + + error = 0 + while len(fields) < 2: + error = error + 1 + raw_reply = self.instr.query(cmd) + if error > 1: + print('Errror!') + print('Command:', cmd) + print('Error is: ', error) + print('Reply', raw_reply) + time.sleep(0.2) + fields = raw_reply.split(':') + + # Value is in last field + value_raw = fields[-1] + for i in range(1, len(value_raw)): + try: + float(value_raw[0 : i + 1]) + except ValueError: + break + + value = float(value_raw[0:i]) + unit = value_raw[i:] + return value, unit + + def read_configuration(self) -> str: + cmd = 'READ:SYS:CAT?' + uids = self.instr.query(cmd) + return uids + + def read_raw(self, uid: str, meas_type: str) -> str: + """ + Return all available information about a sensor as a string + """ + cmd = 'READ:DEV:{}:{}?'.format(uid, meas_type) + return self.instr.query(cmd) + + def read_temperature(self, uid: str) -> (float, str): + value = self._read_value(uid, 'TEMP') + return value + + def read_temperature_details(self, uid: str) -> dict: + """ + Notice: if uid is really a magnet, these details will + not be meaningfull, since the meta-data will be for + magnet power, not for the temperature sensor + """ + data = { + 'temperature': self._read_value(uid, 'TEMP'), + 'voltage': self._read_value(uid, 'TEMP', 'VOLT'), + 'current': self._read_value(uid, 'TEMP', 'CURR'), + 'power': self._read_value(uid, 'TEMP', 'POWR'), + } + return data + + def read_pressure(self, uid: str) -> (float, str): + value, unit = self._read_value(uid, 'PRES') + return value, unit + + def read_heater(self, uid) -> (float, str): + value, unit = self._read_value(uid, 'HTR', 'POWR') + return value, unit + + def read_magnetic_field(self, uid) -> (float, str): + value, unit = self._read_value(uid, 'PSU', 'FLD') + return value, unit + + def read_magnet_details(self, uid) -> dict: + data = { + 'voltage': self._read_value(uid, 'PSU', 'VOLT'), + 'current': self._read_value(uid, 'PSU', 'CURR'), + 'persistent_current': self._read_value(uid, 'PSU', 'PCUR') * 2, + } + return data + + def switch_heater_state(self, uid, activated=None): + if activated is not None: + if activated: + cmd = 'READ:DEV:{}:PSU:SIG:SWHT:ON'.format(uid) + self._comm(cmd) + self.switch_heater_turn_on_time = time.time() + else: + cmd = 'READ:DEV:{}:PSU:SIG:SWHT:OFF'.format(uid) + self._comm(cmd) + self.switch_heater_turn_on_time = 0 + time.sleep(1) + + cmd = 'READ:DEV:{}:PSU:SIG:SWHT?'.format(uid) + raw_reply = self._comm(cmd) + print(raw_reply) + state_raw = raw_reply.split('SWHT') + heater_on = state_raw[-1] == ':ON' + # If heater was turned on and the software did not + # notice, at least notice now. + if heater_on: + if self.switch_heater_turn_on_time == 0: + switch_heater_turn_on_time = time.time() + return heater_on + + def temperature_setpoint(self, uid: str, setpoint: float = None, rate: float = None) -> float: + if setpoint is None: + # This code is almost identical to _read_value().... + cmd = 'READ:DEV:{}:TEMP:LOOP:TSET?'.format(uid) + raw_reply = self.instr.query(cmd) + fields = raw_reply.split(':') + value_raw = fields[-1] + for i in range(1, len(value_raw)): + try: + actual_setpoint = float(value_raw[0 : i + 1]) + except ValueError: + break + return actual_setpoint + + # If we are here, we are setting a setpoint + if setpoint > 300: + setpoint = 5 + + if rate is None: + rate = 0.5 + + cmd = 'SET:DEV:{}:TEMP:LOOP:RSET:{}K/m'.format(uid, rate) + raw_reply = self.instr.query(cmd) + print(cmd) + print(raw_reply) + + # Convntion: A setpoint of 0 turns off heating entirely + if setpoint <= 0: + print('Turning off heater') + cmd = 'SET:DEV:{}:TEMP:LOOP:ENAB:OFF'.format(uid) + raw_reply = self.instr.query(cmd) + print(raw_reply) + # Todo check that raw_reply contains VALID + else: + print('Set setpoint to {}K'.format(setpoint)) + cmd = 'SET:DEV:{}:TEMP:LOOP:TSET:{}K'.format(uid, setpoint) + raw_reply = self.instr.query(cmd) + print(raw_reply) + cmd = 'SET:DEV:{}:TEMP:LOOP:ENAB:ON'.format(uid) + raw_reply = self.instr.query(cmd) + print(raw_reply) + + return setpoint + + def b_field_setpoint(self, uid: str, setpoint: float = None, rate: float = None) -> (float, str): + if setpoint is None: + # This code is almost identical to _read_value().... + cmd = 'READ:DEV:{}:PSU:SIG:FSET?'.format(uid) + raw_reply = self.instr.query(cmd) + fields = raw_reply.split(':') + value_raw = fields[-1] + for i in range(1, len(value_raw)): + try: + actual_setpoint = float(value_raw[0 : i + 1]) + except ValueError: + break + return actual_setpoint + + if rate is None: + rate = 0.1 + if rate < 0.01: + print('Rate too low, using 0.01T/min') + rate = 0.01 + if rate > 0.3: + print('Rate too high, using 0.3T/min') + rate = 0.3 + cmd = 'SET:DEV:{}:PSU:SIG:RFST:{}T/m'.format(uid, rate) + print(cmd) + raw_reply = self.instr.query(cmd) + print(raw_reply) + + # If we are here, we are setting a setpoint + if abs(setpoint) > 12: + # Temporary safety precaution + setpoint = 0 + + if setpoint == 0: + print('Turning off magnets') + cmd = 'SET:DEV:{}:PSU:ACTN:RTOZ'.format(uid) + raw_reply = self.instr.query(cmd) + print(raw_reply) + # Todo check that raw_reply contains VALID + else: + # Hold the ramp currently being performed + cmd = 'SET:DEV:{}:PSU:ACTN:HOLD'.format(uid) + raw_reply = self.instr.query(cmd) + print('Set setpoint to {}T'.format(setpoint)) + # Set new setpoint + cmd = 'SET:DEV:{}:PSU:SIG:FSET:{}T'.format(uid, setpoint) + raw_reply = self.instr.query(cmd) + print(raw_reply) + # Start the ramp + cmd = 'SET:DEV:{}:PSU:ACTN:RTOS'.format(uid) + raw_reply = self.instr.query(cmd) + print(raw_reply) + return setpoint + + +if __name__ == '__main__': + itc = OxfordMercury(hostname='192.168.0.20') + print(itc.read_heater('DB1.H1')) + exit() + ips = OxfordMercury(hostname='192.168.0.21') + + print(ips.read_software_version()) + print(itc.read_software_version()) + + print(itc.read_heater('DB1.H1')) + + print() + # VTI_TEMP_DB6.H1 + # uid = 'DB1.H1' + + uid = 'DB1.H1' + cmd = 'READ:DEV:{}:HTR:PMAX'.format(uid) + # cmd = 'READ:DEV:{}:HTR:VLIM'.format(uid) + # cmd = 'READ:DEV:{}:HTR:SIG:POWR?'.format(uid) + # STAT:DEV:DB1.H1:HTR:SIG:POWR:0.0000W + cmd = 'READ:DEV:{}:HTR:SIG:POWR?'.format(uid) + # STAT:DEV:DB1.H1:HTR:SIG:POWR:18.4934W + + ###cmd = 'SET:DEV:{}:PSU:SIG:FSET:{}T'.format(uid, setpoint) + # cmd = 'SET:DEV:{}:HTR:SIG:FSET:0.1W'.format(uid) + print(cmd) + raw_reply = itc.scpi_comm(cmd, expect_return=True) + print(raw_reply) + + exit() + + # print(mitc.b_field_setpoint(3)) + + # cmd = 'SET:DEV:PSU.M1:PSU:SIG:RCST:2.5A/m' + # ips.scpi_comm(cmd) + cmd = 'READ:DEV:PSU.M1:PSU:SIG:RCST?' # Current ramp rate + raw_reply = ips.scpi_comm(cmd, expect_return=True) + print(raw_reply) + + cmd = 'SET:DEV:GRPZ:PSU:SIG:RFST:0.25T/m' + ips.scpi_comm(cmd) + cmd = 'READ:DEV:GRPZ:PSU:SIG:RFST?' + raw_reply = ips.scpi_comm(cmd, expect_return=True) + print(raw_reply) + + # print(mitc.read_temperature('MB1.T1')) # Sample temperature + # print(mitc.read_temperature('DB6.T1')) # VTI temperature + # print(mitc.read_temperature('DB7.T1')) # Magnet temperature + # print(mitc.read_pressure('DB8.P1')) # VTI pressure + # print(mitc.read_heater('MB0.H1')) # Sample heater + # print(mitc.read_heater('DB1.H1')) # VTI heater + + # print(mitc.sample_temperature('MB1.T1')) + # mips = OxfordMercury('/dev/ttyACM1') + # print(mips.read_magnetic_field('GRPZ')) + + # print(mips.read_software_version()) + # # print(mips.read_configuration()) + + # print(mips.switch_heater_state('GRPZ')) + # # print(mips.set_field_setpoint('GRPZ', 0.5)) + # print(mips.b_field_setpoint('GRPZ')) + # print(mips.b_field_setpoint('GRPZ')) + # print(mips.b_field_setpoint('PSU.M1')) + + # # print(mips.read_temperature('PSU.M1')) + + pass diff --git a/PyExpLabSys/drivers/scpi.py b/PyExpLabSys/drivers/scpi.py index 965cd08e..4d9cd733 100644 --- a/PyExpLabSys/drivers/scpi.py +++ b/PyExpLabSys/drivers/scpi.py @@ -20,7 +20,7 @@ class SCPI(object): - """ Driver for scpi communication """ + """Driver for scpi communication""" def __init__( self, @@ -32,16 +32,18 @@ def __init__( visa_string='', gpib_address=None, line_ending='\r', + encoding='ascii', ): self.device = device self.line_ending = line_ending self.interface = interface + self.encoding = encoding if self.interface == 'file': self.comm_dev = open(self.device, 'w') self.comm_dev.close() if self.interface == 'serial': self.comm_dev = serial.Serial( - self.device, baudrate, timeout=2, xonxoff=True + self.device, baudrate, timeout=3, xonxoff=True ) if self.interface == 'lan': self.comm_dev = telnetlib.Telnet(hostname, tcp_port) @@ -55,41 +57,63 @@ def __init__( self.comm_dev = Gpib.Gpib(0, pad=gpib_address) def scpi_comm(self, command, expect_return=False): - """ Implements actual communication with SCPI instrument """ + """Implements actual communication with SCPI instrument""" return_string = "" if self.interface == 'file': - self.comm_dev = open(self.device, 'w') - self.comm_dev.write(command) - time.sleep(0.02) + self.comm_dev = open(self.device, 'wb') + self.comm_dev.write(bytes(command + self.line_ending, self.encoding)) + self.comm_dev.flush() self.comm_dev.close() - time.sleep(0.05) - if '?' in command: - self.comm_dev = open(self.device, 'r') - return_string = self.comm_dev.readline() + if ('?' in command) or (expect_return is True): + self.comm_dev = open(self.device, 'rb') + time.sleep(0.002) + t = time.time() + while len(return_string) == 0: + dt = time.time() - t + if dt > 1: + raise ValueError + return_string = self.comm_dev.read(1) + return_string += self.comm_dev.readline() + return_string = return_string.decode(self.encoding) self.comm_dev.close() - command_text = command + self.line_ending + command_text = command + self.line_ending if self.interface == 'serial': self.comm_dev.write(command_text.encode('ascii')) if command.endswith('?') or (expect_return is True): - return_string = ''.encode('ascii') + return_string = ''.encode(self.encoding) while True: next_char = self.comm_dev.read(1) + # print(next_char) if ord(next_char) == ord(self.line_ending): break return_string += next_char - return_string = return_string.decode() + return_string = return_string.decode(self.encoding) if self.interface == 'lan': lan_time = time.time() - self.comm_dev.write(command_text.encode('ascii')) - if (command.find('?') > -1) or (expect_return is True): - return_string = self.comm_dev.read_until( - chr(10).encode('ascii'), 2 - ).decode() + command_text = command + '\n' + # print('Command text ', repr(command_text)) + self.comm_dev.write(command_text.encode(self.encoding)) + if command.endswith('?') or (expect_return is True): + raw = self.comm_dev.expect([b'\n'], 2) + return_string = raw[2].decode().strip() + + # ENT? return an extra newlinw from the inline reply + if command_text.find('ENT?'): + # extra_n = self.comm_dev.read_until(b'\n', 0.1).decode() + self.comm_dev.read_until(b'\n', 0.1).decode() + + LOGGER.info('Return string length: ' + str(len(return_string))) + LOGGER.info( + 'lan_time for command ' + + command_text.strip() + + ': ' + + str(time.time() - lan_time) + ) LOGGER.info('Return string length: ' + str(len(return_string))) LOGGER.info( - 'lan_time for coomand ' + 'lan_time for command ' + command_text.strip() + ': ' + str(time.time() - lan_time) @@ -104,28 +128,28 @@ def scpi_comm(self, command, expect_return=False): if self.interface == 'gpib': self.comm_dev.write(command_text) - if (command.find('?') > -1) or expect_return: + if command.endswith('?') or expect_return: return_string = self.comm_dev.read().strip().decode() return return_string def read_software_version(self): - """ Read version string from device """ + """Read version string from device""" version_string = self.scpi_comm("*IDN?") version_string = version_string.strip() return version_string def reset_device(self): - """ Rest device """ + """Rest device""" self.scpi_comm("*RST") return True def device_clear(self): - """ Stop current operation """ + """Stop current operation""" self.scpi_comm("*abort") return True def clear_error_queue(self): - """ Clear error queue """ + """Clear error queue""" error = self.scpi_comm("*ESR?") self.scpi_comm("*cls") return error