Skip to content

Message Bus

Messaging Channels

NATSQueue

Bases: AbstractMessageQueue[Generic[T]], ABC

A distributed message queue on top of NATS Jetstream.

Source code in germinate_ai/message_bus/message_queue.py
class NATSQueue(AbstractMessageQueue[Generic[T]], ABC):
    """A distributed message queue on top of NATS Jetstream.

    Items are published to `subject` and read back through a JetStream pull
    subscription on `stream`. Calling `connect()` up front is optional:
    every operation connects first when that has not happened yet.
    """

    def __init__(
        self,
        stream: str,
        subject: str,
        durable_consumer: str = None,
        connection: NatsConnection = None,
    ):
        """Store the queue configuration; nothing is connected here.

        Args:
            stream: Stream name handed to the pull subscription.
            subject: Subject used for both publishing and subscribing.
            durable_consumer: Durable name handed to the pull subscription,
                or None.
            connection: Connection to use; `NatsConnection()` is used when
                omitted.
        """
        if connection is None:
            connection = NatsConnection()
        self.connection = connection
        self.stream = stream
        self.subject = subject
        self.durable_consumer = durable_consumer
        # Pull subscription; created by connect().
        self.consumer = None

    async def connect(self):
        """Open the connection if necessary and create the pull subscription."""
        if not self.connection.is_connected:
            await self.connection.connect()
        self.consumer = await self.connection.jetstream.pull_subscribe(
            stream=self.stream,
            subject=self.subject,
            durable=self.durable_consumer,
        )

    async def enqueue(self, item: T):
        """Publish `item.encode()` to the queue's subject."""
        # The JetStream context is None until the connection has been opened;
        # open it on demand instead of failing with an AttributeError.
        if self.connection.jetstream is None:
            await self.connection.connect()
        await self.connection.jetstream.publish(self.subject, item.encode())

    async def dequeue(self) -> Msg:
        """Fetch one message and return it with `data` decoded from bytes.

        Callers are responsible for acknowledging the returned message with
        `await msg.ack()`.

        Returns:
            The fetched message, or None when the fetch produced no messages.
        """
        if self.consumer is None:
            await self.connect()
        msgs = await self.consumer.fetch(batch=1)
        for msg in msgs:
            # bytes -> str
            msg.data = msg.data.decode()
            return msg
        return None

    async def aiter_dequeue(self) -> AsyncGenerator[T, None]:
        """Fetch one batch of size 1 and yield its messages, acking each one.

        The message object (with `data` decoded from bytes) is yielded, and it
        is acknowledged only after the consumer resumes the generator.
        """
        if self.consumer is None:
            await self.connect()
        msgs = await self.consumer.fetch(batch=1)
        for msg in msgs:
            # bytes -> str
            msg.data = msg.data.decode()
            yield msg
            await msg.ack()

NatsConnection

NATS connection singleton class.

Source code in germinate_ai/message_bus/nats/connection.py
class NatsConnection:
    """NATS connection singleton class.

    Every `NatsConnection(...)` call returns the same shared instance.
    Re-instantiating while a connection is open keeps that connection.
    """

    _instance: "NatsConnection" = None

    def __new__(cls, nats_url=NATS_URL) -> "NatsConnection":
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            logger.debug(f"Creating a new NATS connection to cluster: `{nats_url}`...")
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self, nats_url=NATS_URL):
        """Set the target URL and reset state, unless a connection is open.

        Args:
            nats_url: URL passed to `nats.connect` by `connect()`.
        """
        # Python calls __init__ on every NatsConnection(...) expression, even
        # though __new__ hands back the existing singleton. Resetting `nc`
        # here would silently orphan an open connection, so keep it.
        if getattr(self, "nc", None) is not None:
            if nats_url != self.nats_url:
                logger.warning(
                    f"NATS connection to `{self.nats_url}` is already open; "
                    f"ignoring requested url `{nats_url}`."
                )
            return
        self.nats_url = nats_url
        self.nc = None
        self.jetstream = None

    @property
    def is_connected(self) -> bool:
        """Whether a client exists and reports itself as connected."""
        if self.nc is None:
            return False
        return self.nc.is_connected

    async def connect(self):
        """Create the NATS client and the JetStream context if missing."""
        # Connect if necessary
        if self.nc is None:
            self.nc = await nats.connect(self.nats_url)
        # Jetstream context
        if self.jetstream is None:
            self.jetstream = self.nc.jetstream()

    async def close(self):
        """Close the client, if any, and forget it so `connect()` can reopen."""
        if self.nc is None:
            return
        try:
            await self.nc.close()
        finally:
            # Drop the closed client and its JetStream context; otherwise
            # connect() would see a non-None `nc` and never reconnect.
            self.nc = None
            self.jetstream = None

ROMessageChannel

Bases: AbstractMessageChannel

A Read Only Message Channel

Source code in germinate_ai/message_bus/message_channels.py
class ROMessageChannel(AbstractMessageChannel):
    """Message channel that can only be read from."""

    # Pull subscription created by connect().
    consumer: JetStreamContext.PullSubscription

    async def connect(self):
        """Run the base-class connect, then pull-subscribe to the subject."""
        await super().connect()
        jetstream = self.connection.jetstream
        subscription = await jetstream.pull_subscribe(
            stream=self.stream,
            subject=self.subject,
            durable=self.durable_consumer,
        )
        self.consumer = subscription

    async def read(self, batch: int = 1) -> typ.Sequence[Msg]:
        """Fetch up to `batch` messages, connecting first when needed.

        Each message's `data` is replaced in place by its decoded form before
        the fetched sequence is returned.
        """
        if not self.connected:
            await self.connect()
        fetched = await self.consumer.fetch(batch=batch)
        for message in fetched:
            message.data = message.data.decode()
        return fetched

