Publisher/Subscriber

Create a complex messaging system for your services with the publisher/subscriber pattern.

Publisher

A PUBLISHER is a component that sends messages to one or more SUBSCRIBERS. Unlike a PRODUCER, the PUBLISHER is responsible for routing messages to the corresponding SUBSCRIBERS.

Methods:

Name      Description
register  Registers one or more SUBSCRIBERS to the PUBLISHER.
publish   Publishes a message to one or more SUBSCRIBERS based on the topic.

Example
from torchsystem.services import Publisher, Subscriber

subscriber = Subscriber()

@subscriber.subscribe('loss')
def on_loss(loss):
    print(f"Loss: {loss}")

publisher = Publisher()
publisher.register(subscriber)
publisher.publish(0.1, 'loss')

publish(message, topic)

Publishes a message to one or more SUBSCRIBERS based on the topic.

Parameters:

Name     Type  Description                           Default
message  Any   The message to publish.               required
topic    str   The topic to publish the message to.  required

register(*subscribers)

Registers one or more SUBSCRIBERS to the PUBLISHER.
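
To make the routing concrete, here is a minimal, self-contained sketch of the pattern in plain Python. `MiniSubscriber` and `MiniPublisher` are hypothetical stand-ins written for illustration only. They are not torchsystem's classes; they just mirror the `register`, `publish`, `subscribe` and `receive` names described on this page. The sketch assumes that a subscriber silently ignores topics it has no handler for.

```python
from typing import Any


class MiniSubscriber:
    """Hypothetical stand-in: maps topics to handler functions."""

    def __init__(self) -> None:
        self.handlers = {}  # topic -> list of handler functions

    def subscribe(self, *topics: str):
        def decorator(func):
            for topic in topics:
                self.handlers.setdefault(topic, []).append(func)
            return func  # the handler stays usable as a plain function
        return decorator

    def receive(self, message: Any, topic: str) -> None:
        # Assumption of this sketch: topics without handlers are ignored.
        for handler in self.handlers.get(topic, []):
            handler(message)


class MiniPublisher:
    """Hypothetical stand-in: forwards every message to all registered subscribers."""

    def __init__(self) -> None:
        self.subscribers = []

    def register(self, *subscribers: MiniSubscriber) -> None:
        self.subscribers.extend(subscribers)

    def publish(self, message: Any, topic: str) -> None:
        for subscriber in self.subscribers:
            subscriber.receive(message, topic)


losses, accuracies = [], []
training, evaluation = MiniSubscriber(), MiniSubscriber()


@training.subscribe('loss')
def on_loss(loss):
    losses.append(loss)


@evaluation.subscribe('accuracy')
def on_accuracy(accuracy):
    accuracies.append(accuracy)


publisher = MiniPublisher()
publisher.register(training, evaluation)  # several subscribers in one call
publisher.publish(0.1, 'loss')            # only reaches on_loss
publisher.publish(0.9, 'accuracy')        # only reaches on_accuracy
```

In this sketch the publisher hands every message to every registered subscriber, and each subscriber filters by topic. Whether the real library filters in the publisher or in the subscriber is not stated here.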

Subscriber

A SUBSCRIBER is a component that listens for published messages and processes them accordingly.

Unlike a CONSUMER, a SUBSCRIBER receives messages only from the topics it has subscribed to and it's the PUBLISHER's responsibility to route the messages accordingly.

Methods:

Name       Description
register   Registers a handler function with a given topic.
subscribe  Decorator for registering a handler function to one or more topics.
receive    Receives a message from a given topic and triggers the corresponding handler functions to process it.

Example
from torchsystem import Depends
from torchsystem.services import Subscriber
from torchsystem.services import Publisher

subscriber = Subscriber()
metricsdb = []

def metrics():
    return metricsdb

@subscriber.subscribe('loss', 'accuracy')
def store_metric(metric, metrics: list = Depends(metrics)):
    metrics.append(metric)

@subscriber.subscribe('accuracy')
def on_accuracy_too_high(metric):
    if metric > 0.99:
        raise StopIteration

publisher = Publisher()
publisher.register(subscriber)

publisher.publish(0.1, 'loss')
publisher.publish(0.9, 'accuracy')
assert metricsdb == [0.1, 0.9]

try:
    publisher.publish(1.0, 'accuracy')
except StopIteration:
    print("Early stopping") 

dependency_overrides property

Returns the dependency overrides for the subscriber. This is useful for late binding, testing, and changing the behavior of the subscriber at runtime.

Returns:

Name  Type  Description
dict  dict  A dictionary of the dependency map.

Example
subscriber = Subscriber()
...

subscriber.dependency_overrides[db] = lambda: []
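
As a rough illustration of how an override map like this can work, the following self-contained sketch resolves dependencies by hand. The `Depends` class and the `resolve` helper here are hypothetical and written only for this example. They are not torchsystem's implementation, and the sketch assumes overrides are keyed by the original provider function, as the assignment above suggests.

```python
import inspect
from typing import Any, Callable


class Depends:
    """Hypothetical marker: wraps a provider callable used as a default value."""

    def __init__(self, provider: Callable[[], Any]) -> None:
        self.provider = provider


def resolve(func: Callable[..., Any], overrides: dict) -> dict:
    """Build keyword arguments for each parameter whose default is a Depends marker."""
    kwargs = {}
    for name, param in inspect.signature(func).parameters.items():
        if isinstance(param.default, Depends):
            # Use the override if one is registered, else the original provider.
            provider = overrides.get(param.default.provider, param.default.provider)
            kwargs[name] = provider()
    return kwargs


production_db = []


def db():
    return production_db


def store_metric(metric, metrics=Depends(db)):
    metrics.append(metric)


dependency_overrides = {}

# No override registered: the handler writes to the production list.
store_metric(0.1, **resolve(store_metric, dependency_overrides))

# Override registered: the same handler now writes to a test list.
test_db = []
dependency_overrides[db] = lambda: test_db
store_metric(0.2, **resolve(store_metric, dependency_overrides))
```

Because the provider is looked up at call time, the override can be installed after the handler is defined, which is what makes late binding and test doubles possible.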

receive(message, topic)

Receives a message from a given topic and triggers the corresponding handler functions to process it. This is called by the PUBLISHER, but it is also useful for delivering messages between handlers directly.

Parameters:

Name     Type  Description                             Default
message  Any   The message to process.                 required
topic    str   The topic to process the message from.  required

Example
from torchsystem.services import Subscriber

subscriber = Subscriber()

@subscriber.subscribe('metrics')
def store_metric(metrics: list):
    for metric in metrics:
        subscriber.receive(metric, metric['name'])

@subscriber.subscribe('loss')
def on_loss(loss):
    print(f"Loss: {loss['value']}")

@subscriber.subscribe('accuracy')
def on_accuracy(accuracy):
    print(f"Accuracy: {accuracy['value']}")

subscriber.receive(
    [
        {'name': 'loss', 'value': 0.1},
        {'name': 'accuracy', 'value': 0.9},
    ],
    topic='metrics',
)

# Output:
# Loss: 0.1
# Accuracy: 0.9

register(topic, wrapped)

Registers a handler function with a given topic.

Parameters:

Name     Type                 Description                                     Default
topic    str                  The topic to register the handler function to.  required
wrapped  Callable[..., None]  The handler function to register.               required
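
One way to picture the relationship between `register` and the `subscribe` decorator is the sketch below, in which the decorator simply calls `register` once per topic. This is an assumption made for illustration, not a statement about the library's internals. The module-level `handlers` dictionary and the `receive` function are hypothetical helpers, standing in for state that a subscriber object would normally hold.

```python
handlers = {}  # hypothetical registry: topic -> list of handler functions


def register(topic, wrapped):
    """Attach one handler to one topic."""
    handlers.setdefault(topic, []).append(wrapped)


def subscribe(*topics):
    """Decorator form: register the same handler under every given topic."""
    def decorator(wrapped):
        for topic in topics:
            register(topic, wrapped)
        return wrapped
    return decorator


def receive(message, topic):
    """Call every handler registered for the topic, in registration order."""
    for handler in handlers.get(topic, []):
        handler(message)


seen = []


def log_metric(metric):
    seen.append(metric)


# Imperative form: handy when the handler is defined elsewhere.
register('loss', log_metric)


# Decorator form: one handler, two topics.
@subscribe('loss', 'accuracy')
def record(metric):
    seen.append(('recorded', metric))


receive(0.1, 'loss')
receive(0.9, 'accuracy')
```

The imperative form is convenient for handlers you cannot decorate, such as functions imported from another module.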

subscribe(*topics)

Decorator for registering a handler function to one or more topics.

Parameters:

Name     Type  Description                                      Default
*topics  str   The topics to register the handler function to.  required

Returns:

Type                 Description
Callable[..., None]  The decorated handler function.

Example
subscriber = Subscriber()

@subscriber.subscribe('loss', 'accuracy')
def store_metric(metric, metrics: list = Depends(metrics)):
    ...