<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1, minimum-scale=1" />
<meta name="generator" content="pdoc 0.9.2" />
<title>comments_downloader API documentation</title>
<meta name="description" content="" />
<link rel="preload stylesheet" as="style" href="https://cdnjs.cloudflare.com/ajax/libs/10up-sanitize.css/11.0.1/sanitize.min.css" integrity="sha256-PK9q560IAAa6WVRRh76LtCaI8pjTJ2z11v0miyNNjrs=" crossorigin>
<link rel="preload stylesheet" as="style" href="https://cdnjs.cloudflare.com/ajax/libs/10up-sanitize.css/11.0.1/typography.min.css" integrity="sha256-7l/o7C8jubJiy74VsKTidCy1yBkRtiUGbVkYBylBqUg=" crossorigin>
<link rel="stylesheet preload" as="style" href="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/10.1.1/styles/github.min.css" crossorigin>
<style>:root{--highlight-color:#fe9}.flex{display:flex !important}body{line-height:1.5em}#content{padding:20px}#sidebar{padding:30px;overflow:hidden}#sidebar > *:last-child{margin-bottom:2cm}.http-server-breadcrumbs{font-size:130%;margin:0 0 15px 0}#footer{font-size:.75em;padding:5px 30px;border-top:1px solid #ddd;text-align:right}#footer p{margin:0 0 0 1em;display:inline-block}#footer p:last-child{margin-right:30px}h1,h2,h3,h4,h5{font-weight:300}h1{font-size:2.5em;line-height:1.1em}h2{font-size:1.75em;margin:1em 0 .50em 0}h3{font-size:1.4em;margin:25px 0 10px 0}h4{margin:0;font-size:105%}h1:target,h2:target,h3:target,h4:target,h5:target,h6:target{background:var(--highlight-color);padding:.2em 0}a{color:#058;text-decoration:none;transition:color .3s ease-in-out}a:hover{color:#e82}.title code{font-weight:bold}h2[id^="header-"]{margin-top:2em}.ident{color:#900}pre code{background:#f8f8f8;font-size:.8em;line-height:1.4em}code{background:#f2f2f1;padding:1px 4px;overflow-wrap:break-word}h1 code{background:transparent}pre{background:#f8f8f8;border:0;border-top:1px solid #ccc;border-bottom:1px solid #ccc;margin:1em 0;padding:1ex}#http-server-module-list{display:flex;flex-flow:column}#http-server-module-list div{display:flex}#http-server-module-list dt{min-width:10%}#http-server-module-list p{margin-top:0}.toc ul,#index{list-style-type:none;margin:0;padding:0}#index code{background:transparent}#index h3{border-bottom:1px solid #ddd}#index ul{padding:0}#index h4{margin-top:.6em;font-weight:bold}@media (min-width:200ex){#index .two-column{column-count:2}}@media (min-width:300ex){#index .two-column{column-count:3}}dl{margin-bottom:2em}dl dl:last-child{margin-bottom:4em}dd{margin:0 0 1em 3em}#header-classes + dl > dd{margin-bottom:3em}dd dd{margin-left:2em}dd p{margin:10px 0}.name{background:#eee;font-weight:bold;font-size:.85em;padding:5px 10px;display:inline-block;min-width:40%}.name:hover{background:#e0e0e0}dt:target .name{background:var(--highlight-color)}.name > span:first-child{white-space:nowrap}.name.class > span:nth-child(2){margin-left:.4em}.inherited{color:#999;border-left:5px solid #eee;padding-left:1em}.inheritance em{font-style:normal;font-weight:bold}.desc h2{font-weight:400;font-size:1.25em}.desc h3{font-size:1em}.desc dt code{background:inherit}.source summary,.git-link-div{color:#666;text-align:right;font-weight:400;font-size:.8em;text-transform:uppercase}.source summary > *{white-space:nowrap;cursor:pointer}.git-link{color:inherit;margin-left:1em}.source pre{max-height:500px;overflow:auto;margin:0}.source pre code{font-size:12px;overflow:visible}.hlist{list-style:none}.hlist li{display:inline}.hlist li:after{content:',\2002'}.hlist li:last-child:after{content:none}.hlist .hlist{display:inline;padding-left:1em}img{max-width:100%}td{padding:0 .5em}.admonition{padding:.1em .5em;margin-bottom:1em}.admonition-title{font-weight:bold}.admonition.note,.admonition.info,.admonition.important{background:#aef}.admonition.todo,.admonition.versionadded,.admonition.tip,.admonition.hint{background:#dfd}.admonition.warning,.admonition.versionchanged,.admonition.deprecated{background:#fd4}.admonition.error,.admonition.danger,.admonition.caution{background:lightpink}</style>
<style media="screen and (min-width: 700px)">@media screen and (min-width:700px){#sidebar{width:30%;height:100vh;overflow:auto;position:sticky;top:0}#content{width:70%;max-width:100ch;padding:3em 4em;border-left:1px solid #ddd}pre code{font-size:1em}.item .name{font-size:1em}main{display:flex;flex-direction:row-reverse;justify-content:flex-end}.toc ul ul,#index ul{padding-left:1.5em}.toc > ul > li{margin-top:.5em}}</style>
<style media="print">@media print{#sidebar h1{page-break-before:always}.source{display:none}}@media print{*{background:transparent !important;color:#000 !important;box-shadow:none !important;text-shadow:none !important}a[href]:after{content:" (" attr(href) ")";font-size:90%}a[href][title]:after{content:none}abbr[title]:after{content:" (" attr(title) ")"}.ir a:after,a[href^="javascript:"]:after,a[href^="#"]:after{content:""}pre,blockquote{border:1px solid #999;page-break-inside:avoid}thead{display:table-header-group}tr,img{page-break-inside:avoid}img{max-width:100% !important}@page{margin:0.5cm}p,h2,h3{orphans:3;widows:3}h1,h2,h3,h4,h5,h6{page-break-after:avoid}}</style>
<script defer src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/10.1.1/highlight.min.js" integrity="sha256-Uv3H6lx7dJmRfRvH8TH6kJD1TSK1aFcwgx+mdg3epi8=" crossorigin></script>
<script>window.addEventListener('DOMContentLoaded', () => hljs.initHighlighting())</script>
</head>
<body>
<main>
<article id="content">
<header>
<h1 class="title">Module <code>comments_downloader</code></h1>
</header>
<section id="section-intro">
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">import sys
import os
import requests
from requests.adapters import HTTPAdapter
import sqlite3
import pandas as pd
from datetime import datetime
from dateutil import tz
import time
import csv
import urllib3
from argparse import ArgumentParser
# we are ignoring the HTTPS certificate check because the server occasionally returns malformed responses (e.g., unexpected EOF)
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class CommentsDownloader:
"""This class is used for downloading dockets, documents, and comments from Regulations.gov
It can be used in very general ways, by getting the raw JSON from the API, or in a more common way,
downloading the "headers" and "details" of items in bulk.
"""
def __init__(self, api_key):
self.api_key = api_key
def get_requests_remaining(self):
"""Get the number of requests remaining. An API key usually gives you 1000 requests/hour.
Returns:
int: number of requests remaining
"""
# this is a document that we know exists; it was chosen arbitrarily
r = requests.get('https://api.regulations.gov/v4/documents/FDA-2009-N-0501-0012',
headers={'X-Api-Key': self.api_key},
verify=False)
if r.status_code != 200:
print(r.json())
r.raise_for_status()
return int(r.headers['X-RateLimit-Remaining'])
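# --- Usage sketch (illustrative comment added for the docs; not part of the original module).
# "YOUR_API_KEY" is a placeholder for a Regulations.gov v4 API key.
#   downloader = CommentsDownloader(api_key="YOUR_API_KEY")
#   print(downloader.get_requests_remaining())  # e.g., 1000 on an unused key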
def get_request_json(self, endpoint, params=None, print_remaining_requests=False,
wait_for_rate_limits=False, skip_duplicates=False):
"""Used to return the JSON associated with a request to the API
Args:
endpoint (str): URL of the API to access (e.g., https://api.regulations.gov/v4/documents)
params (dict, optional): Parameters to specify to the endpoint request. Defaults to None.
If we are querying the non-details endpoint, we also append the "page[size]" parameter
so that we always get the maximum page size of 250 elements per page.
print_remaining_requests (bool, optional): Whether to print out the number of remaining
requests this hour, based on the response headers. Defaults to False.
wait_for_rate_limits (bool, optional): Determines whether to wait to re-try if we run out of
requests in a given hour. Defaults to False.
skip_duplicates (bool, optional): If a request returns multiple items when only 1 was expected,
should we skip that request? Defaults to False.
Returns:
dict: JSON-ified request response
"""
# Our API key has a rate limit of 1,000 requests/hour. If we hit that limit, we can
# retry every WAIT_MINUTES minutes (more frequently than once an hour, in case our request limit
# is updated sooner). We will sleep in increments of POLL_SECONDS seconds so that a keyboard
# interrupt is noticed promptly; otherwise we'd have to wait out the full sleep before the
# interrupt takes effect. We could do this with threads, but that gets more complicated than it needs to be.
STATUS_CODE_OVER_RATE_LIMIT = 429
WAIT_MINUTES = 20 # time between attempts to get a response
POLL_SECONDS = 10 # run time.sleep() for this long, so we can check if we've been interrupted
params = params if params is not None else {}
# whether we are querying the search endpoint (e.g., /documents) or the "details" endpoint
if (endpoint.split("/")[-1] in ["dockets", "documents", "comments"]):
params = {**params, "page[size]": 250} # always get max page size
# Rather than do requests.get(), use this approach to (attempt to) gracefully handle noisy connections to the server
# We sometimes get SSL errors (unexpected EOF or ECONNRESET), so this should hopefully help us retry.
session = requests.Session()
session.mount('https', HTTPAdapter(max_retries=4))
def poll_for_response(api_key, else_func):
r = session.get(endpoint,
headers={'X-Api-Key': api_key},
params=params,
verify=False)
if r.status_code == 200:
# SUCCESS! Return the JSON of the request
num_requests_left = int(r.headers['X-RateLimit-Remaining'])
if print_remaining_requests or \
(num_requests_left < 10) or \
(num_requests_left <= 100 and num_requests_left % 10 == 0) or \
(num_requests_left % 100 == 0 and num_requests_left < 1000):
print(f"(Requests left: {r.headers['X-RateLimit-Remaining']})")
return [True, r.json()]
else:
if r.status_code == STATUS_CODE_OVER_RATE_LIMIT and wait_for_rate_limits:
else_func()
elif self._is_duplicated_on_server(r.json()) and skip_duplicates:
print("****Duplicate entries on server. Skipping.")
print(r.json()['errors'][0]['detail'])
else: # some other kind of error
print([r, r.status_code])
print(r.json())
r.raise_for_status()
return [False, r.json()]
def wait_for_requests():
the_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f'{the_time}: Hit rate limits. Waiting {WAIT_MINUTES} minutes to try again', flush=True)
# We ran out of requests. Wait for WAIT_MINUTES minutes, but poll every POLL_SECONDS seconds for interruptions
for i in range(int(WAIT_MINUTES * 60 / POLL_SECONDS)):
time.sleep(POLL_SECONDS)
for _ in range(1, int(60 / WAIT_MINUTES) + 3):
success, r_json = poll_for_response(self.api_key, wait_for_requests)
if success or (self._is_duplicated_on_server(r_json) and skip_duplicates):
return r_json
print(r_json)
raise RuntimeError(f"Unrecoverable error; {r_json}")
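# --- Usage sketch (illustrative comment; not part of the original module). Reuses the `downloader`
# from the sketch above to fetch the raw JSON of a documents search filtered to one docket;
# the docket ID is a placeholder.
#   r_json = downloader.get_request_json('https://api.regulations.gov/v4/documents',
#                                        params={'filter[docketId]': 'FDA-2009-N-0501'},
#                                        wait_for_rate_limits=True)
#   print(r_json['meta']['totalElements'])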
def get_items_count(self, data_type, params):
"""Gets the number of items returned by a request in the totalElements attribute.
Args:
data_type (str): One of "dockets", "documents", or "comments".
params (dict): Parameters to specify to the endpoint request for the query. See details
on available parameters at https://open.gsa.gov/api/regulationsgov/.
Returns:
int: Number of items returned by request
"""
# make sure the data_type is plural
data_type = data_type if data_type[-1:] == "s" else data_type + "s"
r_items = self.get_request_json(f'https://api.regulations.gov/v4/{data_type}', params=params)
totalElements = r_items['meta']['totalElements']
return totalElements
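# --- Usage sketch (illustrative comment; not part of the original module). Counts the comments
# matching a date filter before committing to a full download.
#   n = downloader.get_items_count('comments', params={'filter[postedDate][ge]': '2020-01-01'})
#   print(f"{n} comments match the query")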
def gather_headers(self, data_type, params, db_filename=None, csv_filename=None, max_items=None, verbose=True):
"""This function is meant to get the header data for the item returned by the query defined by
params. The API returns these data in "pages" of up to 250 items at a time, and up to 20 pages are
available per query. If the query would return more than 250*20 = 5000 items, the recommended way
to retrieve the full dataset is to sort the data by lastModifiedDate and save the largest value
from the last page of a given query, then use that to filter the next batch to all those with a
lastModifiedDate greater than or equal to the saved date. Unfortunately, this also means
you'll retrieve some of the same headers multiple times, but this is unavoidable because there is no
uniqueness constraint on lastModifiedDate.
The data retrieved are output either to a database (db_filename), or a CSV (csv_filename),
or both. These data do not include the more specific fields returned by a "Details" query
(e.g., the plain text of a comment); that kind of data can be gathered
using the gather_details function below.
An example call is:
gather_headers(data_type='comments', db_filename="comments_2020", params={'filter[postedDate][ge]': '2020-01-01'})
Args:
data_type (str): One of "dockets", "documents", or "comments".
params (dict): Parameters to specify to the endpoint request for the query. See details
on available parameters at https://open.gsa.gov/api/regulationsgov/.
db_filename (str): Name (optionally with path) of the sqlite database to write to. If it doesn't yet exist,
it will be created automatically. If it does exist, we will add to it. Can be None, in which
case a CSV file should be specified.
csv_filename (str): Name (optionally with path) of the CSV file to write to. Can be None, in which
case a database file should be specified.
max_items (int, optional): If this is specified, limits to this many items. Note that this
is an *approximate* limit. Because of how we have to query with pagination, we will inevitably
end up with duplicate records being pulled, so we will hit this limit sooner than we should,
but we shouldn't be off by very much. Defaults to None.
verbose (bool, optional): Whether to print more detailed info. Defaults to True.
"""
if db_filename is None and csv_filename is None:
raise ValueError("Must specify either a database file name or CSV filename")
# make sure the data_type is plural
data_type = data_type if data_type[-1:] == "s" else data_type + "s"
n_retrieved = 0
prev_query_max_date = '1900-01-01 00:00:00' # placeholder value for first round of 5000
EASTERN_TIME = tz.gettz('America/New_York')
# remove the trailing s before adding "Id"; e.g., "dockets" --> "docketId"
id_col = data_type[:len(data_type)-1] + "Id"
if db_filename is not None:
conn = self._get_database_connection(db_filename)
cur = conn.cursor()
else:
conn = cur = None
# first request, to ensure there are documents and to get a total count
totalElements = self.get_items_count(data_type, params)
print(f'Found {totalElements} {data_type}...', flush=True)
if max_items is not None and max_items < totalElements:
print(f'...but limiting to {max_items} {data_type}...', flush=True)
totalElements = max_items
while n_retrieved < totalElements:
# loop over 5000 in each request (20 pages of 250 each)
if verbose: print(f'\nEnter outer loop ({n_retrieved} {data_type} collected)...', flush=True)
page = 1
data = []
while (n_retrieved < totalElements) and (page == 1 or (not r_items['meta']['lastPage'])):
## note: this will NOT lead to an off-by-one error because at the start of the loop
# r_items is from the *previous* request. If the *previous* request was the last page
# then we exit the loop (unless we're on the first page, in which case get the data then exit)
retries = 5
while retries > 0:
try:
r_items = self.get_request_json(f'https://api.regulations.gov/v4/{data_type}',
params={**params,
'filter[lastModifiedDate][ge]': prev_query_max_date,
'page[number]': str(page),
'sort': 'lastModifiedDate'},
wait_for_rate_limits=True)
break
except Exception as e:
retries -= 1
if retries <= 0:
raise e
n_retrieved += len(r_items['data'])
data.extend(r_items['data']) # add all items from this request
page += 1
## There may be duplicates due to pagination, so the commented out code here doesn't apply,
## but I'm leaving it in so I know not to "fix" this issue later on.
#if n_retrieved > totalElements:
# data = data[:-(n_retrieved - totalElements)] # remove the extras
# assert len(data) == totalElements
# n_retrieved = totalElements
if verbose: print(f' {n_retrieved} {data_type} retrieved', flush=True)
# get our query's final record's lastModifiedDate, and convert to eastern timezone for filtering via URL
prev_query_max_date = r_items['data'][-1]['attributes']['lastModifiedDate'].replace('Z', '+00:00')
prev_query_max_date = datetime.fromisoformat(prev_query_max_date).astimezone(EASTERN_TIME).strftime('%Y-%m-%d %H:%M:%S')
data = self._get_processed_data(data, id_col)
self._output_data(data,
table_name=(data_type + "_header"),
conn=conn,
cur=cur,
csv_filename=csv_filename)
self._remove_duplicates_from_csv(data_type, csv_filename)
self._close_database_connection(conn)
# Note: the count in n_retrieved may not reflect what's in the database because there may be
# duplicates downloaded along the way due to the pagination mechanism on Regulations.gov's API.
# The sqlite database uses a unique constraint to avoid duplicates, so the final count printed
# below may not match what is shown in the database. For CSVs, duplicates are removed by
# _remove_duplicates_from_csv, so the row count there may also be lower than the count printed here.
the_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f'{the_time}: Finished: approximately {n_retrieved} {data_type} collected', flush=True)
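# --- Usage sketch (illustrative comment; not part of the original module). Mirrors the example
# call in the docstring above, writing headers to both a sqlite database and a CSV.
#   downloader.gather_headers(data_type='comments',
#                             params={'filter[postedDate][ge]': '2020-01-01'},
#                             db_filename='comments_2020.db',
#                             csv_filename='comments_2020_headers.csv')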
def gather_details(self, data_type, ids, db_filename=None, csv_filename=None, insert_every_n_rows=500, skip_duplicates=True):
"""This function is meant to get the Details data for each item in ids, one at a time. The data
for each item is output either to a database (specified by db_filename) or a CSV (specified by csv_filename).
An example call is:
gather_details(data_type='documents', ids=document_ids, csv_filename="documents_2020.csv")
Args:
data_type (str): One of "dockets", "documents", or "comments".
ids (list of str): List of IDs for items for which you are querying details. These IDs are
appended to the URL directly, e.g., https://api.regulations.gov/v4/comments/FWS-R8-ES-2008-0006-0003
db_filename (str): Name (optionally with path) of the sqlite database to write to. If it doesn't yet exist,
it will be created automatically. If it does exist, we will add to it. Can be None, in which
case a CSV should be specified.
csv_filename (str): Name (optionally with path) of the CSV file to write to. Can be None, in which
case a database file should be specified.
insert_every_n_rows (int): How often to write to the database or CSV. Defaults to every 500 rows.
skip_duplicates (bool, optional): If a request returns multiple items when only 1 was expected,
should we skip that request? Defaults to True.
"""
if db_filename is None and csv_filename is None:
raise ValueError("Must specify either a database file name or CSV filename")
# make sure the data_type is plural
data_type = data_type if data_type[-1:] == "s" else data_type + "s"
n_retrieved = 0
data = []
attachments = []
# remove the trailing s before adding "Id"; e.g., "dockets" --> "docketId"
id_col = data_type[:len(data_type)-1] + "Id"
if db_filename is not None:
conn = self._get_database_connection(db_filename)
cur = conn.cursor()
else:
conn = cur = None
the_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f'{the_time}: Gathering details for {len(ids)} {data_type}...', flush=True)
for item_id in ids:
retries = 5
while retries > 0:
try:
r_item = self.get_request_json(f'https://api.regulations.gov/v4/{data_type}/{item_id}',
params={"include":"attachments"} if data_type == "comments" else None,
wait_for_rate_limits=True,
skip_duplicates=skip_duplicates)
break
except Exception as e:
retries -= 1
if retries <= 0:
print(f"Error encountered for {item_id}")
raise e
if skip_duplicates and self._is_duplicated_on_server(r_item):
print(f"Skipping for {item_id}\n")
continue
n_retrieved += 1
data.append(r_item['data']) # only one item from the Details endpoint, not a list, so use append (not extend)
if 'included' in r_item.keys():
attachments.append(r_item['included'][0]['attributes']['fileFormats'])
else:
attachments.append(None)
if n_retrieved > 0 and n_retrieved % insert_every_n_rows == 0:
if data_type != "comments":
attachments = None
data = self._get_processed_data(data, id_col, attachments)
self._output_data(data,
table_name=(data_type + "_detail"),
conn=conn,
cur=cur,
csv_filename=csv_filename)
data = [] # reset for next batch
attachments = []
if len(data) > 0: # insert any remaining in final batch
if data_type != "comments":
attachments = None
data = self._get_processed_data(data, id_col, attachments)
self._output_data(data,
table_name=(data_type + "_detail"),
conn=conn,
cur=cur,
csv_filename=csv_filename)
self._close_database_connection(conn)
the_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f'{the_time}: Finished: {n_retrieved} {data_type} collected', flush=True)
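# --- Usage sketch (illustrative comment; not part of the original module). Reads comment IDs from
# a header CSV produced by gather_headers() and fetches the full details for each one.
#   comment_ids = downloader.get_ids_from_csv('comments_2020_headers.csv', 'comments', unique=True)
#   downloader.gather_details('comments', comment_ids, csv_filename='comments_2020_details.csv')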
def gather_comments_by_document(self, document_id, db_filename=None, csv_filename=None):
"""User-friendly function for downloading all of the comments on a single document, using
the documentId visible on the Regulations.gov website. This abstracts away all the details around
filtering and paginating through the API and downloads the data into either a CSV or sqlite database
or both.
Note that if a database is used (i.e., db_filename is not None), the "header" information for comments
will be saved, in addition to the "details" of each comment. In other words, the table comments_header
will be populated in addition to comments_detail.
Args:
document_id (str): document ID, as visible in either the URL or on the website. Note, this is
distinct from the docket ID and from the API's internal objectId.
db_filename (str): Name (optionally with path) of the sqlite database to write to. If it doesn't yet exist,
it will be created automatically. If it does exist, we will add to it. Can be None, in which
case a CSV should be specified.
csv_filename (str): Name (optionally with path) of the CSV file to write to. Can be None, in which
case a database file should be specified.
"""
if db_filename is None and csv_filename is None:
raise ValueError("Need to specify either a database filename or CSV filename or both")
def get_object_id(document_id):
# first, get the objectId for the document, which we use to filter to its comments
the_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f"{the_time}: Getting objectId for document {document_id}...", end="", flush=True)
r_json = self.get_request_json(f'https://api.regulations.gov/v4/documents/{document_id}')
object_id = r_json['data']['attributes']['objectId']
print(f"Got it ({object_id})", flush=True)
return object_id
def get_comment_ids(object_id):
# We need to create a temporary CSV so we can read back in the commentIds. This is because the
# comment headers do not include the associated documentId or objectId, so if we append the
# comment headers to an existing file or database, we won't be able to tell which comments
# correspond to this document.
the_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f"{the_time}: Getting comment headers associated with document {document_id}...\n", flush=True)
temp_filename = f"comment_headers_{datetime.now().strftime('%H%M%S')}.csv"
self.gather_headers(data_type="comments",
params={'filter[commentOnId]': object_id},
db_filename=db_filename,
csv_filename=temp_filename,
verbose=False)
# if file didn't get created, we found 0 comments
if os.path.isfile(temp_filename):
comment_ids = self.get_ids_from_csv(temp_filename, "comments", unique=True)
try:
os.remove(temp_filename)
except:
pass
else:
return []
print("\nDone getting comment IDs----------------\n", flush=True)
return comment_ids
object_id = get_object_id(document_id)
comment_ids = get_comment_ids(object_id)
if len(comment_ids) > 0:
the_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f"{the_time}: Getting comments associated with document {document_id}...\n", flush=True)
self.gather_details("comments", comment_ids, db_filename=db_filename, csv_filename=csv_filename)
# Get the total number of comments retrieved. This may differ from what we expect if there
# are issues during the download process or the database prevents importing duplicates from pagination.
n_comments = self._get_item_count(data_type="comments", csv_filename=csv_filename, db_filename=db_filename,
filter_column="commentOnDocumentId", filter_value=document_id)
else:
n_comments = 0
print(f"\nDone getting all {n_comments} comments for document {document_id}----------------\n", flush=True)
def gather_comments_by_docket(self, docket_id, db_filename=None, csv_filename=None):
"""User-friendly function for downloading all of the comments in a docket, using the docketId visible
on the Regulations.gov website. This abstracts away all the details around finding all the documents
in a given docket and getting their individual comments, including filtering and paginating through
the API. It downloads the comments into either a CSV or sqlite database or both.
Note that if a database is used (i.e., db_filename is not None), the "header" information for documents
and comments will be saved, in addition to the "details" of each comment. In other words, the table
comments_header will be populated in addition to comments_detail, and the table documents_header will
be populated as well.
Args:
docket_id (str): docket ID, as visible in either the URL or on the website. Note, this is
distinct from the document ID and from the API's internal objectId.
db_filename (str): Name (optionally with path) of the sqlite database to write to. If it doesn't yet exist,
it will be created automatically. If it does exist, we will add to it. Can be None, in which
case a CSV should be specified.
csv_filename (str): Name (optionally with path) of the CSV file to write to. Can be None, in which
case a database file should be specified.
"""
if db_filename is None and csv_filename is None:
raise ValueError("Need to specify either a database filename or CSV filename or both")
def get_document_ids(docket_id):
the_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f"{the_time}: Getting documents associated with docket {docket_id}...\n", flush=True)
temp_filename = f"document_headers_{datetime.now().strftime('%H%M%S')}.csv"
self.gather_headers(data_type="documents",
params={'filter[docketId]': docket_id},
db_filename=db_filename,
csv_filename=temp_filename,
verbose=False)
# if file didn't get created, we found 0 documents
if os.path.isfile(temp_filename):
document_ids = self.get_ids_from_csv(temp_filename, "documents", unique=True)
try:
os.remove(temp_filename)
except:
pass
else:
raise ValueError(f"Docket {docket_id} has no documents (did you specify a documentId instead of a docketId by mistake?)")
print(f"\nDone----------------\n", flush=True)
return document_ids
document_ids = get_document_ids(docket_id)
for document_id in document_ids:
the_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f"******************************\n{the_time}: Getting comments for document {document_id}...\n", flush=True)
self.gather_comments_by_document(document_id, db_filename, csv_filename)
# get the total number of comments retrieved
n_comments = self._get_item_count(data_type="comments", csv_filename=csv_filename, db_filename=db_filename,
filter_column="docketId", filter_value=docket_id)
print(f"DONE retrieving all {n_comments} comments from {len(document_ids)} document(s) for docket {docket_id}----------------\n", flush=True)
def get_ids_from_csv(self, csv_filename, data_type, unique=False):
"""Get IDs for dockets, documents, or comments in a given CSV. Assumes that the header row
exists in the file and that the ID column is named one of docketId, documentId, or commentId.
Note: the CSV could be very large, so we don't load the whole thing into memory, but instead
loop over it one row at a time.
Args:
csv_filename (str): Name (optionally with path) of the CSV file with the data
data_type (str): One of "dockets", "documents", or "comments".
unique (bool, optional): Whether to remove duplicates, making the list of IDs unique.
Defaults to False so that the IDs are returned in the same order and number as the
input file.
Returns:
list of str: IDs for the given data_type from the specified csv_filename
"""
# make sure the data_type is NOT plural before adding Id
id_column = (data_type[:-1] if data_type[-1:] == "s" else data_type) + "Id"
id_column_index = None
ids = []
with open(csv_filename, 'r', encoding='utf8', newline='') as f:
reader = csv.reader(f)
for row in reader:
if id_column_index is None:
try:
id_column_index = row.index(id_column)
except ValueError:
raise ValueError(f"Missing id column {id_column} in {csv_filename}")
else:
ids.append(row[id_column_index])
if unique:
ids = list(set(ids))
return ids
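# --- Usage sketch (illustrative comment; not part of the original module). Assumes a CSV with a
# commentId column, e.g. one produced by gather_headers() above.
#   ids = downloader.get_ids_from_csv('comments_2020_headers.csv', 'comments', unique=True)
#   print(len(ids), "unique comment IDs")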
def _get_database_connection(self, filename, drop_if_exists=False):
"""Get a connection to the database in the file at filename. If the database does not
exist it will be created with the necessary tables. If it does exist, tables are kept as-is
unless drop_if_exists is specified, in which case existing tables are dropped before creating
the necessary tables.
Args:
filename (str): Filename of database, optionally including path.
drop_if_exists (bool, optional): Whether to drop the necessary tables if they exist.
Defaults to False, in which case if the tables exist, they will be left as-is and
new data will be appended.
Returns:
sqlite3.Connection: open connection to the database
"""
# If the database exists already, this just ensures all the necessary tables exist
self._setup_database(filename, drop_if_exists=drop_if_exists)
return sqlite3.connect(filename)
def _setup_database(self, filename=None, drop_if_exists=False):
"""Set up a sqlite database with the tables and columns necessary for the data returned
by the Regulations.gov API.
Args:
filename (str): Filename, optionally including path.
drop_if_exists (bool, optional): Whether to drop the six tables used here if they already exist.
Defaults to False so that we don't delete any information.
"""
if filename is None:
filename = 'regulations.gov_' + datetime.now().strftime('%Y%m%d') + ".db"
# make the path if necessary
if len(os.path.dirname(filename)) > 0:
os.makedirs(os.path.dirname(filename), exist_ok=True)
conn = sqlite3.connect(filename)
cur = conn.cursor()
if drop_if_exists:
cur.execute('drop table if exists dockets_header')
cur.execute('drop table if exists dockets_detail')
cur.execute('drop table if exists documents_header')
cur.execute('drop table if exists documents_detail')
cur.execute('drop table if exists comments_header')
cur.execute('drop table if exists comments_detail')
cur.execute("""
CREATE TABLE IF NOT EXISTS dockets_header (
docketId TEXT NOT NULL UNIQUE,
agencyId TEXT,
docketType TEXT,
title TEXT,
lastModifiedDate TEXT NOT NULL,
objectId TEXT,
sqltime TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)""")
cur.execute("""
CREATE TABLE IF NOT EXISTS dockets_detail (
docketId TEXT NOT NULL UNIQUE,
agencyId TEXT,
category TEXT,
dkAbstract TEXT,
docketType TEXT,
effectiveDate TEXT,
field1 TEXT,
field2 TEXT,
generic TEXT,
keywords TEXT,
legacyId TEXT,
modifyDate TEXT NOT NULL,
objectId TEXT,
organization TEXT,
petitionNbr TEXT,
program TEXT,
rin TEXT,
shortTitle TEXT,
subType TEXT,
subType2 TEXT,
title TEXT,
sqltime TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)""")
cur.execute("""
CREATE TABLE IF NOT EXISTS documents_header (
documentId TEXT NOT NULL UNIQUE,
commentEndDate TEXT,
commentStartDate TEXT,
docketId TEXT,
documentType TEXT,
frDocNum TEXT,
lastModifiedDate TEXT NOT NULL,
objectId TEXT,
postedDate TEXT,
subtype TEXT,
title TEXT,
withdrawn INTEGER,
sqltime TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)""")
cur.execute("""
CREATE TABLE IF NOT EXISTS documents_detail (
documentId TEXT NOT NULL UNIQUE,
additionalRins TEXT,
agencyId TEXT,
allowLateComments INTEGER,
authorDate TEXT,
authors TEXT,
category TEXT,
cfrPart TEXT,
city TEXT,
comment TEXT,
commentEndDate TEXT,
commentStartDate TEXT,
country TEXT,
docAbstract TEXT,
docketId TEXT,
documentType TEXT,
effectiveDate TEXT,
exhibitLocation TEXT,
exhibitType TEXT,
field1 TEXT,
field2 TEXT,
firstName TEXT,
frDocNum TEXT,
frVolNum TEXT,
govAgency TEXT,
govAgencyType TEXT,
implementationDate TEXT,
lastName TEXT,
legacyId TEXT,
media TEXT,
modifyDate TEXT NOT NULL,
objectId TEXT,
ombApproval TEXT,
openForComment INTEGER,
organization TEXT,
originalDocumentId TEXT,
pageCount TEXT,
paperLength INTEGER,
paperWidth INTEGER,
postedDate TEXT,
postmarkDate TEXT,
reasonWithdrawn TEXT,
receiveDate TEXT,
regWriterInstruction TEXT,
restrictReason TEXT,
restrictReasonType TEXT,
sourceCitation TEXT,
startEndPage TEXT,
stateProvinceRegion TEXT,
subject TEXT,
submitterRep TEXT,
submitterRepCityState TEXT,
subtype TEXT,
title TEXT,
topics TEXT,
trackingNbr TEXT,
withdrawn INTEGER,
zip TEXT,
sqltime TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)""")
cur.execute("""
CREATE TABLE IF NOT EXISTS comments_header (
commentId TEXT NOT NULL UNIQUE,
agencyId TEXT,
documentType TEXT,
lastModifiedDate TEXT NOT NULL,
objectId TEXT,
postedDate TEXT,
title TEXT,
withdrawn INTEGER,
sqltime TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL
)""")
cur.execute("""
CREATE TABLE IF NOT EXISTS comments_detail (
commentId TEXT NOT NULL UNIQUE,
agencyId TEXT,
category TEXT,
city TEXT,
comment TEXT,
commentOn TEXT,
commentOnDocumentId TEXT,
country TEXT,
docAbstract TEXT,
docketId TEXT,
documentType TEXT,
duplicateComments INTEGER,
field1 TEXT,
field2 TEXT,
firstName TEXT,
govAgency TEXT,
govAgencyType TEXT,
lastName TEXT,
legacyId TEXT,
modifyDate TEXT NOT NULL,
objectId TEXT,
openForComment INTEGER,
organization TEXT,
originalDocumentId TEXT,
pageCount TEXT,
postedDate TEXT,
postmarkDate TEXT,
reasonWithdrawn TEXT,
receiveDate TEXT,
restrictReason TEXT,
restrictReasonType TEXT,
stateProvinceRegion TEXT,
submitterRep TEXT,
submitterRepCityState TEXT,
subtype TEXT,
title TEXT,
trackingNbr TEXT,
withdrawn INTEGER,
zip TEXT,
attachmentLinks TEXT,
sqltime TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)""")
conn.close()
def _close_database_connection(self, conn):
"""Close a database connection
Args:
conn (sqlite3.Connection): Try to close the connection. If there are any errors, ignore them.
"""
try:
conn.close()
except:
pass
def _is_duplicated_on_server(self, response_json):
"""Used to determine whether a given response indicates a duplicate on the server. This is
because there is a bug in the server: there are some commentIds, like NRCS-2009-0004-0003,
which correspond to multiple actual comments! This function determines whether the
returned JSON contains an error indicating this issue.
Args:
response_json (dict): JSON from request to API (usually, from get_request_json)
Returns:
bool: whether the response indicates a duplicate issue or not
"""
return ('errors' in response_json.keys()) \
and (response_json['errors'][0]['status'] == "500") \
and (response_json['errors'][0]['detail'][:21] == "Incorrect result size")
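# --- Illustrative comment (not part of the original module): a minimal response shape that this
# check would flag, reconstructed from the conditions above rather than copied from a real payload.
#   {'errors': [{'status': '500', 'detail': 'Incorrect result size ...'}]}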
def _get_item_count(self, data_type, csv_filename=None, db_filename=None, filter_column=None, filter_value=None):
"""Simple helper function used to get the number of items retrieved, as stored in either
a CSV file or a sqlite database.
Args:
data_type (str): One of "dockets", "documents", or "comments".
csv_filename (str): File name (optionally with path) where items are stored. Defaults to
None, in which case a db_filename should be specified.
db_filename (str): File name (optionally with path) where database, containing comments, is located.
Defaults to None, in which case a csv_filename should be specified.
filter_column (str): Identifies the column used to filter the database to get the count. Defaults to
None for the case when we are using a CSV or don't want to use a filter.
filter_value (str): The value used in filter_column to filter the database to get the count. Defaults to
None for the case when we are using a CSV or don't want to use a filter.
Returns:
int: Number of items stored in either the detail table (contained in the database db_filename) or the CSV
"""
if csv_filename is None and db_filename is None:
raise ValueError("Must specify either a csv_filename or a db_filename")
# make sure the data_type is plural
data_type = data_type if data_type[-1:] == "s" else data_type + "s"
if db_filename is not None:
conn = sqlite3.connect(db_filename)
cur = conn.cursor()
if filter_column is not None and filter_value is not None:
n_items = cur.execute(f"select count(*) from {data_type}_detail where {filter_column}=?", (filter_value,)).fetchone()[0]
else:
n_items = cur.execute(f"select count(*) from {data_type}_detail").fetchone()[0]
conn.close()
else:
n_items = len(self.get_ids_from_csv(csv_filename, data_type, unique=True))
return n_items
def _get_processed_data(self, data, id_col, attachments=None):
"""Used to take the data contained in a response (e.g., the data for a bunch of comments)
and remove unnecessary columns (e.g., displayProperties and highlightedContent). Also adds the ID
associated with the items and flattens lists contained in each item's data.
Args:
data (list of dict): List of items to process from a request (e.g., a bunch of comments).
Each dict is expected to be formatted like: {'id': '...', 'attributes': {'attrib1': 'data', ...}, <other keys:values>}
id_col (str): Name of the ID column for this data type, i.e., 'docketId', 'documentId', or 'commentId'
attachments (list of dict): List of dict with the file attachments, if any. Dict contains 'fileUrl', 'format', and 'size' keys.
Returns:
list of dict: processed dataset, ready for input into sqlite or output to CSV
"""
output = []
cols = [x for x in data[0]['attributes'].keys() if x not in \
['id', 'displayProperties', 'highlightedContent', 'fileFormats']]
for idx, item in enumerate(data):
# get just the dict of columns we want, and if one of the values is a list, flatten it
out = {k:(' '.join(v) if type(v) == list else v) for (k,v) in item['attributes'].items() if k in cols}
if attachments is not None:
if attachments[idx] is not None:
# create a "|" separated list of URLs
out["attachmentLinks"] = "|".join([x['fileUrl'] for x in attachments[idx]])
else:
out["attachmentLinks"] = ""
# add the item's ID at the first position
out = {id_col: item['id'], **out}
output.append(out)
return output
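# --- Illustrative comment (not part of the original module): roughly, an input item such as
#   {'id': 'ABC-2020-0001-0002', 'attributes': {'title': 'A comment', 'topics': ['a', 'b']}}
# becomes
#   {'commentId': 'ABC-2020-0001-0002', 'title': 'A comment', 'topics': 'a b'}
# with any attachment URLs joined into a single '|'-separated attachmentLinks string.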
def _insert_data(self, data, table_name, conn, cur=None):
"""Add data to a specified sqlite table
Args:
data (list of dict): Data to be inserted into database
table_name (str): specifies table to insert into (one of: "dockets_header", "dockets_detail",
"documents_header", "documents_detail", "comments_header", or "comments_detail")
conn (sqlite3.Connection): Open connection to database
cur (sqlite3.Cursor): Open cursor into the database
"""
# upload into staging table, then insert, skipping any rows that violate key constraints
if conn is None:
raise ValueError("conn cannot be None")
if table_name is None:
raise ValueError("Need to specify table_name")
if cur is None:
cur = conn.cursor()
the_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
cols = [x for x in pd.read_sql_query(f'select * from {table_name} limit 1', conn).columns if x != "sqltime"]
print(f"{the_time}: Inserting {len(data)} records into database...", flush=True)
pd.DataFrame(data).to_sql("tmp", conn, if_exists="replace", index=False)
cur.execute(f"INSERT OR IGNORE INTO {table_name} ({','.join(cols)}) SELECT {','.join(cols)} FROM tmp")
conn.commit()
def _write_to_csv(self, data, csv_filename):
"""Write out data to a CSV file. Data will be appended to an existing file, or if the file does
not exist, the file will be created with headers. Subsequent appends do not include the header row.
Args:
data (list of dict): Data to write out
csv_filename (str): Name (optionally with path) of the CSV file to write to
"""
if csv_filename is None:
raise ValueError("csv_filename cannot be None")
the_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f"{the_time}: Writing {len(data)} records to {csv_filename}...", end="", flush=True)
df = pd.DataFrame(data)
# remove line breaks in each field so that the rows of the CSV correspond to one record
df.replace(r"\r\n|\n", " ", regex=True, inplace=True)
# make the path if necessary
if len(os.path.dirname(csv_filename)) > 0:
os.makedirs(os.path.dirname(csv_filename), exist_ok=True)
df.to_csv(csv_filename, index=False, mode='a', quoting=csv.QUOTE_ALL,
header=(not os.path.isfile(csv_filename)))
print("Done", flush=True)
def _output_data(self, data, table_name=None, conn=None, cur=None, csv_filename=None):
"""Routes the output call to either database or the CSV, depending on parameters
Args:
data (list of dict): Data to write out
table_name (str): For sqlite database, specifies table to insert into (one of: "dockets_header", "dockets_detail",
"documents_header", "documents_detail", "comments_header", or "comments_detail"). Can be None if using CSV.
conn (sqlite3.Connection): Open connection to database. Can be None, in which case a CSV should be specified.
cur (sqlite3.Cursor): Open cursor into the database. Can be None, in which case a CSV should be specified.
csv_filename (str): Name (optionally with path) of the CSV file to write to. Can be None, in which
case a database file should be specified.
"""
if conn is None and csv_filename is None:
raise ValueError("Need to specify either conn or csv_filename")
if conn is not None:
self._insert_data(data, table_name, conn, cur)
if csv_filename is not None:
self._write_to_csv(data, csv_filename)
def _remove_duplicates_from_csv(self, data_type, csv_filename):
"""Function used to remove duplicates (on docketId, documentId, or commentId, depending on
the data_type) from a CSV, which may be introduced because of the pagination mechanism.
Args:
data_type (str): One of "dockets", "documents", or "comments".
csv_filename (str): Name (optionally with path) of the CSV file containing the data. Can be None, in which
case a database file should be specified.
"""