1 """
2 Copyright (c) 2015 Alan Yorinks All rights reserved.
3
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public
6 License as published by the Free Software Foundation; either
7 version 3 of the License, or (at your option) any later version.
8
9 This library is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 General Public License for more details.
13
14 You should have received a copy of the GNU General Public
15 License along with this library; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17 """
18
19 import asyncio
20 import os
21
22 from .pymata_core import PymataCore
23
24
25
27 """
28 This class exposes and implements a proxy API for the pymata_core asyncio API, If your application
29 does not use asyncio directly, this is the API that you should use..
30 """
31
32 - def __init__(self, arduino_wait=2, com_port=None):
33 """
34 Constructor for the PyMata3 API
35 @param arduino_wait: Amount of time to allow Arduino to finish its reset (2 seconds for Uno, Leonardo can be 0)
36 @param com_port: If specified, auto port detection will not be performed and this com port will be used.
37 @return: None
38 """
39 self.sleep_tune = .001
40 self.core = PymataCore(arduino_wait, self.sleep_tune, com_port)
41 self.core.start()
42 self.sleep(1)
43
45 """
46 Retrieve the last data update for the specified analog pin.
47 It is intended for a polling application.
48 @param pin: Analog pin number (ex. A2 is specified as 2)
49 @return: Last value reported for the analog pin
50 """
51 loop = asyncio.get_event_loop()
52 asyncio.async(self.core.analog_read(pin))
53 value = loop.run_until_complete(self.core.analog_read(pin))
54 return value
55
57 """
58 Set the selected PWM pin to the specified value.
59 @param pin: PWM pin number
60 @param value: Set the selected pin to the specified value. 0-0x4000 (14 bits)
61 @return: No return value
62 """
63 asyncio.async(self.core.analog_write(pin, value))
64
66 """
67 Retrieve the last data update for the specified digital pin.
68 It is intended for a polling application.
69 @param pin: Digital pin number
70 @return: Last value reported for the digital pin
71 """
72 loop = asyncio.get_event_loop()
73 asyncio.async(self.core.digital_read(pin))
74 value = loop.run_until_complete(self.core.digital_read(pin))
75 return value
76
78 """
79 Set the specified digital input pin to the provided value
80 @param pin: Digital pin to be set
81 @param value: 0 or 1
82 @return: No return value
83 """
84 loop = asyncio.get_event_loop()
85 asyncio.async(self.core.digital_write(pin, value))
86 loop.run_until_complete(self.core.digital_write(pin, value))
87
89 """
90 Disables analog reporting for a single analog pin.
91 @param pin: Analog pin number. For example for A0, the number is 0.
92 @return: No return value
93 """
94 asyncio.async(self.core.disable_analog_reporting(pin))
95
97 """
98 Disables digital reporting. By turning reporting off for this pin, reporting
99 is disabled for all 8 bits in the "port" -
100 @param pin: Pin and all pins for this port
101 @return: No return value
102 """
103 asyncio.async(self.core.disable_analog_reporting(pin))
104
106 """
107 This command enables the rotary encoder (2 pin + ground) and will
108 enable encoder reporting.
109
110 This is a FirmataPlus feature.
111
112 Encoder data is retrieved by performing a digital_read from pin a (encoder pin 1)
113
114 @param pin_a: Encoder pin 1.
115 @param pin_b: Encoder pin 2.
116 @param cb: callback function to report encoder changes
117 @return: No return value
118 """
119 asyncio.async(self.core.encoder_config(pin_a, pin_b, cb))
120
122 """
123 This method retrieves the latest encoder data value. It is a FirmataPlus feature.
124 @param pin: Encoder Pin
125 @return: encoder data value
126 """
127 loop = asyncio.get_event_loop()
128 asyncio.async(self.core.encoder_read(pin))
129 try:
130 value = loop.run_until_complete(self.core.encoder_read(pin))
131 return value
132 except RuntimeError:
133 self.shutdown()
134
136 """
137 Enables analog reporting for a single analog pin,
138 @param pin: Analog pin number. For example for A0, the number is 0.
139 @return: No return value
140 """
141 asyncio.async(self.core.enable_analog_reporting(pin))
142
144 """
145 Enables digital reporting. By turning reporting on for all 8 bits in the "port" -
146 This is part of Firmata's protocol specification.
147 @param pin: Pin and all pins for this port
148 @return: No return value
149 """
150 asyncio.async(self.core.enable_digital_reporting(pin))
151
153 """
154 This method will send an extended-data analog write command to the selected pin..
155 @param pin: 0 - 127
156 @param data: 0 - 0-0x4000 (14 bits)
157 @return: No return value
158 """
159 loop = asyncio.get_event_loop()
160 asyncio.async(self.core.extended_analog(pin, data))
161 loop.run_until_complete(self.core.extended_analog(pin, data))
162
164 """
165 A list is returned containing the latch state for the pin, the latched value, and the time stamp
166 [pin_num, latch_state, latched_value, time_stamp]
167 If the the latch state is LATCH_LATCHED, the table is reset (data and timestamp set to zero)
168 It is intended to be used for a polling application.
169 @param pin: Pin number.
170 @return: [latched_state, threshold_type, threshold_value, latched_data, time_stamp]
171 """
172 loop = asyncio.get_event_loop()
173 asyncio.async(self.core.get_analog_latch_data(pin))
174 l_data = loop.run_until_complete(self.core.get_analog_latch_data(pin))
175 return l_data
176
178 """
179 This method requests and returns an analog map.
180 @return: An analog map response or None if a timeout occurs
181 """
182 loop = asyncio.get_event_loop()
183 asyncio.async(self.core.get_analog_map())
184 report = loop.run_until_complete(self.core.get_analog_map())
185 if cb:
186 cb(report)
187 else:
188 return report
189
191 """
192 This method retrieves the Firmata capability report
193 @param raw: If True, it either stores or provides the callback with a report as list.
194 If False, prints a formatted report to the console
195 @param cb: Optional callback reference to receive a raw report
196 @return: capability report
197 """
198 loop = asyncio.get_event_loop()
199 asyncio.async(self.core.get_capability_report())
200
201 report = loop.run_until_complete(self.core.get_capability_report())
202 if raw:
203 if cb:
204 cb(report)
205 else:
206 return report
207 else:
208
209 self.core._format_capability_report(report)
210
212 """
213 A list is returned containing the latch state for the pin, the latched value, and the time stamp
214 [pin_num, latch_state, latched_value, time_stamp]
215 If the the latch state is LATCH_LATCHED, the table is reset (data and timestamp set to zero).
216 It is intended for use by a polling application.
217 @param pin: Pin number.
218 @return: [latched_state, threshold_type, threshold_value, latched_data, time_stamp]
219 """
220 loop = asyncio.get_event_loop()
221 asyncio.async(self.core.get_digital_latch_data(pin))
222 l_data = loop.run_until_complete(self.core.get_digital_latch_data(pin))
223 return l_data
224
226 """
227 This method retrieves the Firmata firmware version
228 @param cb: Reference to a callback function
229 @return:If no callback is specified, the firmware version
230 """
231 loop = asyncio.get_event_loop()
232 asyncio.async(self.core.get_firmware_version())
233
234 version = loop.run_until_complete(
235 self.core.get_firmware_version())
236 if cb:
237 cb(version)
238 else:
239 return version
240
242 """
243 This method retrieves the Firmata protocol version
244 @param cb: Optional callback reference.
245 @return:If no callback is specified, the firmware version
246 """
247 loop = asyncio.get_event_loop()
248 asyncio.async(self.core.get_protocol_version())
249 version = loop.run_until_complete(self.core.get_protocol_version())
250
251 if cb:
252 cb(version)
253 else:
254 return version
255
257 """
258 This method retrieves a pin state report for the specified pin
259 @param pin: Pin of interest
260 @param cb: optional callback reference
261 @return: pin state report
262 """
263 loop = asyncio.get_event_loop()
264 asyncio.async(self.core.get_pin_state(pin))
265 report = loop.run_until_complete(self.core.get_pin_state(pin))
266 if cb:
267 cb(report)
268 else:
269 return report
270
272 """
273 This method retrieves the PyMata version number
274 @return: PyMata version number.
275 """
276 asyncio.async(self.core.get_pymata_version())
277
279 """
280 This method configures Arduino i2c with an optional read delay time.
281 @param read_delay_time: firmata i2c delay time
282 @return: No return value
283 """
284 loop = asyncio.get_event_loop()
285 asyncio.async(self.core.i2c_config(read_delay_time))
286
287 loop.run_until_complete(self.core.i2c_config(read_delay_time))
288
290 """
291 Retrieve result of last data read from i2c device.
292 i2c_read_request should be called before trying to retrieve data.
293 It is intended for use by a polling application.
294 @param address: i2c
295 @return: last data read or None if no data is present.
296 """
297 loop = asyncio.get_event_loop()
298 asyncio.async(self.core.i2c_read_data(address))
299
300 value = loop.run_until_complete(self.core.i2c_read_data(address))
301 return value
302
303 - def i2c_read_request(self, address, register, number_of_bytes, read_type, cb=None):
304 """
305 This method issues an i2c read request for a single read,continuous read or a stop, specified by the read_type.
306 Because different i2c devices return data at different rates, if a callback is not specified,
307 the user must first call this method and then call i2c_read_data after waiting for sufficient time for the
308 i2c device to respond.
309 Some devices require that transmission be restarted (e.g. MMA8452Q accelerometer).
310 Use I2C_READ | I2C_RESTART_TX for those cases.
311 @param address: i2c device
312 @param register: i2c register number
313 @param number_of_bytes: number of bytes to be returned
314 @param read_type: Constants.I2C_READ, Constants.I2C_READ_CONTINUOUSLY or Constants.I2C_STOP_READING.
315 Constants.I2C_RESTART_TX may be OR'ed when required
316 @param cb: optional callback reference
317 @return: No return value """
318
319 asyncio.async(self.core.i2c_read_request(address, register, number_of_bytes, read_type, cb))
320
322 """
323 Write data to an i2c device.
324 @param address: i2c device address
325 @param args: A variable number of bytes to be sent to the device passed in as a list.
326 @return: No return value """
327 loop = asyncio.get_event_loop()
328 asyncio.async(self.core.i2c_write_request(address, args))
329
330 loop.run_until_complete(self.core.i2c_write_request(address, args))
331
332 - def play_tone(self, pin, tone_command, frequency, duration):
333 """
334 This method will call the Tone library for the selected pin.
335 It requires FirmataPlus to be loaded onto the arduino
336 If the tone command is set to TONE_TONE, then the specified tone will be played.
337 Else, if the tone command is TONE_NO_TONE, then any currently playing tone will be disabled.
338 It is intended for a future release of Arduino Firmata
339 @param pin: Pin number
340 @param tone_command: Either TONE_TONE, or TONE_NO_TONE
341 @param frequency: Frequency of tone
342 @param duration: Duration of tone in milliseconds
343 @return: No return value
344 """
345 asyncio.async(self.core.play_tone(pin, tone_command, frequency, duration))
346
348 """
349 Send a Firmata reset command
350 @return: No return value """
351 loop = asyncio.get_event_loop()
352 asyncio.async(self.core.send_reset())
353 loop.run_until_complete(self.core.send_reset())
354
355 - def servo_config(self, pin, min_pulse=544, max_pulse=2400):
356 """
357 This method configures the Arduino for servo operation.
358 @param pin: Servo control pin
359 @param min_pulse: Minimum pulse width
360 @param max_pulse: Maximum pulse width
361 @return: No return value """
362 asyncio.async(self.core.servo_config(pin, min_pulse, max_pulse))
363
365 """
366 This method "arms" an analog pin for its data to be latched and saved in the latching table
367 If a callback method is provided, when latching criteria is achieved, the callback function is called
368 with latching data notification.
369 @param pin: Analog pin number (value following an 'A' designator, i.e. A5 = 5
370 @param threshold_type: Constants.LATCH_GT | Constants.LATCH_LT | Constants.LATCH_GTE | Constants.LATCH_LTE
371 @param threshold_value: numerical value - between 0 and 1023
372 @param cb: callback method
373 @return: True if successful, False if parameter data is invalid
374 """
375 loop = asyncio.get_event_loop()
376 asyncio.async(self.core.set_analog_latch(pin, threshold_type, threshold_value, cb))
377
378 result = loop.run_until_complete(self.core.set_analog_latch(pin,
379 threshold_type, threshold_value, cb))
380 return result
381
383 """
384 This method "arms" a digital pin for its data to be latched and saved in the latching table
385 If a callback method is provided, when latching criteria is achieved, the callback function is called
386 with latching data notification.
387 @param pin: Digital pin number
388 @param threshold_value: 0 or 1
389 @param cb: callback function
390 @return: True if successful, False if parameter data is invalid
391 """
392 loop = asyncio.get_event_loop()
393 asyncio.async(self.core.set_digital_latch(pin, threshold_value, cb))
394
395 result = loop.run_until_complete(self.core.set_digital_latch(pin, threshold_value, cb))
396 return result
397
398 - def set_pin_mode(self, pin_number, pin_state, callback=None):
399 """
400 This method sets the pin mode for the specified pin.
401 @param pin_number: Arduino Pin Number
402 @param pin_state: INPUT/OUTPUT/ANALOG/PWM - for SERVO use servo_config()
403 @param callback: Optional: A reference to a call back function to be called when pin data value changes
404 @return: No return value """
405 loop = asyncio.get_event_loop()
406 asyncio.async(self.core.set_pin_mode(pin_number, pin_state, callback))
407
408 loop.run_until_complete(self.core.set_pin_mode(pin_number, pin_state, callback))
409
411 """
412 This method sets the sampling interval for the Firmata loop method
413 @param interval: time in milliseconds
414 @return: No return value """
415 asyncio.async(self.core.set_sampling_interval(interval))
416
418 """
419 Perform an asyncio sleep for the time specified in seconds. This method should be used in place of time.sleep()
420 @param time: time in seconds
421 @return: No return value """
422 try:
423 loop = asyncio.get_event_loop()
424 asyncio.async(self.core.sleep(time))
425
426 loop.run_until_complete(self.core.sleep(time))
427 except asyncio.CancelledError:
428 pass
429 except RuntimeError:
430 pass
431
433 """
434 Shutdown the application and exit
435 @return: No return value """
436 print('Shutting down ...')
437 try:
438 loop = asyncio.get_event_loop()
439
440 for t in asyncio.Task.all_tasks(loop):
441 t.cancel()
442 loop.run_until_complete(asyncio.sleep(0.1))
443 self.send_reset()
444 loop.close()
445
446 os._exit(1)
447 except TypeError:
448
449 pass
450 except RuntimeError:
451
452 pass
453 except Exception as ex:
454 print(ex)
455
457 """
458 Retrieve Ping (HC-SR04 type) data. The data is presented as a dictionary.
459 The 'key' is the trigger pin specified in sonar_config() and the 'data' is the
460 current measured distance (in centimeters)
461 for that pin. If there is no data, the value is set to None.
462 This is a FirmataPlus feature.
463
464 @return: active_sonar_map
465 """
466 sonar_data = asyncio.async(self.core.sonar_data_retrieve())
467 return sonar_data
468
469
470 - def sonar_config(self, trigger_pin, echo_pin, cb=None, ping_interval=50, max_distance=200):
471 """
472 Configure the pins,ping interval and maximum distance for an HC-SR04 type device.
473 Single pin configuration may be used. To do so, set both the trigger and echo pins to the same value.
474 Up to a maximum of 6 SONAR devices is supported
475 If the maximum is exceeded a message is sent to the console and the request is ignored.
476 NOTE: data is measured in centimeters
477 This is FirmataPlus feature.
478 @param trigger_pin: The pin number of for the trigger (transmitter).
479 @param echo_pin: The pin number for the received echo.
480 @param cb: optional callback function to report sonar data changes
481 @param ping_interval: Minimum interval between pings. Lowest number to use is 33 ms.Max is 127
482 @param max_distance: Maximum distance in cm. Max is 200.
483 @return: No return value
484 """
485 asyncio.async(self.core.sonar_config(trigger_pin, echo_pin, cb, ping_interval=50,
486 max_distance=200))
487
489 """
490 Configure stepper motor prior to operation.
491 This is a FirmataPlus feature.
492 @param steps_per_revolution: number of steps per motor revolution
493 @param stepper_pins: a list of control pin numbers - either 4 or 2
494 @return: No return value
495
496 """
497 asyncio.async(self.core.stepper_config(steps_per_revolution, stepper_pins))
498
500 """
501 Move a stepper motor for the number of steps at the specified speed
502 This is a FirmataPlus feature.
503 @param motor_speed: 21 bits of data to set motor speed
504 @param number_of_steps: 14 bits for number of steps & direction
505 positive is forward, negative is reverse
506 """
507 loop = asyncio.get_event_loop()
508 asyncio.async(self.core.stepper_step(motor_speed, number_of_steps))
509 loop.run_until_complete(self.core.stepper_step(motor_speed, number_of_steps))
510