Publisher/Subscriber
Create a complex messaging system for your services with the publisher/subscriber pattern.
Publisher
A PUBLISHER is a component that sends messages to one or more SUBSCRIBERS. Unlike a PRODUCER, a PUBLISHER is responsible for routing the messages to the corresponding SUBSCRIBERS.
Methods:
Name | Description |
---|---|
register | Registers one or more SUBSCRIBERS to the PUBLISHER. |
publish | Publishes a message to one or more SUBSCRIBERS based on the topic. |
Example
from torchsystem.services import Publisher
from torchsystem.services import Subscriber

subscriber = Subscriber()

@subscriber.subscribe('loss')
def on_loss(loss):
    print(f"Loss: {loss}")

publisher = Publisher()
publisher.register(subscriber)
publisher.publish(0.1, 'loss')
publish(message, topic)
Publishes a message to one or more SUBSCRIBERS based on the topic.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | Any | The message to publish. | required |
topic | str | The topic to publish the message to. | required |
register(*subscribers)
Registers one or more SUBSCRIBERS to the PUBLISHER.
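To make the routing responsibility concrete, here is a minimal plain-Python sketch of the idea. It is not torchsystem's implementation: `MiniPublisher` and `MiniSubscriber` are hypothetical stand-ins, and the sketch assumes that a publisher forwards every message to each registered subscriber and that a subscriber ignores topics it has no handler for.

```python
from collections import defaultdict
from typing import Any, Callable


class MiniSubscriber:
    """Hypothetical stand-in for a subscriber: maps topics to handlers."""

    def __init__(self) -> None:
        self.handlers: dict[str, list[Callable[[Any], None]]] = defaultdict(list)

    def register(self, topic: str, handler: Callable[[Any], None]) -> None:
        self.handlers[topic].append(handler)

    def receive(self, message: Any, topic: str) -> None:
        # Topics without handlers are ignored (an assumption of this sketch).
        for handler in self.handlers.get(topic, []):
            handler(message)


class MiniPublisher:
    """Hypothetical stand-in for a publisher: fans messages out to subscribers."""

    def __init__(self) -> None:
        self.subscribers: list[MiniSubscriber] = []

    def register(self, *subscribers: MiniSubscriber) -> None:
        self.subscribers.extend(subscribers)

    def publish(self, message: Any, topic: str) -> None:
        # The publisher, not the caller, decides who gets the message.
        for subscriber in self.subscribers:
            subscriber.receive(message, topic)


losses, accuracies = [], []
loss_subscriber, accuracy_subscriber = MiniSubscriber(), MiniSubscriber()
loss_subscriber.register('loss', losses.append)
accuracy_subscriber.register('accuracy', accuracies.append)

publisher = MiniPublisher()
publisher.register(loss_subscriber, accuracy_subscriber)
publisher.publish(0.1, 'loss')
publisher.publish(0.9, 'accuracy')
```

The code that calls `publish` never names a subscriber; it only names a topic, which is what keeps the sending side decoupled from the handlers.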
Subscriber
A SUBSCRIBER is a component that listens for published messages and processes them accordingly.
Unlike a CONSUMER, a SUBSCRIBER receives messages only from the topics it has subscribed to, and it is the PUBLISHER's responsibility to route the messages accordingly.
Methods:
Name | Description |
---|---|
register | Registers a handler function with a given topic. |
subscribe | Decorator for registering a handler function to one or more topics. |
receive | Receives a message from a given topic and triggers the corresponding handler functions to process it. |
Example
from torchsystem import Depends
from torchsystem.services import Subscriber
from torchsystem.services import Publisher

subscriber = Subscriber()
metricsdb = []

def metrics():
    return metricsdb

# Handles both topics; the list is injected through Depends.
@subscriber.subscribe('loss', 'accuracy')
def store_metric(metric, metrics: list = Depends(metrics)):
    metrics.append(metric)

@subscriber.subscribe('accuracy')
def on_accuracy_too_high(metric):
    if metric > 0.99:
        raise StopIteration

publisher = Publisher()
publisher.register(subscriber)
publisher.publish(0.1, 'loss')
publisher.publish(0.9, 'accuracy')
assert metricsdb == [0.1, 0.9]

# Exceptions raised by a handler propagate to the caller of publish.
try:
    publisher.publish(1.0, 'accuracy')
except StopIteration:
    print("Early stopping")
dependency_overrides
property
Returns the dependency overrides for the subscriber. This is useful for late binding, testing, and changing the behavior of the subscriber at runtime.
Returns:
Type | Description |
---|---|
dict | A dictionary of the dependency map. |
Example
subscriber = Subscriber()
...
subscriber.dependency_overrides[db] = lambda: []
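The override mechanism can be illustrated without the library. The following is a plain-Python sketch of the general idea only: `Dependency` and `call_with_dependencies` are hypothetical names invented for this illustration, and nothing here is taken from how torchsystem resolves `Depends` internally. It assumes that overrides are looked up by the original provider function, as the example above suggests.

```python
import inspect
from typing import Any, Callable


class Dependency:
    """Hypothetical marker that stores the provider of a default argument."""

    def __init__(self, provider: Callable[[], Any]) -> None:
        self.provider = provider


def call_with_dependencies(handler: Callable, message: Any, overrides: dict) -> Any:
    """Call handler(message, ...), resolving every Dependency default.

    A provider found in `overrides` is replaced by its override before it is called.
    """
    resolved = {}
    for name, parameter in inspect.signature(handler).parameters.items():
        if isinstance(parameter.default, Dependency):
            provider = parameter.default.provider
            resolved[name] = overrides.get(provider, provider)()
    return handler(message, **resolved)


production_db: list = []
test_db: list = []


def db() -> list:
    return production_db


def store_metric(metric, metrics: list = Dependency(db)):
    metrics.append(metric)


overrides: dict = {}
call_with_dependencies(store_metric, 0.1, overrides)  # uses the real provider

overrides[db] = lambda: test_db                       # late-bound replacement
call_with_dependencies(store_metric, 0.2, overrides)  # uses the override
```

Because the handler's signature never changes, a test can swap the storage behind `db` without touching `store_metric` itself.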
receive(message, topic)
Receives a message from a given topic and triggers the corresponding handler functions to process it. This is called by the PUBLISHER, but it is also useful for delivering messages between handlers directly.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | Any | The message to process. | required |
topic | str | The topic to process the message from. | required |
Example
from torchsystem.services import Subscriber

subscriber = Subscriber()

@subscriber.subscribe('metrics')
def store_metric(metrics: list):
    # Re-dispatch each metric to the topic named after it.
    for metric in metrics:
        subscriber.receive(metric, metric['name'])

@subscriber.subscribe('loss')
def on_loss(loss):
    print(f"Loss: {loss['value']}")

@subscriber.subscribe('accuracy')
def on_accuracy(accuracy):
    print(f"Accuracy: {accuracy['value']}")

subscriber.receive(
    [
        {'name': 'loss', 'value': 0.1},
        {'name': 'accuracy', 'value': 0.9},
    ],
    topic='metrics',
)

# Output:
# Loss: 0.1
# Accuracy: 0.9
register(topic, wrapped)
Registers a handler function with a given topic.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
topic | str | The topic to register the handler function to. | required |
wrapped | Callable[..., None] | The handler function to register. | required |
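Because `register` takes the topic and the handler explicitly, it suits cases where the decorator form is inconvenient, such as wiring handlers in a loop. The sketch below uses a hypothetical `SketchSubscriber` class to show one plausible relationship between the two methods, assuming the decorator registers the function once per topic and returns it unchanged. It is an illustration, not torchsystem's source.

```python
from typing import Any, Callable


class SketchSubscriber:
    """Hypothetical subscriber used only to illustrate register vs. subscribe."""

    def __init__(self) -> None:
        self.handlers: dict[str, list[Callable[..., None]]] = {}

    def register(self, topic: str, wrapped: Callable[..., None]) -> None:
        self.handlers.setdefault(topic, []).append(wrapped)

    def subscribe(self, *topics: str) -> Callable:
        def decorator(wrapped: Callable[..., None]) -> Callable[..., None]:
            for topic in topics:
                self.register(topic, wrapped)
            return wrapped  # hand the function back unchanged
        return decorator

    def receive(self, message: Any, topic: str) -> None:
        for handler in self.handlers.get(topic, []):
            handler(message)


seen = []


def log_metric(metric):
    seen.append(('log', metric))


subscriber = SketchSubscriber()

# Explicit registration, e.g. when topics come from configuration.
for topic in ('loss', 'accuracy'):
    subscriber.register(topic, log_metric)


# Decorator registration for a handler defined in place.
@subscriber.subscribe('loss')
def on_loss(loss):
    seen.append(('loss', loss))


subscriber.receive(0.1, 'loss')
subscriber.receive(0.9, 'accuracy')
```

In this sketch, handlers run in registration order, so `log_metric` sees the loss before `on_loss` does.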
subscribe(*topics)
Decorator for registering a handler function to one or more topics.
Args:
*topics (str): The topics to register the handler function to.
Returns:
Type | Description |
---|---|
Callable[..., None] | The decorated handler function. |
Example
subscriber = Subscriber()

@subscriber.subscribe('loss', 'accuracy')
def store_metric(metric, metrics=Depends(metrics)):
    ...