Connecting Banyan Applications To MQTT
The MQTT Gateway is a Banyan component that subscribes to messages on the MQTT network and forwards them to the Banyan network. It can also subscribe to Banyan messages and forward those to the MQTT network.
The MQTT Gateway is installed as a command-line executable when Python Banyan is installed. To invoke the MQTT Gateway, open a terminal window and type:
mgw
mgw accepts several optional command-line parameters. The examples that follow use the default values.
To list all the optional parameters, run mgw with the -h option:
mgw -h
usage: mqtt_gateway.py [-h] [-a MQTT_IP_ADDRESS] [-b BACK_PLANE_IP_ADDRESS]
                       [-d MQTT_PORT] [-e BANYAN_PUB_TOPIC]
                       [-g BANYAN_SUB_TOPICS [BANYAN_SUB_TOPICS ...]]
                       [-i MQTT_PUB_TOPIC]
                       [-j MQTT_SUB_TOPICS [MQTT_SUB_TOPICS ...]]
                       [-n PROCESS_NAME] [-p PUBLISHER_PORT]
                       [-s SUBSCRIBER_PORT] [-t LOOP_TIME]

optional arguments:
  -h, --help            show this help message and exit
  -a MQTT_IP_ADDRESS    IP address of mqtt broker
  -b BACK_PLANE_IP_ADDRESS
                        None or IP address used by Back Plane
  -d MQTT_PORT          MQTT Port Number
  -e BANYAN_PUB_TOPIC   Topic for messages to MQTT
  -g BANYAN_SUB_TOPICS [BANYAN_SUB_TOPICS ...]
                        Banyan topics space-delimited: topic1 topic2 topic3
  -i MQTT_PUB_TOPIC     Topic for messages sent to MQTT
  -j MQTT_SUB_TOPICS [MQTT_SUB_TOPICS ...]
                        MQTT topics space-delimited: topic1 topic2 topic3
  -n PROCESS_NAME       Set process name in banner
  -p PUBLISHER_PORT     Publisher IP port
  -s SUBSCRIBER_PORT    Subscriber IP port
  -t LOOP_TIME          Event Loop Timer in seconds
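The flags above can also be assembled programmatically, for instance when a script launches the gateway. The following is a minimal sketch and not part of Python Banyan: build_mgw_command is a hypothetical helper that uses only flags listed in the help output, and the address and topic values are placeholders.

```python
import shlex


def build_mgw_command(mqtt_ip=None, mqtt_port=None, mqtt_sub_topics=None,
                      banyan_sub_topics=None):
    """Return an argument list for mgw (hypothetical helper).

    Only the options that are passed in are emitted, so anything
    left as None keeps the gateway's built-in default.
    """
    command = ["mgw"]
    if mqtt_ip is not None:
        command += ["-a", str(mqtt_ip)]
    if mqtt_port is not None:
        command += ["-d", str(mqtt_port)]
    if banyan_sub_topics:
        # -g takes one or more space-delimited Banyan topics
        command += ["-g", *banyan_sub_topics]
    if mqtt_sub_topics:
        # -j takes one or more space-delimited MQTT topics
        command += ["-j", *mqtt_sub_topics]
    return command


if __name__ == "__main__":
    # Placeholder broker address and topic names, for illustration only.
    cmd = build_mgw_command(mqtt_ip="192.168.1.50",
                            mqtt_sub_topics=["sensors", "alerts"])
    # Show the resulting command line; the list could instead be
    # handed to subprocess.Popen to start the gateway.
    print(shlex.join(cmd))
```

Returning a list rather than a single string keeps each topic as a separate argument, which matches how the -g and -j options accept multiple values.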
When the MQTT Gateway is invoked, it automatically connects to both the Banyan Backplane and the MQTT Broker.
Before running the following examples, make sure that you have a mosquitto broker and a Python Banyan Backplane running.
Example 1: Pure MQTT Publish And Subscribe
This example will publish a simple MQTT message to be received by a simple MQTT subscriber.
MQTT Publisher
Below is the code for a simple MQTT publisher, pub.py, that publishes a single message and exits.
import json
import paho.mqtt.client as mqtt
# This is a simple MQTT publisher example.
# It connects to an MQTT broker, builds a payload and
# then publishes the message with a topic of "mqtt_network".
my_client = mqtt.Client()
my_client.connect("localhost", 1883, 60)
z = {'from_mqtt_client': 'hello'}
payload = json.dumps(z).encode()
my_client.publish("mqtt_network", payload)
my_client.disconnect()
MQTT Subscriber
The MQTT subscriber, sub.py, connects to an MQTT broker and subscribes to two topics. The mqtt_network topic carries messages that originate on the MQTT network. The from_banyan topic carries messages that originate on the Banyan network. In this first example, no messages originate from the Banyan network.
import paho.mqtt.client as mqtt
import json

# This is a simple MQTT subscriber example.
# It connects to an MQTT broker,
# subscribes to "mqtt_network" and "from_banyan" messages,
# and prints a message when one is received.


def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))
    client.subscribe("mqtt_network")
    client.subscribe("from_banyan")


def on_message(client, userdata, msg):
    m = msg.payload.decode()
    x = json.loads(m)
    print(x)


my_client = mqtt.Client()
my_client.connect("localhost", 1883, 60)
my_client.on_connect = on_connect
my_client.on_message = on_message
my_client.loop_forever()
First, we start the subscriber and then the publisher. The subscriber prints any messages it receives. Here is the subscriber console output after running the publisher:
python3 sub.py
Connected with result code 0
{'from_mqtt_client': 'hello'}
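The publisher and subscriber rely on the same payload convention: the publisher serializes a dictionary to JSON and encodes it to bytes, and the subscriber reverses those two steps. The sketch below isolates that round trip so it can be tried without a broker. The names encode_payload and decode_payload are illustrative and are not part of paho-mqtt or Python Banyan.

```python
import json


def encode_payload(data):
    """Serialize a dict to UTF-8 JSON bytes, as the publisher does."""
    return json.dumps(data).encode()


def decode_payload(raw):
    """Turn received payload bytes back into a dict, as the subscriber does."""
    return json.loads(raw.decode())


if __name__ == "__main__":
    wire_bytes = encode_payload({'from_mqtt_client': 'hello'})
    print(wire_bytes)                  # the bytes handed to publish()
    print(decode_payload(wire_bytes))  # the dict the subscriber prints
```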
Example 2: Publishing a Banyan Message To the MQTT Network
Start the MQTT Gateway by opening a terminal window and typing: mgw
The Gateway connects to the Python Banyan Backplane and the MQTT broker:
$ mgw
************************************************************
MqttGateway using Back Plane IP address: 192.168.2.189
Subscriber Port = 43125
Publisher Port = 43124
Loop Time = 0.1 seconds
************************************************************
MQTT Gateway Connected to MQTT localhost:1883 with result code 0.
If it is not already running, start the MQTT subscriber, sub.py.
To generate a message from Banyan to the MQTT network, we will use a Banyan component called bpub.py:
from python_banyan.banyan_base import BanyanBase


class Bpub(BanyanBase):
    """
    This class will publish a message for the MqttGateway to forward to the MQTT network.
    """

    def __init__(self):
        """
        This is the constructor for the Bpub class
        """
        # initialize the base class
        super(Bpub, self).__init__(process_name='Bpub')
        # send a message to the MqttGateway
        self.publish_payload({'from_banyan': 'hello_mqtt_world'}, 'to_mqtt')
        # exit
        self.clean_up()


b = Bpub()
If we then start bpub.py and look at the MQTT subscriber's output, we see that the message from Banyan was sent out over MQTT and received by the MQTT subscriber:
~/PycharmProjects/python_banyan$ python3 sub.py
Connected with result code 0
{'from_banyan': 'hello_mqtt_world'}
The Gateway received a Banyan message, translated it to an MQTT message, and published it on the MQTT network, where the MQTT subscriber received it.
Example 3: Publishing an MQTT Message To the Banyan Network
In this example, an MQTT message will be published by pub.py. The MQTT Gateway will receive this message. It will then translate this message to a Banyan message and publish the message to the Banyan network.
A Banyan component called bsub.py subscribes to receive the translated message and prints it to the console.
To run this example, start the MQTT Gateway as we did in Example 2.
Start bsub.py:
python3 bsub.py
The code for bsub.py is shown below:
from python_banyan.banyan_base import BanyanBase


class Bsub(BanyanBase):
    """
    This class will receive any MQTT messages intercepted by MqttGateway
    """

    def __init__(self):
        """
        This is the constructor for the Bsub class
        """
        # initialize the base class
        super(Bsub, self).__init__(process_name='Bsub')
        # subscribe to receive MQTT messages processed
        # by the MqttGateway
        self.set_subscriber_topic('from_mqtt')
        # start the receive_loop
        self.receive_loop()

    def incoming_message_processing(self, topic, payload):
        print(topic, payload)


b = Bsub()
Finally, start the MQTT publisher, pub.py:
python3 pub.py
Looking at the console for bsub.py, we see that the message published on the MQTT network was forwarded by the Gateway and received on the Banyan network:
python3 bsub.py
************************************************************
Bsub using Back Plane IP address: 192.168.2.189
Subscriber Port = 43125
Publisher Port = 43124
Loop Time = 0.1 seconds
************************************************************
from_mqtt {'from_mqtt_client': 'hello'}
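Taken together, Examples 2 and 3 suggest the topic mapping that the gateway applies when started with its default settings. The sketch below models that mapping in plain Python with no network connections. It is an illustration inferred from the example output, not the gateway's actual implementation: the route function and the two dictionaries are hypothetical, and the topic names are the ones observed in the examples.

```python
import json

# Topic mapping observed in the examples (assumed to be the defaults).
MQTT_TO_BANYAN = {"mqtt_network": "from_mqtt"}
BANYAN_TO_MQTT = {"to_mqtt": "from_banyan"}


def route(source, topic, payload):
    """Model one forwarding step of the gateway (hypothetical helper).

    source  -- "mqtt" or "banyan", the network the message arrived on
    topic   -- the topic the message was published with
    payload -- JSON bytes for MQTT messages, a dict for Banyan messages

    Returns (destination_network, topic, payload), or None when the
    topic is not one this model forwards.
    """
    if source == "mqtt" and topic in MQTT_TO_BANYAN:
        # MQTT payloads arrive as JSON bytes; Banyan components see a dict.
        return "banyan", MQTT_TO_BANYAN[topic], json.loads(payload.decode())
    if source == "banyan" and topic in BANYAN_TO_MQTT:
        # Banyan payloads are dicts; MQTT subscribers receive JSON bytes.
        return "mqtt", BANYAN_TO_MQTT[topic], json.dumps(payload).encode()
    return None


if __name__ == "__main__":
    # Example 2: Banyan to MQTT
    print(route("banyan", "to_mqtt", {'from_banyan': 'hello_mqtt_world'}))
    # Example 3: MQTT to Banyan
    print(route("mqtt", "mqtt_network", b'{"from_mqtt_client": "hello"}'))
```

Messages on any other topic return None in this model, mirroring the fact that a subscriber only sees the topics it has subscribed to.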
Copyright (C) 2017-2020 Alan Yorinks All Rights Reserved