Multi-Backplane Applications
For the vast majority of Banyan applications, a single backplane is all that is needed to handle all the application's messaging needs. When using a single backplane configuration, there is no need to manually perform message routing, since all components automatically connect to a common backplane.
However, there are occasions where you might like to design your component so that it can communicate with more than one backplane. Some possible scenarios for this configuration would be to provide message load balancing or a better separation of operational concerns. To support these types of configurations, a new base class, BanyanBaseMulti, is required.
The Banyan Base Multi Base Class
In many ways, the BanyanBaseMulti base class is similar to the BanyanBase class, but there is one significant difference. A BanyanBaseMulti component requires that a backplane routing table be provided at initialization. This table is known as the routing specification file.
Note: BanyanBase and BanyanBaseMulti components may communicate with each other without restriction. No coding changes or message format changes are required.
The Routing Specification File For BanyanBaseMulti
Every BanyanBaseMulti component requires access to a unique comma-delimited routing specification file. The name and path of this file is a required component input parameter.
Let's explore the format of the specification file.
The header and first line of a routing specification file is:
backplane_name,ip_address,subscriber_port,subscriber_topic,publisher_port
The backplane_name field is a unique ID string given to each of the connected backplanes. The IDs are used to de-reference publisher and subscriber connections.
The ip_address field is the IP address of the given backplane. Backplanes that share a single IP address are differentiated by having unique subscriber and publisher ports.
The subscriber_port specifies the subscriber port for the given backplane.
The subscriber_topic is an optional Python list of subscriber topics established at the instantiation of the component. If a single topic is to be specified, it still must be entered as a Python list.
IMPORTANT NOTE: Even though the topics are strings, the individual topic strings must not be enclosed in quotes. Instead, the whole list, including the brackets, is enclosed in quotes. Also, there must not be any spaces after the commas. For example, if the topics are start, and finish, the subscriber_topic field is specified as: "[start,finish]"
The publisher_port specifies the publisher port for the given backplane.
Here is sample of a routing specification file for a component that connects to 5 backplanes.
backplane_name,ip_address,subscriber_port,subscriber_topic,publisher_port
BP1,192.168.2.194,43127,"[reply]",43126
BP2,192.168.2.194,43125,"[reply]",43124
BP3,192.168.2.177,43125,"[]",43124
BP4,192.168.2.199,43125,"[run_motors,light_leds]",
BP5,192.168.2.133,,"[run_motors,light_leds]",43124
Let's launch a sample BanyanBaseMulti component called MultiDriver. When the component initializes, it will read the routing specification file, and print out its interpretation of that file as its console header. Let's look at that console header.
Using Backplane Descriptor File: multi_driver_spec.csv
************************************************************
MultiDriver using BP1 Black plane at IP Address: 192.168.2.194
Subscriber Port = 43127
Subscribed to topic: reply
Publisher Port = 43126
MultiDriver using BP2 Black plane at IP Address: 192.168.2.194
Subscriber Port = 43125
Subscribed to topic: reply
Publisher Port = 43124
MultiDriver using BP3 Black plane at IP Address: 192.168.2.177
Subscriber Port = 43125
Subscribed to topic:
Publisher Port = 43124
MultiDriver using BP4 Black plane at IP Address: 192.168.2.199
Subscriber Port = 43125
Subscribed to topic: run_motors
Subscribed to topic: light_leds
Publisher Port = None Specified
MultiDriver using BP5 Black plane at IP Address: 192.168.2.133
Subscriber Port = None Specified
Publisher Port = 43124
Loop Time = 0.1 seconds
************************************************************
Notice that for BP1 and BP2, all fields are specified. Since they share a common IP address, their ports have different values.
For BP3, no topic list was specified, and therefore the "Subscribed to topic" for BPI3 shows no topics.
BP4 specifies multiple subscriber topics and a subscriber port, but no publisher port. This is a valid case when the component does not publish any messages.
BP5 has no subscriber port specified and no topics specified. Because there is no subscriber port, topics are not listed. For BP5, this component acts only as a publisher.
A simple working example is discussed below if you would like to try using BanyanBaseMulti.
The Python Banyan Multi Base Class Code In Detail
Lines 1 through 66 should seem very similar to those for the BanyanBase base class.
1 """
2 banyan_base_multi.py
3
4 Copyright (c) 2016 - 2019 Alan Yorinks All right reserved.
5
6 Python Banyan is free software; you can redistribute it and/or
7 modify it under the terms of the GNU AFFERO GENERAL PUBLIC LICENSE
8 Version 3 as published by the Free Software Foundation; either
9 or (at your option) any later version.
10 This library is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU AFFERO GENERAL PUBLIC LICENSE
16 along with this library; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18
19 """
20 from __future__ import unicode_literals
21
22 # Use argparse and signal if you wish to implement the argparse
23 # code located at the bottom of this file
24 # import argparse
25 # import signal
26
27 import csv
28 import sys
29 import time
30 import itertools
31 import msgpack
32 import msgpack_numpy as m
33 import umsgpack
34 import zmq
35 import os
36
37
38 # noinspection PyMethodMayBeStatic
39 class BanyanBaseMulti(object):
40 """
41
42 This is the base class for Python Banyan components that wish to connect to multiple back planes.
43 It requires the user to create a .csv descriptor file to describe the back planes and their
44 addresses/ports and subscription topics.
45
46 The .csv file has the following format. This line must be the first line in the file:
47
48 backplane_name,ip_address,subscriber_port,subscriber_topic,publisher_port
49
50 backplane_name: a unique identifier string for backplane - for informational purposes only
51 ip_address: IP address of the computer that the backplane is running on
52 subscriber_port: port number assigned to the backplane's subscriber port
53 subscriber_topic: this is an optional list of subscription topics
54 publisher_port: port number assigned to the backplane's publisher port
55
56
57 There should be an entry in this file for each backplane that the component needs to connect to.
58
59 This class encapsulates zeromq and message pack functionality. Its methods should be overridden by the user
60 in the derived class to meet the needs of the component.
61
62 To import into the derived class use:
63
64 from python_banyan.banyan_base_multi import BanyanBaseMulti
65
66 """
__init__
The __init__ method requires the name and path to a routing specification file. Similar to a BanyanBase component, it accepts a process name, a loop_time, and a flag to process numpy data.
Lines 87 and 88 establish "constants" for the socket types. This is used in the find_socket method discussed below.
Lines 94 and 95 verify that a routing specification file can be found.
Lines 100 through 108 establish instance variables used by the class.
Lines 118 through 173, read and validate the routing specification file and print out the component's "header" data to the console.
68 def __init__(self, back_plane_csv_file=None, process_name='None',
69 loop_time=.1, numpy=False, connect_time=0.3):
70 """
71 The __init__ method sets up all the ZeroMQ "plumbing"
72
73 :param back_plane_csv_file: full path to .csv file with backplane descriptors
74
75 :param process_name: identifier for your component printed at startup on the console
76
77 :param loop_time: receive loop sleep time
78
79 :param numpy: Set true if you wish to include numpy matrices in your messages
80
81 :param connect_time: a short delay to allow the component to connect to the Backplane
82
83 :return:
84 """
85
86 # socket type - used for calls to find_socket
87 self.SUB_SOCK = 0
88 self.PUB_SOCK = 1
89
90 if back_plane_csv_file is None:
91 raise ValueError('You must specify a valid .csv backplane descriptor file')
92
93 # file specified, make sure it exists
94 if not os.path.isfile(back_plane_csv_file):
95 raise ValueError("Can't find backplane configuration file")
96
97 if process_name == 'None':
98 print('Warning: No Process Name Was Specified')
99
100 self.numpy = numpy
101
102 self.connect_time = connect_time
103
104 # if using numpy apply the msgpack_numpy monkey patch
105 if numpy:
106 m.patch()
107
108 self.loop_time = loop_time
109
110 # get a zeromq context
111 self.my_context = zmq.Context()
112
113 # a list of dictionaries describing connections to the back planes
114 self.backplane_table = []
115
116 print("\nUsing Backplane Descriptor File: ", back_plane_csv_file)
117
118 with open(back_plane_csv_file) as csvfile:
119 reader = csv.DictReader(csvfile)
120 print('\n************************************************************\n')
121 for row in reader:
122 # make sure backplane name is unique
123 if any(d['backplane_name'] == row['backplane_name'] for d in self.backplane_table):
124 raise RuntimeError('Duplicate Back Plane Name - check your .csv file')
125
126 print(process_name + ' using ' + row['backplane_name'] + ' Black plane at IP Address: ' +
127 row['ip_address'])
128
129 # setup a publisher and subscriber for each backplane
130 subscriber = None
131 if row['subscriber_port']:
132 subscriber = self.my_context.socket(zmq.SUB)
133 connect_string = "tcp://" + row['ip_address'] + ':' + row['subscriber_port']
134 subscriber.connect(connect_string)
135
136 publisher = None
137 if row['publisher_port']:
138 publisher = self.my_context.socket(zmq.PUB)
139 connect_string = "tcp://" + row['ip_address'] + ':' + row['publisher_port']
140 publisher.connect(connect_string)
141
142 # get topics and subscribe to them
143 # test that topic string has a leading and trailing []
144 if row['subscriber_port']:
145 print(' Subscriber Port = ' + row['subscriber_port'])
146 topic_list = row['subscriber_topic']
147 if '[' not in topic_list:
148 raise RuntimeError('Topic field must begin with "[" and end with "]" ')
149 if ']' not in topic_list:
150 raise RuntimeError('Topic field must begin with "[" and end with "]" ')
151
152 # make sure that the topic string does not contain a space character
153 if ' ' in topic_list:
154 raise RuntimeError('Topics may not contain a space character')
155
156 topic_list = topic_list[1:-1].split(',')
157
158 # subscribe to topics in list
159 for t in topic_list:
160 print(' Subscribed to topic: ' + t)
161 self.set_subscriber_topic(t, subscriber)
162 else:
163 print(' Subscriber Port = None Specified')
164
165 if row['publisher_port']:
166 print(' Publisher Port = ' + row['publisher_port'])
167 else:
168 print(' Publisher Port = None Specified')
169
170 # update backplane table with new entry
171 self.backplane_table.append(
172 {'backplane_name': row['backplane_name'], 'subscriber': subscriber,
173 'publisher': publisher})
174
175 # wait for the last Backplane TCP connection
176 time.sleep(self.connect_time)
177
178 print()
179 print('Loop Time = ' + str(loop_time) + ' seconds\n')
180 print('************************************************************')
find_socket
The find_socket method accepts a backplane name specified in the routing specification table and a socket type (publisher or subscriber) as input parameters. It returns a socket connection for publishing messages to or receiving messages from the specified backplane.
182 def find_socket(self, backplane, socket_type):
183 """
184 Find a publisher or subscriber in the backplane table and return a ZMQ socket reference
185
186 :param backplane: backplane name entry in table
187
188 :param socket_type: publisher or subscriber
189
190 :return: socket reference or None
191 """
192 valid_socket_types = [self.PUB_SOCK, self.SUB_SOCK]
193
194 if socket_type in valid_socket_types:
195 try:
196 entry = next(item for item in self.backplane_table if item.get("backplane_name") == backplane)
197 if socket_type == self.PUB_SOCK:
198 rval = entry['publisher']
199 else:
200 rval = entry['subscriber']
201 return rval
202
203 except StopIteration:
204 raise StopIteration(backplane + ' not found in table.')
205 else:
206 raise ValueError(socket_type + ' is an illegal socket_type')
set_subscriber_topic
The set_subscriber_topic method associates a subscription topic with the given subscriber_socket connection.
208 def set_subscriber_topic(self, topic, subscriber_socket):
209 """
210 This method sets a subscriber topic.
211
212 You can subscribe to multiple topics by calling this method for
213 each topic.
214
215 :param topic: A topic string
216
217 :param subscriber_socket: subscriber socket
218
219 :return:
220 """
221 if sys.version_info[0] < 3:
222 try:
223 topic = topic.encode()
224 except AttributeError:
225 raise TypeError('Publish topic must be python_banyan string', 'topic')
226 if not type(topic) is str:
227 raise TypeError('Subscriber topic must be python_banyan string')
228
229 if subscriber_socket:
230 subscriber_socket.setsockopt(zmq.SUBSCRIBE, topic.encode())
231
232 else:
233 raise ValueError('set_subscriber_topic: socket is None')
234
unsubscribe_topic
The unsubscribe_topic method allows one to unsubscribe from a topic for a backplane subscriber socket connection.
235 def unsubscribe_topic(self, topic, subscriber_socket):
236 """
237 This method un-subscribes from a topic.
238
239 :param topic: A topic string
240
241 :param subscriber_socket: subscriber socket
242
243 :return:
244 """
245 if sys.version_info[0] < 3:
246 try:
247 topic = topic.encode()
248 except AttributeError:
249 raise TypeError('Publish topic must be python_banyan string', 'topic')
250 if not type(topic) is str:
251 raise TypeError('Subscriber topic must be python_banyan string')
252
253 if subscriber_socket:
254 subscriber_socket.unsubscribe(topic.encode())
255
256 else:
257 raise ValueError('set_subscriber_topic: socket is None')
publish_payload
The publish_payload method is very similar to that for BanyanBase.
IMPORTANT NOTE: If the topic string is "BROADCAST," then the message is published to all backplanes containing publisher ports specified in the routing specification file.
259 def publish_payload(self, payload, publisher_socket, topic=''):
260 """
261 This method will publish a python_banyan payload and its associated topic
262
263 :param payload: Protocol message to be published
264
265 :param publisher_socket: Publisher socket - handle to socket or "BROADCAST" to send to
266 all connected publisher sockets
267
268 :param topic: A string value for message topic
269
270 :return:
271 """
272 if sys.version_info[0] < 3:
273 try:
274 topic = topic.encode()
275 except AttributeError:
276 raise TypeError('Publish topic must be python_banyan string', 'topic')
277 if not type(topic) is str:
278 raise TypeError('Publish topic must be python_banyan string', 'topic')
279
280 # create python_banyan message pack payload
281 if self.numpy:
282 message = msgpack.packb(payload, default=m.encode)
283 else:
284 message = umsgpack.packb(payload)
285
286 pub_envelope = topic.encode()
287 if publisher_socket == "BROADCAST":
288 for element in self.backplane_table:
289 if element['publisher']:
290 element['publisher'].send_multipart([pub_envelope, message])
291 else:
292
293 if publisher_socket:
294 publisher_socket.send_multipart([pub_envelope, message])
295 else:
296 raise ValueError('Invalid publisher socket')
receive_loop
The receive_loop cycles through all of the backplane subscriber sockets to see if any messages need to be processed. If so, it calls the incoming_message_processing method.
298 def receive_loop(self):
299 """
300 This is the receive loop for zmq messages.
301
302 This method may be overwritten to meet the needs
303 of the application before handling received messages.
304
305 :return:
306 """
307 for element in itertools.cycle(self.backplane_table):
308 if element['subscriber']:
309 try:
310 data = element['subscriber'].recv_multipart(zmq.NOBLOCK)
311 if self.numpy:
312 payload = msgpack.unpackb(data[1], object_hook=m.decode)
313 self.incoming_message_processing(data[0].decode(), payload)
314 else:
315 self.incoming_message_processing(data[0].decode(), umsgpack.unpackb(data[1]))
316 except zmq.error.Again:
317 try:
318 time.sleep(self.loop_time)
319 except KeyboardInterrupt:
320 self.clean_up()
321 sys.exit(0)
322 except AttributeError:
323 raise
324
incoming_message_processing
This method needs to be overridden to handle the incoming messages.
325 def incoming_message_processing(self, topic, payload):
326 """
327 Override this method with a custom python_banyan message processor for subscribed messages
328
329 :param topic: Message Topic string
330
331 :param payload: Message Data
332
333 :return:
334 """
335 print('this method should be overwritten in the child class', topic, payload)
clean_up
The clean_up method iterates through all the backplanes and closes their connections.
337 def clean_up(self):
338 """
339 Clean up before exiting - override if additional cleanup is necessary
340
341 :return:
342 """
343 for element in self.backplane_table:
344 if element['publisher']:
345 element['publisher'].close()
346 if element['subscriber']:
347 element['subscriber'].close()
348 self.my_context.term()
A Simple Working Example
This section provides a simple demonstration of a Banyan application that makes use of the BanyanBaseMulti base class. For this example, 2 backplanes are invoked, each on a different computer. The example can be easily modified to run both backplanes on a single computer by adjusting the routing specification file's values.
In addition to the two backplanes, the application consists of three Banyan components. The first component is a reuse of echo_cmdline_client.py, and it resides on computer1. The second component, multi_echo_server.py derived from BanyanBaseMulti The third component, notifier.py, derived from BanyanBase, resides on computer2.
Backplane1 resides on computer1, and backplane2 resides on computer2.
The client publishes messages to backplane1, and the server echoes these messages back within backplane1. As each message is received by the server, it tests to see if the message number is 0. Message number 0 signals that this is the final message. If the message number is 0, the server then publishes a "notifier" message routed to backplane2.
The Notifier
The code for the Notifier is extremely simple. It registers to receive messages with a 'notice' topic and when a message is received, prints 'Notification Received':
1 """
2 notifier.py
3
4 Copyright (c) 2018 - 2019 Alan Yorinks All right reserved.
5
6 Python Banyan is free software; you can redistribute it and/or
7 modify it under the terms of the GNU AFFERO GENERAL PUBLIC LICENSE
8 Version 3 as published by the Free Software Foundation; either
9 or (at your option) any later version.
10 This library is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU AFFERO GENERAL PUBLIC LICENSE
16 along with this library; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18
19 """
20 import sys
21 import time
22
23 from python_banyan.banyan_base import BanyanBase
24
25
26 class Notifier(BanyanBase):
27 """
28 This class is a simple Banyan echo server
29
30 """
31
32 def __init__(self, ):
33
34 # initialize the parent
35 super(Notifier, self).__init__(process_name='Notifier')
36
37 # allow time for base class to initialize publisher/subscriber connections
38 time.sleep(.3)
39
40 # subscribe to receive 'echo' messages from the client
41 self.set_subscriber_topic('notice')
42
43 # wait for messages to arrive
44 try:
45 self.receive_loop()
46 except KeyboardInterrupt:
47 self.clean_up()
48 sys.exit(0)
49
50 def incoming_message_processing(self, topic, payload):
51 """
52 Process incoming messages from the client
53 :param topic: message topic
54 :param payload: message payload
55 :return:
56 """
57 print('Notification Received!')
58
59
60 def notifier():
61 Notifier()
62
63
64 if __name__ == '__main__':
65 notifier()
The Echo Client
We are reusing the echo_cmdline_client unmodified.
The MultiEchoServer
The server is derived from BanyanBaseMulti. Let us begin by examining its associated routing specification file.
Routing Specification File
backplane_name,ip_address,subscriber_port,subscriber_topic,publisher_port
BP1,192.168.2.190,43125,"[echo]",43124
BP2,192.168.2.180,43125,"[]",43124
BP1 is the backplane that is running on 192.168.2.190 and the server subscribes to the echo topic on backplane1.
BP2 is the backplane that is running 192.168.2.180, and the server publishes notifier messages to backplane2.
The MultiEchoServer Code
Lines 39 and 42 establish the sockets for each of the backplanes. It uses the find_socket method to retrieve the publisher sockets for the backplanes. It uses the names of the backplanes from the routing specification file to retrieve the sockets.
Line 59 echoes the incoming messages from the client. If the message number is 0, then line 62 publishes a notice message to backplane2.
1 """
2 multi_echo_server.py
3
4 Copyright (c) 2016-2019 Alan Yorinks All right reserved.
5
6 Python Banyan is free software; you can redistribute it and/or
7 modify it under the terms of the GNU AFFERO GENERAL PUBLIC LICENSE
8 Version 3 as published by the Free Software Foundation; either
9 or (at your option) any later version.
10 This library is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU AFFERO GENERAL PUBLIC LICENSE
16 along with this library; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18
19 """
20 import sys
21 import time
22
23 from python_banyan.banyan_base_multi import BanyanBaseMulti
24
25
26 class MultiEchoServer(BanyanBaseMulti):
27 """
28 This class is a simple Banyan echo server
29
30 """
31
32 def __init__(self, ):
33
34 # initialize the parent
35 super(MultiEchoServer, self).__init__(back_plane_csv_file='spec.csv', process_name='MultiDriver')
36
37 # find the sockets in the table for the 2 backplanes
38 # socket a is to reply to messages from the client
39 self.socket_a = self.find_socket("BP1", self.PUB_SOCK)
40
41 # socket b is to send the notification messages
42 self.socket_b = self.find_socket("BP2", self.PUB_SOCK)
43
44 # wait for messages to arrive
45 try:
46 self.receive_loop()
47 except KeyboardInterrupt:
48 self.clean_up()
49 sys.exit(0)
50
51 def incoming_message_processing(self, topic, payload):
52 """
53 Process incoming messages from the client
54 :param topic: message topic
55 :param payload: message payload
56 :return:
57 """
58 if topic == 'echo':
59 self.publish_payload(payload, self.socket_a, 'reply')
60 print('Message number:', payload['message_number'])
61 if payload['message_number'] == 0:
62 self.publish_payload({'message': 'got it', 'id': 'b'},
63 self.socket_b, 'notice')
64
65
66 def multi_echo_server():
67 MultiEchoServer()
68
69
70 if __name__ == '__main__':
71 multi_echo_server()
Running The Example
First, we start a backplane, a monitor and multi_echo_server.py on 192.168.2.190.
The next step is to start a backplane notifier.py on 192.168.2.180.
Finally, we start the echo client on 192.168.2.190. It sends its 10 messages. When the server receives message number 0, it sends its notification message.
Looking at the monitor on 192.168.2.190, we see:
And the monitor on 192.168.2.180 shows:
Copyright (C) 2017-2020 Alan Yorinks All Rights Reserved