Merge Python Branch into Main #1
10
.editorconfig
Normal file
10
.editorconfig
Normal file
|
@ -0,0 +1,10 @@
|
||||||
|
root = true
|
||||||
|
|
||||||
|
[*]
|
||||||
|
end_of_line = lf
|
||||||
|
insert_final_newline = true
|
||||||
|
trim_trailing_whitespace = true
|
||||||
|
charset = utf-8
|
||||||
|
indent_style = space
|
||||||
|
indent_size = 4
|
||||||
|
max_line_length = 88
|
21
.gitignore
vendored
21
.gitignore
vendored
|
@ -1,18 +1,3 @@
|
||||||
.*.swp
|
/dist/
|
||||||
|
/venv/
|
||||||
# Binaries for programs and plugins
|
*.egg-info/
|
||||||
*.exe
|
|
||||||
*.exe~
|
|
||||||
*.dll
|
|
||||||
*.so
|
|
||||||
*.dylib
|
|
||||||
blunderboard
|
|
||||||
|
|
||||||
# Test binary, built with `go test -c`
|
|
||||||
*.test
|
|
||||||
|
|
||||||
# Output of the go coverage tool, specifically when used with LiteIDE
|
|
||||||
*.out
|
|
||||||
|
|
||||||
# Dependency directories (remove the comment below to include it)
|
|
||||||
# vendor/
|
|
||||||
|
|
|
@ -1,91 +0,0 @@
|
||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"github.com/notnil/chess"
|
|
||||||
"github.com/notnil/chess/uci"
|
|
||||||
"math"
|
|
||||||
"os"
|
|
||||||
)
|
|
||||||
|
|
||||||
// stolen^H^H inspired from lichess https://github.com/ornicar/lila/blob/master/modules/analyse/src/main/Advice.scala#L79
|
|
||||||
func WinningChance(cp int) float64 {
|
|
||||||
winning_chance := 2 / (1 + math.Exp(-0.004 * float64(cp))) - 1
|
|
||||||
return winning_chance
|
|
||||||
}
|
|
||||||
|
|
||||||
func main() {
|
|
||||||
reader, err := os.Open("spongeboyahoy_vs_tomlx.pgn")
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
pgn, err := chess.PGN(reader)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
spongeboyahoy_vs_tomlx := chess.NewGame(pgn)
|
|
||||||
fmt.Println(spongeboyahoy_vs_tomlx)
|
|
||||||
|
|
||||||
engine, err := uci.New("stockfish")
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
defer engine.Close()
|
|
||||||
|
|
||||||
if err := engine.Run(uci.CmdUCI, uci.CmdIsReady, uci.CmdUCINewGame); err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
game := chess.NewGame()
|
|
||||||
// prevprev_winning_chance := 0.0
|
|
||||||
prev_winning_chance := 0.0
|
|
||||||
for game.Outcome() == chess.NoOutcome {
|
|
||||||
num_of_moves := len(game.Moves())
|
|
||||||
if err := engine.Run(uci.CmdPosition{Position: game.Position()}, uci.CmdGo{Depth: 12}); err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
search_results := engine.SearchResults()
|
|
||||||
cp := search_results.Info.Score.CP
|
|
||||||
if (num_of_moves % 2 == 1) {
|
|
||||||
cp *= -1
|
|
||||||
}
|
|
||||||
winning_chance := WinningChance(cp)
|
|
||||||
if (num_of_moves > 0) {
|
|
||||||
delta := prev_winning_chance - winning_chance
|
|
||||||
if (num_of_moves % 2 == 0) {
|
|
||||||
delta *= -1;
|
|
||||||
}
|
|
||||||
if delta > 0.3 {
|
|
||||||
fmt.Print("B-b-b-blunder!!")
|
|
||||||
} else if delta > 0.2 {
|
|
||||||
fmt.Print("That was a mistake.")
|
|
||||||
} else if delta > 0.1 {
|
|
||||||
fmt.Print("Meh...")
|
|
||||||
} else {
|
|
||||||
fmt.Print("Ok")
|
|
||||||
}
|
|
||||||
fmt.Printf(" (%0.2f, %0.2f, %0.2f)\n", float64(cp) / 100, winning_chance, -delta)
|
|
||||||
}
|
|
||||||
// prevprev_winning_chance = prev_winning_chance
|
|
||||||
prev_winning_chance = winning_chance
|
|
||||||
// fmt.Println(game.Position().Board().Draw())
|
|
||||||
// fmt.Println("Score (centipawns):", cp, "Winning chance:", winning_chance, "Best Move: ", search_results.BestMove)
|
|
||||||
// fmt.Println("Move: ", search_results.BestMove)
|
|
||||||
move := spongeboyahoy_vs_tomlx.Moves()[num_of_moves]
|
|
||||||
fmt.Print(num_of_moves / 2 + 1, move, "\t")
|
|
||||||
if err := game.Move(move); err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
// for {
|
|
||||||
// var move string
|
|
||||||
// fmt.Print("Move: ")
|
|
||||||
// fmt.Scanln(&move)
|
|
||||||
// if err := game.MoveStr(move); err == nil {
|
|
||||||
// break
|
|
||||||
// }
|
|
||||||
// fmt.Println("Illegal move!")
|
|
||||||
// }
|
|
||||||
}
|
|
||||||
fmt.Println(game.Outcome())
|
|
||||||
fmt.Println(game.Position().Board().Draw())
|
|
||||||
}
|
|
0
config-example.yaml
Normal file
0
config-example.yaml
Normal file
10
deploy.sh
Executable file
10
deploy.sh
Executable file
|
@ -0,0 +1,10 @@
|
||||||
|
set -e
|
||||||
|
[ -d venv ] || virtualenv venv
|
||||||
|
. venv/bin/activate
|
||||||
|
pip install build tox
|
||||||
|
tox
|
||||||
|
python -m build
|
||||||
|
ssh pi@blunderboard.igloo.icmp.camp rm -f 'blunderboard-*.whl'
|
||||||
|
scp dist/blunderboard-*.whl pi@blunderboard.igloo.icmp.camp:
|
||||||
|
ssh pi@blunderboard.igloo.icmp.camp blunderboard/venv/bin/pip uninstall -y blunderboard
|
||||||
|
ssh pi@blunderboard.igloo.icmp.camp blunderboard/venv/bin/pip install 'blunderboard-*.whl'
|
3
go.mod
3
go.mod
|
@ -1,3 +0,0 @@
|
||||||
module git.0x90.space/0x90/blunderboard
|
|
||||||
|
|
||||||
require github.com/notnil/chess v1.5.0
|
|
3
go.sum
3
go.sum
|
@ -1,3 +0,0 @@
|
||||||
github.com/ajstarks/svgo v0.0.0-20200320125537-f189e35d30ca/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw=
|
|
||||||
github.com/notnil/chess v1.5.0 h1:BcdmSGqZYhoqHsAqNpVTtPwRMOA4Sj8iZY1ZuPW4Umg=
|
|
||||||
github.com/notnil/chess v1.5.0/go.mod h1:cRuJUIBFq9Xki05TWHJxHYkC+fFpq45IWwk94DdlCrA=
|
|
6
pyproject.toml
Normal file
6
pyproject.toml
Normal file
|
@ -0,0 +1,6 @@
|
||||||
|
[build-system]
|
||||||
|
requires = [
|
||||||
|
"setuptools>=42",
|
||||||
|
"wheel"
|
||||||
|
]
|
||||||
|
build-backend = "setuptools.build_meta"
|
44
request_test.py
Normal file
44
request_test.py
Normal file
|
@ -0,0 +1,44 @@
|
||||||
|
import requests
|
||||||
|
|
||||||
|
# Replace YOUR_API_TOKEN with the API token you are using for authentication
|
||||||
|
API_TOKEN = "blunderboard-security-token"
|
||||||
|
|
||||||
|
# Set the API endpoint URL
|
||||||
|
url = "http://5.75.138.151:5000/api/get_evaluation"
|
||||||
|
wdl_api = "http://5.75.138.151:5000/api/get_wdl"
|
||||||
|
|
||||||
|
# Set the chess position and search depth
|
||||||
|
data = {
|
||||||
|
"position": "rnbqkbnr/pppppppp/8/8/8/8/PPPPPPPP/RNBQKBNR w KQkq - 0 1",
|
||||||
|
"depth": 20,
|
||||||
|
}
|
||||||
|
|
||||||
|
# Set the API token in the request header
|
||||||
|
headers = {"Authorization": API_TOKEN}
|
||||||
|
|
||||||
|
# Send a POST request to the API endpoint
|
||||||
|
response = requests.post(url, json=data, headers=headers)
|
||||||
|
|
||||||
|
# Print the response content from the server
|
||||||
|
if response.status_code == 200:
|
||||||
|
print(response.json())
|
||||||
|
|
||||||
|
else:
|
||||||
|
print("Error: " + response.json()["error"])
|
||||||
|
|
||||||
|
|
||||||
|
def api_wdl() -> str:
|
||||||
|
"""
|
||||||
|
Returns the current wdl from the REST API
|
||||||
|
:return: str
|
||||||
|
"""
|
||||||
|
# Send a POST request to the API endpoint
|
||||||
|
response2 = requests.post(wdl_api, json=data, headers=headers)
|
||||||
|
if response2.status_code == 200:
|
||||||
|
return response2.json()
|
||||||
|
else:
|
||||||
|
print("API Error: " + response2.json()["error"])
|
||||||
|
return "API Error"
|
||||||
|
|
||||||
|
|
||||||
|
print(api_wdl())
|
65
rest_api.py
Normal file
65
rest_api.py
Normal file
|
@ -0,0 +1,65 @@
|
||||||
|
from flask import Flask, request, jsonify
|
||||||
|
from stockfish import Stockfish
|
||||||
|
|
||||||
|
app = Flask(__name__)
|
||||||
|
|
||||||
|
# Replace YOUR_API_TOKEN with the API token you want to use for authentication
|
||||||
|
API_TOKEN = "blunderboard-security-token"
|
||||||
|
|
||||||
|
default_engine_settings = {
|
||||||
|
"Debug Log File": "stocklog.txt",
|
||||||
|
"Contempt": 0,
|
||||||
|
"Min Split Depth": 0,
|
||||||
|
"Threads": 2,
|
||||||
|
# More threads will make the engine stronger, but should be kept at less than
|
||||||
|
# the number of logical processors on your computer.
|
||||||
|
"Ponder": "false",
|
||||||
|
"Hash": 516,
|
||||||
|
# Default size is 16 MB. It's recommended that you increase this value, but keep
|
||||||
|
# it as some power of 2. E.g., if you're fine using 2 GB of RAM, set Hash to
|
||||||
|
# 2048 (11th power of 2).
|
||||||
|
"MultiPV": 1,
|
||||||
|
"Skill Level": 20,
|
||||||
|
"Move Overhead": 10,
|
||||||
|
"Minimum Thinking Time": 5,
|
||||||
|
"Slow Mover": 100,
|
||||||
|
"UCI_Chess960": "false",
|
||||||
|
"UCI_LimitStrength": "false",
|
||||||
|
"UCI_Elo": 1350,
|
||||||
|
# "NNUE": "true", # TODO Find out if NNUE can be used with the python wrapper
|
||||||
|
}
|
||||||
|
|
||||||
|
# Create a Stockfish chess engine instance
|
||||||
|
engine = Stockfish(path="/usr/bin/stockfish")
|
||||||
|
|
||||||
|
# Set the engine settings
|
||||||
|
engine.update_engine_parameters(default_engine_settings)
|
||||||
|
|
||||||
|
|
||||||
|
@app.route("/api/get_evaluation", methods=["POST"])
|
||||||
|
def get_evaluation():
|
||||||
|
# Get the API token from the request header
|
||||||
|
token = request.headers.get("Authorization")
|
||||||
|
|
||||||
|
# If the API token is not provided or is invalid, return an error
|
||||||
|
if token != API_TOKEN:
|
||||||
|
return jsonify({"error": "Invalid API token"}), 401
|
||||||
|
|
||||||
|
# Get the current chess position and the desired depth from the request body
|
||||||
|
data = request.get_json()
|
||||||
|
position = data.get("position")
|
||||||
|
depth = data.get("depth")
|
||||||
|
|
||||||
|
# Set the position and search depth in the chess engine
|
||||||
|
engine.set_fen_position(position)
|
||||||
|
engine.set_depth(depth)
|
||||||
|
|
||||||
|
# Get the evaluation in centipawns
|
||||||
|
evaluation = engine.get_evaluation()
|
||||||
|
|
||||||
|
# Return the evaluation to the client
|
||||||
|
return evaluation
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
app.run(host="")
|
70
setup.cfg
Normal file
70
setup.cfg
Normal file
|
@ -0,0 +1,70 @@
|
||||||
|
[metadata]
|
||||||
|
name = blunderboard
|
||||||
|
version = 0.0.1
|
||||||
|
author = 0x90.space
|
||||||
|
author_email = people@schleuder.0x90.space
|
||||||
|
description = Blunderboard
|
||||||
|
url = https://git.0x90.space/0x90/blunderboard
|
||||||
|
project_urls =
|
||||||
|
Bug Tracker = https://git.0x90.space/0x90/blunderboard/issues
|
||||||
|
classifiers =
|
||||||
|
Programming Language :: Python :: 3 :: Only
|
||||||
|
License :: OSI Approved :: ISC License (ISCL)
|
||||||
|
Operating System :: POSIX :: Linux
|
||||||
|
Topic :: Games/Entertainment :: Board Games
|
||||||
|
|
||||||
|
[options]
|
||||||
|
package_dir =
|
||||||
|
= src
|
||||||
|
packages = find:
|
||||||
|
python_requires = >=3.9
|
||||||
|
install_requires =
|
||||||
|
pygame
|
||||||
|
RPi.GPIO
|
||||||
|
stockfish
|
||||||
|
requests
|
||||||
|
|
||||||
|
[options.packages.find]
|
||||||
|
where = src
|
||||||
|
|
||||||
|
[options.data_files]
|
||||||
|
share/blunderboard/sounds/blunder =
|
||||||
|
sounds/blunder/tts_what_a_blunder.mp3
|
||||||
|
sounds/blunder/ZONK.mp3
|
||||||
|
share/blunderboard/sounds/illegal =
|
||||||
|
sounds/illegal/alarm.mp3
|
||||||
|
|
||||||
|
[options.entry_points]
|
||||||
|
console_scripts =
|
||||||
|
blunderboard = blunderboard:main
|
||||||
|
|
||||||
|
[tox:tox]
|
||||||
|
envlist = lint
|
||||||
|
isolated_build = True
|
||||||
|
|
||||||
|
[testenv:lint]
|
||||||
|
skip_install = True
|
||||||
|
deps =
|
||||||
|
black
|
||||||
|
flake8
|
||||||
|
mypy
|
||||||
|
commands =
|
||||||
|
black --check --diff src
|
||||||
|
flake8 src
|
||||||
|
mypy src
|
||||||
|
|
||||||
|
#[testenv]
|
||||||
|
#deps =
|
||||||
|
# pytest
|
||||||
|
#commands =
|
||||||
|
# pytest tests
|
||||||
|
|
||||||
|
[flake8]
|
||||||
|
max_line_length = 88
|
||||||
|
extend_ignore = E203
|
||||||
|
|
||||||
|
[mypy]
|
||||||
|
check_untyped_defs = True
|
||||||
|
ignore_missing_imports = True
|
||||||
|
install_types = True
|
||||||
|
non_interactive = True
|
BIN
sounds/blunder/ZONK.mp3
Normal file
BIN
sounds/blunder/ZONK.mp3
Normal file
Binary file not shown.
BIN
sounds/blunder/tts_what_a_blunder.mp3
Normal file
BIN
sounds/blunder/tts_what_a_blunder.mp3
Normal file
Binary file not shown.
BIN
sounds/illegal/alarm.mp3
Normal file
BIN
sounds/illegal/alarm.mp3
Normal file
Binary file not shown.
24
src/blunderboard/__init__.py
Normal file
24
src/blunderboard/__init__.py
Normal file
|
@ -0,0 +1,24 @@
|
||||||
|
from blunderboard.blunderevaluator import BlunderEvaluator
|
||||||
|
from blunderboard.boardreader import BoardReader
|
||||||
|
from blunderboard.movegenerator import MoveGenerator
|
||||||
|
import cProfile
|
||||||
|
from pstats import SortKey
|
||||||
|
from time import sleep
|
||||||
|
|
||||||
|
|
||||||
|
def main_content():
|
||||||
|
try:
|
||||||
|
blunder_evaluator = BlunderEvaluator()
|
||||||
|
move_generator = MoveGenerator(blunder_evaluator)
|
||||||
|
reader = BoardReader(move_generator)
|
||||||
|
reader.scan()
|
||||||
|
reader.print()
|
||||||
|
while True:
|
||||||
|
reader.scan()
|
||||||
|
sleep(0.1)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
cProfile.runctx("main_content()", globals(), locals(), sort=SortKey.CUMULATIVE)
|
148
src/blunderboard/blunderevaluator.py
Normal file
148
src/blunderboard/blunderevaluator.py
Normal file
|
@ -0,0 +1,148 @@
|
||||||
|
import os
|
||||||
|
from pathlib import Path
|
||||||
|
from pygame import mixer
|
||||||
|
import random
|
||||||
|
from stockfish import Stockfish
|
||||||
|
import sys
|
||||||
|
import requests
|
||||||
|
|
||||||
|
API_TOKEN = "blunderboard-security-token"
|
||||||
|
sound_path = Path(sys.prefix) / "share" / "blunderboard" / "sounds"
|
||||||
|
api = "http://5.75.138.151:5000/api/get_evaluation"
|
||||||
|
wdl_api = "http://5.75.138.151:5000/api/get_wdl"
|
||||||
|
|
||||||
|
|
||||||
|
class BlunderEvaluator:
|
||||||
|
default_engine_settings = {
|
||||||
|
"Debug Log File": "stocklog.txt",
|
||||||
|
"Contempt": 0,
|
||||||
|
"Min Split Depth": 0,
|
||||||
|
"Threads": 1,
|
||||||
|
# More threads will make the engine stronger, but should be kept at less than
|
||||||
|
# the number of logical processors on your computer.
|
||||||
|
"Ponder": "false",
|
||||||
|
"Hash": 16,
|
||||||
|
# Default size is 16 MB. It's recommended that you increase this value, but keep
|
||||||
|
# it as some power of 2. E.g., if you're fine using 2 GB of RAM, set Hash to
|
||||||
|
# 2048 (11th power of 2).
|
||||||
|
"MultiPV": 1,
|
||||||
|
"Skill Level": 20,
|
||||||
|
"Move Overhead": 10,
|
||||||
|
"Minimum Thinking Time": 20,
|
||||||
|
"Slow Mover": 100,
|
||||||
|
"UCI_Chess960": "false",
|
||||||
|
"UCI_LimitStrength": "false",
|
||||||
|
"UCI_Elo": 1350,
|
||||||
|
# "NNUE": "true", # TODO Find out if NNUE can be used with the python wrapper
|
||||||
|
}
|
||||||
|
|
||||||
|
def __init__(self, engine_settings: dict = default_engine_settings):
|
||||||
|
self.engine = Stockfish()
|
||||||
|
self.settings = engine_settings
|
||||||
|
self.engine.update_engine_parameters(self.settings)
|
||||||
|
self.engine.set_position()
|
||||||
|
self.current_evaluation = self.api_evaluation()
|
||||||
|
self.evaluations: list[dict] = []
|
||||||
|
self.current_wdl = self.api_wdl()
|
||||||
|
self.wdls: list = []
|
||||||
|
self.current_fen = self.engine.get_fen_position()
|
||||||
|
self.white_to_move = True
|
||||||
|
|
||||||
|
def reset(self):
|
||||||
|
self.engine.set_position()
|
||||||
|
self.white_to_move = True
|
||||||
|
|
||||||
|
def move(self, move) -> None:
|
||||||
|
"""
|
||||||
|
Makes a move on the board and updates the game state
|
||||||
|
:param move: str
|
||||||
|
:return: None
|
||||||
|
"""
|
||||||
|
if self.engine.is_move_correct(move):
|
||||||
|
self.engine.make_moves_from_current_position([move])
|
||||||
|
self.current_evaluation = self.api_evaluation()
|
||||||
|
self.evaluations.append(self.current_evaluation)
|
||||||
|
self.current_wdl = self.api_wdl()
|
||||||
|
self.wdls.append(self.current_wdl)
|
||||||
|
print(self.current_wdl)
|
||||||
|
print(self.current_evaluation)
|
||||||
|
print(self.get_board())
|
||||||
|
if self.move_was_blunder():
|
||||||
|
# If the played move was a blunder play a random sound from the blunder
|
||||||
|
# path
|
||||||
|
self.play_sound("blunder")
|
||||||
|
print("Blunder!")
|
||||||
|
if self.white_to_move:
|
||||||
|
self.white_to_move = False
|
||||||
|
elif not self.white_to_move:
|
||||||
|
self.white_to_move = True
|
||||||
|
else:
|
||||||
|
print("Invalid move")
|
||||||
|
self.play_sound("illegal")
|
||||||
|
|
||||||
|
def api_evaluation(self) -> dict:
|
||||||
|
"""
|
||||||
|
Returns the current evaluation from the REST API
|
||||||
|
:return: str
|
||||||
|
"""
|
||||||
|
data = {"position": self.engine.get_fen_position(), "depth": 20}
|
||||||
|
# Set the API token in the request header
|
||||||
|
headers = {"Authorization": API_TOKEN}
|
||||||
|
# Send a POST request to the API endpoint
|
||||||
|
response = requests.post(api, json=data, headers=headers)
|
||||||
|
if response.status_code == 200:
|
||||||
|
return response.json()["value"]
|
||||||
|
else:
|
||||||
|
print("API Error: " + response.json()["error"])
|
||||||
|
return {"NOPE": "PLEASE CRASH"}
|
||||||
|
|
||||||
|
def api_wdl(self) -> str:
|
||||||
|
"""
|
||||||
|
Returns the current wdl from the REST API
|
||||||
|
:return: str
|
||||||
|
"""
|
||||||
|
data = {"position": self.engine.get_fen_position(), "depth": 20}
|
||||||
|
# Set the API token in the request header
|
||||||
|
headers = {"Authorization": API_TOKEN}
|
||||||
|
# Send a POST request to the API endpoint
|
||||||
|
response = requests.post(wdl_api, json=data, headers=headers)
|
||||||
|
if response.status_code == 200:
|
||||||
|
return response.json()
|
||||||
|
else:
|
||||||
|
print("API Error: " + response.json()["error"])
|
||||||
|
return "API Error"
|
||||||
|
|
||||||
|
def move_was_blunder(self) -> bool:
|
||||||
|
"""
|
||||||
|
Returns true if the last move was a blunder
|
||||||
|
:return: bool
|
||||||
|
"""
|
||||||
|
if len(self.wdls) > 1: # Don't check for blunders on the first move
|
||||||
|
previous_wdl = self.wdls[len(self.evaluations) - 2]
|
||||||
|
if abs(previous_wdl[0] - self.current_wdl[2]) > 300:
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
return False
|
||||||
|
return False
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def play_sound(move_type: str) -> None:
|
||||||
|
"""
|
||||||
|
Plays a random sound for the type of move (blunder, illegal)
|
||||||
|
:param move_type: str
|
||||||
|
:return: None
|
||||||
|
"""
|
||||||
|
path = sound_path / move_type
|
||||||
|
mixer.init()
|
||||||
|
mixer.music.load(path / random.choice(os.listdir(path)))
|
||||||
|
mixer.music.play()
|
||||||
|
# while mixer.music.get_busy():
|
||||||
|
# time.sleep(0.)
|
||||||
|
# I guess we won't want this, since it will block the main thread.
|
||||||
|
|
||||||
|
def get_board(self) -> str:
|
||||||
|
"""
|
||||||
|
Returns the current board state
|
||||||
|
:return: str
|
||||||
|
"""
|
||||||
|
return self.engine.get_board_visual()
|
123
src/blunderboard/boardreader.py
Normal file
123
src/blunderboard/boardreader.py
Normal file
|
@ -0,0 +1,123 @@
|
||||||
|
from blunderboard.movegenerator import MoveGenerator
|
||||||
|
import RPi.GPIO as gpio
|
||||||
|
|
||||||
|
|
||||||
|
class BoardReader:
|
||||||
|
hysteresis = 8
|
||||||
|
default_gpio_mode = gpio.BCM
|
||||||
|
default_row_gpios = [4, 5, 6, 12, 13, 16, 17, 19]
|
||||||
|
default_column_gpios = [20, 21, 22, 23, 24, 25, 26, 27]
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
move_generator: MoveGenerator,
|
||||||
|
row_gpios: list[int] = default_row_gpios,
|
||||||
|
column_gpios: list[int] = default_column_gpios,
|
||||||
|
gpio_mode=default_gpio_mode,
|
||||||
|
):
|
||||||
|
gpio.setmode(gpio_mode)
|
||||||
|
gpio.setup(column_gpios, gpio.IN, pull_up_down=gpio.PUD_DOWN)
|
||||||
|
gpio.setup(row_gpios, gpio.OUT, initial=gpio.LOW)
|
||||||
|
self.column_gpios = column_gpios
|
||||||
|
self.row_gpios = row_gpios
|
||||||
|
self.board_history: list[list[list[str]]] = []
|
||||||
|
for _ in range(self.hysteresis):
|
||||||
|
self.board_history.append(self._initial_board())
|
||||||
|
self.move_generator = move_generator
|
||||||
|
|
||||||
|
def __del__(self):
|
||||||
|
gpio.cleanup()
|
||||||
|
|
||||||
|
def _empty_board(self) -> list[list[str]]:
|
||||||
|
board = []
|
||||||
|
for i in range(8):
|
||||||
|
board.append([" "] * 8)
|
||||||
|
return board
|
||||||
|
|
||||||
|
def _initial_board(self) -> list[list[str]]:
|
||||||
|
board = []
|
||||||
|
for i in range(2):
|
||||||
|
board.append(["x"] * 8)
|
||||||
|
for i in range(2, 6):
|
||||||
|
board.append([" "] * 8)
|
||||||
|
for i in range(6, 8):
|
||||||
|
board.append(["x"] * 8)
|
||||||
|
return board
|
||||||
|
|
||||||
|
def _is_initial_board(self, board) -> bool:
|
||||||
|
initial_board = self._initial_board()
|
||||||
|
for i, row in enumerate(board):
|
||||||
|
for j, field in enumerate(row):
|
||||||
|
if field != initial_board[i][j]:
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
def scan(self) -> None:
|
||||||
|
next_board = self._empty_board()
|
||||||
|
for i, row_gpio in enumerate(self.row_gpios):
|
||||||
|
gpio.output(row_gpio, gpio.HIGH)
|
||||||
|
for j, column_gpio in enumerate(self.column_gpios):
|
||||||
|
if gpio.input(column_gpio):
|
||||||
|
next_board[i][j] = "x"
|
||||||
|
gpio.output(row_gpio, gpio.LOW)
|
||||||
|
self.board_history = [next_board] + self.board_history[:-1]
|
||||||
|
|
||||||
|
# if the oldest half of the board history is not in inital position but all
|
||||||
|
# newer boards are, reset game state
|
||||||
|
for board in self.board_history[self.hysteresis // 2 :]:
|
||||||
|
if self._is_initial_board(board):
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
for board in self.board_history[: self.hysteresis // 2]:
|
||||||
|
if not self._is_initial_board(board):
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
self.print()
|
||||||
|
self.move_generator.reset()
|
||||||
|
return
|
||||||
|
|
||||||
|
for i in range(8):
|
||||||
|
for j in range(8):
|
||||||
|
# if the oldest half of the board history has a piece but no newer
|
||||||
|
# boards have it, the piece was removed
|
||||||
|
for board in self.board_history[self.hysteresis // 2 :]:
|
||||||
|
if board[i][j] == " ":
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
for board in self.board_history[: self.hysteresis // 2]:
|
||||||
|
if board[i][j] == "x":
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
self.print()
|
||||||
|
self.move_generator.take(i, j)
|
||||||
|
for i in range(8):
|
||||||
|
for j in range(8):
|
||||||
|
# if the oldest half of the board history doesn't have a piece but all
|
||||||
|
# newer boards have it, the piece was placed
|
||||||
|
for board in self.board_history[self.hysteresis // 2 :]:
|
||||||
|
if board[i][j] == "x":
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
for board in self.board_history[: self.hysteresis // 2]:
|
||||||
|
if board[i][j] == " ":
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
self.print()
|
||||||
|
self.move_generator.put(i, j)
|
||||||
|
|
||||||
|
def _print(self, board) -> None:
|
||||||
|
print(" a b c d e f g h")
|
||||||
|
print(" +---------------+")
|
||||||
|
for i, row in reversed(list(enumerate(board))):
|
||||||
|
print("%d|" % (i + 1), end="")
|
||||||
|
for j, field in enumerate(row):
|
||||||
|
print(field, end="")
|
||||||
|
if j == 7:
|
||||||
|
print("|%d" % (i + 1))
|
||||||
|
else:
|
||||||
|
print(" ", end="")
|
||||||
|
print(" +---------------+")
|
||||||
|
print(" a b c d e f g h")
|
||||||
|
|
||||||
|
def print(self) -> None:
|
||||||
|
self._print(self.board_history[0])
|
0
src/blunderboard/commentator.py
Normal file
0
src/blunderboard/commentator.py
Normal file
0
src/blunderboard/config.py
Normal file
0
src/blunderboard/config.py
Normal file
178
src/blunderboard/movegenerator.py
Normal file
178
src/blunderboard/movegenerator.py
Normal file
|
@ -0,0 +1,178 @@
|
||||||
|
from blunderboard.blunderevaluator import BlunderEvaluator
|
||||||
|
|
||||||
|
|
||||||
|
def coords_to_field(row: int, column: int):
|
||||||
|
columns = "abcdefgh"
|
||||||
|
return "%c%d" % (columns[column], row + 1)
|
||||||
|
|
||||||
|
|
||||||
|
class MoveGenerator:
|
||||||
|
def __init__(self, blunder_evaluator: BlunderEvaluator):
|
||||||
|
self.state: State = InitState(blunder_evaluator)
|
||||||
|
|
||||||
|
def reset(self) -> None:
|
||||||
|
print("reset")
|
||||||
|
self.state = self.state.reset()
|
||||||
|
|
||||||
|
def put(self, row: int, column: int) -> None:
|
||||||
|
print("put %s" % coords_to_field(row, column))
|
||||||
|
self.state = self.state.put(row, column)
|
||||||
|
|
||||||
|
def take(self, row: int, column: int) -> None:
|
||||||
|
print("take %s" % coords_to_field(row, column))
|
||||||
|
self.state = self.state.take(row, column)
|
||||||
|
|
||||||
|
|
||||||
|
class State:
|
||||||
|
def __init__(self, blunder_evaluator: BlunderEvaluator):
|
||||||
|
self.blunder_evaluator = blunder_evaluator
|
||||||
|
|
||||||
|
def reset(self) -> "State":
|
||||||
|
self.blunder_evaluator.reset()
|
||||||
|
return InitState(self.blunder_evaluator)
|
||||||
|
|
||||||
|
def put(self, row: int, column: int) -> "State":
|
||||||
|
print("ignored invalid put")
|
||||||
|
return self
|
||||||
|
|
||||||
|
def take(self, row: int, column: int) -> "State":
|
||||||
|
print("ignored invalid take")
|
||||||
|
return self
|
||||||
|
|
||||||
|
|
||||||
|
class InitState(State):
|
||||||
|
def reset(self) -> State:
|
||||||
|
super().reset()
|
||||||
|
return self
|
||||||
|
|
||||||
|
def take(self, row: int, column: int) -> State:
|
||||||
|
return TakeState(self.blunder_evaluator, coords_to_field(row, column))
|
||||||
|
|
||||||
|
|
||||||
|
class TakeState(State):
|
||||||
|
def __init__(self, blunder_evaluator: BlunderEvaluator, from_field: str):
|
||||||
|
super().__init__(blunder_evaluator)
|
||||||
|
self.from_field = from_field
|
||||||
|
|
||||||
|
def put(self, row: int, column: int) -> State:
|
||||||
|
to_field = coords_to_field(row, column)
|
||||||
|
if self.from_field == to_field:
|
||||||
|
print("ignored self-move")
|
||||||
|
return InitState(self.blunder_evaluator)
|
||||||
|
move = self.from_field + to_field
|
||||||
|
print("move %s" % move)
|
||||||
|
self.blunder_evaluator.move(move)
|
||||||
|
return InitState(self.blunder_evaluator)
|
||||||
|
|
||||||
|
def take(self, row: int, column: int) -> State:
|
||||||
|
return TakeTakeState(
|
||||||
|
self.blunder_evaluator, self.from_field, coords_to_field(row, column)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TakeTakeState(State):
|
||||||
|
def __init__(
|
||||||
|
self, blunder_evaluator: BlunderEvaluator, from_field: str, from2_field: str
|
||||||
|
):
|
||||||
|
super().__init__(blunder_evaluator)
|
||||||
|
self.from_field = from_field
|
||||||
|
self.from2_field = from2_field
|
||||||
|
|
||||||
|
def put(self, row: int, column: int) -> State:
|
||||||
|
to_field = coords_to_field(row, column)
|
||||||
|
if self.from2_field == to_field:
|
||||||
|
move = self.from_field + to_field
|
||||||
|
elif self.from_field == to_field:
|
||||||
|
move = self.from2_field + to_field
|
||||||
|
elif (
|
||||||
|
self.from_field[1] == self.from2_field[1]
|
||||||
|
and self.from_field[1] == to_field[1]
|
||||||
|
):
|
||||||
|
# king-side castling
|
||||||
|
if (
|
||||||
|
self.from_field in ["e1", "e8"]
|
||||||
|
and self.from2_field in ["h1", "h8"]
|
||||||
|
and to_field in ["f1", "f8", "g1", "g8"]
|
||||||
|
):
|
||||||
|
return KingCastleState(
|
||||||
|
self.blunder_evaluator, self.from_field, self.from2_field, to_field
|
||||||
|
)
|
||||||
|
# queen-side castling
|
||||||
|
if (
|
||||||
|
self.from_field in ["e1", "e8"]
|
||||||
|
and self.from2_field in ["a1", "a8"]
|
||||||
|
and to_field in ["c1", "c8", "d1", "d8"]
|
||||||
|
):
|
||||||
|
return QueenCastleState(
|
||||||
|
self.blunder_evaluator, self.from_field, self.from2_field, to_field
|
||||||
|
)
|
||||||
|
print("ignored invalid put")
|
||||||
|
return self
|
||||||
|
else:
|
||||||
|
print("ignored invalid put")
|
||||||
|
return self
|
||||||
|
print("move %s" % move)
|
||||||
|
self.blunder_evaluator.move(move)
|
||||||
|
return InitState(self.blunder_evaluator)
|
||||||
|
|
||||||
|
|
||||||
|
class KingCastleState(State):
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
blunder_evaluator: BlunderEvaluator,
|
||||||
|
from_field: str,
|
||||||
|
from2_field: str,
|
||||||
|
to_field: str,
|
||||||
|
):
|
||||||
|
super().__init__(blunder_evaluator)
|
||||||
|
self.from_field = from_field
|
||||||
|
self.from2_field = from2_field
|
||||||
|
self.to_field = to_field
|
||||||
|
|
||||||
|
def put(self, row: int, column: int) -> State:
|
||||||
|
to2_field = coords_to_field(row, column)
|
||||||
|
if self.to_field[1] == to2_field[1]:
|
||||||
|
if to2_field in ["f1", "g1"]:
|
||||||
|
move = "e1g1"
|
||||||
|
elif to2_field in ["f8", "g8"]:
|
||||||
|
move = "e8g8"
|
||||||
|
else:
|
||||||
|
print("ignored invalid put")
|
||||||
|
return self
|
||||||
|
else:
|
||||||
|
print("ignored invalid put")
|
||||||
|
return self
|
||||||
|
print("move %s" % move)
|
||||||
|
self.blunder_evaluator.move(move)
|
||||||
|
return InitState(self.blunder_evaluator)
|
||||||
|
|
||||||
|
|
||||||
|
class QueenCastleState(State):
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
blunder_evaluator: BlunderEvaluator,
|
||||||
|
from_field: str,
|
||||||
|
from2_field: str,
|
||||||
|
to_field: str,
|
||||||
|
):
|
||||||
|
super().__init__(blunder_evaluator)
|
||||||
|
self.from_field = from_field
|
||||||
|
self.from2_field = from2_field
|
||||||
|
self.to_field = to_field
|
||||||
|
|
||||||
|
def put(self, row: int, column: int) -> State:
|
||||||
|
to2_field = coords_to_field(row, column)
|
||||||
|
if self.to_field[1] == to2_field[1]:
|
||||||
|
if to2_field in ["c1", "d1"]:
|
||||||
|
move = "e1c1"
|
||||||
|
elif to2_field in ["c8", "d8"]:
|
||||||
|
move = "e8c8"
|
||||||
|
else:
|
||||||
|
print("ignored invalid put")
|
||||||
|
return self
|
||||||
|
else:
|
||||||
|
print("ignored invalid put")
|
||||||
|
return self
|
||||||
|
print("move %s" % move)
|
||||||
|
self.blunder_evaluator.move(move)
|
||||||
|
return InitState(self.blunder_evaluator)
|
Loading…
Reference in a new issue