Merge class strategy with class Game. Next, merge Go with GoEnv.

This commit is contained in:
Dong Yan 2017-12-15 22:19:44 +08:00
parent 00f599bba3
commit 6cb4b02fca
9 changed files with 244 additions and 730 deletions

View File

@ -10,3 +10,7 @@ Connecting our own policy-value neural network with leela-zero.
## checkpoints:
Weights of the policy-value neural network
## File Specification

View File

@ -188,7 +188,6 @@ class GTPEngine():
def cmd_show_board(self, args, **kwargs):
    """Return the wrapped game's board together with a success flag.

    ``args`` and ``kwargs`` are accepted but not used by this command.
    """
    current_board = self._game.board
    succeeded = True
    return current_board, succeeded
# The interpreter names the executed script "__main__" (with the double
# underscores); a comparison against "main" never matches, so this block
# could not run.
if __name__ == "__main__":
    game = Game()
    # Pass the instance built above: the engine reads ``self._game.board``,
    # which only exists on Game instances, not on the Game class itself.
    engine = GTPEngine(game_obj=game)

View File

@ -10,232 +10,40 @@ import copy
import tensorflow as tf
import numpy as np
import sys
import go
import network_small
import strategy
from collections import deque
from tianshou.core.mcts.mcts import MCTS
import Network
from strategy import strategy
'''
(1, 1) is considered as the upper left corner of the board,
(size, 1) is the lower left
'''
DELTA = [[1, 0], [-1, 0], [0, -1], [0, 1]]
class Executor:
    """Go rule executor working on the flat board list of a game object.

    The executor keeps only a reference to the game.  It reads and mutates
    ``game.board``, appends to ``game.history`` and ``game.past``, and relies
    on ``game.size``, ``game.komi``, ``game._flatten`` and
    ``game._deflatten``.  Vertices are 1-based ``(x, y)`` tuples.
    """

    def __init__(self, **kwargs):
        """Store the game passed as the ``game`` keyword argument."""
        self.game = kwargs['game']

    def _bfs(self, vertex, color, block, status, alive_break):
        """Collect in ``block`` all vertices connected to ``vertex`` via ``color`` cells.

        ``status`` is a flat visited mask that is updated in place.  Despite
        the name, the walk is a recursive depth-first traversal.
        ``alive_break`` is accepted for signature compatibility and is unused.
        """
        block.append(vertex)
        status[self.game._flatten(vertex)] = True
        for n in self._neighbor(vertex):
            idx = self.game._flatten(n)
            if not status[idx] and self.game.board[idx] == color:
                self._bfs(n, color, block, status, alive_break)

    def _find_block(self, vertex, alive_break=False):
        """Return ``(no_liberty, block)`` for the connected block containing ``vertex``.

        ``no_liberty`` is True when no member of the block touches an empty
        point, i.e. the block can be removed from the board.
        """
        block = []
        status = [False] * (self.game.size * self.game.size)
        color = self.game.board[self.game._flatten(vertex)]
        self._bfs(vertex, color, block, status, alive_break)
        for b in block:
            for n in self._neighbor(b):
                if self.game.board[self.game._flatten(n)] == utils.EMPTY:
                    return False, block
        return True, block

    def _find_boarder(self, vertex):
        """Return the points bordering the empty region that contains ``vertex``.

        The returned list may contain a point more than once (one entry per
        adjacent region member), exactly as before.
        """
        block = []
        status = [False] * (self.game.size * self.game.size)
        self._bfs(vertex, utils.EMPTY, block, status, False)
        members = set(block)  # O(1) membership instead of scanning the list
        border = []
        for b in block:
            for n in self._neighbor(b):
                if n not in members:
                    border.append(n)
        return border

    def _is_qi(self, color, vertex):
        """Return True if a ``color`` stone at ``vertex`` would have a liberty.

        That is the case when an adjacent point is empty, or when the stone
        captures an adjacent opposing block.  A move that leaves its own
        block without liberties (suicide) yields False.  The stone is placed
        only temporarily and always removed again.
        """
        nei = self._neighbor(vertex)
        for n in nei:
            if self.game.board[self.game._flatten(n)] == utils.EMPTY:
                return True
        idx = self.game._flatten(vertex)
        self.game.board[idx] = color
        try:
            for n in nei:
                if self.game.board[self.game._flatten(n)] == utils.another_color(color):
                    can_kill, _ = self._find_block(n)
                    if can_kill:
                        return True
            # can not suicide
            can_kill, _ = self._find_block(vertex)
            return not can_kill
        finally:
            # Undo the trial stone on every exit path, including errors.
            self.game.board[idx] = utils.EMPTY

    def _check_global_isomorphous(self, color, vertex):
        """Return True if playing ``color`` at ``vertex`` repeats a board in ``game.history``."""
        backup = copy.copy(self.game.board)
        try:
            self.game.board[self.game._flatten(vertex)] = color
            self._process_board(color, vertex)
            return self.game.board in self.game.history
        finally:
            # As before, the game is handed the untouched copy.
            self.game.board = backup

    def _in_board(self, vertex):
        """Return True if both coordinates of ``vertex`` lie in ``1..game.size``."""
        x, y = vertex
        if x < 1 or x > self.game.size:
            return False
        if y < 1 or y > self.game.size:
            return False
        return True

    def _neighbor(self, vertex):
        """Return the on-board points orthogonally adjacent to ``vertex``."""
        x, y = vertex
        nei = []
        for d in DELTA:
            _x = x + d[0]
            _y = y + d[1]
            if self._in_board((_x, _y)):
                nei.append((_x, _y))
        return nei

    def _process_board(self, color, vertex):
        """Remove every opposing block next to ``vertex`` that has no empty neighbour."""
        for n in self._neighbor(vertex):
            if self.game.board[self.game._flatten(n)] == utils.another_color(color):
                can_kill, block = self._find_block(n, alive_break=True)
                if can_kill:
                    for b in block:
                        self.game.board[self.game._flatten(b)] = utils.EMPTY

    def is_valid(self, color, vertex):
        """Return True if ``color`` may legally play at ``vertex``.

        The move must be on the board, on an empty point, must have a
        liberty (see ``_is_qi``) and must not recreate a board stored in
        ``game.history``.
        """
        # in board
        if not self._in_board(vertex):
            return False
        # already have stone
        if not self.game.board[self.game._flatten(vertex)] == utils.EMPTY:
            return False
        # check if it is qi
        if not self._is_qi(color, vertex):
            return False
        if self._check_global_isomorphous(color, vertex):
            return False
        return True

    def do_move(self, color, vertex):
        """Play ``color`` at ``vertex`` if legal.

        On success the captures are removed, a copy of the resulting board is
        appended to both ``game.history`` and ``game.past`` and True is
        returned; an illegal move returns False and changes nothing.
        """
        if not self.is_valid(color, vertex):
            return False
        self.game.board[self.game._flatten(vertex)] = color
        self._process_board(color, vertex)
        self.game.history.append(copy.copy(self.game.board))
        self.game.past.append(copy.copy(self.game.board))
        return True

    def _find_empty(self):
        """Return the vertex of the first empty point in flat board order."""
        idx = [i for i, x in enumerate(self.game.board) if x == utils.EMPTY][0]
        return self.game._deflatten(idx)

    def get_score(self, is_unknown_estimation=False):
        """Return the score from BLACK's perspective (stones and territory minus komi).

        Empty points are filled one at a time on the live board: a point
        whose empty region is bordered by a single colour gets that colour;
        otherwise it is estimated from nearby stones when
        ``is_unknown_estimation`` is true, or marked ``utils.UNKNOWN``.
        The original board is put back afterwards.
        """
        _board = copy.copy(self.game.board)
        try:
            while utils.EMPTY in self.game.board:
                vertex = self._find_empty()
                boarder = self._find_boarder(vertex)
                boarder_color = set(self.game.board[self.game._flatten(v)] for v in boarder)
                if boarder_color == {utils.BLACK}:
                    fill = utils.BLACK
                elif boarder_color == {utils.WHITE}:
                    fill = utils.WHITE
                elif is_unknown_estimation:
                    fill = self._predict_from_nearby(vertex)
                else:
                    fill = utils.UNKNOWN
                self.game.board[self.game._flatten(vertex)] = fill
            score = 0
            for i in self.game.board:
                if i == utils.BLACK:
                    score += 1
                elif i == utils.WHITE:
                    score -= 1
            score -= self.game.komi
            return score
        finally:
            # Restore the real position even if scoring fails midway.
            self.game.board = _board

    def _predict_from_nearby(self, vertex, neighbor_step=3):
        """Guess the owner of ``vertex`` from the stones around it.

        For ``step`` = 1 .. ``neighbor_step`` the points gathered by four
        diagonal walks starting ``step`` away from ``vertex`` are inspected,
        and the first step with a black or white majority decides.

        :param vertex: position to be estimated
        :param neighbor_step: how many steps away to look
        :return: ``utils.BLACK`` or ``utils.WHITE``; ``utils.UNKNOWN`` when
            no step shows a majority (previously an implicit ``None``, which
            ended up on the scratch board and broke later estimates)
        """
        for step in range(1, neighbor_step + 1):  # check the stones within the steps in range
            neighbor_vertex_set = []
            self._add_nearby_stones(neighbor_vertex_set, vertex[0] - step, vertex[1], 1, 1, neighbor_step)
            self._add_nearby_stones(neighbor_vertex_set, vertex[0], vertex[1] + step, 1, -1, neighbor_step)
            self._add_nearby_stones(neighbor_vertex_set, vertex[0] + step, vertex[1], -1, -1, neighbor_step)
            self._add_nearby_stones(neighbor_vertex_set, vertex[0], vertex[1] - step, -1, 1, neighbor_step)
            # Count stones explicitly so that marker values left on the
            # scratch board by get_score cannot skew the balance.
            color_estimate = 0
            for neighbor_vertex in neighbor_vertex_set:
                stone = self.game.board[self.game._flatten(neighbor_vertex)]
                if stone == utils.BLACK:
                    color_estimate += 1
                elif stone == utils.WHITE:
                    color_estimate -= 1
            if color_estimate > 0:
                return utils.BLACK
            elif color_estimate < 0:
                return utils.WHITE
        return utils.UNKNOWN

    def _add_nearby_stones(self, neighbor_vertex_set, start_vertex_x, start_vertex_y, x_diff, y_diff, num_step):
        """Append the on-board points of a straight walk to ``neighbor_vertex_set``.

        :param neighbor_vertex_set: list that receives the points (mutated in place)
        :param start_vertex_x: x coordinate of the first point
        :param start_vertex_y: y coordinate of the first point
        :param x_diff: x increment per step
        :param y_diff: y increment per step
        :param num_step: number of points to visit
        :return: None
        """
        # ``range`` instead of the Python-2-only ``xrange``; num_step is small.
        for _ in range(num_step):
            new_neighbor_vertex = (start_vertex_x, start_vertex_y)
            if self._in_board(new_neighbor_vertex):
                neighbor_vertex_set.append(new_neighbor_vertex)
            start_vertex_x += x_diff
            start_vertex_y += y_diff
