"""In-memory FastAPI server for the card game "Set".

All state (users, tokens, games, websocket connections) lives in module-level
dictionaries, so it is lost on restart and is not shared between worker
processes.
"""

import itertools
import random
import secrets
import string
from typing import Dict, List, Optional

import uvicorn
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import JSONResponse
from pydantic import BaseModel

app = FastAPI()

# nickname -> {"password": ..., "token": ...}
users: Dict[str, dict] = {}
# access token -> nickname
tokens: Dict[str, str] = {}
# game id -> game state dict ("id", "deck", "field", "players", "status")
games: Dict[int, dict] = {}
# nickname -> id of the game the user entered most recently
user_games: Dict[str, int] = {}
next_game_id = 0
# nickname -> currently registered websocket
connections: Dict[str, WebSocket] = {}

_TOKEN_ALPHABET = string.ascii_letters + string.digits
_TOKEN_LENGTH = 16
_CARD_ATTRIBUTES = ("color", "shape", "fill", "count")
_CARDS_PER_DEAL = 3
_INITIAL_FIELD_SIZE = 12


class RegisterRequest(BaseModel):
    """Body of ``POST /user/register``."""

    nickname: str
    password: str


class TokenRequest(BaseModel):
    """Body of every endpoint that needs only the caller's access token."""

    accessToken: str


class CreateRoomRequest(BaseModel):
    """Body of ``POST /set/room/create``."""

    accessToken: str


class EnterRoomRequest(BaseModel):
    """Body of ``POST /set/room/enter``."""

    accessToken: str
    gameId: int


class PickRequest(BaseModel):
    """Body of ``POST /set/pick``: ids of the cards the player selected."""

    accessToken: str
    cards: List[int]


def generate_token():
    """Return a random 16-character alphanumeric access token.

    Uses ``secrets`` rather than ``random`` because the token is the only
    credential the API checks, so it must not be predictable.
    """
    return "".join(secrets.choice(_TOKEN_ALPHABET) for _ in range(_TOKEN_LENGTH))


def create_deck():
    """Build the 81-card deck (3 values for each of 4 attributes), shuffled.

    Each card is a dict with an ``id`` (0..80, assigned before shuffling) and
    the attributes ``color``, ``shape``, ``fill`` and ``count``, each 1..3.
    """
    deck = [
        {"id": card_id, "color": color, "shape": shape, "fill": fill, "count": count}
        for card_id, (color, shape, fill, count) in enumerate(
            itertools.product((1, 2, 3), repeat=4)
        )
    ]
    random.shuffle(deck)
    return deck


def is_valid_set(card1, card2, card3):
    """Return True if, for every attribute, the cards are all equal or all distinct.

    Exactly two distinct values for any attribute means two cards agree and
    the third differs, which breaks the rule.
    """
    return not any(
        len({card1[attr], card2[attr], card3[attr]}) == 2
        for attr in _CARD_ATTRIBUTES
    )


def get_nickname_by_token(token: str) -> Optional[str]:
    """Return the nickname that owns ``token``, or None if it is unknown."""
    return tokens.get(token)


def success_response(data: Optional[dict] = None):
    """Return the success envelope, merged with ``data`` when it is non-empty."""
    response = {"success": True, "exception": None}
    if data:
        response.update(data)
    return response


def error_response(message: str):
    """Return the failure envelope carrying ``message``."""
    return {"success": False, "exception": {"message": message}}


def _resolve_player_game(access_token: str):
    """Resolve a token to ``(nickname, game, error)``.

    Exactly one of ``game`` / ``error`` is non-None: ``error`` is a ready-made
    error response when the token is unknown or the user has not entered a
    game.
    """
    nickname = get_nickname_by_token(access_token)
    if not nickname:
        return None, None, error_response("Invalid token")
    if nickname not in user_games:
        return nickname, None, error_response("Not in any game")
    return nickname, games[user_games[nickname]], None


def _deal_cards(game: dict, count: int = _CARDS_PER_DEAL) -> None:
    """Move up to ``count`` cards from the front of the deck onto the field."""
    dealt = game["deck"][:count]
    del game["deck"][:count]
    game["field"].extend(dealt)


@app.post("/user/register")
async def register(request: RegisterRequest):
    """Register a new user and return their access token."""
    if request.nickname in users:
        return error_response("User already exists")

    token = generate_token()
    # NOTE(review): the password is stored in plaintext and never verified
    # anywhere in this file; hash it before adding a login flow.
    users[request.nickname] = {"password": request.password, "token": token}
    tokens[token] = request.nickname
    return success_response({"nickname": request.nickname, "accessToken": token})


