| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960 |
- from fastapi import WebSocket
- from typing import Dict, List, Set
- import json
class ConnectionManager:
    """Track accepted WebSocket connections and route JSON messages to them.

    Three indexes are kept in sync:

    * ``active_connections`` -- every accepted socket mapped to its
      registration record ``{"token": ..., "game_id": ...}``; both values
      are ``None`` until ``register_user`` is called for that socket.
    * ``token_to_connection`` -- token mapped to the socket that most
      recently registered with it.
    * ``game_connections`` -- game id mapped to the set of sockets
      registered for that game. Empty sets are removed, so a game id is
      only present while at least one socket is registered for it.
    """

    def __init__(self):
        self.active_connections: Dict[WebSocket, Dict] = {}
        self.token_to_connection: Dict[str, WebSocket] = {}
        self.game_connections: Dict[int, Set[WebSocket]] = {}

    def _unindex(self, websocket: WebSocket, conn_data: Dict) -> None:
        """Remove ``websocket`` from the token and game indexes.

        ``conn_data`` is the socket's registration record, i.e. a dict with
        ``"token"`` and ``"game_id"`` keys.
        """
        token = conn_data["token"]
        # Only drop the token route if it still points at this socket: a
        # newer socket may have registered the same token (reconnect), and
        # its route must survive the cleanup of the stale one.
        if self.token_to_connection.get(token) is websocket:
            del self.token_to_connection[token]

        game_id = conn_data["game_id"]
        members = self.game_connections.get(game_id)
        if members is not None:
            members.discard(websocket)
            if not members:
                # Do not keep empty sets around for finished games.
                del self.game_connections[game_id]

    async def connect(self, websocket: WebSocket):
        """Accept ``websocket`` and start tracking it as unregistered."""
        await websocket.accept()
        self.active_connections[websocket] = {
            "token": None,
            "game_id": None,
        }

    async def disconnect(self, websocket: WebSocket):
        """Stop tracking ``websocket`` and remove it from every index.

        Unknown sockets are ignored. This method only updates bookkeeping;
        it does not call ``close()`` on the socket.
        """
        conn_data = self.active_connections.pop(websocket, None)
        if conn_data is None:
            return
        self._unindex(websocket, conn_data)

    async def disconnect_all(self):
        """Stop tracking every currently active connection."""
        # Iterate over a copy: disconnect() mutates active_connections.
        for websocket in list(self.active_connections):
            await self.disconnect(websocket)

    def register_user(self, websocket: WebSocket, token: str, game_id: int):
        """Associate a tracked ``websocket`` with ``token`` and ``game_id``.

        Does nothing if the socket is not in ``active_connections``. If the
        socket was already registered, its previous token and game entries
        are removed first so no stale routes are left behind. If another
        socket currently holds ``token``, the token route is re-pointed to
        ``websocket``.
        """
        previous = self.active_connections.get(websocket)
        if previous is None:
            return

        self._unindex(websocket, previous)
        self.active_connections[websocket] = {
            "token": token,
            "game_id": game_id,
        }
        self.token_to_connection[token] = websocket
        self.game_connections.setdefault(game_id, set()).add(websocket)

    async def send_personal_message(self, token: str, message: dict):
        """Send ``message`` as JSON text to the socket registered for ``token``.

        Does nothing when no socket is registered for ``token``.
        """
        websocket = self.token_to_connection.get(token)
        if websocket is not None:
            await websocket.send_text(json.dumps(message))

    async def broadcast_to_game(self, game_id: int, message: dict):
        """Send ``message`` as JSON text to every socket registered for ``game_id``.

        Does nothing when no socket is registered for ``game_id``. Errors
        raised by ``send_text`` propagate to the caller.
        """
        members = self.game_connections.get(game_id)
        if not members:
            return

        payload = json.dumps(message)  # serialise once, not per recipient
        # Iterate over a snapshot: other tasks may register or disconnect
        # sockets while a send is awaited, and mutating a set that is being
        # iterated raises RuntimeError.
        for websocket in tuple(members):
            # Skip sockets that left the game while earlier sends were pending.
            if websocket in members:
                await websocket.send_text(payload)

    async def broadcast(self, message: dict):
        """Send ``message`` as JSON text to every active connection.

        Errors raised by ``send_text`` propagate to the caller.
        """
        if not self.active_connections:
            return

        payload = json.dumps(message)  # serialise once, not per recipient
        # Snapshot for the same reason as in broadcast_to_game().
        for connection in tuple(self.active_connections):
            # Skip sockets that disconnected while earlier sends were pending.
            if connection in self.active_connections:
                await connection.send_text(payload)
|