-
Notifications
You must be signed in to change notification settings - Fork 19
/
BookBuilder.py
390 lines (298 loc) · 20.6 KB
/
BookBuilder.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
import io
import os
import logging
from typing import Callable
import chess
import chess.pgn
from gui_generation_status import GenerationStatus
from settings import Settings
from workerEngineReduce import WorkerPlay
import chess.engine
# Directory the process was started from; the generated chapter files are
# written here.
working_dir = os.getcwd()

# Root logging runs at DEBUG so every step of the generation is traceable.
log_level = logging.DEBUG
logging.basicConfig(level=log_level)

# Only CRITICAL records from the "chess.pgn" logger are let through, which
# keeps that logger's lower-severity output out of the debug log.
_pgn_parser_logger = logging.getLogger("chess.pgn")
_pgn_parser_logger.setLevel(logging.CRITICAL)
class Rooter:
    """Replay the starting PGN and queue it as the first line to expand.

    Constructing a ``Rooter`` does all the work: the PGN is parsed, each
    opponent move in it is looked up with ``WorkerPlay.find_opponent_move`` to
    get the chance of that move being played, and the tuple
    ``(pgn, cumulative likelihood, likelihood path)`` is appended to the
    module-level ``pgnsreturned`` list, which must already exist.
    """

    def __init__(self, settings, status, engine, pgn):
        self.pgn = pgn
        self._calculate_pgns(settings, status, engine)

    def _calculate_pgns(self, settings, status, engine):
        """Compute the likelihood of reaching the end of ``self.pgn``.

        Sets ``self.perspective_str``, ``self.likelihood`` and
        ``self.likelihood_path`` and appends the result to ``pgnsreturned``.

        Raises:
            Exception: if the PGN cannot be read or contains no game.
        """
        try:
            game = chess.pgn.read_game(io.StringIO(self.pgn))
        except Exception as err:
            # Report through the ``status`` argument: this class never stores
            # a ``self.status`` attribute.
            status.error(f"Invalid PGN {self.pgn}")
            raise Exception(f'Invalid PGN {self.pgn}') from err
        if game is None:
            # No game could be read (e.g. empty input); fail with the same
            # error instead of crashing on ``game.board()`` below.
            status.error(f"Invalid PGN {self.pgn}")
            raise Exception(f'Invalid PGN {self.pgn}')

        board = game.board()
        moves = list(game.mainline_moves())
        logging.debug(moves)

        # The PGN is expected to end on our own move: an even number of
        # half-moves means we are Black, an odd number means we are White.
        if len(moves) % 2 == 0:
            perspective = chess.BLACK
            self.perspective_str = 'Black'
        else:
            perspective = chess.WHITE
            self.perspective_str = 'White'

        self.likelihood = 1  # cumulative chance of the opponent's moves, starts at 100%
        self.likelihood_path = []  # (san, chance) for every opponent move

        for move in moves:
            if board.turn != perspective:
                # Opponent to move: one WorkerPlay lookup per opponent move.
                worker = WorkerPlay(settings, status, engine, board.fen())
                move_stats, chance = worker.find_opponent_move(move)
                self.likelihood *= chance
                self.likelihood_path.append((move_stats['san'], chance))
                logging.debug(f"likelihoods to get here: {self.likelihood_path}")
                logging.debug(f"cumulative likelihood {self.likelihood:+.2%}")
            board.push(move)

        # Hand the annotated starting line over to the expansion loop.
        pgnsreturned.append((self.pgn, self.likelihood, self.likelihood_path))
        logging.debug(f"sent from rooter: {pgnsreturned}")
