
Parallelize MFAS #717

Merged · 15 commits · Nov 8, 2023
82 changes: 70 additions & 12 deletions gtsfm/averaging/translation/averaging_1dsfm.py
@@ -13,10 +13,12 @@
import time
from collections import defaultdict
from enum import Enum
from typing import DefaultDict, Dict, List, Optional, Set, Tuple, Any

import dask
import gtsam
import numpy as np
from distributed.worker import get_client
from gtsam import (
BinaryMeasurementsPoint3,
BinaryMeasurementPoint3,
@@ -54,15 +56,42 @@

# Minimum number of measurements required for a track to be used for averaging.
MIN_TRACK_MEASUREMENTS_FOR_AVERAGING = 3

# Number of track measurements to be added for each camera. Can be reduced to 8 for speed at the cost of some accuracy.
TRACKS_MEASUREMENTS_PER_CAMERA = 12

# Heuristically set to limit the number of delayed tasks, as recommended by Dask:
# https://docs.dask.org/en/stable/delayed-best-practices.html#avoid-too-many-tasks
MAX_DELAYED_CALLS = 5
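
As a self-contained illustration (not part of the diff) of how this cap bounds the number of tasks, here is a hypothetical helper, batch_directions, that performs the same arithmetic the PR inlines in compute_inliers below:

import numpy as np

MAX_DELAYED_CALLS = 5

def batch_directions(directions: list) -> list:
    """Split directions into at most MAX_DELAYED_CALLS contiguous batches."""
    batch_size = int(np.ceil(len(directions) / MAX_DELAYED_CALLS))
    return [directions[j : j + batch_size] for j in range(0, len(directions), batch_size)]

# e.g., 12 directions with a cap of 5 delayed calls -> batch_size 3 -> 4 tasks.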

logger = logger_utils.get_logger()

C = symbol_shorthand.A # for camera translation variables
L = symbol_shorthand.B # for track (landmark) translation variables

RelativeDirectionsDict = Dict[Tuple[int, int], Unit3]
DUMMY_NOISE_MODEL = gtsam.noiseModel.Isotropic.Sigma(3, 1e-2) # MFAS does not use this.


class MFASWrapper:
    """Pickle-friendly wrapper that runs MFAS over one or more projection directions."""

    def __init__(self, mfas, w_i2Ui1_dict, w_iUj_dict_tracks, directions):
        self.mfas = mfas
        self._w_i2Ui1_dict = w_i2Ui1_dict
        self._w_iUj_dict_tracks = w_iUj_dict_tracks
        self._directions = directions

        measurements = TranslationAveraging1DSFM._binary_measurements_from_dict(
            self._w_i2Ui1_dict, self._w_iUj_dict_tracks, DUMMY_NOISE_MODEL
        )
        # Run MFAS once per projection direction, collecting the per-direction outlier weights.
        self.results = []
        for _dir in self._directions:
            self.results.append(mfas(measurements, _dir).computeOutlierWeights())

    def __reduce__(self):
        # Rebuild from constructor arguments when pickled, so the gtsam objects
        # created in __init__ never need to be serialized directly.
        return (MFASWrapper, (self.mfas, self._w_i2Ui1_dict, self._w_iUj_dict_tracks, self._directions))
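
The __reduce__ above tells pickle to rebuild the wrapper from its constructor arguments, so derived state built in __init__ is recomputed on unpickling rather than serialized. A toy sketch of that pattern with hypothetical names (Wrapper, fn), assuming the constructor arguments themselves pickle cleanly:

import pickle

class Wrapper:
    """Toy analogue of MFASWrapper: results are derived in __init__."""

    def __init__(self, fn, data):
        self._fn = fn
        self._data = data
        # Derived state is rebuilt here, so it never has to be pickled.
        self.results = [fn(x) for x in data]

    def __reduce__(self):
        # Pickle stores only (callable, args); unpickling re-runs __init__.
        return (Wrapper, (self._fn, self._data))

w = Wrapper(abs, [-1, 2, -3])
w2 = pickle.loads(pickle.dumps(w))  # reconstructed by calling Wrapper(abs, [-1, 2, -3])
assert w2.results == [1, 2, 3]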


class TranslationAveraging1DSFM(TranslationAveragingBase):
@@ -132,8 +161,8 @@ def __sample_projection_directions(

return projections

@staticmethod
def _binary_measurements_from_dict(
w_i2Ui1_dict: RelativeDirectionsDict,
w_iUj_dict_tracks: RelativeDirectionsDict,
noise_model: gtsam.noiseModel,
@@ -209,20 +238,49 @@ def compute_inliers(
inlier_cameras: Set of inlier cameras.
"""

# Sample directions for projection
combined_measurements = list(w_i2Ui1_dict.values()) + list(w_iUj_dict_tracks.values())
projection_directions = self.__sample_projection_directions(combined_measurements)

# Convert to measurements: map indexes to symbols.
w_i1Ui2_measurements = self._binary_measurements_from_dict(w_i2Ui1_dict, w_iUj_dict_tracks, DUMMY_NOISE_MODEL)

# Get client and scatter large data.
client = get_client()
future_w_i2Ui1_dict = client.scatter(w_i2Ui1_dict, broadcast=True)
future_w_iUj_dict_tracks = client.scatter(w_iUj_dict_tracks, broadcast=True)

# Loop over projection directions and generate delayed MFAS tasks.
batch_size = int(np.ceil(len(projection_directions) / MAX_DELAYED_CALLS))
outlier_weights: List[Any] = []
if batch_size == 1:
logger.info("Running MFAS with batch size 1.")
for direction in projection_directions:
outlier_weights.append(
dask.delayed(MFASWrapper)(MFAS, future_w_i2Ui1_dict, future_w_iUj_dict_tracks, direction)
)
else:
for j in range(0, len(projection_directions), batch_size):
outlier_weights.append(
dask.delayed(MFASWrapper)(
MFAS, future_w_i2Ui1_dict, future_w_iUj_dict_tracks, projection_directions[j : j + batch_size]
)
)
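
The block above is Dask's scatter-then-delay pattern: the large relative-direction dicts are shipped to workers once, and each delayed MFASWrapper call receives the resulting futures rather than the raw data. Below is a self-contained sketch of the same pattern with hypothetical names (chunk_sum, bounds); the PR calls get_client() because it runs inside an existing cluster, while the sketch starts its own local client:

import dask
import numpy as np
from dask.distributed import Client

def chunk_sum(shared_array, start, stop):
    # Stand-in for one batched MFASWrapper call: consumes a slice of shared data.
    return float(shared_array[start:stop].sum())

if __name__ == "__main__":
    client = Client(processes=False)  # small in-process cluster, just for the sketch
    data = np.arange(1_000_000, dtype=np.float64)
    shared = client.scatter(data, broadcast=True)  # ship the large input once
    bounds = [(i, i + 250_000) for i in range(0, 1_000_000, 250_000)]
    tasks = [dask.delayed(chunk_sum)(shared, a, b) for a, b in bounds]
    results = dask.compute(*tasks)  # executes the four tasks in parallel
    print(sum(results) == float(data.sum()))  # True
    client.close()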
Collaborator:
Looks good @travisdriver. I think CI is failing though?

FAILED tests/averaging/translation/test_averaging_1dsfm.py::TestTranslationAveraging1DSFM::test_circle_all_edges - ValueError: not enough values to unpack (expected 2, got 0)
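
A guess from reading the diff, not something confirmed in the thread: the batch_size == 1 branch passes a bare Unit3 where MFASWrapper iterates over a list of directions, and the accumulation loop further down indexes the wrapper's results (a list of per-direction dicts) with a camera-pair key. A review-style suggestion sketch, in fragments rather than runnable code, that unifies the two shapes:

# Suggestion sketch (hypothetical): always hand MFASWrapper a list of directions ...
for direction in projection_directions:
    outlier_weights.append(
        dask.delayed(MFASWrapper)(MFAS, future_w_i2Ui1_dict, future_w_iUj_dict_tracks, [direction])
    )

# ... and sum over each wrapper's per-direction result dicts.
for wrapper in outlier_weights:
    for outlier_weight_dict in wrapper.results:
        for w_i1Ui2 in w_i1Ui2_measurements:
            i1, i2 = w_i1Ui2.key1(), w_i1Ui2.key2()
            outlier_weights_sum[(i1, i2)] += outlier_weight_dict[(i1, i2)]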



# Compute outlier weights in parallel.
outlier_weights = dask.compute(*outlier_weights)
logger.debug("Computed outlier weights using MFAS.")

# Compute average outlier weight.
@@ -231,7 +289,7 @@ def compute_inliers(
for outlier_weight_dict in outlier_weights:
for w_i1Ui2 in w_i1Ui2_measurements:
i1, i2 = w_i1Ui2.key1(), w_i1Ui2.key2()
outlier_weights_sum[(i1, i2)] += outlier_weight_dict.results[(i1, i2)]
for (i1, i2), weight_sum in outlier_weights_sum.items():
if weight_sum / len(projection_directions) < OUTLIER_WEIGHT_THRESHOLD:
inliers.add((i1, i2))
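
To make the decision rule concrete, here is a toy version of the averaging-and-threshold step; the threshold value is assumed for illustration only, as the real OUTLIER_WEIGHT_THRESHOLD is defined elsewhere in the file:

OUTLIER_WEIGHT_THRESHOLD = 0.125  # assumed value for illustration

# Per-direction outlier weights for two camera pairs across three projections.
weights = {(0, 1): [0.0, 0.1, 0.05], (1, 2): [0.9, 0.8, 1.0]}
inliers = {pair for pair, ws in weights.items() if sum(ws) / len(ws) < OUTLIER_WEIGHT_THRESHOLD}
print(inliers)  # {(0, 1)}: pair (1, 2) is rejected as an outlier relative direction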