#from strategy import strategy
class Game:
'''
Load the real game and trained weights.
TODO : Maybe merge with the engine class in future,
currently leave it untouched for interacting with Go UI.
'''
def __init__(self, size=9, komi=6.5, checkpoint_path=None):
    """Set up an empty board, the rule executor, the simulator and the network evaluator.

    :param size: board side length
    :param komi: komi used when scoring
    :param checkpoint_path: passed to the strategy / network to load weights
    """
    self.size = size
    self.komi = komi
    self.board = [utils.EMPTY] * (self.size * self.size)
    # NOTE(review): this view shows both a ``strategy(...)`` call and a
    # ``strategy.GoEnv()`` attribute access, and two assignments to
    # ``self.executor``; all statements are kept as they appear — confirm
    # which ones belong to the current revision.
    self.strategy = strategy(checkpoint_path)
    # self.strategy = None
    self.executor = Executor(game=self)
    self.history = []
    self.past = deque(maxlen=8)
    for _ in range(8):
        # Store independent snapshots: appending ``self.board`` itself made
        # all eight entries alias the live board, which moves mutate in place.
        self.past.append(list(self.board))
    self.executor = go.Go(game=self)
    #self.strategy = strategy(checkpoint_path)
    self.simulator = strategy.GoEnv()
    self.net = network_small.Network()
    self.sess = self.net.forward(checkpoint_path)
    # Evaluator used by the search: softmax over the policy output plus the value output.
    self.evaluator = lambda state: self.sess.run([tf.nn.softmax(self.net.p), self.net.v],
                                                 feed_dict={self.net.x: state, self.net.is_training: False})
