Source code for controller.controller_def

Controller module (also referable to as 'Application Controller' module)

This module contains the class and module definitions for the Application Controller

from __future__ import annotations
import random
from typing import Optional, List

import websockets

from role import Role
from user import User
from .kafka_manager_def import KafkaManagerFactory

[docs] class Controller: """ Class definition for application controller. """ ws_url = None __websocket: Optional[websockets.WebSocketClientProtocol] = None
[docs] @staticmethod async def initialise( number_of_users: int, ws_url: str, chat_context: str ) -> Controller: """ Constructor for Controller object. The controller performs lightly as a Controller from the MVC Design Pattern :param number_of_users: Number of users to participate in application lifecycle :param ws_url: Web socket url for Controller to interact with. :param chat_context: Group chat context """ cls = Controller() assert (type(number_of_users) is int and number_of_users > 0) and type( chat_context ) is str cls.ws_url = ws_url cls.chat_context = chat_context chat_uuid = cls.ws_url.split("/")[-1] cls.kafka_manager = KafkaManagerFactory.create_base_kafka_manager( number_of_users ) cls.participating_users: List[User] = [User() for _ in range(number_of_users)] for consumer in cls.kafka_manager.consumers: consumer.subscribe([chat_uuid]) cls.first_publisher: User = random.choice(cls.participating_users) cls.first_publisher.role = Role.PUBLISHER await cls.connect_ws() return cls
@property def websocket(self): """Controller websocket getter.""" return self.__websocket @websocket.setter def websocket(self, new_websocket_value): assert type(new_websocket_value) is websockets.WebSocketClientProtocol self.__websocket = new_websocket_value
[docs] async def connect_ws(self, message=None): """ Connects controller to websocket with web socket url :param message: Optional message to send to the websocket :return: """ if not message: self.__websocket = await websockets.connect(self.ws_url) else: async with websockets.connect(self.ws_url) as websocket: await websocket.send(message)