python utils written by tongzheng
This commit is contained in:
parent
5e0184fa28
commit
889e5c2fb4
410
utils/go.py
Normal file
410
utils/go.py
Normal file
@ -0,0 +1,410 @@
|
||||
'''
|
||||
A board is a NxN numpy array.
|
||||
A Coordinate is a tuple index into the board.
|
||||
A Move is a (Coordinate c | None).
|
||||
A PlayerMove is a (Color, Move) tuple
|
||||
(0, 0) is considered to be the upper left corner of the board, and (18, 0) is the lower left.
|
||||
'''
|
||||
from collections import namedtuple
|
||||
import copy
|
||||
import itertools
|
||||
|
||||
import numpy as np
|
||||
|
||||
# Represent a board as a numpy array, with 0 empty, 1 is black, -1 is white.
|
||||
# This means that swapping colors is as simple as multiplying array by -1.
|
||||
WHITE, EMPTY, BLACK, FILL, KO, UNKNOWN = range(-1, 5)
|
||||
|
||||
class PlayerMove(namedtuple('PlayerMove', ['color', 'move'])): pass
|
||||
|
||||
# Represents "group not found" in the LibertyTracker object
|
||||
MISSING_GROUP_ID = -1
|
||||
|
||||
class IllegalMove(Exception): pass
|
||||
|
||||
# these are initialized by set_board_size
|
||||
N = None
|
||||
ALL_COORDS = []
|
||||
EMPTY_BOARD = None
|
||||
NEIGHBORS = {}
|
||||
DIAGONALS = {}
|
||||
|
||||
def set_board_size(n):
|
||||
'''
|
||||
Hopefully nobody tries to run both 9x9 and 19x19 game instances at once.
|
||||
Also, never do "from go import N, W, ALL_COORDS, EMPTY_BOARD".
|
||||
'''
|
||||
global N, ALL_COORDS, EMPTY_BOARD, NEIGHBORS, DIAGONALS
|
||||
if N == n: return
|
||||
N = n
|
||||
ALL_COORDS = [(i, j) for i in range(n) for j in range(n)]
|
||||
EMPTY_BOARD = np.zeros([n, n], dtype=np.int8)
|
||||
def check_bounds(c):
|
||||
return c[0] % n == c[0] and c[1] % n == c[1]
|
||||
|
||||
NEIGHBORS = {(x, y): list(filter(check_bounds, [(x+1, y), (x-1, y), (x, y+1), (x, y-1)])) for x, y in ALL_COORDS}
|
||||
DIAGONALS = {(x, y): list(filter(check_bounds, [(x+1, y+1), (x+1, y-1), (x-1, y+1), (x-1, y-1)])) for x, y in ALL_COORDS}
|
||||
|
||||
def place_stones(board, color, stones):
|
||||
for s in stones:
|
||||
board[s] = color
|
||||
|
||||
def find_reached(board, c):
|
||||
color = board[c]
|
||||
chain = set([c])
|
||||
reached = set()
|
||||
frontier = [c]
|
||||
while frontier:
|
||||
current = frontier.pop()
|
||||
chain.add(current)
|
||||
for n in NEIGHBORS[current]:
|
||||
if board[n] == color and not n in chain:
|
||||
frontier.append(n)
|
||||
elif board[n] != color:
|
||||
reached.add(n)
|
||||
return chain, reached
|
||||
|
||||
def is_koish(board, c):
|
||||
'Check if c is surrounded on all sides by 1 color, and return that color'
|
||||
if board[c] != EMPTY: return None
|
||||
neighbors = {board[n] for n in NEIGHBORS[c]}
|
||||
if len(neighbors) == 1 and not EMPTY in neighbors:
|
||||
return list(neighbors)[0]
|
||||
else:
|
||||
return None
|
||||
|
||||
def is_eyeish(board, c):
|
||||
'Check if c is an eye, for the purpose of restricting MC rollouts.'
|
||||
color = is_koish(board, c)
|
||||
if color is None:
|
||||
return None
|
||||
diagonal_faults = 0
|
||||
diagonals = DIAGONALS[c]
|
||||
if len(diagonals) < 4:
|
||||
diagonal_faults += 1
|
||||
for d in diagonals:
|
||||
if not board[d] in (color, EMPTY):
|
||||
diagonal_faults += 1
|
||||
if diagonal_faults > 1:
|
||||
return None
|
||||
else:
|
||||
return color
|
||||
|
||||
class Group(namedtuple('Group', ['id', 'stones', 'liberties', 'color'])):
|
||||
'''
|
||||
stones: a set of Coordinates belonging to this group
|
||||
liberties: a set of Coordinates that are empty and adjacent to this group.
|
||||
color: color of this group
|
||||
'''
|
||||
def __eq__(self, other):
|
||||
return self.stones == other.stones and self.liberties == other.liberties and self.color == other.color
|
||||
|
||||
|
||||
class LibertyTracker():
|
||||
@staticmethod
|
||||
def from_board(board):
|
||||
board = np.copy(board)
|
||||
curr_group_id = 0
|
||||
lib_tracker = LibertyTracker()
|
||||
for color in (WHITE, BLACK):
|
||||
while color in board:
|
||||
curr_group_id += 1
|
||||
found_color = np.where(board == color)
|
||||
coord = found_color[0][0], found_color[1][0]
|
||||
chain, reached = find_reached(board, coord)
|
||||
liberties = set(r for r in reached if board[r] == EMPTY)
|
||||
new_group = Group(curr_group_id, chain, liberties, color)
|
||||
lib_tracker.groups[curr_group_id] = new_group
|
||||
for s in chain:
|
||||
lib_tracker.group_index[s] = curr_group_id
|
||||
place_stones(board, FILL, chain)
|
||||
|
||||
lib_tracker.max_group_id = curr_group_id
|
||||
|
||||
liberty_counts = np.zeros([N, N], dtype=np.uint8)
|
||||
for group in lib_tracker.groups.values():
|
||||
num_libs = len(group.liberties)
|
||||
for s in group.stones:
|
||||
liberty_counts[s] = num_libs
|
||||
lib_tracker.liberty_cache = liberty_counts
|
||||
|
||||
return lib_tracker
|
||||
|
||||
def __init__(self, group_index=None, groups=None, liberty_cache=None, max_group_id=1):
|
||||
# group_index: a NxN numpy array of group_ids. -1 means no group
|
||||
# groups: a dict of group_id to groups
|
||||
# liberty_cache: a NxN numpy array of liberty counts
|
||||
self.group_index = group_index if group_index is not None else -np.ones([N, N], dtype=np.int32)
|
||||
self.groups = groups or {}
|
||||
self.liberty_cache = liberty_cache if liberty_cache is not None else np.zeros([N, N], dtype=np.uint8)
|
||||
self.max_group_id = max_group_id
|
||||
|
||||
def __deepcopy__(self, memodict={}):
|
||||
new_group_index = np.copy(self.group_index)
|
||||
new_lib_cache = np.copy(self.liberty_cache)
|
||||
new_groups = {
|
||||
group.id: Group(group.id, set(group.stones), set(group.liberties), group.color)
|
||||
for group in self.groups.values()
|
||||
}
|
||||
return LibertyTracker(new_group_index, new_groups, liberty_cache=new_lib_cache, max_group_id=self.max_group_id)
|
||||
|
||||
def add_stone(self, color, c):
|
||||
assert self.group_index[c] == MISSING_GROUP_ID
|
||||
captured_stones = set()
|
||||
opponent_neighboring_group_ids = set()
|
||||
friendly_neighboring_group_ids = set()
|
||||
empty_neighbors = set()
|
||||
|
||||
for n in NEIGHBORS[c]:
|
||||
neighbor_group_id = self.group_index[n]
|
||||
if neighbor_group_id != MISSING_GROUP_ID:
|
||||
neighbor_group = self.groups[neighbor_group_id]
|
||||
if neighbor_group.color == color:
|
||||
friendly_neighboring_group_ids.add(neighbor_group_id)
|
||||
else:
|
||||
opponent_neighboring_group_ids.add(neighbor_group_id)
|
||||
else:
|
||||
empty_neighbors.add(n)
|
||||
|
||||
new_group = self._create_group(color, c, empty_neighbors)
|
||||
|
||||
for group_id in friendly_neighboring_group_ids:
|
||||
new_group = self._merge_groups(group_id, new_group.id)
|
||||
|
||||
for group_id in opponent_neighboring_group_ids:
|
||||
neighbor_group = self.groups[group_id]
|
||||
if len(neighbor_group.liberties) == 1:
|
||||
captured = self._capture_group(group_id)
|
||||
captured_stones.update(captured)
|
||||
else:
|
||||
self._update_liberties(group_id, remove={c})
|
||||
|
||||
self._handle_captures(captured_stones)
|
||||
|
||||
# suicide is illegal
|
||||
if len(new_group.liberties) == 0:
|
||||
raise IllegalMove("Move at {} would commit suicide!\n".format(c))
|
||||
|
||||
return captured_stones
|
||||
|
||||
def _create_group(self, color, c, liberties):
|
||||
self.max_group_id += 1
|
||||
new_group = Group(self.max_group_id, set([c]), liberties, color)
|
||||
self.groups[new_group.id] = new_group
|
||||
self.group_index[c] = new_group.id
|
||||
self.liberty_cache[c] = len(liberties)
|
||||
return new_group
|
||||
|
||||
def _merge_groups(self, group1_id, group2_id):
|
||||
group1 = self.groups[group1_id]
|
||||
group2 = self.groups[group2_id]
|
||||
group1.stones.update(group2.stones)
|
||||
del self.groups[group2_id]
|
||||
for s in group2.stones:
|
||||
self.group_index[s] = group1_id
|
||||
|
||||
self._update_liberties(group1_id, add=group2.liberties, remove=(group2.stones | group1.stones))
|
||||
|
||||
return group1
|
||||
|
||||
def _capture_group(self, group_id):
|
||||
dead_group = self.groups[group_id]
|
||||
del self.groups[group_id]
|
||||
for s in dead_group.stones:
|
||||
self.group_index[s] = MISSING_GROUP_ID
|
||||
self.liberty_cache[s] = 0
|
||||
return dead_group.stones
|
||||
|
||||
def _update_liberties(self, group_id, add=None, remove=None):
|
||||
group = self.groups[group_id]
|
||||
if add:
|
||||
group.liberties.update(add)
|
||||
if remove:
|
||||
group.liberties.difference_update(remove)
|
||||
|
||||
new_lib_count = len(group.liberties)
|
||||
for s in group.stones:
|
||||
self.liberty_cache[s] = new_lib_count
|
||||
|
||||
def _handle_captures(self, captured_stones):
|
||||
for s in captured_stones:
|
||||
for n in NEIGHBORS[s]:
|
||||
group_id = self.group_index[n]
|
||||
if group_id != MISSING_GROUP_ID:
|
||||
self._update_liberties(group_id, add={s})
|
||||
|
||||
class Position():
|
||||
def __init__(self, board=None, n=0, komi=7.5, caps=(0, 0), lib_tracker=None, ko=None, recent=tuple(), to_play=BLACK):
|
||||
'''
|
||||
board: a numpy array
|
||||
n: an int representing moves played so far
|
||||
komi: a float, representing points given to the second player.
|
||||
caps: a (int, int) tuple of captures for B, W.
|
||||
lib_tracker: a LibertyTracker object
|
||||
ko: a Move
|
||||
recent: a tuple of PlayerMoves, such that recent[-1] is the last move.
|
||||
to_play: BLACK or WHITE
|
||||
'''
|
||||
self.board = board if board is not None else np.copy(EMPTY_BOARD)
|
||||
self.n = n
|
||||
self.komi = komi
|
||||
self.caps = caps
|
||||
self.lib_tracker = lib_tracker or LibertyTracker.from_board(self.board)
|
||||
self.ko = ko
|
||||
self.recent = recent
|
||||
self.to_play = to_play
|
||||
|
||||
def __deepcopy__(self, memodict={}):
|
||||
new_board = np.copy(self.board)
|
||||
new_lib_tracker = copy.deepcopy(self.lib_tracker)
|
||||
return Position(new_board, self.n, self.komi, self.caps, new_lib_tracker, self.ko, self.recent, self.to_play)
|
||||
|
||||
def __str__(self):
|
||||
pretty_print_map = {
|
||||
WHITE: '\x1b[0;31;47mO',
|
||||
EMPTY: '\x1b[0;31;43m.',
|
||||
BLACK: '\x1b[0;31;40mX',
|
||||
FILL: '#',
|
||||
KO: '*',
|
||||
}
|
||||
board = np.copy(self.board)
|
||||
captures = self.caps
|
||||
if self.ko is not None:
|
||||
place_stones(board, KO, [self.ko])
|
||||
raw_board_contents = []
|
||||
for i in range(N):
|
||||
row = []
|
||||
for j in range(N):
|
||||
appended = '<' if (self.recent and (i, j) == self.recent[-1].move) else ' '
|
||||
row.append(pretty_print_map[board[i,j]] + appended)
|
||||
row.append('\x1b[0m')
|
||||
raw_board_contents.append(''.join(row))
|
||||
|
||||
row_labels = ['%2d ' % i for i in range(N, 0, -1)]
|
||||
annotated_board_contents = [''.join(r) for r in zip(row_labels, raw_board_contents, row_labels)]
|
||||
header_footer_rows = [' ' + ' '.join('ABCDEFGHJKLMNOPQRST'[:N]) + ' ']
|
||||
annotated_board = '\n'.join(itertools.chain(header_footer_rows, annotated_board_contents, header_footer_rows))
|
||||
details = "\nMove: {}. Captures X: {} O: {}\n".format(self.n, *captures)
|
||||
return annotated_board + details
|
||||
|
||||
def is_move_suicidal(self, move):
|
||||
potential_libs = set()
|
||||
for n in NEIGHBORS[move]:
|
||||
neighbor_group_id = self.lib_tracker.group_index[n]
|
||||
if neighbor_group_id == MISSING_GROUP_ID:
|
||||
# at least one liberty after playing here, so not a suicide
|
||||
return False
|
||||
neighbor_group = self.lib_tracker.groups[neighbor_group_id]
|
||||
if neighbor_group.color == self.to_play:
|
||||
potential_libs |= neighbor_group.liberties
|
||||
elif len(neighbor_group.liberties) == 1:
|
||||
# would capture an opponent group if they only had one lib.
|
||||
return False
|
||||
# it's possible to suicide by connecting several friendly groups
|
||||
# each of which had one liberty.
|
||||
potential_libs -= set([move])
|
||||
return not potential_libs
|
||||
|
||||
def is_move_legal(self, move):
|
||||
'Checks that a move is on an empty space, not on ko, and not suicide'
|
||||
if move is None:
|
||||
return True
|
||||
if self.board[move] != EMPTY:
|
||||
return False
|
||||
if move == self.ko:
|
||||
return False
|
||||
if self.is_move_suicidal(move):
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def pass_move(self, mutate=False):
|
||||
pos = self if mutate else copy.deepcopy(self)
|
||||
pos.n += 1
|
||||
pos.recent += (PlayerMove(pos.to_play, None),)
|
||||
pos.to_play *= -1
|
||||
pos.ko = None
|
||||
return pos
|
||||
|
||||
def flip_playerturn(self, mutate=False):
|
||||
pos = self if mutate else copy.deepcopy(self)
|
||||
pos.ko = None
|
||||
pos.to_play *= -1
|
||||
return pos
|
||||
|
||||
def get_liberties(self):
|
||||
return self.lib_tracker.liberty_cache
|
||||
|
||||
def play_move(self, c, color=None, mutate=False):
|
||||
# Obeys CGOS Rules of Play. In short:
|
||||
# No suicides
|
||||
# Chinese/area scoring
|
||||
# Positional superko (this is very crudely approximate at the moment.)
|
||||
if color is None:
|
||||
color = self.to_play
|
||||
|
||||
pos = self if mutate else copy.deepcopy(self)
|
||||
|
||||
if c is None:
|
||||
pos = pos.pass_move(mutate=mutate)
|
||||
return pos
|
||||
|
||||
if not self.is_move_legal(c):
|
||||
raise IllegalMove("Move at {} is illegal: \n{}".format(c, self))
|
||||
|
||||
# check must be done before potentially mutating the board
|
||||
potential_ko = is_koish(self.board, c)
|
||||
|
||||
place_stones(pos.board, color, [c])
|
||||
captured_stones = pos.lib_tracker.add_stone(color, c)
|
||||
place_stones(pos.board, EMPTY, captured_stones)
|
||||
|
||||
opp_color = color * -1
|
||||
|
||||
if len(captured_stones) == 1 and potential_ko == opp_color:
|
||||
new_ko = list(captured_stones)[0]
|
||||
else:
|
||||
new_ko = None
|
||||
|
||||
if pos.to_play == BLACK:
|
||||
new_caps = (pos.caps[0] + len(captured_stones), pos.caps[1])
|
||||
else:
|
||||
new_caps = (pos.caps[0], pos.caps[1] + len(captured_stones))
|
||||
|
||||
pos.n += 1
|
||||
pos.caps = new_caps
|
||||
pos.ko = new_ko
|
||||
pos.recent += (PlayerMove(color, c),)
|
||||
pos.to_play *= -1
|
||||
return pos
|
||||
|
||||
def score(self):
|
||||
'Return score from B perspective. If W is winning, score is negative.'
|
||||
working_board = np.copy(self.board)
|
||||
while EMPTY in working_board:
|
||||
unassigned_spaces = np.where(working_board == EMPTY)
|
||||
c = unassigned_spaces[0][0], unassigned_spaces[1][0]
|
||||
territory, borders = find_reached(working_board, c)
|
||||
border_colors = set(working_board[b] for b in borders)
|
||||
X_border = BLACK in border_colors
|
||||
O_border = WHITE in border_colors
|
||||
if X_border and not O_border:
|
||||
territory_color = BLACK
|
||||
elif O_border and not X_border:
|
||||
territory_color = WHITE
|
||||
else:
|
||||
territory_color = UNKNOWN # dame, or seki
|
||||
place_stones(working_board, territory_color, territory)
|
||||
|
||||
return np.count_nonzero(working_board == BLACK) - np.count_nonzero(working_board == WHITE) - self.komi
|
||||
|
||||
def result(self):
|
||||
score = self.score()
|
||||
if score > 0:
|
||||
return 'B+' + '%.1f' % score
|
||||
elif score < 0:
|
||||
return 'W+' + '%.1f' % abs(score)
|
||||
else:
|
||||
return 'DRAW'
|
||||
|
||||
set_board_size(19)
|
||||
262
utils/gtp.py
Normal file
262
utils/gtp.py
Normal file
@ -0,0 +1,262 @@
|
||||
import re
|
||||
|
||||
|
||||
def pre_engine(s):
|
||||
s = re.sub("[^\t\n -~]", "", s)
|
||||
s = s.split("#")[0]
|
||||
s = s.replace("\t", " ")
|
||||
return s
|
||||
|
||||
|
||||
def pre_controller(s):
|
||||
s = re.sub("[^\t\n -~]", "", s)
|
||||
s = s.replace("\t", " ")
|
||||
return s
|
||||
|
||||
|
||||
def gtp_boolean(b):
|
||||
return "true" if b else "false"
|
||||
|
||||
|
||||
def gtp_list(l):
|
||||
return "\n".join(l)
|
||||
|
||||
|
||||
def gtp_color(color):
|
||||
# an arbitrary choice amongst a number of possibilities
|
||||
return {BLACK: "B", WHITE: "W"}[color]
|
||||
|
||||
|
||||
def gtp_vertex(vertex):
|
||||
if vertex == PASS:
|
||||
return "pass"
|
||||
elif vertex == RESIGN:
|
||||
return "resign"
|
||||
else:
|
||||
x, y = vertex
|
||||
return "{}{}".format("ABCDEFGHJKLMNOPQRSTYVWYZ"[x - 1], y)
|
||||
|
||||
|
||||
def gtp_move(color, vertex):
|
||||
return " ".join([gtp_color(color), gtp_vertex(vertex)])
|
||||
|
||||
|
||||
def parse_message(message):
|
||||
message = pre_engine(message).strip()
|
||||
first, rest = (message.split(" ", 1) + [None])[:2]
|
||||
if first.isdigit():
|
||||
message_id = int(first)
|
||||
if rest is not None:
|
||||
command, arguments = (rest.split(" ", 1) + [None])[:2]
|
||||
else:
|
||||
command, arguments = None, None
|
||||
else:
|
||||
message_id = None
|
||||
command, arguments = first, rest
|
||||
|
||||
return message_id, command, arguments
|
||||
|
||||
|
||||
WHITE = -1
|
||||
BLACK = +1
|
||||
EMPTY = 0
|
||||
|
||||
PASS = (0, 0)
|
||||
RESIGN = "resign"
|
||||
|
||||
|
||||
def parse_color(color):
|
||||
if color.lower() in ["b", "black"]:
|
||||
return BLACK
|
||||
elif color.lower() in ["w", "white"]:
|
||||
return WHITE
|
||||
else:
|
||||
return False
|
||||
|
||||
|
||||
def parse_vertex(vertex_string):
|
||||
if vertex_string is None:
|
||||
return False
|
||||
elif vertex_string.lower() == "pass":
|
||||
return PASS
|
||||
elif len(vertex_string) > 1:
|
||||
x = "abcdefghjklmnopqrstuvwxyz".find(vertex_string[0].lower()) + 1
|
||||
if x == 0:
|
||||
return False
|
||||
if vertex_string[1:].isdigit():
|
||||
y = int(vertex_string[1:])
|
||||
else:
|
||||
return False
|
||||
else:
|
||||
return False
|
||||
return (x, y)
|
||||
|
||||
|
||||
def parse_move(move_string):
|
||||
color_string, vertex_string = (move_string.split(" ") + [None])[:2]
|
||||
color = parse_color(color_string)
|
||||
if color is False:
|
||||
return False
|
||||
vertex = parse_vertex(vertex_string)
|
||||
if vertex is False:
|
||||
return False
|
||||
|
||||
return color, vertex
|
||||
|
||||
|
||||
MIN_BOARD_SIZE = 7
|
||||
MAX_BOARD_SIZE = 19
|
||||
|
||||
|
||||
def format_success(message_id, response=None):
|
||||
if response is None:
|
||||
response = ""
|
||||
else:
|
||||
response = " {}".format(response)
|
||||
if message_id:
|
||||
return "={}{}\n\n".format(message_id, response)
|
||||
else:
|
||||
return "={}\n\n".format(response)
|
||||
|
||||
|
||||
def format_error(message_id, response):
|
||||
if response:
|
||||
response = " {}".format(response)
|
||||
if message_id:
|
||||
return "?{}{}\n\n".format(message_id, response)
|
||||
else:
|
||||
return "?{}\n\n".format(response)
|
||||
|
||||
|
||||
class Engine(object):
|
||||
def __init__(self, game_obj, name="gtp (python library)", version="0.2"):
|
||||
|
||||
self.size = 19
|
||||
self.komi = 6.5
|
||||
|
||||
self._game = game_obj
|
||||
self._game.clear()
|
||||
|
||||
self._name = name
|
||||
self._version = version
|
||||
|
||||
self.disconnect = False
|
||||
|
||||
self.known_commands = [
|
||||
field[4:] for field in dir(self) if field.startswith("cmd_")]
|
||||
|
||||
def send(self, message):
|
||||
message_id, command, arguments = parse_message(message)
|
||||
if command in self.known_commands:
|
||||
try:
|
||||
return format_success(
|
||||
message_id, getattr(self, "cmd_" + command)(arguments))
|
||||
except ValueError as exception:
|
||||
return format_error(message_id, exception.args[0])
|
||||
else:
|
||||
return format_error(message_id, "unknown command")
|
||||
|
||||
def vertex_in_range(self, vertex):
|
||||
if vertex == PASS:
|
||||
return True
|
||||
if 1 <= vertex[0] <= self.size and 1 <= vertex[1] <= self.size:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
# commands
|
||||
|
||||
def cmd_protocol_version(self, arguments):
|
||||
return 2
|
||||
|
||||
def cmd_name(self, arguments):
|
||||
return self._name
|
||||
|
||||
def cmd_version(self, arguments):
|
||||
return self._version
|
||||
|
||||
def cmd_known_command(self, arguments):
|
||||
return gtp_boolean(arguments in self.known_commands)
|
||||
|
||||
def cmd_list_commands(self, arguments):
|
||||
return gtp_list(self.known_commands)
|
||||
|
||||
def cmd_quit(self, arguments):
|
||||
self.disconnect = True
|
||||
|
||||
def cmd_boardsize(self, arguments):
|
||||
if arguments.isdigit():
|
||||
size = int(arguments)
|
||||
if MIN_BOARD_SIZE <= size <= MAX_BOARD_SIZE:
|
||||
self.size = size
|
||||
self._game.set_size(size)
|
||||
else:
|
||||
raise ValueError("unacceptable size")
|
||||
else:
|
||||
raise ValueError("non digit size")
|
||||
|
||||
def cmd_clear_board(self, arguments):
|
||||
self._game.clear()
|
||||
|
||||
def cmd_komi(self, arguments):
|
||||
try:
|
||||
komi = float(arguments)
|
||||
self.komi = komi
|
||||
self._game.set_komi(komi)
|
||||
except ValueError:
|
||||
raise ValueError("syntax error")
|
||||
|
||||
def cmd_play(self, arguments):
|
||||
move = parse_move(arguments)
|
||||
if move:
|
||||
color, vertex = move
|
||||
if self.vertex_in_range(vertex):
|
||||
if self._game.make_move(color, vertex):
|
||||
return
|
||||
raise ValueError("illegal move")
|
||||
|
||||
def cmd_genmove(self, arguments):
|
||||
c = parse_color(arguments)
|
||||
if c:
|
||||
move = self._game.get_move(c)
|
||||
self._game.make_move(c, move)
|
||||
return gtp_vertex(move)
|
||||
else:
|
||||
raise ValueError("unknown player: {}".format(arguments))
|
||||
|
||||
|
||||
class MinimalGame(object):
|
||||
def __init__(self, size=19, komi=6.5):
|
||||
self.size = size
|
||||
self.komi = 6.5
|
||||
self.board = [EMPTY] * (self.size * self.size)
|
||||
|
||||
def _flatten(self, vertex):
|
||||
(x, y) = vertex
|
||||
return (x - 1) * self.size + (y - 1)
|
||||
|
||||
def clear(self):
|
||||
self.board = [EMPTY] * (self.size * self.size)
|
||||
|
||||
def make_move(self, color, vertex):
|
||||
# no legality check other than the space being empty..
|
||||
# no side-effects beyond placing the stone..
|
||||
if vertex == PASS:
|
||||
return True # noop
|
||||
idx = self._flatten(vertex)
|
||||
if self.board[idx] == EMPTY:
|
||||
self.board[idx] = color
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def set_size(self, n):
|
||||
self.size = n
|
||||
self.clear()
|
||||
|
||||
def set_komi(self, k):
|
||||
self.komi = k
|
||||
|
||||
def get_move(self, color):
|
||||
# pass every time. At least it's legal
|
||||
return (0, 0)
|
||||
86
utils/gtp_wrapper.py
Normal file
86
utils/gtp_wrapper.py
Normal file
@ -0,0 +1,86 @@
|
||||
import gtp
|
||||
import go
|
||||
import utils
|
||||
|
||||
|
||||
|
||||
def translate_gtp_colors(gtp_color):
|
||||
if gtp_color == gtp.BLACK:
|
||||
return go.BLACK
|
||||
elif gtp_color == gtp.WHITE:
|
||||
return go.WHITE
|
||||
else:
|
||||
return go.EMPTY
|
||||
|
||||
class GtpInterface(object):
|
||||
def __init__(self):
|
||||
self.size = 9
|
||||
self.position = None
|
||||
self.komi = 6.5
|
||||
self.clear()
|
||||
|
||||
def set_size(self, n):
|
||||
self.size = n
|
||||
go.set_board_size(n)
|
||||
self.clear()
|
||||
|
||||
def set_komi(self, komi):
|
||||
self.komi = komi
|
||||
self.position.komi = komi
|
||||
|
||||
def clear(self):
|
||||
self.position = go.Position(komi=self.komi)
|
||||
|
||||
def accomodate_out_of_turn(self, color):
|
||||
if not translate_gtp_colors(color) == self.position.to_play:
|
||||
self.position.flip_playerturn(mutate=True)
|
||||
|
||||
def make_move(self, color, vertex):
|
||||
coords = utils.parse_pygtp_coords(vertex)
|
||||
self.accomodate_out_of_turn(color)
|
||||
try:
|
||||
self.position = self.position.play_move(coords, color=translate_gtp_colors(color))
|
||||
except go.IllegalMove:
|
||||
return False
|
||||
return True
|
||||
|
||||
def get_move(self, color):
|
||||
self.accomodate_out_of_turn(color)
|
||||
if self.should_resign(self.position):
|
||||
return gtp.RESIGN
|
||||
|
||||
if self.should_pass(self.position):
|
||||
return gtp.PASS
|
||||
|
||||
move = self.suggest_move(self.position)
|
||||
return utils.unparse_pygtp_coords(move)
|
||||
|
||||
def should_resign(self, position):
|
||||
if position.caps[0] + 50 < position.caps[1]:
|
||||
return gtp.RESIGN
|
||||
|
||||
def should_pass(self, position):
|
||||
# Pass if the opponent passes
|
||||
return position.n > 100 and position.recent and position.recent[-1].move == None
|
||||
|
||||
def get_score(self):
|
||||
return self.position.result()
|
||||
|
||||
def suggest_move(self, position):
|
||||
raise NotImplementedError
|
||||
|
||||
def make_gtp_instance(strategy_name, read_file):
|
||||
n = PolicyNetwork(use_cpu=True)
|
||||
n.initialize_variables(read_file)
|
||||
if strategy_name == 'random':
|
||||
instance = RandomPlayer()
|
||||
elif strategy_name == 'policy':
|
||||
instance = GreedyPolicyPlayer(n)
|
||||
elif strategy_name == 'randompolicy':
|
||||
instance = RandomPolicyPlayer(n)
|
||||
elif strategy_name == 'mcts':
|
||||
instance = MCTSPlayer(n)
|
||||
else:
|
||||
return None
|
||||
gtp_engine = gtp.Engine(instance)
|
||||
return gtp_engine
|
||||
53
utils/text2data.py
Normal file
53
utils/text2data.py
Normal file
@ -0,0 +1,53 @@
|
||||
import numpy as np
|
||||
import os
|
||||
|
||||
def hex2board(hex):
|
||||
scale = 16
|
||||
num_of_bits = 360
|
||||
binary = bin(int(hex[:-2], scale))[2:].zfill(num_of_bits) + hex[-2]
|
||||
board = np.zeros([361])
|
||||
for i in range(361):
|
||||
board[i] = int(binary[i])
|
||||
board = board.reshape(1,19,19,1)
|
||||
return board
|
||||
|
||||
def str2prob(str):
|
||||
p = str.split()
|
||||
prob = np.zeros([362])
|
||||
for i in range(362):
|
||||
prob[i] = float(p[i])
|
||||
prob = prob.reshape(1,362)
|
||||
return prob
|
||||
|
||||
dir = "/home/yama/tongzheng/leela-zero/autogtp/new_spr/"
|
||||
name = os.listdir(dir)
|
||||
text = []
|
||||
for n in name:
|
||||
if n[-4:]==".txt":
|
||||
text.append(n)
|
||||
print(text)
|
||||
for t in text:
|
||||
num = 0
|
||||
boards = np.zeros([0, 19, 19, 17])
|
||||
board = np.zeros([1, 19, 19, 0])
|
||||
win = np.zeros([0, 1])
|
||||
p = np.zeros([0, 362])
|
||||
for line in open(dir + t):
|
||||
if num % 19 < 16:
|
||||
new_board = hex2board(line)
|
||||
board = np.concatenate([board, new_board], axis=3)
|
||||
if num % 19 == 16:
|
||||
if line == '0':
|
||||
new_board = np.ones([1, 19 ,19 ,1])
|
||||
if line == '1':
|
||||
new_board = np.zeros([1, 19, 19, 1])
|
||||
board = np.concatenate([board, new_board], axis=3)
|
||||
boards = np.concatenate([boards, board], axis=0)
|
||||
board = np.zeros([1, 19, 19, 0])
|
||||
if num % 19 == 17:
|
||||
p = np.concatenate([p,str2prob(line)], axis=0)
|
||||
if num % 19 == 18:
|
||||
win = np.concatenate([win, np.array(float(line)).reshape(1,1)], axis=0)
|
||||
num=num+1
|
||||
print "Finished " + t
|
||||
np.savez("data/"+t[:-4], boards=boards, win=win, p=p)
|
||||
105
utils/utils.py
Normal file
105
utils/utils.py
Normal file
@ -0,0 +1,105 @@
|
||||
from collections import defaultdict
|
||||
import functools
|
||||
import itertools
|
||||
import operator
|
||||
import random
|
||||
import re
|
||||
import time
|
||||
|
||||
import gtp
|
||||
import go
|
||||
|
||||
KGS_COLUMNS = 'ABCDEFGHJKLMNOPQRST'
|
||||
SGF_COLUMNS = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
|
||||
|
||||
def parse_sgf_to_flat(sgf):
|
||||
return flatten_coords(parse_sgf_coords(sgf))
|
||||
|
||||
def flatten_coords(c):
|
||||
return go.N * c[0] + c[1]
|
||||
|
||||
def unflatten_coords(f):
|
||||
return divmod(f, go.N)
|
||||
|
||||
def parse_sgf_coords(s):
|
||||
'Interprets coords. aa is top left corner; sa is top right corner'
|
||||
if s is None or s == '':
|
||||
return None
|
||||
return SGF_COLUMNS.index(s[1]), SGF_COLUMNS.index(s[0])
|
||||
|
||||
def unparse_sgf_coords(c):
|
||||
if c is None:
|
||||
return ''
|
||||
return SGF_COLUMNS[c[1]] + SGF_COLUMNS[c[0]]
|
||||
|
||||
def parse_kgs_coords(s):
|
||||
'Interprets coords. A1 is bottom left; A9 is top left.'
|
||||
if s == 'pass':
|
||||
return None
|
||||
s = s.upper()
|
||||
col = KGS_COLUMNS.index(s[0])
|
||||
row_from_bottom = int(s[1:]) - 1
|
||||
return go.N - row_from_bottom - 1, col
|
||||
|
||||
def parse_pygtp_coords(vertex):
|
||||
'Interprets coords. (1, 1) is bottom left; (1, 9) is top left.'
|
||||
if vertex in (gtp.PASS, gtp.RESIGN):
|
||||
return None
|
||||
return go.N - vertex[1], vertex[0] - 1
|
||||
|
||||
def unparse_pygtp_coords(c):
|
||||
if c is None:
|
||||
return gtp.PASS
|
||||
return c[1] + 1, go.N - c[0]
|
||||
|
||||
def parse_game_result(result):
|
||||
if re.match(r'[bB]\+', result):
|
||||
return go.BLACK
|
||||
elif re.match(r'[wW]\+', result):
|
||||
return go.WHITE
|
||||
else:
|
||||
return None
|
||||
|
||||
def product(numbers):
|
||||
return functools.reduce(operator.mul, numbers)
|
||||
|
||||
def take_n(n, iterable):
|
||||
return list(itertools.islice(iterable, n))
|
||||
|
||||
def iter_chunks(chunk_size, iterator):
|
||||
while True:
|
||||
next_chunk = take_n(chunk_size, iterator)
|
||||
# If len(iterable) % chunk_size == 0, don't return an empty chunk.
|
||||
if next_chunk:
|
||||
yield next_chunk
|
||||
else:
|
||||
break
|
||||
|
||||
def shuffler(iterator, pool_size=10**5, refill_threshold=0.9):
|
||||
yields_between_refills = round(pool_size * (1 - refill_threshold))
|
||||
# initialize pool; this step may or may not exhaust the iterator.
|
||||
pool = take_n(pool_size, iterator)
|
||||
while True:
|
||||
random.shuffle(pool)
|
||||
for i in range(yields_between_refills):
|
||||
yield pool.pop()
|
||||
next_batch = take_n(yields_between_refills, iterator)
|
||||
if not next_batch:
|
||||
break
|
||||
pool.extend(next_batch)
|
||||
# finish consuming whatever's left - no need for further randomization.
|
||||
yield from pool
|
||||
|
||||
class timer(object):
|
||||
all_times = defaultdict(float)
|
||||
def __init__(self, label):
|
||||
self.label = label
|
||||
def __enter__(self):
|
||||
self.tick = time.time()
|
||||
def __exit__(self, type, value, traceback):
|
||||
self.tock = time.time()
|
||||
self.all_times[self.label] += self.tock - self.tick
|
||||
@classmethod
|
||||
def print_times(cls):
|
||||
for k, v in cls.all_times.items():
|
||||
print("%s: %.3f" % (k, v))
|
||||
Loading…
x
Reference in New Issue
Block a user