Merge Python Branch into Main #1

Merged
hagi merged 45 commits from python into main 2022-12-30 19:15:27 +00:00
20 changed files with 681 additions and 115 deletions

10
.editorconfig Normal file
View file

@ -0,0 +1,10 @@
root = true
[*]
end_of_line = lf
insert_final_newline = true
trim_trailing_whitespace = true
charset = utf-8
indent_style = space
indent_size = 4
max_line_length = 88

21
.gitignore vendored
View file

@ -1,18 +1,3 @@
.*.swp
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib
blunderboard
# Test binary, built with `go test -c`
*.test
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
# Dependency directories (remove the comment below to include it)
# vendor/
/dist/
/venv/
*.egg-info/

View file

@ -1,91 +0,0 @@
package main
import (
"fmt"
"github.com/notnil/chess"
"github.com/notnil/chess/uci"
"math"
"os"
)
// stolen^H^H inspired from lichess https://github.com/ornicar/lila/blob/master/modules/analyse/src/main/Advice.scala#L79
func WinningChance(cp int) float64 {
winning_chance := 2 / (1 + math.Exp(-0.004 * float64(cp))) - 1
return winning_chance
}
func main() {
reader, err := os.Open("spongeboyahoy_vs_tomlx.pgn")
if err != nil {
panic(err)
}
pgn, err := chess.PGN(reader)
if err != nil {
panic(err)
}
spongeboyahoy_vs_tomlx := chess.NewGame(pgn)
fmt.Println(spongeboyahoy_vs_tomlx)
engine, err := uci.New("stockfish")
if err != nil {
panic(err)
}
defer engine.Close()
if err := engine.Run(uci.CmdUCI, uci.CmdIsReady, uci.CmdUCINewGame); err != nil {
panic(err)
}
game := chess.NewGame()
// prevprev_winning_chance := 0.0
prev_winning_chance := 0.0
for game.Outcome() == chess.NoOutcome {
num_of_moves := len(game.Moves())
if err := engine.Run(uci.CmdPosition{Position: game.Position()}, uci.CmdGo{Depth: 12}); err != nil {
panic(err)
}
search_results := engine.SearchResults()
cp := search_results.Info.Score.CP
if (num_of_moves % 2 == 1) {
cp *= -1
}
winning_chance := WinningChance(cp)
if (num_of_moves > 0) {
delta := prev_winning_chance - winning_chance
if (num_of_moves % 2 == 0) {
delta *= -1;
}
if delta > 0.3 {
fmt.Print("B-b-b-blunder!!")
} else if delta > 0.2 {
fmt.Print("That was a mistake.")
} else if delta > 0.1 {
fmt.Print("Meh...")
} else {
fmt.Print("Ok")
}
fmt.Printf(" (%0.2f, %0.2f, %0.2f)\n", float64(cp) / 100, winning_chance, -delta)
}
// prevprev_winning_chance = prev_winning_chance
prev_winning_chance = winning_chance
// fmt.Println(game.Position().Board().Draw())
// fmt.Println("Score (centipawns):", cp, "Winning chance:", winning_chance, "Best Move: ", search_results.BestMove)
// fmt.Println("Move: ", search_results.BestMove)
move := spongeboyahoy_vs_tomlx.Moves()[num_of_moves]
fmt.Print(num_of_moves / 2 + 1, move, "\t")
if err := game.Move(move); err != nil {
panic(err)
}
// for {
// var move string
// fmt.Print("Move: ")
// fmt.Scanln(&move)
// if err := game.MoveStr(move); err == nil {
// break
// }
// fmt.Println("Illegal move!")
// }
}
fmt.Println(game.Outcome())
fmt.Println(game.Position().Board().Draw())
}

0
config-example.yaml Normal file
View file

10
deploy.sh Executable file
View file