def _flatten(self, vertex):
x, y = vertex
return (y - 1) * self.size + (x - 1)
@ -245,7 +53,6 @@ class Game:
y = idx // self.size + 1
return (x,y)
def clear(self):
self.board = [utils.EMPTY] * (self.size * self.size)
self.history = []
@ -259,8 +66,30 @@ class Game:
def set_komi(self, k):
    """Store ``k`` as the komi of this game."""
    new_komi = k
    self.komi = new_komi
def check_valid(self, color, vertex):
    """Return the executor's verdict on whether ``color`` may play at ``vertex``."""
    verdict = self.executor.is_valid(color, vertex)
    return verdict
def data_process(self, history, color):
    """Build the ``(1, size, size, 17)`` input array from eight flat board snapshots.

    Plane ``i`` (0..7) marks the cells of ``history[i]`` equal to 1 and
    plane ``i + 8`` the cells equal to -1.  Plane 16 is all ones when
    ``color`` is ``utils.BLACK`` and all zeros when it is ``utils.WHITE``.
    """
    side = self.simulator.size
    cells = side ** 2
    positive_ref = np.ones(cells)
    negative_ref = -np.ones(cells)
    state = np.zeros([1, side, side, 17])
    for plane in range(8):
        snapshot = np.array(history[plane])
        state[0, :, :, plane] = np.array(snapshot == positive_ref).reshape(side, side)
        state[0, :, :, plane + 8] = np.array(snapshot == negative_ref).reshape(side, side)
    if color == utils.BLACK:
        state[0, :, :, 16] = np.ones([side, side])
    if color == utils.WHITE:
        state[0, :, :, 16] = np.zeros([side, side])
    return state
