channels官方文档:Django Channels — Channels 4.0.0 documentation
效果如下:
主要实现功能
基于Django的认证的群聊
具体实现
WebSocket 在建立连接之前会先发起一次 HTTP 握手请求,我们可以从这次 HTTP 请求中拿到 cookie 等信息进行认证。本次使用的是 JWT 认证,因此需要在建立连接的时候,将 JWT 认证信息通过 cookie 发送给后端完成通信认证,具体方法如下:
from rest_framework_simplejwt.tokens import AccessToken
from django.http.cookie import parse_cookie
from rest_framework_simplejwt.authentication import JWTAuthentication


class ServerAccessToken(AccessToken):
    """Access token class that allows an access token to be disabled on logout."""

    def verify(self):
        """Reject tokens found in the blacklist cache, then run the parent verification.

        Raises:
            TokenError: when the blacklist cache has an entry for this
                user id / token digest pair.
        """
        user_id = self.payload.get('user_id')
        # The blacklist lookup is keyed by the user id and the MD5 hex digest of
        # the raw token. hashlib.md5 requires bytes, so self.token must be bytes here.
        token_digest = hashlib.md5(self.token).hexdigest()
        if BlackAccessTokenCache(user_id, token_digest).get_storage_cache():
            raise TokenError('Token is invalid or expired')
        super().verify()


class CookieJWTAuthentication(JWTAuthentication):
    """JWT authentication that also accepts the token from a cookie.

    Cookie support exists so that django-proxy pages (for example flower)
    can be accessed.
    """

    def get_header(self, request):
        """Return the auth header, falling back to the ``X-Token`` cookie.

        The cookie is only consulted when the parent class finds no header.
        When the cookie is absent the parent's (empty) result is returned
        unchanged instead of fabricating a ``Bearer None`` header.
        """
        header = super().get_header(request)
        if header:
            return header

        cookies = request.META.get('HTTP_COOKIE')
        if not cookies:
            return header

        cookie_token = parse_cookie(cookies).get('X-Token')
        if cookie_token:
            header = f"Bearer {cookie_token}".encode('utf-8')
        return header


async def token_auth(scope):
    """Authenticate a connection from the ``X-Token`` cookie in ``scope``.

    Args:
        scope: connection scope; its ``cookies`` entry is read with ``.get``.

    Returns:
        ``(True, user)`` on success, ``(False, message)`` when the token is
        rejected with a ``TokenError``, and ``(False, False)`` when no token
        cookie was sent.
    """
    cookies = scope.get('cookies')
    raw_token = cookies.get('X-Token') if cookies else None
    # Check the raw cookie value *before* formatting it: formatting a missing
    # cookie yields the truthy bytes b"None", which defeats the emptiness check.
    if not raw_token:
        return False, False

    token = str(raw_token).encode('utf-8')
    try:
        auth_class = CookieJWTAuthentication()
        validated_token = ServerAccessToken(token)
        # get_user is synchronous, so it is wrapped with sync_to_async.
        # NOTE(review): only TokenError is handled here; confirm whether get_user
        # can raise other exception types that the caller should handle.
        user = await sync_to_async(auth_class.get_user)(validated_token)
    except TokenError as e:
        return False, e.args[0]
    return True, user
然后在建立连接的时候进行认证:
class MessageNotify(AsyncJsonWebsocketConsumer):
    """WebSocket consumer that authenticates the client when it connects."""

    def __init__(self, *args, **kwargs):
        # Unpack the arguments when forwarding them; passing ``args, kwargs``
        # as-is would hand the parent a tuple and a dict as two positional values.
        super().__init__(*args, **kwargs)
        self.room_group_name = None
        self.disconnected = True
        self.username = ""

    async def connect(self):
        """Authenticate via ``token_auth`` and close the socket on failure."""
        status, user_obj = await token_auth(self.scope)
        if not status:
            logger.error(f"auth failed {user_obj}")
            # 4401 is an application-defined close code, see
            # https://developer.mozilla.org/zh-CN/docs/Web/API/CloseEvent#status_codes
            await self.close(4401)
