||
- from fastapi import FastAPI, WebSocket, WebSocketDisconnect
- from fastapi.responses import JSONResponse
- from pydantic import BaseModel
- from typing import List, Optional, Dict
- import random
- import string
- import uvicorn
- app = FastAPI()
- users: Dict[str, dict] = {}
- tokens: Dict[str, str] = {}
- games: Dict[int, dict] = {}
- user_games: Dict[str, int] = {}
- next_game_id = 0
- connections: Dict[str, WebSocket] = {}
- class RegisterRequest(BaseModel):
- nickname: str
- password: str
- class TokenRequest(BaseModel):
- accessToken: str
- class CreateRoomRequest(BaseModel):
- accessToken: str
- class EnterRoomRequest(BaseModel):
- accessToken: str
- gameId: int
- class PickRequest(BaseModel):
- accessToken: str
- cards: List[int]
- def generate_token():
- return ''.join(random.choices(string.ascii_letters + string.digits, k=16))
- def create_deck():
- deck = []
- card_id = 0
- for color in [1, 2, 3]:
- for shape in [1, 2, 3]:
- for fill in [1, 2, 3]:
- for count in [1, 2, 3]:
- deck.append({
- "id": card_id,
- "color": color,
- "shape": shape,
- "fill": fill,
- "count": count
- })
- card_id += 1
- random.shuffle(deck)
- return deck
- def is_valid_set(card1, card2, card3):
- attrs = ['color', 'shape', 'fill', 'count']
- for attr in attrs:
- values = [card1[attr], card2[attr], card3[attr]]
- if len(set(values)) == 2:
- return False
- return True
- def get_nickname_by_token(token: str) -> Optional[str]:
- return tokens.get(token)
- def success_response(data: dict = None):
- response = {"success": True, "exception": None}
- if data:
- response.update(data)
- return response
- def error_response(message: str):
- return {
- "success": False,
- "exception": {
- "message": message
- }
- }
- @app.post("/user/register")
- async def register(request: RegisterRequest):
- if request.nickname in users:
- return error_response("User already exists")
-
- token = generate_token()
- users[request.nickname] = {
- "password": request.password,
- "token": token
- }
- tokens[token] = request.nickname
-
- return success_response({
- "nickname": request.nickname,
- "accessToken": token
- })
- @app.post("/set/room/create")
- async def create_room(request: CreateRoomRequest):
- nickname = get_nickname_by_token(request.accessToken)
- if not nickname:
- return error_response("Invalid token")
-
- global next_game_id
- game_id = next_game_id
- next_game_id += 1
-
- deck = create_deck()
- field = deck[:12]
- deck = deck[12:]
-
- games[game_id] = {
- "id": game_id,
- "deck": deck,
- "field": field,
- "players": {},
- "status": "ongoing"
- }
-
- return success_response({"gameId": game_id})
- @app.post("/set/room/list")
- async def list_rooms(request: TokenRequest):
- nickname = get_nickname_by_token(request.accessToken)
- if not nickname:
- return error_response("Invalid token")
-
- game_list = [{"id": gid} for gid in games.keys()]
- return {"games": game_list}
- @app.post("/set/room/enter")
- async def enter_room(request: EnterRoomRequest):
- nickname = get_nickname_by_token(request.accessToken)
- if not nickname:
- return error_response("Invalid token")
-
- if request.gameId not in games:
- return error_response("Game not found")
-
- game = games[request.gameId]
- if nickname not in game["players"]:
- game["players"][nickname] = {"score": 0}
-
- user_games[nickname] = request.gameId
-
- return success_response({"gameId": request.gameId})
- @app.post("/set/field")
- async def get_field(request: TokenRequest):
- nickname = get_nickname_by_token(request.accessToken)
- if not nickname:
- return error_response("Invalid token")
-
- if nickname not in user_games:
- return error_response("Not in any game")
-
- game_id = user_games[nickname]
- game = games[game_id]
-
- score = game["players"].get(nickname, {}).get("score", 0)
-
- return {
- "cards": game["field"],
- "status": game["status"],
- "score": score
- }
- @app.post("/set/pick")
- async def pick_cards(request: PickRequest):
- nickname = get_nickname_by_token(request.accessToken)
- if not nickname:
- return error_response("Invalid token")
-
- if nickname not in user_games:
- return error_response("Not in any game")
-
- game_id = user_games[nickname]
- game = games[game_id]
-
- if len(request.cards) != 3:
- return {"isSet": False, "score": game["players"][nickname]["score"]}
-
- field_cards = {card["id"]: card for card in game["field"]}
- selected_cards = []
- for card_id in request.cards:
- if card_id not in field_cards:
- return {"isSet": False, "score": game["players"][nickname]["score"]}
- selected_cards.append(field_cards[card_id])
-
- is_set = is_valid_set(selected_cards[0], selected_cards[1], selected_cards[2])
-
- if is_set:
- game["players"][nickname]["score"] += 1
-
- game["field"] = [c for c in game["field"] if c["id"] not in request.cards]
-
- cards_to_add = min(3, len(game["deck"]))
- for _ in range(cards_to_add):
- if game["deck"]:
- game["field"].append(game["deck"].pop(0))
-
- if len(game["deck"]) == 0 and len(game["field"]) < 3:
- game["status"] = "ended"
-
- return {
- "isSet": is_set,
- "score": game["players"][nickname]["score"]
- }
- @app.post("/set/add")
- async def add_cards(request: TokenRequest):
- nickname = get_nickname_by_token(request.accessToken)
- if not nickname:
- return error_response("Invalid token")
-
- if nickname not in user_games:
- return error_response("Not in any game")
-
- game_id = user_games[nickname]
- game = games[game_id]
-
- cards_to_add = min(3, len(game["deck"]))
- for _ in range(cards_to_add):
- if game["deck"]:
- game["field"].append(game["deck"].pop(0))
-
- return success_response()
- @app.post("/set/scores")
- async def get_scores(request: TokenRequest):
- nickname = get_nickname_by_token(request.accessToken)
- if not nickname:
- return error_response("Invalid token")
-
- if nickname not in user_games:
- return error_response("Not in any game")
-
- game_id = user_games[nickname]
- game = games[game_id]
-
- users_list = [
- {"name": name, "score": data["score"]}
- for name, data in game["players"].items()
- ]
-
- return success_response({"users": users_list})
- @app.websocket("/set")
- async def websocket_endpoint(websocket: WebSocket):
- await websocket.accept()
- token = None
- try:
- token = await websocket.receive_text()
- nickname = get_nickname_by_token(token)
-
- if nickname:
- connections[nickname] = websocket
- await websocket.send_text(f"Connected as {nickname}")
-
- while True:
- data = await websocket.receive_text()
- await websocket.send_text(f"Echo: {data}")
- else:
- await websocket.send_text("Invalid token")
- await websocket.close()
- except WebSocketDisconnect:
- if token and token in tokens:
- nickname = tokens[token]
- if nickname in connections:
- del connections[nickname]
- if __name__ == "__main__":
- uvicorn.run(app, host="0.0.0.0", port=8080)
|