diff --git a/qiskit_ibm_runtime/fake_provider/local_service.py b/qiskit_ibm_runtime/fake_provider/local_service.py index b59fcf466..9b9717827 100644 --- a/qiskit_ibm_runtime/fake_provider/local_service.py +++ b/qiskit_ibm_runtime/fake_provider/local_service.py @@ -14,20 +14,24 @@ from __future__ import annotations -import logging import copy -from typing import Dict, Union, Literal +import logging import warnings from dataclasses import asdict - -from qiskit.utils import optionals -from qiskit.providers.backend import BackendV1, BackendV2 -from qiskit.primitives import BackendSampler, BackendEstimator +from typing import Dict, Literal, Union + +from qiskit.primitives import ( + BackendEstimator, + BackendEstimatorV2, + BackendSampler, + BackendSamplerV2, +) from qiskit.primitives.primitive_job import PrimitiveJob +from qiskit.providers.backend import BackendV1, BackendV2 +from qiskit.utils import optionals -from ..runtime_options import RuntimeOptions from ..ibm_backend import IBMBackend -from ..qiskit.primitives import BackendEstimatorV2, BackendSamplerV2 # type: ignore[attr-defined] +from ..runtime_options import RuntimeOptions logger = logging.getLogger(__name__) @@ -137,7 +141,7 @@ def _run_aer_primitive_v1( The job object of the result of the primitive. """ # pylint: disable=import-outside-toplevel - from qiskit_aer.primitives import Sampler, Estimator + from qiskit_aer.primitives import Estimator, Sampler # TODO: issue warning if extra options are used options_copy = copy.deepcopy(options) diff --git a/qiskit_ibm_runtime/qiskit/__init__.py b/qiskit_ibm_runtime/qiskit/__init__.py deleted file mode 100644 index c4f3827ae..000000000 --- a/qiskit_ibm_runtime/qiskit/__init__.py +++ /dev/null @@ -1,15 +0,0 @@ -# This code is part of Qiskit. -# -# (C) Copyright IBM 2024. -# -# This code is licensed under the Apache License, Version 2.0. You may -# obtain a copy of this license in the LICENSE.txt file in the root directory -# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. -# -# Any modifications or derivative works of this code must retain this -# copyright notice, and modified files need to carry a notice indicating -# that they have been altered from the originals. - -""" -Qiskit primitives. -""" diff --git a/qiskit_ibm_runtime/qiskit/primitives/__init__.py b/qiskit_ibm_runtime/qiskit/primitives/__init__.py deleted file mode 100644 index 862aa3ea5..000000000 --- a/qiskit_ibm_runtime/qiskit/primitives/__init__.py +++ /dev/null @@ -1,19 +0,0 @@ -# This code is part of Qiskit. -# -# (C) Copyright IBM 2024. -# -# This code is licensed under the Apache License, Version 2.0. You may -# obtain a copy of this license in the LICENSE.txt file in the root directory -# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. -# -# Any modifications or derivative works of this code must retain this -# copyright notice, and modified files need to carry a notice indicating -# that they have been altered from the originals. -# type: ignore - -""" -Qiskit primitives. -""" - -from .backend_sampler_v2 import BackendSamplerV2 -from .backend_estimator_v2 import BackendEstimatorV2 diff --git a/qiskit_ibm_runtime/qiskit/primitives/backend_estimator_v2.py b/qiskit_ibm_runtime/qiskit/primitives/backend_estimator_v2.py deleted file mode 100644 index 515ee901d..000000000 --- a/qiskit_ibm_runtime/qiskit/primitives/backend_estimator_v2.py +++ /dev/null @@ -1,295 +0,0 @@ -# This code is part of Qiskit. -# -# (C) Copyright IBM 2024. -# -# This code is licensed under the Apache License, Version 2.0. You may -# obtain a copy of this license in the LICENSE.txt file in the root directory -# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. -# -# Any modifications or derivative works of this code must retain this -# copyright notice, and modified files need to carry a notice indicating -# that they have been altered from the originals. -# type: ignore - -"""Estimator V2 implementation for an arbitrary Backend object.""" - -from __future__ import annotations - -from collections import defaultdict -from collections.abc import Iterable -from dataclasses import dataclass - -import numpy as np - -from qiskit.circuit import ClassicalRegister, QuantumCircuit, QuantumRegister -from qiskit.exceptions import QiskitError -from qiskit.providers import BackendV1, BackendV2 -from qiskit.quantum_info import Pauli, PauliList -from qiskit.transpiler import PassManager, PassManagerConfig -from qiskit.transpiler.passes import Optimize1qGatesDecomposition - -from qiskit.primitives.backend_estimator import ( - _pauli_expval_with_variance, - _prepare_counts, - _run_circuits, -) -from qiskit.primitives.base import BaseEstimatorV2 -from qiskit.primitives.containers import EstimatorPubLike, PrimitiveResult, PubResult -from qiskit.primitives.containers.bindings_array import BindingsArray -from qiskit.primitives.containers.estimator_pub import EstimatorPub -from qiskit.primitives.primitive_job import PrimitiveJob - - -@dataclass -class Options: - """Options for :class:`~.BackendEstimatorV2`.""" - - default_precision: float = 0.015625 - """The default precision to use if none are specified in :meth:`~run`. - Default: 0.015625 (1 / sqrt(4096)). - """ - - abelian_grouping: bool = True - """Whether the observables should be grouped into sets of qubit-wise commuting observables. - Default: True. - """ - - seed_simulator: int | None = None - """The seed to use in the simulator. If None, a random seed will be used. - Default: None. - """ - - -class BackendEstimatorV2(BaseEstimatorV2): - """Evaluates expectation values for provided quantum circuit and observable combinations - - The :class:`~.BackendEstimatorV2` class is a generic implementation of the - :class:`~.BaseEstimatorV2` interface that is used to wrap a :class:`~.BackendV2` - (or :class:`~.BackendV1`) object in the :class:`~.BaseEstimatorV2` API. It - facilitates using backends that do not provide a native - :class:`~.BaseEstimatorV2` implementation in places that work with - :class:`~.BaseEstimatorV2`. However, - if you're using a provider that has a native implementation of - :class:`~.BaseEstimatorV2`, it is a better choice to leverage that native - implementation as it will likely include additional optimizations and be - a more efficient implementation. The generic nature of this class - precludes doing any provider- or backend-specific optimizations. - - This class does not perform any measurement or gate mitigation, and, presently, is only - compatible with Pauli-based observables. - - Each tuple of ``(circuit, observables, parameter values, precision)``, - called an estimator primitive unified bloc (PUB), produces its own array-based result. The - :meth:`~.BackendEstimatorV2.run` method can be given a sequence of pubs to run in one call. - - The options for :class:`~.BackendEstimatorV2` consist of the following items. - - * ``default_precision``: The default precision to use if none are specified in :meth:`~run`. - Default: 0.015625 (1 / sqrt(4096)). - - * ``abelian_grouping``: Whether the observables should be grouped into sets of qubit-wise - commuting observables. - Default: True. - - * ``seed_simulator``: The seed to use in the simulator. If None, a random seed will be used. - Default: None. - """ - - def __init__( - self, - *, - backend: BackendV1 | BackendV2, - options: dict | None = None, - ): - """ - Args: - backend: The backend to run the primitive on. - options: The options to control the default precision (``default_precision``), - the operator grouping (``abelian_grouping``), and - the random seed for the simulator (``seed_simulator``). - """ - self._backend = backend - self._options = Options(**options) if options else Options() - - basis = PassManagerConfig.from_backend(backend).basis_gates - if isinstance(backend, BackendV2): - opt1q = Optimize1qGatesDecomposition(basis=basis, target=backend.target) - else: - opt1q = Optimize1qGatesDecomposition(basis=basis) - self._passmanager = PassManager([opt1q]) - - @property - def options(self) -> Options: - """Return the options""" - return self._options - - @property - def backend(self) -> BackendV1 | BackendV2: - """Returns the backend which this sampler object based on.""" - return self._backend - - def run( - self, pubs: Iterable[EstimatorPubLike], *, precision: float | None = None - ) -> PrimitiveJob[PrimitiveResult[PubResult]]: - if precision is None: - precision = self._options.default_precision - coerced_pubs = [EstimatorPub.coerce(pub, precision) for pub in pubs] - self._validate_pubs(coerced_pubs) - job = PrimitiveJob(self._run, coerced_pubs) - job._submit() - return job - - def _validate_pubs(self, pubs: list[EstimatorPub]): - for i, pub in enumerate(pubs): - if pub.precision <= 0.0: - raise ValueError( - f"The {i}-th pub has precision less than or equal to 0 ({pub.precision}). ", - "But precision should be larger than 0.", - ) - - def _run(self, pubs: list[EstimatorPub]) -> PrimitiveResult[PubResult]: - return PrimitiveResult([self._run_pub(pub) for pub in pubs]) - - def _run_pub(self, pub: EstimatorPub) -> PubResult: - shots = int(np.ceil(1.0 / pub.precision**2)) - circuit = pub.circuit - observables = pub.observables - parameter_values = pub.parameter_values - - # calculate broadcasting of parameters and observables - param_shape = parameter_values.shape - param_indices = np.fromiter(np.ndindex(param_shape), dtype=object).reshape(param_shape) - bc_param_ind, bc_obs = np.broadcast_arrays(param_indices, observables) - - # calculate expectation values for each pair of parameter value set and pauli - param_obs_map = defaultdict(set) - for index in np.ndindex(*bc_param_ind.shape): - param_index = bc_param_ind[index] - param_obs_map[param_index].update(bc_obs[index].keys()) - expval_map = self._calc_expval_paulis(circuit, parameter_values, param_obs_map, shots) - - # calculate expectation values (evs) and standard errors (stds) - evs = np.zeros_like(bc_param_ind, dtype=float) - variances = np.zeros_like(bc_param_ind, dtype=float) - for index in np.ndindex(*bc_param_ind.shape): - param_index = bc_param_ind[index] - for pauli, coeff in bc_obs[index].items(): - expval, variance = expval_map[param_index, pauli] - evs[index] += expval * coeff - variances[index] += variance * coeff**2 - stds = np.sqrt(variances / shots) - data_bin_cls = self._make_data_bin(pub) - data_bin = data_bin_cls(evs=evs, stds=stds) - return PubResult(data_bin, metadata={"target_precision": pub.precision}) - - def _calc_expval_paulis( - self, - circuit: QuantumCircuit, - parameter_values: BindingsArray, - param_obs_map: dict[tuple[int, ...], set[str]], - shots: int, - ) -> dict[tuple[tuple[int, ...], str], tuple[float, float]]: - # generate circuits - circuits = [] - for param_index, pauli_strings in param_obs_map.items(): - bound_circuit = parameter_values.bind(circuit, param_index) - # sort pauli_strings so that the order is deterministic - meas_paulis = PauliList(sorted(pauli_strings)) - new_circuits = self._preprocessing(bound_circuit, meas_paulis, param_index) - circuits.extend(new_circuits) - - # run circuits - result, metadata = _run_circuits( - circuits, self._backend, shots=shots, seed_simulator=self._options.seed_simulator - ) - - # postprocessing results - expval_map: dict[tuple[tuple[int, ...], str], tuple[float, float]] = {} - counts = _prepare_counts(result) - for count, meta in zip(counts, metadata): - orig_paulis = meta["orig_paulis"] - meas_paulis = meta["meas_paulis"] - param_index = meta["param_index"] - expvals, variances = _pauli_expval_with_variance(count, meas_paulis) - for pauli, expval, variance in zip(orig_paulis, expvals, variances): - expval_map[param_index, pauli.to_label()] = (expval, variance) - return expval_map - - def _preprocessing( - self, circuit: QuantumCircuit, observable: PauliList, param_index: tuple[int, ...] - ) -> list[QuantumCircuit]: - # generate measurement circuits with metadata - meas_circuits: list[QuantumCircuit] = [] - if self._options.abelian_grouping: - for obs in observable.group_commuting(qubit_wise=True): - basis = Pauli((np.logical_or.reduce(obs.z), np.logical_or.reduce(obs.x))) - meas_circuit, indices = _measurement_circuit(circuit.num_qubits, basis) - paulis = PauliList.from_symplectic( - obs.z[:, indices], - obs.x[:, indices], - obs.phase, - ) - meas_circuit.metadata = { - "orig_paulis": obs, - "meas_paulis": paulis, - "param_index": param_index, - } - meas_circuits.append(meas_circuit) - else: - for basis in observable: - meas_circuit, indices = _measurement_circuit(circuit.num_qubits, basis) - obs = PauliList(basis) - paulis = PauliList.from_symplectic( - obs.z[:, indices], - obs.x[:, indices], - obs.phase, - ) - meas_circuit.metadata = { - "orig_paulis": obs, - "meas_paulis": paulis, - "param_index": param_index, - } - meas_circuits.append(meas_circuit) - - # unroll basis gates - meas_circuits = self._passmanager.run(meas_circuits) - - # combine measurement circuits - preprocessed_circuits = [] - for meas_circuit in meas_circuits: - circuit_copy = circuit.copy() - # meas_circuit is supposed to have a classical register whose name is different from - # those of the transpiled_circuit - clbits = meas_circuit.cregs[0] - for creg in circuit_copy.cregs: - if clbits.name == creg.name: - raise QiskitError( - "Classical register for measurements conflict with those of the input " - f"circuit: {clbits}. " - "Recommended to avoid register names starting with '__'." - ) - circuit_copy.add_register(clbits) - circuit_copy.compose(meas_circuit, clbits=clbits, inplace=True) - circuit_copy.metadata = meas_circuit.metadata - preprocessed_circuits.append(circuit_copy) - return preprocessed_circuits - - -def _measurement_circuit(num_qubits: int, pauli: Pauli): - # Note: if pauli is I for all qubits, this function generates a circuit to measure only - # the first qubit. - # Although such an operator can be optimized out by interpreting it as a constant (1), - # this optimization requires changes in various methods. So it is left as future work. - qubit_indices = np.arange(pauli.num_qubits)[pauli.z | pauli.x] - if not np.any(qubit_indices): - qubit_indices = [0] - meas_circuit = QuantumCircuit( - QuantumRegister(num_qubits, "q"), ClassicalRegister(len(qubit_indices), f"__c_{pauli}") - ) - for clbit, i in enumerate(qubit_indices): - if pauli.x[i]: - if pauli.z[i]: - meas_circuit.sdg(i) - meas_circuit.h(i) - meas_circuit.measure(i, clbit) - return meas_circuit, qubit_indices diff --git a/qiskit_ibm_runtime/qiskit/primitives/backend_sampler_v2.py b/qiskit_ibm_runtime/qiskit/primitives/backend_sampler_v2.py deleted file mode 100644 index 1547c82e2..000000000 --- a/qiskit_ibm_runtime/qiskit/primitives/backend_sampler_v2.py +++ /dev/null @@ -1,237 +0,0 @@ -# This code is part of Qiskit. -# -# (C) Copyright IBM 2024. -# -# This code is licensed under the Apache License, Version 2.0. You may -# obtain a copy of this license in the LICENSE.txt file in the root directory -# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. -# -# Any modifications or derivative works of this code must retain this -# copyright notice, and modified files need to carry a notice indicating -# that they have been altered from the originals. -# type: ignore - -"""Sampler V2 implementation for an arbitrary Backend object.""" - -from __future__ import annotations - -import warnings -from dataclasses import dataclass -from typing import Iterable - -import numpy as np -from numpy.typing import NDArray - -from qiskit.circuit import QuantumCircuit -from qiskit.primitives.backend_estimator import _run_circuits -from qiskit.primitives.base import BaseSamplerV2 -from qiskit.primitives.containers import ( - BitArray, - PrimitiveResult, - PubResult, - SamplerPubLike, - make_data_bin, -) -from qiskit.primitives.containers.bit_array import _min_num_bytes -from qiskit.primitives.containers.sampler_pub import SamplerPub -from qiskit.primitives.primitive_job import PrimitiveJob -from qiskit.providers.backend import BackendV1, BackendV2 -from qiskit.result import Result - - -@dataclass -class Options: - """Options for :class:`~.BackendSamplerV2`""" - - default_shots: int = 1024 - """The default shots to use if none are specified in :meth:`~.run`. - Default: 1024. - """ - - seed_simulator: int | None = None - """The seed to use in the simulator. If None, a random seed will be used. - Default: None. - """ - - -@dataclass -class _MeasureInfo: - creg_name: str - num_bits: int - num_bytes: int - start: int - - -class BackendSamplerV2(BaseSamplerV2): - """Evaluates bitstrings for provided quantum circuits - - The :class:`~.BackendSamplerV2` class is a generic implementation of the - :class:`~.BaseSamplerV2` interface that is used to wrap a :class:`~.BackendV2` - (or :class:`~.BackendV1`) object in the class :class:`~.BaseSamplerV2` API. It - facilitates using backends that do not provide a native - :class:`~.BaseSamplerV2` implementation in places that work with - :class:`~.BaseSamplerV2`. However, - if you're using a provider that has a native implementation of - :class:`~.BaseSamplerV2`, it is a better choice to leverage that native - implementation as it will likely include additional optimizations and be - a more efficient implementation. The generic nature of this class - precludes doing any provider- or backend-specific optimizations. - - This class does not perform any measurement or gate mitigation. - - Each tuple of ``(circuit, parameter values, shots)``, called a sampler - primitive unified bloc (PUB), produces its own array-valued result. The :meth:`~run` method can - be given many pubs at once. - - The options for :class:`~.BackendSamplerV2` consist of the following items. - - * ``default_shots``: The default shots to use if none are specified in :meth:`~run`. - Default: 1024. - - * ``seed_simulator``: The seed to use in the simulator. If None, a random seed will be used. - Default: None. - - .. note:: - - This class requires a backend that supports ``memory`` option. - - """ - - def __init__( - self, - *, - backend: BackendV1 | BackendV2, - options: dict | None = None, - ): - """ - Args: - backend: The backend to run the primitive on. - options: The options to control the default shots (``default_shots``) and - the random seed for the simulator (``seed_simulator``). - """ - self._backend = backend - self._options = Options(**options) if options else Options() - - @property - def backend(self) -> BackendV1 | BackendV2: - """Returns the backend which this sampler object based on.""" - return self._backend - - @property - def options(self) -> Options: - """Return the options""" - return self._options - - def run( - self, pubs: Iterable[SamplerPubLike], *, shots: int | None = None - ) -> PrimitiveJob[PrimitiveResult[PubResult]]: - if shots is None: - shots = self._options.default_shots - coerced_pubs = [SamplerPub.coerce(pub, shots) for pub in pubs] - self._validate_pubs(coerced_pubs) - job = PrimitiveJob(self._run, coerced_pubs) - job._submit() - return job - - def _validate_pubs(self, pubs: list[SamplerPub]): - for i, pub in enumerate(pubs): - if len(pub.circuit.cregs) == 0: - warnings.warn( - f"The {i}-th pub's circuit has no output classical registers and so the result " - "will be empty. Did you mean to add measurement instructions?", - UserWarning, - ) - - def _run(self, pubs: Iterable[SamplerPub]) -> PrimitiveResult[PubResult]: - results = [self._run_pub(pub) for pub in pubs] - return PrimitiveResult(results) - - def _run_pub(self, pub: SamplerPub) -> PubResult: - meas_info, max_num_bytes = _analyze_circuit(pub.circuit) - bound_circuits = pub.parameter_values.bind_all(pub.circuit) - arrays = { - item.creg_name: np.zeros( - bound_circuits.shape + (pub.shots, item.num_bytes), dtype=np.uint8 - ) - for item in meas_info - } - flatten_circuits = np.ravel(bound_circuits).tolist() - result_memory, _ = _run_circuits( - flatten_circuits, - self._backend, - memory=True, - shots=pub.shots, - seed_simulator=self._options.seed_simulator, - ) - memory_list = _prepare_memory(result_memory, max_num_bytes) - - for samples, index in zip(memory_list, np.ndindex(*bound_circuits.shape)): - for item in meas_info: - ary = _samples_to_packed_array(samples, item.num_bits, item.start) - arrays[item.creg_name][index] = ary - - data_bin_cls = make_data_bin( - [(item.creg_name, BitArray) for item in meas_info], - shape=bound_circuits.shape, - ) - meas = { - item.creg_name: BitArray(arrays[item.creg_name], item.num_bits) for item in meas_info - } - data_bin = data_bin_cls(**meas) - return PubResult(data_bin, metadata=pub.circuit.metadata) - - -def _analyze_circuit(circuit: QuantumCircuit) -> tuple[list[_MeasureInfo], int]: - meas_info = [] - max_num_bits = 0 - for creg in circuit.cregs: - name = creg.name - num_bits = creg.size - if num_bits != 0: - start = circuit.find_bit(creg[0]).index - else: - start = 0 - meas_info.append( - _MeasureInfo( - creg_name=name, - num_bits=num_bits, - num_bytes=_min_num_bytes(num_bits), - start=start, - ) - ) - max_num_bits = max(max_num_bits, start + num_bits) - return meas_info, _min_num_bytes(max_num_bits) - - -def _prepare_memory(results: list[Result], num_bytes: int) -> NDArray[np.uint8]: - lst = [] - for res in results: - for exp in res.results: - if hasattr(exp.data, "memory") and exp.data.memory: - data = b"".join(int(i, 16).to_bytes(num_bytes, "big") for i in exp.data.memory) - data = np.frombuffer(data, dtype=np.uint8).reshape(-1, num_bytes) - else: - # no measure in a circuit - data = np.zeros((exp.shots, num_bytes), dtype=np.uint8) - lst.append(data) - ary = np.array(lst, copy=False) - return np.unpackbits(ary, axis=-1, bitorder="big") - - -def _samples_to_packed_array( - samples: NDArray[np.uint8], num_bits: int, start: int -) -> NDArray[np.uint8]: - # samples of `Backend.run(memory=True)` will be the order of - # clbit_last, ..., clbit_1, clbit_0 - # place samples in the order of clbit_start+num_bits-1, ..., clbit_start+1, clbit_start - if start == 0: - ary = samples[:, -start - num_bits :] - else: - ary = samples[:, -start - num_bits : -start] - # pad 0 in the left to align the number to be mod 8 - # since np.packbits(bitorder='big') pads 0 to the right. - pad_size = -num_bits % 8 - ary = np.pad(ary, ((0, 0), (pad_size, 0)), constant_values=0) - # pack bits in big endian order - ary = np.packbits(ary, axis=-1, bitorder="big") - return ary diff --git a/requirements.txt b/requirements.txt index 05aef2e04..5df5ffaeb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,4 +7,4 @@ websocket-client>=1.5.1 typing-extensions>=4.0.0 ibm-platform-services>=0.22.6 pydantic>=2.5.0 -qiskit>=1.0.0 \ No newline at end of file +qiskit>=1.1.0 diff --git a/test/unit/qiskit/__init__.py b/test/unit/qiskit/__init__.py deleted file mode 100644 index 139b265e1..000000000 --- a/test/unit/qiskit/__init__.py +++ /dev/null @@ -1,11 +0,0 @@ -# This code is part of Qiskit. -# -# (C) Copyright IBM 2024. -# -# This code is licensed under the Apache License, Version 2.0. You may -# obtain a copy of this license in the LICENSE.txt file in the root directory -# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. -# -# Any modifications or derivative works of this code must retain this -# copyright notice, and modified files need to carry a notice indicating -# that they have been altered from the originals. diff --git a/test/unit/qiskit/test_backend_estimator_v2.py b/test/unit/qiskit/test_backend_estimator_v2.py deleted file mode 100644 index e8fe29c1e..000000000 --- a/test/unit/qiskit/test_backend_estimator_v2.py +++ /dev/null @@ -1,413 +0,0 @@ -# This code is part of Qiskit. -# -# (C) Copyright IBM 2024. -# -# This code is licensed under the Apache License, Version 2.0. You may -# obtain a copy of this license in the LICENSE.txt file in the root directory -# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. -# -# Any modifications or derivative works of this code must retain this -# copyright notice, and modified files need to carry a notice indicating -# that they have been altered from the originals. -# type: ignore - -"""Tests for Backend Estimator V2.""" - -from __future__ import annotations - -import unittest - -import numpy as np -from ddt import ddt - -from qiskit.circuit import Parameter, QuantumCircuit -from qiskit.circuit.library import RealAmplitudes -from qiskit.primitives import StatevectorEstimator -from qiskit.primitives.containers.bindings_array import BindingsArray -from qiskit.primitives.containers.estimator_pub import EstimatorPub -from qiskit.primitives.containers.observables_array import ObservablesArray -from qiskit.providers.backend_compat import BackendV2Converter -from qiskit.providers.basic_provider import BasicSimulator -from qiskit.providers.fake_provider import Fake7QPulseV1 -from qiskit.quantum_info import SparsePauliOp -from qiskit.transpiler.preset_passmanagers import generate_preset_pass_manager -from qiskit.utils import optionals -from qiskit_ibm_runtime.qiskit.primitives.backend_estimator_v2 import BackendEstimatorV2 - -from ...ibm_test_case import IBMTestCase -from ...utils import combine - -BACKENDS = [BasicSimulator(), Fake7QPulseV1(), BackendV2Converter(Fake7QPulseV1())] - - -@ddt -class TestBackendEstimatorV2(IBMTestCase): - """Test Estimator""" - - def setUp(self): - super().setUp() - self._precision = 5e-3 - self._rtol = 3e-1 - self._seed = 12 - self._rng = np.random.default_rng(self._seed) - self._options = {"default_precision": self._precision, "seed_simulator": self._seed} - self.ansatz = RealAmplitudes(num_qubits=2, reps=2) - self.observable = SparsePauliOp.from_list( - [ - ("II", -1.052373245772859), - ("IZ", 0.39793742484318045), - ("ZI", -0.39793742484318045), - ("ZZ", -0.01128010425623538), - ("XX", 0.18093119978423156), - ] - ) - self.expvals = -1.0284380963435145, -1.284366511861733 - - self.psi = (RealAmplitudes(num_qubits=2, reps=2), RealAmplitudes(num_qubits=2, reps=3)) - self.params = tuple(psi.parameters for psi in self.psi) - self.hamiltonian = ( - SparsePauliOp.from_list([("II", 1), ("IZ", 2), ("XI", 3)]), - SparsePauliOp.from_list([("IZ", 1)]), - SparsePauliOp.from_list([("ZI", 1), ("ZZ", 1)]), - ) - self.theta = ( - [0, 1, 1, 2, 3, 5], - [0, 1, 1, 2, 3, 5, 8, 13], - [1, 2, 3, 4, 5, 6], - ) - - @combine(backend=BACKENDS, abelian_grouping=[True, False]) - def test_estimator_run(self, backend, abelian_grouping): - """Test Estimator.run()""" - psi1, psi2 = self.psi - hamiltonian1, hamiltonian2, hamiltonian3 = self.hamiltonian - theta1, theta2, theta3 = self.theta - pm = generate_preset_pass_manager(optimization_level=0, backend=backend) - psi1, psi2 = pm.run([psi1, psi2]) - estimator = BackendEstimatorV2(backend=backend, options=self._options) - estimator.options.abelian_grouping = abelian_grouping - # Specify the circuit and observable by indices. - # calculate [ ] - ham1 = hamiltonian1.apply_layout(psi1.layout) - job = estimator.run([(psi1, ham1, [theta1])]) - result = job.result() - np.testing.assert_allclose(result[0].data.evs, [1.5555572817900956], rtol=self._rtol) - - # Objects can be passed instead of indices. - # Note that passing objects has an overhead - # since the corresponding indices need to be searched. - # User can append a circuit and observable. - # calculate [ ] - ham1 = hamiltonian1.apply_layout(psi2.layout) - result2 = estimator.run([(psi2, ham1, theta2)]).result() - np.testing.assert_allclose(result2[0].data.evs, [2.97797666], rtol=self._rtol) - - # calculate [ , ] - ham2 = hamiltonian2.apply_layout(psi1.layout) - ham3 = hamiltonian3.apply_layout(psi1.layout) - result3 = estimator.run([(psi1, [ham2, ham3], theta1)]).result() - np.testing.assert_allclose(result3[0].data.evs, [-0.551653, 0.07535239], rtol=self._rtol) - - # calculate [ [, - # ], - # [] ] - ham1 = hamiltonian1.apply_layout(psi1.layout) - ham3 = hamiltonian3.apply_layout(psi1.layout) - ham2 = hamiltonian2.apply_layout(psi2.layout) - result4 = estimator.run( - [ - (psi1, [ham1, ham3], [theta1, theta3]), - (psi2, ham2, theta2), - ] - ).result() - np.testing.assert_allclose(result4[0].data.evs, [1.55555728, -1.08766318], rtol=self._rtol) - np.testing.assert_allclose(result4[1].data.evs, [0.17849238], rtol=self._rtol) - - @combine(backend=BACKENDS, abelian_grouping=[True, False]) - def test_estimator_with_pub(self, backend, abelian_grouping): - """Test estimator with explicit EstimatorPubs.""" - psi1, psi2 = self.psi - hamiltonian1, hamiltonian2, hamiltonian3 = self.hamiltonian - theta1, theta2, theta3 = self.theta - pm = generate_preset_pass_manager(optimization_level=0, backend=backend) - psi1, psi2 = pm.run([psi1, psi2]) - - ham1 = hamiltonian1.apply_layout(psi1.layout) - ham3 = hamiltonian3.apply_layout(psi1.layout) - obs1 = ObservablesArray.coerce([ham1, ham3]) - bind1 = BindingsArray.coerce({tuple(psi1.parameters): [theta1, theta3]}) - pub1 = EstimatorPub(psi1, obs1, bind1) - - ham2 = hamiltonian2.apply_layout(psi2.layout) - obs2 = ObservablesArray.coerce(ham2) - bind2 = BindingsArray.coerce({tuple(psi2.parameters): theta2}) - pub2 = EstimatorPub(psi2, obs2, bind2) - - estimator = BackendEstimatorV2(backend=backend, options=self._options) - estimator.options.abelian_grouping = abelian_grouping - result4 = estimator.run([pub1, pub2]).result() - np.testing.assert_allclose(result4[0].data.evs, [1.55555728, -1.08766318], rtol=self._rtol) - np.testing.assert_allclose(result4[1].data.evs, [0.17849238], rtol=self._rtol) - - @combine(backend=BACKENDS, abelian_grouping=[True, False]) - def test_estimator_run_no_params(self, backend, abelian_grouping): - """test for estimator without parameters""" - circuit = self.ansatz.assign_parameters([0, 1, 1, 2, 3, 5]) - pm = generate_preset_pass_manager(optimization_level=0, backend=backend) - circuit = pm.run(circuit) - est = BackendEstimatorV2(backend=backend, options=self._options) - est.options.abelian_grouping = abelian_grouping - observable = self.observable.apply_layout(circuit.layout) - result = est.run([(circuit, observable)]).result() - np.testing.assert_allclose(result[0].data.evs, [-1.284366511861733], rtol=self._rtol) - - @combine(backend=BACKENDS, abelian_grouping=[True, False]) - def test_run_single_circuit_observable(self, backend, abelian_grouping): - """Test for single circuit and single observable case.""" - est = BackendEstimatorV2(backend=backend, options=self._options) - est.options.abelian_grouping = abelian_grouping - pm = generate_preset_pass_manager(optimization_level=0, backend=backend) - - with self.subTest("No parameter"): - qc = QuantumCircuit(1) - qc.x(0) - qc = pm.run(qc) - op = SparsePauliOp("Z") - op = op.apply_layout(qc.layout) - param_vals = [None, [], [[]], np.array([]), np.array([[]]), [np.array([])]] - target = [-1] - for val in param_vals: - self.subTest(f"{val}") - result = est.run([(qc, op, val)]).result() - np.testing.assert_allclose(result[0].data.evs, target, rtol=self._rtol) - self.assertEqual(result[0].metadata["target_precision"], self._precision) - - with self.subTest("One parameter"): - param = Parameter("x") - qc = QuantumCircuit(1) - qc.ry(param, 0) - qc = pm.run(qc) - op = SparsePauliOp("Z") - op = op.apply_layout(qc.layout) - param_vals = [ - [np.pi], - np.array([np.pi]), - ] - target = [-1] - for val in param_vals: - self.subTest(f"{val}") - result = est.run([(qc, op, val)]).result() - np.testing.assert_allclose(result[0].data.evs, target, rtol=self._rtol) - self.assertEqual(result[0].metadata["target_precision"], self._precision) - - with self.subTest("More than one parameter"): - qc = self.psi[0] - qc = pm.run(qc) - op = self.hamiltonian[0] - op = op.apply_layout(qc.layout) - param_vals = [ - self.theta[0], - [self.theta[0]], - np.array(self.theta[0]), - np.array([self.theta[0]]), - [np.array(self.theta[0])], - ] - target = [1.5555572817900956] - for val in param_vals: - self.subTest(f"{val}") - result = est.run([(qc, op, val)]).result() - np.testing.assert_allclose(result[0].data.evs, target, rtol=self._rtol) - self.assertEqual(result[0].metadata["target_precision"], self._precision) - - @combine(backend=BACKENDS, abelian_grouping=[True, False]) - def test_run_1qubit(self, backend, abelian_grouping): - """Test for 1-qubit cases""" - qc = QuantumCircuit(1) - qc2 = QuantumCircuit(1) - qc2.x(0) - pm = generate_preset_pass_manager(optimization_level=0, backend=backend) - qc, qc2 = pm.run([qc, qc2]) - - op = SparsePauliOp.from_list([("I", 1)]) - op2 = SparsePauliOp.from_list([("Z", 1)]) - - est = BackendEstimatorV2(backend=backend, options=self._options) - est.options.abelian_grouping = abelian_grouping - op_1 = op.apply_layout(qc.layout) - result = est.run([(qc, op_1)]).result() - np.testing.assert_allclose(result[0].data.evs, [1], rtol=self._rtol) - - op_2 = op2.apply_layout(qc.layout) - result = est.run([(qc, op_2)]).result() - np.testing.assert_allclose(result[0].data.evs, [1], rtol=self._rtol) - - op_3 = op.apply_layout(qc2.layout) - result = est.run([(qc2, op_3)]).result() - np.testing.assert_allclose(result[0].data.evs, [1], rtol=self._rtol) - - op_4 = op2.apply_layout(qc2.layout) - result = est.run([(qc2, op_4)]).result() - np.testing.assert_allclose(result[0].data.evs, [-1], rtol=self._rtol) - - @combine(backend=BACKENDS, abelian_grouping=[True, False]) - def test_run_2qubits(self, backend, abelian_grouping): - """Test for 2-qubit cases (to check endian)""" - qc = QuantumCircuit(2) - qc2 = QuantumCircuit(2) - qc2.x(0) - pm = generate_preset_pass_manager(optimization_level=0, backend=backend) - qc, qc2 = pm.run([qc, qc2]) - - op = SparsePauliOp.from_list([("II", 1)]) - op2 = SparsePauliOp.from_list([("ZI", 1)]) - op3 = SparsePauliOp.from_list([("IZ", 1)]) - - est = BackendEstimatorV2(backend=backend, options=self._options) - est.options.abelian_grouping = abelian_grouping - op_1 = op.apply_layout(qc.layout) - result = est.run([(qc, op_1)]).result() - np.testing.assert_allclose(result[0].data.evs, [1], rtol=self._rtol) - - op_2 = op.apply_layout(qc2.layout) - result = est.run([(qc2, op_2)]).result() - np.testing.assert_allclose(result[0].data.evs, [1], rtol=self._rtol) - - op_3 = op2.apply_layout(qc.layout) - result = est.run([(qc, op_3)]).result() - np.testing.assert_allclose(result[0].data.evs, [1], rtol=self._rtol) - - op_4 = op2.apply_layout(qc2.layout) - result = est.run([(qc2, op_4)]).result() - np.testing.assert_allclose(result[0].data.evs, [1], rtol=self._rtol) - - op_5 = op3.apply_layout(qc.layout) - result = est.run([(qc, op_5)]).result() - np.testing.assert_allclose(result[0].data.evs, [1], rtol=self._rtol) - - op_6 = op3.apply_layout(qc2.layout) - result = est.run([(qc2, op_6)]).result() - np.testing.assert_allclose(result[0].data.evs, [-1], rtol=self._rtol) - - @combine(backend=BACKENDS, abelian_grouping=[True, False]) - def test_run_errors(self, backend, abelian_grouping): - """Test for errors""" - qc = QuantumCircuit(1) - qc2 = QuantumCircuit(2) - - op = SparsePauliOp.from_list([("I", 1)]) - op2 = SparsePauliOp.from_list([("II", 1)]) - - est = BackendEstimatorV2(backend=backend, options=self._options) - est.options.abelian_grouping = abelian_grouping - with self.assertRaises(ValueError): - est.run([(qc, op2)]).result() - with self.assertRaises(ValueError): - est.run([(qc, op, [[1e4]])]).result() - with self.assertRaises(ValueError): - est.run([(qc2, op2, [[1, 2]])]).result() - with self.assertRaises(ValueError): - est.run([(qc, [op, op2], [[1]])]).result() - with self.assertRaises(ValueError): - est.run([(qc, op)], precision=-1).result() - with self.assertRaises(ValueError): - est.run([(qc, 1j * op)], precision=0.1).result() - # precision == 0 - with self.assertRaises(ValueError): - est.run([(qc, op, None, 0)]).result() - with self.assertRaises(ValueError): - est.run([(qc, op)], precision=0).result() - # precision < 0 - with self.assertRaises(ValueError): - est.run([(qc, op, None, -1)]).result() - with self.assertRaises(ValueError): - est.run([(qc, op)], precision=-1).result() - - @combine(backend=BACKENDS, abelian_grouping=[True, False]) - def test_run_numpy_params(self, backend, abelian_grouping): - """Test for numpy array as parameter values""" - qc = RealAmplitudes(num_qubits=2, reps=2) - pm = generate_preset_pass_manager(optimization_level=0, backend=backend) - qc = pm.run(qc) - op = SparsePauliOp.from_list([("IZ", 1), ("XI", 2), ("ZY", -1)]) - op = op.apply_layout(qc.layout) - k = 5 - params_array = self._rng.random((k, qc.num_parameters)) - params_list = params_array.tolist() - params_list_array = list(params_array) - statevector_estimator = StatevectorEstimator(seed=123) - target = statevector_estimator.run([(qc, op, params_list)]).result() - - backend_estimator = BackendEstimatorV2(backend=backend, options=self._options) - backend_estimator.options.abelian_grouping = abelian_grouping - - with self.subTest("ndarrary"): - result = backend_estimator.run([(qc, op, params_array)]).result() - self.assertEqual(result[0].data.evs.shape, (k,)) - np.testing.assert_allclose(result[0].data.evs, target[0].data.evs, rtol=self._rtol) - - with self.subTest("list of ndarray"): - result = backend_estimator.run([(qc, op, params_list_array)]).result() - self.assertEqual(result[0].data.evs.shape, (k,)) - np.testing.assert_allclose(result[0].data.evs, target[0].data.evs, rtol=self._rtol) - - @combine(backend=BACKENDS, abelian_grouping=[True, False]) - def test_precision(self, backend, abelian_grouping): - """Test for precision""" - estimator = BackendEstimatorV2(backend=backend, options=self._options) - estimator.options.abelian_grouping = abelian_grouping - pm = generate_preset_pass_manager(optimization_level=0, backend=backend) - psi1 = pm.run(self.psi[0]) - hamiltonian1 = self.hamiltonian[0].apply_layout(psi1.layout) - theta1 = self.theta[0] - job = estimator.run([(psi1, hamiltonian1, [theta1])]) - result = job.result() - np.testing.assert_allclose(result[0].data.evs, [1.901141473854881], rtol=self._rtol) - # The result of the second run is the same - job = estimator.run([(psi1, hamiltonian1, [theta1]), (psi1, hamiltonian1, [theta1])]) - result = job.result() - np.testing.assert_allclose(result[0].data.evs, [1.901141473854881], rtol=self._rtol) - np.testing.assert_allclose(result[1].data.evs, [1.901141473854881], rtol=self._rtol) - # apply smaller precision value - job = estimator.run([(psi1, hamiltonian1, [theta1])], precision=self._precision * 0.5) - result = job.result() - np.testing.assert_allclose(result[0].data.evs, [1.5555572817900956], rtol=self._rtol) - - @unittest.skipUnless(optionals.HAS_AER, "qiskit-aer is required to run this test") - @combine(abelian_grouping=[True, False]) - def test_aer(self, abelian_grouping): - """Test for Aer simulator""" - from qiskit_aer import AerSimulator # pylint: disable=import-outside-toplevel - - backend = AerSimulator() - seed = 123 - qc = RealAmplitudes(num_qubits=2, reps=1) - pm = generate_preset_pass_manager(optimization_level=0, backend=backend) - qc = pm.run(qc) - op = [SparsePauliOp("IX"), SparsePauliOp("YI")] - shape = (3, 2) - params_array = self._rng.random(shape + (qc.num_parameters,)) - params_list = params_array.tolist() - params_list_array = list(params_array) - statevector_estimator = StatevectorEstimator(seed=seed) - target = statevector_estimator.run([(qc, op, params_list)]).result() - - backend_estimator = BackendEstimatorV2(backend=backend, options=self._options) - backend_estimator.options.abelian_grouping = abelian_grouping - - with self.subTest("ndarrary"): - result = backend_estimator.run([(qc, op, params_array)]).result() - self.assertEqual(result[0].data.evs.shape, shape) - np.testing.assert_allclose( - result[0].data.evs, target[0].data.evs, rtol=self._rtol, atol=1e-1 - ) - - with self.subTest("list of ndarray"): - result = backend_estimator.run([(qc, op, params_list_array)]).result() - self.assertEqual(result[0].data.evs.shape, shape) - np.testing.assert_allclose( - result[0].data.evs, target[0].data.evs, rtol=self._rtol, atol=1e-1 - ) - - -if __name__ == "__main__": - unittest.main() diff --git a/test/unit/qiskit/test_backend_sampler_v2.py b/test/unit/qiskit/test_backend_sampler_v2.py deleted file mode 100644 index 3ed31795a..000000000 --- a/test/unit/qiskit/test_backend_sampler_v2.py +++ /dev/null @@ -1,692 +0,0 @@ -# This code is part of Qiskit. -# -# (C) Copyright IBM 2024. -# -# This code is licensed under the Apache License, Version 2.0. You may -# obtain a copy of this license in the LICENSE.txt file in the root directory -# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. -# -# Any modifications or derivative works of this code must retain this -# copyright notice, and modified files need to carry a notice indicating -# that they have been altered from the originals. -# type: ignore - -"""Tests for Backend Sampler V2.""" - -from __future__ import annotations - -import unittest - -import numpy as np -from ddt import ddt -from numpy.typing import NDArray - -from qiskit import ClassicalRegister, QuantumCircuit, QuantumRegister -from qiskit.circuit import Parameter -from qiskit.circuit.library import RealAmplitudes, UnitaryGate -from qiskit.primitives import PrimitiveResult, PubResult, StatevectorSampler -from qiskit.primitives.containers import BitArray -from qiskit.primitives.containers.data_bin import DataBin -from qiskit.primitives.containers.sampler_pub import SamplerPub -from qiskit.providers import JobStatus -from qiskit.providers.backend_compat import BackendV2Converter -from qiskit.providers.basic_provider import BasicSimulator -from qiskit.providers.fake_provider import Fake7QPulseV1 -from qiskit.transpiler.preset_passmanagers import generate_preset_pass_manager -from qiskit_ibm_runtime.qiskit.primitives.backend_sampler_v2 import BackendSamplerV2 - -from ...ibm_test_case import IBMTestCase -from ...utils import combine - -BACKENDS = [BasicSimulator(), Fake7QPulseV1(), BackendV2Converter(Fake7QPulseV1())] - - -@ddt -class TestBackendSamplerV2(IBMTestCase): - """Test for BackendSamplerV2""" - - def setUp(self): - super().setUp() - self._shots = 10000 - self._seed = 123 - self._options = {"default_shots": self._shots, "seed_simulator": self._seed} - - self._cases = [] - hadamard = QuantumCircuit(1, 1, name="Hadamard") - hadamard.h(0) - hadamard.measure(0, 0) - self._cases.append((hadamard, None, {0: 5000, 1: 5000})) # case 0 - - bell = QuantumCircuit(2, name="Bell") - bell.h(0) - bell.cx(0, 1) - bell.measure_all() - self._cases.append((bell, None, {0: 5000, 3: 5000})) # case 1 - - pqc = RealAmplitudes(num_qubits=2, reps=2) - pqc.measure_all() - self._cases.append((pqc, [0] * 6, {0: 10000})) # case 2 - self._cases.append((pqc, [1] * 6, {0: 168, 1: 3389, 2: 470, 3: 5973})) # case 3 - self._cases.append((pqc, [0, 1, 1, 2, 3, 5], {0: 1339, 1: 3534, 2: 912, 3: 4215})) # case 4 - self._cases.append((pqc, [1, 2, 3, 4, 5, 6], {0: 634, 1: 291, 2: 6039, 3: 3036})) # case 5 - - pqc2 = RealAmplitudes(num_qubits=2, reps=3) - pqc2.measure_all() - self._cases.append( - (pqc2, [0, 1, 2, 3, 4, 5, 6, 7], {0: 1898, 1: 6864, 2: 928, 3: 311}) - ) # case 6 - - def _assert_allclose(self, bitarray: BitArray, target: NDArray | BitArray, rtol=1e-1, atol=5e2): - self.assertEqual(bitarray.shape, target.shape) - for idx in np.ndindex(bitarray.shape): - int_counts = bitarray.get_int_counts(idx) - target_counts = ( - target.get_int_counts(idx) if isinstance(target, BitArray) else target[idx] - ) - max_key = max( # pylint: disable=nested-min-max - max(int_counts.keys()), max(target_counts.keys()) - ) - ary = np.array([int_counts.get(i, 0) for i in range(max_key + 1)]) - tgt = np.array([target_counts.get(i, 0) for i in range(max_key + 1)]) - np.testing.assert_allclose(ary, tgt, rtol=rtol, atol=atol, err_msg=f"index: {idx}") - - @combine(backend=BACKENDS) - def test_sampler_run(self, backend): - """Test run().""" - pm = generate_preset_pass_manager(optimization_level=0, backend=backend) - - with self.subTest("single"): - bell, _, target = self._cases[1] - bell = pm.run(bell) - sampler = BackendSamplerV2(backend=backend, options=self._options) - job = sampler.run([bell], shots=self._shots) - result = job.result() - self.assertIsInstance(result, PrimitiveResult) - self.assertIsInstance(result.metadata, dict) - self.assertEqual(len(result), 1) - self.assertIsInstance(result[0], PubResult) - self.assertIsInstance(result[0].data, DataBin) - self.assertIsInstance(result[0].data.meas, BitArray) - self._assert_allclose(result[0].data.meas, np.array(target)) - - with self.subTest("single with param"): - pqc, param_vals, target = self._cases[2] - sampler = BackendSamplerV2(backend=backend, options=self._options) - pqc = pm.run(pqc) - params = (param.name for param in pqc.parameters) - job = sampler.run([(pqc, {params: param_vals})], shots=self._shots) - result = job.result() - self.assertIsInstance(result, PrimitiveResult) - self.assertIsInstance(result.metadata, dict) - self.assertEqual(len(result), 1) - self.assertIsInstance(result[0], PubResult) - self.assertIsInstance(result[0].data, DataBin) - self.assertIsInstance(result[0].data.meas, BitArray) - self._assert_allclose(result[0].data.meas, np.array(target)) - - with self.subTest("multiple"): - pqc, param_vals, target = self._cases[2] - sampler = BackendSamplerV2(backend=backend, options=self._options) - pqc = pm.run(pqc) - params = (param.name for param in pqc.parameters) - job = sampler.run( - [(pqc, {params: [param_vals, param_vals, param_vals]})], shots=self._shots - ) - result = job.result() - self.assertIsInstance(result, PrimitiveResult) - self.assertIsInstance(result.metadata, dict) - self.assertEqual(len(result), 1) - self.assertIsInstance(result[0], PubResult) - self.assertIsInstance(result[0].data, DataBin) - self.assertIsInstance(result[0].data.meas, BitArray) - self._assert_allclose(result[0].data.meas, np.array([target, target, target])) - - with self.subTest("with circuit metadata"): - sample_metadata = { - "user_metadata_field_1": "metadata_1", - "user_metadata_field_2": "metadata_2", - } - pqc, _, _ = self._cases[1] - pqc.metadata = sample_metadata - sampler = BackendSamplerV2(backend=backend, options=self._options) - pqc = pm.run(pqc) - job = sampler.run([pqc], shots=self._shots) - result = job.result() - self.assertIsInstance(result, PrimitiveResult) - self.assertIsInstance(result.metadata, dict) - self.assertEqual(len(result), 1) - self.assertIsInstance(result[0], PubResult) - self.assertIsInstance(result[0].metadata, dict) - self.assertEqual(result[0].metadata, sample_metadata) - - @combine(backend=BACKENDS) - def test_sampler_run_multiple_times(self, backend): - """Test run() returns the same results if the same input is given.""" - bell, _, _ = self._cases[1] - sampler = BackendSamplerV2(backend=backend, options=self._options) - pm = generate_preset_pass_manager(optimization_level=0, backend=backend) - bell = pm.run(bell) - result1 = sampler.run([bell], shots=self._shots).result() - meas1 = result1[0].data.meas - result2 = sampler.run([bell], shots=self._shots).result() - meas2 = result2[0].data.meas - self._assert_allclose(meas1, meas2, rtol=0) - - @combine(backend=BACKENDS) - def test_sample_run_multiple_circuits(self, backend): - """Test run() with multiple circuits.""" - bell, _, target = self._cases[1] - sampler = BackendSamplerV2(backend=backend, options=self._options) - pm = generate_preset_pass_manager(optimization_level=0, backend=backend) - bell = pm.run(bell) - result = sampler.run([bell, bell, bell], shots=self._shots).result() - self.assertEqual(len(result), 3) - self._assert_allclose(result[0].data.meas, np.array(target)) - self._assert_allclose(result[1].data.meas, np.array(target)) - self._assert_allclose(result[2].data.meas, np.array(target)) - - @combine(backend=BACKENDS) - def test_sampler_run_with_parameterized_circuits(self, backend): - """Test run() with parameterized circuits.""" - pqc1, param1, target1 = self._cases[4] - pqc2, param2, target2 = self._cases[5] - pqc3, param3, target3 = self._cases[6] - pm = generate_preset_pass_manager(optimization_level=0, backend=backend) - pqc1, pqc2, pqc3 = pm.run([pqc1, pqc2, pqc3]) - - sampler = BackendSamplerV2(backend=backend, options=self._options) - result = sampler.run( - [(pqc1, param1), (pqc2, param2), (pqc3, param3)], shots=self._shots - ).result() - self.assertEqual(len(result), 3) - self._assert_allclose(result[0].data.meas, np.array(target1)) - self._assert_allclose(result[1].data.meas, np.array(target2)) - self._assert_allclose(result[2].data.meas, np.array(target3)) - - @combine(backend=BACKENDS) - def test_run_1qubit(self, backend): - """test for 1-qubit cases""" - qc = QuantumCircuit(1) - qc.measure_all() - qc2 = QuantumCircuit(1) - qc2.x(0) - qc2.measure_all() - pm = generate_preset_pass_manager(optimization_level=0, backend=backend) - qc, qc2 = pm.run([qc, qc2]) - - sampler = BackendSamplerV2(backend=backend, options=self._options) - result = sampler.run([qc, qc2], shots=self._shots).result() - self.assertEqual(len(result), 2) - for i in range(2): - self._assert_allclose(result[i].data.meas, np.array({i: self._shots})) - - @combine(backend=BACKENDS) - def test_run_2qubit(self, backend): - """test for 2-qubit cases""" - qc0 = QuantumCircuit(2) - qc0.measure_all() - qc1 = QuantumCircuit(2) - qc1.x(0) - qc1.measure_all() - qc2 = QuantumCircuit(2) - qc2.x(1) - qc2.measure_all() - qc3 = QuantumCircuit(2) - qc3.x([0, 1]) - qc3.measure_all() - pm = generate_preset_pass_manager(optimization_level=0, backend=backend) - qc0, qc1, qc2, qc3 = pm.run([qc0, qc1, qc2, qc3]) - - sampler = BackendSamplerV2(backend=backend, options=self._options) - result = sampler.run([qc0, qc1, qc2, qc3], shots=self._shots).result() - self.assertEqual(len(result), 4) - for i in range(4): - self._assert_allclose(result[i].data.meas, np.array({i: self._shots})) - - @combine(backend=BACKENDS) - def test_run_single_circuit(self, backend): - """Test for single circuit case.""" - sampler = BackendSamplerV2(backend=backend, options=self._options) - pm = generate_preset_pass_manager(optimization_level=0, backend=backend) - - with self.subTest("No parameter"): - circuit, _, target = self._cases[1] - circuit = pm.run(circuit) - param_target = [ - (None, np.array(target)), - ({}, np.array(target)), - ] - for param, target in param_target: - with self.subTest(f"{circuit.name} w/ {param}"): - result = sampler.run([(circuit, param)], shots=self._shots).result() - self.assertEqual(len(result), 1) - self._assert_allclose(result[0].data.meas, target) - - with self.subTest("One parameter"): - circuit = QuantumCircuit(1, 1, name="X gate") - param = Parameter("x") - circuit.ry(param, 0) - circuit.measure(0, 0) - circuit = pm.run(circuit) - param_target = [ - ({"x": np.pi}, np.array({1: self._shots})), - ({param: np.pi}, np.array({1: self._shots})), - ({"x": np.array(np.pi)}, np.array({1: self._shots})), - ({param: np.array(np.pi)}, np.array({1: self._shots})), - ({"x": [np.pi]}, np.array({1: self._shots})), - ({param: [np.pi]}, np.array({1: self._shots})), - ({"x": np.array([np.pi])}, np.array({1: self._shots})), - ({param: np.array([np.pi])}, np.array({1: self._shots})), - ] - for param, target in param_target: - with self.subTest(f"{circuit.name} w/ {param}"): - result = sampler.run([(circuit, param)], shots=self._shots).result() - self.assertEqual(len(result), 1) - self._assert_allclose(result[0].data.c, target) - - with self.subTest("More than one parameter"): - circuit, param, target = self._cases[3] - circuit = pm.run(circuit) - param_target = [ - (param, np.array(target)), - (tuple(param), np.array(target)), - (np.array(param), np.array(target)), - ((param,), np.array([target])), - ([param], np.array([target])), - (np.array([param]), np.array([target])), - ] - for param, target in param_target: - with self.subTest(f"{circuit.name} w/ {param}"): - result = sampler.run([(circuit, param)], shots=self._shots).result() - self.assertEqual(len(result), 1) - self._assert_allclose(result[0].data.meas, target) - - @combine(backend=BACKENDS) - def test_run_reverse_meas_order(self, backend): - """test for sampler with reverse measurement order""" - x = Parameter("x") - y = Parameter("y") - - qc = QuantumCircuit(3, 3) - qc.rx(x, 0) - qc.rx(y, 1) - qc.x(2) - qc.measure(0, 2) - qc.measure(1, 1) - qc.measure(2, 0) - pm = generate_preset_pass_manager(optimization_level=0, backend=backend) - qc = pm.run(qc) - - sampler = BackendSamplerV2(backend=backend) - sampler.options.seed_simulator = self._seed - result = sampler.run([(qc, [0, 0]), (qc, [np.pi / 2, 0])], shots=self._shots).result() - self.assertEqual(len(result), 2) - - # qc({x: 0, y: 0}) - self._assert_allclose(result[0].data.c, np.array({1: self._shots})) - - # qc({x: pi/2, y: 0}) - self._assert_allclose(result[1].data.c, np.array({1: self._shots / 2, 5: self._shots / 2})) - - @combine(backend=BACKENDS) - def test_run_errors(self, backend): - """Test for errors with run method""" - qc1 = QuantumCircuit(1) - qc1.measure_all() - qc2 = RealAmplitudes(num_qubits=1, reps=1) - qc2.measure_all() - pm = generate_preset_pass_manager(optimization_level=0, backend=backend) - qc1, qc2 = pm.run([qc1, qc2]) - - sampler = BackendSamplerV2(backend=backend) - with self.subTest("set parameter values to a non-parameterized circuit"): - with self.assertRaises(ValueError): - _ = sampler.run([(qc1, [1e2])]).result() - with self.subTest("missing all parameter values for a parameterized circuit"): - with self.assertRaises(ValueError): - _ = sampler.run([qc2]).result() - with self.assertRaises(ValueError): - _ = sampler.run([(qc2, [])]).result() - with self.assertRaises(ValueError): - _ = sampler.run([(qc2, None)]).result() - with self.subTest("missing some parameter values for a parameterized circuit"): - with self.assertRaises(ValueError): - _ = sampler.run([(qc2, [1e2])]).result() - with self.subTest("too many parameter values for a parameterized circuit"): - with self.assertRaises(ValueError): - _ = sampler.run([(qc2, [1e2] * 100)]).result() - with self.subTest("negative shots, run arg"): - with self.assertRaises(ValueError): - _ = sampler.run([qc1], shots=-1).result() - with self.subTest("negative shots, pub-like"): - with self.assertRaises(ValueError): - _ = sampler.run([(qc1, None, -1)]).result() - with self.subTest("negative shots, pub"): - with self.assertRaises(ValueError): - _ = sampler.run([SamplerPub(qc1, shots=-1)]).result() - with self.subTest("zero shots, run arg"): - with self.assertRaises(ValueError): - _ = sampler.run([qc1], shots=0).result() - with self.subTest("zero shots, pub-like"): - with self.assertRaises(ValueError): - _ = sampler.run([(qc1, None, 0)]).result() - with self.subTest("zero shots, pub"): - with self.assertRaises(ValueError): - _ = sampler.run([SamplerPub(qc1, shots=0)]).result() - - @combine(backend=BACKENDS) - def test_run_empty_parameter(self, backend): - """Test for empty parameter""" - n = 5 - qc = QuantumCircuit(n, n - 1) - qc.measure(range(n - 1), range(n - 1)) - pm = generate_preset_pass_manager(optimization_level=0, backend=backend) - qc = pm.run(qc) - sampler = BackendSamplerV2(backend=backend, options=self._options) - with self.subTest("one circuit"): - result = sampler.run([qc], shots=self._shots).result() - self.assertEqual(len(result), 1) - self._assert_allclose(result[0].data.c, np.array({0: self._shots})) - - with self.subTest("two circuits"): - result = sampler.run([qc, qc], shots=self._shots).result() - self.assertEqual(len(result), 2) - for i in range(2): - self._assert_allclose(result[i].data.c, np.array({0: self._shots})) - - @combine(backend=BACKENDS) - def test_run_numpy_params(self, backend): - """Test for numpy array as parameter values""" - qc = RealAmplitudes(num_qubits=2, reps=2) - qc.measure_all() - pm = generate_preset_pass_manager(optimization_level=0, backend=backend) - qc = pm.run(qc) - k = 5 - params_array = np.linspace(0, 1, k * qc.num_parameters).reshape((k, qc.num_parameters)) - params_list = params_array.tolist() - sampler = StatevectorSampler(seed=self._seed) - target = sampler.run([(qc, params_list)], shots=self._shots).result() - - with self.subTest("ndarray"): - sampler = BackendSamplerV2(backend=backend, options=self._options) - result = sampler.run([(qc, params_array)], shots=self._shots).result() - self.assertEqual(len(result), 1) - self._assert_allclose(result[0].data.meas, target[0].data.meas) - - with self.subTest("split a list"): - sampler = BackendSamplerV2(backend=backend, options=self._options) - result = sampler.run( - [(qc, params) for params in params_list], shots=self._shots - ).result() - self.assertEqual(len(result), k) - for i in range(k): - self._assert_allclose( - result[i].data.meas, np.array(target[0].data.meas.get_int_counts(i)) - ) - - @combine(backend=BACKENDS) - def test_run_with_shots_option(self, backend): - """test with shots option.""" - bell, _, _ = self._cases[1] - pm = generate_preset_pass_manager(optimization_level=0, backend=backend) - bell = pm.run(bell) - shots = 100 - - with self.subTest("run arg"): - sampler = BackendSamplerV2(backend=backend, options=self._options) - result = sampler.run([bell], shots=shots).result() - self.assertEqual(len(result), 1) - self.assertEqual(result[0].data.meas.num_shots, shots) - self.assertEqual(sum(result[0].data.meas.get_counts().values()), shots) - - with self.subTest("default shots"): - sampler = BackendSamplerV2(backend=backend, options=self._options) - default_shots = sampler.options.default_shots - result = sampler.run([bell]).result() - self.assertEqual(len(result), 1) - self.assertEqual(result[0].data.meas.num_shots, default_shots) - self.assertEqual(sum(result[0].data.meas.get_counts().values()), default_shots) - - with self.subTest("setting default shots"): - default_shots = 100 - sampler = BackendSamplerV2(backend=backend, options=self._options) - sampler.options.default_shots = default_shots - self.assertEqual(sampler.options.default_shots, default_shots) - result = sampler.run([bell]).result() - self.assertEqual(len(result), 1) - self.assertEqual(result[0].data.meas.num_shots, default_shots) - self.assertEqual(sum(result[0].data.meas.get_counts().values()), default_shots) - - with self.subTest("pub-like"): - sampler = BackendSamplerV2(backend=backend, options=self._options) - result = sampler.run([(bell, None, shots)]).result() - self.assertEqual(len(result), 1) - self.assertEqual(result[0].data.meas.num_shots, shots) - self.assertEqual(sum(result[0].data.meas.get_counts().values()), shots) - - with self.subTest("pub"): - sampler = BackendSamplerV2(backend=backend, options=self._options) - result = sampler.run([SamplerPub(bell, shots=shots)]).result() - self.assertEqual(len(result), 1) - self.assertEqual(result[0].data.meas.num_shots, shots) - self.assertEqual(sum(result[0].data.meas.get_counts().values()), shots) - - with self.subTest("multiple pubs"): - sampler = BackendSamplerV2(backend=backend, options=self._options) - shots1 = 100 - shots2 = 200 - result = sampler.run( - [ - SamplerPub(bell, shots=shots1), - SamplerPub(bell, shots=shots2), - ], - shots=self._shots, - ).result() - self.assertEqual(len(result), 2) - self.assertEqual(result[0].data.meas.num_shots, shots1) - self.assertEqual(sum(result[0].data.meas.get_counts().values()), shots1) - self.assertEqual(result[1].data.meas.num_shots, shots2) - self.assertEqual(sum(result[1].data.meas.get_counts().values()), shots2) - - @combine(backend=BACKENDS) - def test_run_shots_result_size(self, backend): - """test with shots option to validate the result size""" - n = 7 # should be less than or equal to the number of qubits of backend - qc = QuantumCircuit(n) - qc.h(range(n)) - qc.measure_all() - pm = generate_preset_pass_manager(optimization_level=0, backend=backend) - qc = pm.run(qc) - sampler = BackendSamplerV2(backend=backend, options=self._options) - result = sampler.run([qc], shots=self._shots).result() - self.assertEqual(len(result), 1) - self.assertLessEqual(result[0].data.meas.num_shots, self._shots) - self.assertEqual(sum(result[0].data.meas.get_counts().values()), self._shots) - - @combine(backend=BACKENDS) - def test_primitive_job_status_done(self, backend): - """test primitive job's status""" - bell, _, _ = self._cases[1] - pm = generate_preset_pass_manager(optimization_level=0, backend=backend) - bell = pm.run(bell) - sampler = BackendSamplerV2(backend=backend, options=self._options) - job = sampler.run([bell], shots=self._shots) - _ = job.result() - self.assertEqual(job.status(), JobStatus.DONE) - - @combine(backend=BACKENDS) - def test_circuit_with_unitary(self, backend): - """Test for circuit with unitary gate.""" - pm = generate_preset_pass_manager(optimization_level=0, backend=backend) - - with self.subTest("identity"): - gate = UnitaryGate(np.eye(2)) - - circuit = QuantumCircuit(1) - circuit.append(gate, [0]) - circuit.measure_all() - circuit = pm.run(circuit) - - sampler = BackendSamplerV2(backend=backend, options=self._options) - result = sampler.run([circuit], shots=self._shots).result() - self.assertEqual(len(result), 1) - self._assert_allclose(result[0].data.meas, np.array({0: self._shots})) - - with self.subTest("X"): - gate = UnitaryGate([[0, 1], [1, 0]]) - - circuit = QuantumCircuit(1) - circuit.append(gate, [0]) - circuit.measure_all() - circuit = pm.run(circuit) - - sampler = BackendSamplerV2(backend=backend, options=self._options) - result = sampler.run([circuit], shots=self._shots).result() - self.assertEqual(len(result), 1) - self._assert_allclose(result[0].data.meas, np.array({1: self._shots})) - - @combine(backend=BACKENDS) - def test_circuit_with_multiple_cregs(self, backend): - """Test for circuit with multiple classical registers.""" - pm = generate_preset_pass_manager(optimization_level=0, backend=backend) - cases = [] - - # case 1 - a = ClassicalRegister(1, "a") - b = ClassicalRegister(2, "b") - c = ClassicalRegister(3, "c") - - qc = QuantumCircuit(QuantumRegister(3), a, b, c) - qc.h(range(3)) - qc.measure([0, 1, 2, 2], [0, 2, 4, 5]) - qc = pm.run(qc) - target = {"a": {0: 5000, 1: 5000}, "b": {0: 5000, 2: 5000}, "c": {0: 5000, 6: 5000}} - cases.append(("use all cregs", qc, target)) - - # case 2 - a = ClassicalRegister(1, "a") - b = ClassicalRegister(5, "b") - c = ClassicalRegister(3, "c") - - qc = QuantumCircuit(QuantumRegister(3), a, b, c) - qc.h(range(3)) - qc.measure([0, 1, 2, 2], [0, 2, 4, 5]) - qc = pm.run(qc) - target = { - "a": {0: 5000, 1: 5000}, - "b": {0: 2500, 2: 2500, 24: 2500, 26: 2500}, - "c": {0: 10000}, - } - cases.append(("use only a and b", qc, target)) - - # case 3 - a = ClassicalRegister(1, "a") - b = ClassicalRegister(2, "b") - c = ClassicalRegister(3, "c") - - qc = QuantumCircuit(QuantumRegister(3), a, b, c) - qc.h(range(3)) - qc.measure(1, 5) - qc = pm.run(qc) - target = {"a": {0: 10000}, "b": {0: 10000}, "c": {0: 5000, 4: 5000}} - cases.append(("use only c", qc, target)) - - # case 4 - a = ClassicalRegister(1, "a") - b = ClassicalRegister(2, "b") - c = ClassicalRegister(3, "c") - - qc = QuantumCircuit(QuantumRegister(3), a, b, c) - qc.h(range(3)) - qc.measure([0, 1, 2], [5, 5, 5]) - qc = pm.run(qc) - target = {"a": {0: 10000}, "b": {0: 10000}, "c": {0: 5000, 4: 5000}} - cases.append(("use only c multiple qubits", qc, target)) - - # case 5 - a = ClassicalRegister(1, "a") - b = ClassicalRegister(2, "b") - c = ClassicalRegister(3, "c") - - qc = QuantumCircuit(QuantumRegister(3), a, b, c) - qc.h(range(3)) - qc = pm.run(qc) - target = {"a": {0: 10000}, "b": {0: 10000}, "c": {0: 10000}} - cases.append(("no measure", qc, target)) - - for title, qc, target in cases: - with self.subTest(title): - sampler = BackendSamplerV2(backend=backend, options=self._options) - result = sampler.run([qc], shots=self._shots).result() - self.assertEqual(len(result), 1) - data = result[0].data - self.assertEqual(len(data._FIELDS), 3) - for creg in qc.cregs: - self.assertTrue(hasattr(data, creg.name)) - self._assert_allclose(getattr(data, creg.name), np.array(target[creg.name])) - - @combine(backend=BACKENDS) - def test_circuit_with_aliased_cregs(self, backend): - """Test for circuit with aliased classical registers.""" - q = QuantumRegister(3, "q") - c1 = ClassicalRegister(1, "c1") - c2 = ClassicalRegister(1, "c2") - - qc = QuantumCircuit(q, c1, c2) - qc.ry(np.pi / 4, 2) - qc.cx(2, 1) - qc.cx(0, 1) - qc.h(0) - qc.measure(0, c1) - qc.measure(1, c2) - qc.z(2).c_if(c1, 1) - qc.x(2).c_if(c2, 1) - qc2 = QuantumCircuit(5, 5) - qc2.compose(qc, [0, 2, 3], [2, 4], inplace=True) - cregs = [creg.name for creg in qc2.cregs] - target = { - cregs[0]: {0: 4255, 4: 4297, 16: 720, 20: 726}, - cregs[1]: {0: 5000, 1: 5000}, - cregs[2]: {0: 8500, 1: 1500}, - } - - sampler = BackendSamplerV2(backend=backend, options=self._options) - pm = generate_preset_pass_manager(optimization_level=0, backend=backend) - qc2 = pm.run(qc2) - result = sampler.run([qc2], shots=self._shots).result() - self.assertEqual(len(result), 1) - data = result[0].data - self.assertEqual(len(data._FIELDS), 3) - for creg_name in target: # pylint: disable=consider-using-dict-items - self.assertTrue(hasattr(data, creg_name)) - self._assert_allclose(getattr(data, creg_name), np.array(target[creg_name])) - - @combine(backend=BACKENDS) - def test_no_cregs(self, backend): - """Test that the sampler works when there are no classical register in the circuit.""" - qc = QuantumCircuit(2) - sampler = BackendSamplerV2(backend=backend, options=self._options) - with self.assertWarns(UserWarning): - result = sampler.run([qc]).result() - - self.assertEqual(len(result), 1) - self.assertEqual(len(result[0].data), 0) - - @combine(backend=BACKENDS) - def test_empty_creg(self, backend): - """Test that the sampler works if provided a classical register with no bits.""" - # Test case for issue #12043 - q = QuantumRegister(1, "q") - c1 = ClassicalRegister(0, "c1") - c2 = ClassicalRegister(1, "c2") - qc = QuantumCircuit(q, c1, c2) - qc.h(0) - qc.measure(0, 0) - - sampler = BackendSamplerV2(backend=backend, options=self._options) - result = sampler.run([qc], shots=self._shots).result() - self.assertEqual(result[0].data.c1.array.shape, (self._shots, 0)) - - -if __name__ == "__main__": - unittest.main()