Producer/Consumer
Model ocurrences in your services and handle them with the producer/consumer pattern.
Consumer
A CONSUMER is a component that listens for and reacts to ocurrences in a BOUNDED CONTEXT. Occurrences can be modeled as events and should contain all the information needed to describe what happened in the system. Then CONSUMERs are responsible for processing events and triggering side effects in response to them.
Unlike a SUBSCRIBER, that receive messages routed by the PUBLISHER, a CONSUMER is responsible for deciding which handlers to invoke based on the type of the message it consumes. The message type should describe the ocurrence in past tense as DDD suggests when modeling events.
The EVENTS should be modeled with UBIQUITOUS LANGUAGE. This means that the names of the events should reflect the domain ocurrences that the consumer is responsible for. Keep this in mind when naming the events that will be consumed by the consumer. The consumer maps it's handlers to keys generated by the EVENT's name.
Methods:
Name | Description |
---|---|
register |
Registers a message type and its corresponding handler function. |
handler |
Decorator for registering a handler function for one or more message types. |
consume |
Consumes a message by invoking its registered handler functions. |
Example
from torchsystem.services import Consumer, event
from torchsystem.services import Publisher
@event
class ModelTrained:
model: Callable
metrics: Sequence
@event
class ModelEvaluated:
model: Callable
metrics: Sequence
consumer = Consumer()
publisher = Publisher()
@consumer.handler
def on_model_iterated(event: UserCreated | UserUpdated):
for metric in event.metrics:
publisher.publish(metric, metric['name'])
consumer.consume(ModelTrained(model, [{'name': 'loss', 'value': 0.1}, {'name': 'accuracy', 'value': 0.9}]))
consumer.consume(ModelEvaluated(model, [{'name': 'loss', 'value': 0.1}, {'name': 'accuracy', 'value': 0.9}]))
dependency_overrides
property
Returns the dependency overrides for the consumer. This is useful for late binding, testing and changing the behavior of the consumer in runtime.
Returns:
Name | Type | Description |
---|---|---|
dict |
dict
|
A dictionary of the dependency map. |
consume(message)
Consumes a message by invoking its registered handler functions. If the message type is not registered with any handler, it will be ignored.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message
|
Any
|
The message to consume. |
required |
handler(wrapped)
Decorator for registering a handler function for one or more message types. The handler is registered with the name of the function as the key. The handler is also injected with the dependencies provided by the consumer.
Each message type can have multiple handlers registered to it and each handler can be registered to multiple message at the same time using unions.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
wrapped
|
Callable[..., None]
|
The function to be registered as a handler. |
required |
Returns:
Type | Description |
---|---|
Callable[..., None]
|
Callable[..., None]: The injected handler function. |
register(annotation, handler)
Registers a message type and its corresponding handler function. Handles nested or generic annotations and union types. If the annotation is a union type, it will register the handler for each of its arguments.
This method is used to register a handler from the handler
method and should not be called directly. Use
the handler
decorator instead.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
annotation
|
Any
|
The message annotation to be registered. |
required |
handler
|
Callable[..., None]
|
The handler function to be registered. |
required |
Returns:
Type | Description |
---|---|
Callable[..., None]
|
Callable[..., None]: The injected handler function. |
Producer
A producer is responsible for
emitting EVENTS that are consumed by consumers. You can implement a producer implementing the register
method to register consumers, and some delivery mechanism to deliver the events to them.
Methods:
Name | Description |
---|---|
register |
Registers a consumer to the producer. |
dispatch |
Dispatches an event to all registered consumers. |
Example
from torchsystem.services import Consumer
from torchsystem.services import Producer, event
@event
class ModelTrained:
model: Callable
metrics: Sequence
@event
class ModelEvaluated:
model: Callable
metrics: Sequence
...
producer = Producer()
producer.register(consumer)
producer.dispatch(ModelTrained(model, [{'name': 'loss', 'value': 0.1}, {'name': 'accuracy', 'value': 0.9}]))
dispatch(message)
Dispatches an event to all registered consumers. The event will be consumed by the consumers that have registered handlers for the event type.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message
|
Any
|
The event to dispatch. |
required |
register(*consumers)
Registers a sequence of consumers to the producer. The producer will dispatch events to all registered consumers.
event(cls)
A decorator to define an Event message.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cls
|
type
|
The ocurrence of something that happened represented by a class |
required |
Returns:
Name | Type | Description |
---|---|---|
type |
A dataclass representing the event. |
Example
from torchsystem.services import event
@event
class ModelTrained:
model: Callable
metrics: Sequence