main.py 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. from fastapi import FastAPI, WebSocket, WebSocketDisconnect
  2. from fastapi.responses import JSONResponse
  3. from pydantic import BaseModel
  4. from typing import List, Optional, Dict
  5. import random
  6. import string
  7. import uvicorn
  8. app = FastAPI()
  9. users: Dict[str, dict] = {}
  10. tokens: Dict[str, str] = {}
  11. games: Dict[int, dict] = {}
  12. user_games: Dict[str, int] = {}
  13. next_game_id = 0
  14. connections: Dict[str, WebSocket] = {}
  15. class RegisterRequest(BaseModel):
  16. nickname: str
  17. password: str
  18. class TokenRequest(BaseModel):
  19. accessToken: str
  20. class CreateRoomRequest(BaseModel):
  21. accessToken: str
  22. class EnterRoomRequest(BaseModel):
  23. accessToken: str
  24. gameId: int
  25. class PickRequest(BaseModel):
  26. accessToken: str
  27. cards: List[int]
  28. def generate_token():
  29. return ''.join(random.choices(string.ascii_letters + string.digits, k=16))
  30. def create_deck():
  31. deck = []
  32. card_id = 0
  33. for color in [1, 2, 3]:
  34. for shape in [1, 2, 3]:
  35. for fill in [1, 2, 3]:
  36. for count in [1, 2, 3]:
  37. deck.append({
  38. "id": card_id,
  39. "color": color,
  40. "shape": shape,
  41. "fill": fill,
  42. "count": count
  43. })
  44. card_id += 1
  45. random.shuffle(deck)
  46. return deck
  47. def is_valid_set(card1, card2, card3):
  48. attrs = ['color', 'shape', 'fill', 'count']
  49. for attr in attrs:
  50. values = [card1[attr], card2[attr], card3[attr]]
  51. if len(set(values)) == 2:
  52. return False
  53. return True
  54. def get_nickname_by_token(token: str) -> Optional[str]:
  55. return tokens.get(token)
  56. def success_response(data: dict = None):
  57. response = {"success": True, "exception": None}
  58. if data:
  59. response.update(data)
  60. return response
  61. def error_response(message: str):
  62. return {
  63. "success": False,
  64. "exception": {
  65. "message": message
  66. }
  67. }
  68. @app.post("/user/register")
  69. async def register(request: RegisterRequest):
  70. if request.nickname in users:
  71. return error_response("User already exists")
  72. token = generate_token()
  73. users[request.nickname] = {
  74. "password": request.password,
  75. "token": token
  76. }
  77. tokens[token] = request.nickname
  78. return success_response({
  79. "nickname": request.nickname,
  80. "accessToken": token
  81. })
  82. @app.post("/set/room/create")
  83. async def create_room(request: CreateRoomRequest):
  84. nickname = get_nickname_by_token(request.accessToken)
  85. if not nickname:
  86. return error_response("Invalid token")
  87. global next_game_id
  88. game_id = next_game_id
  89. next_game_id += 1
  90. deck = create_deck()
  91. field = deck[:12]
  92. deck = deck[12:]
  93. games[game_id] = {
  94. "id": game_id,
  95. "deck": deck,
  96. "field": field,
  97. "players": {},
  98. "status": "ongoing"
  99. }
  100. return success_response({"gameId": game_id})
  101. @app.post("/set/room/list")
  102. async def list_rooms(request: TokenRequest):
  103. nickname = get_nickname_by_token(request.accessToken)
  104. if not nickname:
  105. return error_response("Invalid token")
  106. game_list = [{"id": gid} for gid in games.keys()]
  107. return {"games": game_list}
  108. @app.post("/set/room/enter")
  109. async def enter_room(request: EnterRoomRequest):
  110. nickname = get_nickname_by_token(request.accessToken)
  111. if not nickname:
  112. return error_response("Invalid token")
  113. if request.gameId not in games:
  114. return error_response("Game not found")
  115. game = games[request.gameId]
  116. if nickname not in game["players"]:
  117. game["players"][nickname] = {"score": 0}
  118. user_games[nickname] = request.gameId
  119. return success_response({"gameId": request.gameId})
  120. @app.post("/set/field")
  121. async def get_field(request: TokenRequest):
  122. nickname = get_nickname_by_token(request.accessToken)
  123. if not nickname:
  124. return error_response("Invalid token")
  125. if nickname not in user_games:
  126. return error_response("Not in any game")
  127. game_id = user_games[nickname]
  128. game = games[game_id]
  129. score = game["players"].get(nickname, {}).get("score", 0)
  130. return {
  131. "cards": game["field"],
  132. "status": game["status"],
  133. "score": score
  134. }
  135. @app.post("/set/pick")
  136. async def pick_cards(request: PickRequest):
  137. nickname = get_nickname_by_token(request.accessToken)
  138. if not nickname:
  139. return error_response("Invalid token")
  140. if nickname not in user_games:
  141. return error_response("Not in any game")
  142. game_id = user_games[nickname]
  143. game = games[game_id]
  144. if len(request.cards) != 3:
  145. return {"isSet": False, "score": game["players"][nickname]["score"]}
  146. field_cards = {card["id"]: card for card in game["field"]}
  147. selected_cards = []
  148. for card_id in request.cards:
  149. if card_id not in field_cards:
  150. return {"isSet": False, "score": game["players"][nickname]["score"]}
  151. selected_cards.append(field_cards[card_id])
  152. is_set = is_valid_set(selected_cards[0], selected_cards[1], selected_cards[2])
  153. if is_set:
  154. game["players"][nickname]["score"] += 1
  155. game["field"] = [c for c in game["field"] if c["id"] not in request.cards]
  156. cards_to_add = min(3, len(game["deck"]))
  157. for _ in range(cards_to_add):
  158. if game["deck"]:
  159. game["field"].append(game["deck"].pop(0))
  160. if len(game["deck"]) == 0 and len(game["field"]) < 3:
  161. game["status"] = "ended"
  162. return {
  163. "isSet": is_set,
  164. "score": game["players"][nickname]["score"]
  165. }
  166. @app.post("/set/add")
  167. async def add_cards(request: TokenRequest):
  168. nickname = get_nickname_by_token(request.accessToken)
  169. if not nickname:
  170. return error_response("Invalid token")
  171. if nickname not in user_games:
  172. return error_response("Not in any game")
  173. game_id = user_games[nickname]
  174. game = games[game_id]
  175. cards_to_add = min(3, len(game["deck"]))
  176. for _ in range(cards_to_add):
  177. if game["deck"]:
  178. game["field"].append(game["deck"].pop(0))
  179. return success_response()
  180. @app.post("/set/scores")
  181. async def get_scores(request: TokenRequest):
  182. nickname = get_nickname_by_token(request.accessToken)
  183. if not nickname:
  184. return error_response("Invalid token")
  185. if nickname not in user_games:
  186. return error_response("Not in any game")
  187. game_id = user_games[nickname]
  188. game = games[game_id]
  189. users_list = [
  190. {"name": name, "score": data["score"]}
  191. for name, data in game["players"].items()
  192. ]
  193. return success_response({"users": users_list})
  194. @app.websocket("/set")
  195. async def websocket_endpoint(websocket: WebSocket):
  196. await websocket.accept()
  197. token = None
  198. try:
  199. token = await websocket.receive_text()
  200. nickname = get_nickname_by_token(token)
  201. if nickname:
  202. connections[nickname] = websocket
  203. await websocket.send_text(f"Connected as {nickname}")
  204. while True:
  205. data = await websocket.receive_text()
  206. await websocket.send_text(f"Echo: {data}")
  207. else:
  208. await websocket.send_text("Invalid token")
  209. await websocket.close()
  210. except WebSocketDisconnect:
  211. if token and token in tokens:
  212. nickname = tokens[token]
  213. if nickname in connections:
  214. del connections[nickname]
  215. if __name__ == "__main__":
  216. uvicorn.run(app, host="0.0.0.0", port=8080)