-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathimport_filings.py
100 lines (80 loc) · 2.57 KB
/
import_filings.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import logging
import re
import os
import shutil
import tempfile
import tarfile
import ftplib
from mrjob.job import MRJob, RawValueProtocol, JSONProtocol
# Module-level logger used by the mapper to report failures.
log = logging.getLogger('importer')

# FTP server the mapper downloads archives from.
HOST = 'ftp.sec.gov'

# Captures whatever follows an <ASSIGNED-SIC> tag up to the end of that line.
SIC_EXTRACT = re.compile(r'<ASSIGNED-SIC> *(.*)', flags=re.IGNORECASE)

# A line consisting solely of an opening (DOC_IN) or closing (DOC_OUT)
# <DOCUMENT> tag, optionally followed by whitespace.
DOC_IN = re.compile(r'^<DOCUMENT>\s*$', flags=re.IGNORECASE)
DOC_OUT = re.compile(r'^</DOCUMENT>\s*$', flags=re.IGNORECASE)

# SIC codes accepted by match_sic(); filings with other codes are skipped.
SICS = [
    '1311',
    '1381',
    '1382',
    '1389',
]
def match_sic(line):
    """Report whether the given text carries one of the accepted SIC codes.

    ``line`` is the text to scan; the caller in this file passes the whole
    joined filing header, so it may span many lines. Every
    ``<ASSIGNED-SIC>`` tag found in it is examined.

    Returns ``True`` as soon as one tag's value is listed in ``SICS`` and
    ``False`` when no tag is present or none of the values is listed.
    """
    for match in SIC_EXTRACT.finditer(line):
        # The pattern captures the rest of the line, so trailing blanks or a
        # carriage return from CRLF input would otherwise defeat the lookup.
        if match.group(1).strip() in SICS:
            return True
    return False
def split_filing(dir, line, file_name):
    """Yield each ``<DOCUMENT>`` section of one extracted filing file.

    Parameters:
        dir: directory containing the extracted file.
        line: identifier of the source archive; only used to build the
            output key (the mapper in this file passes the remote path).
        file_name: name of the filing file inside ``dir``.

    Yields:
        ``(key, record)`` pairs, one per closed document. ``key`` is
        ``'<line>:<file_name>:<doc_idx>'`` and ``record`` is a dict holding
        the header text, the document text (including its opening and
        closing tag lines), the file name and the document's index.

    Everything before the first ``<DOCUMENT>`` line is treated as the header.
    If that header does not pass ``match_sic`` the generator ends without
    yielding anything.
    """
    path = os.path.join(dir, file_name)
    with open(path, 'r') as fh:
        is_header = True
        header = []
        doc_idx = 0
        doc = None
        # Iterate the file lazily rather than via readlines() so a large
        # filing is not loaded into memory before scanning starts.
        for tline in fh:
            if doc is not None:
                doc.append(tline)
            if DOC_IN.match(tline):
                if is_header:
                    # First document reached: freeze the header and decide
                    # whether this filing is of interest at all.
                    is_header = False
                    header = ''.join(header)
                    if not match_sic(header):
                        return
                doc = [tline]
            elif DOC_OUT.match(tline) and doc is not None:
                # The ``doc is not None`` guard ignores a closing tag that
                # has no matching opening tag instead of raising TypeError.
                fn = '%s:%s:%s' % (line, file_name, doc_idx)
                yield fn, {
                    'header': header,
                    'doc': ''.join(doc),
                    'file_name': file_name,
                    'doc_idx': doc_idx
                }
                doc = None
                doc_idx += 1
            if is_header:
                header.append(tline)
class MRImportFilings(MRJob):
    """MRJob that downloads filing archives over FTP and emits their documents.

    Each input value is passed verbatim to an FTP ``RETR`` command against
    ``HOST``; the downloaded file is opened as a gzip-compressed tar archive
    and every extracted file is split with ``split_filing``.
    """

    # Input lines are handed to the mapper as raw values.
    INPUT_PROTOCOL = RawValueProtocol
    # Emitted key/record pairs are written using the JSON protocol.
    OUTPUT_PROTOCOL = JSONProtocol

    def mapper(self, x, line):
        """Download one archive, unpack it and yield its matching documents.

        Parameters:
            x: input key; unused.
            line: remote path of the archive on ``HOST``.

        Yields:
            The ``(key, record)`` pairs produced by ``split_filing`` for
            every file found at the top level of the extracted archive.

        Any exception raised while downloading, unpacking or splitting is
        logged and swallowed, so one bad archive does not fail the job.
        The temporary file and directory are always removed.
        """
        fd, tarpath = tempfile.mkstemp()
        os.close(fd)
        workdir = tempfile.mkdtemp()
        try:
            ftp = ftplib.FTP(HOST)
            try:
                ftp.login('anonymous', '@anonymous')
                with open(tarpath, 'wb') as fh:
                    ftp.retrbinary("RETR " + line, fh.write)
                ftp.quit()
            finally:
                # Release the socket even when login or the transfer fails;
                # close() is harmless after a successful quit().
                ftp.close()
            tb = tarfile.open(tarpath, 'r:gz')
            try:
                # NOTE(review): extractall() trusts member paths in a
                # downloaded archive; consider validating member names.
                tb.extractall(path=workdir)
            finally:
                tb.close()
            for file_name in os.listdir(workdir):
                for res in split_filing(workdir, line, file_name):
                    yield res
        except Exception as e:
            # Covers EnvironmentError (I/O, FTP socket errors) as well as
            # parsing failures; both were handled identically before.
            log.exception(e)
        finally:
            shutil.rmtree(workdir)
            os.unlink(tarpath)
#def reducer(self, key, values):
# yield key, values
# Start the job only when this file is executed as a script, not on import.
if __name__ == '__main__':
    MRImportFilings.run()