__author__ = 'Frederik Lauber'
class hp_infinium(object):
known_types = ("Type:", "Points:", "Count:", "XInc:", "XOrg:", "XRef:", "YData range:", "YData center:",
"Coupling:", "XRange:", "XOffset:", "YRange:", "YOffset:", "Date:", "Time:", "Frame:",
"Acq mode:", "Completion:", "X Units:", "Y Units:", "Max bandwidth:", "Min bandwidth:")
known_set = set(known_types)
# not checked yet, only from spec
# only number 3 was not tested, no idea how
coupling_spec = {"0": "AC",
# not checked yet, only from spec
acq_spec = {"0": "real time",
# not checked yet, only from spec
xy_units_spec = {"0": "unknown",
def from_gpig_name(cls, gpib_identifier_string, timeout):
re = visa.ResourceManager()
gpib_connection = re.get_instrument(gpib_identifier_string)
return cls(gpib_connection, timeout)
def __init__(self, gpib_connection, timeout):
self._gpib_connection = gpib_connection
# variable used by the .timeout setter/getter method
self.write = self._gpib_connection.write
self.ask = self._gpib_connection.ask
def timeout(self, new_timeout):
self._timeout = new_timeout
if not new_timeout is None:
self._gpib_connection.timeout = new_timeout
del self._gpib_connection.timeout
# deletes the data on the scope and arms it again
answer = self.ask("*TRG;*OPC?")
return True if answer == "1" else False
return self.ask("AER?").strip()
header = self.ask(":WAVeform:PREamble?").split(",")
# this are the headers I found in the original csv files
# if I did not have a value I used the one from the spec
# if you find a difference to the once produced by the oszi directly
tmp["Type:"] = self.type_spec[header[1]].strip()
tmp["Points:"] = header[2].strip()
tmp["Count:"] = header[3].strip()
tmp["XInc:"] = header[4].strip()
tmp["XOrg:"] = header[5].strip()
tmp["XRef:"] = header[6].strip()
tmp["YData range:"] = header[7].strip()
tmp["YData center:"] = header[8].strip()
# header[9] not used, spec says always 0
tmp["Coupling:"] = self.coupling_spec[header[10]].strip()
tmp["XRange:"] = header[11].strip()
tmp["XOffset:"] = header[12].strip()
tmp["YRange:"] = header[13].strip()
tmp["YOffset:"] = header[14].strip()
tmp["Date:"] = header[15].strip('"')
tmp["Time:"] = header[16].strip('"')
tmp["Frame:"] = header[17].strip('"')
tmp["Acq mode:"] = self.acq_spec[header[19]].strip()
tmp["Completion:"] = header[20].strip()
tmp["X Units:"] = self.xy_units_spec[header[21]].strip()
tmp["Y Units:"] = self.xy_units_spec[header[22]].strip()
tmp["Max bandwidth:"] = header[23].strip()
tmp["Min bandwidth:"] = header[24].strip()
# adding header so we always know which one
# was taken by the program
y = self.ask(":WAVeform:DATA?").split(",")
xorig = float(header["XOrg:"])
xinc = float(header["XInc:"])
x_data = tuple(xorig + i * xinc for i in range(len(y)))
y_data = tuple(float(i) for i in y)
def make_csv_like_string(self, preample, data):
preample_keys = set(preample.keys())
# second add unknown keys
# -> order will be the same as in the csv files generated by oszi
for key in self.known_types:
# Keyname+white space until length 15 and actual value
output.append("".join([key, (15 - len(key)) * " ", str(preample[key])]))
for key in preample_keys.difference(self.known_set):
output.append("".join([key, (15 - len(key)) * " ", str(preample[key])]))
# for x, y in zip(x_data, y_data):
for i in range(len(x_data)):
output.append("%E,%E" % (x_data[i], y_data[i]))
class Agilent_Waveform_Generator:
def __init__(self, gpib_connection, timeout):
self._gpib_connection = gpib_connection
# variable used by the .timeout setter/getter method
# add some shortcuts for gpib
self.write = self._gpib_connection.write
self.ask = self._gpib_connection.ask
def from_gpig_name(cls, gpib_identifier_string, timeout):
re = visa.ResourceManager()
gpib_connection = re.get_instrument(gpib_identifier_string)
return cls(gpib_connection, timeout)
def timeout(self, new_timeout):
self._timeout = new_timeout
if not new_timeout is None:
self._gpib_connection.timeout = new_timeout
del self._gpib_connection.timeout
def trigger_source(self):
return self.ask("TRIG:SOUR?").strip()
def trigger_source(self, value):
self.write("TRIG:SOUR " + str(value))
return self.ask("BURS:MODE?").strip()
def burst_mode(self, value):
self.write("BURS:MODE " + str(value))
return self.ask("FUNC?").strip()
def function(self, value):
# SIN, SQU, RAMP, PULS, NOIS, DC, USER
self.write("FUNC " + str(value))
return float(self.ask("FREQ?"))
def frequency(self, value):
# allowed: MIN, MAX, floating point number
self.write("FREQ " + str(value))
return float(self.ask("VOLT?"))
def voltage(self, value):
# allowed: MIN, MAX, floating point number
self.write("VOLT " + str(value))
return float(self.ask("VOLT:HIGH?"))
def voltage_high(self, value):
self.write("VOLT:HIGH " + str(value))
return float(self.ask("VOLT:LOW?"))
def voltage_low(self, value):
self.write("VOLT:LOW " + str(value))
return float(self.ask("PULS:WIDT?"))
def pulse_width(self, value):
# MIN, MAX, floating point in seconds
self.write("PULS:WIDT " + str(value))
return float(self.ask("PULS:PER?"))
def pulse_period(self, value):
# MIN, MAX, floating point in seconds
self.write("PULS:PER " + str(value))
def pulse_transition(self):
return float(self.ask("PULS:TRAN?"))
def pulse_transition(self, value):
# MIN, MAX, floating point in seconds
self.write("PULS:TRAN " + str(value))
return float(self.ask("BURS:NCYC?"))
def burst_count(self, value):
# MIN, MAX, floating point in seconds
self.write("BURS:NCYC " + str(value))
return float(self.ask("BURS:PHAS?"))
def burst_phase(self, value):
# MIN, MAX, floating point in seconds
self.write("BURS:PHAS " + str(value))
return False if self.ask("OUTP?").strip() == '0' else True
return float(self.ask("OUTP:LOAD?"))
def output_load(self, value):
# INF, MIN, MAX, flaoting point in Ohms
self.write("OUTP:LOAD " + str(value))
def output_trigger(self):
return False if self.ask("OUTP:TRIG?").strip() == '0' else True
def output_trigger(self, value):
self.write("OUTP:TRIG ON")
self.write("OUTP:TRIG OFF")
def output_trigger_slope(self):
return self.ask("OUTP:TRIG:SLOP?").strip()
@output_trigger_slope.setter
def output_trigger_slope(self, value):
self.write("OUTP:TRIG:SLOP " + str(value))
def enable_burst_mode(self):
self.write("BURS:STAT ON")
def clear_registers(self):
re = visa.ResourceManager()
device_list = re.list_resources()
instrument_list = tuple(map(re.get_instrument, device_list))
for instrument in instrument_list:
idn = instrument.ask("*IDN?")
#always set the waveform generator to the same settings
WFG.output_trigger = True
WFG.output_trigger_slope = "POS"
WFG.pulse_transition = 5E-09
WFG.trigger_source = "BUS"
if not WFG.function == "PULS" \
or not WFG.output_load == 50 \
or not WFG.output_trigger is True \
or not WFG.output_trigger_slope == "POS" \
or not WFG.voltage_high == 2.5 \
or not WFG.voltage_low == 0 \
or not WFG.pulse_width == 1E-07 \
or not WFG.burst_mode == "TRIG" \
or not WFG.trigger_source == "BUS" \
or not WFG.burst_count == 1:
print("Setting for WFG failed failed")
print(WFG.function == "PULS")
print(WFG.output_load == 50)
print(WFG.output_trigger is True)
print(WFG.output_trigger_slope)
print(WFG.voltage_high == 2.5)
print(WFG.voltage_low == 0)
print(WFG.pulse_width == 1E-07)
print(WFG.burst_mode == "TRIG")
print(WFG.trigger_source == "BUS")
print(WFG.burst_count == 1)
print("WFG setup complete")
instrument_list = list_all_devices()
OSZI = hp_infinium(instrument_list[0], 10)
WFG = Agilent_Waveform_Generator(instrument_list[1], 10)
#clear the the oscilloscope
preample = OSZI.preample()
with zipfile.ZipFile("test.zip", mode='w', compression=zipfile.ZIP_BZIP2) as file_zip:
for counter in range(0, 6666, 1):
while not int(OSZI.trigger_armed):
# wait for the arm register to be marked as armed
while not int(OSZI.ask("ADER?").strip()):
# make sure we really get new data
data = OSZI.data(preample)
csv_string = OSZI.make_csv_like_string(preample, data)
file_zip.writestr(str(counter).zfill(5) + ".csv", csv_string)
if __name__ == "__main__":