-
Notifications
You must be signed in to change notification settings - Fork 6
/
gethistory_tomongo.py
177 lines (144 loc) · 5.17 KB
/
gethistory_tomongo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# -*- coding: utf-8 -*-
"""
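File notes
Fetches the daily trade prices of every stock from the NetEase Finance API, starting at 2000-01-01 and ending at the date the script is run (it was originally written for data through 2017-08-04).
Temporary files are saved under ./data/ephemeral.data
Finally the data is loaded into the MongoDB database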
http://quotes.money.163.com/service/chddata.html?code=0601857&start=20071105&end=20150618
"""
import requests as rq
import pandas as pd
import time
import traceback
import os
import logging
import datetime
import json
from pymongo import MongoClient
import sysutils
# Logging configuration: records at INFO level and above are written to
# log/<file name of this script>.log; filemode='w' truncates it on every run.
# NOTE(review): the log/ directory is not created here - presumably it must
# already exist before the script starts; confirm.
logging.basicConfig(level=logging.INFO,
format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
datefmt='%a, %d %b %Y %H:%M:%S',
filename='log/'+os.path.basename(__file__)+'.log',
filemode='w')
# MongoDB connection (server at 127.0.0.1:27017)
conn = MongoClient('127.0.0.1', 27017)
db = conn['stock'] # Select the 'stock' database; created automatically if it does not exist
# Number of worker threads started by the __main__ block
thread_num = 1
# Default download start date (YYYYMMDD), used when a stock has no stored records yet
start_date = "20000101"
"""
Insert a JSON array (a list of records) into MongoDB
"""
def insert(coreID, JsonArr):
    """Insert the records of ``JsonArr`` that are not yet stored for a stock.

    Args:
        coreID: Stock code; ``str(coreID)`` is the name of the collection used.
        JsonArr: Iterable of dicts, each carrying a ``'timestamp'`` key.

    Records whose ``timestamp`` already exists in the collection are skipped;
    the remaining ones are written with a single ``insert_many`` call.
    Nothing is written when there is nothing new. Returns ``None``.
    """
    table = db[str(coreID)]
    records = list(JsonArr)
    if not records:
        return
    # Look up all already-stored timestamps in one query instead of issuing
    # one find().count() per record (Cursor.count() is deprecated in
    # PyMongo 3.7 and removed in 4.0).
    wanted = [record['timestamp'] for record in records]
    stored = set(
        doc['timestamp']
        for doc in table.find({"timestamp": {"$in": wanted}}, {"timestamp": 1})
    )
    # Duplicates must not be inserted.
    fresh = [record for record in records if record['timestamp'] not in stored]
    if fresh:
        table.insert_many(fresh)
"""
Convert a date string to a Unix timestamp
"""
def get_timestamp(datestr):
    """Convert a ``YYYY-MM-DD`` date string to an integer Unix timestamp.

    The date is interpreted as midnight in the local time zone, because
    ``time.mktime`` works on local time.
    """
    parsed = datetime.datetime.strptime(datestr, '%Y-%m-%d')
    local_struct = parsed.timetuple()
    epoch_seconds = time.mktime(local_struct)
    return int(epoch_seconds)
"""
Compute the SMA value for one row, unless that date is already stored in MongoDB
"""
def sma(coreID, df, dateStr, n, m):
    """Return the SMA value for one date, or 0 if that date is already stored.

    Args:
        coreID: Stock code; ``str(coreID)`` names the collection that is checked.
        df: Price DataFrame, passed through unchanged to ``sysutils.sma``.
        dateStr: Date of the row; compared as ``str(dateStr)`` against the
            stored ``datetime`` field.
        n: Forwarded to ``sysutils.sma``.
        m: Forwarded to ``sysutils.sma``.
            NOTE(review): the meaning of n and m is defined in sysutils,
            which is not visible here - confirm there.

    Returns:
        0 when a document with this ``datetime`` already exists (the value is
        not recomputed), otherwise the result of
        ``sysutils.sma(df, dateStr, n, m)``.
    """
    table = db[str(coreID)]
    # find_one stops at the first match; it replaces find().count(), which is
    # deprecated in PyMongo 3.7 and removed in 4.0.
    if table.find_one({"datetime": str(dateStr)}) is not None:
        return 0
    return sysutils.sma(df, dateStr, n, m)
"""
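While loading into the database, also compute the SMA red, green and blue lines; anything that needs comparing gets its own statistics later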
"""
def permit_to_mongo(coreid, filepath):
    """Load a downloaded price CSV, add derived columns and store it in MongoDB.

    Args:
        coreid: Stock code, passed on to ``sma`` and ``insert``.
        filepath: Path of the GBK-encoded CSV file; its first line is skipped
            and the fixed column names below are applied.

    The rows are reversed, the ``'`` characters are removed from the
    ``coreId`` column, and the columns ``timestamp``, ``sma_green``,
    ``sma_red`` and ``sma_blue`` are added before the records are handed to
    ``insert``. Returns ``None``; does nothing when the file has no data rows.
    """
    columns = ["datetime", "coreId", "name", "close", "high", "low", "open",
               "before_close", "Fluctuation", "Chg", "Turnover_rate", "volume",
               "amount", "TotleMarket", "CirculationMarket", "volnum"]
    df = pd.read_csv(filepath, encoding="gbk", skiprows=1, names=columns)
    if df.empty:
        # No data rows were downloaded; the row-wise apply() calls below
        # cannot build a column from an empty frame.
        return
    # Reverse the row order.
    # NOTE(review): presumably the API lists the newest day first - confirm.
    # .values replaces DataFrame.get_values(), which pandas 1.0 removed.
    rows = df.iloc[::-1].values
    for row in rows:
        row[1] = row[1].replace("'", "")
    # Rebuild the frame from the raw array, as the original code did, so the
    # frame handed to sysutils.sma and to_json is constructed the same way.
    df = pd.DataFrame(rows, columns=columns)
    # axis=1 applies the function to each row rather than to each column
    df['timestamp'] = df.apply(lambda row: get_timestamp(row['datetime']), axis=1)
    # Compute the SMA values; each call sees the columns added before it.
    df['sma_green'] = df.apply(lambda row: sma(coreid, df, row['datetime'], 3, 5), axis=1)
    df['sma_red'] = df.apply(lambda row: sma(coreid, df, row['datetime'], 5, 8), axis=1)
    df['sma_blue'] = df.apply(lambda row: sma(coreid, df, row['datetime'], 8, 13), axis=1)
    # Round-trip through JSON to get a list of plain dicts for MongoDB.
    jarr = json.loads(df.to_json(orient='records'))
    insert(coreid, jarr)
"""
Get the date of the most recent record stored in the database, plus one day
"""
def get_table_last_date(coreId):
    """Return the day after the newest stored record as ``'YYYY-MM-DD'``.

    Args:
        coreId: Name of the collection to inspect.

    Returns:
        ``None`` when the collection holds no documents; otherwise the local
        date of the largest ``timestamp`` plus 86400 seconds, formatted as
        ``'%Y-%m-%d'``.
    """
    collection = db[coreId]
    # The collection has no records at all.
    if collection.find_one() is None:
        return None
    # Sort direction: 1 is ascending, -1 is descending; take the newest record.
    newest_first = collection.find().sort([("timestamp", -1)]).limit(1)
    latest = newest_first[0]
    next_day_epoch = int(latest['timestamp']) + 86400
    return time.strftime('%Y-%m-%d', time.localtime(next_day_epoch))
def func(i):
    """Worker loop: download and store the price history of queued stocks.

    Pops stock codes off the module-level list ``l`` until it is empty. For
    each code the history CSV is downloaded to
    ``data/ephemeral.data/<code>.csv`` and loaded with ``permit_to_mongo``.
    The download starts at ``start_date``, or at the date returned by
    ``get_table_last_date`` when the stock already has stored records, and
    ends today.

    Any exception raised for one stock is printed and the loop moves on to
    the next code.

    Args:
        i: Worker index; only used in the progress line printed per stock.
    """
    while l:
        t = time.time()
        try:
            coreId = str(l.pop())
            start = start_date
            end_date = datetime.datetime.now().strftime('%Y%m%d')
            file_path = "data/ephemeral.data/" + coreId + ".csv"
            last_date = get_table_last_date(coreId)
            if last_date is not None:
                # get_table_last_date returns 'YYYY-MM-DD', while the URL
                # takes dates as YYYYMMDD (like start_date and end_date).
                start = last_date.replace("-", "")
            # Market prefix: "0" for codes starting with "6", "1" otherwise.
            market = "0" if coreId[0] == "6" else "1"
            url = ("http://quotes.money.163.com/service/chddata.html?code="
                   + market + coreId + "&start=" + start + "&end=" + end_date)
            print(url)
            r = rq.get(url)
            # Do not save an HTTP error page as if it were price data.
            r.raise_for_status()
            with open(file_path, 'wb') as f:
                f.write(r.content)
            # Load all prices; every later calculation is based on them.
            permit_to_mongo(coreId, file_path)
        except Exception as e:
            print('str(Exception):\t%s' % (str(Exception),))
            print('str(e):\t\t%s' % (e,))
            print('repr(e):\t%s' % (repr(e),))
            # BaseException.message only exists on Python 2.
            print('e.message:\t%s' % (getattr(e, 'message', ''),))
            print('traceback.print_exc():')
            traceback.print_exc()
            print('traceback.format_exc():\n%s' % traceback.format_exc())
            continue
        print("thread:%s len:%s time:%s." % (i, len(l), time.time() - t))
if __name__ == "__main__":
    import threading

    # Read the stock codes from the file, one per line, stripping the
    # surrounding whitespace.
    with open("data/idList.txt") as f:
        shaidIdList = [raw_line.strip() for raw_line in f]
    # func() consumes this module-level list as its work queue.
    l = shaidIdList

    threads = []
    for i in range(thread_num):
        worker = threading.Thread(target=func, args=(i,))
        worker.setName(i)
        threads.append(worker)
    # Start every worker before waiting on any of them.
    for worker in threads:
        worker.start()
    for worker in threads:
        worker.join()