@ -0,0 +1,10 @@
set -e
[ -d venv ] || virtualenv venv
. venv/bin/activate
pip install build tox
tox
python -m build
ssh pi@blunderboard.igloo.icmp.camp rm -f 'blunderboard-*.whl'
scp dist/blunderboard-*.whl pi@blunderboard.igloo.icmp.camp:
ssh pi@blunderboard.igloo.icmp.camp blunderboard/venv/bin/pip uninstall -y blunderboard
ssh pi@blunderboard.igloo.icmp.camp blunderboard/venv/bin/pip install 'blunderboard-*.whl'

3
go.mod
View file

@ -1,3 +0,0 @@
module git.0x90.space/0x90/blunderboard
require github.com/notnil/chess v1.5.0

3
go.sum
View file

@ -1,3 +0,0 @@
github.com/ajstarks/svgo v0.0.0-20200320125537-f189e35d30ca/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw=
github.com/notnil/chess v1.5.0 h1:BcdmSGqZYhoqHsAqNpVTtPwRMOA4Sj8iZY1ZuPW4Umg=
github.com/notnil/chess v1.5.0/go.mod h1:cRuJUIBFq9Xki05TWHJxHYkC+fFpq45IWwk94DdlCrA=

6
pyproject.toml Normal file
View file

@ -0,0 +1,6 @@
[build-system]
requires = [
"setuptools>=42",
"wheel"
]
build-backend = "setuptools.build_meta"

44
request_test.py Normal file
View file

@ -0,0 +1,44 @@
import requests
# Replace YOUR_API_TOKEN with the API token you are using for authentication
API_TOKEN = "blunderboard-security-token"
# Set the API endpoint URL
url = "http://5.75.138.151:5000/api/get_evaluation"
wdl_api = "http://5.75.138.151:5000/api/get_wdl"
# Set the chess position and search depth
data = {
"position": "rnbqkbnr/pppppppp/8/8/8/8/PPPPPPPP/RNBQKBNR w KQkq - 0 1",
"depth": 20,
}
# Set the API token in the request header
headers = {"Authorization": API_TOKEN}
# Send a POST request to the API endpoint
response = requests.post(url, json=data, headers=headers)
# Print the response content from the server
if response.status_code == 200:
print(response.json())
else:
print("Error: " + response.json()["error"])
def api_wdl() -> str:
"""
Returns the current wdl from the REST API
:return: str
"""
# Send a POST request to the API endpoint
response2 = requests.post(wdl_api, json=data, headers=headers)
if response2.status_code == 200:
return response2.json()
else:
print("API Error: " + response2.json()["error"])
return "API Error"
print(api_wdl())

65
rest_api.py Normal file
View file

@ -0,0 +1,65 @@
from flask import Flask, request, jsonify
from stockfish import Stockfish
app = Flask(__name__)
# Replace YOUR_API_TOKEN with the API token you want to use for authentication
API_TOKEN = "blunderboard-security-token"
default_engine_settings = {
"Debug Log File": "stocklog.txt",
"Contempt": 0,
"Min Split Depth": 0,
"Threads": 2,
# More threads will make the engine stronger, but should be kept at less than
# the number of logical processors on your computer.
"Ponder": "false",
"Hash": 516,
# Default size is 16 MB. It's recommended that you increase this value, but keep
# it as some power of 2. E.g., if you're fine using 2 GB of RAM, set Hash to
# 2048 (11th power of 2).
"MultiPV": 1,
"Skill Level": 20,
"Move Overhead": 10,
"Minimum Thinking Time": 5,
"Slow Mover": 100,
"UCI_Chess960": "false",
"UCI_LimitStrength": "false",
"UCI_Elo": 1350,
# "NNUE": "true", # TODO Find out if NNUE can be used with the python wrapper
}
# Create a Stockfish chess engine instance
engine = Stockfish(path="/usr/bin/stockfish")
# Set the engine settings
engine.update_engine_parameters(default_engine_settings)
@app.route("/api/get_evaluation", methods=["POST"])
def get_evaluation():
# Get the API token from the request header
token = request.headers.get("Authorization")
# If the API token is not provided or is invalid, return an error
if token != API_TOKEN:
return jsonify({"error": "Invalid API token"}), 401
# Get the current chess position and the desired depth from the request body
data = request.get_json()
position = data.get("position")
depth = data.get("depth")
# Set the position and search depth in the chess engine
engine.set_fen_position(position)
engine.set_depth(depth)
# Get the evaluation in centipawns
evaluation = engine.get_evaluation()
# Return the evaluation to the client
return evaluation
if __name__ == "__main__":
app.run(host="")