def strategy_gen_move(self, history, color):
    """Run the tree search from ``history`` and sample a move for ``color``.

    The simulator is loaded with a copy of ``history`` and of its last
    board, the search is run, and a move is drawn with probability
    proportional to the root visit counts.

    :param history: sequence of flat board snapshots; the last one is the current board
    :param color: colour to move
    :return: ``(move, prob)`` where ``move`` is ``utils.PASS`` or a 1-based
        ``(x, y)`` tuple of ints, and ``prob`` is the distribution sampled from
    """
    self.simulator.history = copy.copy(history)
    self.simulator.board = copy.copy(history[-1])
    state = self.data_process(self.simulator.history, color)
    side = self.simulator.size
    # One action per board point plus one extra index for passing.
    action_count = side ** 2 + 1
    mcts = MCTS(self.simulator, self.evaluator, state, action_count, inverse=True, max_step=10)
    temp = 1
    prob = mcts.root.N ** temp / np.sum(mcts.root.N ** temp)
    choice = np.random.choice(action_count, 1, p=prob).tolist()[0]
    if choice == side ** 2:
        move = utils.PASS
    else:
        # Floor division keeps the row an int under true division as well;
        # ``/`` produced a float coordinate there.
        move = (choice % side + 1, choice // side + 1)
    return move, prob
def do_move(self, color, vertex):
if vertex == utils.PASS:
@ -271,7 +100,7 @@ class Game:
def gen_move(self, color):
    """Generate a move for ``color``, play it on this game and return it.

    The probability vector that comes with the move is kept on ``self.prob``.
    """
    # move = self.strategy.gen_move(color)
    # return move
    # NOTE(review): two generator calls appear back to back; the second
    # result overwrites the first — confirm whether both are intended.
    chosen, self.prob = self.strategy.gen_move(self.past, color)
    chosen, self.prob = self.strategy_gen_move(self.past, color)
    self.do_move(color, chosen)
    return chosen
@ -295,7 +124,6 @@ class Game:
print('')
sys.stdout.flush()
if __name__ == "__main__":
g = Game()
g.show_board()

View File

@ -1,428 +1,212 @@
'''
A board is a NxN numpy array.
A Coordinate is a tuple index into the board.
A Move is a (Coordinate c | None).
A PlayerMove is a (Color, Move) tuple
(0, 0) is considered to be the upper left corner of the board, and (18, 0) is the lower left.
'''
from collections import namedtuple
from __future__ import print_function
import utils
import copy
import itertools
import sys
from collections import deque
import numpy as np
'''
Settings of the Go game.
# Represent a board as a numpy array, with 0 empty, 1 is black, -1 is white.
# This means that swapping colors is as simple as multiplying array by -1.
WHITE, EMPTY, BLACK, FILL, KO, UNKNOWN = range(-1, 5)
(1, 1) is considered as the upper left corner of the board,
(size, 1) is the lower left
'''
NEIGHBOR_OFFSET = [[1, 0], [-1, 0], [0, -1], [0, 1]]
class PlayerMove(namedtuple('PlayerMove', ['color', 'move'])): pass
class Go:
def __init__(self, **kwargs):
self.game = kwargs['game']
def _bfs(self, vertex, color, block, status, alive_break):
block.append(vertex)
status[self.game._flatten(vertex)] = True
nei = self._neighbor(vertex)
for n in nei:
if not status[self.game._flatten(n)]:
if self.game.board[self.game._flatten(n)] == color:
self._bfs(n, color, block, status, alive_break)
# Represents "group not found" in the LibertyTracker object
MISSING_GROUP_ID = -1
def _find_block(self, vertex, alive_break=False):
block = []
status = [False] * (self.game.size * self.game.size)
color = self.game.board[self.game._flatten(vertex)]
self._bfs(vertex, color, block, status, alive_break)
for b in block:
for n in self._neighbor(b):
if self.game.board[self.game._flatten(n)] == utils.EMPTY:
return False, block
return True, block
class IllegalMove(Exception): pass
def _find_boarder(self, vertex):
block = []
status = [False] * (self.game.size * self.game.size)
self._bfs(vertex, utils.EMPTY, block, status, False)
border = []
for b in block:
for n in self._neighbor(b):
if not (n in block):
border.append(n)
return border
# Module-level board geometry; populated by set_board_size.
N = None
ALL_COORDS = []
EMPTY_BOARD = None
NEIGHBORS = {}
DIAGONALS = {}


def set_board_size(n):
    '''
    Rebuild the module-level geometry tables for an n x n board.

    Does nothing when the module is already configured for ``n``.
    Hopefully nobody tries to run both 9x9 and 19x19 game instances at once.
    Also, never do "from go import N, W, ALL_COORDS, EMPTY_BOARD".
    '''
    global N, ALL_COORDS, EMPTY_BOARD, NEIGHBORS, DIAGONALS
    if N == n:
        return
    N = n
    ALL_COORDS = [(row, col) for row in range(n) for col in range(n)]
    EMPTY_BOARD = np.zeros([n, n], dtype=np.int8)

    def on_board(coord):
        # Equivalent to the modulo test: both indices must lie in 0..n-1.
        return 0 <= coord[0] < n and 0 <= coord[1] < n

    neighbor_table = {}
    diagonal_table = {}
    for x, y in ALL_COORDS:
        straight = [(x + 1, y), (x - 1, y), (x, y + 1), (x, y - 1)]
        slanted = [(x + 1, y + 1), (x + 1, y - 1), (x - 1, y + 1), (x - 1, y - 1)]
        neighbor_table[(x, y)] = [p for p in straight if on_board(p)]
        diagonal_table[(x, y)] = [p for p in slanted if on_board(p)]
    NEIGHBORS = neighbor_table
    DIAGONALS = diagonal_table
def place_stones(board, color, stones):
    """Write ``color`` into ``board`` at every coordinate of ``stones``, in place."""
    for coord in stones:
        board[coord] = color
def find_reached(board, c):
    """Flood-fill from ``c`` over cells holding the same value as ``board[c]``.

    Returns ``(chain, reached)``: the set of connected same-valued
    coordinates and the set of adjacent coordinates holding another value.
    """
    # that can reach from one place
    target = board[c]
    chain = {c}
    reached = set()
    pending = [c]
    while pending:
        here = pending.pop()
        chain.add(here)
        for nxt in NEIGHBORS[here]:
            if board[nxt] != target:
                reached.add(nxt)
            elif nxt not in chain:
                pending.append(nxt)
    return chain, reached
def is_koish(board, c):
    'Check if c is surrounded on all sides by 1 color, and return that color'
    if board[c] != EMPTY:
        return None
    surrounding = set(board[n] for n in NEIGHBORS[c])
    # Anything but exactly one non-empty value around c disqualifies it.
    if len(surrounding) != 1 or EMPTY in surrounding:
        return None
    return list(surrounding)[0]
def is_eyeish(board, c):
    'Check if c is an eye, for the purpose of restricting MC rollouts.'
    color = is_koish(board, c)
    if color is None:
        return None
    corners = DIAGONALS[c]
    # Fewer than four diagonals counts as one fault up front.
    faults = 1 if len(corners) < 4 else 0
    # Every diagonal that is neither ``color`` nor empty adds a fault.
    faults += sum(1 for d in corners if not board[d] in (color, EMPTY))
    return color if faults <= 1 else None
class Group(namedtuple('Group', ['id', 'stones', 'liberties', 'color'])):
    '''
    stones: a set of Coordinates belonging to this group
    liberties: a set of Coordinates that are empty and adjacent to this group.
    color: color of this group

    Two groups compare equal when stones, liberties and color match; the id
    is deliberately ignored.
    '''

    def __eq__(self, other):
        # Defer to the other operand instead of raising AttributeError when
        # compared with something that is not a Group.
        if not isinstance(other, Group):
            return NotImplemented
        return (self.stones == other.stones
                and self.liberties == other.liberties
                and self.color == other.color)

    def __ne__(self, other):
        # Keep ``!=`` consistent with ``==``; without this, Python 2 falls
        # back to the tuple comparison, which also looks at ``id``.
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result
class LibertyTracker(object):
    """Incrementally tracks stone groups and their liberties for one board.

    State: ``group_index`` (per-point group id, MISSING_GROUP_ID where there
    is no group), ``groups`` (group id -> Group), ``liberty_cache``
    (per-point liberty count of the group on that point) and
    ``max_group_id`` (last id handed out).
    """

    @staticmethod
    def from_board(board):
        """Build a tracker by flood-filling every WHITE and BLACK group of ``board``.

        Works on a copy of ``board``; processed groups are overwritten with
        FILL on that copy so they are not found again.
        """
        board = np.copy(board)
        curr_group_id = 0
        lib_tracker = LibertyTracker()
        for color in (WHITE, BLACK):
            while color in board:
                curr_group_id += 1
                found_color = np.where(board == color)
                # First remaining stone of this colour.
                coord = found_color[0][0], found_color[1][0]
                chain, reached = find_reached(board, coord)
                liberties = set(r for r in reached if board[r] == EMPTY)
                new_group = Group(curr_group_id, chain, liberties, color)
                lib_tracker.groups[curr_group_id] = new_group
                for s in chain:
                    lib_tracker.group_index[s] = curr_group_id
                place_stones(board, FILL, chain)
        lib_tracker.max_group_id = curr_group_id
        liberty_counts = np.zeros([N, N], dtype=np.uint8)
        for group in lib_tracker.groups.values():
            num_libs = len(group.liberties)
            for s in group.stones:
                liberty_counts[s] = num_libs
        lib_tracker.liberty_cache = liberty_counts
        return lib_tracker

    def __init__(self, group_index=None, groups=None, liberty_cache=None, max_group_id=1):
        """Create a tracker from existing tables, or empty N x N tables when omitted."""
        # group_index: a NxN numpy array of group_ids. -1 means no group
        # groups: a dict of group_id to groups
        # liberty_cache: a NxN numpy array of liberty counts
        self.group_index = group_index if group_index is not None else -np.ones([N, N], dtype=np.int32)
        self.groups = groups or {}
        self.liberty_cache = liberty_cache if liberty_cache is not None else np.zeros([N, N], dtype=np.uint8)
        self.max_group_id = max_group_id

    def __deepcopy__(self, memodict={}):
        """Return an independent copy: arrays are copied and each Group gets fresh sets."""
        new_group_index = np.copy(self.group_index)
        new_lib_cache = np.copy(self.liberty_cache)
        new_groups = {
            group.id: Group(group.id, set(group.stones), set(group.liberties), group.color)
            for group in self.groups.values()
        }
        return LibertyTracker(new_group_index, new_groups, liberty_cache=new_lib_cache, max_group_id=self.max_group_id)

    def add_stone(self, color, c):
        """Register a ``color`` stone at ``c`` and return the set of captured stones.

        Merges the new stone with adjacent friendly groups, captures adjacent
        opposing groups that had exactly one liberty, and removes ``c`` from
        the liberties of the remaining opposing neighbours.

        Raises IllegalMove when the resulting group has no liberties; the
        tracker has already been modified at that point.
        """
        assert self.group_index[c] == MISSING_GROUP_ID
        captured_stones = set()
        opponent_neighboring_group_ids = set()
        friendly_neighboring_group_ids = set()
        empty_neighbors = set()
        # Classify each neighbour as friendly group, opposing group or empty.
        for n in NEIGHBORS[c]:
            neighbor_group_id = self.group_index[n]
            if neighbor_group_id != MISSING_GROUP_ID:
                neighbor_group = self.groups[neighbor_group_id]
                if neighbor_group.color == color:
                    friendly_neighboring_group_ids.add(neighbor_group_id)
                else:
                    opponent_neighboring_group_ids.add(neighbor_group_id)
            else:
                empty_neighbors.add(n)
        new_group = self._create_group(color, c, empty_neighbors)
        for group_id in friendly_neighboring_group_ids:
            new_group = self._merge_groups(group_id, new_group.id)
        for group_id in opponent_neighboring_group_ids:
            neighbor_group = self.groups[group_id]
            if len(neighbor_group.liberties) == 1:
                captured = self._capture_group(group_id)
                captured_stones.update(captured)
            else:
                self._update_liberties(group_id, remove={c})
        self._handle_captures(captured_stones)
        # suicide is illegal
        if len(new_group.liberties) == 0:
            raise IllegalMove("Move at {} would commit suicide!\n".format(c))
        return captured_stones

    def _create_group(self, color, c, liberties):
        """Create and register a one-stone group at ``c`` under a fresh id."""
        self.max_group_id += 1
        new_group = Group(self.max_group_id, set([c]), liberties, color)
        self.groups[new_group.id] = new_group
        self.group_index[c] = new_group.id
        self.liberty_cache[c] = len(liberties)
        return new_group

    def _merge_groups(self, group1_id, group2_id):
        """Fold group2 into group1 (group2 is deleted) and return group1."""
        group1 = self.groups[group1_id]
        group2 = self.groups[group2_id]
        group1.stones.update(group2.stones)
        del self.groups[group2_id]
        for s in group2.stones:
            self.group_index[s] = group1_id
        # Stones of the merged group cannot be liberties of it.
        self._update_liberties(group1_id, add=group2.liberties, remove=(group2.stones | group1.stones))
        return group1

    def _capture_group(self, group_id):
        """Remove the group from the tables and return its stones."""
        dead_group = self.groups[group_id]
        del self.groups[group_id]
        for s in dead_group.stones:
            self.group_index[s] = MISSING_GROUP_ID
            self.liberty_cache[s] = 0
        return dead_group.stones

    def _update_liberties(self, group_id, add=None, remove=None):
        """Add/remove liberties of a group and refresh the cache for all its stones."""
        group = self.groups[group_id]
        if add:
            group.liberties.update(add)
        if remove:
            group.liberties.difference_update(remove)
        new_lib_count = len(group.liberties)
        for s in group.stones:
            self.liberty_cache[s] = new_lib_count

    def _handle_captures(self, captured_stones):
        """Give each vacated point back as a liberty to the groups next to it."""
        for s in captured_stones:
            for n in NEIGHBORS[s]:
                group_id = self.group_index[n]
                if group_id != MISSING_GROUP_ID:
                    self._update_liberties(group_id, add={s})
class Position():
def __init__(self, board=None, n=0, komi=7.5, caps=(0, 0), lib_tracker=None, ko=None, recent=tuple(),
to_play=BLACK):
'''
board: a numpy array
n: an int representing moves played so far
komi: a float, representing points given to the second player.
caps: a (int, int) tuple of captures for B, W.
lib_tracker: a LibertyTracker object
ko: a Move
recent: a tuple of PlayerMoves, such that recent[-1] is the last move.
to_play: BLACK or WHITE
'''
self.board = board if board is not None else np.copy(EMPTY_BOARD)
self.n = n
self.komi = komi
self.caps = caps
self.lib_tracker = lib_tracker or LibertyTracker.from_board(self.board)
self.ko = ko
self.recent = recent
self.to_play = to_play
def __deepcopy__(self, memodict={}):
new_board = np.copy(self.board)
new_lib_tracker = copy.deepcopy(self.lib_tracker)
return Position(new_board, self.n, self.komi, self.caps, new_lib_tracker, self.ko, self.recent, self.to_play)
def __str__(self):
pretty_print_map = {
WHITE: '\x1b[0;31;47mO',
EMPTY: '\x1b[0;31;43m.',
BLACK: '\x1b[0;31;40mX',
FILL: '#',
KO: '*',
}
board = np.copy(self.board)
captures = self.caps
if self.ko is not None:
place_stones(board, KO, [self.ko])
raw_board_contents = []
for i in range(N):
row = []
for j in range(N):
appended = '<' if (self.recent and (i, j) == self.recent[-1].move) else ' '
row.append(pretty_print_map[board[i, j]] + appended)
row.append('\x1b[0m')
raw_board_contents.append(''.join(row))
row_labels = ['%2d ' % i for i in range(N, 0, -1)]
annotated_board_contents = [''.join(r) for r in zip(row_labels, raw_board_contents, row_labels)]
header_footer_rows = [' ' + ' '.join('ABCDEFGHJKLMNOPQRST'[:N]) + ' ']
annotated_board = '\n'.join(itertools.chain(header_footer_rows, annotated_board_contents, header_footer_rows))
details = "\nMove: {}. Captures X: {} O: {}\n".format(self.n, *captures)
return annotated_board + details
def is_move_suicidal(self, move):
potential_libs = set()
for n in NEIGHBORS[move]:
neighbor_group_id = self.lib_tracker.group_index[n]
if neighbor_group_id == MISSING_GROUP_ID:
# at least one liberty after playing here, so not a suicide
return False
neighbor_group = self.lib_tracker.groups[neighbor_group_id]
if neighbor_group.color == self.to_play:
potential_libs |= neighbor_group.liberties
elif len(neighbor_group.liberties) == 1:
# would capture an opponent group if they only had one lib.
return False
# it's possible to suicide by connecting several friendly groups
# each of which had one liberty.
potential_libs -= set([move])
return not potential_libs
def is_move_legal(self, move):
'Checks that a move is on an empty space, not on ko, and not suicide'
if move is None:
def _is_qi(self, color, vertex):
nei = self._neighbor(vertex)
for n in nei:
if self.game.board[self.game._flatten(n)] == utils.EMPTY:
return True
if self.board[move] != EMPTY:
self.game.board[self.game._flatten(vertex)] = color
for n in nei:
if self.game.board[self.game._flatten(n)] == utils.another_color(color):
can_kill, block = self._find_block(n)
if can_kill:
self.game.board[self.game._flatten(vertex)] = utils.EMPTY
return True
### can not suicide
can_kill, block = self._find_block(vertex)
if can_kill:
self.game.board[self.game._flatten(vertex)] = utils.EMPTY
return False
if move == self.ko:
self.game.board[self.game._flatten(vertex)] = utils.EMPTY
return True
def _check_global_isomorphous(self, color, vertex):
##backup
_board = copy.copy(self.game.board)
self.game.board[self.game._flatten(vertex)] = color
self._process_board(color, vertex)
if self.game.board in self.game.history:
res = True
else:
res = False
self.game.board = _board
return res
def _in_board(self, vertex):
x, y = vertex
if x < 1 or x > self.game.size: return False
if y < 1 or y > self.game.size: return False
return True
def _neighbor(self, vertex):
x, y = vertex
nei = []
for d in NEIGHBOR_OFFSET:
_x = x + d[0]
_y = y + d[1]
if self._in_board((_x, _y)):
nei.append((_x, _y))
return nei
def _process_board(self, color, vertex):
nei = self._neighbor(vertex)
for n in nei:
if self.game.board[self.game._flatten(n)] == utils.another_color(color):
can_kill, block = self._find_block(n, alive_break=True)
if can_kill:
for b in block:
self.game.board[self.game._flatten(b)] = utils.EMPTY
def is_valid(self, color, vertex):
### in board
if not self._in_board(vertex):
return False
if self.is_move_suicidal(move):
### already have stone
if not self.game.board[self.game._flatten(vertex)] == utils.EMPTY:
return False
### check if it is qi
if not self._is_qi(color, vertex):
return False
if self._check_global_isomorphous(color, vertex):
return False
return True
def pass_move(self, mutate=False):
pos = self if mutate else copy.deepcopy(self)
pos.n += 1
pos.recent += (PlayerMove(pos.to_play, None),)
pos.to_play *= -1
pos.ko = None
return pos
def do_move(self, color, vertex):
if not self.is_valid(color, vertex):
return False
self.game.board[self.game._flatten(vertex)] = color
self._process_board(color, vertex)
self.game.history.append(copy.copy(self.game.board))
self.game.past.append(copy.copy(self.game.board))
return True
def flip_playerturn(self, mutate=False):
pos = self if mutate else copy.deepcopy(self)
pos.ko = None
pos.to_play *= -1
return pos
def _find_empty(self):
idx = [i for i,x in enumerate(self.game.board) if x == utils.EMPTY ][0]
return self.game._deflatten(idx)
def get_liberties(self):
return self.lib_tracker.liberty_cache
def play_move(self, c, color=None, mutate=False):
# Obeys CGOS Rules of Play. In short:
# No suicides
# Chinese/area scoring
# Positional superko (this is very crudely approximate at the moment.)
if color is None:
color = self.to_play
pos = self if mutate else copy.deepcopy(self)
if c is None:
pos = pos.pass_move(mutate=mutate)
return pos
if not self.is_move_legal(c):
raise IllegalMove("Move at {} is illegal: \n{}".format(c, self))
# check must be done before potentially mutating the board
potential_ko = is_koish(self.board, c)
place_stones(pos.board, color, [c])
captured_stones = pos.lib_tracker.add_stone(color, c)
place_stones(pos.board, EMPTY, captured_stones)
opp_color = color * -1
if len(captured_stones) == 1 and potential_ko == opp_color:
new_ko = list(captured_stones)[0]
def get_score(self, is_unknown_estimation = False):
'''
is_unknown_estimation: whether use nearby stone to predict the unknown
return score from BLACK perspective.
'''
_board = copy.copy(self.game.board)
while utils.EMPTY in self.game.board:
vertex = self._find_empty()
boarder = self._find_boarder(vertex)
boarder_color = set(map(lambda v: self.game.board[self.game._flatten(v)], boarder))
if boarder_color == {utils.BLACK}:
self.game.board[self.game._flatten(vertex)] = utils.BLACK
elif boarder_color == {utils.WHITE}:
self.game.board[self.game._flatten(vertex)] = utils.WHITE
elif is_unknown_estimation:
self.game.board[self.game._flatten(vertex)] = self._predict_from_nearby(vertex)
else:
new_ko = None
self.game.board[self.game._flatten(vertex)] =utils.UNKNOWN
score = 0
for i in self.game.board:
if i == utils.BLACK:
score += 1
elif i == utils.WHITE:
score -= 1
score -= self.game.komi
if pos.to_play == BLACK:
new_caps = (pos.caps[0] + len(captured_stones), pos.caps[1])
else:
new_caps = (pos.caps[0], pos.caps[1] + len(captured_stones))
self.game.board = _board
return score
pos.n += 1
pos.caps = new_caps
pos.ko = new_ko
pos.recent += (PlayerMove(color, c),)
pos.to_play *= -1
return pos
def _predict_from_nearby(self, vertex, neighbor_step = 3):
'''
step: the nearby 3 steps is considered
:vertex: position to be estimated
:neighbor_step: how many steps nearby
:return: the nearby positions of the input position
currently the nearby 3*3 grid is returned, altogether 4*8 points involved
'''
for step in range(1, neighbor_step + 1): # check the stones within the steps in range
neighbor_vertex_set = []
self._add_nearby_stones(neighbor_vertex_set, vertex[0] - step, vertex[1], 1, 1, neighbor_step)
self._add_nearby_stones(neighbor_vertex_set, vertex[0], vertex[1] + step, 1, -1, neighbor_step)
self._add_nearby_stones(neighbor_vertex_set, vertex[0] + step, vertex[1], -1, -1, neighbor_step)
self._add_nearby_stones(neighbor_vertex_set, vertex[0], vertex[1] - step, -1, 1, neighbor_step)
color_estimate = 0
for neighbor_vertex in neighbor_vertex_set:
color_estimate += self.game.board[self.game._flatten(neighbor_vertex)]
if color_estimate > 0:
return utils.BLACK
elif color_estimate < 0:
return utils.WHITE
def score(self):
'Return score from B perspective. If W is winning, score is negative.'
working_board = np.copy(self.board)
while EMPTY in working_board:
unassigned_spaces = np.where(working_board == EMPTY)
c = unassigned_spaces[0][0], unassigned_spaces[1][0]
territory, borders = find_reached(working_board, c)
border_colors = set(working_board[b] for b in borders)
X_border = BLACK in border_colors
O_border = WHITE in border_colors
if X_border and not O_border:
territory_color = BLACK
elif O_border and not X_border:
territory_color = WHITE
else:
territory_color = UNKNOWN # dame, or seki
place_stones(working_board, territory_color, territory)
return np.count_nonzero(working_board == BLACK) - np.count_nonzero(working_board == WHITE) - self.komi
def result(self):
score = self.score()
if score > 0:
return 'B+' + '%.1f' % score
elif score < 0:
return 'W+' + '%.1f' % abs(score)
else:
return 'DRAW'
set_board_size(19)
def _add_nearby_stones(self, neighbor_vertex_set, start_vertex_x, start_vertex_y, x_diff, y_diff, num_step):
'''
add the nearby stones around the input vertex
:param neighbor_vertex_set: input list
:param start_vertex_x: x axis of the input vertex
:param start_vertex_y: y axis of the input vertex
:param x_diff: add x axis
:param y_diff: add y axis
:param num_step: number of steps to be added
:return:
'''
for step in xrange(num_step):
new_neighbor_vertex = (start_vertex_x, start_vertex_y)
if self._in_board(new_neighbor_vertex):
neighbor_vertex_set.append((start_vertex_x, start_vertex_y))
start_vertex_x += x_diff
start_vertex_y += y_diff

View File

@ -1,70 +0,0 @@
import gtp
import go
import utils
def translate_gtp_colors(gtp_color):
    """Map a gtp colour constant to the matching go constant (go.EMPTY for anything else)."""
    if gtp_color == gtp.BLACK:
        return go.BLACK
    if gtp_color == gtp.WHITE:
        return go.WHITE
    return go.EMPTY
class GtpInterface(object):
    """Adapter between GTP commands and a ``go.Position``.

    Subclasses supply ``suggest_move`` to provide the actual move choice.
    """

    def __init__(self):
        self.size = 9
        self.position = None
        self.komi = 6.5
        self.clear()

    def set_size(self, n):
        """Switch to an ``n`` x ``n`` board and start a fresh position."""
        self.size = n
        go.set_board_size(n)
        self.clear()

    def set_komi(self, komi):
        """Set the komi on this interface and on the current position."""
        self.komi = komi
        self.position.komi = komi

    def clear(self):
        """Replace the current position with a new one using the stored komi."""
        self.position = go.Position(komi=self.komi)

    def accomodate_out_of_turn(self, color):
        """Flip the side to move in place when ``color`` is not the one to play."""
        if translate_gtp_colors(color) != self.position.to_play:
            self.position.flip_playerturn(mutate=True)

    def make_move(self, color, vertex):
        """Play ``color`` at the GTP ``vertex``; return False when the move is illegal."""
        coords = utils.parse_pygtp_coords(vertex)
        self.accomodate_out_of_turn(color)
        try:
            self.position = self.position.play_move(coords, color=translate_gtp_colors(color))
        except go.IllegalMove:
            return False
        return True

    def get_move(self, color):
        """Return gtp.RESIGN, gtp.PASS or the suggested move as GTP coordinates."""
        self.accomodate_out_of_turn(color)
        if self.should_resign(self.position):
            return gtp.RESIGN
        if self.should_pass(self.position):
            return gtp.PASS
        move = self.suggest_move(self.position)
        return utils.unparse_pygtp_coords(move)

    def should_resign(self, position):
        """Return gtp.RESIGN when caps[1] exceeds caps[0] by more than 50, else None."""
        if position.caps[0] + 50 < position.caps[1]:
            return gtp.RESIGN

    def should_pass(self, position):
        """Return a truthy value when more than 100 moves were played and the last move was a pass."""
        # Pass if the opponent passes
        return position.n > 100 and position.recent and position.recent[-1].move is None

    def get_score(self):
        """Return the result string of the current position."""
        return self.position.result()

    def suggest_move(self, position):
        """Choose a move for ``position``; must be provided by subclasses."""
        raise NotImplementedError

View File

@ -13,12 +13,11 @@ print "Start Name Sever : " + str(start_new_server.pid)# + str(start_new_server.
time.sleep(1)
agent_v0 = subprocess.Popen(['python', '-u', 'player.py', '--role=black'],
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
time.sleep(3)
print "Start Player 0 at : " + str(agent_v0.pid)
agent_v1 = subprocess.Popen(['python', '-u', 'player.py', '--role=white', '--checkpoint_path=./checkpoints_origin/'],
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
time.sleep(3)
print "Start Player 1 at : " + str(agent_v1.pid)
time.sleep(5)
player = [None] * 2
player[0] = Pyro4.Proxy("PYRONAME:black")

View File

@ -8,6 +8,10 @@ from engine import GTPEngine
@Pyro4.expose
class Player(object):
"""
This is the class which defines the object called by Pyro4 (Python remote object).
It passes the command to our engine, and return the result.
"""
def __init__(self, **kwargs):
self.role = kwargs['role']
self.engine = kwargs['engine']

View File

@ -13,7 +13,6 @@ from tianshou.core.mcts.mcts import MCTS
DELTA = [[1, 0], [-1, 0], [0, -1], [0, 1]]
CORNER_OFFSET = [[-1, -1], [-1, 1], [1, 1], [1, -1]]
class GoEnv:
def __init__(self, size=9, komi=6.5):
self.size = size
@ -221,37 +220,3 @@ class GoEnv:
np.array(1 - state[:, :, :, -1]).reshape(1, self.size, self.size, 1)],
axis=3)
return new_state, 0
class strategy(object):
    """Move generator combining a GoEnv simulator, the policy-value network and MCTS."""

    def __init__(self, checkpoint_path):
        """Create the simulator and network; ``checkpoint_path`` is passed to ``net.forward``."""
        self.simulator = GoEnv()
        self.net = network_small.Network()
        self.sess = self.net.forward(checkpoint_path)
        # Evaluator used by the search: softmax over the policy output plus the value output.
        self.evaluator = lambda state: self.sess.run([tf.nn.softmax(self.net.p), self.net.v],
                                                     feed_dict={self.net.x: state, self.net.is_training: False})

    def data_process(self, history, color):
        """Build the ``(1, size, size, 17)`` input array from eight flat board snapshots.

        Plane ``i`` (0..7) marks the cells of ``history[i]`` equal to 1 and
        plane ``i + 8`` the cells equal to -1.  Plane 16 is all ones when
        ``color`` is ``utils.BLACK`` and all zeros when it is ``utils.WHITE``.
        """
        side = self.simulator.size
        state = np.zeros([1, side, side, 17])
        for i in range(8):
            state[0, :, :, i] = np.array(np.array(history[i]) == np.ones(side ** 2)).reshape(side, side)
            state[0, :, :, i + 8] = np.array(np.array(history[i]) == -np.ones(side ** 2)).reshape(side, side)
        if color == utils.BLACK:
            state[0, :, :, 16] = np.ones([side, side])
        if color == utils.WHITE:
            state[0, :, :, 16] = np.zeros([side, side])
        return state

    def gen_move(self, history, color):
        """Run the tree search from ``history`` and sample a move for ``color``.

        :return: ``(move, prob)`` where ``move`` is ``utils.PASS`` or a
            1-based ``(x, y)`` tuple of ints, and ``prob`` is the
            visit-count distribution the move was sampled from
        """
        self.simulator.history = copy.copy(history)
        self.simulator.board = copy.copy(history[-1])
        state = self.data_process(self.simulator.history, color)
        side = self.simulator.size
        mcts = MCTS(self.simulator, self.evaluator, state, side ** 2 + 1, inverse=True, max_step=10)
        temp = 1
        prob = mcts.root.N ** temp / np.sum(mcts.root.N ** temp)
        choice = np.random.choice(side ** 2 + 1, 1, p=prob).tolist()[0]
        if choice == side ** 2:
            # The extra action index past the board points means "pass".
            move = utils.PASS
        else:
            # Floor division keeps the row an int under true division as
            # well; ``/`` produced a float coordinate there.
            move = (choice % side + 1, choice // side + 1)
        return move, prob

View File

@ -168,6 +168,7 @@ class MCTS(object):
if max_step is None and max_time is None:
raise ValueError("Need a stop criteria!")
# TODO: running mcts should be implemented in another function, e.g. def search(self, max_step, max_time)
self.select_time = []
self.evaluate_time = []
self.bp_time = []