diff --git a/AlphaGo/README.md b/AlphaGo/README.md index d21b9bd..720c4d0 100644 --- a/AlphaGo/README.md +++ b/AlphaGo/README.md @@ -10,3 +10,7 @@ Connecting our own policy-value neural network with leela-zero. ## checkpoints: Weights of the policy-value neural network + + +## File Specification + diff --git a/AlphaGo/engine.py b/AlphaGo/engine.py index 716d40b..1f9af85 100644 --- a/AlphaGo/engine.py +++ b/AlphaGo/engine.py @@ -188,7 +188,6 @@ class GTPEngine(): def cmd_show_board(self, args, **kwargs): return self._game.board, True - if __name__ == "main": game = Game() engine = GTPEngine(game_obj=Game) diff --git a/AlphaGo/game.py b/AlphaGo/game.py index 919a5d5..360921e 100644 --- a/AlphaGo/game.py +++ b/AlphaGo/game.py @@ -10,242 +10,49 @@ import copy import tensorflow as tf import numpy as np import sys +import go +import network_small +import strategy from collections import deque +from tianshou.core.mcts.mcts import MCTS import Network -from strategy import strategy - -''' -(1, 1) is considered as the upper left corner of the board, -(size, 1) is the lower left -''' - -DELTA = [[1, 0], [-1, 0], [0, -1], [0, 1]] - - -class Executor: - def __init__(self, **kwargs): - self.game = kwargs['game'] - - def _bfs(self, vertex, color, block, status, alive_break): - block.append(vertex) - status[self.game._flatten(vertex)] = True - nei = self._neighbor(vertex) - for n in nei: - if not status[self.game._flatten(n)]: - if self.game.board[self.game._flatten(n)] == color: - self._bfs(n, color, block, status, alive_break) - - def _find_block(self, vertex, alive_break=False): - block = [] - status = [False] * (self.game.size * self.game.size) - color = self.game.board[self.game._flatten(vertex)] - self._bfs(vertex, color, block, status, alive_break) - - for b in block: - for n in self._neighbor(b): - if self.game.board[self.game._flatten(n)] == utils.EMPTY: - return False, block - return True, block - - def _find_boarder(self, vertex): - block = [] - status = [False] * (self.game.size * self.game.size) - self._bfs(vertex, utils.EMPTY, block, status, False) - border = [] - for b in block: - for n in self._neighbor(b): - if not (n in block): - border.append(n) - return border - - def _is_qi(self, color, vertex): - nei = self._neighbor(vertex) - for n in nei: - if self.game.board[self.game._flatten(n)] == utils.EMPTY: - return True - - self.game.board[self.game._flatten(vertex)] = color - for n in nei: - if self.game.board[self.game._flatten(n)] == utils.another_color(color): - can_kill, block = self._find_block(n) - if can_kill: - self.game.board[self.game._flatten(vertex)] = utils.EMPTY - return True - - ### can not suicide - can_kill, block = self._find_block(vertex) - if can_kill: - self.game.board[self.game._flatten(vertex)] = utils.EMPTY - return False - - self.game.board[self.game._flatten(vertex)] = utils.EMPTY - return True - - def _check_global_isomorphous(self, color, vertex): - ##backup - _board = copy.copy(self.game.board) - self.game.board[self.game._flatten(vertex)] = color - self._process_board(color, vertex) - if self.game.board in self.game.history: - res = True - else: - res = False - - self.game.board = _board - return res - - def _in_board(self, vertex): - x, y = vertex - if x < 1 or x > self.game.size: return False - if y < 1 or y > self.game.size: return False - return True - - def _neighbor(self, vertex): - x, y = vertex - nei = [] - for d in DELTA: - _x = x + d[0] - _y = y + d[1] - if self._in_board((_x, _y)): - nei.append((_x, _y)) - return nei - - def _process_board(self, color, vertex): - nei = self._neighbor(vertex) - for n in nei: - if self.game.board[self.game._flatten(n)] == utils.another_color(color): - can_kill, block = self._find_block(n, alive_break=True) - if can_kill: - for b in block: - self.game.board[self.game._flatten(b)] = utils.EMPTY - - def is_valid(self, color, vertex): - ### in board - if not self._in_board(vertex): - return False - - ### already have stone - if not self.game.board[self.game._flatten(vertex)] == utils.EMPTY: - return False - - ### check if it is qi - if not self._is_qi(color, vertex): - return False - - if self._check_global_isomorphous(color, vertex): - return False - - return True - - def do_move(self, color, vertex): - if not self.is_valid(color, vertex): - return False - self.game.board[self.game._flatten(vertex)] = color - self._process_board(color, vertex) - self.game.history.append(copy.copy(self.game.board)) - self.game.past.append(copy.copy(self.game.board)) - return True - - def _find_empty(self): - idx = [i for i,x in enumerate(self.game.board) if x == utils.EMPTY ][0] - return self.game._deflatten(idx) - - def get_score(self, is_unknown_estimation = False): - ''' - is_unknown_estimation: whether use nearby stone to predict the unknown - return score from BLACK perspective. - ''' - _board = copy.copy(self.game.board) - while utils.EMPTY in self.game.board: - vertex = self._find_empty() - boarder = self._find_boarder(vertex) - boarder_color = set(map(lambda v: self.game.board[self.game._flatten(v)], boarder)) - if boarder_color == {utils.BLACK}: - self.game.board[self.game._flatten(vertex)] = utils.BLACK - elif boarder_color == {utils.WHITE}: - self.game.board[self.game._flatten(vertex)] = utils.WHITE - elif is_unknown_estimation: - self.game.board[self.game._flatten(vertex)] = self._predict_from_nearby(vertex) - else: - self.game.board[self.game._flatten(vertex)] =utils.UNKNOWN - score = 0 - for i in self.game.board: - if i == utils.BLACK: - score += 1 - elif i == utils.WHITE: - score -= 1 - score -= self.game.komi - - self.game.board = _board - return score - - def _predict_from_nearby(self, vertex, neighbor_step = 3): - ''' - step: the nearby 3 steps is considered - :vertex: position to be estimated - :neighbor_step: how many steps nearby - :return: the nearby positions of the input position - currently the nearby 3*3 grid is returned, altogether 4*8 points involved - ''' - for step in range(1, neighbor_step + 1): # check the stones within the steps in range - neighbor_vertex_set = [] - self._add_nearby_stones(neighbor_vertex_set, vertex[0] - step, vertex[1], 1, 1, neighbor_step) - self._add_nearby_stones(neighbor_vertex_set, vertex[0], vertex[1] + step, 1, -1, neighbor_step) - self._add_nearby_stones(neighbor_vertex_set, vertex[0] + step, vertex[1], -1, -1, neighbor_step) - self._add_nearby_stones(neighbor_vertex_set, vertex[0], vertex[1] - step, -1, 1, neighbor_step) - color_estimate = 0 - for neighbor_vertex in neighbor_vertex_set: - color_estimate += self.game.board[self.game._flatten(neighbor_vertex)] - if color_estimate > 0: - return utils.BLACK - elif color_estimate < 0: - return utils.WHITE - - def _add_nearby_stones(self, neighbor_vertex_set, start_vertex_x, start_vertex_y, x_diff, y_diff, num_step): - ''' - add the nearby stones around the input vertex - :param neighbor_vertex_set: input list - :param start_vertex_x: x axis of the input vertex - :param start_vertex_y: y axis of the input vertex - :param x_diff: add x axis - :param y_diff: add y axis - :param num_step: number of steps to be added - :return: - ''' - for step in xrange(num_step): - new_neighbor_vertex = (start_vertex_x, start_vertex_y) - if self._in_board(new_neighbor_vertex): - neighbor_vertex_set.append((start_vertex_x, start_vertex_y)) - start_vertex_x += x_diff - start_vertex_y += y_diff - - - +#from strategy import strategy class Game: + ''' + Load the real game and trained weights. + + TODO : Maybe merge with the engine class in future, + currently leave it untouched for interacting with Go UI. + ''' def __init__(self, size=9, komi=6.5, checkpoint_path=None): self.size = size self.komi = komi self.board = [utils.EMPTY] * (self.size * self.size) - self.strategy = strategy(checkpoint_path) - # self.strategy = None - self.executor = Executor(game=self) self.history = [] self.past = deque(maxlen=8) for _ in range(8): self.past.append(self.board) + self.executor = go.Go(game=self) + #self.strategy = strategy(checkpoint_path) + + self.simulator = strategy.GoEnv() + self.net = network_small.Network() + self.sess = self.net.forward(checkpoint_path) + self.evaluator = lambda state: self.sess.run([tf.nn.softmax(self.net.p), self.net.v], + feed_dict={self.net.x: state, self.net.is_training: False}) + def _flatten(self, vertex): x, y = vertex return (y - 1) * self.size + (x - 1) def _deflatten(self, idx): x = idx % self.size + 1 - y = idx // self.size + 1 + y = idx // self.size + 1 return (x,y) - def clear(self): self.board = [utils.EMPTY] * (self.size * self.size) self.history = [] @@ -259,8 +66,30 @@ class Game: def set_komi(self, k): self.komi = k - def check_valid(self, color, vertex): - return self.executor.is_valid(color, vertex) + def data_process(self, history, color): + state = np.zeros([1, self.simulator.size, self.simulator.size, 17]) + for i in range(8): + state[0, :, :, i] = np.array(np.array(history[i]) == np.ones(self.simulator.size ** 2)).reshape(self.simulator.size, self.simulator.size) + state[0, :, :, i + 8] = np.array(np.array(history[i]) == -np.ones(self.simulator.size ** 2)).reshape(self.simulator.size, self.simulator.size) + if color == utils.BLACK: + state[0, :, :, 16] = np.ones([self.simulator.size, self.simulator.size]) + if color == utils.WHITE: + state[0, :, :, 16] = np.zeros([self.simulator.size, self.simulator.size]) + return state + + def strategy_gen_move(self, history, color): + self.simulator.history = copy.copy(history) + self.simulator.board = copy.copy(history[-1]) + state = self.data_process(self.simulator.history, color) + mcts = MCTS(self.simulator, self.evaluator, state, self.simulator.size ** 2 + 1, inverse=True, max_step=10) + temp = 1 + prob = mcts.root.N ** temp / np.sum(mcts.root.N ** temp) + choice = np.random.choice(self.simulator.size ** 2 + 1, 1, p=prob).tolist()[0] + if choice == self.simulator.size ** 2: + move = utils.PASS + else: + move = (choice % self.simulator.size + 1, choice / self.simulator.size + 1) + return move, prob def do_move(self, color, vertex): if vertex == utils.PASS: @@ -271,7 +100,7 @@ class Game: def gen_move(self, color): # move = self.strategy.gen_move(color) # return move - move, self.prob = self.strategy.gen_move(self.past, color) + move, self.prob = self.strategy_gen_move(self.past, color) self.do_move(color, move) return move @@ -295,7 +124,6 @@ class Game: print('') sys.stdout.flush() - if __name__ == "__main__": g = Game() g.show_board() diff --git a/AlphaGo/go.py b/AlphaGo/go.py index b83d305..26540e1 100644 --- a/AlphaGo/go.py +++ b/AlphaGo/go.py @@ -1,428 +1,212 @@ -''' -A board is a NxN numpy array. -A Coordinate is a tuple index into the board. -A Move is a (Coordinate c | None). -A PlayerMove is a (Color, Move) tuple -(0, 0) is considered to be the upper left corner of the board, and (18, 0) is the lower left. -''' -from collections import namedtuple +from __future__ import print_function +import utils import copy -import itertools +import sys +from collections import deque -import numpy as np +''' +Settings of the Go game. -# Represent a board as a numpy array, with 0 empty, 1 is black, -1 is white. -# This means that swapping colors is as simple as multiplying array by -1. -WHITE, EMPTY, BLACK, FILL, KO, UNKNOWN = range(-1, 5) +(1, 1) is considered as the upper left corner of the board, +(size, 1) is the lower left +''' + +NEIGHBOR_OFFSET = [[1, 0], [-1, 0], [0, -1], [0, 1]] -class PlayerMove(namedtuple('PlayerMove', ['color', 'move'])): pass +class Go: + def __init__(self, **kwargs): + self.game = kwargs['game'] + def _bfs(self, vertex, color, block, status, alive_break): + block.append(vertex) + status[self.game._flatten(vertex)] = True + nei = self._neighbor(vertex) + for n in nei: + if not status[self.game._flatten(n)]: + if self.game.board[self.game._flatten(n)] == color: + self._bfs(n, color, block, status, alive_break) -# Represents "group not found" in the LibertyTracker object -MISSING_GROUP_ID = -1 + def _find_block(self, vertex, alive_break=False): + block = [] + status = [False] * (self.game.size * self.game.size) + color = self.game.board[self.game._flatten(vertex)] + self._bfs(vertex, color, block, status, alive_break) + for b in block: + for n in self._neighbor(b): + if self.game.board[self.game._flatten(n)] == utils.EMPTY: + return False, block + return True, block -class IllegalMove(Exception): pass + def _find_boarder(self, vertex): + block = [] + status = [False] * (self.game.size * self.game.size) + self._bfs(vertex, utils.EMPTY, block, status, False) + border = [] + for b in block: + for n in self._neighbor(b): + if not (n in block): + border.append(n) + return border + def _is_qi(self, color, vertex): + nei = self._neighbor(vertex) + for n in nei: + if self.game.board[self.game._flatten(n)] == utils.EMPTY: + return True -# these are initialized by set_board_size -N = None -ALL_COORDS = [] -EMPTY_BOARD = None -NEIGHBORS = {} -DIAGONALS = {} + self.game.board[self.game._flatten(vertex)] = color + for n in nei: + if self.game.board[self.game._flatten(n)] == utils.another_color(color): + can_kill, block = self._find_block(n) + if can_kill: + self.game.board[self.game._flatten(vertex)] = utils.EMPTY + return True - -def set_board_size(n): - ''' - Hopefully nobody tries to run both 9x9 and 19x19 game instances at once. - Also, never do "from go import N, W, ALL_COORDS, EMPTY_BOARD". - ''' - global N, ALL_COORDS, EMPTY_BOARD, NEIGHBORS, DIAGONALS - if N == n: return - N = n - ALL_COORDS = [(i, j) for i in range(n) for j in range(n)] - EMPTY_BOARD = np.zeros([n, n], dtype=np.int8) - - def check_bounds(c): - return c[0] % n == c[0] and c[1] % n == c[1] - - NEIGHBORS = {(x, y): list(filter(check_bounds, [(x + 1, y), (x - 1, y), (x, y + 1), (x, y - 1)])) for x, y in - ALL_COORDS} - DIAGONALS = {(x, y): list(filter(check_bounds, [(x + 1, y + 1), (x + 1, y - 1), (x - 1, y + 1), (x - 1, y - 1)])) - for x, y in ALL_COORDS} - - -def place_stones(board, color, stones): - for s in stones: - board[s] = color - - -def find_reached(board, c): - # that can reach from one place - color = board[c] - chain = set([c]) - reached = set() - frontier = [c] - while frontier: - current = frontier.pop() - chain.add(current) - for n in NEIGHBORS[current]: - if board[n] == color and (not n in chain): - frontier.append(n) - elif board[n] != color: - reached.add(n) - return chain, reached - - -def is_koish(board, c): - 'Check if c is surrounded on all sides by 1 color, and return that color' - if board[c] != EMPTY: return None - neighbors = {board[n] for n in NEIGHBORS[c]} - if len(neighbors) == 1 and not EMPTY in neighbors: - return list(neighbors)[0] - else: - return None - - -def is_eyeish(board, c): - 'Check if c is an eye, for the purpose of restricting MC rollouts.' - color = is_koish(board, c) - if color is None: - return None - diagonal_faults = 0 - diagonals = DIAGONALS[c] - if len(diagonals) < 4: - diagonal_faults += 1 - for d in diagonals: - if not board[d] in (color, EMPTY): - diagonal_faults += 1 - if diagonal_faults > 1: - return None - else: - return color - - -class Group(namedtuple('Group', ['id', 'stones', 'liberties', 'color'])): - ''' - stones: a set of Coordinates belonging to this group - liberties: a set of Coordinates that are empty and adjacent to this group. - color: color of this group - ''' - - def __eq__(self, other): - return self.stones == other.stones and self.liberties == other.liberties and self.color == other.color - - -class LibertyTracker(object): - @staticmethod - def from_board(board): - board = np.copy(board) - curr_group_id = 0 - lib_tracker = LibertyTracker() - for color in (WHITE, BLACK): - while color in board: - curr_group_id += 1 - found_color = np.where(board == color) - coord = found_color[0][0], found_color[1][0] - chain, reached = find_reached(board, coord) - liberties = set(r for r in reached if board[r] == EMPTY) - new_group = Group(curr_group_id, chain, liberties, color) - lib_tracker.groups[curr_group_id] = new_group - for s in chain: - lib_tracker.group_index[s] = curr_group_id - place_stones(board, FILL, chain) - - lib_tracker.max_group_id = curr_group_id - - liberty_counts = np.zeros([N, N], dtype=np.uint8) - for group in lib_tracker.groups.values(): - num_libs = len(group.liberties) - for s in group.stones: - liberty_counts[s] = num_libs - lib_tracker.liberty_cache = liberty_counts - - return lib_tracker - - def __init__(self, group_index=None, groups=None, liberty_cache=None, max_group_id=1): - # group_index: a NxN numpy array of group_ids. -1 means no group - # groups: a dict of group_id to groups - # liberty_cache: a NxN numpy array of liberty counts - self.group_index = group_index if group_index is not None else -np.ones([N, N], dtype=np.int32) - self.groups = groups or {} - self.liberty_cache = liberty_cache if liberty_cache is not None else np.zeros([N, N], dtype=np.uint8) - self.max_group_id = max_group_id - - def __deepcopy__(self, memodict={}): - new_group_index = np.copy(self.group_index) - new_lib_cache = np.copy(self.liberty_cache) - new_groups = { - group.id: Group(group.id, set(group.stones), set(group.liberties), group.color) - for group in self.groups.values() - } - return LibertyTracker(new_group_index, new_groups, liberty_cache=new_lib_cache, max_group_id=self.max_group_id) - - def add_stone(self, color, c): - assert self.group_index[c] == MISSING_GROUP_ID - captured_stones = set() - opponent_neighboring_group_ids = set() - friendly_neighboring_group_ids = set() - empty_neighbors = set() - - for n in NEIGHBORS[c]: - neighbor_group_id = self.group_index[n] - if neighbor_group_id != MISSING_GROUP_ID: - neighbor_group = self.groups[neighbor_group_id] - if neighbor_group.color == color: - friendly_neighboring_group_ids.add(neighbor_group_id) - else: - opponent_neighboring_group_ids.add(neighbor_group_id) - else: - empty_neighbors.add(n) - - new_group = self._create_group(color, c, empty_neighbors) - - for group_id in friendly_neighboring_group_ids: - new_group = self._merge_groups(group_id, new_group.id) - - for group_id in opponent_neighboring_group_ids: - neighbor_group = self.groups[group_id] - if len(neighbor_group.liberties) == 1: - captured = self._capture_group(group_id) - captured_stones.update(captured) - else: - self._update_liberties(group_id, remove={c}) - - self._handle_captures(captured_stones) - - # suicide is illegal - if len(new_group.liberties) == 0: - raise IllegalMove("Move at {} would commit suicide!\n".format(c)) - - return captured_stones - - def _create_group(self, color, c, liberties): - self.max_group_id += 1 - new_group = Group(self.max_group_id, set([c]), liberties, color) - self.groups[new_group.id] = new_group - self.group_index[c] = new_group.id - self.liberty_cache[c] = len(liberties) - return new_group - - def _merge_groups(self, group1_id, group2_id): - group1 = self.groups[group1_id] - group2 = self.groups[group2_id] - group1.stones.update(group2.stones) - del self.groups[group2_id] - for s in group2.stones: - self.group_index[s] = group1_id - - self._update_liberties(group1_id, add=group2.liberties, remove=(group2.stones | group1.stones)) - - return group1 - - def _capture_group(self, group_id): - dead_group = self.groups[group_id] - del self.groups[group_id] - for s in dead_group.stones: - self.group_index[s] = MISSING_GROUP_ID - self.liberty_cache[s] = 0 - return dead_group.stones - - def _update_liberties(self, group_id, add=None, remove=None): - group = self.groups[group_id] - if add: - group.liberties.update(add) - if remove: - group.liberties.difference_update(remove) - - new_lib_count = len(group.liberties) - for s in group.stones: - self.liberty_cache[s] = new_lib_count - - def _handle_captures(self, captured_stones): - for s in captured_stones: - for n in NEIGHBORS[s]: - group_id = self.group_index[n] - if group_id != MISSING_GROUP_ID: - self._update_liberties(group_id, add={s}) - - -class Position(): - def __init__(self, board=None, n=0, komi=7.5, caps=(0, 0), lib_tracker=None, ko=None, recent=tuple(), - to_play=BLACK): - ''' - board: a numpy array - n: an int representing moves played so far - komi: a float, representing points given to the second player. - caps: a (int, int) tuple of captures for B, W. - lib_tracker: a LibertyTracker object - ko: a Move - recent: a tuple of PlayerMoves, such that recent[-1] is the last move. - to_play: BLACK or WHITE - ''' - self.board = board if board is not None else np.copy(EMPTY_BOARD) - self.n = n - self.komi = komi - self.caps = caps - self.lib_tracker = lib_tracker or LibertyTracker.from_board(self.board) - self.ko = ko - self.recent = recent - self.to_play = to_play - - def __deepcopy__(self, memodict={}): - new_board = np.copy(self.board) - new_lib_tracker = copy.deepcopy(self.lib_tracker) - return Position(new_board, self.n, self.komi, self.caps, new_lib_tracker, self.ko, self.recent, self.to_play) - - def __str__(self): - pretty_print_map = { - WHITE: '\x1b[0;31;47mO', - EMPTY: '\x1b[0;31;43m.', - BLACK: '\x1b[0;31;40mX', - FILL: '#', - KO: '*', - } - board = np.copy(self.board) - captures = self.caps - if self.ko is not None: - place_stones(board, KO, [self.ko]) - raw_board_contents = [] - for i in range(N): - row = [] - for j in range(N): - appended = '<' if (self.recent and (i, j) == self.recent[-1].move) else ' ' - row.append(pretty_print_map[board[i, j]] + appended) - row.append('\x1b[0m') - raw_board_contents.append(''.join(row)) - - row_labels = ['%2d ' % i for i in range(N, 0, -1)] - annotated_board_contents = [''.join(r) for r in zip(row_labels, raw_board_contents, row_labels)] - header_footer_rows = [' ' + ' '.join('ABCDEFGHJKLMNOPQRST'[:N]) + ' '] - annotated_board = '\n'.join(itertools.chain(header_footer_rows, annotated_board_contents, header_footer_rows)) - details = "\nMove: {}. Captures X: {} O: {}\n".format(self.n, *captures) - return annotated_board + details - - def is_move_suicidal(self, move): - potential_libs = set() - for n in NEIGHBORS[move]: - neighbor_group_id = self.lib_tracker.group_index[n] - if neighbor_group_id == MISSING_GROUP_ID: - # at least one liberty after playing here, so not a suicide - return False - neighbor_group = self.lib_tracker.groups[neighbor_group_id] - if neighbor_group.color == self.to_play: - potential_libs |= neighbor_group.liberties - elif len(neighbor_group.liberties) == 1: - # would capture an opponent group if they only had one lib. - return False - # it's possible to suicide by connecting several friendly groups - # each of which had one liberty. - potential_libs -= set([move]) - return not potential_libs - - def is_move_legal(self, move): - 'Checks that a move is on an empty space, not on ko, and not suicide' - if move is None: - return True - if self.board[move] != EMPTY: + ### can not suicide + can_kill, block = self._find_block(vertex) + if can_kill: + self.game.board[self.game._flatten(vertex)] = utils.EMPTY return False - if move == self.ko: + + self.game.board[self.game._flatten(vertex)] = utils.EMPTY + return True + + def _check_global_isomorphous(self, color, vertex): + ##backup + _board = copy.copy(self.game.board) + self.game.board[self.game._flatten(vertex)] = color + self._process_board(color, vertex) + if self.game.board in self.game.history: + res = True + else: + res = False + + self.game.board = _board + return res + + def _in_board(self, vertex): + x, y = vertex + if x < 1 or x > self.game.size: return False + if y < 1 or y > self.game.size: return False + return True + + def _neighbor(self, vertex): + x, y = vertex + nei = [] + for d in NEIGHBOR_OFFSET: + _x = x + d[0] + _y = y + d[1] + if self._in_board((_x, _y)): + nei.append((_x, _y)) + return nei + + def _process_board(self, color, vertex): + nei = self._neighbor(vertex) + for n in nei: + if self.game.board[self.game._flatten(n)] == utils.another_color(color): + can_kill, block = self._find_block(n, alive_break=True) + if can_kill: + for b in block: + self.game.board[self.game._flatten(b)] = utils.EMPTY + + def is_valid(self, color, vertex): + ### in board + if not self._in_board(vertex): return False - if self.is_move_suicidal(move): + + ### already have stone + if not self.game.board[self.game._flatten(vertex)] == utils.EMPTY: + return False + + ### check if it is qi + if not self._is_qi(color, vertex): + return False + + if self._check_global_isomorphous(color, vertex): return False return True - def pass_move(self, mutate=False): - pos = self if mutate else copy.deepcopy(self) - pos.n += 1 - pos.recent += (PlayerMove(pos.to_play, None),) - pos.to_play *= -1 - pos.ko = None - return pos + def do_move(self, color, vertex): + if not self.is_valid(color, vertex): + return False + self.game.board[self.game._flatten(vertex)] = color + self._process_board(color, vertex) + self.game.history.append(copy.copy(self.game.board)) + self.game.past.append(copy.copy(self.game.board)) + return True - def flip_playerturn(self, mutate=False): - pos = self if mutate else copy.deepcopy(self) - pos.ko = None - pos.to_play *= -1 - return pos + def _find_empty(self): + idx = [i for i,x in enumerate(self.game.board) if x == utils.EMPTY ][0] + return self.game._deflatten(idx) - def get_liberties(self): - return self.lib_tracker.liberty_cache - - def play_move(self, c, color=None, mutate=False): - # Obeys CGOS Rules of Play. In short: - # No suicides - # Chinese/area scoring - # Positional superko (this is very crudely approximate at the moment.) - if color is None: - color = self.to_play - - pos = self if mutate else copy.deepcopy(self) - - if c is None: - pos = pos.pass_move(mutate=mutate) - return pos - - if not self.is_move_legal(c): - raise IllegalMove("Move at {} is illegal: \n{}".format(c, self)) - - # check must be done before potentially mutating the board - potential_ko = is_koish(self.board, c) - - place_stones(pos.board, color, [c]) - captured_stones = pos.lib_tracker.add_stone(color, c) - place_stones(pos.board, EMPTY, captured_stones) - - opp_color = color * -1 - - if len(captured_stones) == 1 and potential_ko == opp_color: - new_ko = list(captured_stones)[0] - else: - new_ko = None - - if pos.to_play == BLACK: - new_caps = (pos.caps[0] + len(captured_stones), pos.caps[1]) - else: - new_caps = (pos.caps[0], pos.caps[1] + len(captured_stones)) - - pos.n += 1 - pos.caps = new_caps - pos.ko = new_ko - pos.recent += (PlayerMove(color, c),) - pos.to_play *= -1 - return pos - - def score(self): - 'Return score from B perspective. If W is winning, score is negative.' - working_board = np.copy(self.board) - while EMPTY in working_board: - unassigned_spaces = np.where(working_board == EMPTY) - c = unassigned_spaces[0][0], unassigned_spaces[1][0] - territory, borders = find_reached(working_board, c) - border_colors = set(working_board[b] for b in borders) - X_border = BLACK in border_colors - O_border = WHITE in border_colors - if X_border and not O_border: - territory_color = BLACK - elif O_border and not X_border: - territory_color = WHITE + def get_score(self, is_unknown_estimation = False): + ''' + is_unknown_estimation: whether use nearby stone to predict the unknown + return score from BLACK perspective. + ''' + _board = copy.copy(self.game.board) + while utils.EMPTY in self.game.board: + vertex = self._find_empty() + boarder = self._find_boarder(vertex) + boarder_color = set(map(lambda v: self.game.board[self.game._flatten(v)], boarder)) + if boarder_color == {utils.BLACK}: + self.game.board[self.game._flatten(vertex)] = utils.BLACK + elif boarder_color == {utils.WHITE}: + self.game.board[self.game._flatten(vertex)] = utils.WHITE + elif is_unknown_estimation: + self.game.board[self.game._flatten(vertex)] = self._predict_from_nearby(vertex) else: - territory_color = UNKNOWN # dame, or seki - place_stones(working_board, territory_color, territory) + self.game.board[self.game._flatten(vertex)] =utils.UNKNOWN + score = 0 + for i in self.game.board: + if i == utils.BLACK: + score += 1 + elif i == utils.WHITE: + score -= 1 + score -= self.game.komi - return np.count_nonzero(working_board == BLACK) - np.count_nonzero(working_board == WHITE) - self.komi + self.game.board = _board + return score - def result(self): - score = self.score() - if score > 0: - return 'B+' + '%.1f' % score - elif score < 0: - return 'W+' + '%.1f' % abs(score) - else: - return 'DRAW' + def _predict_from_nearby(self, vertex, neighbor_step = 3): + ''' + step: the nearby 3 steps is considered + :vertex: position to be estimated + :neighbor_step: how many steps nearby + :return: the nearby positions of the input position + currently the nearby 3*3 grid is returned, altogether 4*8 points involved + ''' + for step in range(1, neighbor_step + 1): # check the stones within the steps in range + neighbor_vertex_set = [] + self._add_nearby_stones(neighbor_vertex_set, vertex[0] - step, vertex[1], 1, 1, neighbor_step) + self._add_nearby_stones(neighbor_vertex_set, vertex[0], vertex[1] + step, 1, -1, neighbor_step) + self._add_nearby_stones(neighbor_vertex_set, vertex[0] + step, vertex[1], -1, -1, neighbor_step) + self._add_nearby_stones(neighbor_vertex_set, vertex[0], vertex[1] - step, -1, 1, neighbor_step) + color_estimate = 0 + for neighbor_vertex in neighbor_vertex_set: + color_estimate += self.game.board[self.game._flatten(neighbor_vertex)] + if color_estimate > 0: + return utils.BLACK + elif color_estimate < 0: + return utils.WHITE - -set_board_size(19) + def _add_nearby_stones(self, neighbor_vertex_set, start_vertex_x, start_vertex_y, x_diff, y_diff, num_step): + ''' + add the nearby stones around the input vertex + :param neighbor_vertex_set: input list + :param start_vertex_x: x axis of the input vertex + :param start_vertex_y: y axis of the input vertex + :param x_diff: add x axis + :param y_diff: add y axis + :param num_step: number of steps to be added + :return: + ''' + for step in xrange(num_step): + new_neighbor_vertex = (start_vertex_x, start_vertex_y) + if self._in_board(new_neighbor_vertex): + neighbor_vertex_set.append((start_vertex_x, start_vertex_y)) + start_vertex_x += x_diff + start_vertex_y += y_diff diff --git a/AlphaGo/gtp_wrapper.py b/AlphaGo/gtp_wrapper.py deleted file mode 100644 index 1da8f03..0000000 --- a/AlphaGo/gtp_wrapper.py +++ /dev/null @@ -1,70 +0,0 @@ -import gtp -import go -import utils - - -def translate_gtp_colors(gtp_color): - if gtp_color == gtp.BLACK: - return go.BLACK - elif gtp_color == gtp.WHITE: - return go.WHITE - else: - return go.EMPTY - - -class GtpInterface(object): - def __init__(self): - self.size = 9 - self.position = None - self.komi = 6.5 - self.clear() - - def set_size(self, n): - self.size = n - go.set_board_size(n) - self.clear() - - def set_komi(self, komi): - self.komi = komi - self.position.komi = komi - - def clear(self): - self.position = go.Position(komi=self.komi) - - def accomodate_out_of_turn(self, color): - if not translate_gtp_colors(color) == self.position.to_play: - self.position.flip_playerturn(mutate=True) - - def make_move(self, color, vertex): - coords = utils.parse_pygtp_coords(vertex) - self.accomodate_out_of_turn(color) - try: - self.position = self.position.play_move(coords, color=translate_gtp_colors(color)) - except go.IllegalMove: - return False - return True - - def get_move(self, color): - self.accomodate_out_of_turn(color) - if self.should_resign(self.position): - return gtp.RESIGN - - if self.should_pass(self.position): - return gtp.PASS - - move = self.suggest_move(self.position) - return utils.unparse_pygtp_coords(move) - - def should_resign(self, position): - if position.caps[0] + 50 < position.caps[1]: - return gtp.RESIGN - - def should_pass(self, position): - # Pass if the opponent passes - return position.n > 100 and position.recent and position.recent[-1].move == None - - def get_score(self): - return self.position.result() - - def suggest_move(self, position): - raise NotImplementedError diff --git a/AlphaGo/play.py b/AlphaGo/play.py index 18ce869..180186a 100644 --- a/AlphaGo/play.py +++ b/AlphaGo/play.py @@ -13,12 +13,11 @@ print "Start Name Sever : " + str(start_new_server.pid)# + str(start_new_server. time.sleep(1) agent_v0 = subprocess.Popen(['python', '-u', 'player.py', '--role=black'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) -time.sleep(3) print "Start Player 0 at : " + str(agent_v0.pid) agent_v1 = subprocess.Popen(['python', '-u', 'player.py', '--role=white', '--checkpoint_path=./checkpoints_origin/'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) -time.sleep(3) print "Start Player 1 at : " + str(agent_v1.pid) +time.sleep(5) player = [None] * 2 player[0] = Pyro4.Proxy("PYRONAME:black") diff --git a/AlphaGo/player.py b/AlphaGo/player.py index 36965a9..8245c38 100644 --- a/AlphaGo/player.py +++ b/AlphaGo/player.py @@ -8,6 +8,10 @@ from engine import GTPEngine @Pyro4.expose class Player(object): + """ + This is the class which defines the object called by Pyro4 (Python remote object). + It passes the command to our engine, and return the result. + """ def __init__(self, **kwargs): self.role = kwargs['role'] self.engine = kwargs['engine'] diff --git a/AlphaGo/strategy.py b/AlphaGo/strategy.py index 327111d..5a55002 100644 --- a/AlphaGo/strategy.py +++ b/AlphaGo/strategy.py @@ -13,7 +13,6 @@ from tianshou.core.mcts.mcts import MCTS DELTA = [[1, 0], [-1, 0], [0, -1], [0, 1]] CORNER_OFFSET = [[-1, -1], [-1, 1], [1, 1], [1, -1]] - class GoEnv: def __init__(self, size=9, komi=6.5): self.size = size @@ -221,37 +220,3 @@ class GoEnv: np.array(1 - state[:, :, :, -1]).reshape(1, self.size, self.size, 1)], axis=3) return new_state, 0 - - -class strategy(object): - def __init__(self, checkpoint_path): - self.simulator = GoEnv() - self.net = network_small.Network() - self.sess = self.net.forward(checkpoint_path) - self.evaluator = lambda state: self.sess.run([tf.nn.softmax(self.net.p), self.net.v], - feed_dict={self.net.x: state, self.net.is_training: False}) - - def data_process(self, history, color): - state = np.zeros([1, self.simulator.size, self.simulator.size, 17]) - for i in range(8): - state[0, :, :, i] = np.array(np.array(history[i]) == np.ones(self.simulator.size ** 2)).reshape(self.simulator.size, self.simulator.size) - state[0, :, :, i + 8] = np.array(np.array(history[i]) == -np.ones(self.simulator.size ** 2)).reshape(self.simulator.size, self.simulator.size) - if color == utils.BLACK: - state[0, :, :, 16] = np.ones([self.simulator.size, self.simulator.size]) - if color == utils.WHITE: - state[0, :, :, 16] = np.zeros([self.simulator.size, self.simulator.size]) - return state - - def gen_move(self, history, color): - self.simulator.history = copy.copy(history) - self.simulator.board = copy.copy(history[-1]) - state = self.data_process(self.simulator.history, color) - mcts = MCTS(self.simulator, self.evaluator, state, self.simulator.size ** 2 + 1, inverse=True, max_step=10) - temp = 1 - prob = mcts.root.N ** temp / np.sum(mcts.root.N ** temp) - choice = np.random.choice(self.simulator.size ** 2 + 1, 1, p=prob).tolist()[0] - if choice == self.simulator.size ** 2: - move = utils.PASS - else: - move = (choice % self.simulator.size + 1, choice / self.simulator.size + 1) - return move, prob diff --git a/tianshou/core/mcts/mcts.py b/tianshou/core/mcts/mcts.py index e29d919..47b0768 100644 --- a/tianshou/core/mcts/mcts.py +++ b/tianshou/core/mcts/mcts.py @@ -168,6 +168,7 @@ class MCTS(object): if max_step is None and max_time is None: raise ValueError("Need a stop criteria!") + # TODO: running mcts should be implemented in another function, e.g. def search(self, max_step, max_time) self.select_time = [] self.evaluate_time = [] self.bp_time = []