Source code for qumada.instrument.custom_drivers.Dummies.dummy_dmm

# Copyright (c) 2023 JARA Institute for Quantum Information
#
# This file is part of QuMADA.
#
# QuMADA is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# QuMADA is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# QuMADA. If not, see <https://www.gnu.org/licenses/>.
#
# Contributors:
# - Bertha Lab Setup
# - Daniel Grothe
# - Till Huckeman


import threading

# most of the drivers only need a couple of these... moved all up here for clarity below
from time import sleep

import numpy as np
from qcodes.instrument import Instrument
from qcodes.parameters import Parameter
from qcodes.validators import validators as vals


# %%
[docs]class dmm_results_random(Parameter):
[docs] def get_raw(self): return np.random.sample()
[docs]class dmm_results_sinus(Parameter):
[docs] def get_raw(self): length = self.root_instrument.buffer_n_points() if length and length > 0: return [ np.sin(2 * np.pi * x / (self.root_instrument.buffer_n_points() ** 2 / self.root_instrument.buffer_SR())) for x in np.linspace(0, length, length) ] else: raise Exception("Set buffer_n_points first to a positive value")
[docs]class dmm_buffer(Parameter): def __init__(self, name, **kwargs): super().__init__(name, **kwargs) self.buffer_data = [] self.buffer_length = 512 self.SR = 512 self.is_finished = True self.subscribed_params = list() self.triggered: bool = False self._is_triggered = self.root_instrument._trigger_event
[docs] def subscribe(self, param): assert param.root_instrument == self.root_instrument if param not in self.subscribed_params: self.subscribed_params.append(param)
[docs] def ready_buffer(self): self.SR = self.root_instrument.buffer_SR() self.buffer_length = self.root_instrument.buffer_n_points() self.buffer_data = [list() for _ in self.subscribed_params] self.is_finished = False
[docs] def start(self): if self.is_finished: raise Exception("Buffer is not ready!") self.thread = threading.Thread(target=self._run, args=(), daemon=True) self.thread.start()
def _run(self): _ = self._is_triggered.wait() for i in range(0, self.buffer_length): for j in range(len(self.subscribed_params)): datapoint = self.subscribed_params[j]() if isinstance(datapoint, float): self.buffer_data[j].append(self.subscribed_params[j]()) elif isinstance(datapoint, list): self.buffer_data[j].append(self.subscribed_params[j]()[i]) sleep(1 / self.SR) self.is_finished = True
[docs] def reset(self): self.buffer_data = []
[docs] def get_raw(self): return self.buffer_data
[docs]class DummyDmm(Instrument): def __init__(self, name, trigger_event=threading.Event(), **kwargs): super().__init__(name, **kwargs) self._trigger_event = trigger_event self.add_parameter( "voltage", unit="V", parameter_class=dmm_results_sinus, ) self.add_parameter("current", unit="A", parameter_class=dmm_results_random) self.add_parameter("buffer", unit="V", parameter_class=dmm_buffer) self.add_parameter( "buffer_SR", unit="Sa/s", set_cmd=None, vals=vals.Numbers(0, 512), ) self.add_parameter( "trigger_mode", ) self.add_parameter("is_finished", get_cmd=self._is_finished) self.add_parameter("buffer_n_points", set_cmd=None, vals=vals.Ints(0, 16383)) self.add_parameter("triggered", set_cmd=None, vals=vals.Bool()) self.triggered(False) self.add_function("start", call_cmd=self._start_buffer) self.add_function("ready_buffer", call_cmd=self._ready_buffer) self.add_function("reset_buffer", call_cmd=self._reset_buffer) def _force_trigger(self): self.buffer._is_triggered.set() return None def _start_buffer(self): print("Started buffer") self.buffer.start() return None def _ready_buffer(self): print("Buffer is now ready") self.buffer.ready_buffer() self.buffer._is_triggered = self._trigger_event return None def _reset_buffer(self): print("Buffer was resetted") self.buffer.buffer_data = [] return None def _is_finished(self): return self.buffer.is_finished