class Leafer:
    """Expand one line by every likely opponent move plus our reply to it.

    Constructing a ``Leafer`` does all the work. For the position at the end
    of ``pgn`` it asks ``WorkerPlay`` for the opponent's continuations, keeps
    those that pass the likelihood and game-count thresholds from
    ``settings.moveSelection``, and for each one picks our reply (from
    ``WorkerPlay.pick_candidate`` or, when enabled, from the engine).

    Results are communicated through two module-level lists, which must
    already exist:

    * ``pgnsreturned`` is replaced by the list of extended lines
      ``[pgn, cumulative likelihood, likelihood path]`` to expand further.
    * ``finalLine`` receives ``(pgn, likelihood, likelihood path, win rate,
      games)`` tuples for lines that cannot be extended any further.
    """

    def __init__(self, settings, status, engine, pgn, cumulative, likelyPath):
        self.pgn = pgn
        self.cumulative = cumulative
        self.likelyPath = likelyPath
        self._calculate_pgns(settings, status, engine)

    def _extend_pgn(self, board, opponent_san, reply):
        """Return ``self.pgn`` extended by the opponent's move and our reply.

        ``board`` must already contain the opponent's move, so that
        ``board.fullmove_number`` is the number to print.
        """
        if self.perspective_str == 'Black':
            # Opponent is White: the move number goes in front of their move.
            return f"{self.pgn} {board.fullmove_number}. {opponent_san} {reply}"
        # Opponent is Black: the move number goes in front of our reply.
        return f"{self.pgn} {opponent_san} {board.fullmove_number}. {reply}"

    def _calculate_pgns(self, settings, status, engine):
        """Find continuations and replies for ``self.pgn`` (see class docstring).

        Raises:
            Exception: if the PGN cannot be read or contains no game.
        """
        global pgnsreturned
        moveSelection = settings.moveSelection

        try:
            game = chess.pgn.read_game(io.StringIO(self.pgn))
        except Exception as err:
            # Report through the ``status`` argument: this class never stores
            # a ``self.status`` attribute.
            status.error(f"Invalid PGN {self.pgn}")
            raise Exception(f'Invalid PGN {self.pgn}') from err
        if game is None:
            status.error(f"Invalid PGN {self.pgn}")
            raise Exception(f'Invalid PGN {self.pgn}')

        board = game.board()
        moves = list(game.mainline_moves())
        logging.debug(moves)

        # Even number of half-moves: we are Black; odd: we are White.
        self.perspective_str = 'Black' if len(moves) % 2 == 0 else 'White'

        self.likelihood = self.cumulative
        # NOTE: this is the caller's list object, not a copy. It is appended
        # to and trimmed again below, so it ends up unchanged.
        self.likelihood_path = self.likelyPath

        for move in moves:
            board.push(move)

        # All opponent continuations from the final position of the line.
        self.workerPlay = WorkerPlay(settings, status, engine, board.fen())
        continuations = self.workerPlay.find_move_tree()

        # Keep only continuations that are likely enough and played often enough.
        validContinuations = []
        for move in continuations:
            continuationLikelihood = float(move['playrate']) * float(self.likelihood)
            if (continuationLikelihood >= float(moveSelection.depth_likelihood)
                    and move['total_games'] > moveSelection.continuation_games):
                move['cumulativeLikelihood'] = continuationLikelihood
                validContinuations.append(move)
        logging.debug(f'valid continuations: {validContinuations}')

        pgnList = []
        for move in validContinuations:
            board.push_san(move['san'])  # play the opponent's continuation
            self.likelihood_path.append((move['san'], move['playrate']))

            # Our best reply according to the game statistics.
            self.workerPlay = WorkerPlay(settings, status, engine, board.fen())
            (_, self.best_move, self.potency, self.potency_range,
             self.total_games) = self.workerPlay.pick_candidate()

            shown_playrate = '{:+.2%}'.format(move['playrate'])
            shown_cumulative = '{:+.2%}'.format(move['cumulativeLikelihood'])
            shown_winrate = "{:+.2%}".format(self.potency)
            shown_range = ["{:.2%}".format(x) for x in self.potency_range]
            logging.debug(f"against {move['san']} played {shown_playrate} cumulative playrate {shown_cumulative} our best move {self.best_move} win rate is {shown_winrate} with a range of {shown_range} over {self.total_games} games")

            reply = None
            if (move['playrate'] > moveSelection.min_play_rate
                    and self.total_games > moveSelection.min_games
                    and self.potency != 0):
                reply = str(self.best_move)
            elif settings.engine.enabled and settings.engine.finish:
                # No reliable statistics for a reply: let the engine finish the line.
                depth = settings.engine.depth
                status.info2(f"Running engine for '{board.fen()}' at depth {depth}, this can take a while")
                PlayResult = engine.play(board, chess.engine.Limit(depth=depth))
                logging.debug(f"engine finished {PlayResult.move}")
                # The engine may report no move at all; that is handled as
                # "no reply" below instead of crashing in board.san().
                if PlayResult.move is not None:
                    reply = board.san(PlayResult.move)

            if reply is not None:
                newpgn = self._extend_pgn(board, move['san'], reply)
                logging.debug(f"full new pgn after our move is {newpgn}")
                # Copy the path: the shared list is trimmed again right below.
                pgnList.append([newpgn, move['cumulativeLikelihood'], self.likelihood_path[:]])
                del self.likelihood_path[-1]  # drop the continuation from the path
                board.pop()  # undo the continuation
            else:
                logging.debug(f"we find no good reply to {self.pgn} {move['san']}")
                board.pop()  # undo the continuation
                del self.likelihood_path[-1]  # drop the continuation from the path
                # The line stops at our last move: record its stats as final.
                self.workerPlay = WorkerPlay(settings, status, engine, board.fen())
                lineWinRate, totalLineGames, _ = self.workerPlay.find_potency()
                logging.debug(f'saving no reply line {self.pgn} {self.likelihood} {self.likelihood_path} {lineWinRate} {totalLineGames}')
                finalLine.append((self.pgn, self.likelihood, self.likelihood_path,
                                  lineWinRate, totalLineGames))

        # Publish the extended lines for the caller to queue.
        pgnsreturned = pgnList

        if not validContinuations:
            # Nothing to extend: the line itself is final.
            logging.debug(f'no valid continuations to {self.pgn}')
            self.workerPlay = WorkerPlay(settings, status, engine, board.fen())
            lineWinRate, totalLineGames, _ = self.workerPlay.find_potency()
            if totalLineGames == 0 and lineWinRate is None:
                # Treated as a line ending in mate: there are no games from
                # the final position, so take the game count from the
                # position one move earlier and score the line as a win.
                board.pop()
                self.workerPlay = WorkerPlay(settings, status, engine, board.fen())
                _, totalLineGames, _ = self.workerPlay.find_potency()
                lineWinRate = 1
                logging.debug('line ends in mate')
            elif totalLineGames < moveSelection.min_games:
                # Our last move has too few games for a reliable win rate, so
                # derive one from the opponent's side of the previous position.
                board.pop()
                self.workerPlay = WorkerPlay(settings, status, engine, board.fen())
                lineWinRate, totalLineGames, draws = self.workerPlay.find_potency()
                if moveSelection.draws_are_half:
                    lineWinRate = 1 - lineWinRate + (0.5 * draws)
                    logging.debug(f"total games on previous move: {totalLineGames}, draws are wins and our move is engine 'almost novelty' so win rate based on previous move is {lineWinRate}")
                else:
                    lineWinRate = 1 - lineWinRate - draws
                    logging.debug(f"total games on previous move: {totalLineGames}, draws aren't wins and our move is engine 'almost novelty' so win rate based on prev move is {lineWinRate}")
            finalLine.append((self.pgn, self.likelihood, self.likelihood_path,
                              lineWinRate, totalLineGames))
