Source code for qumada.instrument.buffers.dummy_dmm_buffer

# Copyright (c) 2023 JARA Institute for Quantum Information
#
# This file is part of QuMADA.
#
# QuMADA is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# QuMADA is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# QuMADA. If not, see <https://www.gnu.org/licenses/>.
#
# Contributors:
# - Daniel Grothe
# - Till Huckeman

"""
Created on Tue Jan  3 15:20:07 2023

@author: till3
"""
from __future__ import annotations

import numpy as np
from jsonschema import validate
from qcodes.parameters import Parameter

from qumada.instrument.buffers import Buffer, BufferException
from qumada.instrument.custom_drivers.Dummies.dummy_dmm import DummyDmm


# %%
class DummyDMMBuffer(Buffer):
    """Buffer implementation for the dummy DMM.

    Forwards buffer configuration, triggering, subscription and readout to
    the buffer of a ``DummyDmm`` instance. Only the ``"software"`` trigger is
    available.
    """

    AVAILABLE_TRIGGERS: list[str] = ["software"]

    # Largest number of points (including delay points) accepted by the
    # ``num_points`` setter.
    _MAX_NUM_POINTS: int = 16383

    def __init__(self, device: DummyDmm):
        """Create a buffer wrapper around ``device``.

        Parameters
        ----------
        device
            The dummy DMM whose buffer is controlled by this object.
        """
        self._device = device
        self._trigger: str | None = None
        self._subscribed_parameters: set[Parameter] = set()
        self._num_points: int | None = None

    def setup_buffer(self, settings: dict) -> None:
        """Set the instrument related settings for the buffer.

        ``settings`` is validated against ``self.settings_schema`` (not
        defined in this class; presumably provided by ``Buffer``). The
        sampling rate, the number of points and the delay are read from it
        and written to the device. The number of points sent to the device
        includes the points that fall into the delay; those are cut off again
        in :meth:`read_raw`.

        Parameters
        ----------
        settings
            Buffer settings. Keys read here: ``"sampling_rate"`` (defaults to
            512 and is written back into ``settings`` if missing),
            ``"delay"`` (defaults to 0), and ``"num_points"`` /
            ``"burst_duration"`` (see :meth:`_set_num_points`).

        Raises
        ------
        BufferException
            If the delay is negative, if the number of points cannot be
            determined from the settings, or if the resulting number of
            points exceeds the buffer size.
        """
        validate(settings, self.settings_schema)
        self.settings: dict = settings
        # setdefault stores the default sampling rate in ``settings`` as well,
        # so "sampling_rate" is always present from here on.
        self._device.buffer_SR(settings.setdefault("sampling_rate", 512))
        # self._device.buffer_trig_mode("OFF")
        self._set_num_points()

        # Datapoints to delete at the beginning of dataset due to delay.
        self.delay_data_points = 0
        self.delay = settings.get("delay", 0)
        if self.delay < 0:
            raise BufferException("The Dummy Dac does not support negative delays.")
        if self.num_points is None:
            # Without this check the addition below fails with an opaque
            # TypeError (int + None).
            raise BufferException(
                "The number of data points could not be determined. "
                "Please define num_points or burst_duration in the buffer settings."
            )
        self.delay_data_points = int(self.delay * self._device.buffer_SR())
        self.num_points = self.delay_data_points + self.num_points

        self._device.buffer_n_points(self.num_points)
        self._device.buffer.ready_buffer()

    @property
    def num_points(self) -> int | None:
        """Number of points to record, or ``None`` if not set yet."""
        return self._num_points

    @num_points.setter
    def num_points(self, num_points) -> None:
        """Store ``num_points`` as an int after checking the buffer size.

        Raises
        ------
        BufferException
            If ``num_points`` is larger than 16383.
        """
        if num_points > self._MAX_NUM_POINTS:
            raise BufferException(
                "Dummy Dacs Buffer is to small for this measurement. "
                "Please reduce the number of data points or the delay"
            )
        self._num_points = int(num_points)

    def _set_num_points(self) -> None:
        # TODO: Move to parent Buffer Class?
        """
        Calculates number of datapoints and sets the num_points accordingly.

        A truthy ``"num_points"`` entry in ``self.settings`` is used directly.
        Otherwise the value is computed as
        ``ceil(sampling_rate * burst_duration)`` if both keys are present.
        If neither applies, ``num_points`` is left unchanged.

        Raises
        ------
        BufferException
            If ``sampling_rate``, ``burst_duration`` and ``num_points`` are
            all present in the settings (overdefined).

        Returns
        -------
        None
        """
        if all(k in self.settings for k in ("sampling_rate", "burst_duration", "num_points")):
            raise BufferException("You cannot define sampling_rate, burst_duration and num_points at the same time")
        elif self.settings.get("num_points", False):
            self.num_points = self.settings["num_points"]
        elif all(k in self.settings for k in ("sampling_rate", "burst_duration")):
            self.num_points = int(np.ceil(self.settings["sampling_rate"] * self.settings["burst_duration"]))

    @property
    def trigger(self) -> str | None:
        """Currently selected trigger, or ``None`` if none was set."""
        return self._trigger

    @trigger.setter
    def trigger(self, trigger: str | None) -> None:
        """Select the trigger.

        Only ``"software"`` is stored; any other value (including ``None``)
        is ignored and leaves the current trigger unchanged.
        """
        if trigger == "software":
            self._trigger = trigger

    def force_trigger(self) -> None:
        """Trigger the device by calling its ``_force_trigger`` method."""
        self._device._force_trigger()

    def read_raw(self) -> dict:
        """Read the buffered data of all subscribed parameters.

        Returns
        -------
        dict
            One entry per subscribed parameter (keyed by the parameter name)
            holding the buffered values with the leading delay points
            removed, plus a ``"timestamps"`` entry. The timestamps are not
            read from the device; they are generated as an evenly spaced axis
            from the sampling rate and have the same length as the returned
            traces.
        """
        data = {}
        for parameter in self._subscribed_parameters:
            index = self._device.buffer.subscribed_params.index(parameter)
            trace = self._device.buffer.get()[index]
            # Drop the points recorded during the delay.
            data[parameter.name] = trace[self.delay_data_points : self.num_points]
        # The time axis has to match the sliced traces, so it is built from
        # the number of retained points rather than from num_points (which
        # still contains the delay points).
        kept_points = max(self.num_points - self.delay_data_points, 0)
        data["timestamps"] = np.linspace(0, kept_points / self._device.buffer_SR(), kept_points)
        return data

    def read(self) -> dict:
        """Return the buffered data; identical to :meth:`read_raw`."""
        # TODO: Add timetrace if possible
        return self.read_raw()

    def subscribe(self, parameters: set | list[Parameter]) -> None:
        """Subscribe ``parameters`` on the device buffer and track them here."""
        for parameter in parameters:
            self._device.buffer.subscribe(parameter)
            self._subscribed_parameters.add(parameter)

    def unsubscribe(self, parameters: set | list[Parameter]) -> None:
        """Remove ``parameters`` from the device buffer and from this buffer.

        Parameters that are not subscribed on the device buffer are reported
        via ``print`` and otherwise skipped.
        """
        # Iterate over a copy; presumably so that callers can pass this
        # buffer's own subscription set, which is modified inside the loop.
        for parameter in parameters.copy():
            if parameter in self._device.buffer.subscribed_params:
                self._device.buffer.subscribed_params.remove(parameter)
                self._subscribed_parameters.remove(parameter)
            else:
                print(f"{parameter} is not subscribed")

    def is_subscribed(self, parameter: Parameter) -> bool:
        """Return whether ``parameter`` is tracked as subscribed by this buffer."""
        return parameter in self._subscribed_parameters

    def start(self) -> None:
        """Start the measurement by calling ``start`` on the device."""
        self._device.start()

    def stop(self) -> None:
        """Placeholder; does nothing for the dummy DMM."""
        ...

    def is_ready(self) -> bool:
        """Placeholder; not implemented for the dummy DMM (returns ``None``)."""
        ...

    def is_finished(self) -> bool:
        """Return the result of the device's ``is_finished`` method."""
        return self._device.is_finished()