Package pymata_aio :: Module pymata_core
[hide private]
[frames] | no frames]

Source Code for Module pymata_aio.pymata_core

   1  """ 
   2  Copyright (c) 2015 Alan Yorinks All rights reserved. 
   3   
   4  This program is free software; you can redistribute it and/or 
   5  modify it under the terms of the GNU  General Public 
   6  License as published by the Free Software Foundation; either 
   7  version 3 of the License, or (at your option) any later version. 
   8   
   9  This library is distributed in the hope that it will be useful, 
  10  but WITHOUT ANY WARRANTY; without even the implied warranty of 
  11  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
  12  General Public License for more details. 
  13   
  14  You should have received a copy of the GNU General Public 
  15  License along with this library; if not, write to the Free Software 
  16  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA 
  17  """ 
  18   
  19  import asyncio 
  20  import sys 
  21  import time 
  22  import signal 
  23  # import os 
  24   
  25  import serial 
  26   
  27  from .constants import Constants 
  28  from .private_constants import PrivateConstants 
  29  from .pin_data import PinData 
  30  from .pymata_serial import PymataSerial 
31 32 33 -class PymataCore:
34 """ 35 This class exposes and implements the pymata_core asyncio API, It includes the public API methods as well as 36 a set of private methods. If your application is using asyncio, this is the API that you should use. 37 38 After instantiating this class, its "start" method MUST be called to perform Arduino pin auto-detection. 39 """ 40
41 - def __init__(self, arduino_wait=2, sleep_tune=.001, com_port=None):
42 """ 43 This is the "constructor" method for the PymataCore class. 44 @param arduino_wait: Amount of time to wait for Arduino to reset. UNO takes 2 seconds, Leonardo can be zero 45 @param sleep_tune: This parameter sets the amount of time PyMata core uses to set asyncio.sleep 46 @param com_port: Manually selected com port - normally it is auto-detected 47 @return: This method never returns 48 """ 49 self.sleep_tune = sleep_tune 50 self.arduino_wait = arduino_wait 51 self.com_port = com_port 52 53 # this dictionary for mapping incoming Firmata message types to handlers for the messages 54 self.command_dictionary = {PrivateConstants.REPORT_VERSION: self._report_version, 55 PrivateConstants.REPORT_FIRMWARE: self._report_firmware, 56 PrivateConstants.CAPABILITY_RESPONSE: self._capability_response, 57 PrivateConstants.ANALOG_MAPPING_RESPONSE: self._analog_mapping_response, 58 PrivateConstants.PIN_STATE_RESPONSE: self._pin_state_response, 59 PrivateConstants.STRING_DATA: self._string_data, 60 PrivateConstants.ANALOG_MESSAGE: self._analog_message, 61 PrivateConstants.DIGITAL_MESSAGE: self._digital_message, 62 PrivateConstants.I2C_REPLY: self._i2c_reply, 63 PrivateConstants.SONAR_DATA: self._sonar_data, 64 PrivateConstants.ENCODER_DATA: self._encoder_data} 65 66 # report query results are stored in this dictionary 67 self.query_reply_data = {PrivateConstants.REPORT_VERSION: '', PrivateConstants.STRING_DATA: '', 68 PrivateConstants.REPORT_FIRMWARE: '', 69 PrivateConstants.CAPABILITY_RESPONSE: None, 70 PrivateConstants.ANALOG_MAPPING_RESPONSE: None, 71 PrivateConstants.PIN_STATE_RESPONSE: None} 72 # PrivateConstants.PYMATA_VERSION: 'PyMata_aio version 1.01'} 73 74 # An i2c_map entry consists of a device i2c address as the key, and the value of the key consists of a 75 # dictionary containing 2 entries. The first entry. 'value' contains the last value reported, and 76 # the second, 'callback' contains a reference to a callback function. 77 # For example: {12345: {'value': 23, 'callback': None}} 78 self.i2c_map = {} 79 80 # the active_sonar_map maps the sonar trigger pin number (the key) to the current data value returned 81 # if a callback was specified, it is stored in the map as well. 82 # an entry in the map consists of: 83 # pin: [callback,[current_data_returned]] 84 self.active_sonar_map = {} 85 86 # The latch_map is a dictionary that stores all latches setup by the user. 87 # The key is a string defined as follows: 88 # Digital Pin : D + pin number (D12) 89 # Analog Pin: A + pin number (A3) 90 91 # The value associated with each key is a list comprised of: 92 93 # [latched_state, threshold_type, threshold_value, latched_data, time_stamp] 94 95 # latched_state: Each list entry contains a latch state, a threshold type, a threshold value value 96 # the data value at time of latching, and a date stamp when latched. 97 # A latch state: 98 99 # LATCH_IGNORE = 0 # this item currently not participating in latching 100 # LATCH_ARMED = 1 # When the next item value change is received and,if it matches the latch 101 # criteria, the data will be latched 102 # LATCH_LATCHED = 2 # data has been latched. Read the data to re-arm the latch 103 104 # threshold type: 105 # LATCH_EQ = 0 data value is equal to the latch threshold value 106 # LATCH_GT = 1 data value is greater than the latch threshold value 107 # LATCH_LT = 2 data value is less than the latch threshold value 108 # LATCH_GTE = 3 data value is greater than or equal to the latch threshold value 109 # LATCH_LTE = 4 # data value is less than or equal to the latch threshold value 110 111 # threshold value: target threshold data value 112 113 # latched data value: value of data at time of latching event 114 115 # time stamp: time of latching event 116 117 # analog_latch_table entry = pin: [latched_state, threshold_type, threshold_value, latched_data, time_stamp] 118 # digital_latch_table_entry = pin: [latched_state, threshold_type, latched_data, time_stamp] 119 120 self.latch_map = {} 121 122 print('{}{}{}'.format('\n', "pymata_aio Version " + PrivateConstants.PYMATA_VERSION, 123 '\tCopyright (c) 2015 Alan Yorinks All rights reserved.\n')) 124 sys.stdout.flush() 125 126 if com_port is None: 127 self.com_port = self._discover_port() 128 129 self.sleep_tune = sleep_tune 130 131 # a list of PinData objects - one for each pin segregate by pin type 132 self.analog_pins = [] 133 self.digital_pins = [] 134 self.loop = None 135 self.the_task = None 136 self.serial_port = None
137
138 - def start(self):
139 """ 140 This method must be called immediately after the class is instantiated. It instantiates the serial 141 interface and then performs auto pin discovery. 142 @return:No return value. 143 """ 144 self.loop = asyncio.get_event_loop() 145 try: 146 self.serial_port = PymataSerial(self.com_port, 57600, self.sleep_tune) 147 except serial.SerialException: 148 print('Cannot instantiate serial interface: ' + self.com_port) 149 sys.exit(0) 150 151 # wait for arduino to go through a reset cycle if need be 152 time.sleep(self.arduino_wait) 153 154 # register the get_command method with the event loop 155 self.loop = asyncio.get_event_loop() 156 self.the_task = self.loop.create_task(self._command_dispatcher()) 157 158 # get an analog pin map 159 asyncio.async(self.get_analog_map()) 160 161 # try to get an analog report. if it comes back as none - shutdown 162 report = self.loop.run_until_complete(self.get_analog_map()) 163 if not report: 164 print('\n\n{} {} ***'.format('*** Analog map query timed out waiting for port:', 165 self.serial_port.com_port)) 166 print('\nIs your Arduino plugged in?') 167 try: 168 loop = asyncio.get_event_loop() 169 for t in asyncio.Task.all_tasks(loop): 170 t.cancel() 171 loop.run_until_complete(asyncio.sleep(.1)) 172 loop.close() 173 loop.stop() 174 sys.exit(0) 175 except RuntimeError: 176 # this suppresses the Event Loop Is Running message, which may be a bug in python 3.4.3 177 sys.exit(0) 178 except TypeError: 179 sys.exit(0) 180 181 # custom assemble the pin lists 182 for pin in report: 183 digital_data = PinData() 184 self.digital_pins.append(digital_data) 185 if pin != Constants.IGNORE: 186 analog_data = PinData() 187 self.analog_pins.append(analog_data) 188 189 print('{} {} {} {} {}'.format('Auto-discovery complete. Found', len(self.digital_pins), 'Digital Pins and', 190 len(self.analog_pins), 'Analog Pins\n\n'))
191 192 @asyncio.coroutine
193 - def analog_read(self, pin):
194 """ 195 Retrieve the last data update for the specified analog pin. 196 @param pin: Analog pin number (ex. A2 is specified as 2) 197 @return: Last value reported for the analog pin 198 """ 199 return self.analog_pins[pin].current_value
200 201 @asyncio.coroutine
202 - def analog_write(self, pin, value):
203 """ 204 Set the selected pin to the specified value. 205 @param pin: PWM pin number 206 @param value: Pin value (0 - 0x4000) 207 @return: No return value 208 """ 209 if PrivateConstants.ANALOG_MESSAGE + pin < 0xf0: 210 command = [PrivateConstants.ANALOG_MESSAGE + pin, value & 0x7f, value >> 7] 211 yield from self._send_command(command) 212 else: 213 yield from self.extended_analog(pin, value)
214 215 @asyncio.coroutine
216 - def digital_read(self, pin):
217 """ 218 Retrieve the last data update for the specified digital pin. 219 @param pin: Digital pin number 220 @return: Last value reported for the digital pin 221 """ 222 return self.digital_pins[pin].current_value
223 224 @asyncio.coroutine
225 - def digital_write(self, pin, value):
226 """ 227 Set the specified pin to the specified value. 228 @param pin: pin number 229 @param value: pin value 230 @return: No return value 231 """ 232 # The command value is not a fixed value, but needs to be calculated using the 233 # pin's port number 234 port = pin // 8 235 236 calculated_command = PrivateConstants.DIGITAL_MESSAGE + port 237 mask = 1 << (pin % 8) 238 # Calculate the value for the pin's position in the port mask 239 if value == 1: 240 PrivateConstants.DIGITAL_OUTPUT_PORT_PINS[port] |= mask 241 else: 242 PrivateConstants.DIGITAL_OUTPUT_PORT_PINS[port] &= ~mask 243 244 # Assemble the command 245 command = (calculated_command, PrivateConstants.DIGITAL_OUTPUT_PORT_PINS[port] & 0x7f, 246 PrivateConstants.DIGITAL_OUTPUT_PORT_PINS[port] >> 7) 247 248 yield from self._send_command(command)
249 250 @asyncio.coroutine
251 - def disable_analog_reporting(self, pin):
252 """ 253 Disables analog reporting for a single analog pin. 254 @param pin: Analog pin number. For example for A0, the number is 0. 255 @return: No return value 256 """ 257 command = [PrivateConstants.REPORT_ANALOG + pin, PrivateConstants.REPORTING_DISABLE] 258 yield from self._send_command(command)
259 260 @asyncio.coroutine
261 - def disable_digital_reporting(self, pin):
262 """ 263 Disables digital reporting. By turning reporting off for this pin, Reporting 264 is disabled for all 8 bits in the "port" - 265 @param pin: Pin and all pins for this port 266 @return: No return value 267 """ 268 port = pin // 8 269 command = [PrivateConstants.REPORT_DIGITAL + port, PrivateConstants.REPORTING_DISABLE] 270 yield from self._send_command(command)
271 272 @asyncio.coroutine
273 - def encoder_config(self, pin_a, pin_b, cb=None):
274 """ 275 This command enables the rotary encoder (2 pin + ground) and will 276 enable encoder reporting. 277 278 NOTE: This command is not currently part of standard arduino firmata, but is provided for legacy 279 support of CodeShield on an Arduino UNO. 280 281 Encoder data is retrieved by performing a digital_read from pin a (encoder pin_a) 282 283 @param pin_a: Encoder pin 1. 284 @param pin_b: Encoder pin 2. 285 @param cb: callback function to report encoder changes 286 @return: No return value 287 """ 288 data = [pin_a, pin_b] 289 if cb: 290 self.digital_pins[pin_a].cb = cb 291 yield from self._send_sysex(PrivateConstants.ENCODER_CONFIG, data)
292 293 @asyncio.coroutine
294 - def encoder_read(self, pin):
295 """ 296 This method retrieves the latest encoder data value 297 @param pin: Encoder Pin 298 @return: encoder data value 299 """ 300 return self.digital_pins[pin].current_value
301 302 @asyncio.coroutine
303 - def enable_analog_reporting(self, pin):
304 """ 305 Enables analog reporting. By turning reporting on for a single pin, 306 @param pin: Analog pin number. For example for A0, the number is 0. 307 @return: No return value 308 """ 309 command = [PrivateConstants.REPORT_ANALOG + pin, PrivateConstants.REPORTING_ENABLE] 310 yield from self._send_command(command)
311 312 @asyncio.coroutine
313 - def enable_digital_reporting(self, pin):
314 """ 315 Enables digital reporting. By turning reporting on for all 8 bits in the "port" - 316 this is part of Firmata's protocol specification. 317 @param pin: Pin and all pins for this port 318 @return: No return value 319 """ 320 port = pin // 8 321 command = [PrivateConstants.REPORT_DIGITAL + port, PrivateConstants.REPORTING_ENABLE] 322 yield from self._send_command(command)
323 324 @asyncio.coroutine
325 - def extended_analog(self, pin, data):
326 """ 327 This method will send an extended-data analog write command to the selected pin. 328 @param pin: 0 - 127 329 @param data: 0 - 0xfffff 330 @return: No return value 331 """ 332 analog_data = [pin, data & 0x7f, (data >> 7) & 0x7f, data >> 14] 333 yield from self._send_sysex(PrivateConstants.EXTENDED_ANALOG, analog_data)
334 335 @asyncio.coroutine
336 - def get_analog_latch_data(self, pin):
337 """ 338 A list is returned containing the latch state for the pin, the latched value, and the time stamp 339 [latched_state, threshold_type, threshold_value, latched_data, time_stamp] 340 @param pin: Pin number. 341 @return: [latched_state, threshold_type, threshold_value, latched_data, time_stamp] 342 """ 343 key = 'A' + str(pin) 344 if key in self.latch_map: 345 entry = self.latch_map.get(key) 346 return entry 347 else: 348 return None
349 350 @asyncio.coroutine
351 - def get_analog_map(self):
352 """ 353 This method requests a Firmata analog map query and returns the results. 354 @return: An analog map response or None if a timeout occurs 355 """ 356 # get the current time to make sure a report is retrieved 357 current_time = time.time() 358 359 # if we do not have existing report results, send a Firmata message to request one 360 if self.query_reply_data.get(PrivateConstants.ANALOG_MAPPING_QUERY) is None: 361 yield from self._send_sysex(PrivateConstants.ANALOG_MAPPING_QUERY, None) 362 # wait for the report results to return for 2 seconds 363 # if the timer expires, shutdown 364 while self.query_reply_data.get(PrivateConstants.ANALOG_MAPPING_RESPONSE) is None: 365 elapsed_time = time.time() 366 if elapsed_time - current_time > 2: 367 return None 368 yield from asyncio.sleep(self.sleep_tune) 369 return self.query_reply_data.get(PrivateConstants.ANALOG_MAPPING_RESPONSE)
370 371 @asyncio.coroutine
372 - def get_capability_report(self):
373 """ 374 This method requests and returns a Firmata capability query report 375 @return: A capability report in the form of a list 376 """ 377 if self.query_reply_data.get(PrivateConstants.CAPABILITY_QUERY) is None: 378 yield from self._send_sysex(PrivateConstants.CAPABILITY_QUERY, None) 379 while self.query_reply_data.get(PrivateConstants.CAPABILITY_RESPONSE) is None: 380 yield from asyncio.sleep(self.sleep_tune) 381 return self.query_reply_data.get(PrivateConstants.CAPABILITY_RESPONSE)
382 383 @asyncio.coroutine
384 - def get_digital_latch_data(self, pin):
385 """ 386 A list is returned containing the latch state for the pin, the latched value, and the time stamp 387 [pin_num, latch_state, latched_value, time_stamp] 388 @param pin: Pin number. 389 @return: [latched_state, threshold_type, threshold_value, latched_data, time_stamp] 390 """ 391 key = 'D' + str(pin) 392 if key in self.latch_map: 393 entry = self.latch_map.get(key) 394 return entry 395 else: 396 return None
397 398 @asyncio.coroutine
399 - def get_firmware_version(self):
400 """ 401 This method retrieves the Firmata firmware version 402 @return: Firmata firmware version 403 """ 404 if self.query_reply_data.get(PrivateConstants.REPORT_FIRMWARE) == '': 405 yield from self._send_sysex(PrivateConstants.REPORT_FIRMWARE, None) 406 while self.query_reply_data.get(PrivateConstants.REPORT_FIRMWARE) == '': 407 yield from asyncio.sleep(self.sleep_tune) 408 reply = '' 409 for x in self.query_reply_data.get(PrivateConstants.REPORT_FIRMWARE): 410 reply_data = ord(x) 411 if reply_data: 412 reply += chr(reply_data) 413 self.query_reply_data[PrivateConstants.REPORT_FIRMWARE] = reply 414 return self.query_reply_data.get(PrivateConstants.REPORT_FIRMWARE)
415 416 @asyncio.coroutine
417 - def get_protocol_version(self):
418 """ 419 This method returns the major and minor values for the protocol version, i.e. 2.4 420 @return: Firmata protocol version 421 """ 422 if self.query_reply_data.get(PrivateConstants.REPORT_VERSION) == '': 423 yield from self._send_command(PrivateConstants.REPORT_VERSION) 424 while self.query_reply_data.get(PrivateConstants.REPORT_VERSION) == '': 425 yield from asyncio.sleep(self.sleep_tune) 426 return self.query_reply_data.get(PrivateConstants.REPORT_VERSION)
427 428 @asyncio.coroutine
429 - def get_pin_state(self, pin):
430 """ 431 This method retrieves a pin state report for the specified pin 432 @param pin: Pin of interest 433 @return: pin state report 434 """ 435 pin_list = [pin] 436 yield from self._send_sysex(PrivateConstants.PIN_STATE_QUERY, pin_list) 437 while self.query_reply_data.get(PrivateConstants.PIN_STATE_RESPONSE) is None: 438 yield from asyncio.sleep(self.sleep_tune) 439 pin_state_report = self.query_reply_data.get(PrivateConstants.PIN_STATE_RESPONSE) 440 self.query_reply_data[PrivateConstants.PIN_STATE_RESPONSE] = None 441 return pin_state_report
442 443 @asyncio.coroutine
444 - def get_pymata_version(self):
445 """ 446 This method retrieves the PyMata version number 447 @return: PyMata version number. 448 """ 449 return PrivateConstants.PYMATA_VERSION
450 451 @asyncio.coroutine
452 - def i2c_config(self, read_delay_time=0):
453 """ 454 NOTE: THIS METHOD MUST BE CALLED BEFORE ANY I2C REQUEST IS MADE 455 This method initializes Firmata for I2c operations. 456 457 @param read_delay_time (in microseconds): an optional parameter, default is 0 458 @return: No Return Value 459 """ 460 data = [read_delay_time & 0x7f, read_delay_time >> 7] 461 yield from self._send_sysex(PrivateConstants.I2C_CONFIG, data)
462 463 @asyncio.coroutine
464 - def i2c_read_data(self, address):
465 """ 466 This method retrieves cached i2c data to support a polling mode. 467 @param address: I2C device address 468 @return:Last cached value read 469 """ 470 if address in self.i2c_map: 471 map_entry = self.i2c_map.get(address) 472 data = map_entry.get('value') 473 return data 474 else: 475 return None
476 477 @asyncio.coroutine
478 - def i2c_read_request(self, address, register, number_of_bytes, read_type, cb=None):
479 """ 480 This method requests the read of an i2c device. Results are retrieved by a call to 481 i2c_get_read_data(). or by callback. 482 If a callback method is provided, when data is received from the device it will be sent to the callback method. 483 Some devices require that transmission be restarted (e.g. MMA8452Q acceleromater). 484 Use I2C_READ | I2C_RESTART_TX for those cases. 485 @param address: i2c device address 486 @param register: register number (can be set to zero) 487 @param number_of_bytes: number of bytes expected to be returned 488 @param read_type: I2C_READ or I2C_READ_CONTINUOUSLY. I2C_RESTART_TX may be OR'ed when required 489 @param cb: Optional callback function to report i2c data as result of read command 490 @return: No return value. 491 """ 492 493 if address not in self.i2c_map: 494 # self.i2c_map[address] = [None, cb] 495 self.i2c_map[address] = {'value': None, 'callback': cb} 496 data = [address, read_type, register & 0x7f, register >> 7, 497 number_of_bytes & 0x7f, number_of_bytes >> 7] 498 yield from self._send_sysex(PrivateConstants.I2C_REQUEST, data)
499 500 @asyncio.coroutine
501 - def i2c_write_request(self, address, args):
502 """ 503 Write data to an i2c device. 504 @param address: i2c device address 505 @param args: A variable number of bytes to be sent to the device passed in as a list 506 @return: No return value. 507 """ 508 data = [address, Constants.I2C_WRITE] 509 for item in args: 510 item_lsb = item & 0x7f 511 data.append(item_lsb) 512 item_msb = item >> 7 513 data.append(item_msb) 514 yield from self._send_sysex(PrivateConstants.I2C_REQUEST, data)
515 516 @asyncio.coroutine
517 - def play_tone(self, pin, tone_command, frequency, duration):
518 """ 519 This method will call the Tone library for the selected pin. 520 It requires FirmataPlus to be loaded onto the arduino 521 If the tone command is set to TONE_TONE, then the specified tone will be played. 522 Else, if the tone command is TONE_NO_TONE, then any currently playing tone will be disabled. 523 @param pin: Pin number 524 @param tone_command: Either TONE_TONE, or TONE_NO_TONE 525 @param frequency: Frequency of tone 526 @param duration: Duration of tone in milliseconds 527 @return: No return value 528 """ 529 # convert the integer values to bytes 530 if tone_command == Constants.TONE_TONE: 531 # duration is specified 532 if duration: 533 data = [tone_command, pin, frequency & 0x7f, frequency >> 7, duration & 0x7f, duration >> 7] 534 535 else: 536 data = [tone_command, pin, frequency & 0x7f, frequency >> 7, 0, 0] 537 538 # self._command_handler.digital_response_table[pin][self._command_handler.RESPONSE_TABLE_MODE] = \ 539 # self.TONE 540 # turn off tone 541 else: 542 data = [tone_command, pin] 543 yield from self._send_sysex(PrivateConstants.TONE_DATA, data)
544 545 @asyncio.coroutine
546 - def send_reset(self):
547 """ 548 Send a Sysex reset command to the arduino 549 @return: No return value. 550 """ 551 try: 552 yield from self._send_command([PrivateConstants.SYSTEM_RESET]) 553 except RuntimeError: 554 exit(0)
555 556 @asyncio.coroutine
557 - def servo_config(self, pin, min_pulse=544, max_pulse=2400):
558 """ 559 Configure a pin as a servo pin. Set pulse min, max in ms. 560 Use this method (not set_pin_mode) to configure a pin for servo operation. 561 @param pin: Servo Pin. 562 @param min_pulse: Min pulse width in ms. 563 @param max_pulse: Max pulse width in ms. 564 @return: No return value 565 """ 566 # self.set_pin_mode(pin, PrivateConstants.SERVO, PrivateConstants.OUTPUT) 567 command = [pin, min_pulse & 0x7f, min_pulse >> 7, max_pulse & 0x7f, 568 max_pulse >> 7] 569 570 yield from self._send_sysex(PrivateConstants.SERVO_CONFIG, command)
571 572 @asyncio.coroutine
573 - def set_analog_latch(self, pin, threshold_type, threshold_value, cb=None):
574 """ 575 This method "arms" an analog pin for its data to be latched and saved in the latching table 576 If a callback method is provided, when latching criteria is achieved, the callback function is called 577 with latching data notification. 578 Data returned in the callback list has the pin number as the first element, 579 @param pin: Analog pin number (value following an 'A' designator, i.e. A5 = 5 580 @param threshold_type: ANALOG_LATCH_GT | ANALOG_LATCH_LT | ANALOG_LATCH_GTE | ANALOG_LATCH_LTE 581 @param threshold_value: numerical value - between 0 and 1023 582 @param cb: callback method 583 @return: True if successful, False if parameter data is invalid 584 """ 585 if Constants.LATCH_GT <= threshold_type <= Constants.LATCH_LTE: 586 key = 'A' + str(pin) 587 if 0 <= threshold_value <= 1023: 588 self.latch_map[key] = [Constants.LATCH_ARMED, threshold_type, threshold_value, 0, 0, cb] 589 return True 590 else: 591 return False
592 593 @asyncio.coroutine
594 - def set_digital_latch(self, pin, threshold_value, cb=None):
595 """ 596 This method "arms" a digital pin for its data to be latched and saved in the latching table 597 If a callback method is provided, when latching criteria is achieved, the callback function is called 598 with latching data notification. 599 Data returned in the callback list has the pin number as the first element, 600 @param pin: Digital pin number 601 @param threshold_value: 0 or 1 602 @param cb: callback function 603 @return: True if successful, False if parameter data is invalid 604 """ 605 if 0 <= threshold_value <= 1: 606 key = 'D' + str(pin) 607 self.latch_map[key] = [Constants.LATCH_ARMED, Constants.LATCH_EQ, threshold_value, 0, 0, cb] 608 return True 609 else: 610 return False
611 612 @asyncio.coroutine
613 - def set_pin_mode(self, pin_number, pin_state, callback=None):
614 """ 615 This method sets the pin mode for the specified pin. For Servo, use servo_config() instead. 616 @param pin_number: Arduino Pin Number 617 @param pin_state:INPUT/OUTPUT/ANALOG/PWM/ 618 @param callback: Optional: A reference to a call back function to be called when pin data value changes 619 @return: No return value. 620 """ 621 if callback: 622 if pin_state == Constants.INPUT: 623 self.digital_pins[pin_number].cb = callback 624 elif pin_state == Constants.ANALOG: 625 self.analog_pins[pin_number].cb = callback 626 else: 627 print('{} {}'.format('set_pin_mode: callback ignored for pin state:', pin_state)) 628 # 629 if pin_state == Constants.INPUT or pin_state == Constants.ANALOG: 630 pin_mode = Constants.INPUT 631 else: 632 pin_mode = pin_state 633 command = [PrivateConstants.SET_PIN_MODE, pin_number, pin_mode] 634 yield from self._send_command(command) 635 if pin_state == Constants.ANALOG: 636 yield from self.enable_analog_reporting(pin_number) 637 elif pin_state == Constants.INPUT: 638 yield from self.enable_digital_reporting(pin_number) 639 else: 640 pass
641 642 @asyncio.coroutine
643 - def set_sampling_interval(self, interval):
644 """ 645 This method sends the desired sampling interval to Firmata. 646 Note: Standard Firmata will ignore any interval less than 10 milliseconds 647 @param interval: Integer value for desired sampling interval in milliseconds 648 @return: No return value. 649 """ 650 data = [interval & 0x7f, interval >> 7] 651 self._send_sysex(PrivateConstants.SAMPLING_INTERVAL, data)
652 653 @asyncio.coroutine
654 - def shutdown(self):
655 """ 656 This method attempts an orderly shutdown 657 @return: No return value 658 """ 659 print('Shutting down ...') 660 yield from self.send_reset() 661 yield from asyncio.sleep(1) 662 signal.alarm(1)
663 664 @asyncio.coroutine
665 - def sleep(self, sleep_time):
666 """ 667 This method is a proxy method for asyncio.sleep 668 @param sleep_time: Sleep interval in seconds 669 @return:No return value. 670 """ 671 try: 672 yield from asyncio.sleep(sleep_time) 673 except RuntimeError: 674 print('sleep exception') 675 self.shutdown()
676 677 @asyncio.coroutine
678 - def sonar_config(self, trigger_pin, echo_pin, cb=None, ping_interval=50, max_distance=200):
679 """ 680 Configure the pins,ping interval and maximum distance for an HC-SR04 type device. 681 Single pin configuration may be used. To do so, set both the trigger and echo pins to the same value. 682 Up to a maximum of 6 SONAR devices is supported 683 If the maximum is exceeded a message is sent to the console and the request is ignored. 684 NOTE: data is measured in centimeters 685 @param trigger_pin: The pin number of for the trigger (transmitter). 686 @param echo_pin: The pin number for the received echo. 687 @param cb: optional callback function to report sonar data changes 688 @param ping_interval: Minimum interval between pings. Lowest number to use is 33 ms.Max is 127 689 @param max_distance: Maximum distance in cm. Max is 200. 690 @return:No return value. 691 692 """ 693 694 # if there is an entry for the trigger pin in existence, just exit 695 if trigger_pin in self.active_sonar_map: 696 return 697 698 if max_distance > 200: 699 max_distance = 200 700 max_distance_lsb = max_distance & 0x7f 701 max_distance_msb = max_distance >> 7 702 data = [trigger_pin, echo_pin, ping_interval, max_distance_lsb, max_distance_msb] 703 self.set_pin_mode(trigger_pin, Constants.SONAR, Constants.INPUT) 704 self.set_pin_mode(echo_pin, Constants.SONAR, Constants.INPUT) 705 # update the ping data map for this pin 706 if len(self.active_sonar_map) > 6: 707 708 # if self.verbose: 709 print("sonar_config: maximum number of devices assigned - ignoring request") 710 # return 711 else: 712 self.active_sonar_map[trigger_pin] = [cb, 0] 713 714 yield from self._send_sysex(PrivateConstants.SONAR_CONFIG, data)
715 716 @asyncio.coroutine
717 - def sonar_data_retrieve(self, trigger_pin):
718 """ 719 Retrieve Ping (HC-SR04 type) data. The data is presented as a dictionary. 720 The 'key' is the trigger pin specified in sonar_config() and the 'data' is the 721 current measured distance (in centimeters) 722 for that pin. If there is no data, the value is set to None. 723 @param trigger_pin: key into sonar data map 724 @return: active_sonar_map 725 """ 726 # sonar_pin_entry = self.active_sonar_map[pin] 727 sonar_pin_entry = self.active_sonar_map.get(trigger_pin) 728 value = sonar_pin_entry[1] 729 return value
730 731 @asyncio.coroutine
732 - def stepper_config(self, steps_per_revolution, stepper_pins):
733 """ 734 Configure stepper motor prior to operation. 735 This is a FirmataPlus feature. 736 @param steps_per_revolution: number of steps per motor revolution 737 @param stepper_pins: a list of control pin numbers - either 4 or 2 738 @return:No return value. 739 740 """ 741 data = [PrivateConstants.STEPPER_CONFIGURE, steps_per_revolution & 0x7f, steps_per_revolution >> 7] 742 for pin in range(len(stepper_pins)): 743 data.append(stepper_pins[pin]) 744 yield from self._send_sysex(PrivateConstants.STEPPER_DATA, data)
745 746 @asyncio.coroutine
747 - def stepper_step(self, motor_speed, number_of_steps):
748 """ 749 Move a stepper motor for the number of steps at the specified speed 750 This is a FirmataPlus feature. 751 @param motor_speed: 21 bits of data to set motor speed 752 @param number_of_steps: 14 bits for number of steps & direction 753 positive is forward, negative is reverse 754 @return:No return value. 755 756 """ 757 if number_of_steps > 0: 758 direction = 1 759 else: 760 direction = 0 761 abs_number_of_steps = abs(number_of_steps) 762 data = [PrivateConstants.STEPPER_STEP, motor_speed & 0x7f, (motor_speed >> 7) & 0x7f, motor_speed >> 14, 763 abs_number_of_steps & 0x7f, abs_number_of_steps >> 7, direction] 764 yield from self._send_sysex(PrivateConstants.STEPPER_DATA, data)
765 766 @asyncio.coroutine
767 - def _command_dispatcher(self):
768 """ 769 This is a private method. 770 It continually accepts and interprets data coming from Firmata,and then 771 dispatches the correct handler to process the data. 772 @return: This method never returns 773 """ 774 # sysex commands are assembled into this list for processing 775 sysex = [] 776 777 while True: 778 try: 779 next_command_byte = yield from self.serial_port.read() 780 # if this is a SYSEX command, then assemble the entire command process it 781 if next_command_byte == PrivateConstants.START_SYSEX: 782 while next_command_byte != PrivateConstants.END_SYSEX: 783 yield from asyncio.sleep(self.sleep_tune) 784 next_command_byte = yield from self.serial_port.read() 785 sysex.append(next_command_byte) 786 yield from self.command_dictionary[sysex[0]](sysex) 787 sysex = [] 788 yield from asyncio.sleep(self.sleep_tune) 789 # if this is an analog message, process it. 790 elif 0xE0 <= next_command_byte < 0xEF: 791 # analog message 792 # assemble the entire analog message in command 793 command = [] 794 # get the pin number for the message 795 pin = next_command_byte & 0x0f 796 command.append(pin) 797 # get the next 2 bytes for the command 798 command = yield from self._wait_for_data(command, 2) 799 # process the analog message 800 yield from self._analog_message(command) 801 # handle the digital message 802 elif 0x90 <= next_command_byte <= 0x9F: 803 command = [] 804 pin = next_command_byte & 0x0f 805 command.append(pin) 806 command = yield from self._wait_for_data(command, 2) 807 yield from self._digital_message(command) 808 # handle all other messages by looking them up in the command dictionary 809 elif next_command_byte in self.command_dictionary: 810 yield from self.command_dictionary[next_command_byte]() 811 yield from asyncio.sleep(self.sleep_tune) 812 else: 813 # we need to yield back to the loop 814 yield from asyncio.sleep(self.sleep_tune) 815 continue 816 # yield from asyncio.sleep(self.sleep_tune) 817 except Exception as ex: 818 # should never get here 819 print(ex) 820 raise # re-raise exception.
821 822 ''' 823 Firmata message handlers 824 ''' 825 826 @asyncio.coroutine
827 - def _analog_mapping_response(self, data):
828 """ 829 This is a private message handler method. 830 It is a message handler for the analog mapping response message 831 @param data: response data 832 @return: none - but saves the response 833 """ 834 self.query_reply_data[PrivateConstants.ANALOG_MAPPING_RESPONSE] = data[1:-1]
835 836 @asyncio.coroutine
837 - def _analog_message(self, data):
838 """ 839 This is a private message handler method. 840 It is a message handler for analog messages. 841 @param data: message data 842 @return: None - but saves the data in the pins structure 843 """ 844 pin = data[0] 845 value = (data[PrivateConstants.MSB] << 7) + data[PrivateConstants.LSB] 846 # if self.analog_pins[pin].current_value != value: 847 self.analog_pins[pin].current_value = value 848 if self.analog_pins[pin].cb: 849 # append pin number to return value and return as a list 850 # self.analog_pins[pin].cb(value) 851 value = [pin, value] 852 loop = asyncio.get_event_loop() 853 loop.call_soon(self.analog_pins[pin].cb, value) 854 855 # is there a latch entry for this pin? 856 key = 'A' + str(pin) 857 if key in self.latch_map: 858 yield from self._check_latch_data(key, value[1])
859 860 @asyncio.coroutine
861 - def _capability_response(self, data):
862 """ 863 This is a private message handler method. 864 It is a message handler for capability report responses. 865 @param data: capability report 866 @return: None - but report is saved 867 """ 868 self.query_reply_data[PrivateConstants.CAPABILITY_RESPONSE] = data[1:-1]
869 870 @asyncio.coroutine
871 - def _digital_message(self, data):
872 """ 873 This is a private message handler method. 874 It is a message handler for Digital Messages. 875 @param data: digital message 876 @return: None - but update is saved in pins structure 877 """ 878 port = data[0] 879 port_data = (data[PrivateConstants.MSB] << 7) + data[PrivateConstants.LSB] 880 pin = port * 8 881 for pin in range(pin, min(pin + 8, len(self.digital_pins))): 882 self.digital_pins[pin].current_value = port_data & 0x01 883 data = [pin, self.digital_pins[pin].current_value] 884 if self.digital_pins[pin].cb: 885 self.digital_pins[pin].cb(data) 886 # is there a latch entry for this pin? 887 key = 'D' + str(pin) 888 if key in self.latch_map: 889 yield from self._check_latch_data(key, port_data & 0x01) 890 port_data >>= 1
891 892 @asyncio.coroutine
893 - def _encoder_data(self, data):
894 """ 895 This is a private message handler method. 896 It handles encoder data messages. 897 @param data: encoder data 898 @return: None - but update is saved in the digital pins structure 899 """ 900 # strip off sysex start and end 901 data = data[1:-1] 902 pin = data[0] 903 val = int((data[PrivateConstants.MSB] << 7) + data[PrivateConstants.LSB]) 904 # set value so that it shows positive and negative values 905 if val > 8192: 906 val -= 16384 907 # if this value is different that is what is already in the table store it and check for callback 908 if val != self.digital_pins[pin].current_value: 909 self.digital_pins[pin].current_value = val 910 if self.digital_pins[pin].cb: 911 self.digital_pins[pin].cb([pin, val])
912 913 @asyncio.coroutine
914 - def _i2c_reply(self, data):
915 """ 916 This is a private message handler method. 917 It handles replies to i2c_read requests. It stores the data for each i2c device 918 address in a dictionary called i2c_map. The data may be retrieved via a polling call to i2c_get_read_data(). 919 It a callback was specified in pymata.i2c_read, the raw data is sent through the callback 920 @param data: raw data returned from i2c device 921 """ 922 # remove the start and end sysex commands from the data 923 data = data[1:-1] 924 reply_data = [] 925 address = (data[0] & 0x7f) + (data[1] << 7) 926 if address in self.i2c_map: 927 for i in range(0, len(data)): 928 reply_data.append(data[i]) 929 map_entry = self.i2c_map.get(address) 930 map_entry['value'] = reply_data 931 self.i2c_map[address] = map_entry 932 cb = map_entry.get('callback') 933 if cb: 934 cb(reply_data) 935 yield from asyncio.sleep(self.sleep_tune)
936 937 @asyncio.coroutine
938 - def _pin_state_response(self, data):
939 """ 940 This is a private message handler method. 941 It handles pin state query response messages. 942 @param data: Pin state message 943 @return: None - but response is saved 944 """ 945 self.query_reply_data[PrivateConstants.PIN_STATE_RESPONSE] = data[1:-1]
946 947 @asyncio.coroutine
948 - def _report_firmware(self, sysex_data):
949 """ 950 This is a private message handler method. 951 This method handles the sysex 'report firmware' command sent by Firmata (0x79). 952 It assembles the firmware version by concatenating the major and minor version number components and 953 the firmware identifier into a string. 954 e.g. "2.3 StandardFirmata.ino" 955 @param sysex_data: Sysex data sent from Firmata 956 @return: None 957 """ 958 # first byte after command is major number 959 major = sysex_data[1] 960 version_string = str(major) 961 962 # next byte is minor number 963 minor = sysex_data[2] 964 965 # append a dot to major number 966 version_string += '.' 967 968 # append minor number 969 version_string += str(minor) 970 # add a space after the major and minor numbers 971 version_string += ' ' 972 973 # slice the identifier - from the first byte after the minor number up until, but not including 974 # the END_SYSEX byte 975 name = sysex_data[3:-1] 976 977 # convert the identifier to printable text and add each character to the version string 978 for e in name: 979 version_string += chr(e) 980 981 # store the value 982 self.query_reply_data[PrivateConstants.REPORT_FIRMWARE] = version_string
983 984 @asyncio.coroutine
985 - def _report_version(self):
986 """ 987 This is a private message handler method. 988 This method reads the following 2 bytes after the report version command (0xF9 - non sysex). 989 The first byte is the major number and the second byte is the minor number. 990 @return: None 991 """ 992 # get next two bytes 993 major = yield from self.serial_port.read() 994 version_string = str(major) 995 minor = yield from self.serial_port.read() 996 version_string += '.' 997 version_string += str(minor) 998 self.query_reply_data[PrivateConstants.REPORT_VERSION] = version_string
999 1000 @asyncio.coroutine
1001 - def _sonar_data(self, data):
1002 """ 1003 This method handles the incoming sonar data message and stores 1004 the data in the response table. 1005 @param data: Message data from Firmata 1006 @return: No return value. 1007 """ 1008 1009 # strip off sysex start and end 1010 data = data[1:-1] 1011 pin_number = data[0] 1012 val = int((data[PrivateConstants.MSB] << 7) + data[PrivateConstants.LSB]) 1013 1014 sonar_pin_entry = self.active_sonar_map[pin_number] 1015 # also write it into the digital response table 1016 # self.digital_response_table[data[self.RESPONSE_TABLE_MODE]][self.RESPONSE_TABLE_PIN_DATA_VALUE] = val 1017 # send data through callback if there is a callback function for the pin 1018 if sonar_pin_entry[0] is not None: 1019 # check if value changed since last reading 1020 if sonar_pin_entry[1] != val: 1021 sonar_pin_entry[1] = val 1022 self.active_sonar_map[pin_number] = sonar_pin_entry 1023 # Do a callback if one is specified in the table 1024 if sonar_pin_entry[0]: 1025 sonar_pin_entry[0]([pin_number, val]) 1026 # update the data in the table with latest value 1027 # sonar_pin_entry[1] = val 1028 self.active_sonar_map[pin_number] = sonar_pin_entry 1029 1030 yield from asyncio.sleep(self.sleep_tune)
1031 1032 @asyncio.coroutine
1033 - def _string_data(self, data):
1034 """ 1035 This is a private message handler method. 1036 It is the message handler for String data messages that will be printed to the console. 1037 @param data: message 1038 @return: None - message is sent to console 1039 """ 1040 reply = '' 1041 data = data[1:-1] 1042 for x in data: 1043 reply_data = x 1044 if reply_data: 1045 reply += chr(reply_data) 1046 print(reply)
1047 1048 ''' 1049 utilities 1050 ''' 1051 1052 @asyncio.coroutine
1053 - def _check_latch_data(self, key, data):
1054 """ 1055 This is a private utility method. 1056 When a data change message is received this method checks to see if latching needs to be processed 1057 @param key: encoded pin number 1058 @param data: data change 1059 @return: None 1060 """ 1061 process = False 1062 latching_entry = self.latch_map.get(key) 1063 if latching_entry[Constants.LATCH_STATE] == Constants.LATCH_ARMED: 1064 # Has the latching criteria been met 1065 if latching_entry[Constants.LATCHED_THRESHOLD_TYPE] == Constants.LATCH_EQ: 1066 if data == latching_entry[Constants.LATCH_DATA_TARGET]: 1067 process = True 1068 elif latching_entry[Constants.LATCHED_THRESHOLD_TYPE] == Constants.LATCH_GT: 1069 if data > latching_entry[Constants.LATCH_DATA_TARGET]: 1070 process = True 1071 elif latching_entry[Constants.LATCHED_THRESHOLD_TYPE] == Constants.LATCH_GTE: 1072 if data >= latching_entry[Constants.LATCH_DATA_TARGET]: 1073 process = True 1074 elif latching_entry[Constants.LATCHED_THRESHOLD_TYPE] == Constants.LATCH_LT: 1075 if data < latching_entry[Constants.LATCH_DATA_TARGET]: 1076 process = True 1077 elif latching_entry[Constants.LATCHED_THRESHOLD_TYPE] == Constants.LATCH_LTE: 1078 if data <= latching_entry[Constants.LATCH_DATA_TARGET]: 1079 process = True 1080 if process: 1081 latching_entry[Constants.LATCHED_DATA] = data 1082 yield from self._process_latching(key, latching_entry)
1083 1084 # noinspection PyMethodMayBeStatic
1085 - def _discover_port(self):
1086 """ 1087 This is a private utility method. 1088 This method attempts to discover the com port that the arduino is connected to. 1089 @return: Detected Comport 1090 """ 1091 locations = ['dev/ttyACM0', '/dev/ttyACM0', '/dev/ttyACM1', '/dev/ttyACM2', '/dev/ttyACM3', '/dev/ttyACM4', 1092 '/dev/ttyACM5', '/dev/ttyUSB0', '/dev/ttyUSB1', '/dev/ttyUSB2', '/dev/ttyUSB3', '/dev/ttyUSB4', 1093 '/dev/ttyUSB5', '/dev/ttyUSB6', '/dev/ttyUSB7', '/dev/ttyUSB8', '/dev/ttyUSB9', '/dev/ttyUSB10', 1094 '/dev/ttyS0', '/dev/ttyS1', '/dev/ttyS2', 'com2', 'com3', 'com4', 'com5', 'com6', 'com7', 'com8', 1095 'com9', 'com10', 'com11', 'com12', 'com13', 'com14', 'com15', 'com16', 'com17', 'com18', 'com19', 1096 'com20', 'com21', 'com1', 'end' 1097 ] 1098 detected = None 1099 for device in locations: 1100 try: 1101 serialport = serial.Serial(device, 57600, timeout=0) 1102 detected = device 1103 serialport.close() 1104 break 1105 except serial.SerialException: 1106 if device == 'end': 1107 print('Unable to find Serial Port, Please plug in cable or check cable connections.') 1108 detected = None 1109 exit() 1110 print('{}{}\n'.format("Using COM Port:", detected)) 1111 return detected
1112 1113 # noinspection PyMethodMayBeStatic
1114 - def _format_capability_report(self, data):
1115 """ 1116 This is a private utility method. 1117 This method formats a capability report if the user wishes to send it to the console 1118 @param data: Capability report 1119 @return: None 1120 """ 1121 pin_modes = {0: 'Digital_Input', 1: 'Digital_Output', 2: 'Analog', 3: 'PWM', 4: 'Servo', 1122 5: 'Shift', 6: 'I2C', 7: 'One Wire', 8: 'Stepper', 9: 'Encoder'} 1123 x = 0 1124 pin = 0 1125 1126 print('\nCapability Report') 1127 print('-----------------\n') 1128 while x < len(data): 1129 # get index of next end marker 1130 print('{} {}{}'.format('Pin', str(pin), ':')) 1131 while data[x] != 127: 1132 mode_str = "" 1133 pin_mode = pin_modes.get(data[x]) 1134 mode_str += pin_mode 1135 x += 1 1136 bits = data[x] 1137 print('{:>5}{}{} {}'.format(' ', mode_str, ':', bits)) 1138 x += 1 1139 x += 1 1140 pin += 1
1141 1142 @asyncio.coroutine
1143 - def _process_latching(self, key, latching_entry):
1144 """ 1145 This is a private utility method. 1146 This method process latching events and either returns them via callback or stores them in the latch map 1147 @param key: Encoded pin 1148 @param latching_entry: a latch table entry 1149 @return: Callback or store data in latch map 1150 """ 1151 if latching_entry[Constants.LATCH_CALLBACK]: 1152 # auto clear entry and execute the callback 1153 1154 latching_entry[Constants.LATCH_CALLBACK]([key, 1155 latching_entry[Constants.LATCHED_DATA], 1156 time.time()]) 1157 self.latch_map[key] = [0, 0, 0, 0, 0, None] 1158 else: 1159 updated_latch_entry = latching_entry 1160 updated_latch_entry[Constants.LATCH_STATE] = Constants.LATCH_LATCHED 1161 updated_latch_entry[Constants.LATCHED_DATA] = latching_entry[Constants.LATCHED_DATA] 1162 # time stamp it 1163 updated_latch_entry[Constants.LATCHED_TIME_STAMP] = time.time() 1164 self.latch_map[key] = updated_latch_entry
1165 1166 @asyncio.coroutine
1167 - def _send_command(self, command):
1168 """ 1169 This is a private utility method. 1170 The method sends a non-sysex command to Firmata. 1171 @param command: command data 1172 @return: length of data sent 1173 """ 1174 send_message = "" 1175 for i in command: 1176 send_message += chr(i) 1177 result = None 1178 for data in send_message: 1179 try: 1180 result = yield from self.serial_port.write(data) 1181 except(): 1182 print('cannot send command') 1183 return result
1184 1185 @asyncio.coroutine
1186 - def _send_sysex(self, sysex_command, sysex_data=None):
1187 """ 1188 This is a private utility method. 1189 This method sends a sysex command to Firmata. 1190 1191 @param sysex_command: sysex command 1192 @param sysex_data: data for command 1193 @return : No return value. 1194 """ 1195 if not sysex_data: 1196 sysex_data = [] 1197 1198 # convert the message command and data to characters 1199 sysex_message = chr(PrivateConstants.START_SYSEX) 1200 sysex_message += chr(sysex_command) 1201 if len(sysex_data): 1202 for d in sysex_data: 1203 sysex_message += chr(d) 1204 sysex_message += chr(PrivateConstants.END_SYSEX) 1205 1206 for data in sysex_message: 1207 yield from self.serial_port.write(data)
1208 1209 @asyncio.coroutine
1210 - def _wait_for_data(self, current_command, number_of_bytes):
1211 """ 1212 This is a private utility method. 1213 This method accumulates the requested number of bytes and then returns the full command 1214 @param current_command: command id 1215 @param number_of_bytes: how many bytes to wait for 1216 @return: command 1217 """ 1218 while number_of_bytes: 1219 next_command_byte = yield from self.serial_port.read() 1220 current_command.append(next_command_byte) 1221 number_of_bytes -= 1 1222 yield from asyncio.sleep(self.sleep_tune) 1223 return current_command
1224
1225 1226 # noinspection PyUnusedLocal 1227 -def _signal_handler(the_signal, frame):
1228 """ 1229 The 'Control-C' handler 1230 @param the_signal: signal 1231 @param frame: not used 1232 @return: never returns. 1233 """ 1234 print('You pressed Ctrl+C!') 1235 # to get coverage data or profiling data the code using pymata_iot, uncomment out the following line 1236 # exit() 1237 try: 1238 loop = asyncio.get_event_loop() 1239 for t in asyncio.Task.all_tasks(loop): 1240 t.cancel() 1241 loop.run_until_complete(asyncio.sleep(.01)) 1242 loop.close() 1243 loop.stop() 1244 # os._exit(0) 1245 sys.exit(0) 1246 except RuntimeError: 1247 # this suppresses the Event Loop Is Running message, which may be a bug in python 3.4.3 1248 # os._exit(1) 1249 sys.exit(1)
1250 1251 1252 signal.signal(signal.SIGINT, _signal_handler) 1253 signal.signal(signal.SIGTERM, _signal_handler) 1254 signal.signal(signal.SIGALRM, _signal_handler) 1255