# Copyright (c) 2023 JARA Institute for Quantum Information
#
# This file is part of QuMADA.
#
# QuMADA is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# QuMADA is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# QuMADA. If not, see <https://www.gnu.org/licenses/>.
#
# Contributors:
# - Daniel Grothe
# - Till Huckeman
from __future__ import annotations
import logging
import numpy as np
from jsonschema import validate
from qcodes.parameters import Parameter
from qumada.instrument.buffers.buffer import Buffer, BufferException
from qumada.instrument.custom_drivers.ZI.MFLI import MFLI
logger = logging.getLogger(__name__)
[docs]class MFLIBuffer(Buffer):
"""Buffer for ZurichInstruments MFLI"""
AVAILABLE_TRIGGERS: list[str] = [
"trigger_in_1",
"trigger_in_2",
"aux_in_1",
"aux_in_2",
]
TRIGGER_MODE_MAPPING: dict = {
"continuous": 0,
"edge": 1,
"pulse": 3,
"tracking_edge": 4,
"tracking_pulse": 7,
"digital": 6,
}
TRIGGER_MODE_POLARITY_MAPPING: dict = {"positive": 1, "negative": 2, "both": 3}
GRID_INTERPOLATION_MAPPING: dict = {"nearest": 1, "linear": 2, "exact": 4}
# TODO: What happens in exact mode? Look up limitations...
def __init__(self, mfli: MFLI):
self._session = mfli.session
self._device = mfli.instr
self._daq = self._session.modules.daq
self._sample_nodes: list = []
self._subscribed_parameters: list[Parameter] = []
self._trigger: str | None = None
self._channel = 0
self._num_points: int | None = None
self._num_bursts: int = 1
self._burst_duration: float | None = None
[docs] def setup_buffer(self, settings: dict) -> None:
# validate settings
validate(settings, self.settings_schema)
self.settings: dict = settings
device = self._device
self._daq.device(device)
if "channel" in settings:
self._channel = settings["channel"]
device.demods[self._channel].enable(True)
# TODO: Validate Trigger mode, edge and interpolation!:
self._daq.type(self.TRIGGER_MODE_MAPPING[settings.get("trigger_mode", "edge")])
self._daq.edge(self.TRIGGER_MODE_POLARITY_MAPPING[settings.get("trigger_mode_polarity", "positive")])
self._daq.grid.mode(self.GRID_INTERPOLATION_MAPPING[settings.get("grid_interpolation", "linear")])
self.trigger = self.trigger # Don't delete me, I am important!
if "trigger_threshold" in settings:
# TODO: better way to distinguish, which trigger level to set
self._daq.level(settings["trigger_threshold"])
self._device.triggers.in_[0].level(settings["trigger_threshold"])
self._device.triggers.in_[1].level(settings["trigger_threshold"])
else:
logger.warning("No trigger threshold specified!")
self._set_num_points()
self._daq.delay = settings.get("delay", 0)
@property
def trigger(self):
return self._trigger
@trigger.setter
def trigger(self, trigger: str | None) -> None:
# TODO: Inform user about automatic changes of settings
# TODO: This is done BEFORE the setup_buffer, so changes to trigger type will be overriden anyway?
# print(f"Running trigger setter with: {trigger}")
if trigger is None:
logger.info("No Trigger provided! Setting trigger to continuous.")
self._daq.type(0)
elif trigger in self.AVAILABLE_TRIGGERS:
samplenode = self._device.demods[self._channel].sample
if trigger == "trigger_in_1":
self._daq.triggernode(samplenode.TrigIn1)
self._daq.type(6)
elif trigger == "trigger_in_2":
self._daq.triggernode(samplenode.TrigIn2)
self._daq.type(6)
elif trigger == "aux_in_1":
self._daq.triggernode(samplenode.AuxIn0)
if self._daq.type() not in (1, 3, 4, 7):
self._daq.type(1)
elif trigger == "aux_in_2":
self._daq.triggernode(samplenode.AuxIn1)
if self._daq.type() not in (1, 3, 4, 7):
self._daq.type(1)
else:
raise BufferException(f"Trigger input '{trigger}' is not supported.")
self._trigger = trigger
[docs] def force_trigger(self) -> None:
self._daq.forcetrigger(1)
@property
def num_points(self) -> int | None:
return self._num_points
@num_points.setter
def num_points(self, num_points) -> None:
if num_points > 8_388_608:
raise BufferException(
"Buffer is to small for this measurement. \
Please reduce the number of data points"
)
self._num_points = int(num_points)
# TODO: Define setter for other settings (e.g. burst_duration, num_bursts etc)
def _set_num_points(self) -> None:
"""
Calculates all required settings for the MFLI and sets the values
accordingly
------
Exception
Exception if number of points or number of burst is overdefined.
Returns
-------
None
"""
# TODO: Include ._daq.repetitions (averages over multiple bursts)
if all(k in self.settings for k in ("sampling_rate", "burst_duration", "num_points")):
raise BufferException("You cannot define sampling_rate, burst_duration and num_points at the same time")
if all(k in self.settings for k in ("sampling_rate", "duration", "num_bursts", "num_points")):
raise BufferException(
"You cannot define sampling rate, duration and num_burst and num_points at the same time"
)
if all(k in self.settings for k in ("num_bursts", "duration", "burst_duration")):
raise BufferException("You cannnot define duration, burst_duration and num_bursts at the same time")
if "burst_duration" in self.settings:
self._burst_duration = self.settings["burst_duration"]
if "duration" in self.settings:
if "burst_duration" in self.settings:
self._num_bursts = np.ceil(self.settings["duration"] / self._burst_duration)
elif "num_bursts" in self.settings:
self._num_bursts = int(self.settings["num_bursts"])
self._burst_duration = self.settings["duration"] / self._num_bursts
else:
logger.info(
"You have specified neither burst_duration nor num_bursts. \
Using duration as burst_duration!"
)
self._burst_duration = self.settings["duration"]
if "num_points" in self.settings:
self.num_points = int(self.settings["num_points"])
if "sampling_rate" in self.settings:
self._burst_duration = float(self.num_points / self.settings["sampling_rate"])
elif "sampling_rate" in self.settings:
self._sampling_rate = float(self.settings["sampling_rate"])
if self._burst_duration is not None:
self.num_points = int(np.ceil(self._sampling_rate * self._burst_duration))
elif all(k in self.settings for k in ("duration", "num_bursts")):
self._burst_duration = float(self.settings["duration"] / self.settings["num_bursts"])
self._daq.count(self._num_bursts)
self._daq.duration(self._burst_duration)
self._daq.grid.cols(self.num_points)
[docs] def read(self) -> dict:
data = self.read_raw()
result_dict = {}
for parameter in self._subscribed_parameters:
node = self._get_node_from_parameter(parameter)
key = next(key for key in data.keys() if str(key) == str(node))
result_dict[parameter.name] = data[key][0].value
if "timestamps" not in result_dict:
result_dict["timestamps"] = data[key][0].time
return result_dict
[docs] def read_raw(self) -> dict:
return self._daq.read()
[docs] def subscribe(self, parameters: list[Parameter]) -> None:
for parameter in parameters:
node = self._get_node_from_parameter(parameter)
if node not in self._sample_nodes:
self._subscribed_parameters.append(parameter)
self._sample_nodes.append(node)
self._daq.subscribe(node)
[docs] def unsubscribe(self, parameters: list[Parameter]) -> None:
for parameter in parameters.copy():
node = self._get_node_from_parameter(parameter)
if node in self._sample_nodes:
self._sample_nodes.remove(node)
self._subscribed_parameters.remove(parameter)
self._daq.unsubscribe(node)
[docs] def is_subscribed(self, parameter: Parameter) -> bool:
return parameter in self._subscribed_parameters
[docs] def start(self) -> None:
self._daq.execute()
[docs] def stop(self) -> None:
self._daq.raw_module.finish()
[docs] def is_ready(self) -> bool: ...
[docs] def is_finished(self) -> bool:
return self._daq.raw_module.finished()
def _get_node_from_parameter(self, parameter: Parameter):
return self._device.demods[self._channel].sample.__getattr__(parameter.signal_name[1])