Package pymata_aio :: Module pymata3
[hide private]
[frames] | no frames]

Source Code for Module pymata_aio.pymata3

  1  """ 
  2  Copyright (c) 2015 Alan Yorinks All rights reserved. 
  3   
  4  This program is free software; you can redistribute it and/or 
  5  modify it under the terms of the GNU  General Public 
  6  License as published by the Free Software Foundation; either 
  7  version 3 of the License, or (at your option) any later version. 
  8   
  9  This library is distributed in the hope that it will be useful, 
 10  but WITHOUT ANY WARRANTY; without even the implied warranty of 
 11  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 12  General Public License for more details. 
 13   
 14  You should have received a copy of the GNU General Public 
 15  License along with this library; if not, write to the Free Software 
 16  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA 
 17  """ 
 18   
 19  import asyncio 
 20  import os 
 21   
 22  from .pymata_core import PymataCore 
 23   
 24   
 25  # noinspection PyProtectedMember 
26 -class PyMata3:
27 """ 28 This class exposes and implements a proxy API for the pymata_core asyncio API, If your application 29 does not use asyncio directly, this is the API that you should use.. 30 """ 31
32 - def __init__(self, arduino_wait=2, com_port=None):
33 """ 34 Constructor for the PyMata3 API 35 @param arduino_wait: Amount of time to allow Arduino to finish its reset (2 seconds for Uno, Leonardo can be 0) 36 @param com_port: If specified, auto port detection will not be performed and this com port will be used. 37 @return: None 38 """ 39 self.sleep_tune = .001 40 self.core = PymataCore(arduino_wait, self.sleep_tune, com_port) 41 self.core.start() 42 self.sleep(1)
43
44 - def analog_read(self, pin):
45 """ 46 Retrieve the last data update for the specified analog pin. 47 It is intended for a polling application. 48 @param pin: Analog pin number (ex. A2 is specified as 2) 49 @return: Last value reported for the analog pin 50 """ 51 loop = asyncio.get_event_loop() 52 asyncio.async(self.core.analog_read(pin)) 53 value = loop.run_until_complete(self.core.analog_read(pin)) 54 return value
55
56 - def analog_write(self, pin, value):
57 """ 58 Set the selected PWM pin to the specified value. 59 @param pin: PWM pin number 60 @param value: Set the selected pin to the specified value. 0-0x4000 (14 bits) 61 @return: No return value 62 """ 63 asyncio.async(self.core.analog_write(pin, value))
64
65 - def digital_read(self, pin):
66 """ 67 Retrieve the last data update for the specified digital pin. 68 It is intended for a polling application. 69 @param pin: Digital pin number 70 @return: Last value reported for the digital pin 71 """ 72 loop = asyncio.get_event_loop() 73 asyncio.async(self.core.digital_read(pin)) 74 value = loop.run_until_complete(self.core.digital_read(pin)) 75 return value
76
77 - def digital_write(self, pin, value=0):
78 """ 79 Set the specified digital input pin to the provided value 80 @param pin: Digital pin to be set 81 @param value: 0 or 1 82 @return: No return value 83 """ 84 loop = asyncio.get_event_loop() 85 asyncio.async(self.core.digital_write(pin, value)) 86 loop.run_until_complete(self.core.digital_write(pin, value))
87
88 - def disable_analog_reporting(self, pin):
89 """ 90 Disables analog reporting for a single analog pin. 91 @param pin: Analog pin number. For example for A0, the number is 0. 92 @return: No return value 93 """ 94 asyncio.async(self.core.disable_analog_reporting(pin))
95
96 - def disable_digital_reporting(self, pin):
97 """ 98 Disables digital reporting. By turning reporting off for this pin, reporting 99 is disabled for all 8 bits in the "port" - 100 @param pin: Pin and all pins for this port 101 @return: No return value 102 """ 103 asyncio.async(self.core.disable_analog_reporting(pin))
104
105 - def encoder_config(self, pin_a, pin_b, cb=None):
106 """ 107 This command enables the rotary encoder (2 pin + ground) and will 108 enable encoder reporting. 109 110 This is a FirmataPlus feature. 111 112 Encoder data is retrieved by performing a digital_read from pin a (encoder pin 1) 113 114 @param pin_a: Encoder pin 1. 115 @param pin_b: Encoder pin 2. 116 @param cb: callback function to report encoder changes 117 @return: No return value 118 """ 119 asyncio.async(self.core.encoder_config(pin_a, pin_b, cb))
120
121 - def encoder_read(self, pin):
122 """ 123 This method retrieves the latest encoder data value. It is a FirmataPlus feature. 124 @param pin: Encoder Pin 125 @return: encoder data value 126 """ 127 loop = asyncio.get_event_loop() 128 asyncio.async(self.core.encoder_read(pin)) 129 try: 130 value = loop.run_until_complete(self.core.encoder_read(pin)) 131 return value 132 except RuntimeError: 133 self.shutdown()
134
135 - def enable_analog_reporting(self, pin):
136 """ 137 Enables analog reporting for a single analog pin, 138 @param pin: Analog pin number. For example for A0, the number is 0. 139 @return: No return value 140 """ 141 asyncio.async(self.core.enable_analog_reporting(pin))
142
143 - def enable_digital_reporting(self, pin):
144 """ 145 Enables digital reporting. By turning reporting on for all 8 bits in the "port" - 146 This is part of Firmata's protocol specification. 147 @param pin: Pin and all pins for this port 148 @return: No return value 149 """ 150 asyncio.async(self.core.enable_digital_reporting(pin))
151
152 - def extended_analog(self, pin, data):
153 """ 154 This method will send an extended-data analog write command to the selected pin.. 155 @param pin: 0 - 127 156 @param data: 0 - 0-0x4000 (14 bits) 157 @return: No return value 158 """ 159 loop = asyncio.get_event_loop() 160 asyncio.async(self.core.extended_analog(pin, data)) 161 loop.run_until_complete(self.core.extended_analog(pin, data))
162
163 - def get_analog_latch_data(self, pin):
164 """ 165 A list is returned containing the latch state for the pin, the latched value, and the time stamp 166 [pin_num, latch_state, latched_value, time_stamp] 167 If the the latch state is LATCH_LATCHED, the table is reset (data and timestamp set to zero) 168 It is intended to be used for a polling application. 169 @param pin: Pin number. 170 @return: [latched_state, threshold_type, threshold_value, latched_data, time_stamp] 171 """ 172 loop = asyncio.get_event_loop() 173 asyncio.async(self.core.get_analog_latch_data(pin)) 174 l_data = loop.run_until_complete(self.core.get_analog_latch_data(pin)) 175 return l_data
176
177 - def get_analog_map(self, cb=None):
178 """ 179 This method requests and returns an analog map. 180 @return: An analog map response or None if a timeout occurs 181 """ 182 loop = asyncio.get_event_loop() 183 asyncio.async(self.core.get_analog_map()) 184 report = loop.run_until_complete(self.core.get_analog_map()) 185 if cb: 186 cb(report) 187 else: 188 return report
189
190 - def get_capability_report(self, raw=True, cb=None):
191 """ 192 This method retrieves the Firmata capability report 193 @param raw: If True, it either stores or provides the callback with a report as list. 194 If False, prints a formatted report to the console 195 @param cb: Optional callback reference to receive a raw report 196 @return: capability report 197 """ 198 loop = asyncio.get_event_loop() 199 asyncio.async(self.core.get_capability_report()) 200 201 report = loop.run_until_complete(self.core.get_capability_report()) 202 if raw: 203 if cb: 204 cb(report) 205 else: 206 return report 207 else: 208 # noinspection PyProtectedMember 209 self.core._format_capability_report(report)
210
211 - def get_digital_latch_data(self, pin):
212 """ 213 A list is returned containing the latch state for the pin, the latched value, and the time stamp 214 [pin_num, latch_state, latched_value, time_stamp] 215 If the the latch state is LATCH_LATCHED, the table is reset (data and timestamp set to zero). 216 It is intended for use by a polling application. 217 @param pin: Pin number. 218 @return: [latched_state, threshold_type, threshold_value, latched_data, time_stamp] 219 """ 220 loop = asyncio.get_event_loop() 221 asyncio.async(self.core.get_digital_latch_data(pin)) 222 l_data = loop.run_until_complete(self.core.get_digital_latch_data(pin)) 223 return l_data
224
225 - def get_firmware_version(self, cb=None):
226 """ 227 This method retrieves the Firmata firmware version 228 @param cb: Reference to a callback function 229 @return:If no callback is specified, the firmware version 230 """ 231 loop = asyncio.get_event_loop() 232 asyncio.async(self.core.get_firmware_version()) 233 234 version = loop.run_until_complete( 235 self.core.get_firmware_version()) 236 if cb: 237 cb(version) 238 else: 239 return version
240
241 - def get_protocol_version(self, cb=None):
242 """ 243 This method retrieves the Firmata protocol version 244 @param cb: Optional callback reference. 245 @return:If no callback is specified, the firmware version 246 """ 247 loop = asyncio.get_event_loop() 248 asyncio.async(self.core.get_protocol_version()) 249 version = loop.run_until_complete(self.core.get_protocol_version()) 250 251 if cb: 252 cb(version) 253 else: 254 return version
255
256 - def get_pin_state(self, pin, cb=None):
257 """ 258 This method retrieves a pin state report for the specified pin 259 @param pin: Pin of interest 260 @param cb: optional callback reference 261 @return: pin state report 262 """ 263 loop = asyncio.get_event_loop() 264 asyncio.async(self.core.get_pin_state(pin)) 265 report = loop.run_until_complete(self.core.get_pin_state(pin)) 266 if cb: 267 cb(report) 268 else: 269 return report
270
271 - def get_pymata_version(self):
272 """ 273 This method retrieves the PyMata version number 274 @return: PyMata version number. 275 """ 276 asyncio.async(self.core.get_pymata_version())
277
278 - def i2c_config(self, read_delay_time=0):
279 """ 280 This method configures Arduino i2c with an optional read delay time. 281 @param read_delay_time: firmata i2c delay time 282 @return: No return value 283 """ 284 loop = asyncio.get_event_loop() 285 asyncio.async(self.core.i2c_config(read_delay_time)) 286 287 loop.run_until_complete(self.core.i2c_config(read_delay_time))
288
289 - def i2c_read_data(self, address):
290 """ 291 Retrieve result of last data read from i2c device. 292 i2c_read_request should be called before trying to retrieve data. 293 It is intended for use by a polling application. 294 @param address: i2c 295 @return: last data read or None if no data is present. 296 """ 297 loop = asyncio.get_event_loop() 298 asyncio.async(self.core.i2c_read_data(address)) 299 300 value = loop.run_until_complete(self.core.i2c_read_data(address)) 301 return value
302
303 - def i2c_read_request(self, address, register, number_of_bytes, read_type, cb=None):
304 """ 305 This method issues an i2c read request for a single read,continuous read or a stop, specified by the read_type. 306 Because different i2c devices return data at different rates, if a callback is not specified, 307 the user must first call this method and then call i2c_read_data after waiting for sufficient time for the 308 i2c device to respond. 309 Some devices require that transmission be restarted (e.g. MMA8452Q accelerometer). 310 Use I2C_READ | I2C_RESTART_TX for those cases. 311 @param address: i2c device 312 @param register: i2c register number 313 @param number_of_bytes: number of bytes to be returned 314 @param read_type: Constants.I2C_READ, Constants.I2C_READ_CONTINUOUSLY or Constants.I2C_STOP_READING. 315 Constants.I2C_RESTART_TX may be OR'ed when required 316 @param cb: optional callback reference 317 @return: No return value """ 318 # loop = asyncio.get_event_loop() 319 asyncio.async(self.core.i2c_read_request(address, register, number_of_bytes, read_type, cb))
320
321 - def i2c_write_request(self, address, args):
322 """ 323 Write data to an i2c device. 324 @param address: i2c device address 325 @param args: A variable number of bytes to be sent to the device passed in as a list. 326 @return: No return value """ 327 loop = asyncio.get_event_loop() 328 asyncio.async(self.core.i2c_write_request(address, args)) 329 330 loop.run_until_complete(self.core.i2c_write_request(address, args))
331
332 - def play_tone(self, pin, tone_command, frequency, duration):
333 """ 334 This method will call the Tone library for the selected pin. 335 It requires FirmataPlus to be loaded onto the arduino 336 If the tone command is set to TONE_TONE, then the specified tone will be played. 337 Else, if the tone command is TONE_NO_TONE, then any currently playing tone will be disabled. 338 It is intended for a future release of Arduino Firmata 339 @param pin: Pin number 340 @param tone_command: Either TONE_TONE, or TONE_NO_TONE 341 @param frequency: Frequency of tone 342 @param duration: Duration of tone in milliseconds 343 @return: No return value 344 """ 345 asyncio.async(self.core.play_tone(pin, tone_command, frequency, duration))
346
347 - def send_reset(self):
348 """ 349 Send a Firmata reset command 350 @return: No return value """ 351 loop = asyncio.get_event_loop() 352 asyncio.async(self.core.send_reset()) 353 loop.run_until_complete(self.core.send_reset())
354
355 - def servo_config(self, pin, min_pulse=544, max_pulse=2400):
356 """ 357 This method configures the Arduino for servo operation. 358 @param pin: Servo control pin 359 @param min_pulse: Minimum pulse width 360 @param max_pulse: Maximum pulse width 361 @return: No return value """ 362 asyncio.async(self.core.servo_config(pin, min_pulse, max_pulse))
363
364 - def set_analog_latch(self, pin, threshold_type, threshold_value, cb=None):
365 """ 366 This method "arms" an analog pin for its data to be latched and saved in the latching table 367 If a callback method is provided, when latching criteria is achieved, the callback function is called 368 with latching data notification. 369 @param pin: Analog pin number (value following an 'A' designator, i.e. A5 = 5 370 @param threshold_type: Constants.LATCH_GT | Constants.LATCH_LT | Constants.LATCH_GTE | Constants.LATCH_LTE 371 @param threshold_value: numerical value - between 0 and 1023 372 @param cb: callback method 373 @return: True if successful, False if parameter data is invalid 374 """ 375 loop = asyncio.get_event_loop() 376 asyncio.async(self.core.set_analog_latch(pin, threshold_type, threshold_value, cb)) 377 378 result = loop.run_until_complete(self.core.set_analog_latch(pin, 379 threshold_type, threshold_value, cb)) 380 return result
381
382 - def set_digital_latch(self, pin, threshold_value, cb=None):
383 """ 384 This method "arms" a digital pin for its data to be latched and saved in the latching table 385 If a callback method is provided, when latching criteria is achieved, the callback function is called 386 with latching data notification. 387 @param pin: Digital pin number 388 @param threshold_value: 0 or 1 389 @param cb: callback function 390 @return: True if successful, False if parameter data is invalid 391 """ 392 loop = asyncio.get_event_loop() 393 asyncio.async(self.core.set_digital_latch(pin, threshold_value, cb)) 394 395 result = loop.run_until_complete(self.core.set_digital_latch(pin, threshold_value, cb)) 396 return result
397
398 - def set_pin_mode(self, pin_number, pin_state, callback=None):
399 """ 400 This method sets the pin mode for the specified pin. 401 @param pin_number: Arduino Pin Number 402 @param pin_state: INPUT/OUTPUT/ANALOG/PWM - for SERVO use servo_config() 403 @param callback: Optional: A reference to a call back function to be called when pin data value changes 404 @return: No return value """ 405 loop = asyncio.get_event_loop() 406 asyncio.async(self.core.set_pin_mode(pin_number, pin_state, callback)) 407 408 loop.run_until_complete(self.core.set_pin_mode(pin_number, pin_state, callback))
409
410 - def set_sampling_interval(self, interval):
411 """ 412 This method sets the sampling interval for the Firmata loop method 413 @param interval: time in milliseconds 414 @return: No return value """ 415 asyncio.async(self.core.set_sampling_interval(interval))
416
417 - def sleep(self, time):
418 """ 419 Perform an asyncio sleep for the time specified in seconds. This method should be used in place of time.sleep() 420 @param time: time in seconds 421 @return: No return value """ 422 try: 423 loop = asyncio.get_event_loop() 424 asyncio.async(self.core.sleep(time)) 425 426 loop.run_until_complete(self.core.sleep(time)) 427 except asyncio.CancelledError: 428 pass 429 except RuntimeError: 430 pass
431
432 - def shutdown(self):
433 """ 434 Shutdown the application and exit 435 @return: No return value """ 436 print('Shutting down ...') 437 try: 438 loop = asyncio.get_event_loop() 439 # self.send_reset() 440 for t in asyncio.Task.all_tasks(loop): 441 t.cancel() 442 loop.run_until_complete(asyncio.sleep(0.1)) 443 self.send_reset() 444 loop.close() 445 # keeps pytest happy 446 os._exit(1) 447 except TypeError: 448 # ignore the error 449 pass 450 except RuntimeError: 451 # ignore 452 pass 453 except Exception as ex: 454 print(ex)
455
456 - def sonar_data_retrieve(self):
457 """ 458 Retrieve Ping (HC-SR04 type) data. The data is presented as a dictionary. 459 The 'key' is the trigger pin specified in sonar_config() and the 'data' is the 460 current measured distance (in centimeters) 461 for that pin. If there is no data, the value is set to None. 462 This is a FirmataPlus feature. 463 464 @return: active_sonar_map 465 """ 466 sonar_data = asyncio.async(self.core.sonar_data_retrieve()) 467 return sonar_data
468 469 # noinspection PyUnusedLocal
470 - def sonar_config(self, trigger_pin, echo_pin, cb=None, ping_interval=50, max_distance=200):
471 """ 472 Configure the pins,ping interval and maximum distance for an HC-SR04 type device. 473 Single pin configuration may be used. To do so, set both the trigger and echo pins to the same value. 474 Up to a maximum of 6 SONAR devices is supported 475 If the maximum is exceeded a message is sent to the console and the request is ignored. 476 NOTE: data is measured in centimeters 477 This is FirmataPlus feature. 478 @param trigger_pin: The pin number of for the trigger (transmitter). 479 @param echo_pin: The pin number for the received echo. 480 @param cb: optional callback function to report sonar data changes 481 @param ping_interval: Minimum interval between pings. Lowest number to use is 33 ms.Max is 127 482 @param max_distance: Maximum distance in cm. Max is 200. 483 @return: No return value 484 """ 485 asyncio.async(self.core.sonar_config(trigger_pin, echo_pin, cb, ping_interval=50, 486 max_distance=200))
487
488 - def stepper_config(self, steps_per_revolution, stepper_pins):
489 """ 490 Configure stepper motor prior to operation. 491 This is a FirmataPlus feature. 492 @param steps_per_revolution: number of steps per motor revolution 493 @param stepper_pins: a list of control pin numbers - either 4 or 2 494 @return: No return value 495 496 """ 497 asyncio.async(self.core.stepper_config(steps_per_revolution, stepper_pins))
498
499 - def stepper_step(self, motor_speed, number_of_steps):
500 """ 501 Move a stepper motor for the number of steps at the specified speed 502 This is a FirmataPlus feature. 503 @param motor_speed: 21 bits of data to set motor speed 504 @param number_of_steps: 14 bits for number of steps & direction 505 positive is forward, negative is reverse 506 """ 507 loop = asyncio.get_event_loop() 508 asyncio.async(self.core.stepper_step(motor_speed, number_of_steps)) 509 loop.run_until_complete(self.core.stepper_step(motor_speed, number_of_steps))
510