Additional Banyan Gateways
MQTT Gateway
If you need to interconnect with MQTT, a Banyan MQTT Gateway has been provided. This gateway has been documented here.
WebSocket Gateway
The OneGPIO Demo Examples include Web pages to control an Arduino, ESP-8266, and Raspberry Pi. The WebPages publish commands via a WebSocket connection. This gateway translates the WebSocket command messages to OneGPIO command messages. It also translates OneGPIO reporter messages to WebSocket reporter messages to allow reports to be displayed on the Web page.
The WebSocket IP address that this component uses is localhost since it is intended to be used in conjunction with a Web Browser running on the same computer as the Gateway. The WebSocket IP port is fixed to a value of 9000. If you need to, you can modify the supplied code to allow the user to modify these values on the command line.
IMPORTANT NOTE
The WebSocket Gateway utilizes a Python asyncio WebSocket library. It requires that Python 3.7 or higher be used.
A Quick Overview Of The WsGateway Component
The Banyan WebSocket Gateway is an executable Banyan component. It follows the command line patterns exposed in the Banyan User's Guide:
usage: ws_gateway.py [-h]
[-b BACK_PLANE_IP_ADDRESS]
[-m SUBSCRIPTION_LIST [SUBSCRIPTION_LIST ...]]
[-n PROCESS_NAME] [-p PUBLISHER_PORT]
[-s SUBSCRIBER_PORT]
optional arguments:
-h, --help show this help message and exit
-b BACK_PLANE_IP_ADDRESS
None or IP address used by Back Plane
-m SUBSCRIPTION_LIST [SUBSCRIPTION_LIST ...]
A space delimited list of topics
-n PROCESS_NAME Set process name in banner
-p PUBLISHER_PORT Publisher IP port
-s SUBSCRIBER_PORT Subscriber IP port
A Quick Look At The WsGateway Internals
Once again, a block of code will be presented, followed by a discussion.
24 import argparse
25 import asyncio
26 import datetime
27 import json
28 import signal
29 import sys
30
31 import websockets
32
33 from python_banyan.banyan_base_aio import BanyanBaseAIO
34
35
36 class WsGateway(BanyanBaseAIO):
37 """
38 This class is a gateway between a websocket client and the
39 Banyan network.
40
41 NOTE: This class requires Python 3.7 or above.
42
43 It implements a websocket server. A websocket client, upon
44 connection, must send an id message e.g.: {"id": "to_arduino"}.
45
46 The id will be used as the topic to publish data to the banyan
47 network.
48 """
49
50 def __init__(self, *subscription_list, back_plane_ip_address=None,
51 subscriber_port='43125',
52 publisher_port='43124', process_name='WebSocketGateway',
53 event_loop=None):
54 """
55 These are all the normal base class parameters
56 :param subscription_list:
57 :param back_plane_ip_address:
58 :param subscriber_port:
59 :param publisher_port:
60 :param process_name:
61 :param event_loop:
62 """
63
64 # initialize the base class
65 super(WsGateway, self).__init__(subscriber_list=subscription_list,
66 back_plane_ip_address=back_plane_ip_address,
67 subscriber_port=subscriber_port,
68 publisher_port=publisher_port,
69 process_name=process_name,
70 event_loop=event_loop)
71 # save the event loop
72 self.event_loop = event_loop
73
74 # array of active sockets
75 self.active_sockets = []
76
77 try:
78 # start the websocket server and call the main task, wsg
79 self.start_server = websockets.serve(self.wsg, '127.0.0.1', 9000)
80 self.event_loop.run_until_complete(self.start_server)
81 self.event_loop.run_forever()
82 except (websockets.exceptions.ConnectionClosed,
83 RuntimeError,
84 KeyboardInterrupt):
85 sys.exit()
In this section of code, the necessary packages are imported and we define the WsGateway class, which is derived from BanyanBaseAIO.
Being an asyncio based class, one of the parameters that may be passed to this class is an asyncio event loop. Usually, the default event loop is used, but you can supply your own event loop if you need to.
This component implements a WebSocket server that permits connections to multiple WebSocket clients. An empty array, self.active_sockets is created on line 75 to store a record for each connected socket.
Lines 79-85 start the WebSocket server. When a client connects to the WebSocket server, the wsg method is called on line 79.
For information about the WebSocket server, please go to this link.
87 async def wsg(self, websocket, path):
88 """
89 This method handles connections and will be used to send
90 messages to the client
91 :param websocket: websocket for connected client
92 :param path: required, but unused
93 :return:
94 """
95 # start up banyan
96 await self.begin()
97
98 # wait for a connection
99 data = await websocket.recv()
100
101 # expecting an id string from client
102 data = json.loads(data)
103
104 # if id field not present then raise an exception
105 try:
106 id_string = data['id']
107 except KeyError:
108 print('Client did not provide an ID string')
109 raise
110
111 # create a subscriber string from the id
112 subscriber_string = id_string.replace('to', 'from')
113
114 # subscribe to that topic
115 await self.set_subscriber_topic(subscriber_string)
116
117 # add an entry into the active_sockets table
118 entry = {websocket: 'to_banyan_topic', subscriber_string: websocket}
119 self.active_sockets.append(entry)
120
121 # create a task to receive messages from the client
122 await asyncio.create_task(self.receive_data(websocket, data['id']))
The wsg method is called when a WebSocket client connects to the WebSocket Gateway. Line 96 establishes the zeromq subscriber and publisher sockets for this component as well as providing a connection to the Banyan Backplane.
Line 99 waits to receive initial identification data from a WebSocket client. This data is used to create a subscription topic for the WebSocket Gateway.
Using the Arduino Demo Station Page as an example, the ID string sent as a WebSocket message from the Web page is "to_arduino_gateway." This ID string is used to create subscription topic strings that the WebSocket Gateway uses for its operation. This is accomplished on lines 112-115.
An entry for the socket connection is created and added to the active_websockets array. The entry is used to dispatch messages to the correct WebSocket during data transfer. This is accomplished on lines 118-119.
Line 122 creates an asyncio task to continuously receive WebSocket messages from any connected WebSocket client. This task passes these messages to the receive_data method for further processing.
124 async def receive_data(self, websocket, publisher_topic):
125 """
126 This method processes a received WebSocket command message
127 and translates it to a Banyan command message.
128 :param websocket: The currently active websocket
129 :param publisher_topic: The publishing topic
130 """
131 while True:
132 try:
133 data = await websocket.recv()
134 data = json.loads(data)
135 except (websockets.exceptions.ConnectionClosed, TypeError):
136 # remove the entry from active_sockets
137 # using a list comprehension
138 self.active_sockets = [entry for entry in self.active_sockets if websocket not in entry]
139 break
140
141 await self.publish_payload(data, publisher_topic)
The receive_data method processes WebSocket messages sent from the WebSocket client. The message is in the form of a JSON message and it is decoded on line 134. The message is then published as a Banyan OneGPIO message on line 141.
143 async def incoming_message_processing(self, topic, payload):
144 """
145 This method converts the incoming messages to ws messages
146 and sends them to the ws client
147
148 :param topic: Message Topic string.
149
150 :param payload: Message Data.
151 """
152 if 'timestamp' in payload:
153 timestamp = datetime.datetime.fromtimestamp(payload['timestamp']).strftime('%Y-%m-%d %H:%M:%S')
154 payload['timestamp'] = timestamp
155
156 ws_data = json.dumps(payload)
157
158 # find the websocket of interest by looking for the topic in
159 # active_sockets
160 for socket in self.active_sockets:
161 if topic in socket.keys():
162 pub_socket = socket[topic]
163 await pub_socket.send(ws_data)
164 # print(ws_data)
The incoming_message_processing method is the standard Banyan message processing method, overwritten to process OneGPIO messages received from the target hardware in the form of a report message. Using the topic of the message as a key, it looks up the associated WebSocket in the active_sockets array and publishes the message to the correct WebSocket.
On lines 152-154, if the gateway provided a timestamp, the timestamp is formatted and appended to the report message. Line 156 encodes the message as a JSON message and sends the message to the WebSocket client on line 163.
167 def ws_gateway():
168 # allow user to bypass the IP address auto-discovery. This is necessary if the component resides on a computer
169 # other than the computing running the backplane.
170
171 parser = argparse.ArgumentParser()
172 parser.add_argument("-b", dest="back_plane_ip_address", default="None",
173 help="None or IP address used by Back Plane")
174 # allow the user to specify a name for the component and have it shown on the console banner.
175 # modify the default process name to one you wish to see on the banner.
176 # change the default in the derived class to set the name
177 parser.add_argument("-m", dest="subscription_list", default="from_arduino_gateway, "
178 "from_ESP8266_gateway, "
179 "from_rpi_gateway, "
180 "from_microbit_gateway", nargs='+',
181 help="A space-delimited list of topics")
182 parser.add_argument("-n", dest="process_name", default="WebSocket Gateway",
183 help="Set process name in banner")
184 parser.add_argument("-p", dest="publisher_port", default='43124',
185 help="Publisher IP port")
186 parser.add_argument("-s", dest="subscriber_port", default='43125',
187 help="Subscriber IP port")
188
189 args = parser.parse_args()
190
191 subscription_list = args.subscription_list.split(',')
192
193 kw_options = {
194 'publisher_port': args.publisher_port,
195 'subscriber_port': args.subscriber_port,
196 'process_name': args.process_name,
197 }
198
199 if args.back_plane_ip_address != 'None':
200 kw_options['back_plane_ip_address'] = args.back_plane_ip_address
201
202 # get the event loop
203 loop = asyncio.get_event_loop()
204
205 WsGateway(*subscription_list, **kw_options, event_loop=loop)
206
207
208 def signal_handler(sig, frame):
209 print('Exiting Through Signal Handler')
210 raise KeyboardInterrupt
211
212
213 # listen for SIGINT
214 signal.signal(signal.SIGINT, signal_handler)
215 signal.signal(signal.SIGTERM, signal_handler)
216
217 if __name__ == '__main__':
218 ws_gateway()
Lines 167-219 implement the standard way of instantiating a Banyan component.
TCP Gateway
The TCP gateway is an example of a specialized Banyan gateway. This gateway was designed to permit communication between a Banyan application and any TCP server supporting TCP/IP sockets and MessagePack.
Testing of the TCP gateway utilized a TCP server running on Raspberry Pi Pico W loaded with MicroPython. MicroPython supports both TCP sockets as well as MessagePack.
The TCP gateway is implemented as a TCP client. A command line executable, called tgw, is installed when python_banyan is installed. Typically one uses the TCP gateway by invoking it through its command line and arguments.
Here are the command-line arguments that tgw supports:
tgw --help
usage: tgw [-h] [-a TCP_IP_ADDRESS] [-b BACK_PLANE_IP_ADDRESS]
[-e BANYAN_PUB_TOPIC]
[-g SUBSCRIPTION_LIST [SUBSCRIPTION_LIST ...]] [-l EVENT_LOOP]
[-n TCP_PORT] [-p PUBLISHER_PORT] [-s SUBSCRIBER_PORT]
[-z PROCESS_NAME]
options:
-h, --help show this help message and exit
-a TCP_IP_ADDRESS IP address TCP Server
-b BACK_PLANE_IP_ADDRESS
None or IP address used by Back Plane
-e BANYAN_PUB_TOPIC Topic for messages to the host PC
-g SUBSCRIPTION_LIST [SUBSCRIPTION_LIST ...]
Banyan topics space delimited: topic1 topic2 topic3
-l EVENT_LOOP asyncio event loop
-n TCP_PORT TCP Server Port Number
-p PUBLISHER_PORT Publisher IP port
-s SUBSCRIBER_PORT Subscriber IP port
-z PROCESS_NAME Name of this gateway
TCP_IP_ADDRESS is a required parameter. It is the IP address of the TCP server assigned by the local router.
The BACK_PLANE_IP_ADDRESS, PUBLISHER_PORT, and SUBSCRIBER_PORT are optional parameters and are typically not used. Default values are assigned if not specified.
BANYAN_PUB_TOPIC is the topic used when publishing data to the Banyan network. It has a default value of "from_pico."
SUBSCRIPTION_LIST is the list of topics for messages that the TCP gateway processes and passes on to the TCP server. Typically a single topic is used, and the default value is "figura."
EVENT_LOOP is an optional parameter allowing the application to specify an asyncio event loop. If none is specified, the TCP gateway will create an asyncio loop.
PROCESS_NAME has a default value of "TcpGateway." It displays the TCP Gateway in the application's console window.
Internally, when a subscribed Banyan message arrives, the MessagePack encoded packet is enhanced with a length byte. The packet is forwarded to the TCP server to perform MessagePack decoding and processing.
When the TCP server wishes to send a message to the Banyan network, it MessagePack encodes it and prepends a message length to the message.
The reason for adding a message length is that TCP is a streaming protocol. Multiple messages may be combined into a single TCP packet. Having a message length assures that messages are appropriately framed.
Also, note that the TCP gateway neither encodes nor decodes the messages. Doing so provides better system throughput, and encoding and decoding are performed once when needed.
A demo is provided. Follow this link. for instructions.
Copyright (C) 2017-2022 Alan Yorinks All Rights Reserved