first draft of the actual RPC client

This commit is contained in:
Gandalf 2025-11-24 00:54:00 +01:00
parent 2272b28038
commit 584568b784

95
src/RPC.py Normal file
View file

@ -0,0 +1,95 @@
"""JSON-RPC client module."""
from __future__ import annotations
import itertools
import json
import logging
import os
import subprocess
import sys
from queue import Empty, Queue
from threading import Event, Thread
from typing import Any, Iterator, Optional
class JsonRpcError(Exception):
"""JSON-RPC error."""
class RpcFuture:
"""RPC future waiting for RPC call result."""
def __init__(self, rpc: "Rpc", request_id: int, event: Event):
self.rpc = rpc
self.request_id = request_id
self.event = event
def __call__(self):
"""Wait for the future to return the result."""
self.event.wait()
response = self.rpc.request_results.pop(self.request_id)
if "error" in response:
raise JsonRpcError(response["error"])
if "result" in response:
return response["result"]
return None
class RpcMethod:
"""RPC method."""
def __init__(self, rpc: "Rpc", name: str):
self.rpc = rpc
self.name = name
def __call__(self, *args) -> Any:
"""Call JSON-RPC method synchronously."""
request_id = str(uuid4())
request = {
"jsonrpc": "2.0",
"method": self.name,
"params": args,
"id": request_id,
}
# send
# and return result
try:
res = self._session.post(
url=f"{self._endpoint}",
json=data,
# auth=self._auth,
# verify=self._verify_ssl,
)
res.raise_for_status()
ret = res.json()
if ret.get("id") == request_id:
if ret.get("error"):
error = ret.get("error").get("message")
raise SignalCliJSONRPCError(error)
return ret.get("result")
except Exception as err: # pylint: disable=broad-except
error = getattr(err, "message", repr(err))
raise SignalCliJSONRPCError(
f"signal-cli JSON RPC request failed: {error}"
) from err
class Rpc:
"""RPC client."""
def __init__(self, accounts_dir: Optional[str] = None, **kwargs):
"""Initialize RPC client.
"""
# seems like usable logic, just not for this exact purpose, or maybe yes.
#if accounts_dir:
# kwargs["env"] = {
# **kwargs.get("env", os.environ),
# "DC_ACCOUNTS_PATH": str(accounts_dir),
# }
self._kwargs = kwargs
def __getattr__(self, attr: str):
return RpcMethod(self, attr)