具体群聊核心代码如下:
class MessageNotify(AsyncJsonWebsocketConsumer):
    """Authenticated group-chat consumer.

    A client connects with ``room_name`` and ``username`` URL kwargs, is
    authenticated through ``token_auth`` and then joins the channel-layer
    group ``message_<room_name>``. Incoming JSON frames carry an ``action``
    and optional ``data``.
    """

    # Actions (other than "message") that a client may trigger on its own
    # consumer. The action name is used as the channel-layer message type, so
    # it must be restricted: otherwise a client could invoke any handler.
    client_actions = frozenset({"userinfo"})

    def __init__(self, *args, **kwargs):
        # Unpack the arguments when forwarding them; passing ``args, kwargs``
        # as-is would hand the parent a tuple and a dict as two positional values.
        super().__init__(*args, **kwargs)
        self.room_group_name = None
        self.disconnected = True
        self.username = ""

    async def connect(self):
        """Authenticate the client, then join the room group and accept."""
        status, user_obj = await token_auth(self.scope)
        if not status:
            logger.error(f"auth failed {user_obj}")
            # 4401 is an application-defined close code, see
            # https://developer.mozilla.org/zh-CN/docs/Web/API/CloseEvent#status_codes
            await self.close(4401)
            return

        logger.info(f"{user_obj} connect success")
        url_kwargs = self.scope["url_route"]["kwargs"]
        room_name = url_kwargs.get('room_name')
        username = url_kwargs.get('username')
        if not (username and room_name):
            logger.error(f"room_name:{room_name} token:{username} auth failed")
            await self.close()
            return

        self.disconnected = False
        self.username = username
        self.room_group_name = f"message_{room_name}"
        # Join room group
        await self.channel_layer.group_add(self.room_group_name, self.channel_name)
        await self.accept()

    async def disconnect(self, close_code):
        """Mark the consumer as disconnected and leave the room group, if any."""
        self.disconnected = True
        if self.room_group_name:
            await self.channel_layer.group_discard(self.room_group_name, self.channel_name)

    async def receive_json(self, content, **kwargs):
        """Handle one JSON frame received from the WebSocket.

        ``action == "message"`` broadcasts ``data`` to the room group; an
        action listed in ``client_actions`` is dispatched to this consumer's
        own handler of the same name. A frame without an action closes the
        connection; any other action is logged and ignored.
        """
        action = content.get('action') if isinstance(content, dict) else None
        if not action:
            await self.close()
            # Stop here: there is nothing to dispatch once the socket is closed.
            return

        data = content.get('data', {})
        if action == "message":
            if not isinstance(data, dict):
                logger.warning(f"invalid message data: {data!r}")
                return
            data['uid'] = self.channel_name
            data['username'] = self.username
            # Send message to room group
            await self.channel_layer.group_send(
                self.room_group_name, {"type": "chat_message", "data": data}
            )
        elif isinstance(action, str) and action in self.client_actions:
            await self.channel_layer.send(self.channel_name, {"type": action, "data": data})
        else:
            logger.warning(f"unsupported action: {action!r}")

    async def userinfo(self, event):
        """Send this connection's username and channel name to the client."""
        data = {
            'username': self.username,
            'uid': self.channel_name,
        }
        await self.send_data('userinfo', {'data': data})

    async def chat_message(self, event):
        """Forward a room-group message to the WebSocket, stamped with the current time."""
        data = event["data"]
        data['time'] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
        # Send message to WebSocket
        await self.send_data('message', {'data': data})

    async def send_data(self, action, content, close=False):
        """Send ``content`` merged over a ``time`` / ``action`` envelope as JSON.

        Keys in ``content`` override the envelope keys because ``update`` is
        applied after the envelope is built.
        """
        data = {
            'time': time.time(),
            'action': action,
        }
        data.update(content)
        return await super().send_json(data, close)
中间还涉及消息队列,本次使用的是基于 Redis 的消息队列,需要在 settings.py 中进行配置:
# Channel layer used by Django Channels for group messaging.
# The pub/sub Redis backend is selected here; the alternative backend
# "channels_redis.core.RedisChannelLayer" is left unused.
CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.pubsub.RedisPubSubChannelLayer",
        "CONFIG": {
            # Single Redis host; the last URL path segment is the Redis database id.
            "hosts": [
                f"redis://:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT}/{CHANNEL_LAYERS_CACHE_ID}",
            ],
        },
    },
}
代码已经开源,GitHub地址:GitHub - nineaiyu/xadmin-server: xadmin-基于Django+vue3的rbac权限管理系统