class Printer:
    """Write finished repertoire lines to a PGN file as annotated games.

    The file is always read and written as UTF-8, independent of the
    platform's default encoding, so non-ASCII opening names are stored
    consistently.
    """

    def __init__(self, settings, filepath):
        """Remember the target path and create (or empty) the file.

        Args:
            settings: object whose ``moveSelection.draws_are_half`` flag
                selects the wording of the win-rate annotation.
            filepath: path of the PGN file to write.
        """
        self.settings = settings
        self.filepath = filepath
        # Opening in 'w' mode creates the file or truncates an existing one.
        with open(self.filepath, 'w', encoding='utf-8'):
            pass
        logging.info(f"Created new file at: {self.filepath}")

    def print(self, pgn, cumulative, likelyPath, winRate, Games, lineNumber, openingName):
        """Append one line to the file as a game with an annotation block.

        Args:
            pgn: move text of the line.
            cumulative: cumulative play rate of the line (fraction, shown as %).
            likelyPath: iterable of ``(move, chance)`` pairs, one per annotated move.
            winRate: win rate of the line (fraction, shown as %).
            Games: number of games the win rate is based on.
            lineNumber: number used in the ``Event`` header.
            openingName: name used in the ``Event`` header.
        """
        if self.settings.moveSelection.draws_are_half:
            winrate_label = "Line winrate (draws are half): "
        else:
            winrate_label = "Line winrate (excluding draws): "

        # Build the whole entry first and write it once, so a formatting
        # error cannot leave a half-written game in the file.
        parts = [
            f'\n\n\n[Event "{openingName} Line {lineNumber}"]\n',
            f'\n{pgn}',
            '\n{Move playrates:',
        ]
        parts.extend(f'\n{chance:+.2%}\t{move}' for move, chance in likelyPath)
        parts.append(f'\nLine cumulative playrate: {cumulative:+.2%}')
        parts.append(f'\n{winrate_label}{winRate:+.2%} over {Games} games')
        parts.append('}')  # closes the annotation block

        with open(self.filepath, 'a', encoding='utf-8') as file:
            file.write(''.join(parts))
        logging.info(f"Wrote data to {self.filepath}")