@app.post("/set/room/create")
async def create_room(request: CreateRoomRequest):
    """Create a new game with 12 cards on the field and return its id."""
    global next_game_id

    nickname = get_nickname_by_token(request.accessToken)
    if not nickname:
        return error_response("Invalid token")

    game_id = next_game_id
    next_game_id += 1

    deck = create_deck()
    games[game_id] = {
        "id": game_id,
        "deck": deck[_INITIAL_FIELD_SIZE:],
        "field": deck[:_INITIAL_FIELD_SIZE],
        "players": {},
        "status": "ongoing",
    }
    return success_response({"gameId": game_id})


@app.post("/set/room/list")
async def list_rooms(request: TokenRequest):
    """List the ids of all games."""
    nickname = get_nickname_by_token(request.accessToken)
    if not nickname:
        return error_response("Invalid token")
    return {"games": [{"id": game_id} for game_id in games]}


@app.post("/set/room/enter")
async def enter_room(request: EnterRoomRequest):
    """Join a game (keeping any existing score) and make it the user's current game."""
    nickname = get_nickname_by_token(request.accessToken)
    if not nickname:
        return error_response("Invalid token")
    if request.gameId not in games:
        return error_response("Game not found")

    game = games[request.gameId]
    game["players"].setdefault(nickname, {"score": 0})
    user_games[nickname] = request.gameId
    return success_response({"gameId": request.gameId})


@app.post("/set/field")
async def get_field(request: TokenRequest):
    """Return the field cards, the game status and the caller's score."""
    nickname, game, error = _resolve_player_game(request.accessToken)
    if error:
        return error

    score = game["players"].get(nickname, {}).get("score", 0)
    return {"cards": game["field"], "status": game["status"], "score": score}


@app.post("/set/pick")
async def pick_cards(request: PickRequest):
    """Check a pick of three field cards; on a valid set, score it and refill.

    A pick is rejected (``isSet`` False, score unchanged) unless it names
    exactly three *distinct* cards that are all on the field.
    """
    nickname, game, error = _resolve_player_game(request.accessToken)
    if error:
        return error

    player = game["players"][nickname]
    rejected = {"isSet": False, "score": player["score"]}

    picked_ids = set(request.cards)
    # Without the distinctness check, [x, x, x] would pass is_valid_set
    # (every attribute "all equal") and score a point.
    if len(request.cards) != 3 or len(picked_ids) != 3:
        return rejected

    field_cards = {card["id"]: card for card in game["field"]}
    if not picked_ids <= field_cards.keys():
        return rejected

    is_set = is_valid_set(*(field_cards[card_id] for card_id in request.cards))
    if is_set:
        player["score"] += 1
        game["field"] = [c for c in game["field"] if c["id"] not in picked_ids]
        _deal_cards(game)
        if not game["deck"] and len(game["field"]) < 3:
            game["status"] = "ended"

    return {"isSet": is_set, "score": player["score"]}


@app.post("/set/add")
async def add_cards(request: TokenRequest):
    """Deal up to three extra cards from the deck onto the field."""
    _nickname, game, error = _resolve_player_game(request.accessToken)
    if error:
        return error

    _deal_cards(game)
    return success_response()


@app.post("/set/scores")
async def get_scores(request: TokenRequest):
    """Return the name and score of every player in the caller's game."""
    _nickname, game, error = _resolve_player_game(request.accessToken)
    if error:
        return error

    users_list = [
        {"name": name, "score": data["score"]}
        for name, data in game["players"].items()
    ]
    return success_response({"users": users_list})


@app.websocket("/set")
async def websocket_endpoint(websocket: WebSocket):
    """Authenticate by the first text message (a token), then echo messages."""
    await websocket.accept()
    nickname = None
    try:
        token = await websocket.receive_text()
        nickname = get_nickname_by_token(token)
        if not nickname:
            await websocket.send_text("Invalid token")
            await websocket.close()
            return

        connections[nickname] = websocket
        await websocket.send_text(f"Connected as {nickname}")
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"Echo: {data}")
    except WebSocketDisconnect:
        pass
    finally:
        # Runs on any exit path, not only a clean disconnect. The identity
        # check avoids removing a newer socket registered for the same user.
        if nickname and connections.get(nickname) is websocket:
            del connections[nickname]


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8080)