sototh il y a 1 semaine
commit
7f414e1cef
7 fichiers modifiés avec 785 ajouts et 0 suppressions
  1. 15 0
      README.md
  2. 28 0
      auth.py
  3. 139 0
      database.py
  4. 107 0
      game_logic.py
  5. 370 0
      main.py
  6. 66 0
      models.py
  7. 60 0
      websocket_manager.py

+ 15 - 0
README.md

@@ -0,0 +1,15 @@
+# внешние пакеты
+```bash
+pip install python-multipart
+pip install pydantic
+pip install pyjwt
+pip install websockets
+pip install uvicorn
+
+```
+
+
+# запуск 
+```bash
+uvicorn main:app --host 0.0.0.0 --port 8000 --reload
+```

+ 28 - 0
auth.py

@@ -0,0 +1,28 @@
+import jwt
+from datetime import datetime, timedelta
+import hashlib
+from typing import Optional, Dict
+
+SECRET_KEY = "your-secret-key"
+ALGORITHM = "HS256"
+ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 * 7  # 7 дней
+
+def create_access_token(data: dict) -> str:
+    to_encode = data.copy()
+    expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
+    to_encode.update({"exp": expire})
+    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
+    return encoded_jwt
+
+def verify_token(token: str) -> Optional[Dict]:
+    try:
+        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
+        return payload
+    except jwt.PyJWTError:
+        return None
+
+def get_password_hash(password: str) -> str:
+    return hashlib.sha256(password.encode()).hexdigest()
+
+def verify_password(plain_password: str, hashed_password: str) -> bool:
+    return get_password_hash(plain_password) == hashed_password

+ 139 - 0
database.py

@@ -0,0 +1,139 @@
+import json
+import uuid
+from typing import Dict, List, Optional, Any
+from game_logic import SetGame
+
+users_db = {}  
+games_db = {} 
+active_connections = {}  
+
+def get_next_game_id() -> int:
+    if not games_db:
+        return 0
+    return max(games_db.keys()) + 1
+
+def get_user_by_nickname(nickname: str) -> Optional[Dict]:
+    for user in users_db.values():
+        if user["nickname"] == nickname:
+            return user
+    return None
+
+def get_user_by_token(token: str) -> Optional[Dict]:
+    return users_db.get(token)
+
+def create_user(nickname: str, password: str, token: str) -> Dict:
+    from auth import get_password_hash
+    
+    user_data = {
+        "nickname": nickname,
+        "password_hash": get_password_hash(password),
+        "token": token,
+        "created_at": "2024-01-01", 
+        "score": 0
+    }
+    users_db[token] = user_data
+    return user_data
+
+def create_game(creator_token: str) -> int:
+    game_id = get_next_game_id()
+    
+    game_data = {
+        "id": game_id,
+        "game": SetGame(),
+        "players": {
+            creator_token: {
+                "nickname": users_db[creator_token]["nickname"],
+                "score": 0,
+                "joined_at": "2024-01-01"
+            }
+        },
+        "created_at": "2024-01-01",
+        "status": "waiting", 
+        "max_players": 4
+    }
+    
+    games_db[game_id] = game_data
+    return game_id
+
+def get_game(game_id: int) -> Optional[Dict]:
+    return games_db.get(game_id)
+
+def get_all_games() -> Dict[int, Dict]:
+    return games_db
+
+def update_game(game_id: int, game_data: Dict):
+    if game_id in games_db:
+        games_db[game_id] = game_data
+
+def add_player_to_game(game_id: int, token: str, nickname: str) -> bool:
+    if game_id not in games_db:
+        return False
+    
+    game = games_db[game_id]
+    
+    if len(game["players"]) >= game["max_players"]:
+        return False
+    
+    if token in game["players"]:
+        return False
+    
+    game["players"][token] = {
+        "nickname": nickname,
+        "score": 0,
+        "joined_at": "2024-01-01"
+    }
+    
+    if len(game["players"]) >= 2:
+        game["status"] = "ongoing"
+    
+    return True
+
+def remove_player_from_game(game_id: int, token: str):
+    if game_id in games_db and token in games_db[game_id]["players"]:
+        del games_db[game_id]["players"][token]
+        
+        if not games_db[game_id]["players"]:
+            del games_db[game_id]
+        else:
+            if len(games_db[game_id]["players"]) == 1:
+                games_db[game_id]["status"] = "finished"
+
+async def broadcast_game_state(game_id: int):
+    from main import manager
+    
+    if game_id not in games_db:
+        return
+    
+    game = games_db[game_id]
+    
+    for token, player_data in game["players"].items():
+        field_cards = game["game"].get_field()
+        status = "ongoing" if not game["game"].is_game_over() else "ended"
+        score = player_data.get("score", 0)
+        
+        message = {
+            "type": "game_update",
+            "gameId": game_id,
+            "cards": field_cards,
+            "status": status,
+            "score": score,
+            "players": [
+                {"name": p["nickname"], "score": p.get("score", 0)}
+                for p in game["players"].values()
+            ]
+        }
+        
+        await manager.send_personal_message(token, message)
+    
+    await manager.broadcast_to_game(game_id, {
+        "type": "game_notification",
+        "gameId": game_id,
+        "players_count": len(game["players"]),
+        "status": status
+    })
+
+def get_game_by_player_token(token: str) -> Optional[Dict]:
+    for game in games_db.values():
+        if token in game["players"]:
+            return game
+    return None

