This repository was archived by the owner on Apr 5, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathwarscrawler.py
304 lines (243 loc) · 9.45 KB
/
warscrawler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import re
import yaml
import sys
import time
import sqlite3
import requests
import datetime as dt
import pandas as pd
from bs4 import BeautifulSoup
import pdb
gtype_dict = {'10m': '', '3m': 'sb', '10s': 's1'}
WCSA_PATTERN = re.compile(r'(?<=receiveMove\(\").+?(?=\"\);)')
GAME_HEADER_PATTERN = re.compile(r'(?<=var\sgamedata\s=\s){[^}]+}', re.DOTALL)
class WarsCrawler:
'''将棋ウォーズ用のクローラー
Args
----------
dbpath : string
SQLiteのパス
interval : int, optional (default = 10)
取得時の時間間隔
n_retry : int, optional (default = 10)
取得時の再試行の回数のmax
'''
def __init__(self, dbpath, interval=10, n_retry=10):
self.dbpath = dbpath
self.INTERVAL_TIME = interval
self.MAX_N_RETRY = n_retry
# SQLiteに接続する
self.con = sqlite3.connect(self.dbpath)
self.con.text_factory = str
def get_html(self, url):
'''指定したurlのhtmlを取得する
'''
time.sleep(self.INTERVAL_TIME)
for n in range(self.MAX_N_RETRY):
time.sleep(10 * n * self.INTERVAL_TIME)
try:
res = requests.session().get(url)
except requests.ConnectionError:
print('\nConnection aborted.\n')
res = None
if res and res.status_code == 200:
return res.text
else:
print('\nretry (WarsCrawler.get_html())\n')
sys.stdout.flush()
else:
sys.exit('Exceeded MAX_N_RETRY (WarsCrawler.get_html())')
def get_url(self, user, gtype, max_iter=10):
''' 指定したユーザーの棋譜を取得する.
Args
----------
user: string
User name
gtype: string
Kifu type
max_iter: int, optional (default=10)
取得する最大数.
10個ずつ取得するたびに判定しており厳密には守るつもりはない.
'''
url_list = []
start = 1
url = ('http://shogiwars.heroz.jp/users/history/'
'{user}?gtype={gtype}&start={start}')
pattern = 'http://shogiwars.heroz.jp:3002/games/[\w\d_-]+'
while start <= max_iter:
url = url.format(user=user, gtype=gtype, start=start)
text = self.get_html(url)
match = re.findall(pattern, text)
if match:
url_list.extend(match)
start += len(match)
else:
break
return url_list
def get_kifu(self, url):
''' urlが指す棋譜とそれに関する情報を辞書にまとめて返す.
'''
html = self.get_html(url)
res = re.findall(GAME_HEADER_PATTERN, html)[0]
# keyがquoteされていないので対処する
d = yaml.load(re.sub('[{}\t,]', ' ', res))
d['user0'], d['user1'], d['date'] = d['name'].split('-')
wars_csa = re.findall(WCSA_PATTERN, html)[0]
d['wcsa'] = wars_csa
d['datetime'] = dt.datetime.strptime(d['date'], '%Y%m%d_%H%M%S')
d['csa'] = self.wcsa_to_csa(d)
return d
def get_all_kifu(self, csvpath):
''' url_list 中の url の指す棋譜を取得,SQLiteに追加.
Args
----------
csvpath: string
クロールするurlの入っているcsvのパス
'''
df_crawled = pd.read_csv(csvpath)
not_crawled = df_crawled.query('crawled==0')
if not_crawled.empty:
print('{0}のファイルは全て取得済み'.format(csvpath))
return None
url_list = list(not_crawled.url)
sec = len(url_list) * self.INTERVAL_TIME
finish_time = dt.datetime.now() + dt.timedelta(seconds=sec)
# 途中経過の表示
print('{0}件の棋譜'.format(len(url_list)))
print('棋譜収集終了予定時刻 : {0}'.format(finish_time))
sys.stdout.flush()
df = pd.DataFrame()
for _url in url_list:
kifu = self.get_kifu(_url)
df = df.append(kifu, ignore_index=True)
df_crawled.loc[df_crawled.url == _url, 'crawled'] = 1
df_crawled.to_csv(csvpath, index=False)
if not df.empty:
df.to_sql('kifu', self.con, index=False, if_exists='append')
return df
else:
return None
def get_users(self, title, max_page=10):
'''大会名を指定して,その大会の上位ユーザーのidを取ってくる
Examples
----------
将棋ウォーズ第4回名人戦に参加しているユーザーのidを取得する.
>>> wcrawler.get_users('meijin4', max_page=100)
'''
page = 0
url = 'http://shogiwars.heroz.jp/events/{title}?start={page}'
results = []
while page < max_page:
_url = url.format(title=title, page=page)
print(_url)
sys.stdout.flush()
html = self.get_html(_url)
_users = re.findall(r'\/users\/(\w+)', html)
# _usersが空でない場合追加.そうでなければbreak
if _users:
results.extend(_users)
page += 25
else:
break
return results
def get_kifu_url(self, users, gtype, csvpath,
max_iter=10, if_exists='append'):
'''ユーザー名とgtypeを指定して棋譜のurlを取得する.
Args
----------
users: string
ユーザー名
gtype: string
gtype
csvpath: string
棋譜のurlを保存するCSVファイルのパス
max_iter: string, optional (default=10)
最大試行回数
'''
url_list = []
print('\ngtype:{0}, max_iter:{1}\n'.format(gtype, max_iter))
for _user in users:
print(_user)
_url_list = self.get_url(_user, gtype=gtype, max_iter=max_iter)
url_list.extend(_url_list)
df = pd.DataFrame(url_list)
df.ix[:, 1] = 0
df.columns = ['url', 'crawled']
if os.path.exists(csvpath) and if_exists == 'append':
df_before = pd.read_csv(csvpath, index_col=0)
df = pd.concat([df_before, df], axis=0, ignore_index=True)
df.to_csv(csvpath)
return df
def wcsa_to_csa(self, d):
'''将棋ウォーズ専用?のCSA形式を一般のCSA形式に変換する.
Args
----------
wars_csa
将棋ウォーズ特有のCSA形式で表された棋譜の文字列
gtype
処理したい棋譜のgtype
'''
wcsa_list = re.split(r'[,\t]', d['wcsa'])
time_up = ['\tGOTE_WIN_TIMEOUT',
'\tGOTE_WIN_DISCONNECT',
'\tGOTE_WIN_TORYO']
if d['wcsa'] in time_up:
# 1手も指さずに時間切れ or 接続切れ or 投了
return '%TIME_UP'
if d['gtype'] == gtype_dict['10m']:
max_time = 60 * 10
rule = '\'持ち時間:10分、切れ負け'
time_limit = '$TIME_LIMIT:00:10+00'
elif d['gtype'] == gtype_dict['3m']:
max_time = 60 * 3
rule = '\'持ち時間:3分、切れ負け'
time_limit = '$TIME_LIMIT:00:03+00'
elif d['gtype'] == gtype_dict['10s']:
max_time = 3600
rule = '\'初手から10秒'
time_limit = '$TIME_LIMIT:00:00+10'
else:
print('Error: gtypeに不正な値; gtype={0}'.format(d['gtype']))
sente_prev_remain_time = max_time
gote_prev_remain_time = max_time
results = [
'\'バージョン', 'V2.2',
'\'対局者名', 'N+', d['user0'], 'N-', d['user1'],
'\'棋譜情報', '\'棋戦名', '$EVENT:', '将棋ウォーズ',
'\'対局場所', '$SITE:shogiwars.heroz.jp',
'\'開始日時',
'$START_TIME:' + d['datetime'].strftime('%Y/%m/%d %H:%M:%S'),
rule, time_limit,
'\'先手番', '+',
'\'指し手と消費時間'
]
for i, w in enumerate(wcsa_list):
if i % 2 == 0:
# 駒の動き,あるいは特殊な命令の処理
# CAUTION: 仕様がわからないので全部網羅できているかわからない
if w.find('TORYO') > 0 or w.find('DISCONNECT') > 0:
w_ap = '%TORYO'
elif w.find('TIMEOUT') > 0:
w_ap = '%TIME_UP'
elif w.find('DRAW_SENNICHI') > 0:
w_ap = '%SENNICHITE'
else:
w_ap = w
results.append(w_ap)
else:
# 時間の行の処理
if (i - 1) % 4 == 0:
# 先手の残り時間を計算
sente_remain_time = int(w[1:])
_time = sente_prev_remain_time - sente_remain_time
sente_prev_remain_time = sente_remain_time
else:
# 後手の残り時間を計算
gote_remain_time = int(w[1:])
_time = gote_prev_remain_time - gote_remain_time
gote_prev_remain_time = gote_remain_time
results.append('T' + str(_time))
return '\n'.join(results)