70
setup.cfg Normal file
View file

@ -0,0 +1,70 @@
[metadata]
name = blunderboard
version = 0.0.1
author = 0x90.space
author_email = people@schleuder.0x90.space
description = Blunderboard
url = https://git.0x90.space/0x90/blunderboard
project_urls =
Bug Tracker = https://git.0x90.space/0x90/blunderboard/issues
classifiers =
Programming Language :: Python :: 3 :: Only
License :: OSI Approved :: ISC License (ISCL)
Operating System :: POSIX :: Linux
Topic :: Games/Entertainment :: Board Games
[options]
package_dir =
= src
packages = find:
python_requires = >=3.9
install_requires =
pygame
RPi.GPIO
stockfish
requests
[options.packages.find]
where = src
[options.data_files]
share/blunderboard/sounds/blunder =
sounds/blunder/tts_what_a_blunder.mp3
sounds/blunder/ZONK.mp3
share/blunderboard/sounds/illegal =
sounds/illegal/alarm.mp3
[options.entry_points]
console_scripts =
blunderboard = blunderboard:main
[tox:tox]
envlist = lint
isolated_build = True
[testenv:lint]
skip_install = True
deps =
black
flake8
mypy
commands =
black --check --diff src
flake8 src
mypy src
#[testenv]
#deps =
# pytest
#commands =
# pytest tests
[flake8]
max_line_length = 88
extend_ignore = E203
[mypy]
check_untyped_defs = True
ignore_missing_imports = True
install_types = True
non_interactive = True

BIN
sounds/blunder/ZONK.mp3 Normal file

Binary file not shown.

Binary file not shown.

BIN
sounds/illegal/alarm.mp3 Normal file

Binary file not shown.

View file

@ -0,0 +1,24 @@
from blunderboard.blunderevaluator import BlunderEvaluator
from blunderboard.boardreader import BoardReader
from blunderboard.movegenerator import MoveGenerator
import cProfile
from pstats import SortKey
from time import sleep
def main_content():
try:
blunder_evaluator = BlunderEvaluator()
move_generator = MoveGenerator(blunder_evaluator)
reader = BoardReader(move_generator)
reader.scan()
reader.print()
while True:
reader.scan()
sleep(0.1)
except KeyboardInterrupt:
pass
def main():
cProfile.runctx("main_content()", globals(), locals(), sort=SortKey.CUMULATIVE)

View file