+ 107 - 0
game_logic.py

@@ -0,0 +1,107 @@
+import random
+from typing import List, Dict, Optional, Tuple
+from dataclasses import dataclass
+
+@dataclass
+class Card:
+    id: int
+    color: int     
+    shape: int      
+    fill: int     
+    count: int      
+
+class SetGame:
+    
+    def __init__(self):
+        self.deck = self._generate_deck()
+        self.field = []
+        self.score = 0
+        self.game_over = False
+        self._initialize_field()
+    
+    def _generate_deck(self) -> List[Card]:
+        deck = []
+        card_id = 0
+        
+        for color in [1, 2, 3]:
+            for shape in [1, 2, 3]:
+                for fill in [1, 2, 3]:
+                    for count in [1, 2, 3]:
+                        deck.append(Card(
+                            id=card_id,
+                            color=color,
+                            shape=shape,
+                            fill=fill,
+                            count=count
+                        ))
+                        card_id += 1
+        
+        random.shuffle(deck)
+        return deck
+    
+    def _initialize_field(self):
+        for _ in range(12):
+            if self.deck:
+                self.field.append(self.deck.pop())
+    
+    def get_field(self) -> List[Dict]:
+        return [
+            {
+                "id": card.id,
+                "color": card.color,
+                "shape": card.shape,
+                "fill": card.fill,
+                "count": card.count
+            }
+            for card in self.field
+        ]
+    
+    def check_set(self, card_ids: List[int]) -> bool:
+        if len(card_ids) != 3:
+            return False
+        
+        cards = [card for card in self.field if card.id in card_ids]
+        if len(cards) != 3:
+            return False
+        
+        for attribute in ['color', 'shape', 'fill', 'count']:
+            values = [getattr(card, attribute) for card in cards]
+            
+            unique_values = set(values)
+            if len(unique_values) not in [1, 3]:
+                return False
+        
+        return True
+    
+    def remove_cards(self, card_ids: List[int]):
+
+        self.field = [card for card in self.field if card.id not in card_ids]
+        
+        while len(self.field) < 12 and self.deck:
+            self.field.append(self.deck.pop())
+        
+        if not self.deck and len(self.field) < 3:
+            self.game_over = True
+    
+    def add_cards(self, count: int = 3):
+        for _ in range(min(count, len(self.deck))):
+            if self.deck:
+                self.field.append(self.deck.pop())
+    
+    def is_game_over(self) -> bool:
+        return self.game_over
+    
+    def find_sets_on_field(self) -> List[List[int]]:
+        sets = []
+        n = len(self.field)
+        
+        for i in range(n):
+            for j in range(i + 1, n):
+                for k in range(j + 1, n):
+                    if self.check_set([self.field[i].id, self.field[j].id, self.field[k].id]):
+                        sets.append([self.field[i].id, self.field[j].id, self.field[k].id])
+        
+        return sets
+    
+    def get_deck_size(self) -> int:
+        return len(self.deck)

+ 370 - 0
main.py

