forked from lspitzley/edgar-10k-sa
-
Notifications
You must be signed in to change notification settings - Fork 0
/
formindex.py
83 lines (62 loc) · 2.75 KB
/
formindex.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
from collections import namedtuple
import csv
import os
import requests
# Base URL of the SEC archive tree that the form indexes live under.
SEC_GOV_URL = 'http://www.sec.gov/Archives'

# URL template for one quarterly form index; the two placeholders are filled
# with (year, quarter) via str.format().
# The parts are joined with '/' explicitly: os.path.join uses the OS path
# separator, which would put backslashes into the URL on Windows.
FORM_INDEX_URL = '/'.join(
    [SEC_GOV_URL, 'edgar', 'full-index', '{}', 'QTR{}', 'form.idx'])

# One parsed row of a form index file; all fields are kept as strings.
IndexRecord = namedtuple(
    "IndexRecord",
    ["form_type", "company_name", "cik", "date_filed", "filename"])
class FormIndex(object):
    """Download, cache and parse quarterly form indexes, keeping 10-K rows.

    Parsed rows accumulate in ``self.formrecords`` across calls to
    ``retrieve``/``extract`` and can be written out as CSV with ``save``.
    """

    def __init__(self, index_dir):
        """Remember the cache directory, creating it if it does not exist.

        Args:
            index_dir: Directory in which downloaded index files are cached.
        """
        self.formrecords = []
        self.index_dir = index_dir
        # exist_ok avoids the check-then-create race of exists() + makedirs().
        os.makedirs(index_dir, exist_ok=True)

    def retrieve(self, year, qtr):
        """Download (or reuse the cached copy of) one index and parse it.

        Args:
            year: Year substituted into the index URL and cache file name.
            qtr: Quarter substituted into the index URL and cache file name.
        """
        form_idx = "form_year{}_qtr{}.index".format(year, qtr)
        form_idx_path = os.path.join(self.index_dir, form_idx)
        self.download(form_idx_path, year, qtr)
        self.extract(form_idx_path)

    def download(self, form_idx_path, year, qtr):
        """Fetch one quarterly index into the cache unless already present.

        Args:
            form_idx_path: Destination path of the cached index file.
            year: Year substituted into the index URL.
            qtr: Quarter substituted into the index URL.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        if os.path.exists(form_idx_path):
            print("Download skipped: year {}, qtr {}".format(year,qtr))
            return

        # Download and save to cached directory
        print("Downloading year {}, qtr {}".format(year,qtr))
        index_url = FORM_INDEX_URL.format(year, qtr)
        # NOTE(review): sec.gov may reject requests that do not declare a
        # User-Agent header -- confirm against the SEC's current access policy.
        resp = requests.get(index_url)
        # Fail before the cache file is created; otherwise an error page would
        # be stored and every later call would skip the download and parse it.
        resp.raise_for_status()
        with open(form_idx_path, 'wb') as fout:
            fout.write(resp.content)

    @staticmethod
    def _parse_row(row, fields_begin):
        """Slice a fixed-width row into column values at the given offsets."""
        # Each column ends where the next begins; the last runs to end of row.
        fields_end = fields_begin[1:] + [len(row)]
        return [row[begin:end].rstrip().strip('"')
                for begin, end in zip(fields_begin, fields_end)]

    def extract(self, form_idx_path):
        """Append every ``10-K`` row of an index file to ``self.formrecords``.

        Column offsets are taken from the ``Form Type`` header line. Reading
        stops at the first non-matching line after the 10-K rows, so only one
        contiguous run of 10-K rows is collected.

        Args:
            form_idx_path: Path of a cached index file.

        Raises:
            ValueError: If a 10-K row appears before the header line.
        """
        print("Extracting from {}".format(form_idx_path))
        fields_begin = None
        # Whether the 10-K section of the index has been reached.
        arrived = False
        with open(form_idx_path, 'rb') as fin:
            # Stream the file: the loop normally breaks long before the end.
            for raw_row in fin:
                # latin-1 maps every byte to exactly one character, so rows
                # with non-ASCII bytes decode without error and the
                # fixed-width column offsets stay valid.
                row = raw_row.decode('latin-1')
                if row.startswith("Form Type"):
                    fields_begin = [
                        row.find("Form Type"),
                        row.find("Company Name"),
                        row.find('CIK'),
                        row.find('Date Filed'),
                        row.find("File Name"),
                    ]
                elif row.startswith("10-K "):
                    if fields_begin is None:
                        raise ValueError(
                            "10-K row found before the 'Form Type' header "
                            "in {}".format(form_idx_path))
                    arrived = True
                    rec = self._parse_row(row, fields_begin)
                    self.formrecords.append(IndexRecord(*rec))
                elif arrived:
                    # Past the 10-K section; nothing further is collected.
                    break

    def save(self, path):
        """Write all collected records to ``path`` as fully quoted CSV.

        Args:
            path: Destination file; an existing file is overwritten.
        """
        print("Saving records to {}".format(path))
        # newline='' lets the csv module control line endings (otherwise rows
        # end in \r\r\n on Windows); utf-8 keeps non-ASCII names writable
        # regardless of the platform's default encoding.
        with open(path, 'w', newline='', encoding='utf-8') as fout:
            writer = csv.writer(fout, delimiter=',', quotechar='"',
                                quoting=csv.QUOTE_ALL)
            writer.writerows(tuple(rec) for rec in self.formrecords)