From 83f1dd762fb0d37756d30a2fdbd578f6e6659c14 Mon Sep 17 00:00:00 2001 From: Travis Driver Date: Thu, 14 Sep 2023 13:12:44 -0400 Subject: [PATCH 01/11] Parallelize MFAS, not much faster yet --- .../averaging/translation/averaging_1dsfm.py | 105 ++++++++++++++---- 1 file changed, 83 insertions(+), 22 deletions(-) diff --git a/gtsfm/averaging/translation/averaging_1dsfm.py b/gtsfm/averaging/translation/averaging_1dsfm.py index 329431a45..ae2dddc16 100644 --- a/gtsfm/averaging/translation/averaging_1dsfm.py +++ b/gtsfm/averaging/translation/averaging_1dsfm.py @@ -12,9 +12,12 @@ """ from collections import defaultdict from enum import Enum -from typing import DefaultDict, Dict, List, Optional, Set, Tuple +from typing import DefaultDict, Dict, List, Optional, Set, Tuple, Any + +import dask import gtsam import numpy as np +from distributed.worker import get_client from gtsam import ( BinaryMeasurementsPoint3, BinaryMeasurementPoint3, @@ -52,15 +55,42 @@ # Minimum number of measurements required for a track to be used for averaging. MIN_TRACK_MEASUREMENTS_FOR_AVERAGING = 3 + # Number of track measurements to be added for each camera. Can be reduced to 8 for speed at the cost of some accuracy. TRACKS_MEASUREMENTS_PER_CAMERA = 12 +# Heuristically set to limit the number of delayed tasks, as recommended by Dask: +# https://docs.dask.org/en/stable/delayed-best-practices.html#avoid-too-many-tasks +MAX_DELAYED_CALLS = 5 + logger = logger_utils.get_logger() C = symbol_shorthand.A # for camera translation variables L = symbol_shorthand.B # for track (landmark) translation variables RelativeDirectionsDict = Dict[Tuple[int, int], Unit3] +DUMMY_NOISE_MODEL = gtsam.noiseModel.Isotropic.Sigma(3, 1e-2) # MFAS does not use this. + + +class MFASWrapper(object): + def __init__(self, mfas, w_i2Ui1_dict, w_iUj_dict_tracks, directions): + self.mfas = mfas + self._w_i2Ui1_dict = w_i2Ui1_dict + self._w_iUj_dict_tracks = w_iUj_dict_tracks + self._directions = directions + + measurements = TranslationAveraging1DSFM._binary_measurements_from_dict( + self._w_i2Ui1_dict, self._w_iUj_dict_tracks, DUMMY_NOISE_MODEL + ) + # if len(self._directions) == 0: + # self.results = mfas(measurements, self._directions).computeOutlierWeights() + # else: + self.results = [] + for _dir in self._directions: + self.results.append(mfas(measurements, _dir).computeOutlierWeights()) + + def __reduce__(self): + return (MFASWrapper, (self.mfas, self._w_i2Ui1_dict, self._w_iUj_dict_tracks, self._directions)) class TranslationAveraging1DSFM(TranslationAveragingBase): @@ -130,8 +160,8 @@ def __sample_projection_directions( return projections + @staticmethod def _binary_measurements_from_dict( - self, w_i2Ui1_dict: RelativeDirectionsDict, w_iUj_dict_tracks: RelativeDirectionsDict, noise_model: gtsam.noiseModel, @@ -207,20 +237,49 @@ def compute_inliers( inlier_cameras: Set of inlier cameras. """ + # def compute_outlier_weights(w_i2Ui1_dict, w_iUj_dict_tracks, direction): + # dummy_noise_model = gtsam.noiseModel.Isotropic.Sigma(3, 1e-2) # MFAS does not use this. + # w_i1Ui2_measurements = self._binary_measurements_from_dict( + # w_i2Ui1_dict, w_iUj_dict_tracks, dummy_noise_model + # ) + # return MFAS(w_i1Ui2_measurements, direction).computeOutlierWeights() + # Sample directions for projection combined_measurements = list(w_i2Ui1_dict.values()) + list(w_iUj_dict_tracks.values()) projection_directions = self.__sample_projection_directions(combined_measurements) # Convert to measurements: map indexes to symbols. - dummy_noise_model = gtsam.noiseModel.Isotropic.Sigma(3, 1e-2) # MFAS does not use this. - w_i1Ui2_measurements = self._binary_measurements_from_dict(w_i2Ui1_dict, w_iUj_dict_tracks, dummy_noise_model) - - # Compute outlier weights using MFAS. - # TODO(ayush): parallelize this step. - outlier_weights: List[Dict[Tuple[int, int], float]] = [] - for direction in projection_directions: - mfas_instance = MFAS(w_i1Ui2_measurements, direction) - outlier_weights.append(mfas_instance.computeOutlierWeights()) + w_i1Ui2_measurements = self._binary_measurements_from_dict(w_i2Ui1_dict, w_iUj_dict_tracks, DUMMY_NOISE_MODEL) + + # Get client and scatter large data. + client = get_client() + future_w_i2Ui1_dict = client.scatter(w_i2Ui1_dict, broadcast=True) + future_w_iUj_dict_tracks = client.scatter(w_iUj_dict_tracks, broadcast=True) + + # Loop through tracks and and generate delayed MFAS tasks. + batch_size = int(np.ceil(len(projection_directions) / MAX_DELAYED_CALLS)) + outlier_weights: List[Any] = [] + if batch_size == 1: + logger.info("BATCH SIZE 1") + for direction in projection_directions: + outlier_weights.append( + dask.delayed(MFASWrapper)(MFAS, future_w_i2Ui1_dict, future_w_iUj_dict_tracks, direction) + ) + else: + for j in range(0, len(projection_directions), batch_size): + outlier_weights.append( + dask.delayed(MFASWrapper)( + MFAS, future_w_i2Ui1_dict, future_w_iUj_dict_tracks, projection_directions[j : j + batch_size] + ) + ) + + # # Compute outlier weights using MFAS. + # outlier_weights: List[Dict[Tuple[int, int], float]] = [] + # for direction in projection_directions: + # outlier_weights.append(dask.delayed(MFASWrapper)(MFAS, w_i2Ui1_dict, w_iUj_dict_tracks, direction)) + + # Compute outlier weights in parallel. + outlier_weights = dask.compute(*outlier_weights) logger.debug("Computed outlier weights using MFAS.") # Compute average outlier weight. @@ -229,7 +288,7 @@ def compute_inliers( for outlier_weight_dict in outlier_weights: for w_i1Ui2 in w_i1Ui2_measurements: i1, i2 = w_i1Ui2.key1(), w_i1Ui2.key2() - outlier_weights_sum[(i1, i2)] += outlier_weight_dict[(i1, i2)] + outlier_weights_sum[(i1, i2)] += outlier_weight_dict.results[(i1, i2)] for (i1, i2), weight_sum in outlier_weights_sum.items(): if weight_sum / len(projection_directions) < OUTLIER_WEIGHT_THRESHOLD: inliers.add((i1, i2)) @@ -530,12 +589,12 @@ def compute_metrics( GtsfmMetric("num_outlier_1dsfm_measurements", len(outlier_i1_i2_pairs)), GtsfmMetric("num_translations_estimated", len([wti for wti in wti_list if wti is not None])), ] - + # Remaining metrics require ground truth, so return if GT is not available. gt_available = np.array([gt_wTi is not None for gt_wTi in gt_wTi_list]).any() if not gt_available: return GtsfmMetricsGroup("translation_averaging_metrics", ta_metrics) - + # Get ground truth translation directions for the measurements. gt_i2Ui1_dict = metrics_utils.get_twoview_translation_directions(gt_wTi_list) @@ -562,14 +621,16 @@ def compute_metrics( gt_wti_list = [gt_wTi.translation() if gt_wTi is not None else None for gt_wTi in gt_wTi_list] threshold_suffix = str(int(MAX_INLIER_MEASUREMENT_ERROR_DEG)) + "_deg" - ta_metrics.extend([ - GtsfmMetric("1dsfm_precision_" + threshold_suffix, precision), - GtsfmMetric("1dsfm_recall_" + threshold_suffix, recall), - GtsfmMetric("1dsfm_inlier_angular_errors_deg", inlier_angular_errors), - GtsfmMetric("1dsfm_outlier_angular_errors_deg", outlier_angular_errors), - metrics_utils.compute_translation_angle_metric(measured_gt_i2Ui1_dict, wTi_aligned_list), - metrics_utils.compute_translation_distance_metric(wti_aligned_list, gt_wti_list), - ]) + ta_metrics.extend( + [ + GtsfmMetric("1dsfm_precision_" + threshold_suffix, precision), + GtsfmMetric("1dsfm_recall_" + threshold_suffix, recall), + GtsfmMetric("1dsfm_inlier_angular_errors_deg", inlier_angular_errors), + GtsfmMetric("1dsfm_outlier_angular_errors_deg", outlier_angular_errors), + metrics_utils.compute_translation_angle_metric(measured_gt_i2Ui1_dict, wTi_aligned_list), + metrics_utils.compute_translation_distance_metric(wti_aligned_list, gt_wti_list), + ] + ) return GtsfmMetricsGroup("translation_averaging_metrics", ta_metrics) From c6457d9fc0124dfa5acfc6374edeb910f6df5382 Mon Sep 17 00:00:00 2001 From: senselessdev1 Date: Tue, 19 Sep 2023 01:34:49 -0400 Subject: [PATCH 02/11] add outer batch dim --- gtsfm/averaging/translation/averaging_1dsfm.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/gtsfm/averaging/translation/averaging_1dsfm.py b/gtsfm/averaging/translation/averaging_1dsfm.py index ae4af72a3..46bce0651 100644 --- a/gtsfm/averaging/translation/averaging_1dsfm.py +++ b/gtsfm/averaging/translation/averaging_1dsfm.py @@ -259,16 +259,16 @@ def compute_inliers( # Loop through tracks and and generate delayed MFAS tasks. batch_size = int(np.ceil(len(projection_directions) / MAX_DELAYED_CALLS)) - outlier_weights: List[Any] = [] + batched_outlier_weights: List[Any] = [] if batch_size == 1: logger.info("BATCH SIZE 1") for direction in projection_directions: - outlier_weights.append( + batched_outlier_weights.append( dask.delayed(MFASWrapper)(MFAS, future_w_i2Ui1_dict, future_w_iUj_dict_tracks, direction) ) else: for j in range(0, len(projection_directions), batch_size): - outlier_weights.append( + batched_outlier_weights.append( dask.delayed(MFASWrapper)( MFAS, future_w_i2Ui1_dict, future_w_iUj_dict_tracks, projection_directions[j : j + batch_size] ) @@ -280,16 +280,17 @@ def compute_inliers( # outlier_weights.append(dask.delayed(MFASWrapper)(MFAS, w_i2Ui1_dict, w_iUj_dict_tracks, direction)) # Compute outlier weights in parallel. - outlier_weights = dask.compute(*outlier_weights) + batched_outlier_weights = dask.compute(*batched_outlier_weights) logger.debug("Computed outlier weights using MFAS.") # Compute average outlier weight. outlier_weights_sum: DefaultDict[Tuple[int, int], float] = defaultdict(float) inliers = set() - for outlier_weight_dict in outlier_weights: - for w_i1Ui2 in w_i1Ui2_measurements: - i1, i2 = w_i1Ui2.key1(), w_i1Ui2.key2() - outlier_weights_sum[(i1, i2)] += outlier_weight_dict.results[(i1, i2)] + for batch_outlier_weights in batched_outlier_weights: + for outlier_weight_dict in batch_outlier_weights.results: + for w_i1Ui2 in w_i1Ui2_measurements: + i1, i2 = w_i1Ui2.key1(), w_i1Ui2.key2() + outlier_weights_sum[(i1, i2)] += outlier_weight_dict[(i1, i2)] for (i1, i2), weight_sum in outlier_weights_sum.items(): if weight_sum / len(projection_directions) < OUTLIER_WEIGHT_THRESHOLD: inliers.add((i1, i2)) From 1c21a3cbcf1e33c18000b4cfef53f06df100f765 Mon Sep 17 00:00:00 2001 From: senselessdev1 Date: Wed, 20 Sep 2023 00:10:21 -0400 Subject: [PATCH 03/11] fix unit test --- .../averaging/translation/averaging_1dsfm.py | 44 +++++++++---------- 1 file changed, 21 insertions(+), 23 deletions(-) diff --git a/gtsfm/averaging/translation/averaging_1dsfm.py b/gtsfm/averaging/translation/averaging_1dsfm.py index 46bce0651..9f27ddb29 100644 --- a/gtsfm/averaging/translation/averaging_1dsfm.py +++ b/gtsfm/averaging/translation/averaging_1dsfm.py @@ -74,21 +74,32 @@ class MFASWrapper(object): - def __init__(self, mfas, w_i2Ui1_dict, w_iUj_dict_tracks, directions): + def __init__( + self, + mfas, + w_i2Ui1_dict: RelativeDirectionsDict, + w_iUj_dict_tracks: RelativeDirectionsDict, + directions: List[Unit3], + ) -> None: + """ + + Args: + mfas: + w_i2Ui1_dict: Dictionary of Unit3 relative translations between cameras. + w_i2Ui1_dict_tracks: Dictionary of Unit3 relative translations between cameras and landmarks. + directions: Sampled Unit3 projection directions to use. + """ self.mfas = mfas self._w_i2Ui1_dict = w_i2Ui1_dict self._w_iUj_dict_tracks = w_iUj_dict_tracks self._directions = directions - measurements = TranslationAveraging1DSFM._binary_measurements_from_dict( + w_i1Ui2_measurements = TranslationAveraging1DSFM._binary_measurements_from_dict( self._w_i2Ui1_dict, self._w_iUj_dict_tracks, DUMMY_NOISE_MODEL ) - # if len(self._directions) == 0: - # self.results = mfas(measurements, self._directions).computeOutlierWeights() - # else: self.results = [] for _dir in self._directions: - self.results.append(mfas(measurements, _dir).computeOutlierWeights()) + self.results.append(mfas(w_i1Ui2_measurements, _dir).computeOutlierWeights()) def __reduce__(self): return (MFASWrapper, (self.mfas, self._w_i2Ui1_dict, self._w_iUj_dict_tracks, self._directions)) @@ -238,13 +249,6 @@ def compute_inliers( inlier_cameras: Set of inlier cameras. """ - # def compute_outlier_weights(w_i2Ui1_dict, w_iUj_dict_tracks, direction): - # dummy_noise_model = gtsam.noiseModel.Isotropic.Sigma(3, 1e-2) # MFAS does not use this. - # w_i1Ui2_measurements = self._binary_measurements_from_dict( - # w_i2Ui1_dict, w_iUj_dict_tracks, dummy_noise_model - # ) - # return MFAS(w_i1Ui2_measurements, direction).computeOutlierWeights() - # Sample directions for projection combined_measurements = list(w_i2Ui1_dict.values()) + list(w_iUj_dict_tracks.values()) projection_directions = self.__sample_projection_directions(combined_measurements) @@ -252,10 +256,9 @@ def compute_inliers( # Convert to measurements: map indexes to symbols. w_i1Ui2_measurements = self._binary_measurements_from_dict(w_i2Ui1_dict, w_iUj_dict_tracks, DUMMY_NOISE_MODEL) - # Get client and scatter large data. - client = get_client() - future_w_i2Ui1_dict = client.scatter(w_i2Ui1_dict, broadcast=True) - future_w_iUj_dict_tracks = client.scatter(w_iUj_dict_tracks, broadcast=True) + # TODO(johnwlambert): Consider getting global client and scattering large data. + future_w_i2Ui1_dict = w_i2Ui1_dict + future_w_iUj_dict_tracks = w_iUj_dict_tracks # Loop through tracks and and generate delayed MFAS tasks. batch_size = int(np.ceil(len(projection_directions) / MAX_DELAYED_CALLS)) @@ -264,7 +267,7 @@ def compute_inliers( logger.info("BATCH SIZE 1") for direction in projection_directions: batched_outlier_weights.append( - dask.delayed(MFASWrapper)(MFAS, future_w_i2Ui1_dict, future_w_iUj_dict_tracks, direction) + dask.delayed(MFASWrapper)(MFAS, future_w_i2Ui1_dict, future_w_iUj_dict_tracks, [direction]) ) else: for j in range(0, len(projection_directions), batch_size): @@ -274,11 +277,6 @@ def compute_inliers( ) ) - # # Compute outlier weights using MFAS. - # outlier_weights: List[Dict[Tuple[int, int], float]] = [] - # for direction in projection_directions: - # outlier_weights.append(dask.delayed(MFASWrapper)(MFAS, w_i2Ui1_dict, w_iUj_dict_tracks, direction)) - # Compute outlier weights in parallel. batched_outlier_weights = dask.compute(*batched_outlier_weights) logger.debug("Computed outlier weights using MFAS.") From 3a8e23f536a314b4bcaa196143de2fbdfbe2afca Mon Sep 17 00:00:00 2001 From: senselessdev1 Date: Wed, 20 Sep 2023 00:55:29 -0400 Subject: [PATCH 04/11] parameterize # of delayed calls --- gtsfm/averaging/translation/averaging_1dsfm.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/gtsfm/averaging/translation/averaging_1dsfm.py b/gtsfm/averaging/translation/averaging_1dsfm.py index 9f27ddb29..faecc036b 100644 --- a/gtsfm/averaging/translation/averaging_1dsfm.py +++ b/gtsfm/averaging/translation/averaging_1dsfm.py @@ -126,13 +126,16 @@ def __init__( use_tracks_for_averaging: bool = True, reject_outliers: bool = True, projection_sampling_method: ProjectionSamplingMethod = ProjectionSamplingMethod.SAMPLE_WITH_UNIFORM_DENSITY, + max_delayed_calls: int = MAX_DELAYED_CALLS ) -> None: """Initializes the 1DSFM averaging instance. Args: robust_measurement_noise: Whether to use a robust noise model for the measurements, defaults to true. + use_tracks_for_averaging: reject_outliers: whether to perform outlier rejection with MFAS algorithm (default True). projection_sampling_method: ProjectionSamplingMethod to be used for directions to run 1DSfM. + max_delayed_calls: Maximum number of concurrent delayed tasks to create. """ super().__init__(robust_measurement_noise) @@ -141,6 +144,7 @@ def __init__( self._reject_outliers = reject_outliers self._projection_sampling_method = projection_sampling_method self._use_tracks_for_averaging = use_tracks_for_averaging + self._max_delayed_calls = max_delayed_calls def __sample_projection_directions( self, @@ -261,7 +265,7 @@ def compute_inliers( future_w_iUj_dict_tracks = w_iUj_dict_tracks # Loop through tracks and and generate delayed MFAS tasks. - batch_size = int(np.ceil(len(projection_directions) / MAX_DELAYED_CALLS)) + batch_size = int(np.ceil(len(projection_directions) / self._max_delayed_calls)) batched_outlier_weights: List[Any] = [] if batch_size == 1: logger.info("BATCH SIZE 1") From 44ba737067613b35de85ed23b5ace2a51ed6d47f Mon Sep 17 00:00:00 2001 From: Travis Driver Date: Tue, 24 Oct 2023 09:52:30 -0400 Subject: [PATCH 05/11] Added timing --- gtsfm/averaging/translation/averaging_1dsfm.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/gtsfm/averaging/translation/averaging_1dsfm.py b/gtsfm/averaging/translation/averaging_1dsfm.py index faecc036b..a22069213 100644 --- a/gtsfm/averaging/translation/averaging_1dsfm.py +++ b/gtsfm/averaging/translation/averaging_1dsfm.py @@ -11,6 +11,7 @@ Authors: Jing Wu, Ayush Baid, Akshay Krishnan """ import time +import timeit from collections import defaultdict from enum import Enum from typing import DefaultDict, Dict, List, Optional, Set, Tuple, Any @@ -89,6 +90,7 @@ def __init__( w_i2Ui1_dict_tracks: Dictionary of Unit3 relative translations between cameras and landmarks. directions: Sampled Unit3 projection directions to use. """ + _t0 = timeit.default_timer() self.mfas = mfas self._w_i2Ui1_dict = w_i2Ui1_dict self._w_iUj_dict_tracks = w_iUj_dict_tracks @@ -97,9 +99,12 @@ def __init__( w_i1Ui2_measurements = TranslationAveraging1DSFM._binary_measurements_from_dict( self._w_i2Ui1_dict, self._w_iUj_dict_tracks, DUMMY_NOISE_MODEL ) + _t1 = timeit.default_timer() self.results = [] for _dir in self._directions: self.results.append(mfas(w_i1Ui2_measurements, _dir).computeOutlierWeights()) + _t2 = timeit.default_timer() + print(_t1 - _t0, _t2 - _t1) def __reduce__(self): return (MFASWrapper, (self.mfas, self._w_i2Ui1_dict, self._w_iUj_dict_tracks, self._directions)) @@ -126,7 +131,7 @@ def __init__( use_tracks_for_averaging: bool = True, reject_outliers: bool = True, projection_sampling_method: ProjectionSamplingMethod = ProjectionSamplingMethod.SAMPLE_WITH_UNIFORM_DENSITY, - max_delayed_calls: int = MAX_DELAYED_CALLS + max_delayed_calls: int = MAX_DELAYED_CALLS, ) -> None: """Initializes the 1DSFM averaging instance. @@ -265,6 +270,7 @@ def compute_inliers( future_w_iUj_dict_tracks = w_iUj_dict_tracks # Loop through tracks and and generate delayed MFAS tasks. + _t0 = timeit.default_timer() batch_size = int(np.ceil(len(projection_directions) / self._max_delayed_calls)) batched_outlier_weights: List[Any] = [] if batch_size == 1: @@ -280,10 +286,13 @@ def compute_inliers( MFAS, future_w_i2Ui1_dict, future_w_iUj_dict_tracks, projection_directions[j : j + batch_size] ) ) + _t1 = timeit.default_timer() + print(f"Built delayed Tasks in {_t1 - _t0} seconds") # Compute outlier weights in parallel. + _t2 = timeit.default_timer() batched_outlier_weights = dask.compute(*batched_outlier_weights) - logger.debug("Computed outlier weights using MFAS.") + logger.debug(f"Computed outlier weights using MFAS in {timeit.default_timer() - _t2}.") # Compute average outlier weight. outlier_weights_sum: DefaultDict[Tuple[int, int], float] = defaultdict(float) From 9fa0ed733a0b80a0221be2a645300cb3757a7082 Mon Sep 17 00:00:00 2001 From: Travis Driver Date: Mon, 6 Nov 2023 13:56:52 -0500 Subject: [PATCH 06/11] Testing --- gtsfm/averaging/translation/averaging_1dsfm.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/gtsfm/averaging/translation/averaging_1dsfm.py b/gtsfm/averaging/translation/averaging_1dsfm.py index a22069213..b71de8f66 100644 --- a/gtsfm/averaging/translation/averaging_1dsfm.py +++ b/gtsfm/averaging/translation/averaging_1dsfm.py @@ -102,9 +102,14 @@ def __init__( _t1 = timeit.default_timer() self.results = [] for _dir in self._directions: - self.results.append(mfas(w_i1Ui2_measurements, _dir).computeOutlierWeights()) + _tt0 = timeit.default_timer() + _mfas = mfas(w_i1Ui2_measurements, _dir) + _tt1 = timeit.default_timer() + self.results.append(_mfas.computeOutlierWeights()) + _tt2 = timeit.default_timer() + # print("inner loop", _tt1 - _tt0, _tt2 - _tt1) _t2 = timeit.default_timer() - print(_t1 - _t0, _t2 - _t1) + # print("outter loop", _t1 - _t0, _t2 - _t1) def __reduce__(self): return (MFASWrapper, (self.mfas, self._w_i2Ui1_dict, self._w_iUj_dict_tracks, self._directions)) @@ -272,6 +277,7 @@ def compute_inliers( # Loop through tracks and and generate delayed MFAS tasks. _t0 = timeit.default_timer() batch_size = int(np.ceil(len(projection_directions) / self._max_delayed_calls)) + print("BATCH SIZE:", batch_size, len(projection_directions)) batched_outlier_weights: List[Any] = [] if batch_size == 1: logger.info("BATCH SIZE 1") From 73d82ff29d4656c033fda77295f2bb79fe79bd93 Mon Sep 17 00:00:00 2001 From: Travis Driver Date: Mon, 6 Nov 2023 22:00:20 -0500 Subject: [PATCH 07/11] Working --- .../averaging/translation/averaging_1dsfm.py | 143 +++++++++++------- 1 file changed, 89 insertions(+), 54 deletions(-) diff --git a/gtsfm/averaging/translation/averaging_1dsfm.py b/gtsfm/averaging/translation/averaging_1dsfm.py index b71de8f66..bc325815c 100644 --- a/gtsfm/averaging/translation/averaging_1dsfm.py +++ b/gtsfm/averaging/translation/averaging_1dsfm.py @@ -63,7 +63,7 @@ # Heuristically set to limit the number of delayed tasks, as recommended by Dask: # https://docs.dask.org/en/stable/delayed-best-practices.html#avoid-too-many-tasks -MAX_DELAYED_CALLS = 5 +MAX_DELAYED_CALLS = 16 logger = logger_utils.get_logger() @@ -74,45 +74,64 @@ DUMMY_NOISE_MODEL = gtsam.noiseModel.Isotropic.Sigma(3, 1e-2) # MFAS does not use this. -class MFASWrapper(object): - def __init__( - self, - mfas, - w_i2Ui1_dict: RelativeDirectionsDict, - w_iUj_dict_tracks: RelativeDirectionsDict, - directions: List[Unit3], - ) -> None: - """ +# class MFASWrapper(object): +# def __init__( +# self, +# mfas, +# w_i2Ui1_dict: RelativeDirectionsDict, +# w_iUj_dict_tracks: RelativeDirectionsDict, +# directions: List[Unit3], +# ) -> None: +# """ +# +# Args: +# mfas: +# w_i2Ui1_dict: Dictionary of Unit3 relative translations between cameras. +# w_i2Ui1_dict_tracks: Dictionary of Unit3 relative translations between cameras and landmarks. +# directions: Sampled Unit3 projection directions to use. +# """ +# # _t0 = timeit.default_timer() +# self.mfas = mfas +# self._w_i2Ui1_dict = w_i2Ui1_dict +# self._w_iUj_dict_tracks = w_iUj_dict_tracks +# self._directions = directions +# +# w_i1Ui2_measurements = TranslationAveraging1DSFM._binary_measurements_from_dict( +# self._w_i2Ui1_dict, self._w_iUj_dict_tracks, DUMMY_NOISE_MODEL +# ) +# # _t1 = timeit.default_timer() +# self.results = [] +# for _dir in self._directions: +# # _tt0 = timeit.default_timer() +# _mfas = mfas(w_i1Ui2_measurements, _dir) +# # _tt1 = timeit.default_timer() +# self.results.append(_mfas.computeOutlierWeights()) +# # _tt2 = timeit.default_timer() +# # print("inner loop", _tt1 - _tt0, _tt2 - _tt1) +# # _t2 = timeit.default_timer() +# # print("outter loop", _t1 - _t0, _t2 - _t1) +# +# def __reduce__(self): +# return (MFASWrapper, (self.mfas, self._w_i2Ui1_dict, self._w_iUj_dict_tracks, self._directions)) - Args: - mfas: - w_i2Ui1_dict: Dictionary of Unit3 relative translations between cameras. - w_i2Ui1_dict_tracks: Dictionary of Unit3 relative translations between cameras and landmarks. - directions: Sampled Unit3 projection directions to use. - """ - _t0 = timeit.default_timer() - self.mfas = mfas - self._w_i2Ui1_dict = w_i2Ui1_dict - self._w_iUj_dict_tracks = w_iUj_dict_tracks - self._directions = directions - w_i1Ui2_measurements = TranslationAveraging1DSFM._binary_measurements_from_dict( - self._w_i2Ui1_dict, self._w_iUj_dict_tracks, DUMMY_NOISE_MODEL - ) - _t1 = timeit.default_timer() +class MFASWrapper(object): + def __init__(self, mfas, results=[]) -> None: + """ """ + self.mfas = mfas self.results = [] - for _dir in self._directions: - _tt0 = timeit.default_timer() - _mfas = mfas(w_i1Ui2_measurements, _dir) - _tt1 = timeit.default_timer() - self.results.append(_mfas.computeOutlierWeights()) - _tt2 = timeit.default_timer() - # print("inner loop", _tt1 - _tt0, _tt2 - _tt1) - _t2 = timeit.default_timer() - # print("outter loop", _t1 - _t0, _t2 - _t1) def __reduce__(self): - return (MFASWrapper, (self.mfas, self._w_i2Ui1_dict, self._w_iUj_dict_tracks, self._directions)) + return (MFASWrapper, (self.mfas,)) + + +class MFASResultsWrapper(object): + def __init__(self, results=[]) -> None: + """ """ + self.results = results + + def __reduce__(self): + return (MFASWrapper, (self.results,)) class TranslationAveraging1DSFM(TranslationAveragingBase): @@ -245,6 +264,24 @@ def get_prior_in_world_frame(i2, i2Ti1_prior): ) return w_i1ti2_prior_measurements + @staticmethod + def run_mfas( + mfas_results_wrapper: MFASResultsWrapper, + w_i2Ui1_dict: RelativeDirectionsDict, + w_iUj_dict_tracks: RelativeDirectionsDict, + directions: List[Unit3], + ): + w_i1Ui2_measurements = TranslationAveraging1DSFM._binary_measurements_from_dict( + w_i2Ui1_dict, w_iUj_dict_tracks, DUMMY_NOISE_MODEL + ) + # mfas_results_wrapper = MFASResultsWrapper() + for dir in directions: + # mfas = mfas_wrapper.mfas(w_i1Ui2_measurements, dir) + # mfas_wrapper.results.append(mfas.computeOutlierWeights()) + mfas_results_wrapper.results.append(MFAS(w_i1Ui2_measurements, dir).computeOutlierWeights()) + + return mfas_results_wrapper + def compute_inliers( self, w_i2Ui1_dict: RelativeDirectionsDict, @@ -271,34 +308,32 @@ def compute_inliers( w_i1Ui2_measurements = self._binary_measurements_from_dict(w_i2Ui1_dict, w_iUj_dict_tracks, DUMMY_NOISE_MODEL) # TODO(johnwlambert): Consider getting global client and scattering large data. - future_w_i2Ui1_dict = w_i2Ui1_dict - future_w_iUj_dict_tracks = w_iUj_dict_tracks + client = get_client() + future_w_i2Ui1_dict = client.scatter(w_i2Ui1_dict, broadcast=True) + future_w_iUj_dict_tracks = client.scatter(w_iUj_dict_tracks, broadcast=True) + # future_mfas_wrapper = client.scatter(MFASWrapper(MFAS), broadcast=True) # Loop through tracks and and generate delayed MFAS tasks. - _t0 = timeit.default_timer() + # _t0 = timeit.default_timer() batch_size = int(np.ceil(len(projection_directions) / self._max_delayed_calls)) - print("BATCH SIZE:", batch_size, len(projection_directions)) + # print("BATCH SIZE:", batch_size, len(projection_directions)) batched_outlier_weights: List[Any] = [] - if batch_size == 1: - logger.info("BATCH SIZE 1") - for direction in projection_directions: - batched_outlier_weights.append( - dask.delayed(MFASWrapper)(MFAS, future_w_i2Ui1_dict, future_w_iUj_dict_tracks, [direction]) - ) - else: - for j in range(0, len(projection_directions), batch_size): - batched_outlier_weights.append( - dask.delayed(MFASWrapper)( - MFAS, future_w_i2Ui1_dict, future_w_iUj_dict_tracks, projection_directions[j : j + batch_size] - ) + for j in range(0, len(projection_directions), batch_size): + batched_outlier_weights.append( + dask.delayed(self.run_mfas)( + MFASResultsWrapper(), + future_w_i2Ui1_dict, + future_w_iUj_dict_tracks, + projection_directions[j : j + batch_size], ) - _t1 = timeit.default_timer() - print(f"Built delayed Tasks in {_t1 - _t0} seconds") + ) + # _t1 = timeit.default_timer() + # print(f"Built delayed Tasks in {_t1 - _t0} seconds") # Compute outlier weights in parallel. _t2 = timeit.default_timer() batched_outlier_weights = dask.compute(*batched_outlier_weights) - logger.debug(f"Computed outlier weights using MFAS in {timeit.default_timer() - _t2}.") + logger.info(f"Computed outlier weights using MFAS in {timeit.default_timer() - _t2}.") # Compute average outlier weight. outlier_weights_sum: DefaultDict[Tuple[int, int], float] = defaultdict(float) From 7715a38a797901dee51dc780d02588c68f47879a Mon Sep 17 00:00:00 2001 From: Travis Driver Date: Tue, 7 Nov 2023 00:21:47 -0500 Subject: [PATCH 08/11] New architecture --- .../averaging/translation/averaging_1dsfm.py | 70 +++++-------------- 1 file changed, 16 insertions(+), 54 deletions(-) diff --git a/gtsfm/averaging/translation/averaging_1dsfm.py b/gtsfm/averaging/translation/averaging_1dsfm.py index bc325815c..f1dff33b5 100644 --- a/gtsfm/averaging/translation/averaging_1dsfm.py +++ b/gtsfm/averaging/translation/averaging_1dsfm.py @@ -74,47 +74,6 @@ DUMMY_NOISE_MODEL = gtsam.noiseModel.Isotropic.Sigma(3, 1e-2) # MFAS does not use this. -# class MFASWrapper(object): -# def __init__( -# self, -# mfas, -# w_i2Ui1_dict: RelativeDirectionsDict, -# w_iUj_dict_tracks: RelativeDirectionsDict, -# directions: List[Unit3], -# ) -> None: -# """ -# -# Args: -# mfas: -# w_i2Ui1_dict: Dictionary of Unit3 relative translations between cameras. -# w_i2Ui1_dict_tracks: Dictionary of Unit3 relative translations between cameras and landmarks. -# directions: Sampled Unit3 projection directions to use. -# """ -# # _t0 = timeit.default_timer() -# self.mfas = mfas -# self._w_i2Ui1_dict = w_i2Ui1_dict -# self._w_iUj_dict_tracks = w_iUj_dict_tracks -# self._directions = directions -# -# w_i1Ui2_measurements = TranslationAveraging1DSFM._binary_measurements_from_dict( -# self._w_i2Ui1_dict, self._w_iUj_dict_tracks, DUMMY_NOISE_MODEL -# ) -# # _t1 = timeit.default_timer() -# self.results = [] -# for _dir in self._directions: -# # _tt0 = timeit.default_timer() -# _mfas = mfas(w_i1Ui2_measurements, _dir) -# # _tt1 = timeit.default_timer() -# self.results.append(_mfas.computeOutlierWeights()) -# # _tt2 = timeit.default_timer() -# # print("inner loop", _tt1 - _tt0, _tt2 - _tt1) -# # _t2 = timeit.default_timer() -# # print("outter loop", _t1 - _t0, _t2 - _t1) -# -# def __reduce__(self): -# return (MFASWrapper, (self.mfas, self._w_i2Ui1_dict, self._w_iUj_dict_tracks, self._directions)) - - class MFASWrapper(object): def __init__(self, mfas, results=[]) -> None: """ """ @@ -122,16 +81,20 @@ def __init__(self, mfas, results=[]) -> None: self.results = [] def __reduce__(self): - return (MFASWrapper, (self.mfas,)) + return (MFASWrapper, (self.mfas, self.results)) class MFASResultsWrapper(object): def __init__(self, results=[]) -> None: - """ """ + """Wraps result from MFAS. + + Note: Dask is unable to serialize the output from MFAS::computeOutlierWeights, as it's a wrapped C++ class. This + is a workaround to avoid Dask needing to serialize the output. + """ self.results = results def __reduce__(self): - return (MFASWrapper, (self.results,)) + return (MFASResultsWrapper, ()) class TranslationAveraging1DSFM(TranslationAveragingBase): @@ -266,21 +229,21 @@ def get_prior_in_world_frame(i2, i2Ti1_prior): @staticmethod def run_mfas( - mfas_results_wrapper: MFASResultsWrapper, w_i2Ui1_dict: RelativeDirectionsDict, w_iUj_dict_tracks: RelativeDirectionsDict, directions: List[Unit3], - ): + ) -> Dict[Tuple[int, int], float]: + """Runs MFAS on a batch of directions.""" w_i1Ui2_measurements = TranslationAveraging1DSFM._binary_measurements_from_dict( w_i2Ui1_dict, w_iUj_dict_tracks, DUMMY_NOISE_MODEL ) - # mfas_results_wrapper = MFASResultsWrapper() + results = [] for dir in directions: - # mfas = mfas_wrapper.mfas(w_i1Ui2_measurements, dir) - # mfas_wrapper.results.append(mfas.computeOutlierWeights()) - mfas_results_wrapper.results.append(MFAS(w_i1Ui2_measurements, dir).computeOutlierWeights()) + # Note: Have to convert output of MFAS::computeOutlierWeights to Dict, as Dask has no instructions to pickle + # KeyPairDoubleMap objects. + results.append(dict(MFAS(w_i1Ui2_measurements, dir).computeOutlierWeights())) - return mfas_results_wrapper + return results def compute_inliers( self, @@ -321,7 +284,6 @@ def compute_inliers( for j in range(0, len(projection_directions), batch_size): batched_outlier_weights.append( dask.delayed(self.run_mfas)( - MFASResultsWrapper(), future_w_i2Ui1_dict, future_w_iUj_dict_tracks, projection_directions[j : j + batch_size], @@ -333,13 +295,13 @@ def compute_inliers( # Compute outlier weights in parallel. _t2 = timeit.default_timer() batched_outlier_weights = dask.compute(*batched_outlier_weights) - logger.info(f"Computed outlier weights using MFAS in {timeit.default_timer() - _t2}.") + logger.info("Computed outlier weights using MFAS in %.2f seconds." % (timeit.default_timer() - _t2)) # Compute average outlier weight. outlier_weights_sum: DefaultDict[Tuple[int, int], float] = defaultdict(float) inliers = set() for batch_outlier_weights in batched_outlier_weights: - for outlier_weight_dict in batch_outlier_weights.results: + for outlier_weight_dict in batch_outlier_weights: for w_i1Ui2 in w_i1Ui2_measurements: i1, i2 = w_i1Ui2.key1(), w_i1Ui2.key2() outlier_weights_sum[(i1, i2)] += outlier_weight_dict[(i1, i2)] From c70fc39b18dc92ad41a38c7547a47be5d1208e49 Mon Sep 17 00:00:00 2001 From: Travis Driver Date: Tue, 7 Nov 2023 10:57:19 -0500 Subject: [PATCH 09/11] Cleanup --- .../averaging/translation/averaging_1dsfm.py | 28 ------------------- 1 file changed, 28 deletions(-) diff --git a/gtsfm/averaging/translation/averaging_1dsfm.py b/gtsfm/averaging/translation/averaging_1dsfm.py index f1dff33b5..75effbeea 100644 --- a/gtsfm/averaging/translation/averaging_1dsfm.py +++ b/gtsfm/averaging/translation/averaging_1dsfm.py @@ -74,29 +74,6 @@ DUMMY_NOISE_MODEL = gtsam.noiseModel.Isotropic.Sigma(3, 1e-2) # MFAS does not use this. -class MFASWrapper(object): - def __init__(self, mfas, results=[]) -> None: - """ """ - self.mfas = mfas - self.results = [] - - def __reduce__(self): - return (MFASWrapper, (self.mfas, self.results)) - - -class MFASResultsWrapper(object): - def __init__(self, results=[]) -> None: - """Wraps result from MFAS. - - Note: Dask is unable to serialize the output from MFAS::computeOutlierWeights, as it's a wrapped C++ class. This - is a workaround to avoid Dask needing to serialize the output. - """ - self.results = results - - def __reduce__(self): - return (MFASResultsWrapper, ()) - - class TranslationAveraging1DSFM(TranslationAveragingBase): """1D-SFM translation averaging with outlier rejection.""" @@ -274,12 +251,9 @@ def compute_inliers( client = get_client() future_w_i2Ui1_dict = client.scatter(w_i2Ui1_dict, broadcast=True) future_w_iUj_dict_tracks = client.scatter(w_iUj_dict_tracks, broadcast=True) - # future_mfas_wrapper = client.scatter(MFASWrapper(MFAS), broadcast=True) # Loop through tracks and and generate delayed MFAS tasks. - # _t0 = timeit.default_timer() batch_size = int(np.ceil(len(projection_directions) / self._max_delayed_calls)) - # print("BATCH SIZE:", batch_size, len(projection_directions)) batched_outlier_weights: List[Any] = [] for j in range(0, len(projection_directions), batch_size): batched_outlier_weights.append( @@ -289,8 +263,6 @@ def compute_inliers( projection_directions[j : j + batch_size], ) ) - # _t1 = timeit.default_timer() - # print(f"Built delayed Tasks in {_t1 - _t0} seconds") # Compute outlier weights in parallel. _t2 = timeit.default_timer() From 94997c6f0338c6b15c9218e27f458a1312a7957d Mon Sep 17 00:00:00 2001 From: Travis Driver Date: Tue, 7 Nov 2023 12:29:08 -0500 Subject: [PATCH 10/11] Fix unit tests --- gtsfm/averaging/translation/averaging_1dsfm.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/gtsfm/averaging/translation/averaging_1dsfm.py b/gtsfm/averaging/translation/averaging_1dsfm.py index 75effbeea..d74175924 100644 --- a/gtsfm/averaging/translation/averaging_1dsfm.py +++ b/gtsfm/averaging/translation/averaging_1dsfm.py @@ -247,10 +247,15 @@ def compute_inliers( # Convert to measurements: map indexes to symbols. w_i1Ui2_measurements = self._binary_measurements_from_dict(w_i2Ui1_dict, w_iUj_dict_tracks, DUMMY_NOISE_MODEL) - # TODO(johnwlambert): Consider getting global client and scattering large data. - client = get_client() - future_w_i2Ui1_dict = client.scatter(w_i2Ui1_dict, broadcast=True) - future_w_iUj_dict_tracks = client.scatter(w_iUj_dict_tracks, broadcast=True) + # Scatter data to all workers if client available. + try: + client = get_client() + future_w_i2Ui1_dict = client.scatter(w_i2Ui1_dict, broadcast=True) + future_w_iUj_dict_tracks = client.scatter(w_iUj_dict_tracks, broadcast=True) + except ValueError: # this should only happen for unit tests. + logger.info("No Dask client found... Running without scattering.") + future_w_i2Ui1_dict = w_i2Ui1_dict + future_w_iUj_dict_tracks = w_iUj_dict_tracks # Loop through tracks and and generate delayed MFAS tasks. batch_size = int(np.ceil(len(projection_directions) / self._max_delayed_calls)) From 09c116bc7d69a682e44dd1975d59750f1071c9db Mon Sep 17 00:00:00 2001 From: Travis Driver Date: Wed, 8 Nov 2023 12:26:14 -0500 Subject: [PATCH 11/11] Updated comments --- gtsfm/averaging/translation/averaging_1dsfm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gtsfm/averaging/translation/averaging_1dsfm.py b/gtsfm/averaging/translation/averaging_1dsfm.py index d74175924..79fd3eea6 100644 --- a/gtsfm/averaging/translation/averaging_1dsfm.py +++ b/gtsfm/averaging/translation/averaging_1dsfm.py @@ -252,7 +252,7 @@ def compute_inliers( client = get_client() future_w_i2Ui1_dict = client.scatter(w_i2Ui1_dict, broadcast=True) future_w_iUj_dict_tracks = client.scatter(w_iUj_dict_tracks, broadcast=True) - except ValueError: # this should only happen for unit tests. + except ValueError: # allows use without initializing client. logger.info("No Dask client found... Running without scattering.") future_w_i2Ui1_dict = w_i2Ui1_dict future_w_iUj_dict_tracks = w_iUj_dict_tracks