@@ -0,0 +1,370 @@
+from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException, Depends
+from fastapi.middleware.cors import CORSMiddleware
+from contextlib import asynccontextmanager
+import jwt
+import json
+from datetime import datetime, timedelta
+from typing import List, Dict, Optional
+import asyncio
+
+from models import *
+from game_logic import SetGame, Card
+from database import (
+    users_db, games_db, active_connections,
+    get_user_by_nickname, create_user, get_user_by_token,
+    create_game, get_game, get_all_games, update_game,
+    add_player_to_game, remove_player_from_game,
+    broadcast_game_state
+)
+from auth import (
+    SECRET_KEY, ALGORITHM, create_access_token,
+    verify_password, get_password_hash, verify_token
+)
+from websocket_manager import ConnectionManager
+
+manager = ConnectionManager()
+
+@asynccontextmanager
+async def lifespan(app: FastAPI):
+    print("Server starting...")
+    yield
+    print("Server shutting down...")
+    await manager.disconnect_all()
+
+app = FastAPI(
+    title="Set Game Server",
+    description="Сервер для игры Set с WebSocket поддержкой",
+    version="1.0.0",
+    lifespan=lifespan
+)
+
+app.add_middleware(
+    CORSMiddleware,
+    allow_origins=["*"], 
+    allow_credentials=True,
+    allow_methods=["*"],
+    allow_headers=["*"],
+)
+
+class BaseResponse:
+    @staticmethod
+    def success(data: Dict = None, **kwargs):
+        response = {
+            "success": True,
+            "exception": None
+        }
+        if data:
+            response.update(data)
+        response.update(kwargs)
+        return response
+    
+    @staticmethod
+    def error(message: str):
+        return {
+            "success": False,
+            "exception": {
+                "message": message
+            }
+        }
+
+# Эндпоинты
+
+@app.post("/user/register", response_model=RegisterResponse)
+async def register(user: RegisterRequest):
+
+    if get_user_by_nickname(user.nickname):
+        raise HTTPException(
+            status_code=400,
+            detail=BaseResponse.error("User already exists")
+        )
+    
+    access_token = create_access_token(data={"sub": user.nickname})
+    create_user(user.nickname, user.password, access_token)
+    
+    return BaseResponse.success(
+        nickname=user.nickname,
+        accessToken=access_token
+    )
+
+@app.post("/set/room/create")
+async def create_game_room(request: GameRequest):
+    
+    user = get_user_by_token(request.accessToken)
+    if not user:
+        raise HTTPException(
+            status_code=401,
+            detail=BaseResponse.error("Invalid token")
+        )
+    
+    game_id = create_game(request.accessToken)
+    
+    return BaseResponse.success(gameId=game_id)
+
+@app.post("/set/room/list")
+async def list_games(request: GameRequest):
+    
+    user = get_user_by_token(request.accessToken)
+    if not user:
+        raise HTTPException(
+            status_code=401,
+            detail=BaseResponse.error("Invalid token")
+        )
+    
+    games = get_all_games()
+    games_list = [{"id": game_id} for game_id in games.keys()]
+    
+    return BaseResponse.success(games=games_list)
+
+@app.post("/set/room/enter")
+async def enter_game(request: EnterGameRequest):
+    
+    user = get_user_by_token(request.accessToken)
+    if not user:
+        raise HTTPException(
+            status_code=401,
+            detail=BaseResponse.error("Invalid token")
+        )
+    
+    game = get_game(request.gameId)
+    if not game:
+        raise HTTPException(
+            status_code=404,
+            detail=BaseResponse.error("Game not found")
+        )
+    
+    success = add_player_to_game(request.gameId, request.accessToken, user["nickname"])
+    if not success:
+        raise HTTPException(
+            status_code=400,
+            detail=BaseResponse.error("Cannot join game")
+        )
+    
+    await broadcast_game_state(request.gameId)
+    
+    return BaseResponse.success(gameId=request.gameId)
+
+@app.post("/set/field")
+async def get_game_field(request: GameRequest):
+
+    user = get_user_by_token(request.accessToken)
+    if not user:
+        raise HTTPException(
+            status_code=401,
+            detail=BaseResponse.error("Invalid token")
+        )
+    
+    game = None
+    for game_id, game_data in games_db.items():
+        if request.accessToken in game_data["players"]:
+            game = game_data
+            break
+    
+    if not game:
+        raise HTTPException(
+            status_code=400,
+            detail=BaseResponse.error("User not in game")
+        )
+    
+    field_cards = game["game"].get_field()
+    score = game["players"].get(request.accessToken, {}).get("score", 0)
+    status = "ongoing" if not game["game"].is_game_over() else "ended"
+    
+    return {
+        "cards": field_cards,
+        "status": status,
+        "score": score
+    }
+
+@app.post("/set/pick")
+async def pick_cards(request: PickRequest):
+
+    user = get_user_by_token(request.accessToken)
+    if not user:
+        raise HTTPException(
+            status_code=401,
+            detail=BaseResponse.error("Invalid token")
+        )
+    
+    game = None
+    game_id = None
+    for g_id, game_data in games_db.items():
+        if request.accessToken in game_data["players"]:
+            game = game_data
+            game_id = g_id
+            break
+    
+    if not game:
+        raise HTTPException(
+            status_code=400,
+            detail=BaseResponse.error("User not in game")
+        )
+    
+    is_set = game["game"].check_set(request.cards)
+    
+    if is_set:
+        game["game"].remove_cards(request.cards)
+        
+        current_score = game["players"][request.accessToken].get("score", 0)
+        game["players"][request.accessToken]["score"] = current_score + 1
+        
+        update_game(game_id, game)
+        
+        await broadcast_game_state(game_id)
+    
+    score = game["players"][request.accessToken].get("score", 0)
+    
+    return {
+        "isSet": is_set,
+        "score": score
+    }
+
+@app.post("/set/add")
+async def add_cards(request: GameRequest):
+
+    user = get_user_by_token(request.accessToken)
+    if not user:
+        raise HTTPException(
+            status_code=401,
+            detail=BaseResponse.error("Invalid token")
+        )
+    
+    game = None
+    game_id = None
+    for g_id, game_data in games_db.items():
+        if request.accessToken in game_data["players"]:
+            game = game_data
+            game_id = g_id
+            break
+    
+    if not game:
+        raise HTTPException(
+            status_code=400,
+            detail=BaseResponse.error("User not in game")
+        )
+    
+    game["game"].add_cards(3) 
+    
+    update_game(game_id, game)
+    
+    await broadcast_game_state(game_id)
+    
+    return BaseResponse.success()
+
+@app.post("/set/scores")
+async def get_scores(request: GameRequest):
+
+    user = get_user_by_token(request.accessToken)
+    if not user:
+        raise HTTPException(
+            status_code=401,
+            detail=BaseResponse.error("Invalid token")
+        )
+    
+    game = None
+    for game_data in games_db.values():
+        if request.accessToken in game_data["players"]:
+            game = game_data
+            break
+    
+    if not game:
+        raise HTTPException(
+            status_code=400,
+            detail=BaseResponse.error("User not in game")
+        )
+    
+    users = []
+    for player_data in game["players"].values():
+        users.append({
+            "name": player_data["nickname"],
+            "score": player_data.get("score", 0)
+        })
+    
+    return BaseResponse.success(users=users)
+
+@app.websocket("/set")
+async def websocket_endpoint(websocket: WebSocket):
+    
+    await manager.connect(websocket)
+    
+    try:
+        while True:
+            data = await websocket.receive_text()
+            message = json.loads(data)
+            
+            access_token = message.get("accessToken")
+            if not access_token:
+                await websocket.send_text(json.dumps(
+                    BaseResponse.error("Token required")
+                ))
+                continue
+            
+            user = get_user_by_token(access_token)
+            if not user:
+                await websocket.send_text(json.dumps(
+                    BaseResponse.error("Invalid token")
+                ))
+                continue
+            
+            game_id = None
+            for g_id, game_data in games_db.items():
+                if access_token in game_data["players"]:
+                    game_id = g_id
+                    break
+            
+            if not game_id:
+                await websocket.send_text(json.dumps(
+                    BaseResponse.error("User not in game")
+                ))
+                continue
+            
+            manager.register_user(websocket, access_token, game_id)
+            
+            game = games_db[game_id]
+            field_cards = game["game"].get_field()
+            status = "ongoing" if not game["game"].is_game_over() else "ended"
+            score = game["players"][access_token].get("score", 0)
+            
+            await websocket.send_text(json.dumps({
+                "type": "game_state",
+                "cards": field_cards,
+                "status": status,
+                "score": score
+            }))
+            
+    except WebSocketDisconnect:
+        await manager.disconnect(websocket)
+    except Exception as e:
+        print(f"WebSocket error: {e}")
+        await manager.disconnect(websocket)
+
+@app.get("/health")
+async def health_check():
+    return {
+        "status": "healthy",
+        "timestamp": datetime.now().isoformat(),
+        "users_count": len(users_db),
+        "games_count": len(games_db)
+    }
+
+@app.get("/")
+async def root():
+    return {
+        "message": "Set Game Server",
+        "version": "1.0.0",
+        "docs": "/docs",
+        "endpoints": [
+            "/user/register",
+            "/set/room/create",
+            "/set/room/list",
+            "/set/room/enter",
+            "/set/field",
+            "/set/pick",
+            "/set/add",
+            "/set/scores",
+            "/set (WebSocket)"
+        ]
+    }
+
+if __name__ == "__main__":
+    import uvicorn
+    uvicorn.run(app, host="0.0.0.0", port=8000)

