|
@@ -0,0 +1,296 @@
|
|
|
|
|
+from fastapi import FastAPI, WebSocket, WebSocketDisconnect
|
|
|
|
|
+from fastapi.responses import JSONResponse
|
|
|
|
|
+from pydantic import BaseModel
|
|
|
|
|
+from typing import List, Optional, Dict
|
|
|
|
|
+import random
|
|
|
|
|
+import string
|
|
|
|
|
+import uvicorn
|
|
|
|
|
+
|
|
|
|
|
+app = FastAPI()
|
|
|
|
|
+
|
|
|
|
|
+users: Dict[str, dict] = {}
|
|
|
|
|
+tokens: Dict[str, str] = {}
|
|
|
|
|
+games: Dict[int, dict] = {}
|
|
|
|
|
+user_games: Dict[str, int] = {}
|
|
|
|
|
+next_game_id = 0
|
|
|
|
|
+
|
|
|
|
|
+connections: Dict[str, WebSocket] = {}
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+class RegisterRequest(BaseModel):
|
|
|
|
|
+ nickname: str
|
|
|
|
|
+ password: str
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+class TokenRequest(BaseModel):
|
|
|
|
|
+ accessToken: str
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+class CreateRoomRequest(BaseModel):
|
|
|
|
|
+ accessToken: str
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+class EnterRoomRequest(BaseModel):
|
|
|
|
|
+ accessToken: str
|
|
|
|
|
+ gameId: int
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+class PickRequest(BaseModel):
|
|
|
|
|
+ accessToken: str
|
|
|
|
|
+ cards: List[int]
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def generate_token():
|
|
|
|
|
+ return ''.join(random.choices(string.ascii_letters + string.digits, k=16))
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def create_deck():
|
|
|
|
|
+ deck = []
|
|
|
|
|
+ card_id = 0
|
|
|
|
|
+ for color in [1, 2, 3]:
|
|
|
|
|
+ for shape in [1, 2, 3]:
|
|
|
|
|
+ for fill in [1, 2, 3]:
|
|
|
|
|
+ for count in [1, 2, 3]:
|
|
|
|
|
+ deck.append({
|
|
|
|
|
+ "id": card_id,
|
|
|
|
|
+ "color": color,
|
|
|
|
|
+ "shape": shape,
|
|
|
|
|
+ "fill": fill,
|
|
|
|
|
+ "count": count
|
|
|
|
|
+ })
|
|
|
|
|
+ card_id += 1
|
|
|
|
|
+ random.shuffle(deck)
|
|
|
|
|
+ return deck
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def is_valid_set(card1, card2, card3):
|
|
|
|
|
+ attrs = ['color', 'shape', 'fill', 'count']
|
|
|
|
|
+ for attr in attrs:
|
|
|
|
|
+ values = [card1[attr], card2[attr], card3[attr]]
|
|
|
|
|
+ if len(set(values)) == 2:
|
|
|
|
|
+ return False
|
|
|
|
|
+ return True
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def get_nickname_by_token(token: str) -> Optional[str]:
|
|
|
|
|
+ return tokens.get(token)
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def success_response(data: dict = None):
|
|
|
|
|
+ response = {"success": True, "exception": None}
|
|
|
|
|
+ if data:
|
|
|
|
|
+ response.update(data)
|
|
|
|
|
+ return response
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def error_response(message: str):
|
|
|
|
|
+ return {
|
|
|
|
|
+ "success": False,
|
|
|
|
|
+ "exception": {
|
|
|
|
|
+ "message": message
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+@app.post("/user/register")
|
|
|
|
|
+async def register(request: RegisterRequest):
|
|
|
|
|
+ if request.nickname in users:
|
|
|
|
|
+ return error_response("User already exists")
|
|
|
|
|
+
|
|
|
|
|
+ token = generate_token()
|
|
|
|
|
+ users[request.nickname] = {
|
|
|
|
|
+ "password": request.password,
|
|
|
|
|
+ "token": token
|
|
|
|
|
+ }
|
|
|
|
|
+ tokens[token] = request.nickname
|
|
|
|
|
+
|
|
|
|
|
+ return success_response({
|
|
|
|
|
+ "nickname": request.nickname,
|
|
|
|
|
+ "accessToken": token
|
|
|
|
|
+ })
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+@app.post("/set/room/create")
|
|
|
|
|
+async def create_room(request: CreateRoomRequest):
|
|
|
|
|
+ nickname = get_nickname_by_token(request.accessToken)
|
|
|
|
|
+ if not nickname:
|
|
|
|
|
+ return error_response("Invalid token")
|
|
|
|
|
+
|
|
|
|
|
+ global next_game_id
|
|
|
|
|
+ game_id = next_game_id
|
|
|
|
|
+ next_game_id += 1
|
|
|
|
|
+
|
|
|
|
|
+ deck = create_deck()
|
|
|
|
|
+ field = deck[:12]
|
|
|
|
|
+ deck = deck[12:]
|
|
|
|
|
+
|
|
|
|
|
+ games[game_id] = {
|
|
|
|
|
+ "id": game_id,
|
|
|
|
|
+ "deck": deck,
|
|
|
|
|
+ "field": field,
|
|
|
|
|
+ "players": {},
|
|
|
|
|
+ "status": "ongoing"
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ return success_response({"gameId": game_id})
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+@app.post("/set/room/list")
|
|
|
|
|
+async def list_rooms(request: TokenRequest):
|
|
|
|
|
+ nickname = get_nickname_by_token(request.accessToken)
|
|
|
|
|
+ if not nickname:
|
|
|
|
|
+ return error_response("Invalid token")
|
|
|
|
|
+
|
|
|
|
|
+ game_list = [{"id": gid} for gid in games.keys()]
|
|
|
|
|
+ return {"games": game_list}
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+@app.post("/set/room/enter")
|
|
|
|
|
+async def enter_room(request: EnterRoomRequest):
|
|
|
|
|
+ nickname = get_nickname_by_token(request.accessToken)
|
|
|
|
|
+ if not nickname:
|
|
|
|
|
+ return error_response("Invalid token")
|
|
|
|
|
+
|
|
|
|
|
+ if request.gameId not in games:
|
|
|
|
|
+ return error_response("Game not found")
|
|
|
|
|
+
|
|
|
|
|
+ game = games[request.gameId]
|
|
|
|
|
+ if nickname not in game["players"]:
|
|
|
|
|
+ game["players"][nickname] = {"score": 0}
|
|
|
|
|
+
|
|
|
|
|
+ user_games[nickname] = request.gameId
|
|
|
|
|
+
|
|
|
|
|
+ return success_response({"gameId": request.gameId})
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+@app.post("/set/field")
|
|
|
|
|
+async def get_field(request: TokenRequest):
|
|
|
|
|
+ nickname = get_nickname_by_token(request.accessToken)
|
|
|
|
|
+ if not nickname:
|
|
|
|
|
+ return error_response("Invalid token")
|
|
|
|
|
+
|
|
|
|
|
+ if nickname not in user_games:
|
|
|
|
|
+ return error_response("Not in any game")
|
|
|
|
|
+
|
|
|
|
|
+ game_id = user_games[nickname]
|
|
|
|
|
+ game = games[game_id]
|
|
|
|
|
+
|
|
|
|
|
+ score = game["players"].get(nickname, {}).get("score", 0)
|
|
|
|
|
+
|
|
|
|
|
+ return {
|
|
|
|
|
+ "cards": game["field"],
|
|
|
|
|
+ "status": game["status"],
|
|
|
|
|
+ "score": score
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+@app.post("/set/pick")
|
|
|
|
|
+async def pick_cards(request: PickRequest):
|
|
|
|
|
+ nickname = get_nickname_by_token(request.accessToken)
|
|
|
|
|
+ if not nickname:
|
|
|
|
|
+ return error_response("Invalid token")
|
|
|
|
|
+
|
|
|
|
|
+ if nickname not in user_games:
|
|
|
|
|
+ return error_response("Not in any game")
|
|
|
|
|
+
|
|
|
|
|
+ game_id = user_games[nickname]
|
|
|
|
|
+ game = games[game_id]
|
|
|
|
|
+
|
|
|
|
|
+ if len(request.cards) != 3:
|
|
|
|
|
+ return {"isSet": False, "score": game["players"][nickname]["score"]}
|
|
|
|
|
+
|
|
|
|
|
+ field_cards = {card["id"]: card for card in game["field"]}
|
|
|
|
|
+ selected_cards = []
|
|
|
|
|
+ for card_id in request.cards:
|
|
|
|
|
+ if card_id not in field_cards:
|
|
|
|
|
+ return {"isSet": False, "score": game["players"][nickname]["score"]}
|
|
|
|
|
+ selected_cards.append(field_cards[card_id])
|
|
|
|
|
+
|
|
|
|
|
+ is_set = is_valid_set(selected_cards[0], selected_cards[1], selected_cards[2])
|
|
|
|
|
+
|
|
|
|
|
+ if is_set:
|
|
|
|
|
+ game["players"][nickname]["score"] += 1
|
|
|
|
|
+
|
|
|
|
|
+ game["field"] = [c for c in game["field"] if c["id"] not in request.cards]
|
|
|
|
|
+
|
|
|
|
|
+ cards_to_add = min(3, len(game["deck"]))
|
|
|
|
|
+ for _ in range(cards_to_add):
|
|
|
|
|
+ if game["deck"]:
|
|
|
|
|
+ game["field"].append(game["deck"].pop(0))
|
|
|
|
|
+
|
|
|
|
|
+ if len(game["deck"]) == 0 and len(game["field"]) < 3:
|
|
|
|
|
+ game["status"] = "ended"
|
|
|
|
|
+
|
|
|
|
|
+ return {
|
|
|
|
|
+ "isSet": is_set,
|
|
|
|
|
+ "score": game["players"][nickname]["score"]
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+@app.post("/set/add")
|
|
|
|
|
+async def add_cards(request: TokenRequest):
|
|
|
|
|
+ nickname = get_nickname_by_token(request.accessToken)
|
|
|
|
|
+ if not nickname:
|
|
|
|
|
+ return error_response("Invalid token")
|
|
|
|
|
+
|
|
|
|
|
+ if nickname not in user_games:
|
|
|
|
|
+ return error_response("Not in any game")
|
|
|
|
|
+
|
|
|
|
|
+ game_id = user_games[nickname]
|
|
|
|
|
+ game = games[game_id]
|
|
|
|
|
+
|
|
|
|
|
+ cards_to_add = min(3, len(game["deck"]))
|
|
|
|
|
+ for _ in range(cards_to_add):
|
|
|
|
|
+ if game["deck"]:
|
|
|
|
|
+ game["field"].append(game["deck"].pop(0))
|
|
|
|
|
+
|
|
|
|
|
+ return success_response()
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+@app.post("/set/scores")
|
|
|
|
|
+async def get_scores(request: TokenRequest):
|
|
|
|
|
+ nickname = get_nickname_by_token(request.accessToken)
|
|
|
|
|
+ if not nickname:
|
|
|
|
|
+ return error_response("Invalid token")
|
|
|
|
|
+
|
|
|
|
|
+ if nickname not in user_games:
|
|
|
|
|
+ return error_response("Not in any game")
|
|
|
|
|
+
|
|
|
|
|
+ game_id = user_games[nickname]
|
|
|
|
|
+ game = games[game_id]
|
|
|
|
|
+
|
|
|
|
|
+ users_list = [
|
|
|
|
|
+ {"name": name, "score": data["score"]}
|
|
|
|
|
+ for name, data in game["players"].items()
|
|
|
|
|
+ ]
|
|
|
|
|
+
|
|
|
|
|
+ return success_response({"users": users_list})
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+@app.websocket("/set")
|
|
|
|
|
+async def websocket_endpoint(websocket: WebSocket):
|
|
|
|
|
+ await websocket.accept()
|
|
|
|
|
+ token = None
|
|
|
|
|
+ try:
|
|
|
|
|
+ token = await websocket.receive_text()
|
|
|
|
|
+ nickname = get_nickname_by_token(token)
|
|
|
|
|
+
|
|
|
|
|
+ if nickname:
|
|
|
|
|
+ connections[nickname] = websocket
|
|
|
|
|
+ await websocket.send_text(f"Connected as {nickname}")
|
|
|
|
|
+
|
|
|
|
|
+ while True:
|
|
|
|
|
+ data = await websocket.receive_text()
|
|
|
|
|
+ await websocket.send_text(f"Echo: {data}")
|
|
|
|
|
+ else:
|
|
|
|
|
+ await websocket.send_text("Invalid token")
|
|
|
|
|
+ await websocket.close()
|
|
|
|
|
+ except WebSocketDisconnect:
|
|
|
|
|
+ if token and token in tokens:
|
|
|
|
|
+ nickname = tokens[token]
|
|
|
|
|
+ if nickname in connections:
|
|
|
|
|
+ del connections[nickname]
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+if __name__ == "__main__":
|
|
|
|
|
+ uvicorn.run(app, host="0.0.0.0", port=8080)
|