@ -0,0 +1,148 @@
import os
from pathlib import Path
from pygame import mixer
import random
from stockfish import Stockfish
import sys
import requests
API_TOKEN = "blunderboard-security-token"
sound_path = Path(sys.prefix) / "share" / "blunderboard" / "sounds"
api = "http://5.75.138.151:5000/api/get_evaluation"
wdl_api = "http://5.75.138.151:5000/api/get_wdl"
class BlunderEvaluator:
default_engine_settings = {
"Debug Log File": "stocklog.txt",
"Contempt": 0,
"Min Split Depth": 0,
"Threads": 1,
# More threads will make the engine stronger, but should be kept at less than
# the number of logical processors on your computer.
"Ponder": "false",
"Hash": 16,
# Default size is 16 MB. It's recommended that you increase this value, but keep
# it as some power of 2. E.g., if you're fine using 2 GB of RAM, set Hash to
# 2048 (11th power of 2).
"MultiPV": 1,
"Skill Level": 20,
"Move Overhead": 10,
"Minimum Thinking Time": 20,
"Slow Mover": 100,
"UCI_Chess960": "false",
"UCI_LimitStrength": "false",
"UCI_Elo": 1350,
# "NNUE": "true", # TODO Find out if NNUE can be used with the python wrapper
}
def __init__(self, engine_settings: dict = default_engine_settings):
self.engine = Stockfish()
self.settings = engine_settings
self.engine.update_engine_parameters(self.settings)
self.engine.set_position()
self.current_evaluation = self.api_evaluation()
self.evaluations: list[dict] = []
self.current_wdl = self.api_wdl()
self.wdls: list = []
self.current_fen = self.engine.get_fen_position()
self.white_to_move = True
def reset(self):
self.engine.set_position()
self.white_to_move = True
def move(self, move) -> None:
"""
Makes a move on the board and updates the game state
:param move: str
:return: None
"""
if self.engine.is_move_correct(move):
self.engine.make_moves_from_current_position([move])
self.current_evaluation = self.api_evaluation()
self.evaluations.append(self.current_evaluation)
self.current_wdl = self.api_wdl()
self.wdls.append(self.current_wdl)
print(self.current_wdl)
print(self.current_evaluation)
print(self.get_board())
if self.move_was_blunder():
# If the played move was a blunder play a random sound from the blunder
# path
self.play_sound("blunder")
print("Blunder!")
if self.white_to_move:
self.white_to_move = False
elif not self.white_to_move:
self.white_to_move = True
else:
print("Invalid move")
self.play_sound("illegal")
def api_evaluation(self) -> dict:
"""
Returns the current evaluation from the REST API
:return: str
"""
data = {"position": self.engine.get_fen_position(), "depth": 20}
# Set the API token in the request header
headers = {"Authorization": API_TOKEN}
# Send a POST request to the API endpoint
response = requests.post(api, json=data, headers=headers)
if response.status_code == 200:
return response.json()["value"]
else:
print("API Error: " + response.json()["error"])
return {"NOPE": "PLEASE CRASH"}
def api_wdl(self) -> str:
"""
Returns the current wdl from the REST API
:return: str
"""
data = {"position": self.engine.get_fen_position(), "depth": 20}
# Set the API token in the request header
headers = {"Authorization": API_TOKEN}
# Send a POST request to the API endpoint
response = requests.post(wdl_api, json=data, headers=headers)
if response.status_code == 200:
return response.json()
else:
print("API Error: " + response.json()["error"])
return "API Error"
def move_was_blunder(self) -> bool:
"""
Returns true if the last move was a blunder
:return: bool
"""
if len(self.wdls) > 1: # Don't check for blunders on the first move
previous_wdl = self.wdls[len(self.evaluations) - 2]
if abs(previous_wdl[0] - self.current_wdl[2]) > 300:
return True
else:
return False
return False
@staticmethod
def play_sound(move_type: str) -> None:
"""
Plays a random sound for the type of move (blunder, illegal)
:param move_type: str
:return: None
"""
path = sound_path / move_type
mixer.init()
mixer.music.load(path / random.choice(os.listdir(path)))
mixer.music.play()
# while mixer.music.get_busy():
# time.sleep(0.)
# I guess we won't want this, since it will block the main thread.
def get_board(self) -> str:
"""
Returns the current board state
:return: str
"""
return self.engine.get_board_visual()

View file