+ 66 - 0
models.py

@@ -0,0 +1,66 @@
+from pydantic import BaseModel
+from typing import List, Optional, Dict, Any
+
+
+class RegisterRequest(BaseModel):
+    nickname: str
+    password: str
+
+class RegisterResponse(BaseModel):
+    success: bool = True
+    exception: Optional[Dict[str, str]] = None
+    nickname: str
+    accessToken: str
+
+class GameRequest(BaseModel):
+    accessToken: str
+
+class EnterGameRequest(GameRequest):
+    gameId: int
+
+class PickRequest(GameRequest):
+    cards: List[int]
+
+class CardResponse(BaseModel):
+    id: int
+    color: int
+    shape: int
+    fill: int
+    count: int
+
+class FieldResponse(BaseModel):
+    cards: List[CardResponse]
+    status: str  
+    score: int
+
+class PickResponse(BaseModel):
+    isSet: bool
+    score: int
+
+class UserScore(BaseModel):
+    name: str
+    score: int
+
+class ScoresResponse(BaseModel):
+    success: bool = True
+    exception: Optional[Dict[str, str]] = None
+    users: List[UserScore]
+
+class GameListResponse(BaseModel):
+    success: bool = True
+    exception: Optional[Dict[str, str]] = None
+    games: List[Dict[str, int]]
+
+class CreateGameResponse(BaseModel):
+    success: bool = True
+    exception: Optional[Dict[str, str]] = None
+    gameId: int
+
+class BaseResponseModel(BaseModel):
+    success: bool = True
+    exception: Optional[Dict[str, str]] = None
+
+class WebSocketMessage(BaseModel):
+    type: str
+    data: Optional[Dict[str, Any]] = None
+    accessToken: Optional[str] = None