connect() async

Connect to the message bus.

Source code in germinate_ai/message_bus/message_channels.py
async def connect(self):
    """Run the base-class connect, then pull-subscribe to the subject."""
    await super().connect()
    jetstream = self.connection.jetstream
    subscription = await jetstream.pull_subscribe(
        stream=self.stream,
        subject=self.subject,
        durable=self.durable_consumer,
    )
    self.consumer = subscription

read(batch=1) async

Read batch messages from the channel.

Source code in germinate_ai/message_bus/message_channels.py
async def read(self, batch: int = 1) -> typ.Sequence[Msg]:
    """Fetch up to `batch` messages, connecting first when needed.

    Each message's `data` is replaced in place by its decoded form before
    the fetched sequence is returned.
    """
    if not self.connected:
        await self.connect()
    fetched = await self.consumer.fetch(batch=batch)
    for message in fetched:
        message.data = message.data.decode()
    return fetched

RWMessageChannel

Bases: WOMessageChannel, ROMessageChannel

A Read/Write Message Channel

Source code in germinate_ai/message_bus/message_channels.py
class RWMessageChannel(WOMessageChannel, ROMessageChannel):
    """Message channel supporting both reads and writes.

    Everything is inherited: writing comes from `WOMessageChannel` and
    reading from `ROMessageChannel`.
    """

WOMessageChannel

Bases: AbstractMessageChannel

A Write Only Message Channel

Source code in germinate_ai/message_bus/message_channels.py
class WOMessageChannel(AbstractMessageChannel):
    """A Write Only Message Channel"""

    def __init__(
        self,
        stream: str,
        subject: str,
        durable_consumer: str = None,
        connection: NatsConnection = None,
    ):
        """Validate the subject and store the channel configuration.

        Args:
            stream: Stream name stored on the channel.
            subject: Subject that `write` publishes to.
            durable_consumer: Durable consumer name stored on the channel.
            connection: Connection to use; `NatsConnection()` is used when
                omitted.

        Raises:
            ValueError: If `_validate_write_subject(subject)` is falsy.
        """
        if not _validate_write_subject(subject):
            raise ValueError(f"Invalid subject {subject} for write channel")

        # NOTE(review): super().__init__() is not called here; confirm that
        # AbstractMessageChannel needs no further initialisation.
        self.subject = subject
        if connection is None:
            connection = NatsConnection()
        self.connection = connection
        self.stream = stream
        self.durable_consumer = durable_consumer

    async def write(self, msg: str):
        """Write the string `msg` into the message bus.

        The connection is opened on demand when no JetStream context exists
        yet.
        """
        # `jetstream` is None until the connection has been opened; connect
        # lazily instead of failing with an AttributeError on None.
        if self.connection.jetstream is None:
            await self.connection.connect()
        await self.connection.jetstream.publish(self.subject, msg.encode())

write(msg) async

Write the string msg into the message bus.

Source code in germinate_ai/message_bus/message_channels.py
async def write(self, msg: str):
    """Write the string `msg` into the message bus.

    The connection is opened on demand when no JetStream context exists yet.
    """
    # `jetstream` is None until the connection has been opened; connect
    # lazily instead of failing with an AttributeError on None.
    if self.connection.jetstream is None:
        await self.connection.connect()
    await self.connection.jetstream.publish(self.subject, msg.encode())

nats_connection(nats_url=NATS_URL) async

NATS connection context manager that opens and closes the connection for you.

Source code in germinate_ai/message_bus/nats/connection.py
@asynccontextmanager
async def nats_connection(nats_url=NATS_URL):
    """NATS connection context manager that opens and closes the connection for you.

    Args:
        nats_url: URL handed to `NatsConnection`.

    Yields:
        The connected `NatsConnection`.
    """
    nc = NatsConnection(nats_url=nats_url)
    await nc.connect()
    try:
        yield nc
    finally:
        # Close even when the body of the `async with` block raises.
        await nc.close()

ro_message_channel_factory(stream, subject, durable_consumer=None, connection=None)

Create a Read Only Message Channel

Source code in germinate_ai/message_bus/factories.py
def ro_message_channel_factory(
    stream, subject, durable_consumer=None, connection=None
) -> ROMessageChannel:
    """Build a read-only message channel.

    All arguments are passed through unchanged to `ROMessageChannel`.
    """
    return ROMessageChannel(
        stream=stream,
        subject=subject,
        durable_consumer=durable_consumer,
        connection=connection,
    )

rw_message_channel_factory(stream, subject, durable_consumer=None, connection=None)

Create a Read/Write Message Channel

Source code in germinate_ai/message_bus/factories.py
def rw_message_channel_factory(
    stream, subject, durable_consumer=None, connection=None
) -> RWMessageChannel:
    """Create a Read/Write Message Channel

    All arguments are passed through unchanged to `RWMessageChannel`.
    """
    chan = RWMessageChannel(
        stream=stream,
        subject=subject,
        durable_consumer=durable_consumer,
        connection=connection,
    )
    return chan

wo_message_channel_factory(stream, subject, durable_consumer=None, connection=None)

Create a Write Only Message Channel

Source code in germinate_ai/message_bus/factories.py
def wo_message_channel_factory(
    stream, subject, durable_consumer=None, connection=None
) -> WOMessageChannel:
    """Create a Write Only Message Channel

    All arguments are passed through unchanged to `WOMessageChannel`.
    """
    chan = WOMessageChannel(
        stream=stream,
        subject=subject,
        durable_consumer=durable_consumer,
        connection=connection,
    )
    return chan