-
Notifications
You must be signed in to change notification settings - Fork 10
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'main' into 927_common_filter_wheel
- Loading branch information
Showing
3 changed files
with
305 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,140 @@ | ||
import asyncio | ||
from collections.abc import Mapping | ||
from typing import Annotated as A | ||
|
||
from bluesky.protocols import Movable | ||
from ophyd_async.core import ( | ||
DEFAULT_TIMEOUT, | ||
AsyncStatus, | ||
DeviceVector, | ||
SignalR, | ||
SignalRW, | ||
SignalW, | ||
StandardReadable, | ||
StrictEnum, | ||
wait_for_value, | ||
) | ||
from ophyd_async.core import StandardReadableFormat as Format | ||
from ophyd_async.epics.core import ( | ||
EpicsDevice, | ||
PvSuffix, | ||
epics_signal_r, | ||
epics_signal_w, | ||
epics_signal_x, | ||
) | ||
|
||
|
||
class BimorphMirrorOnOff(StrictEnum): | ||
ON = "ON" | ||
OFF = "OFF" | ||
|
||
|
||
class BimorphMirrorMode(StrictEnum): | ||
HI = "HI" | ||
NORMAL = "NORMAL" | ||
FAST = "FAST" | ||
|
||
|
||
class BimorphMirrorStatus(StrictEnum): | ||
IDLE = "Idle" | ||
BUSY = "Busy" | ||
ERROR = "Error" | ||
|
||
|
||
class BimorphMirrorChannel(StandardReadable, EpicsDevice): | ||
"""Collection of PVs comprising a single bimorph channel. | ||
Attributes: | ||
target_voltage: Float RW_RBV for target voltage, which can be set using parent mirror's all target proc | ||
output_voltage: Float RW_RBV for current voltage on bimorph | ||
status: BimorphMirrorOnOff readable for ON/OFF status of channel | ||
shift: Float writeable shifting channel voltage | ||
""" | ||
|
||
target_voltage: A[SignalRW[float], PvSuffix.rbv("VTRGT"), Format.CONFIG_SIGNAL] | ||
output_voltage: A[SignalRW[float], PvSuffix.rbv("VOUT"), Format.HINTED_SIGNAL] | ||
status: A[SignalR[BimorphMirrorOnOff], PvSuffix("STATUS"), Format.CONFIG_SIGNAL] | ||
shift: A[SignalW[float], PvSuffix("SHIFT")] | ||
|
||
|
||
class BimorphMirror(StandardReadable, Movable): | ||
"""Class to represent CAENels Bimorph Mirrors. | ||
Attributes: | ||
channels: DeviceVector of BimorphMirrorChannel, indexed from 1, for each channel | ||
enabled: Writeable BimorphOnOff | ||
commit_target_voltages: Procable signal that writes values in each channel's VTRGT to VOUT | ||
status: Readable BimorphMirrorStatus Busy/Idle status | ||
err: Alarm status""" | ||
|
||
def __init__(self, prefix: str, number_of_channels: int, name=""): | ||
""" | ||
Args: | ||
prefix: str PV prefix | ||
number_of_channels: int number of channels on bimorph mirror (can be zero) | ||
name: str name of device | ||
Raises: | ||
ValueError: number_of_channels is less than zero""" | ||
|
||
if number_of_channels < 0: | ||
raise ValueError(f"Number of channels is below zero: {number_of_channels}") | ||
|
||
with self.add_children_as_readables(): | ||
self.channels = DeviceVector( | ||
{ | ||
i: BimorphMirrorChannel(f"{prefix}C{i}:") | ||
for i in range(1, number_of_channels + 1) | ||
} | ||
) | ||
self.enabled = epics_signal_w(BimorphMirrorOnOff, f"{prefix}ONOFF") | ||
self.commit_target_voltages = epics_signal_x(f"{prefix}ALLTRGT.PROC") | ||
self.status = epics_signal_r(BimorphMirrorStatus, f"{prefix}STATUS") | ||
self.err = epics_signal_r(str, f"{prefix}ERR") | ||
|
||
super().__init__(name=name) | ||
|
||
@AsyncStatus.wrap | ||
async def set(self, value: Mapping[int, float], tolerance: float = 0.0001) -> None: | ||
"""Sets bimorph voltages in parrallel via target voltage and all proc. | ||
Args: | ||
value: Dict of channel numbers to target voltages | ||
Raises: | ||
ValueError: On set to non-existent channel""" | ||
|
||
if any(key not in self.channels for key in value): | ||
raise ValueError( | ||
f"Attempting to put to non-existent channels: {[key for key in value if (key not in self.channels)]}" | ||
) | ||
|
||
# Write target voltages: | ||
await asyncio.gather( | ||
*[ | ||
self.channels[i].target_voltage.set(target, wait=True) | ||
for i, target in value.items() | ||
] | ||
) | ||
|
||
# Trigger set target voltages: | ||
await self.commit_target_voltages.trigger() | ||
|
||
# Wait for values to propogate to voltage out rbv: | ||
await asyncio.gather( | ||
*[ | ||
wait_for_value( | ||
self.channels[i].output_voltage, | ||
tolerance_func_builder(tolerance, target), | ||
timeout=DEFAULT_TIMEOUT, | ||
) | ||
for i, target in value.items() | ||
] | ||
) | ||
|
||
|
||
def tolerance_func_builder(tolerance: float, target_value: float): | ||
def is_within_value(x): | ||
return abs(x - target_value) <= tolerance | ||
|
||
return is_within_value |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,150 @@ | ||
from unittest.mock import ANY, call, patch | ||
|
||
import pytest | ||
from bluesky.run_engine import RunEngine | ||
from ophyd_async.core import DeviceCollector | ||
from ophyd_async.testing import get_mock_put | ||
|
||
from dodal.devices.bimorph_mirror import BimorphMirror | ||
|
||
VALID_BIMORPH_CHANNELS = [8, 12, 16, 24] | ||
|
||
|
||
@pytest.fixture | ||
def mirror(request, RE: RunEngine) -> BimorphMirror: | ||
number_of_channels = request.param | ||
|
||
with DeviceCollector(mock=True): | ||
bm = BimorphMirror( | ||
prefix="FAKE-PREFIX:", | ||
number_of_channels=number_of_channels, | ||
) | ||
|
||
return bm | ||
|
||
|
||
@pytest.fixture | ||
def valid_bimorph_values(mirror: BimorphMirror) -> dict[int, float]: | ||
return {i: float(i) for i in range(1, len(mirror.channels) + 1)} | ||
|
||
|
||
@pytest.fixture | ||
def mock_vtrgt_vout_propogation(mirror: BimorphMirror): | ||
for channel in mirror.channels.values(): | ||
|
||
def effect(value: float, wait=False, signal=channel.output_voltage): | ||
signal.set(value, wait=wait) | ||
|
||
get_mock_put(channel.target_voltage).side_effect = effect | ||
|
||
|
||
@pytest.mark.parametrize("mirror", VALID_BIMORPH_CHANNELS, indirect=True) | ||
async def test_set_channels_waits_for_readback( | ||
mirror: BimorphMirror, | ||
valid_bimorph_values: dict[int, float], | ||
mock_vtrgt_vout_propogation, | ||
): | ||
await mirror.set(valid_bimorph_values) | ||
|
||
assert { | ||
key: await mirror.channels[key].target_voltage.get_value() | ||
for key in valid_bimorph_values | ||
} == valid_bimorph_values | ||
|
||
|
||
@pytest.mark.parametrize("mirror", VALID_BIMORPH_CHANNELS, indirect=True) | ||
async def test_set_channels_triggers_alltrgt_proc( | ||
mirror: BimorphMirror, | ||
valid_bimorph_values: dict[int, float], | ||
mock_vtrgt_vout_propogation, | ||
): | ||
mock_alltrgt_proc = get_mock_put(mirror.commit_target_voltages) | ||
|
||
mock_alltrgt_proc.assert_not_called() | ||
|
||
await mirror.set(valid_bimorph_values) | ||
|
||
mock_alltrgt_proc.assert_called_once() | ||
|
||
|
||
@pytest.mark.parametrize("mirror", VALID_BIMORPH_CHANNELS, indirect=True) | ||
async def test_set_channels_waits_for_vout_readback( | ||
mirror: BimorphMirror, | ||
valid_bimorph_values: dict[int, float], | ||
mock_vtrgt_vout_propogation, | ||
): | ||
with patch("dodal.devices.bimorph_mirror.wait_for_value") as mock_wait_for_value: | ||
mock_wait_for_value.assert_not_called() | ||
|
||
await mirror.set(valid_bimorph_values) | ||
|
||
assert [ | ||
call(mirror.channels[i].output_voltage, ANY, timeout=ANY) | ||
for i, val in valid_bimorph_values.items() | ||
] == mock_wait_for_value.call_args_list | ||
|
||
|
||
@pytest.mark.parametrize("mirror", VALID_BIMORPH_CHANNELS, indirect=True) | ||
async def test_set_channels_allows_tolerance( | ||
mirror: BimorphMirror, | ||
valid_bimorph_values: dict[int, float], | ||
): | ||
for channel in mirror.channels.values(): | ||
|
||
def out_by_a_little(value: float, wait=False, signal=channel.output_voltage): | ||
signal.set(value + 0.00001, wait=wait) | ||
|
||
get_mock_put(channel.target_voltage).side_effect = out_by_a_little | ||
|
||
await mirror.set(valid_bimorph_values) | ||
|
||
|
||
@pytest.mark.parametrize("mirror", VALID_BIMORPH_CHANNELS, indirect=True) | ||
async def test_set_one_channel(mirror: BimorphMirror, mock_vtrgt_vout_propogation): | ||
values = {1: 1} | ||
|
||
await mirror.set(values) | ||
|
||
read = await mirror.read() | ||
|
||
assert [ | ||
await mirror.channels[key].target_voltage.get_value() for key in values | ||
] == list(values) | ||
|
||
assert [ | ||
read[f"{mirror.name}-channels-{key}-output_voltage"]["value"] for key in values | ||
] == list(values) | ||
|
||
|
||
@pytest.mark.parametrize("mirror", VALID_BIMORPH_CHANNELS, indirect=True) | ||
async def test_read( | ||
mirror: BimorphMirror, | ||
valid_bimorph_values: dict[int, float], | ||
mock_vtrgt_vout_propogation, | ||
): | ||
await mirror.set(valid_bimorph_values) | ||
|
||
read = await mirror.read() | ||
|
||
assert [ | ||
read[f"{mirror.name}-channels-{i}-output_voltage"]["value"] | ||
for i in range(1, len(mirror.channels) + 1) | ||
] == list(valid_bimorph_values.values()) | ||
|
||
|
||
@pytest.mark.parametrize("mirror", VALID_BIMORPH_CHANNELS, indirect=True) | ||
async def test_set_invalid_channel_throws_error(mirror: BimorphMirror): | ||
with pytest.raises(ValueError): | ||
await mirror.set({len(mirror.channels) + 1: 0.0}) | ||
|
||
|
||
@pytest.mark.parametrize("number_of_channels", [-1]) | ||
async def test_init_mirror_with_invalid_channels_throws_error(number_of_channels): | ||
with pytest.raises(ValueError): | ||
BimorphMirror(prefix="FAKE-PREFIX:", number_of_channels=number_of_channels) | ||
|
||
|
||
@pytest.mark.parametrize("number_of_channels", [0]) | ||
async def test_init_mirror_with_zero_channels(number_of_channels): | ||
mirror = BimorphMirror(prefix="FAKE-PREFIX", number_of_channels=number_of_channels) | ||
assert len(mirror.channels) == 0 |