# Copyright (c) 2023 JARA Institute for Quantum Information
#
# This file is part of QuMADA.
#
# QuMADA is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# QuMADA is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# QuMADA. If not, see <https://www.gnu.org/licenses/>.
#
# Contributors:
# - Daniel Grothe
# - Till Huckemann
from __future__ import annotations
import numpy as np
from jsonschema import validate
from pyvisa import VisaIOError
from qcodes.instrument_drivers.stanford_research.SR830 import SR830
from qcodes.parameters import Parameter
from qumada.instrument.buffers.buffer import Buffer, BufferException
[docs]class SR830Buffer(Buffer):
"""Buffer for Stanford SR830"""
ch1_names = ["X", "R", "X Noise", "aux_in1", "aux_in2"]
ch2_names = ["Y", "Phase", "Y Noise", "aux_in3", "aux_in4"]
AVAILABLE_TRIGGERS: list[str] = ["external", "trig_in_1"]
def __init__(self, device: SR830):
self._device = device
self._trigger: str | None = None
self._subscribed_parameters: set[Parameter] = set()
self._num_points: int | None = None
[docs] def setup_buffer(self, settings: dict) -> None:
"""Sets instrument related settings for the buffer."""
# TODO: Validation for sampling rates (look up in manual) Comment: Is this required? Is handled by driver...
# TODO: Trigger und SR abgleichen
# TODO: Are there different trigger modes?
validate(settings, self.settings_schema)
self.settings: dict = settings
self._device.buffer_SR(settings.get("sampling_rate", 512))
self._device.buffer_trig_mode("OFF")
self._set_num_points()
self.delay_data_points = 0 # Datapoints to delete at the beginning of dataset due to delay.
self.delay = settings.get("delay", 0)
if self.delay < 0:
raise BufferException("The SR830'S Trigger Input does not support negative delays.")
else:
self.delay_data_points = int(self.delay * self._device.buffer_SR())
self.num_points = self.delay_data_points + self.num_points
# TODO: There has to be a more elegant way for the setter.
@property
def num_points(self) -> int | None:
return self._num_points
@num_points.setter
def num_points(self, num_points) -> None:
if num_points > 16383:
raise BufferException(
"SR830 is to small for this measurement. Please reduce the number of data points or the delay"
)
self._num_points = int(num_points)
def _set_num_points(self) -> None:
"""
Calculates number of datapoints and sets
the num_points accordingly.
Raises
------
Exception
Exception if number of points is overdefined.
Returns
-------
None
"""
if all(k in self.settings for k in ("sampling_rate", "burst_duration", "num_points")):
raise BufferException("You cannot define sampling_rate, burst_duration and num_points at the same time")
elif self.settings.get("num_points", False):
self.num_points = self.settings["num_points"]
if self.settings.get("sampling_rate", False):
self._device.buffer_SR(self.settings["sampling_rate"])
else:
self._device.buffer_SR(self.num_points / self.settings["burst_duration"])
elif all(k in self.settings for k in ("sampling_rate", "burst_duration")):
self.num_points = int(np.ceil(self.settings["sampling_rate"] * self.settings["burst_duration"]))
@property
def trigger(self) -> str | None:
return self._trigger
@trigger.setter
def trigger(self, trigger: str | None) -> None:
if trigger == "trig_in_1":
# TODO: standard value for Sample Rate
self._device.buffer_SR(512)
self._device.buffer_trig_mode("OFF")
elif trigger == "external":
self._device.buffer_SR("Trigger")
self._device.buffer_trig_mode("ON")
else:
raise BufferException(
"SR830 does not support setting custom trigger inputs. "
"Use 'external' and the input on the back of the unit."
)
self._trigger = trigger
[docs] def force_trigger(self) -> None:
self._device.buffer_start()
[docs] def read_raw(self) -> dict:
# TODO: Handle stopping buffer or not
data = {}
try:
for parameter in self._subscribed_parameters:
if parameter.name in self.ch1_names:
ch = "ch1"
elif parameter.name in self.ch2_names:
ch = "ch2"
# TODO: what structure has the data? do we get timestamps?
data[parameter.name] = self._device.__getattr__(f"{ch}_datatrace").get()
data[parameter.name] = data[parameter.name][self.delay_data_points : self.num_points]
except VisaIOError as ex:
raise BufferException("Could not read the buffer. Buffer has to be stopped before readout.") from ex
return data
[docs] def read(self) -> dict:
# TODO: Add timetrace if possible
return self.read_raw()
[docs] def subscribe(self, parameters: set | list[Parameter]) -> None:
for parameter in parameters:
name = parameter.name
if name in self.ch1_names:
self._device.ch1_display(name)
param_to_remove = {param for param in self._subscribed_parameters if param.name in self.ch1_names}
self._subscribed_parameters.difference_update(
param_to_remove
) # remove previously subscribed parameter from ch1
self._subscribed_parameters.add(parameter)
elif name in self.ch2_names:
self._device.ch2_display(name)
param_to_remove = {param for param in self._subscribed_parameters if param.name in self.ch2_names}
self._subscribed_parameters.difference_update(
param_to_remove
) # remove previously subscribed parameter from ch2
self._subscribed_parameters.add(parameter)
else:
raise BufferException(f"Parameter {parameter.name} can not be buffered.")
[docs] def unsubscribe(self, parameters: set | list[Parameter]) -> None:
for parameter in parameters.copy():
name = parameter.name
if name in ["X", "R", "X Noise", "aux_in1", "aux_in2"]:
self._subscribed_parameters.remove(parameter)
elif name in ["Y", "Phase", "Y Noise", "aux_in3", "aux_in4"]:
self._subscribed_parameters.remove(parameter)
else:
raise BufferException(f"Parameter {parameter.name} can not be buffered.")
[docs] def is_subscribed(self, parameter: Parameter) -> bool:
return parameter in self._subscribed_parameters
[docs] def start(self) -> None:
self._device.buffer_reset()
self._device.buffer_start()
[docs] def stop(self) -> None:
self._device.buffer_pause()
[docs] def is_ready(self) -> bool: ...
[docs] def is_finished(self) -> bool:
if self._device.buffer_npts() >= self.num_points:
self.stop()
return True
else:
return False