| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139 |
- import json
- import uuid
- from typing import Dict, List, Optional, Any
- from game_logic import SetGame
- users_db = {}
- games_db = {}
- active_connections = {}
- def get_next_game_id() -> int:
- if not games_db:
- return 0
- return max(games_db.keys()) + 1
- def get_user_by_nickname(nickname: str) -> Optional[Dict]:
- for user in users_db.values():
- if user["nickname"] == nickname:
- return user
- return None
- def get_user_by_token(token: str) -> Optional[Dict]:
- return users_db.get(token)
- def create_user(nickname: str, password: str, token: str) -> Dict:
- from auth import get_password_hash
-
- user_data = {
- "nickname": nickname,
- "password_hash": get_password_hash(password),
- "token": token,
- "created_at": "2024-01-01",
- "score": 0
- }
- users_db[token] = user_data
- return user_data
- def create_game(creator_token: str) -> int:
- game_id = get_next_game_id()
-
- game_data = {
- "id": game_id,
- "game": SetGame(),
- "players": {
- creator_token: {
- "nickname": users_db[creator_token]["nickname"],
- "score": 0,
- "joined_at": "2024-01-01"
- }
- },
- "created_at": "2024-01-01",
- "status": "waiting",
- "max_players": 4
- }
-
- games_db[game_id] = game_data
- return game_id
- def get_game(game_id: int) -> Optional[Dict]:
- return games_db.get(game_id)
- def get_all_games() -> Dict[int, Dict]:
- return games_db
- def update_game(game_id: int, game_data: Dict):
- if game_id in games_db:
- games_db[game_id] = game_data
- def add_player_to_game(game_id: int, token: str, nickname: str) -> bool:
- if game_id not in games_db:
- return False
-
- game = games_db[game_id]
-
- if len(game["players"]) >= game["max_players"]:
- return False
-
- if token in game["players"]:
- return False
-
- game["players"][token] = {
- "nickname": nickname,
- "score": 0,
- "joined_at": "2024-01-01"
- }
-
- if len(game["players"]) >= 2:
- game["status"] = "ongoing"
-
- return True
- def remove_player_from_game(game_id: int, token: str):
- if game_id in games_db and token in games_db[game_id]["players"]:
- del games_db[game_id]["players"][token]
-
- if not games_db[game_id]["players"]:
- del games_db[game_id]
- else:
- if len(games_db[game_id]["players"]) == 1:
- games_db[game_id]["status"] = "finished"
- async def broadcast_game_state(game_id: int):
- from main import manager
-
- if game_id not in games_db:
- return
-
- game = games_db[game_id]
-
- for token, player_data in game["players"].items():
- field_cards = game["game"].get_field()
- status = "ongoing" if not game["game"].is_game_over() else "ended"
- score = player_data.get("score", 0)
-
- message = {
- "type": "game_update",
- "gameId": game_id,
- "cards": field_cards,
- "status": status,
- "score": score,
- "players": [
- {"name": p["nickname"], "score": p.get("score", 0)}
- for p in game["players"].values()
- ]
- }
-
- await manager.send_personal_message(token, message)
-
- await manager.broadcast_to_game(game_id, {
- "type": "game_notification",
- "gameId": game_id,
- "players_count": len(game["players"]),
- "status": status
- })
- def get_game_by_player_token(token: str) -> Optional[Dict]:
- for game in games_db.values():
- if token in game["players"]:
- return game
- return None
|