Compare commits
No commits in common. "2c9b43375e5a0479aa621ee181659c6ef9e9479f" and "fbb534c395cf8cc1ae32e9ea09b059f6a893e3dc" have entirely different histories.
2c9b43375e
...
fbb534c395
|
|
@ -1,10 +0,0 @@
|
||||||
root = true
|
|
||||||
|
|
||||||
[*]
|
|
||||||
end_of_line = lf
|
|
||||||
insert_final_newline = true
|
|
||||||
trim_trailing_whitespace = true
|
|
||||||
charset = utf-8
|
|
||||||
indent_style = space
|
|
||||||
indent_size = 4
|
|
||||||
max_line_length = 88
|
|
||||||
21
.gitignore
vendored
21
.gitignore
vendored
|
|
@ -1,3 +1,18 @@
|
||||||
/dist/
|
.*.swp
|
||||||
/venv/
|
|
||||||
*.egg-info/
|
# Binaries for programs and plugins
|
||||||
|
*.exe
|
||||||
|
*.exe~
|
||||||
|
*.dll
|
||||||
|
*.so
|
||||||
|
*.dylib
|
||||||
|
blunderboard
|
||||||
|
|
||||||
|
# Test binary, built with `go test -c`
|
||||||
|
*.test
|
||||||
|
|
||||||
|
# Output of the go coverage tool, specifically when used with LiteIDE
|
||||||
|
*.out
|
||||||
|
|
||||||
|
# Dependency directories (remove the comment below to include it)
|
||||||
|
# vendor/
|
||||||
|
|
|
||||||
91
blunderboard.go
Normal file
91
blunderboard.go
Normal file
|
|
@ -0,0 +1,91 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"github.com/notnil/chess"
|
||||||
|
"github.com/notnil/chess/uci"
|
||||||
|
"math"
|
||||||
|
"os"
|
||||||
|
)
|
||||||
|
|
||||||
|
// stolen^H^H inspired from lichess https://github.com/ornicar/lila/blob/master/modules/analyse/src/main/Advice.scala#L79
|
||||||
|
func WinningChance(cp int) float64 {
|
||||||
|
winning_chance := 2 / (1 + math.Exp(-0.004 * float64(cp))) - 1
|
||||||
|
return winning_chance
|
||||||
|
}
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
reader, err := os.Open("spongeboyahoy_vs_tomlx.pgn")
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
pgn, err := chess.PGN(reader)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
spongeboyahoy_vs_tomlx := chess.NewGame(pgn)
|
||||||
|
fmt.Println(spongeboyahoy_vs_tomlx)
|
||||||
|
|
||||||
|
engine, err := uci.New("stockfish")
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
defer engine.Close()
|
||||||
|
|
||||||
|
if err := engine.Run(uci.CmdUCI, uci.CmdIsReady, uci.CmdUCINewGame); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
game := chess.NewGame()
|
||||||
|
// prevprev_winning_chance := 0.0
|
||||||
|
prev_winning_chance := 0.0
|
||||||
|
for game.Outcome() == chess.NoOutcome {
|
||||||
|
num_of_moves := len(game.Moves())
|
||||||
|
if err := engine.Run(uci.CmdPosition{Position: game.Position()}, uci.CmdGo{Depth: 12}); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
search_results := engine.SearchResults()
|
||||||
|
cp := search_results.Info.Score.CP
|
||||||
|
if (num_of_moves % 2 == 1) {
|
||||||
|
cp *= -1
|
||||||
|
}
|
||||||
|
winning_chance := WinningChance(cp)
|
||||||
|
if (num_of_moves > 0) {
|
||||||
|
delta := prev_winning_chance - winning_chance
|
||||||
|
if (num_of_moves % 2 == 0) {
|
||||||
|
delta *= -1;
|
||||||
|
}
|
||||||
|
if delta > 0.3 {
|
||||||
|
fmt.Print("B-b-b-blunder!!")
|
||||||
|
} else if delta > 0.2 {
|
||||||
|
fmt.Print("That was a mistake.")
|
||||||
|
} else if delta > 0.1 {
|
||||||
|
fmt.Print("Meh...")
|
||||||
|
} else {
|
||||||
|
fmt.Print("Ok")
|
||||||
|
}
|
||||||
|
fmt.Printf(" (%0.2f, %0.2f, %0.2f)\n", float64(cp) / 100, winning_chance, -delta)
|
||||||
|
}
|
||||||
|
// prevprev_winning_chance = prev_winning_chance
|
||||||
|
prev_winning_chance = winning_chance
|
||||||
|
// fmt.Println(game.Position().Board().Draw())
|
||||||
|
// fmt.Println("Score (centipawns):", cp, "Winning chance:", winning_chance, "Best Move: ", search_results.BestMove)
|
||||||
|
// fmt.Println("Move: ", search_results.BestMove)
|
||||||
|
move := spongeboyahoy_vs_tomlx.Moves()[num_of_moves]
|
||||||
|
fmt.Print(num_of_moves / 2 + 1, move, "\t")
|
||||||
|
if err := game.Move(move); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
// for {
|
||||||
|
// var move string
|
||||||
|
// fmt.Print("Move: ")
|
||||||
|
// fmt.Scanln(&move)
|
||||||
|
// if err := game.MoveStr(move); err == nil {
|
||||||
|
// break
|
||||||
|
// }
|
||||||
|
// fmt.Println("Illegal move!")
|
||||||
|
// }
|
||||||
|
}
|
||||||
|
fmt.Println(game.Outcome())
|
||||||
|
fmt.Println(game.Position().Board().Draw())
|
||||||
|
}
|
||||||
10
deploy.sh
10
deploy.sh
|
|
@ -1,10 +0,0 @@
|
||||||
set -e
|
|
||||||
[ -d venv ] || virtualenv venv
|
|
||||||
. venv/bin/activate
|
|
||||||
pip install build tox
|
|
||||||
tox
|
|
||||||
python -m build
|
|
||||||
ssh pi@blunderboard.igloo.icmp.camp rm -f 'blunderboard-*.whl'
|
|
||||||
scp dist/blunderboard-*.whl pi@blunderboard.igloo.icmp.camp:
|
|
||||||
ssh pi@blunderboard.igloo.icmp.camp blunderboard/venv/bin/pip uninstall -y blunderboard
|
|
||||||
ssh pi@blunderboard.igloo.icmp.camp blunderboard/venv/bin/pip install 'blunderboard-*.whl'
|
|
||||||
3
go.mod
Normal file
3
go.mod
Normal file
|
|
@ -0,0 +1,3 @@
|
||||||
|
module git.0x90.space/0x90/blunderboard
|
||||||
|
|
||||||
|
require github.com/notnil/chess v1.5.0
|
||||||
3
go.sum
Normal file
3
go.sum
Normal file
|
|
@ -0,0 +1,3 @@
|
||||||
|
github.com/ajstarks/svgo v0.0.0-20200320125537-f189e35d30ca/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw=
|
||||||
|
github.com/notnil/chess v1.5.0 h1:BcdmSGqZYhoqHsAqNpVTtPwRMOA4Sj8iZY1ZuPW4Umg=
|
||||||
|
github.com/notnil/chess v1.5.0/go.mod h1:cRuJUIBFq9Xki05TWHJxHYkC+fFpq45IWwk94DdlCrA=
|
||||||
|
|
@ -1,6 +0,0 @@
|
||||||
[build-system]
|
|
||||||
requires = [
|
|
||||||
"setuptools>=42",
|
|
||||||
"wheel"
|
|
||||||
]
|
|
||||||
build-backend = "setuptools.build_meta"
|
|
||||||
|
|
@ -1,44 +0,0 @@
|
||||||
import requests
|
|
||||||
|
|
||||||
# Replace YOUR_API_TOKEN with the API token you are using for authentication
|
|
||||||
API_TOKEN = "blunderboard-security-token"
|
|
||||||
|
|
||||||
# Set the API endpoint URL
|
|
||||||
url = "http://5.75.138.151:5000/api/get_evaluation"
|
|
||||||
wdl_api = "http://5.75.138.151:5000/api/get_wdl"
|
|
||||||
|
|
||||||
# Set the chess position and search depth
|
|
||||||
data = {
|
|
||||||
"position": "rnbqkbnr/pppppppp/8/8/8/8/PPPPPPPP/RNBQKBNR w KQkq - 0 1",
|
|
||||||
"depth": 20,
|
|
||||||
}
|
|
||||||
|
|
||||||
# Set the API token in the request header
|
|
||||||
headers = {"Authorization": API_TOKEN}
|
|
||||||
|
|
||||||
# Send a POST request to the API endpoint
|
|
||||||
response = requests.post(url, json=data, headers=headers)
|
|
||||||
|
|
||||||
# Print the response content from the server
|
|
||||||
if response.status_code == 200:
|
|
||||||
print(response.json())
|
|
||||||
|
|
||||||
else:
|
|
||||||
print("Error: " + response.json()["error"])
|
|
||||||
|
|
||||||
|
|
||||||
def api_wdl() -> str:
|
|
||||||
"""
|
|
||||||
Returns the current wdl from the REST API
|
|
||||||
:return: str
|
|
||||||
"""
|
|
||||||
# Send a POST request to the API endpoint
|
|
||||||
response2 = requests.post(wdl_api, json=data, headers=headers)
|
|
||||||
if response2.status_code == 200:
|
|
||||||
return response2.json()
|
|
||||||
else:
|
|
||||||
print("API Error: " + response2.json()["error"])
|
|
||||||
return "API Error"
|
|
||||||
|
|
||||||
|
|
||||||
print(api_wdl())
|
|
||||||
65
rest_api.py
65
rest_api.py
|
|
@ -1,65 +0,0 @@
|
||||||
from flask import Flask, request, jsonify
|
|
||||||
from stockfish import Stockfish
|
|
||||||
|
|
||||||
app = Flask(__name__)
|
|
||||||
|
|
||||||
# Replace YOUR_API_TOKEN with the API token you want to use for authentication
|
|
||||||
API_TOKEN = "blunderboard-security-token"
|
|
||||||
|
|
||||||
default_engine_settings = {
|
|
||||||
"Debug Log File": "stocklog.txt",
|
|
||||||
"Contempt": 0,
|
|
||||||
"Min Split Depth": 0,
|
|
||||||
"Threads": 2,
|
|
||||||
# More threads will make the engine stronger, but should be kept at less than
|
|
||||||
# the number of logical processors on your computer.
|
|
||||||
"Ponder": "false",
|
|
||||||
"Hash": 516,
|
|
||||||
# Default size is 16 MB. It's recommended that you increase this value, but keep
|
|
||||||
# it as some power of 2. E.g., if you're fine using 2 GB of RAM, set Hash to
|
|
||||||
# 2048 (11th power of 2).
|
|
||||||
"MultiPV": 1,
|
|
||||||
"Skill Level": 20,
|
|
||||||
"Move Overhead": 10,
|
|
||||||
"Minimum Thinking Time": 5,
|
|
||||||
"Slow Mover": 100,
|
|
||||||
"UCI_Chess960": "false",
|
|
||||||
"UCI_LimitStrength": "false",
|
|
||||||
"UCI_Elo": 1350,
|
|
||||||
# "NNUE": "true", # TODO Find out if NNUE can be used with the python wrapper
|
|
||||||
}
|
|
||||||
|
|
||||||
# Create a Stockfish chess engine instance
|
|
||||||
engine = Stockfish(path="/usr/bin/stockfish")
|
|
||||||
|
|
||||||
# Set the engine settings
|
|
||||||
engine.update_engine_parameters(default_engine_settings)
|
|
||||||
|
|
||||||
|
|
||||||
@app.route("/api/get_evaluation", methods=["POST"])
|
|
||||||
def get_evaluation():
|
|
||||||
# Get the API token from the request header
|
|
||||||
token = request.headers.get("Authorization")
|
|
||||||
|
|
||||||
# If the API token is not provided or is invalid, return an error
|
|
||||||
if token != API_TOKEN:
|
|
||||||
return jsonify({"error": "Invalid API token"}), 401
|
|
||||||
|
|
||||||
# Get the current chess position and the desired depth from the request body
|
|
||||||
data = request.get_json()
|
|
||||||
position = data.get("position")
|
|
||||||
depth = data.get("depth")
|
|
||||||
|
|
||||||
# Set the position and search depth in the chess engine
|
|
||||||
engine.set_fen_position(position)
|
|
||||||
engine.set_depth(depth)
|
|
||||||
|
|
||||||
# Get the evaluation in centipawns
|
|
||||||
evaluation = engine.get_evaluation()
|
|
||||||
|
|
||||||
# Return the evaluation to the client
|
|
||||||
return evaluation
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
app.run(host="")
|
|
||||||
70
setup.cfg
70
setup.cfg
|
|
@ -1,70 +0,0 @@
|
||||||
[metadata]
|
|
||||||
name = blunderboard
|
|
||||||
version = 0.0.1
|
|
||||||
author = 0x90.space
|
|
||||||
author_email = people@schleuder.0x90.space
|
|
||||||
description = Blunderboard
|
|
||||||
url = https://git.0x90.space/0x90/blunderboard
|
|
||||||
project_urls =
|
|
||||||
Bug Tracker = https://git.0x90.space/0x90/blunderboard/issues
|
|
||||||
classifiers =
|
|
||||||
Programming Language :: Python :: 3 :: Only
|
|
||||||
License :: OSI Approved :: ISC License (ISCL)
|
|
||||||
Operating System :: POSIX :: Linux
|
|
||||||
Topic :: Games/Entertainment :: Board Games
|
|
||||||
|
|
||||||
[options]
|
|
||||||
package_dir =
|
|
||||||
= src
|
|
||||||
packages = find:
|
|
||||||
python_requires = >=3.9
|
|
||||||
install_requires =
|
|
||||||
pygame
|
|
||||||
RPi.GPIO
|
|
||||||
stockfish
|
|
||||||
requests
|
|
||||||
|
|
||||||
[options.packages.find]
|
|
||||||
where = src
|
|
||||||
|
|
||||||
[options.data_files]
|
|
||||||
share/blunderboard/sounds/blunder =
|
|
||||||
sounds/blunder/tts_what_a_blunder.mp3
|
|
||||||
sounds/blunder/ZONK.mp3
|
|
||||||
share/blunderboard/sounds/illegal =
|
|
||||||
sounds/illegal/alarm.mp3
|
|
||||||
|
|
||||||
[options.entry_points]
|
|
||||||
console_scripts =
|
|
||||||
blunderboard = blunderboard:main
|
|
||||||
|
|
||||||
[tox:tox]
|
|
||||||
envlist = lint
|
|
||||||
isolated_build = True
|
|
||||||
|
|
||||||
[testenv:lint]
|
|
||||||
skip_install = True
|
|
||||||
deps =
|
|
||||||
black
|
|
||||||
flake8
|
|
||||||
mypy
|
|
||||||
commands =
|
|
||||||
black --check --diff src
|
|
||||||
flake8 src
|
|
||||||
mypy src
|
|
||||||
|
|
||||||
#[testenv]
|
|
||||||
#deps =
|
|
||||||
# pytest
|
|
||||||
#commands =
|
|
||||||
# pytest tests
|
|
||||||
|
|
||||||
[flake8]
|
|
||||||
max_line_length = 88
|
|
||||||
extend_ignore = E203
|
|
||||||
|
|
||||||
[mypy]
|
|
||||||
check_untyped_defs = True
|
|
||||||
ignore_missing_imports = True
|
|
||||||
install_types = True
|
|
||||||
non_interactive = True
|
|
||||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
|
|
@ -1,24 +0,0 @@
|
||||||
from blunderboard.blunderevaluator import BlunderEvaluator
|
|
||||||
from blunderboard.boardreader import BoardReader
|
|
||||||
from blunderboard.movegenerator import MoveGenerator
|
|
||||||
import cProfile
|
|
||||||
from pstats import SortKey
|
|
||||||
from time import sleep
|
|
||||||
|
|
||||||
|
|
||||||
def main_content():
|
|
||||||
try:
|
|
||||||
blunder_evaluator = BlunderEvaluator()
|
|
||||||
move_generator = MoveGenerator(blunder_evaluator)
|
|
||||||
reader = BoardReader(move_generator)
|
|
||||||
reader.scan()
|
|
||||||
reader.print()
|
|
||||||
while True:
|
|
||||||
reader.scan()
|
|
||||||
sleep(0.1)
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
def main():
|
|
||||||
cProfile.runctx("main_content()", globals(), locals(), sort=SortKey.CUMULATIVE)
|
|
||||||
|
|
@ -1,148 +0,0 @@
|
||||||
import os
|
|
||||||
from pathlib import Path
|
|
||||||
from pygame import mixer
|
|
||||||
import random
|
|
||||||
from stockfish import Stockfish
|
|
||||||
import sys
|
|
||||||
import requests
|
|
||||||
|
|
||||||
API_TOKEN = "blunderboard-security-token"
|
|
||||||
sound_path = Path(sys.prefix) / "share" / "blunderboard" / "sounds"
|
|
||||||
api = "http://5.75.138.151:5000/api/get_evaluation"
|
|
||||||
wdl_api = "http://5.75.138.151:5000/api/get_wdl"
|
|
||||||
|
|
||||||
|
|
||||||
class BlunderEvaluator:
|
|
||||||
default_engine_settings = {
|
|
||||||
"Debug Log File": "stocklog.txt",
|
|
||||||
"Contempt": 0,
|
|
||||||
"Min Split Depth": 0,
|
|
||||||
"Threads": 1,
|
|
||||||
# More threads will make the engine stronger, but should be kept at less than
|
|
||||||
# the number of logical processors on your computer.
|
|
||||||
"Ponder": "false",
|
|
||||||
"Hash": 16,
|
|
||||||
# Default size is 16 MB. It's recommended that you increase this value, but keep
|
|
||||||
# it as some power of 2. E.g., if you're fine using 2 GB of RAM, set Hash to
|
|
||||||
# 2048 (11th power of 2).
|
|
||||||
"MultiPV": 1,
|
|
||||||
"Skill Level": 20,
|
|
||||||
"Move Overhead": 10,
|
|
||||||
"Minimum Thinking Time": 20,
|
|
||||||
"Slow Mover": 100,
|
|
||||||
"UCI_Chess960": "false",
|
|
||||||
"UCI_LimitStrength": "false",
|
|
||||||
"UCI_Elo": 1350,
|
|
||||||
# "NNUE": "true", # TODO Find out if NNUE can be used with the python wrapper
|
|
||||||
}
|
|
||||||
|
|
||||||
def __init__(self, engine_settings: dict = default_engine_settings):
|
|
||||||
self.engine = Stockfish()
|
|
||||||
self.settings = engine_settings
|
|
||||||
self.engine.update_engine_parameters(self.settings)
|
|
||||||
self.engine.set_position()
|
|
||||||
self.current_evaluation = self.api_evaluation()
|
|
||||||
self.evaluations: list[dict] = []
|
|
||||||
self.current_wdl = self.api_wdl()
|
|
||||||
self.wdls: list = []
|
|
||||||
self.current_fen = self.engine.get_fen_position()
|
|
||||||
self.white_to_move = True
|
|
||||||
|
|
||||||
def reset(self):
|
|
||||||
self.engine.set_position()
|
|
||||||
self.white_to_move = True
|
|
||||||
|
|
||||||
def move(self, move) -> None:
|
|
||||||
"""
|
|
||||||
Makes a move on the board and updates the game state
|
|
||||||
:param move: str
|
|
||||||
:return: None
|
|
||||||
"""
|
|
||||||
if self.engine.is_move_correct(move):
|
|
||||||
self.engine.make_moves_from_current_position([move])
|
|
||||||
self.current_evaluation = self.api_evaluation()
|
|
||||||
self.evaluations.append(self.current_evaluation)
|
|
||||||
self.current_wdl = self.api_wdl()
|
|
||||||
self.wdls.append(self.current_wdl)
|
|
||||||
print(self.current_wdl)
|
|
||||||
print(self.current_evaluation)
|
|
||||||
print(self.get_board())
|
|
||||||
if self.move_was_blunder():
|
|
||||||
# If the played move was a blunder play a random sound from the blunder
|
|
||||||
# path
|
|
||||||
self.play_sound("blunder")
|
|
||||||
print("Blunder!")
|
|
||||||
if self.white_to_move:
|
|
||||||
self.white_to_move = False
|
|
||||||
elif not self.white_to_move:
|
|
||||||
self.white_to_move = True
|
|
||||||
else:
|
|
||||||
print("Invalid move")
|
|
||||||
self.play_sound("illegal")
|
|
||||||
|
|
||||||
def api_evaluation(self) -> dict:
|
|
||||||
"""
|
|
||||||
Returns the current evaluation from the REST API
|
|
||||||
:return: str
|
|
||||||
"""
|
|
||||||
data = {"position": self.engine.get_fen_position(), "depth": 20}
|
|
||||||
# Set the API token in the request header
|
|
||||||
headers = {"Authorization": API_TOKEN}
|
|
||||||
# Send a POST request to the API endpoint
|
|
||||||
response = requests.post(api, json=data, headers=headers)
|
|
||||||
if response.status_code == 200:
|
|
||||||
return response.json()["value"]
|
|
||||||
else:
|
|
||||||
print("API Error: " + response.json()["error"])
|
|
||||||
return {"NOPE": "PLEASE CRASH"}
|
|
||||||
|
|
||||||
def api_wdl(self) -> str:
|
|
||||||
"""
|
|
||||||
Returns the current wdl from the REST API
|
|
||||||
:return: str
|
|
||||||
"""
|
|
||||||
data = {"position": self.engine.get_fen_position(), "depth": 20}
|
|
||||||
# Set the API token in the request header
|
|
||||||
headers = {"Authorization": API_TOKEN}
|
|
||||||
# Send a POST request to the API endpoint
|
|
||||||
response = requests.post(wdl_api, json=data, headers=headers)
|
|
||||||
if response.status_code == 200:
|
|
||||||
return response.json()
|
|
||||||
else:
|
|
||||||
print("API Error: " + response.json()["error"])
|
|
||||||
return "API Error"
|
|
||||||
|
|
||||||
def move_was_blunder(self) -> bool:
|
|
||||||
"""
|
|
||||||
Returns true if the last move was a blunder
|
|
||||||
:return: bool
|
|
||||||
"""
|
|
||||||
if len(self.wdls) > 1: # Don't check for blunders on the first move
|
|
||||||
previous_wdl = self.wdls[len(self.evaluations) - 2]
|
|
||||||
if abs(previous_wdl[0] - self.current_wdl[2]) > 300:
|
|
||||||
return True
|
|
||||||
else:
|
|
||||||
return False
|
|
||||||
return False
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def play_sound(move_type: str) -> None:
|
|
||||||
"""
|
|
||||||
Plays a random sound for the type of move (blunder, illegal)
|
|
||||||
:param move_type: str
|
|
||||||
:return: None
|
|
||||||
"""
|
|
||||||
path = sound_path / move_type
|
|
||||||
mixer.init()
|
|
||||||
mixer.music.load(path / random.choice(os.listdir(path)))
|
|
||||||
mixer.music.play()
|
|
||||||
# while mixer.music.get_busy():
|
|
||||||
# time.sleep(0.)
|
|
||||||
# I guess we won't want this, since it will block the main thread.
|
|
||||||
|
|
||||||
def get_board(self) -> str:
|
|
||||||
"""
|
|
||||||
Returns the current board state
|
|
||||||
:return: str
|
|
||||||
"""
|
|
||||||
return self.engine.get_board_visual()
|
|
||||||
|
|
@ -1,123 +0,0 @@
|
||||||
from blunderboard.movegenerator import MoveGenerator
|
|
||||||
import RPi.GPIO as gpio
|
|
||||||
|
|
||||||
|
|
||||||
class BoardReader:
|
|
||||||
hysteresis = 8
|
|
||||||
default_gpio_mode = gpio.BCM
|
|
||||||
default_row_gpios = [4, 5, 6, 12, 13, 16, 17, 19]
|
|
||||||
default_column_gpios = [20, 21, 22, 23, 24, 25, 26, 27]
|
|
||||||
|
|
||||||
def __init__(
|
|
||||||
self,
|
|
||||||
move_generator: MoveGenerator,
|
|
||||||
row_gpios: list[int] = default_row_gpios,
|
|
||||||
column_gpios: list[int] = default_column_gpios,
|
|
||||||
gpio_mode=default_gpio_mode,
|
|
||||||
):
|
|
||||||
gpio.setmode(gpio_mode)
|
|
||||||
gpio.setup(column_gpios, gpio.IN, pull_up_down=gpio.PUD_DOWN)
|
|
||||||
gpio.setup(row_gpios, gpio.OUT, initial=gpio.LOW)
|
|
||||||
self.column_gpios = column_gpios
|
|
||||||
self.row_gpios = row_gpios
|
|
||||||
self.board_history: list[list[list[str]]] = []
|
|
||||||
for _ in range(self.hysteresis):
|
|
||||||
self.board_history.append(self._initial_board())
|
|
||||||
self.move_generator = move_generator
|
|
||||||
|
|
||||||
def __del__(self):
|
|
||||||
gpio.cleanup()
|
|
||||||
|
|
||||||
def _empty_board(self) -> list[list[str]]:
|
|
||||||
board = []
|
|
||||||
for i in range(8):
|
|
||||||
board.append([" "] * 8)
|
|
||||||
return board
|
|
||||||
|
|
||||||
def _initial_board(self) -> list[list[str]]:
|
|
||||||
board = []
|
|
||||||
for i in range(2):
|
|
||||||
board.append(["x"] * 8)
|
|
||||||
for i in range(2, 6):
|
|
||||||
board.append([" "] * 8)
|
|
||||||
for i in range(6, 8):
|
|
||||||
board.append(["x"] * 8)
|
|
||||||
return board
|
|
||||||
|
|
||||||
def _is_initial_board(self, board) -> bool:
|
|
||||||
initial_board = self._initial_board()
|
|
||||||
for i, row in enumerate(board):
|
|
||||||
for j, field in enumerate(row):
|
|
||||||
if field != initial_board[i][j]:
|
|
||||||
return False
|
|
||||||
return True
|
|
||||||
|
|
||||||
def scan(self) -> None:
|
|
||||||
next_board = self._empty_board()
|
|
||||||
for i, row_gpio in enumerate(self.row_gpios):
|
|
||||||
gpio.output(row_gpio, gpio.HIGH)
|
|
||||||
for j, column_gpio in enumerate(self.column_gpios):
|
|
||||||
if gpio.input(column_gpio):
|
|
||||||
next_board[i][j] = "x"
|
|
||||||
gpio.output(row_gpio, gpio.LOW)
|
|
||||||
self.board_history = [next_board] + self.board_history[:-1]
|
|
||||||
|
|
||||||
# if the oldest half of the board history is not in inital position but all
|
|
||||||
# newer boards are, reset game state
|
|
||||||
for board in self.board_history[self.hysteresis // 2 :]:
|
|
||||||
if self._is_initial_board(board):
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
for board in self.board_history[: self.hysteresis // 2]:
|
|
||||||
if not self._is_initial_board(board):
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
self.print()
|
|
||||||
self.move_generator.reset()
|
|
||||||
return
|
|
||||||
|
|
||||||
for i in range(8):
|
|
||||||
for j in range(8):
|
|
||||||
# if the oldest half of the board history has a piece but no newer
|
|
||||||
# boards have it, the piece was removed
|
|
||||||
for board in self.board_history[self.hysteresis // 2 :]:
|
|
||||||
if board[i][j] == " ":
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
for board in self.board_history[: self.hysteresis // 2]:
|
|
||||||
if board[i][j] == "x":
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
self.print()
|
|
||||||
self.move_generator.take(i, j)
|
|
||||||
for i in range(8):
|
|
||||||
for j in range(8):
|
|
||||||
# if the oldest half of the board history doesn't have a piece but all
|
|
||||||
# newer boards have it, the piece was placed
|
|
||||||
for board in self.board_history[self.hysteresis // 2 :]:
|
|
||||||
if board[i][j] == "x":
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
for board in self.board_history[: self.hysteresis // 2]:
|
|
||||||
if board[i][j] == " ":
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
self.print()
|
|
||||||
self.move_generator.put(i, j)
|
|
||||||
|
|
||||||
def _print(self, board) -> None:
|
|
||||||
print(" a b c d e f g h")
|
|
||||||
print(" +---------------+")
|
|
||||||
for i, row in reversed(list(enumerate(board))):
|
|
||||||
print("%d|" % (i + 1), end="")
|
|
||||||
for j, field in enumerate(row):
|
|
||||||
print(field, end="")
|
|
||||||
if j == 7:
|
|
||||||
print("|%d" % (i + 1))
|
|
||||||
else:
|
|
||||||
print(" ", end="")
|
|
||||||
print(" +---------------+")
|
|
||||||
print(" a b c d e f g h")
|
|
||||||
|
|
||||||
def print(self) -> None:
|
|
||||||
self._print(self.board_history[0])
|
|
||||||
|
|
@ -1,178 +0,0 @@
|
||||||
from blunderboard.blunderevaluator import BlunderEvaluator
|
|
||||||
|
|
||||||
|
|
||||||
def coords_to_field(row: int, column: int):
|
|
||||||
columns = "abcdefgh"
|
|
||||||
return "%c%d" % (columns[column], row + 1)
|
|
||||||
|
|
||||||
|
|
||||||
class MoveGenerator:
|
|
||||||
def __init__(self, blunder_evaluator: BlunderEvaluator):
|
|
||||||
self.state: State = InitState(blunder_evaluator)
|
|
||||||
|
|
||||||
def reset(self) -> None:
|
|
||||||
print("reset")
|
|
||||||
self.state = self.state.reset()
|
|
||||||
|
|
||||||
def put(self, row: int, column: int) -> None:
|
|
||||||
print("put %s" % coords_to_field(row, column))
|
|
||||||
self.state = self.state.put(row, column)
|
|
||||||
|
|
||||||
def take(self, row: int, column: int) -> None:
|
|
||||||
print("take %s" % coords_to_field(row, column))
|
|
||||||
self.state = self.state.take(row, column)
|
|
||||||
|
|
||||||
|
|
||||||
class State:
|
|
||||||
def __init__(self, blunder_evaluator: BlunderEvaluator):
|
|
||||||
self.blunder_evaluator = blunder_evaluator
|
|
||||||
|
|
||||||
def reset(self) -> "State":
|
|
||||||
self.blunder_evaluator.reset()
|
|
||||||
return InitState(self.blunder_evaluator)
|
|
||||||
|
|
||||||
def put(self, row: int, column: int) -> "State":
|
|
||||||
print("ignored invalid put")
|
|
||||||
return self
|
|
||||||
|
|
||||||
def take(self, row: int, column: int) -> "State":
|
|
||||||
print("ignored invalid take")
|
|
||||||
return self
|
|
||||||
|
|
||||||
|
|
||||||
class InitState(State):
|
|
||||||
def reset(self) -> State:
|
|
||||||
super().reset()
|
|
||||||
return self
|
|
||||||
|
|
||||||
def take(self, row: int, column: int) -> State:
|
|
||||||
return TakeState(self.blunder_evaluator, coords_to_field(row, column))
|
|
||||||
|
|
||||||
|
|
||||||
class TakeState(State):
|
|
||||||
def __init__(self, blunder_evaluator: BlunderEvaluator, from_field: str):
|
|
||||||
super().__init__(blunder_evaluator)
|
|
||||||
self.from_field = from_field
|
|
||||||
|
|
||||||
def put(self, row: int, column: int) -> State:
|
|
||||||
to_field = coords_to_field(row, column)
|
|
||||||
if self.from_field == to_field:
|
|
||||||
print("ignored self-move")
|
|
||||||
return InitState(self.blunder_evaluator)
|
|
||||||
move = self.from_field + to_field
|
|
||||||
print("move %s" % move)
|
|
||||||
self.blunder_evaluator.move(move)
|
|
||||||
return InitState(self.blunder_evaluator)
|
|
||||||
|
|
||||||
def take(self, row: int, column: int) -> State:
|
|
||||||
return TakeTakeState(
|
|
||||||
self.blunder_evaluator, self.from_field, coords_to_field(row, column)
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class TakeTakeState(State):
|
|
||||||
def __init__(
|
|
||||||
self, blunder_evaluator: BlunderEvaluator, from_field: str, from2_field: str
|
|
||||||
):
|
|
||||||
super().__init__(blunder_evaluator)
|
|
||||||
self.from_field = from_field
|
|
||||||
self.from2_field = from2_field
|
|
||||||
|
|
||||||
def put(self, row: int, column: int) -> State:
|
|
||||||
to_field = coords_to_field(row, column)
|
|
||||||
if self.from2_field == to_field:
|
|
||||||
move = self.from_field + to_field
|
|
||||||
elif self.from_field == to_field:
|
|
||||||
move = self.from2_field + to_field
|
|
||||||
elif (
|
|
||||||
self.from_field[1] == self.from2_field[1]
|
|
||||||
and self.from_field[1] == to_field[1]
|
|
||||||
):
|
|
||||||
# king-side castling
|
|
||||||
if (
|
|
||||||
self.from_field in ["e1", "e8"]
|
|
||||||
and self.from2_field in ["h1", "h8"]
|
|
||||||
and to_field in ["f1", "f8", "g1", "g8"]
|
|
||||||
):
|
|
||||||
return KingCastleState(
|
|
||||||
self.blunder_evaluator, self.from_field, self.from2_field, to_field
|
|
||||||
)
|
|
||||||
# queen-side castling
|
|
||||||
if (
|
|
||||||
self.from_field in ["e1", "e8"]
|
|
||||||
and self.from2_field in ["a1", "a8"]
|
|
||||||
and to_field in ["c1", "c8", "d1", "d8"]
|
|
||||||
):
|
|
||||||
return QueenCastleState(
|
|
||||||
self.blunder_evaluator, self.from_field, self.from2_field, to_field
|
|
||||||
)
|
|
||||||
print("ignored invalid put")
|
|
||||||
return self
|
|
||||||
else:
|
|
||||||
print("ignored invalid put")
|
|
||||||
return self
|
|
||||||
print("move %s" % move)
|
|
||||||
self.blunder_evaluator.move(move)
|
|
||||||
return InitState(self.blunder_evaluator)
|
|
||||||
|
|
||||||
|
|
||||||
class KingCastleState(State):
|
|
||||||
def __init__(
|
|
||||||
self,
|
|
||||||
blunder_evaluator: BlunderEvaluator,
|
|
||||||
from_field: str,
|
|
||||||
from2_field: str,
|
|
||||||
to_field: str,
|
|
||||||
):
|
|
||||||
super().__init__(blunder_evaluator)
|
|
||||||
self.from_field = from_field
|
|
||||||
self.from2_field = from2_field
|
|
||||||
self.to_field = to_field
|
|
||||||
|
|
||||||
def put(self, row: int, column: int) -> State:
|
|
||||||
to2_field = coords_to_field(row, column)
|
|
||||||
if self.to_field[1] == to2_field[1]:
|
|
||||||
if to2_field in ["f1", "g1"]:
|
|
||||||
move = "e1g1"
|
|
||||||
elif to2_field in ["f8", "g8"]:
|
|
||||||
move = "e8g8"
|
|
||||||
else:
|
|
||||||
print("ignored invalid put")
|
|
||||||
return self
|
|
||||||
else:
|
|
||||||
print("ignored invalid put")
|
|
||||||
return self
|
|
||||||
print("move %s" % move)
|
|
||||||
self.blunder_evaluator.move(move)
|
|
||||||
return InitState(self.blunder_evaluator)
|
|
||||||
|
|
||||||
|
|
||||||
class QueenCastleState(State):
|
|
||||||
def __init__(
|
|
||||||
self,
|
|
||||||
blunder_evaluator: BlunderEvaluator,
|
|
||||||
from_field: str,
|
|
||||||
from2_field: str,
|
|
||||||
to_field: str,
|
|
||||||
):
|
|
||||||
super().__init__(blunder_evaluator)
|
|
||||||
self.from_field = from_field
|
|
||||||
self.from2_field = from2_field
|
|
||||||
self.to_field = to_field
|
|
||||||
|
|
||||||
def put(self, row: int, column: int) -> State:
|
|
||||||
to2_field = coords_to_field(row, column)
|
|
||||||
if self.to_field[1] == to2_field[1]:
|
|
||||||
if to2_field in ["c1", "d1"]:
|
|
||||||
move = "e1c1"
|
|
||||||
elif to2_field in ["c8", "d8"]:
|
|
||||||
move = "e8c8"
|
|
||||||
else:
|
|
||||||
print("ignored invalid put")
|
|
||||||
return self
|
|
||||||
else:
|
|
||||||
print("ignored invalid put")
|
|
||||||
return self
|
|
||||||
print("move %s" % move)
|
|
||||||
self.blunder_evaluator.move(move)
|
|
||||||
return InitState(self.blunder_evaluator)
|
|
||||||
Loading…
Reference in a new issue