tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

notifications.py (2097B)


      1 #!/usr/bin/env python
      2 
      3 import asyncio
      4 import json
      5 
      6 import aioredis
      7 import django
      8 import websockets
      9 
     10 django.setup()
     11 
     12 from django.contrib.contenttypes.models import ContentType
     13 from sesame.utils import get_user
     14 from websockets.frames import CloseCode
     15 
     16 
     17 CONNECTIONS = {}
     18 
     19 
     20 def get_content_types(user):
     21    """Return the set of IDs of content types visible by user."""
     22    # This does only three database queries because Django caches
     23    # all permissions on the first call to user.has_perm(...).
     24    return {
     25        ct.id
     26        for ct in ContentType.objects.all()
     27        if user.has_perm(f"{ct.app_label}.view_{ct.model}")
     28        or user.has_perm(f"{ct.app_label}.change_{ct.model}")
     29    }
     30 
     31 
     32 async def handler(websocket):
     33    """Authenticate user and register connection in CONNECTIONS."""
     34    sesame = await websocket.recv()
     35    user = await asyncio.to_thread(get_user, sesame)
     36    if user is None:
     37        await websocket.close(CloseCode.INTERNAL_ERROR, "authentication failed")
     38        return
     39 
     40    ct_ids = await asyncio.to_thread(get_content_types, user)
     41    CONNECTIONS[websocket] = {"content_type_ids": ct_ids}
     42    try:
     43        await websocket.wait_closed()
     44    finally:
     45        del CONNECTIONS[websocket]
     46 
     47 
     48 async def process_events():
     49    """Listen to events in Redis and process them."""
     50    redis = aioredis.from_url("redis://127.0.0.1:6379/1")
     51    pubsub = redis.pubsub()
     52    await pubsub.subscribe("events")
     53    async for message in pubsub.listen():
     54        if message["type"] != "message":
     55            continue
     56        payload = message["data"].decode()
     57        # Broadcast event to all users who have permissions to see it.
     58        event = json.loads(payload)
     59        recipients = (
     60            websocket
     61            for websocket, connection in CONNECTIONS.items()
     62            if event["content_type_id"] in connection["content_type_ids"]
     63        )
     64        websockets.broadcast(recipients, payload)
     65 
     66 
     67 async def main():
     68    async with websockets.serve(handler, "localhost", 8888):
     69        await process_events()  # runs forever
     70 
     71 
     72 if __name__ == "__main__":
     73    asyncio.run(main())