diff --git a/insteon_mqtt/device/base/Base.py b/insteon_mqtt/device/base/Base.py index b3b49909..07c6c686 100644 --- a/insteon_mqtt/device/base/Base.py +++ b/insteon_mqtt/device/base/Base.py @@ -157,6 +157,9 @@ def __init__(self, protocol, modem, address, name=None, config_extra=None): # scene() for details. self.broadcast_reason = "" + # Used to set proper level when scene command is used + self.broadcast_scene_level = {"timestamp": 0, "level": 0} + # Used for internally tracking the device state self._is_on = False self._level = 0x00 diff --git a/insteon_mqtt/device/base/DimmerBase.py b/insteon_mqtt/device/base/DimmerBase.py index d256ef9f..eea8c792 100644 --- a/insteon_mqtt/device/base/DimmerBase.py +++ b/insteon_mqtt/device/base/DimmerBase.py @@ -9,6 +9,7 @@ # #=========================================================================== import functools +import time from .ResponderBase import ResponderBase from .Base import Base from ..functions import ManualCtrl @@ -319,6 +320,14 @@ def derive_on_level(self, mode): # Pressing on again when already at the default on # level causes the device to go to full-brightness. level = 0xff + + # If on was set using a scene command in the last 5 seconds, then + # set to level requested in the scene command + if self.broadcast_scene_level['timestamp'] + 5 >= time.time(): + level = self.broadcast_scene_level['level'] + + # No matter what, clear the scene level + self.broadcast_scene_level['timestamp'] = 0 return level #----------------------------------------------------------------------- diff --git a/insteon_mqtt/device/functions/Scene.py b/insteon_mqtt/device/functions/Scene.py index 339ab3c7..b871175d 100644 --- a/insteon_mqtt/device/functions/Scene.py +++ b/insteon_mqtt/device/functions/Scene.py @@ -112,6 +112,16 @@ def our_on_done(success, msg, data): # this is a reasonable guess, that will be overwritten by the # wait time calculated by the arriving broadcast message. self.protocol.set_wait_time(time.time() + 1) + + # If level was specified, then Save the level value so it can + # be used when the broadcast message is received. + if is_on and use_on_level == 0x01: + self.broadcast_scene_level = {"timestamp": time.time(), + "level": on_level} + else: + # Clear saved level + self.broadcast_scene_level['timestamp'] = 0 + # Reason is device because we're simulating a button press. # We can't really pass this around because we just get a # broadcast message later from the device. So we set a diff --git a/tests/device/functions/test_Scene.py b/tests/device/functions/test_Scene.py new file mode 100644 index 00000000..63143910 --- /dev/null +++ b/tests/device/functions/test_Scene.py @@ -0,0 +1,76 @@ +#=========================================================================== +# +# Tests for: insteont_mqtt/device/Dimmer.py +# +#=========================================================================== +import pytest +import time +# from pprint import pprint +from unittest import mock +from unittest.mock import call +import insteon_mqtt as IM +import insteon_mqtt.device.Dimmer as Dimmer +import insteon_mqtt.message as Msg +import insteon_mqtt.util as util +import helpers as H + +@pytest.fixture +def test_device(tmpdir): + ''' + Returns a generically configured dimmer for testing + ''' + protocol = H.main.MockProtocol() + modem = H.main.MockModem(tmpdir) + addr = IM.Address(0x01, 0x02, 0x03) + device = Dimmer(protocol, modem, addr) + return device + + +class Test_Scene_Function(): + @pytest.mark.parametrize("is_on,group,level", [ + (True, 0x01, None), + (True, None, None), + (True, 0x01, 128), + (False, None, None), + ]) + def test_scene(self, test_device, is_on, group, level): + test_device.scene(is_on, group=group, level=level) + + # test the message contents + if group is None: + group = 0x01 + assert len(test_device.protocol.sent) == 1 + assert test_device.protocol.sent[0].msg.cmd1 == 0x30 + assert test_device.protocol.sent[0].msg.cmd2 == 0x00 + assert test_device.protocol.sent[0].msg.data[0] == group # group + if level is None: + # don't use_on_level + if is_on: + assert test_device.protocol.sent[0].msg.data[1] == 0x00 + else: + # off always 0x01 + assert test_device.protocol.sent[0].msg.data[1] == 0x01 + assert test_device.protocol.sent[0].msg.data[2] == 0x00 + else: + # use_on_level + assert test_device.protocol.sent[0].msg.data[1] == 0x01 + assert test_device.protocol.sent[0].msg.data[2] == int(level) + if is_on: + assert test_device.protocol.sent[0].msg.data[3] == 0x11 + else: + assert test_device.protocol.sent[0].msg.data[3] == 0x13 + assert test_device.protocol.sent[0].msg.data[4] == 0x01 + assert test_device.protocol.sent[0].msg.data[5] == 0x00 + + # Test the broadcast_scene_level contents + test_device.protocol.sent[0].handler.on_done(True, None, None) + scene_timestamp = test_device.broadcast_scene_level['timestamp'] + 1 + scene_level = test_device.broadcast_scene_level['level'] + if not is_on: + assert scene_timestamp == 1 + elif level is None: + assert scene_timestamp == 1 + assert scene_level == 0 + else: + assert scene_timestamp >= time.time() + assert scene_level == level