class Grower:
    """Drive the generation of one PGN chapter file per configured opening."""

    is_running = False  # True while run() is in progress
    settings = None     # settings object of the current run
    status = None       # status reporter of the current run
    engine = None       # running UCI engine, or None when not started

    # todo: this method needs to be synchronised, and main logic should run in a separate thread
    def run(self, settings: "Settings", status: "GenerationStatus", callback: Callable):
        """Generate every book from ``settings.book.get_books()``.

        Does nothing if a run is already in progress. Any exception raised
        while starting the engine or generating is logged with its traceback
        and not propagated; ``callback`` is called only after all books were
        generated without error. The engine is stopped and ``is_running`` is
        reset in every case.
        """
        if self.is_running:
            logging.info("Repertoire generation is already running")
            return
        self.is_running = True
        self.settings = settings
        self.status = status
        try:
            # Started inside the try block so that a failing engine start
            # still goes through stop() and cannot leave is_running stuck.
            self.start_engine()
            for chapter, opening in enumerate(settings.book.get_books(), 1):
                status.info(f"Generating book #{chapter} '{opening.name}' for PGN '{opening.pgn}'")
                self.iterator(chapter, opening.name, opening.pgn)
            callback()
        except Exception as e:
            logging.exception(e)
        finally:
            self.stop()

    def stop(self):
        """Quit the engine, if any, and mark the generator as idle."""
        try:
            if self.engine:
                self.engine.quit()
        finally:
            # Reset even when quit() fails, and forget the engine so it is
            # never asked to quit twice.
            self.engine = None
            self.is_running = False

    def start_engine(self):
        """Start and configure the UCI engine when it is enabled in the settings."""
        if not self.settings.engine.enabled:
            self.engine = None
            return
        # Stored before configuring so stop() can clean the process up if a
        # configure() call fails.
        self.engine = chess.engine.SimpleEngine.popen_uci(self.settings.engine.path)
        self.engine.configure({"Hash": self.settings.engine.hash})
        self.engine.configure({"Threads": self.settings.engine.threads})
        logging.getLogger('chess.engine').setLevel(logging.INFO)

    def iterator(self, chapter, openingName, openingPgn):
        """Build all lines for one opening and write them to its chapter file.

        ``Rooter`` and ``Leafer`` report their results through the
        module-level lists ``pgnsreturned`` and ``finalLine``, which are reset
        here for every opening.
        """
        global finalLine, pgnsreturned
        finalLine = []
        pgnsreturned = []

        Rooter(self.settings, self.status, self.engine, openingPgn)
        secondList = list(pgnsreturned)  # work queue, seeded with the starting line
        logging.debug(f"second list {secondList}")

        # Expand every queued line; lines produced by a Leafer are queued
        # behind the existing ones. Indexing (rather than a for loop) keeps
        # this well defined while the list grows.
        index = 0
        while index < len(secondList):
            pgn, cumulative, likelyPath = secondList[index]
            Leafer(self.settings, self.status, self.engine, pgn, cumulative, likelyPath)
            secondList.extend(pgnsreturned)
            index += 1

        # Remove duplicate lines (entries contain lists, so no set is used).
        uniqueFinalLine = []
        for line in finalLine:
            if line not in uniqueFinalLine:
                uniqueFinalLine.append(line)

        # Drop lines whose PGN, followed by a space, occurs anywhere in the
        # text form of the whole list, i.e. lines continued by a longer one.
        allLinesText = str(uniqueFinalLine)  # same for every line: build once
        printerFinalLine = []
        for line in uniqueFinalLine:
            lineString = str(line[0]) + " "
            lineCount = allLinesText.count(lineString)
            if lineCount == 0:
                printerFinalLine.append(line)
            else:
                logging.debug(f"duplicate line {line}")
            logging.debug(f"final line count {lineCount + 1} for line {lineString}")

        logging.debug('we sort the lines by consecutive move probabilities')
        printerFinalLine = sorted(
            printerFinalLine,
            key=lambda entry: [chance for _, chance in entry[2]],
        )
        for line in printerFinalLine:
            logging.debug(line[0])

        logging.debug('we reverse the sort to make long to short')
        # NOTE(review): this tests the truthiness of
        # settings.book.order.LONG_TO_SHORT; if that is an enum member the
        # condition is always true. Confirm the intended comparison.
        if self.settings.book.order.LONG_TO_SHORT:
            printerFinalLine.reverse()
            for line in printerFinalLine:
                logging.debug(line[0])

        logging.debug(f'number of final lines {len(printerFinalLine)}')
        logging.debug(f'final line sorted {printerFinalLine}')

        printer = Printer(self.settings, f"{working_dir}/Chapter_{chapter}_{openingName}.pgn")
        for lineNumber, (pgn, cumulative, likelyPath, winRate, Games) in enumerate(printerFinalLine, 1):
            printer.print(pgn, cumulative, likelyPath, winRate, Games, lineNumber, openingName)