[ContinuousBatching] ContinuousBatchingScheduler Implementation #1375

Merged · 2 commits · Nov 13, 2023
2 changes: 2 additions & 0 deletions src/deepsparse/v2/schedulers/__init__.py
@@ -1,4 +1,5 @@
# flake8: noqa
# isort: skip_file

# Copyright (c) 2021 - present / Neuralmagic, Inc. All Rights Reserved.
#
@@ -16,3 +17,4 @@

from .scheduler import *
from .scheduler_group import *
from .continuous_batching_scheduler import *
141 changes: 141 additions & 0 deletions src/deepsparse/v2/schedulers/continuous_batching_scheduler.py
@@ -0,0 +1,141 @@
# Copyright (c) 2021 - present / Neuralmagic, Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from concurrent.futures import Future
from threading import Lock
from typing import List

from deepsparse.v2.operators import EngineOperator, Operator
from deepsparse.v2.schedulers.scheduler import OperatorScheduler
from deepsparse.v2.schedulers.utils import (
ContinuousBatchingExecutorThread,
ContinuousBatchingQueues,
)


__all__ = ["ContinuousBatchingScheduler"]


class ContinuousBatchingScheduler(OperatorScheduler):
    """
    Manages EngineOperator jobs that should be run with continuous batching.
    Groups requests for the same engine into larger batches and returns
    the result to the respective request threads after scheduled completion

    :param max_workers: maximum number of threads to execute at once, default 1
    """

    def __init__(self, max_workers: int = 1):
        self._max_workers = max_workers

        self._mutex = Lock()

        # Dict[EngineOperator, Dict[batch_size, Engine]]
        self._operators_to_engines = {}
        self._queues = ContinuousBatchingQueues()

        # create and start max number of worker threads
        self._threads = [
            ContinuousBatchingExecutorThread(self._queues, self._operators_to_engines)
            for _ in range(self.max_workers)
        ]
        for worker_thread in self._threads:
            worker_thread.start()

    @property
    def max_workers(self) -> int:
        """
        :return: maximum number of threads to execute at once
        """
        return self._max_workers

    def submit(self, *args, operator: Operator, **kwargs) -> Future:
Review comment from dsikka (Contributor, Nov 13, 2023):

all the code for the ContinuousBatchingExecutorThread is EngineOperator specific. Are we ever expecting a non-engine operator as the input to this submit function?

Reply from bfineran (Contributor, Author):

great observation - I think it's something we could support in the future, but not sure how much gain there would be. we could probably abstract it out, but it might get a little complicated since the engine operators are unique (and foreseeably unique) in that they need to have an engine compiled against the target batch size and this must be swapped in at runtime
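The batch-size swap described in the reply happens inside ContinuousBatchingQueues and the executor threads, which this diff does not show. Purely as a hedged illustration (the helper name and the largest-fit policy below are assumptions, not the deepsparse implementation), the selection step could look like:

def select_engine(engines_by_batch_size, num_queued):
    # hypothetical helper: pick the largest compiled batch size that the
    # queued request count can fill, falling back to the batch_size=1 engine
    fitting = [bs for bs in engines_by_batch_size if bs <= num_queued]
    batch_size = max(fitting, default=1)
    return batch_size, engines_by_batch_size[batch_size]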
"""
:param operator: operator to run
:param operator_input: input schema to the operator
:return: future referencing the asynchronously run output of the operator
"""
inputs = args[0]
if not isinstance(inputs, operator.input_schema):
raise ValueError(
"Inputs to ContinuousBatchingScheduler must be the specific "
f"input schema to the given operator. Expected {operator.input_schema}"
f"found {type(inputs)}"
)

future = Future()
self._queues.add_queue_item(key=operator, item=inputs, future=future)

return future

    def can_process(self, *args, operator: Operator, **kwargs) -> bool:
        """
        :param operator: operator to check
        :param operator_input: operator_input to check
        :return: True if this scheduler can process the given operator and input,
            i.e. the operator has been registered via add_engine_operator
        """
        return operator in self._operators_to_engines and operator in self._queues

    def add_engine_operator(
        self, engine_operator: EngineOperator, batch_sizes: List[int]
    ):
        """
        Adds tracking for an engine operator to this scheduler
        with continuous batching for the given sizes

        :param engine_operator: an EngineOperator, must be compiled with
            batch_size=1
        :param batch_sizes: batch sizes to use for continuous batching
        """
        # lock updates to _operators_to_engines while updating; the context
        # manager guarantees the lock is released on early return or error
        with self._mutex:
            # validation
            if engine_operator in self._operators_to_engines:
                # operator already added
                return

            if not isinstance(engine_operator, EngineOperator):
                raise ValueError(
                    f"Expected an EngineOperator instance, "
                    f"found {type(engine_operator)}"
                )
            if engine_operator.batch_size != 1:
                raise ValueError(
                    "For continuous batching, EngineOperator must have "
                    f"batch_size=1, found batch_size={engine_operator.batch_size}"
                )

            # build batch_size -> Engine dict for this operator
            operator_engines = {}
            # base engine, expected batch size is 1
            operator_engines[engine_operator.batch_size] = engine_operator.engine

            # compile auxiliary engines for continuous batching
            for batch_size in batch_sizes:
                if batch_size == 1:
                    continue  # already added
                operator_engines[batch_size] = engine_operator.create_engine(
                    batch_size=batch_size
                )

            self._operators_to_engines[engine_operator] = operator_engines
            self._queues.add_queue(
                key=engine_operator,
                batch_sizes=list(operator_engines.keys()),
            )
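The core hand-off in this scheduler is that submit() returns a concurrent.futures.Future immediately, while a worker thread later runs the batched forward pass and resolves each request's future. A minimal self-contained sketch of that pattern (toy stand-in code, not the deepsparse internals):

from concurrent.futures import Future
from queue import Queue
from threading import Thread

work_queue = Queue()  # holds (input, Future) pairs

def run_batch(batch):
    # stand-in for an engine forward pass over a batch of inputs
    return [x * 2 for x in batch]

def worker():
    while True:
        items = [work_queue.get()]  # block for at least one item
        while not work_queue.empty():  # opportunistically grow the batch
            items.append(work_queue.get())
        outputs = run_batch([inp for inp, _ in items])
        for (_, future), out in zip(items, outputs):
            future.set_result(out)  # unblocks the submitting thread

Thread(target=worker, daemon=True).start()

def submit(value):
    future = Future()
    work_queue.put((value, future))
    return future

assert submit(21).result() == 42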
@@ -0,0 +1,48 @@
# Copyright (c) 2021 - present / Neuralmagic, Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from concurrent.futures import Future

import numpy

from deepsparse.v2.operators import EngineOperator
from deepsparse.v2.schedulers import ContinuousBatchingScheduler


def test_continuous_batching_executor_thread():
    # simple test that ContinuousBatchingScheduler can be instantiated and can
    # return a result from a request; testing multi-batch execution (making
    # enough concurrent requests to guarantee batched execution) is out of scope
    scheduler = ContinuousBatchingScheduler()
Review comment (Contributor):

Is the goal that we would initialize (at least) two schedulers, one ContinuousBatchingScheduler and the remaining OperatorSchedulers as part of the pipeline init? And then swap to use ContinuousBatchingScheduler when the operator being executed is of type EngineOperator?

Reply from bfineran (Contributor, Author, Nov 13, 2023):

yes exactly, the ContinuousBatchingScheduler would come first in the list of schedulers so it can pick up any relevant engine operator jobs. the default scheduler picks up the rest

    # mobilenet model, compiled with batch_size=1
    engine_operator = EngineOperator(
        "zoo:mobilenet_v2-1.0-imagenet-base",
        batch_size=1,
    )

    scheduler.add_engine_operator(engine_operator, [1])

    # submit job to scheduler and expect future to be returned
    engine_input = engine_operator.input_schema(
        engine_inputs=[numpy.random.randn(1, 3, 224, 224).astype(numpy.float32)]
    )
    future = scheduler.submit(engine_input, operator=engine_operator)
    assert isinstance(future, Future)
    assert not future.done()  # assume this runs before engine has a chance to complete

    # assert that output resolves and contains a numpy array
    engine_output = future.result()
    assert isinstance(engine_output, engine_operator.output_schema)
    assert isinstance(engine_output.engine_outputs[0], numpy.ndarray)
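As described in the review reply above, scheduler selection works by list order: each job goes to the first scheduler whose can_process returns True, so ContinuousBatchingScheduler is placed ahead of the default OperatorScheduler. A hedged sketch of that dispatch loop (the function name is illustrative; in deepsparse v2 this responsibility presumably lives in SchedulerGroup):

def submit_to_first_capable(schedulers, operator_input, operator):
    # offer the job to schedulers in priority order; the continuous batching
    # scheduler, listed first, claims operators registered with it via
    # can_process, and the default scheduler accepts everything else
    for scheduler in schedulers:
        if scheduler.can_process(operator_input, operator=operator):
            return scheduler.submit(operator_input, operator=operator)
    raise RuntimeError(f"no scheduler can process {type(operator).__name__}")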