Source code for controller.kafka_manager_def
"""
Kafka Manager module (also referred to as the 'Application Controller Kafka Manager' module).
This module contains the class and module definitions for the Application Controller Kafka Manager
"""
import asyncio
from typing import Tuple
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from utils import exceptions
[docs]
class KafkaManager:
    """
    Holder of the aiokafka consumers and producers built for a given number of users.

    The client tuples are populated once in the constructor, exposed read-only
    through the ``consumers`` / ``producers`` properties (assignment and deletion
    raise ``OperationNotAllowedException``), and stopped when the manager is
    garbage-collected.
    """

    # Strong references to in-flight close tasks. The event loop only keeps weak
    # references to tasks, so a task nobody holds may be collected before it finishes.
    _pending_close_tasks = set()

    def __init__(self, number_of_users: int):
        """
        Create the consumer and producer clients for the requested number of users.
        :param number_of_users: Number of users the manager is catering for.
        """
        self.__consumers = tuple()
        self.__producers = tuple()
        # Loop-invariant stride: at most 10, smaller for small user counts.
        # It is only used as a divisor when the range below is non-empty.
        stride = min(10, (number_of_users // 2) + 1)
        for i in range(number_of_users):
            # NOTE(review): consumers and producers are created as a pair, one pair
            # per stride boundary -- confirm this pairing against the intended design.
            if i % stride == 0:
                self.__consumers += (AIOKafkaConsumer(),)
                self.__producers += (AIOKafkaProducer(),)

    @property
    def consumers(self) -> Tuple[AIOKafkaConsumer, ...]:
        """Return the tuple of consumers created at construction time."""
        return self.__consumers

    @property
    def producers(self) -> Tuple[AIOKafkaProducer, ...]:
        """Return the tuple of producers created at construction time."""
        return self.__producers

    @producers.setter
    def producers(self, *args, **kwargs):
        """Reject any attempt to replace the producers."""
        raise exceptions.OperationNotAllowedException(
            "Modification of Kafka Manager Producers Is Not Allowed"
        )

    @consumers.setter
    def consumers(self, *args, **kwargs):
        """Reject any attempt to replace the consumers."""
        raise exceptions.OperationNotAllowedException(
            "Modification of Kafka Manager Consumers Is Not Allowed"
        )

    @consumers.deleter
    def consumers(self, *args, **kwargs):
        """Reject any attempt to delete the consumers."""
        raise exceptions.OperationNotAllowedException(
            "Deletion of Kafka Manager Consumers Is Not Allowed"
        )

    @producers.deleter
    def producers(self, *args, **kwargs):
        """Reject any attempt to delete the producers."""
        raise exceptions.OperationNotAllowedException(
            "Deletion of Kafka Manager Producers Is Not Allowed"
        )

    async def __close(self):
        """
        Gracefully close the Kafka manager by stopping every consumer, then every producer.
        :return: None
        """
        for consumer in self.consumers:
            # isinstance (not an exact type match) so subclass instances are stopped too.
            if isinstance(consumer, AIOKafkaConsumer):
                await consumer.stop()
        for producer in self.producers:
            if isinstance(producer, AIOKafkaProducer):
                await producer.stop()

    def __del__(self):
        """
        Stop all clients when the manager is garbage-collected.

        Inside a running event loop the close coroutine is scheduled as a task on
        that loop; otherwise it is run to completion on a temporary loop.
        """
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No loop is running in this thread: drive the close synchronously.
            asyncio.run(self.__close())
        else:
            task = loop.create_task(self.__close())
            # Hold the task until it completes so it cannot be collected mid-flight.
            self._pending_close_tasks.add(task)
            task.add_done_callback(self._pending_close_tasks.discard)
[docs]
class KafkaManagerFactory:
    """
    Factory for the Kafka manager objects employed primarily by the application controller.
    """

    @staticmethod
    def create_base_kafka_manager(number_of_users: int = None) -> KafkaManager:
        """
        Validate the requested user count and build a bare-minimum Kafka Manager for it.
        :param number_of_users: Number of users participating in chat.
        :return: KafkaManager object.
        :raises ValueError: If number_of_users is missing, is not exactly an int, or is below 1.
        """
        # Guard 1: the argument is mandatory even though the signature defaults it to None.
        if number_of_users is None:
            raise ValueError(
                "You must supply the number of users the kafka manager is catering for!"
            )

        # Guard 2: exact type match on purpose, so int subclasses such as bool are rejected.
        is_plain_int = type(number_of_users) is int
        if not (is_plain_int and number_of_users >= 1):
            raise ValueError(
                "You must supply a non-zero positive integer for number of users the kafka manager is catering for!"
            )

        return KafkaManager(number_of_users)