+ 60 - 0
websocket_manager.py

@@ -0,0 +1,60 @@
+from fastapi import WebSocket
+from typing import Dict, List, Set
+import json
+
+class ConnectionManager:
+    
+    def __init__(self):
+        self.active_connections: Dict[WebSocket, Dict] = {}
+        self.token_to_connection: Dict[str, WebSocket] = {}
+        self.game_connections: Dict[int, Set[WebSocket]] = {}
+    
+    async def connect(self, websocket: WebSocket):
+        await websocket.accept()
+        self.active_connections[websocket] = {
+            "token": None,
+            "game_id": None
+        }
+    
+    async def disconnect(self, websocket: WebSocket):
+        if websocket in self.active_connections:
+            conn_data = self.active_connections[websocket]
+            
+            if conn_data["token"] in self.token_to_connection:
+                del self.token_to_connection[conn_data["token"]]
+            
+            if conn_data["game_id"] in self.game_connections:
+                if websocket in self.game_connections[conn_data["game_id"]]:
+                    self.game_connections[conn_data["game_id"]].remove(websocket)
+            
+            del self.active_connections[websocket]
+    
+    async def disconnect_all(self):
+        for websocket in list(self.active_connections.keys()):
+            await self.disconnect(websocket)
+    
+    def register_user(self, websocket: WebSocket, token: str, game_id: int):
+        if websocket in self.active_connections:
+            self.active_connections[websocket] = {
+                "token": token,
+                "game_id": game_id
+            }
+            self.token_to_connection[token] = websocket
+            
+            if game_id not in self.game_connections:
+                self.game_connections[game_id] = set()
+            self.game_connections[game_id].add(websocket)
+    
+    async def send_personal_message(self, token: str, message: dict):
+        if token in self.token_to_connection:
+            websocket = self.token_to_connection[token]
+            await websocket.send_text(json.dumps(message))
+    
+    async def broadcast_to_game(self, game_id: int, message: dict):
+        if game_id in self.game_connections:
+            for websocket in self.game_connections[game_id]:
+                await websocket.send_text(json.dumps(message))
+    
+    async def broadcast(self, message: dict):
+        for connection in self.active_connections:
+            await connection.send_text(json.dumps(message))