Skip to content

Producer/Consumer

Model ocurrences in your services and handle them with the producer/consumer pattern.

Consumer

A CONSUMER is a component that listens for and reacts to ocurrences in a BOUNDED CONTEXT. Occurrences can be modeled as events and should contain all the information needed to describe what happened in the system. Then CONSUMERs are responsible for processing events and triggering side effects in response to them.

Unlike a SUBSCRIBER, that receive messages routed by the PUBLISHER, a CONSUMER is responsible for deciding which handlers to invoke based on the type of the message it consumes. The message type should describe the ocurrence in past tense as DDD suggests when modeling events.

The EVENTS should be modeled with UBIQUITOUS LANGUAGE. This means that the names of the events should reflect the domain ocurrences that the consumer is responsible for. Keep this in mind when naming the events that will be consumed by the consumer. The consumer maps it's handlers to keys generated by the EVENT's name.

Methods:

Name Description
register

Registers a message type and its corresponding handler function.

handler

Decorator for registering a handler function for one or more message types.

consume

Consumes a message by invoking its registered handler functions.

Example
from torchsystem.services import Consumer, event
from torchsystem.services import Publisher

@event
class ModelTrained:
    model: Callable
    metrics: Sequence

@event
class ModelEvaluated:
    model: Callable
    metrics: Sequence

consumer = Consumer()
publisher = Publisher()

@consumer.handler
def on_model_iterated(event: UserCreated | UserUpdated):
    for metric in event.metrics:
        publisher.publish(metric, metric['name'])

consumer.consume(ModelTrained(model, [{'name': 'loss', 'value': 0.1}, {'name': 'accuracy', 'value': 0.9}]))
consumer.consume(ModelEvaluated(model, [{'name': 'loss', 'value': 0.1}, {'name': 'accuracy', 'value': 0.9}]))

dependency_overrides property

Returns the dependency overrides for the consumer. This is useful for late binding, testing and changing the behavior of the consumer in runtime.

Returns:

Name Type Description
dict dict

A dictionary of the dependency map.

consume(message)

Consumes a message by invoking its registered handler functions. If the message type is not registered with any handler, it will be ignored.

Parameters:

Name Type Description Default
message Any

The message to consume.

required

handler(wrapped)

Decorator for registering a handler function for one or more message types. The handler is registered with the name of the function as the key. The handler is also injected with the dependencies provided by the consumer.

Each message type can have multiple handlers registered to it and each handler can be registered to multiple message at the same time using unions.

Parameters:

Name Type Description Default
wrapped Callable[..., None]

The function to be registered as a handler.

required

Returns:

Type Description
Callable[..., None]

Callable[..., None]: The injected handler function.

register(annotation, handler)

Registers a message type and its corresponding handler function. Handles nested or generic annotations and union types. If the annotation is a union type, it will register the handler for each of its arguments.

This method is used to register a handler from the handler method and should not be called directly. Use the handler decorator instead.

Parameters:

Name Type Description Default
annotation Any

The message annotation to be registered.

required
handler Callable[..., None]

The handler function to be registered.

required

Returns:

Type Description
Callable[..., None]

Callable[..., None]: The injected handler function.

Producer

A producer is responsible for emitting EVENTS that are consumed by consumers. You can implement a producer implementing the register method to register consumers, and some delivery mechanism to deliver the events to them.

Methods:

Name Description
register

Registers a consumer to the producer.

dispatch

Dispatches an event to all registered consumers.

Example
from torchsystem.services import Consumer
from torchsystem.services import Producer, event

@event
class ModelTrained:
    model: Callable
    metrics: Sequence

@event
class ModelEvaluated:
    model: Callable
    metrics: Sequence
...

producer = Producer()
producer.register(consumer)
producer.dispatch(ModelTrained(model, [{'name': 'loss', 'value': 0.1}, {'name': 'accuracy', 'value': 0.9}]))     

dispatch(message)

Dispatches an event to all registered consumers. The event will be consumed by the consumers that have registered handlers for the event type.

Parameters:

Name Type Description Default
message Any

The event to dispatch.

required

register(*consumers)

Registers a sequence of consumers to the producer. The producer will dispatch events to all registered consumers.

event(cls)

A decorator to define an Event message.

Parameters:

Name Type Description Default
cls type

The ocurrence of something that happened represented by a class

required

Returns:

Name Type Description
type

A dataclass representing the event.

Example
from torchsystem.services import event

@event
class ModelTrained:
    model: Callable
    metrics: Sequence