class ContinuousBatchingScheduler(OperatorScheduler):
    """
    Manages EngineOperator jobs that should be run with continuous batching.

    Groups requests for the same engine into larger batches and returns
    the result to the respective request threads after scheduled completion

    :param max_workers: maximum number of threads to execute at once, default 1
    """

    def __init__(self, max_workers: int = 1):
        self._max_workers = max_workers

        # guards updates to _operators_to_engines in add_engine_operator
        self._mutex = Lock()

        # Dict[EngineOperator, Dict[batch_size, Engine]]
        self._operators_to_engines = {}  # EngineOperator -> Dict[batch_size, Engine]
        self._queues = ContinuousBatchingQueues()

        # create and start max number of worker threads
        self._threads = [
            ContinuousBatchingExecutorThread(self._queues, self._operators_to_engines)
            for _ in range(self.max_workers)
        ]
        for worker_thread in self._threads:
            worker_thread.start()

    @property
    def max_workers(self) -> int:
        """
        :return: maximum number of threads to execute at once
        """
        return self._max_workers

    def submit(self, *args, operator: Operator, **kwargs) -> Future:
        """
        :param operator: operator to run
        :param args: first positional arg must be an instance of the
            operator's input schema
        :return: future referencing the asynchronously run output of the operator
        :raises ValueError: if the first positional arg is not an instance of
            the operator's input schema
        """
        inputs = args[0]
        if not isinstance(inputs, operator.input_schema):
            # NOTE: original message concatenated f-strings without a
            # separating space ("...schema}found..."); fixed here
            raise ValueError(
                "Inputs to ContinuousBatchingScheduler must be the specific "
                f"input schema to the given operator. Expected {operator.input_schema} "
                f"found {type(inputs)}"
            )

        future = Future()
        self._queues.add_queue_item(key=operator, item=inputs, future=future)

        return future

    def can_process(self, *args, operator: Operator, **kwargs) -> bool:
        """
        :param operator: operator to check
        :param operator_input: operator_input to check
        :return: True if this scheduler has both an engine mapping and a queue
            registered for the given operator (i.e. add_engine_operator was
            called for it), False otherwise
        """
        return operator in self._operators_to_engines and operator in self._queues

    def add_engine_operator(
        self, engine_operator: EngineOperator, batch_sizes: List[int]
    ):
        """
        Adds tracking for an engine operator to this scheduler
        with continuous batching for the given sizes

        :param engine_operator: an EngineOperator, must be compiled with
            batch_size=1
        :param batch_sizes: batch sizes to use for continuous batching
        :raises ValueError: if engine_operator is not an EngineOperator or was
            not compiled with batch_size=1
        """
        # lock updates to _operators_to_engines while updating; use the lock
        # as a context manager so it is released on every exit path — the
        # original acquire()/release() pair leaked the lock on the early
        # return below (deadlocking all later calls) and on raised errors
        with self._mutex:
            if engine_operator in self._operators_to_engines:
                # operator already added
                return

            # validation
            if not isinstance(engine_operator, EngineOperator):
                raise ValueError(
                    f"Expected an EngineOperator instance, found {type(engine_operator)}"
                )
            if engine_operator.batch_size != 1:
                raise ValueError(
                    "For continuous batching, EngineOperator must have batch_size=1. "
                    f"found batch_size={engine_operator.batch_size}"
                )

            # build batch_size -> Engine dict for this operator
            operator_engines = {}
            # base engine, expected batch size is 1
            operator_engines[engine_operator.batch_size] = engine_operator.engine

            # compile auxiliary engines for continuous batching
            for batch_size in batch_sizes:
                if batch_size == 1:
                    continue  # already added
                # NOTE(review): original called create_engine on the dict
                # itself (operator_engines.create_engine) which would raise
                # AttributeError; the engine operator is what compiles new
                # engines — confirm create_engine signature against
                # EngineOperator
                operator_engines[batch_size] = engine_operator.create_engine(
                    batch_size=batch_size
                )

            self._operators_to_engines[engine_operator] = operator_engines
            self._queues.add_queue(
                key=engine_operator,
                batch_sizes=list(operator_engines.keys()),
            )
def test_continuous_batching_executor_thread():
    # simple test that ContinuousBatchingScheduler can be instantiated and return
    # a result from a request; testing multi-batch execution (making enough
    # concurrent requests to guarantee batched execution) is out of scope
    scheduler = ContinuousBatchingScheduler()

    # mobilenet model compiled with batch_size=1, as required for
    # continuous batching (original comment incorrectly said batch_size=2)
    engine_operator = EngineOperator(
        "zoo:mobilenet_v2-1.0-imagenet-base",
        batch_size=1,
    )

    scheduler.add_engine_operator(engine_operator, [1])

    # submit job to scheduler and expect future to be returned
    engine_input = engine_operator.input_schema(
        engine_inputs=[numpy.random.randn(1, 3, 224, 224).astype(numpy.float32)]
    )
    future = scheduler.submit(engine_input, operator=engine_operator)
    assert isinstance(future, Future)
    # NOTE: do not assert `not future.done()` here — the original did, but
    # whether the worker thread has already finished by this point is a race
    # and made the test flaky

    # assert that output resolves and contains a numpy array
    engine_output = future.result()
    assert isinstance(engine_output, engine_operator.output_schema)
    assert isinstance(engine_output.engine_outputs[0], numpy.ndarray)