Tianshou/AlphaGo/strategy.py

187 lines
5.9 KiB
Python
Raw Normal View History

2017-12-01 01:37:55 +08:00
import os,sys
sys.path.append(os.path.join(os.path.dirname(__file__), os.path.pardir))
2017-11-26 13:36:52 +08:00
import numpy as np
import utils
2017-11-28 17:00:10 +08:00
import time
2017-12-04 11:01:49 +08:00
import network_small
2017-11-28 17:00:10 +08:00
import tensorflow as tf
2017-11-26 13:36:52 +08:00
from collections import deque
from tianshou.core.mcts.mcts import MCTS
class GoEnv:
    """Minimal Go board simulator used by the MCTS self-play strategy.

    The board is a flat list of ``size * size`` cells holding ``utils.EMPTY``
    or a stone color (presumably 1 for black, -1 for white, matching the
    feature-plane encoding below -- confirm against utils).  Vertices are
    1-based ``(x, y)`` tuples; ``(0, 0)`` encodes a pass.
    """

    # 4-connected neighbor offsets.  The original code referenced a global
    # DELTA that is not defined anywhere in this file (NameError at runtime);
    # it is defined here instead.
    _DELTA = ((1, 0), (-1, 0), (0, -1), (0, 1))

    def __init__(self, size=9, komi=6.5):
        self.size = size
        # Bug fix: komi was hard-coded to 6.5, silently ignoring the argument.
        self.komi = komi
        self.board = [utils.EMPTY] * (self.size * self.size)
        # Rolling window of the last 8 positions, used for the superko check
        # and for building the 17-plane network input.
        self.history = deque(maxlen=8)

    def _flatten(self, vertex):
        """Map a 1-based (x, y) vertex to a flat board index."""
        x, y = vertex
        return (x - 1) * self.size + (y - 1)

    def _bfs(self, vertex, color, block, status, alive_break):
        """Depth-first flood fill collecting the chain of `color` stones
        connected to `vertex` into `block`; `status` marks visited cells."""
        block.append(vertex)
        status[self._flatten(vertex)] = True
        for n in self._neighbor(vertex):
            idx = self._flatten(n)
            if not status[idx] and self.board[idx] == color:
                self._bfs(n, color, block, status, alive_break)

    def _find_block(self, vertex, alive_break=False):
        """Return ``(no_liberties, chain)`` for the chain containing `vertex`.

        ``no_liberties`` is True when the chain has no empty neighbor,
        i.e. it can be captured.
        """
        block = []
        status = [False] * (self.size * self.size)
        color = self.board[self._flatten(vertex)]
        self._bfs(vertex, color, block, status, alive_break)
        for b in block:
            for n in self._neighbor(b):
                if self.board[self._flatten(n)] == utils.EMPTY:
                    return False, block
        return True, block

    def _is_qi(self, color, vertex):
        """Return True if playing `color` at `vertex` leaves the stone with a
        liberty (directly, by capturing, or via its own chain) -- i.e. the
        move is not a suicide.  The board is restored before returning."""
        nei = self._neighbor(vertex)
        # Direct liberty: an adjacent empty point.
        for n in nei:
            if self.board[self._flatten(n)] == utils.EMPTY:
                return True
        # Tentatively place the stone to probe captures / own liberties.
        self.board[self._flatten(vertex)] = color
        # The move is legal if it captures an adjacent enemy chain.
        for n in nei:
            if self.board[self._flatten(n)] == utils.another_color(color):
                can_kill, _ = self._find_block(n)
                if can_kill:
                    self.board[self._flatten(vertex)] = utils.EMPTY
                    return True
        # Otherwise it must not leave our own chain without liberties (suicide).
        can_kill, _ = self._find_block(vertex)
        self.board[self._flatten(vertex)] = utils.EMPTY
        return not can_kill

    def _check_global_isomorphous(self, color, vertex):
        """Superko check: would playing `color` at `vertex` (with captures
        applied) recreate any position in the recent history?"""
        # Bug fix: the original used copy.copy() but never imported `copy`;
        # a plain list() snapshot is equivalent for a flat list board.
        _board = list(self.board)
        self.board[self._flatten(vertex)] = color
        self._process_board(color, vertex)
        res = self.board in self.history
        self.board = _board
        return res

    def _in_board(self, vertex):
        """True when the 1-based vertex lies on the board."""
        x, y = vertex
        return 1 <= x <= self.size and 1 <= y <= self.size

    def _neighbor(self, vertex):
        """Return the on-board 4-connected neighbors of `vertex`."""
        x, y = vertex
        nei = []
        for dx, dy in self._DELTA:
            cand = (x + dx, y + dy)
            if self._in_board(cand):
                nei.append(cand)
        return nei

    def _process_board(self, color, vertex):
        """Remove any adjacent enemy chains left without liberties after
        `color` played at `vertex`."""
        for n in self._neighbor(vertex):
            if self.board[self._flatten(n)] == utils.another_color(color):
                can_kill, block = self._find_block(n, alive_break=True)
                if can_kill:
                    for b in block:
                        self.board[self._flatten(b)] = utils.EMPTY

    def is_valid(self, color, vertex):
        """Full legality check: on board, empty, not suicide, not superko."""
        if not self._in_board(vertex):
            return False
        if self.board[self._flatten(vertex)] != utils.EMPTY:
            return False
        if not self._is_qi(color, vertex):
            return False
        if self._check_global_isomorphous(color, vertex):
            return False
        return True

    def do_move(self, color, vertex):
        """Play `color` at `vertex` (utils.PASS passes).  Returns False only
        when the target cell is occupied."""
        if vertex == utils.PASS:
            return True
        id_ = self._flatten(vertex)
        if self.board[id_] != utils.EMPTY:
            return False
        self.board[id_] = color
        # Bug fix: captures were only ever applied on the scratch board inside
        # _check_global_isomorphous, never on the real board -- apply them here
        # so actual play matches the position the superko check simulated.
        self._process_board(color, vertex)
        # Bug fix: the original appended `self.board` itself, so every history
        # entry aliased the live board and the superko check always matched.
        self.history.append(list(self.board))
        return True

    def step_forward(self, state, action):
        """Apply `action` to the (1, size, size, 17) feature state.

        Returns ``(new_state, 0)``; the reward is always 0 here.  The last
        feature plane encodes the side to move (all ones => color 1).
        """
        color = 1 if state[0, 0, 0, -1] == 1 else -1
        size = self.size
        if action == size * size:
            vertex = (0, 0)  # pass
        else:
            # Bug fix: `/` produced a float row index under Python 3, and the
            # column was missing its +1 (vertices are 1-based, matching the
            # identical conversion in strategy.gen_move).
            vertex = (action // size + 1, action % size + 1)
        self.do_move(color, vertex)
        # Shift the 8-plane history windows and append the new position.
        # NOTE(review): the 1:8 / 9:16 slices hard-code the 17-plane layout.
        new_state = np.concatenate(
            [state[:, :, :, 1:8], (np.array(self.board) == 1).reshape(1, size, size, 1),
             state[:, :, :, 9:16], (np.array(self.board) == -1).reshape(1, size, size, 1),
             np.array(1 - state[:, :, :, -1]).reshape(1, size, size, 1)],
            axis=3)
        return new_state, 0
class strategy(object):
    """MCTS + neural-network move generator for 9x9 Go.

    Wraps a GoEnv simulator and a policy/value network; gen_move() runs a
    tree search from the current position and samples a move from the visit
    counts.
    """

    def __init__(self):
        self.simulator = GoEnv()
        self.net = network_small.Network()
        self.sess = self.net.forward()
        # evaluator: feature state -> [move probabilities, value estimate]
        self.evaluator = lambda state: self.sess.run(
            [tf.nn.softmax(self.net.p), self.net.v],
            feed_dict={self.net.x: state, self.net.is_training: False})

    def data_process(self, history, color):
        """Pack the last 8 positions and the side to move into features.

        Returns a (1, 9, 9, 17) array: planes 0-7 are the black stones of
        history[0..7], planes 8-15 the white stones, and plane 16 is all
        ones when black (color == 1) is to move, all zeros for white.
        """
        state = np.zeros([1, 9, 9, 17])
        for i in range(8):
            # Bug fix: GoEnv stores boards as flat lists, for which
            # `history[i] == 1` is a scalar False and left the plane empty;
            # normalize to a 9x9 array so both layouts work.
            board = np.asarray(history[i]).reshape(9, 9)
            state[0, :, :, i] = board == 1
            state[0, :, :, i + 8] = board == -1
        if color == 1:
            state[0, :, :, 16] = np.ones([9, 9])
        if color == -1:
            state[0, :, :, 16] = np.zeros([9, 9])
        return state

    def gen_move(self, history, color):
        """Run MCTS from the position described by `history` and sample a
        move for `color`; returns a 1-based (x, y) vertex or (0, 0) for pass."""
        self.simulator.history = history
        self.simulator.board = history[-1]
        state = self.data_process(history, color)
        prior = self.evaluator(state)[0]
        # 82 actions = 81 board points + pass.
        mcts = MCTS(self.simulator, self.evaluator, state, 82, prior,
                    inverse=True, max_step=100)
        # Sample proportionally to visit counts (temperature 1).
        temp = 1
        p = mcts.root.N ** temp / np.sum(mcts.root.N ** temp)
        choice = np.random.choice(82, 1, p=p).tolist()[0]
        if choice == 81:
            move = (0, 0)  # pass
        else:
            # Bug fix: `/` produced a float row coordinate under Python 3.
            move = (choice // 9 + 1, choice % 9 + 1)
        return move