@ -0,0 +1,123 @@
from blunderboard.movegenerator import MoveGenerator
import RPi.GPIO as gpio
class BoardReader:
hysteresis = 8
default_gpio_mode = gpio.BCM
default_row_gpios = [4, 5, 6, 12, 13, 16, 17, 19]
default_column_gpios = [20, 21, 22, 23, 24, 25, 26, 27]
def __init__(
self,
move_generator: MoveGenerator,
row_gpios: list[int] = default_row_gpios,
column_gpios: list[int] = default_column_gpios,
gpio_mode=default_gpio_mode,
):
gpio.setmode(gpio_mode)
gpio.setup(column_gpios, gpio.IN, pull_up_down=gpio.PUD_DOWN)
gpio.setup(row_gpios, gpio.OUT, initial=gpio.LOW)
self.column_gpios = column_gpios
self.row_gpios = row_gpios
self.board_history: list[list[list[str]]] = []
for _ in range(self.hysteresis):
self.board_history.append(self._initial_board())
self.move_generator = move_generator
def __del__(self):
gpio.cleanup()
def _empty_board(self) -> list[list[str]]:
board = []
for i in range(8):
board.append([" "] * 8)
return board
def _initial_board(self) -> list[list[str]]:
board = []
for i in range(2):
board.append(["x"] * 8)
for i in range(2, 6):
board.append([" "] * 8)
for i in range(6, 8):
board.append(["x"] * 8)
return board
def _is_initial_board(self, board) -> bool:
initial_board = self._initial_board()
for i, row in enumerate(board):
for j, field in enumerate(row):
if field != initial_board[i][j]:
return False
return True
def scan(self) -> None:
next_board = self._empty_board()
for i, row_gpio in enumerate(self.row_gpios):
gpio.output(row_gpio, gpio.HIGH)
for j, column_gpio in enumerate(self.column_gpios):
if gpio.input(column_gpio):
next_board[i][j] = "x"
gpio.output(row_gpio, gpio.LOW)
self.board_history = [next_board] + self.board_history[:-1]
# if the oldest half of the board history is not in inital position but all
# newer boards are, reset game state
for board in self.board_history[self.hysteresis // 2 :]:
if self._is_initial_board(board):
break
else:
for board in self.board_history[: self.hysteresis // 2]:
if not self._is_initial_board(board):
break
else:
self.print()
self.move_generator.reset()
return
for i in range(8):
for j in range(8):
# if the oldest half of the board history has a piece but no newer
# boards have it, the piece was removed
for board in self.board_history[self.hysteresis // 2 :]:
if board[i][j] == " ":
break
else:
for board in self.board_history[: self.hysteresis // 2]:
if board[i][j] == "x":
break
else:
self.print()
self.move_generator.take(i, j)
for i in range(8):
for j in range(8):
# if the oldest half of the board history doesn't have a piece but all
# newer boards have it, the piece was placed
for board in self.board_history[self.hysteresis // 2 :]:
if board[i][j] == "x":
break
else:
for board in self.board_history[: self.hysteresis // 2]:
if board[i][j] == " ":
break
else:
self.print()
self.move_generator.put(i, j)
def _print(self, board) -> None:
print(" a b c d e f g h")
print(" +---------------+")
for i, row in reversed(list(enumerate(board))):
print("%d|" % (i + 1), end="")
for j, field in enumerate(row):
print(field, end="")
if j == 7:
print("|%d" % (i + 1))
else:
print(" ", end="")
print(" +---------------+")
print(" a b c d e f g h")
def print(self) -> None:
self._print(self.board_history[0])

View file

View file

View file

@ -0,0 +1,178 @@
from blunderboard.blunderevaluator import BlunderEvaluator
def coords_to_field(row: int, column: int):
columns = "abcdefgh"
return "%c%d" % (columns[column], row + 1)
class MoveGenerator:
def __init__(self, blunder_evaluator: BlunderEvaluator):
self.state: State = InitState(blunder_evaluator)
def reset(self) -> None:
print("reset")
self.state = self.state.reset()
def put(self, row: int, column: int) -> None:
print("put %s" % coords_to_field(row, column))
self.state = self.state.put(row, column)
def take(self, row: int, column: int) -> None:
print("take %s" % coords_to_field(row, column))
self.state = self.state.take(row, column)
class State:
def __init__(self, blunder_evaluator: BlunderEvaluator):
self.blunder_evaluator = blunder_evaluator
def reset(self) -> "State":
self.blunder_evaluator.reset()
return InitState(self.blunder_evaluator)
def put(self, row: int, column: int) -> "State":
print("ignored invalid put")
return self
def take(self, row: int, column: int) -> "State":
print("ignored invalid take")
return self
class InitState(State):
def reset(self) -> State:
super().reset()
return self
def take(self, row: int, column: int) -> State:
return TakeState(self.blunder_evaluator, coords_to_field(row, column))
class TakeState(State):
def __init__(self, blunder_evaluator: BlunderEvaluator, from_field: str):
super().__init__(blunder_evaluator)
self.from_field = from_field
def put(self, row: int, column: int) -> State:
to_field = coords_to_field(row, column)
if self.from_field == to_field:
print("ignored self-move")
return InitState(self.blunder_evaluator)
move = self.from_field + to_field
print("move %s" % move)
self.blunder_evaluator.move(move)
return InitState(self.blunder_evaluator)
def take(self, row: int, column: int) -> State:
return TakeTakeState(
self.blunder_evaluator, self.from_field, coords_to_field(row, column)
)
class TakeTakeState(State):
def __init__(
self, blunder_evaluator: BlunderEvaluator, from_field: str, from2_field: str
):
super().__init__(blunder_evaluator)
self.from_field = from_field
self.from2_field = from2_field
def put(self, row: int, column: int) -> State:
to_field = coords_to_field(row, column)
if self.from2_field == to_field:
move = self.from_field + to_field
elif self.from_field == to_field:
move = self.from2_field + to_field
elif (
self.from_field[1] == self.from2_field[1]
and self.from_field[1] == to_field[1]
):
# king-side castling
if (
self.from_field in ["e1", "e8"]
and self.from2_field in ["h1", "h8"]
and to_field in ["f1", "f8", "g1", "g8"]
):
return KingCastleState(
self.blunder_evaluator, self.from_field, self.from2_field, to_field
)
# queen-side castling
if (
self.from_field in ["e1", "e8"]
and self.from2_field in ["a1", "a8"]
and to_field in ["c1", "c8", "d1", "d8"]
):
return QueenCastleState(
self.blunder_evaluator, self.from_field, self.from2_field, to_field
)
print("ignored invalid put")
return self
else:
print("ignored invalid put")
return self
print("move %s" % move)
self.blunder_evaluator.move(move)
return InitState(self.blunder_evaluator)
class KingCastleState(State):
def __init__(
self,
blunder_evaluator: BlunderEvaluator,
from_field: str,
from2_field: str,
to_field: str,
):
super().__init__(blunder_evaluator)
self.from_field = from_field
self.from2_field = from2_field
self.to_field = to_field
def put(self, row: int, column: int) -> State:
to2_field = coords_to_field(row, column)
if self.to_field[1] == to2_field[1]:
if to2_field in ["f1", "g1"]:
move = "e1g1"
elif to2_field in ["f8", "g8"]:
move = "e8g8"
else:
print("ignored invalid put")
return self
else:
print("ignored invalid put")
return self
print("move %s" % move)
self.blunder_evaluator.move(move)
return InitState(self.blunder_evaluator)
class QueenCastleState(State):
def __init__(
self,
blunder_evaluator: BlunderEvaluator,
from_field: str,
from2_field: str,
to_field: str,
):
super().__init__(blunder_evaluator)
self.from_field = from_field
self.from2_field = from2_field
self.to_field = to_field
def put(self, row: int, column: int) -> State:
to2_field = coords_to_field(row, column)
if self.to_field[1] == to2_field[1]:
if to2_field in ["c1", "d1"]:
move = "e1c1"
elif to2_field in ["c8", "d8"]:
move = "e8c8"
else:
print("ignored invalid put")
return self
else:
print("ignored invalid put")
return self
print("move %s" % move)
self.blunder_evaluator.move(move)
return InitState(self.blunder_evaluator)