"""Compute a large Origin Destination (OD) cost matrix by chunking the
inputs and solving in parallel. Write outputs into a single combined
feature class, a collection of CSV files, or a collection of Apache
Arrow files.
This is a sample script users can modify to fit their specific needs.
This script is intended to be called as a subprocess from the solve_large_odcm.py script
so that it can launch parallel processes with concurrent.futures. It must be
called as a subprocess because the main script tool process, when running
within ArcGIS Pro, cannot launch parallel subprocesses on its own.
This script should not be called directly from the command line.
Copyright 2024 Esri
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
# pylint: disable=logging-fstring-interpolation
import os
import uuid
import logging
import shutil
import itertools
import time
import datetime
import traceback
import argparse
import csv
import pandas as pd
import arcpy
# Import OD Cost Matrix settings from config file
from od_config import OD_PROPS, OD_PROPS_SET_BY_TOOL
import helpers
if helpers.arcgis_version >= "2.9":
# The pyarrow module was not included in earlier versions of Pro, so do not attempt to import it.
# Tool validation prevents users from choosing the Arrow output format in older versions anyway,
# so this module will not be needed.
import pyarrow as pa
from pyarrow import fs
DELETE_INTERMEDIATE_OD_OUTPUTS = True # Set to False for debugging purposes
# Change logging.INFO to logging.DEBUG to see verbose debug messages
LOG_LEVEL = logging.INFO
class ODCostMatrix(
helpers.JobFolderMixin, helpers.LoggingMixin, helpers.MakeNDSLayerMixin
): # pylint:disable = too-many-instance-attributes
"""Used for solving an OD Cost Matrix problem in parallel for a designated chunk of the input datasets."""
def __init__(self, **kwargs):
"""Initialize the OD Cost Matrix analysis for the given inputs.
Expected arguments:
- origins
- destinations
- output_format
- output_od_location
- network_data_source
- travel_mode
- time_units
- distance_units
- cutoff
- num_destinations
- time_of_day
- scratch_folder
- barriers
"""
self.origins = kwargs["origins"]
self.destinations = kwargs["destinations"]
self.output_format = kwargs["output_format"]
self.output_od_location = kwargs["output_od_location"]
self.network_data_source = kwargs["network_data_source"]
self.travel_mode = kwargs["travel_mode"]
self.time_units = kwargs["time_units"]
self.distance_units = kwargs["distance_units"]
self.cutoff = kwargs["cutoff"]
self.num_destinations = kwargs["num_destinations"]
self.time_of_day = kwargs["time_of_day"]
self.scratch_folder = kwargs["scratch_folder"]
self.barriers = []
if "barriers" in kwargs:
self.barriers = kwargs["barriers"]
# Create a job ID and a folder for this job
self._create_job_folder()
        # Set up the class logger. Logs for each parallel process are not written to the console but instead to a
# process-specific log file.
self.setup_logger("ODCostMatrix")
# Set up other instance attributes
self.is_service = helpers.is_nds_service(self.network_data_source)
self.od_solver = None
self.solve_result = None
self.time_attribute = ""
self.distance_attribute = ""
self.is_travel_mode_time_based = True
self.is_travel_mode_dist_based = True
self.optimized_field_name = None
self.input_origins_layer = "InputOrigins" + self.job_id
self.input_destinations_layer = "InputDestinations" + self.job_id
self.input_origins_layer_obj = None
self.input_destinations_layer_obj = None
# Define fields to include in the output for CSV and Arrow modes
self.output_fields = ["OriginOID", "DestinationOID", "DestinationRank", "Total_Time", "Total_Distance"]
# Create a network dataset layer if needed
if not self.is_service:
self._make_nds_layer()
# Prepare a dictionary to store info about the analysis results
self.job_result = {
"jobId": self.job_id,
"jobFolder": self.job_folder,
"solveSucceeded": False,
"solveMessages": "",
"outputLines": "",
"logFile": self.log_file
}
# Get the ObjectID fields for origins and destinations
desc_origins = arcpy.Describe(self.origins)
desc_destinations = arcpy.Describe(self.destinations)
self.origins_oid_field_name = desc_origins.oidFieldName
self.destinations_oid_field_name = desc_destinations.oidFieldName
self.origins_fields = desc_origins.fields
self.origins_has_cutoff_field = "cutoff" in [f.name.lower() for f in self.origins_fields]
self.destinations_fields = desc_destinations.fields
self.orig_origin_oid_field = "Orig_Origin_OID"
self.orig_dest_oid_field = "Orig_Dest_OID"
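    # A minimal usage sketch (hypothetical paths; in practice this class is
    # instantiated by solve_od_cost_matrix() with inputs assembled by
    # ParallelODCalculator):
    #     odcm = ODCostMatrix(
    #         origins=r"C:\data\inputs.gdb\Origins",
    #         destinations=r"C:\data\inputs.gdb\Destinations",
    #         output_format=helpers.OutputFormat.csv,
    #         output_od_location=r"C:\data\output_csvs",
    #         network_data_source=r"C:\data\network.gdb\Transportation\Streets_ND",
    #         travel_mode="Driving Time",
    #         time_units=arcpy.nax.TimeUnits.Minutes,
    #         distance_units=arcpy.nax.DistanceUnits.Miles,
    #         cutoff=30,
    #         num_destinations=None,
    #         time_of_day=None,
    #         scratch_folder=r"C:\data\scratch",
    #     )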
def initialize_od_solver(self):
"""Initialize an OD solver object and set properties."""
        # For a local network dataset, we need to check out the Network Analyst extension license.
if not self.is_service:
arcpy.CheckOutExtension("network")
# Create a new OD cost matrix object
self.logger.debug("Creating OD Cost Matrix object...")
self.od_solver = arcpy.nax.OriginDestinationCostMatrix(self.network_data_source)
# Set the OD cost matrix analysis properties.
# Read properties from the od_config.py config file for all properties not set in the UI as parameters.
# OD properties documentation: https://pro.arcgis.com/en/pro-app/arcpy/network-analyst/odcostmatrix.htm
# The properties have been extracted to the config file to make them easier to find and set so users don't have
# to dig through the code to change them.
self.logger.debug("Setting OD Cost Matrix analysis properties from OD config file...")
for prop, value in OD_PROPS.items():
if prop in OD_PROPS_SET_BY_TOOL:
self.logger.warning(
f"OD config file property {prop} is handled explicitly by the tool parameters and will be ignored."
)
continue
try:
setattr(self.od_solver, prop, value)
if hasattr(value, "name"):
self.logger.debug(f"{prop}: {value.name}")
else:
self.logger.debug(f"{prop}: {value}")
except Exception as ex: # pylint: disable=broad-except
# Suppress warnings for older services (pre 11.0) that don't support locate settings and services
# that don't support accumulating attributes because we don't want the tool to always throw a warning.
if not (self.is_service and prop in [
"searchTolerance", "searchToleranceUnits", "accumulateAttributeNames"
]):
self.logger.warning(
f"Failed to set property {prop} from OD config file. Default will be used instead.")
self.logger.warning(str(ex))
# Set properties explicitly specified in the tool UI as arguments
self.logger.debug("Setting OD Cost Matrix analysis properties specified tool inputs...")
self.od_solver.travelMode = self.travel_mode
self.logger.debug(f"travelMode: {self.travel_mode}")
self.od_solver.timeUnits = self.time_units
self.logger.debug(f"timeUnits: {self.time_units.name}")
self.od_solver.distanceUnits = self.distance_units
self.logger.debug(f"distanceUnits: {self.distance_units.name}")
self.od_solver.defaultDestinationCount = self.num_destinations
self.logger.debug(f"defaultDestinationCount: {self.num_destinations}")
self.od_solver.defaultImpedanceCutoff = self.cutoff
self.logger.debug(f"defaultImpedanceCutoff: {self.cutoff}")
self.od_solver.timeOfDay = self.time_of_day
self.logger.debug(f"timeOfDay: {self.time_of_day}")
# Determine if the travel mode has impedance units that are time-based, distance-based, or other.
self._determine_if_travel_mode_time_based()
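    # For reference, OD_PROPS in od_config.py is a plain dictionary mapping
    # solver property names to values, along the lines of (illustrative
    # values only):
    #     OD_PROPS = {
    #         "allowSaveLayerFile": False,
    #         "lineShapeType": arcpy.nax.LineShapeType.NoLine,
    #     }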
def solve(self, origins_criteria, destinations_criteria): # pylint: disable=too-many-locals, too-many-statements
"""Create and solve an OD Cost Matrix analysis for the designated chunk of origins and destinations.
Args:
origins_criteria (list): Origin ObjectID range to select from the input dataset
            destinations_criteria (list): Destination ObjectID range to select from the input dataset
"""
# Initialize the OD solver object
self.initialize_od_solver()
# Select the origins and destinations to process
self._select_inputs(origins_criteria, destinations_criteria)
if not self.input_destinations_layer_obj:
# No destinations met the criteria for this set of origins
self.logger.debug("No destinations met the criteria for this set of origins. Skipping OD calculation.")
return
# Load the origins
self.logger.debug("Loading origins...")
self.od_solver.addFields(
arcpy.nax.OriginDestinationCostMatrixInputDataType.Origins,
[[self.orig_origin_oid_field, "LONG"]]
)
origins_field_mappings = self.od_solver.fieldMappings(
arcpy.nax.OriginDestinationCostMatrixInputDataType.Origins,
True # Use network location fields
)
for fname in origins_field_mappings:
if fname == self.orig_origin_oid_field:
origins_field_mappings[fname].mappedFieldName = self.origins_oid_field_name
else:
origins_field_mappings[fname].mappedFieldName = fname
self.od_solver.load(
arcpy.nax.OriginDestinationCostMatrixInputDataType.Origins,
self.input_origins_layer_obj,
origins_field_mappings,
False
)
# Load the destinations
self.logger.debug("Loading destinations...")
self.od_solver.addFields(
arcpy.nax.OriginDestinationCostMatrixInputDataType.Destinations,
[[self.orig_dest_oid_field, "LONG"]]
)
destinations_field_mappings = self.od_solver.fieldMappings(
arcpy.nax.OriginDestinationCostMatrixInputDataType.Destinations,
True # Use network location fields
)
for fname in destinations_field_mappings:
if fname == self.orig_dest_oid_field:
destinations_field_mappings[fname].mappedFieldName = self.destinations_oid_field_name
else:
destinations_field_mappings[fname].mappedFieldName = fname
self.od_solver.load(
arcpy.nax.OriginDestinationCostMatrixInputDataType.Destinations,
self.input_destinations_layer_obj,
destinations_field_mappings,
False
)
# Load barriers
# Note: This loads ALL barrier features for every analysis, even if they are very far away from any of
# the inputs in the current chunk. You may want to select only barriers within a reasonable distance of the
# inputs, particularly if you run into the maximumFeaturesAffectedByLineBarriers,
# maximumFeaturesAffectedByPointBarriers, and maximumFeaturesAffectedByPolygonBarriers tool limits for portal
        # solves. However, since using barriers is likely an unusual case, deal with this only if it becomes a problem.
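        # A sketch of such a pre-selection, mirroring the geodesic selection
        # used for destinations in _select_inputs (the 100-mile threshold is
        # an arbitrary placeholder):
        #     barrier_lyr = arcpy.management.MakeFeatureLayer(
        #         barrier_fc, "TempBarriers" + self.job_id).getOutput(0)
        #     arcpy.management.SelectLayerByLocation(
        #         barrier_lyr, "WITHIN_A_DISTANCE_GEODESIC",
        #         self.input_origins_layer_obj, "100 Miles")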
for barrier_fc in self.barriers:
self.logger.debug(f"Loading barriers feature class {barrier_fc}...")
shape_type = arcpy.Describe(barrier_fc).shapeType
if shape_type == "Polygon":
class_type = arcpy.nax.OriginDestinationCostMatrixInputDataType.PolygonBarriers
elif shape_type == "Polyline":
class_type = arcpy.nax.OriginDestinationCostMatrixInputDataType.LineBarriers
elif shape_type == "Point":
class_type = arcpy.nax.OriginDestinationCostMatrixInputDataType.PointBarriers
else:
self.logger.warning(
f"Barrier feature class {barrier_fc} has an invalid shape type and will be ignored."
)
continue
barriers_field_mappings = self.od_solver.fieldMappings(class_type, True)
self.od_solver.load(class_type, barrier_fc, barriers_field_mappings, True)
# Solve the OD cost matrix analysis
self.logger.debug("Solving OD cost matrix...")
solve_start = time.time()
self.solve_result = self.od_solver.solve()
solve_end = time.time()
self.logger.debug(f"Solving OD cost matrix completed in {round(solve_end - solve_start, 3)} seconds.")
# Handle solve messages
solve_msgs = [msg[-1] for msg in self.solve_result.solverMessages(arcpy.nax.MessageSeverity.All)]
initial_num_msgs = len(solve_msgs)
for msg in solve_msgs:
self.logger.debug(msg)
# Remove repetitive messages so they don't clog up the stdout pipeline when running the tool
# 'No "Destinations" found for "Location 1" in "Origins".' is a common message that tends to be repeated and is
# not particularly useful to see in bulk.
# Note that this will not work for localized software when this message is translated.
common_msg_prefix = 'No "Destinations" found for '
solve_msgs = [msg for msg in solve_msgs if not msg.startswith(common_msg_prefix)]
num_msgs_removed = initial_num_msgs - len(solve_msgs)
if num_msgs_removed:
self.logger.debug(f"Repetitive messages starting with {common_msg_prefix} were consolidated.")
solve_msgs.append(f"No destinations were found for {num_msgs_removed} origins.")
solve_msgs = "\n".join(solve_msgs)
# Update the result dictionary
self.job_result["solveMessages"] = solve_msgs
if not self.solve_result.solveSucceeded:
self.logger.debug("Solve failed.")
return
self.logger.debug("Solve succeeded.")
self.job_result["solveSucceeded"] = True
# Save output
# Example: ODLines_O_1_1000_D_2001_3000.csv
out_filename = (f"ODLines_O_{origins_criteria[0]}_{origins_criteria[1]}_"
f"D_{destinations_criteria[0]}_{destinations_criteria[1]}")
if self.output_format is helpers.OutputFormat.featureclass:
self._export_to_feature_class(out_filename)
elif self.output_format is helpers.OutputFormat.csv:
out_csv_file = os.path.join(self.output_od_location, f"{out_filename}.csv")
self._export_to_csv(out_csv_file)
elif self.output_format is helpers.OutputFormat.arrow:
out_arrow_file = os.path.join(self.output_od_location, f"{out_filename}.arrow")
self._export_to_arrow(out_arrow_file)
self.logger.debug("Finished calculating OD cost matrix.")
def _export_to_feature_class(self, out_fc_name):
"""Export the OD Lines result to a feature class."""
# Make output gdb
od_workspace = self._create_output_gdb()
output_od_lines = os.path.join(od_workspace, out_fc_name)
self.logger.debug(f"Exporting OD cost matrix Lines output to {output_od_lines}...")
self.solve_result.export(arcpy.nax.OriginDestinationCostMatrixOutputDataType.Lines, output_od_lines)
self.job_result["outputLines"] = output_od_lines
        # For service solves in versions of ArcGIS Pro prior to 3.1, export Origins and Destinations and properly
        # populate the OriginOID and DestinationOID fields in the output Lines. Services do not preserve the original
        # input OIDs, instead resetting from 1, unlike solves using a local network dataset. This issue was handled on
        # the client side in the ArcGIS Pro 3.1 release, but for older software, this extra post-processing step is
        # necessary.
if self.is_service and helpers.arcgis_version < "3.1":
output_origins = os.path.join(od_workspace, "output_od_origins")
self.logger.debug(f"Exporting OD cost matrix Origins output to {output_origins}...")
self.solve_result.export(arcpy.nax.OriginDestinationCostMatrixOutputDataType.Origins, output_origins)
output_destinations = os.path.join(od_workspace, "output_od_destinations")
self.logger.debug(f"Exporting OD cost matrix Destinations output to {output_destinations}...")
self.solve_result.export(
arcpy.nax.OriginDestinationCostMatrixOutputDataType.Destinations, output_destinations)
self.logger.debug("Updating values for OriginOID and DestinationOID fields...")
lines_layer_name = "Out_Lines"
origins_layer_name = "Out_Origins"
destinations_layer_name = "Out_Destinations"
helpers.run_gp_tool(self.logger, arcpy.management.MakeFeatureLayer, [output_od_lines, lines_layer_name])
helpers.run_gp_tool(self.logger, arcpy.management.MakeFeatureLayer, [output_origins, origins_layer_name])
helpers.run_gp_tool(
self.logger, arcpy.management.MakeFeatureLayer, [output_destinations, destinations_layer_name])
# Update OriginOID values
helpers.run_gp_tool(
self.logger,
arcpy.management.AddJoin,
[lines_layer_name, "OriginOID", origins_layer_name, "ObjectID", "KEEP_ALL"]
)
helpers.run_gp_tool(
self.logger,
arcpy.management.CalculateField,
[lines_layer_name, f"{os.path.basename(output_od_lines)}.OriginOID",
f"!{os.path.basename(output_origins)}.{self.orig_origin_oid_field}!", "PYTHON3"]
)
helpers.run_gp_tool(self.logger, arcpy.management.RemoveJoin, [lines_layer_name])
# Update DestinationOID values
helpers.run_gp_tool(
self.logger,
arcpy.management.AddJoin,
[lines_layer_name, "DestinationOID", destinations_layer_name, "ObjectID", "KEEP_ALL"]
)
helpers.run_gp_tool(
self.logger,
arcpy.management.CalculateField,
[lines_layer_name, f"{os.path.basename(output_od_lines)}.DestinationOID",
f"!{os.path.basename(output_destinations)}.{self.orig_dest_oid_field}!", "PYTHON3"]
)
helpers.run_gp_tool(self.logger, arcpy.management.RemoveJoin, [lines_layer_name])
def _export_to_csv(self, out_csv_file):
"""Save the OD Lines result to a CSV file."""
self.logger.debug(f"Saving OD cost matrix Lines output to CSV as {out_csv_file}.")
        # For service solves in versions of ArcGIS Pro prior to 3.1, export Origins and Destinations and properly
        # populate the OriginOID and DestinationOID fields in the output Lines. Services do not preserve the original
        # input OIDs, instead resetting from 1, unlike solves using a local network dataset. This issue was handled on
        # the client side in the ArcGIS Pro 3.1 release, but for older software, this extra post-processing step is
        # necessary.
if self.is_service and helpers.arcgis_version < "3.1":
# Read the Lines output
with self.solve_result.searchCursor(
arcpy.nax.OriginDestinationCostMatrixOutputDataType.Lines, self.output_fields
) as cur:
od_df = pd.DataFrame(cur, columns=self.output_fields)
# Read the Origins output and transfer original OriginOID to Lines
origins_columns = ["ObjectID", self.orig_origin_oid_field]
with self.solve_result.searchCursor(
arcpy.nax.OriginDestinationCostMatrixOutputDataType.Origins, origins_columns
) as cur:
origins_df = pd.DataFrame(cur, columns=origins_columns)
origins_df.set_index("ObjectID", inplace=True)
od_df = od_df.join(origins_df, "OriginOID")
del origins_df
# Read the Destinations output and transfer original DestinationOID to Lines
dest_columns = ["ObjectID", self.orig_dest_oid_field]
with self.solve_result.searchCursor(
arcpy.nax.OriginDestinationCostMatrixOutputDataType.Destinations, dest_columns
) as cur:
dests_df = pd.DataFrame(cur, columns=dest_columns)
dests_df.set_index("ObjectID", inplace=True)
od_df = od_df.join(dests_df, "DestinationOID")
del dests_df
# Clean up and rename columns
od_df.drop(["OriginOID", "DestinationOID"], axis="columns", inplace=True)
od_df.rename(
columns={self.orig_origin_oid_field: "OriginOID", self.orig_dest_oid_field: "DestinationOID"},
inplace=True
)
# Write CSV file
od_df.to_csv(out_csv_file, index=False)
else: # Local network dataset output
with open(out_csv_file, "w", newline='') as f:
writer = csv.writer(f)
writer.writerow(self.output_fields)
for row in self.solve_result.searchCursor(
arcpy.nax.OriginDestinationCostMatrixOutputDataType.Lines,
self.output_fields
):
writer.writerow(row)
self.job_result["outputLines"] = out_csv_file
def _export_to_arrow(self, out_arrow_file):
"""Save the OD Lines result to an Apache Arrow file."""
self.logger.debug(f"Saving OD cost matrix Lines output to Apache Arrow as {out_arrow_file}.")
self.solve_result.toArrowTable(
arcpy.nax.OriginDestinationCostMatrixOutputDataType.Lines,
self.output_fields,
out_arrow_file
)
self.job_result["outputLines"] = out_arrow_file
def _hour_to_time_units(self):
"""Convert 1 hour to the user's specified time units.
Raises:
ValueError: if the time units are not one of the known arcpy.nax.TimeUnits enums
Returns:
float: 1 hour in the user's specified time units
"""
if self.time_units == arcpy.nax.TimeUnits.Minutes:
return 60.
if self.time_units == arcpy.nax.TimeUnits.Seconds:
return 3600.
if self.time_units == arcpy.nax.TimeUnits.Hours:
return 1.
if self.time_units == arcpy.nax.TimeUnits.Days:
return 1/24.
# If we got to this point, the time units were invalid.
err = f"Invalid time units: {self.time_units}"
self.logger.error(err)
raise ValueError(err)
def _mile_to_dist_units(self):
"""Convert 1 mile to the user's specified distance units.
Raises:
ValueError: if the distance units are not one of the known arcpy.nax.DistanceUnits enums
Returns:
float: 1 mile in the user's specified distance units
"""
if self.distance_units == arcpy.nax.DistanceUnits.Miles:
return 1.
if self.distance_units == arcpy.nax.DistanceUnits.Kilometers:
return 1.60934
if self.distance_units == arcpy.nax.DistanceUnits.Meters:
return 1609.33999997549
if self.distance_units == arcpy.nax.DistanceUnits.Feet:
return 5280.
if self.distance_units == arcpy.nax.DistanceUnits.Yards:
return 1760.
if self.distance_units == arcpy.nax.DistanceUnits.NauticalMiles:
return 0.868976
# If we got to this point, the distance units were invalid.
err = f"Invalid distance units: {self.distance_units}"
self.logger.error(err)
raise ValueError(err)
def _convert_time_cutoff_to_distance(self, cutoff):
"""Convert a time-based cutoff to distance units.
For a time-based travel mode, the cutoff is expected to be in the user's specified time units. Convert this
to a safe straight-line distance cutoff in the user's specified distance units to use when pre-selecting
destinations relevant to this chunk.
Returns:
float: Distance cutoff to use for pre-selecting destinations by straight-line distance
"""
# Assume a max driving speed. Note: If your analysis is doing something other than driving, you may want to
# update this.
max_speed = 80. # Miles per hour
# Convert the assumed max speed to the user-specified distance units / time units
max_speed = max_speed * (self._mile_to_dist_units() / self._hour_to_time_units()) # distance units / time units
# Convert the user's cutoff from time to the user's distance units
cutoff_dist = cutoff * max_speed
# Add a 5% margin to be on the safe side
cutoff_dist = cutoff_dist + (0.05 * cutoff_dist)
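        # For example, a 30-minute cutoff with Minutes time units and Miles
        # distance units gives max_speed = 80 * (1 / 60), or about 1.333 miles
        # per minute, so cutoff_dist = 30 * 1.333 * 1.05, or about 42 miles.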
self.logger.debug((
f"Time cutoff {cutoff} {self.time_units.name} converted to distance: "
f"{cutoff_dist} {self.distance_units.name}"
))
return cutoff_dist
def _get_max_cutoff_for_chunk(self):
"""Determine the max cutoff value that applies to this chunk of origins."""
if not self.origins_has_cutoff_field:
# There are no per-origin cutoffs, so the default cutoff is the only cutoff
self.logger.debug("Origins doesn't have a Cutoff field, so there are no per-origin cutoffs.")
return self.cutoff
self.logger.debug("Checking chunk of Origins for per-origin cutoffs...")
default_cutoff = self.cutoff if self.cutoff else 0
max_cutoff = 0
for row in arcpy.da.SearchCursor(self.input_origins_layer_obj, ["Cutoff"]):
# The cutoff for the origin is either the value specified in the field, or, if that value is null or
# invalid, the specified default cutoff
try:
origin_cutoff = row[0] if row[0] and row[0] > 0 else default_cutoff
except Exception:
# Invalid per-origin cutoff value. Ignore it and use the default.
origin_cutoff = default_cutoff
max_cutoff = max(origin_cutoff, max_cutoff)
self.logger.debug(f"Max per-origin cutoff discovered: {max_cutoff}")
if max_cutoff == 0:
# No cutoffs specified for this chunk
return None
return max_cutoff
def _select_inputs(self, origins_criteria, destinations_criteria):
"""Create layers from the origins and destinations so the layers contain only the desired inputs for the chunk.
Args:
origins_criteria (list): Origin ObjectID range to select from the input dataset
            destinations_criteria (list): Destination ObjectID range to select from the input dataset
"""
# Select the origins with ObjectIDs in this range
self.logger.debug("Selecting origins for this chunk...")
origins_where_clause = (
f"{self.origins_oid_field_name} >= {origins_criteria[0]} "
f"And {self.origins_oid_field_name} <= {origins_criteria[1]}"
)
self.logger.debug(f"Origins where clause: {origins_where_clause}")
self.input_origins_layer_obj = helpers.run_gp_tool(
self.logger,
arcpy.management.MakeFeatureLayer,
[self.origins, self.input_origins_layer, origins_where_clause],
).getOutput(0)
num_origins = int(arcpy.management.GetCount(self.input_origins_layer_obj).getOutput(0))
self.logger.debug(f"Number of origins selected: {num_origins}")
# Select the destinations with ObjectIDs in this range
self.logger.debug("Selecting destinations for this chunk...")
destinations_where_clause = (
f"{self.destinations_oid_field_name} >= {destinations_criteria[0]} "
f"And {self.destinations_oid_field_name} <= {destinations_criteria[1]} "
)
self.logger.debug(f"Destinations where clause: {destinations_where_clause}")
self.input_destinations_layer_obj = helpers.run_gp_tool(
self.logger,
arcpy.management.MakeFeatureLayer,
[self.destinations, self.input_destinations_layer, destinations_where_clause],
).getOutput(0)
num_destinations = int(arcpy.management.GetCount(self.input_destinations_layer_obj).getOutput(0))
self.logger.debug(f"Number of destinations selected: {num_destinations}")
# Eliminate irrelevant destinations in this chunk if possible by selecting only those that fall within a
# reasonable straight-line distance cutoff. The straight-line distance will always be >= the network distance,
# so any destinations falling beyond our cutoff limit in straight-line distance are guaranteed to be irrelevant
# for the network-based OD cost matrix analysis
# > If using a travel mode with impedance units that are not time or distance-based, we cannot determine how to
# convert the cutoff units into a sensible distance buffer, so just return
if not self.is_travel_mode_time_based and not self.is_travel_mode_dist_based:
return
# > If not using an impedance cutoff, we cannot do anything here, so just return
if not self.cutoff and not self.origins_has_cutoff_field:
return
# Determine the max cutoff for this chunk by combining the specified default cutoff and any per-origin
# Cutoff field values
cutoff = self._get_max_cutoff_for_chunk()
self.logger.debug(f"Initial cutoff value to use: {cutoff}")
# > If no default or per-origin cutoffs specified for this chunk, we cannot do anything here, so just return
if not cutoff:
return
# > If using a distance-based travel mode, use the cutoff value directly
if self.is_travel_mode_dist_based:
cutoff_dist = cutoff + (0.05 * cutoff) # Use 5% margin to be on the safe side
# > If using a time-based travel mode, convert the time-based cutoff to a distance value in the user's specified
# distance units by assuming a fast maximum travel speed
else:
cutoff_dist = self._convert_time_cutoff_to_distance(cutoff)
# Use SelectLayerByLocation to select those within a straight-line distance
self.logger.debug(
f"Eliminating destinations outside of distance threshold {cutoff_dist} {self.distance_units.name}...")
self.input_destinations_layer_obj = helpers.run_gp_tool(
self.logger,
arcpy.management.SelectLayerByLocation, [
self.input_destinations_layer,
"WITHIN_A_DISTANCE_GEODESIC",
self.input_origins_layer,
f"{cutoff_dist} {self.distance_units.name}",
]
).getOutput(0)
# If no destinations are within the cutoff, reset the destinations layer object
# so the iteration will be skipped
if not self.input_destinations_layer_obj.getSelectionSet():
self.input_destinations_layer_obj = None
msg = "No destinations found within the distance threshold."
self.logger.debug(msg)
self.job_result["solveMessages"] = msg
return
num_destinations = int(arcpy.management.GetCount(self.input_destinations_layer_obj).getOutput(0))
self.logger.debug(f"Number of destinations selected: {num_destinations}")
def _determine_if_travel_mode_time_based(self):
"""Determine if the travel mode uses a time-based impedance attribute."""
# Get the travel mode object from the already-instantiated OD solver object. This saves us from having to parse
# the user's input travel mode from its string name, object, or json representation.
travel_mode = self.od_solver.travelMode
impedance = travel_mode.impedance
time_attribute = travel_mode.timeAttributeName
distance_attribute = travel_mode.distanceAttributeName
self.is_travel_mode_time_based = time_attribute == impedance
self.is_travel_mode_dist_based = distance_attribute == impedance
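        # For example, a typical "Driving Time" travel mode optimizes a time
        # attribute (impedance == timeAttributeName), so it is time-based,
        # while a "Driving Distance" travel mode optimizes a distance
        # attribute (impedance == distanceAttributeName).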
# Determine which of the OD Lines output table fields contains the optimized cost values
if not self.is_travel_mode_time_based and not self.is_travel_mode_dist_based:
self.optimized_field_name = "Total_Other"
self.output_fields.append(self.optimized_field_name)
elif self.is_travel_mode_time_based:
self.optimized_field_name = "Total_Time"
else:
self.optimized_field_name = "Total_Distance"
def solve_od_cost_matrix(chunk, inputs):
"""Solve an OD Cost Matrix analysis for the given inputs for the given chunk of ObjectIDs.
Args:
chunk (list): Represents the ObjectID ranges to select from the origins and destinations when solving the OD
Cost Matrix. For example, [[1, 1000], [4001, 5000]] means use origin OIDs 1-1000 and destination OIDs
4001-5000.
inputs (dict): Dictionary of keyword inputs suitable for initializing the ODCostMatrix class
Returns:
dict: Dictionary of results from the ODCostMatrix class
"""
odcm = ODCostMatrix(**inputs)
odcm.logger.info((
f"Processing origins OID {chunk[0][0]} to {chunk[0][1]} and destinations OID {chunk[1][0]} to {chunk[1][1]} "
f"as job id {odcm.job_id}"
))
odcm.solve(chunk[0], chunk[1])
odcm.teardown_logger()
return odcm.job_result
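# For example (hypothetical values), given a dict of inputs like the one
# assembled by ParallelODCalculator, solving origins 1-1000 against
# destinations 4001-5000 looks like:
#     result = solve_od_cost_matrix([[1, 1000], [4001, 5000]], od_inputs)
#     print(result["solveSucceeded"], result["outputLines"])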
class ParallelODCalculator:
"""Solves a large OD Cost Matrix by chunking the problem, solving in parallel, and combining results."""
def __init__( # pylint: disable=too-many-locals, too-many-arguments
self, logger, origins, destinations, network_data_source, travel_mode, output_format, output_od_location,
max_origins, max_destinations, max_processes, time_units, distance_units,
cutoff=None, num_destinations=None, time_of_day=None, barriers=None
):
"""Compute OD Cost Matrices between Origins and Destinations in parallel and combine results.
Compute OD cost matrices in parallel and combine and post-process the results.
This class assumes that the inputs have already been pre-processed and validated.
Args:
logger (logging.logger): Logger class to use for messages that get written to the GP window. Set up using
helpers.configure_global_logger().
origins (str): Catalog path to origins
destinations (str): Catalog path to destinations
network_data_source (str): Network data source catalog path or URL
travel_mode (str): String-based representation of a travel mode (name or JSON)
output_format (str): String representation of the output format
output_od_location (str): Catalog path to the output feature class or folder where the OD Lines output will
be stored.
max_origins (int): Maximum origins allowed in a chunk
max_destinations (int): Maximum destinations allowed in a chunk
max_processes (int): Maximum number of parallel processes allowed
time_units (str): String representation of time units
distance_units (str): String representation of distance units
            cutoff (float, optional): Impedance cutoff to limit the OD Cost Matrix solve. Interpreted in the time
                units if the travel mode is time-based, in the distance units if the travel mode is distance-based,
                and in the impedance units if the travel mode is neither time- nor distance-based. Defaults to None.
                When None, do not use a cutoff.
num_destinations (int, optional): The number of destinations to find for each origin. Defaults to None,
which means to find all destinations.
            time_of_day (str, optional): String representation of the start time for the analysis ("%Y%m%d %H:%M"
                format). Defaults to None.
barriers (list(str), optional): List of catalog paths to point, line, and polygon barriers to use.
Defaults to None.
"""
# Set up logger that will write to the GP window
self.logger = logger
self.origins = origins
self.destinations = destinations
time_units = helpers.convert_time_units_str_to_enum(time_units)
distance_units = helpers.convert_distance_units_str_to_enum(distance_units)
self.output_format = helpers.convert_output_format_str_to_enum(output_format)
self.output_od_location = output_od_location
if cutoff == "":
cutoff = None
if not barriers:
barriers = []
if num_destinations == "":
num_destinations = None
self.num_destinations = num_destinations
self.max_processes = max_processes
if not time_of_day:
self.time_of_day = None
else:
self.time_of_day = datetime.datetime.strptime(time_of_day, helpers.DATETIME_FORMAT)
# Scratch folder to store intermediate outputs from the OD Cost Matrix processes
unique_id = uuid.uuid4().hex
self.scratch_folder = os.path.join(arcpy.env.scratchFolder, "ODCM_" + unique_id) # pylint: disable=no-member
self.logger.info(f"Intermediate outputs will be written to {self.scratch_folder}.")
os.mkdir(self.scratch_folder)
# Output folder to store CSV or Arrow outputs
if self.output_format is not helpers.OutputFormat.featureclass:
if not os.path.exists(self.output_od_location):
os.mkdir(self.output_od_location)
# Initialize the dictionary of inputs to send to each OD solve
self.od_inputs = {
"origins": self.origins,
"destinations": self.destinations,
"output_format": self.output_format,
"output_od_location": self.output_od_location,
"network_data_source": network_data_source,
"travel_mode": travel_mode,
"scratch_folder": self.scratch_folder,
"time_units": time_units,
"distance_units": distance_units,
"cutoff": cutoff,
"num_destinations": self.num_destinations,
"time_of_day": self.time_of_day,
"barriers": barriers
}
# List of intermediate output OD Line files created by each process
self.od_line_files = []
# Construct OID ranges for chunks of origins and destinations
self.origin_ranges = helpers.get_oid_ranges_for_input(self.origins, max_origins)
destination_ranges = helpers.get_oid_ranges_for_input(self.destinations, max_destinations)
# Construct pairs of chunks to ensure that each chunk of origins is matched with each chunk of destinations
self.ranges = itertools.product(self.origin_ranges, destination_ranges)
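        # For example, two origin ranges and two destination ranges yield four
        # jobs: ([1, 1000], [1, 500]), ([1, 1000], [501, 1000]),
        # ([1001, 2000], [1, 500]), ([1001, 2000], [501, 1000]).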
# Calculate the total number of jobs to use in logging
self.total_jobs = len(self.origin_ranges) * len(destination_ranges)
self.optimized_cost_field = None
self.df_dest_count = None
def _validate_od_settings(self):
"""Validate OD cost matrix settings before spinning up a bunch of parallel processes doomed to failure.
Also check which field name in the output OD Lines will store the optimized cost values. This depends on the
travel mode being used by the analysis, and we capture it here to use in later steps.
Returns:
str: The name of the field in the output OD Lines table containing the optimized costs for the analysis
"""
# Create a dummy ODCostMatrix object and set properties. This allows us to
# detect any errors prior to spinning up a bunch of parallel processes and having them all fail.
self.logger.debug("Validating OD Cost Matrix settings...")
optimized_cost_field = None
odcm = None
try:
odcm = ODCostMatrix(**self.od_inputs)
odcm.initialize_od_solver()
# Check which field name in the output OD Lines will store the optimized cost values
optimized_cost_field = odcm.optimized_field_name
self.logger.debug("OD Cost Matrix settings successfully validated.")
except Exception:
self.logger.error("Error initializing OD Cost Matrix analysis.")
errs = traceback.format_exc().splitlines()
for err in errs:
self.logger.error(err)
raise
finally:
if odcm:
self.logger.debug("Deleting temporary test OD Cost Matrix job folder...")
# Close logging
odcm.teardown_logger()
# Delete output folder
shutil.rmtree(odcm.job_result["jobFolder"], ignore_errors=True)
del odcm
return optimized_cost_field
def solve_od_in_parallel(self):
"""Solve the OD Cost Matrix in chunks and post-process the results."""
# Validate OD Cost Matrix settings. Essentially, create a dummy ODCostMatrix class instance and set up the
# solver object to ensure this at least works. Do this up front before spinning up a bunch of parallel processes
# that are guaranteed to all fail. While we're doing this, check and store the field name that will represent
# costs in the output OD Lines table. We'll use this in post processing.
self.optimized_cost_field = self._validate_od_settings()
# Compute OD cost matrix in parallel
job_results = helpers.run_parallel_processes(
self.logger, solve_od_cost_matrix, [self.od_inputs], self.ranges,
self.total_jobs, self.max_processes,
"Solving OD Cost Matrix", "OD Cost Matrix"
)
# Parse the results and store components for post-processing.
for result in job_results:
if result["solveSucceeded"]:
self.od_line_files.append(result["outputLines"])
else:
# Typically, a solve fails because no destinations were found for any of the origins in the chunk,
                # and this is a perfectly legitimate failure. It is not an error. However, there may be other, less
                # likely reasons for solve failure. Write solve messages to the main GP message thread in debug
# mode only in case the user is having problems. The user can also check the individual OD log
# files.
self.logger.debug(f"Solve failed for job id {result['jobId']}.")
self.logger.debug(result["solveMessages"])
# Post-process outputs
if self.od_line_files:
self.logger.info("Post-processing OD Cost Matrix results...")
self._check_per_origin_dest_counts()
self.od_line_files = sorted(self.od_line_files)
if self.output_format is helpers.OutputFormat.featureclass:
self._post_process_od_line_fcs()
elif self.output_format is helpers.OutputFormat.csv:
self._post_process_od_line_csvs()
elif self.output_format is helpers.OutputFormat.arrow:
self._post_process_od_line_arrow_files()
else:
self.logger.warning("All OD Cost Matrix solves failed, so no output was produced.")
# Clean up
# Delete the job folders if the job succeeded
if DELETE_INTERMEDIATE_OD_OUTPUTS:
self.logger.info("Deleting intermediate outputs...")
try:
shutil.rmtree(self.scratch_folder, ignore_errors=True)
except Exception: # pylint: disable=broad-except
# If deletion doesn't work, just throw a warning and move on. This does not need to kill the tool.
self.logger.warning(f"Unable to delete intermediate OD Cost Matrix output folder {self.scratch_folder}.")
self.logger.info("Finished calculating OD Cost Matrices.")
def _post_process_od_line_fcs(self):
"""Merge and post-process the OD Lines calculated in each separate process.
Create an empty final output feature class and populate it from each of the intermediate OD Lines feature
classes. Do this instead of simply using the Merge tool in order to correctly calculate the DestinationRank
field and eliminate extra records when only the k closest destinations should be found.
For the case where we wanted to find only the k closest destinations for each origin, calculating the OD in
chunks means our combined output may have more than k destinations for each origin because each individual chunk
found the closest k for that chunk. We need to eliminate all extra rows beyond the first k.
Calculating the OD in chunks also means the DestinationRank field calculated by each chunk is not correct for
the entire analysis. DestinationRank refers to the rank within the chunk, not the overall rank. We need to
recalculate DestinationRank considering the entire dataset.
"""
# Handle ridiculously huge outputs that may exceed the number of rows allowed in a 32-bit OID feature class
kwargs = {}
if helpers.arcgis_version >= "3.2": # 64-bit OIDs were introduced in ArcGIS Pro 3.2.
# First do a quick check to determine if the maximum possible OD pairs exceeds the 32-bit limit
max_possible_pairs = helpers.get_max_possible_od_pair_count(
self.origins, self.destinations, self.num_destinations
)
if max_possible_pairs > helpers.MAX_ALLOWED_FC_ROWS_32BIT:
                # Do a slower check to determine if the actual number of output OD pairs exceeds the
                # 32-bit limit
num_pairs = 0
for out_fc in self.od_line_files:
num_pairs += int(arcpy.management.GetCount(out_fc).getOutput(0))
if num_pairs > helpers.MAX_ALLOWED_FC_ROWS_32BIT:
                    # Use a 64-bit OID field in the output feature class
kwargs = {"oid_type": "64_BIT"}
# Create the final output feature class
desc = arcpy.Describe(self.od_line_files[0])
helpers.run_gp_tool(
self.logger,
arcpy.management.CreateFeatureclass, [
os.path.dirname(self.output_od_location),
os.path.basename(self.output_od_location),
"POLYLINE",
self.od_line_files[0], # template feature class to transfer full schema
"SAME_AS_TEMPLATE",
"SAME_AS_TEMPLATE",
desc.spatialReference
],
kwargs
)
# Insert the rows from all the individual output feature classes into the final output
fields = ["SHAPE@"] + [f.name for f in desc.fields]
with arcpy.da.InsertCursor(self.output_od_location, fields) as cur: # pylint: disable=no-member
# Handle each origin range separately to avoid pulling all results into memory at once
for origin_range in self.origin_ranges:
fcs_for_origin_range = [
f for f in self.od_line_files if os.path.basename(f).startswith(
f"ODLines_O_{origin_range[0]}_{origin_range[1]}_"
)
]
if not fcs_for_origin_range:
# No records for this chunk of origins. Just move on to the next chunk of origins.
continue
if len(fcs_for_origin_range) < 2:
# All results for this chunk of origins are already in the same table, so
# there is no need to post-process because the k closest have already been found, and the
# DestinationRank field is already correct. Just insert the records directly into the final output
# table.
for row in arcpy.da.SearchCursor(fcs_for_origin_range[0], fields): # pylint: disable=no-member
cur.insertRow(row)
continue
# If there are multiple feature classes to handle at once, we need to eliminate extra rows and
# properly update the DestinationRank field. To do this, read them into pandas.
fc_dfs = []
df_fields = ["Shape"] + fields[1:]
for fc in fcs_for_origin_range:
with arcpy.da.SearchCursor(fc, fields) as cur2: # pylint: disable=no-member
fc_dfs.append(pd.DataFrame(cur2, columns=df_fields))
df = pd.concat(fc_dfs, ignore_index=True)
# Drop all but the k nearest rows for each OriginOID (if needed) and calculate DestinationRank
df = self._update_df_for_k_nearest_and_destination_rank(df)
# Write the pandas rows to the final output feature class
for row in df.itertuples(index=False, name=None):
cur.insertRow(row)
self.logger.info("Post-processing complete.")
self.logger.info(f"Results written to {self.output_od_location}.")
def _post_process_od_line_csvs(self):
"""Post-process CSV file outputs."""
# If we wanted to find only the k closest destinations for each origin, we have to do additional post-
# processing. Calculating the OD in chunks means our merged output may have more than k destinations for each
# origin because each individual chunk found the closest k for that chunk. We need to eliminate all extra rows
# beyond the first k. Sort the data by OriginOID and the Total_ field that was optimized for the analysis.
if self.num_destinations or self.df_dest_count is not None:
# Handle each origin range separately to avoid pulling all results into memory at once
for origin_range in self.origin_ranges:
csvs_for_origin_range = [
f for f in self.od_line_files if os.path.basename(f).startswith(
f"ODLines_O_{origin_range[0]}_{origin_range[1]}_"
)
]
if len(csvs_for_origin_range) < 2:
# Either there were no results for this chunk, or all results are already in the same table, so
# there is no need to post-process because the k closest have already been found.
continue
# Read the csv files into a pandas dataframe for easy sorting
df = pd.concat(map(pd.read_csv, csvs_for_origin_range), ignore_index=True)
# Drop all but the k nearest rows for each OriginOID and calculate DestinationRank
df = self._update_df_for_k_nearest_and_destination_rank(df)
# Write the updated CSV file and delete the originals
out_csv = os.path.join(self.output_od_location, f"ODLines_O_{origin_range[0]}_{origin_range[1]}.csv")
df.to_csv(out_csv, index=False)
for csv_file in csvs_for_origin_range:
os.remove(csv_file)
def _post_process_od_line_arrow_files(self):
"""Post-process Arrow file outputs."""
# If we wanted to find only the k closest destinations for each origin, we have to do additional post-
# processing. Calculating the OD in chunks means our merged output may have more than k destinations for each
# origin because each individual chunk found the closest k for that chunk. We need to eliminate all extra rows
# beyond the first k. Sort the data by OriginOID and the Total_ field that was optimized for the analysis.
if self.num_destinations or self.df_dest_count is not None:
# Handle each origin range separately to avoid pulling all results into memory at once
for origin_range in self.origin_ranges:
files_for_origin_range = [
f for f in self.od_line_files if os.path.basename(f).startswith(
f"ODLines_O_{origin_range[0]}_{origin_range[1]}_"