Package pymata_aio :: Module pymata_serial
[hide private]
[frames] | no frames]

Source Code for Module pymata_aio.pymata_serial

  1  # -*- coding: utf-8 -*- 
  2  """ 
  3  Copyright (c) 2015 Alan Yorinks All rights reserved. 
  4   
  5  This program is free software; you can redistribute it and/or 
  6  modify it under the terms of the GNU General Public 
  7  License as published by the Free Software Foundation; either 
  8  version 3 of the License, or (at your option) any later version. 
  9   
 10  This library is distributed in the hope that it will be useful, 
 11  but WITHOUT ANY WARRANTY; without even the implied warranty of 
 12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 13  General Public License for more details. 
 14   
 15  You should have received a copy of the GNU Lesser General Public 
 16  License along with this library; if not, write to the Free Software 
 17  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA 
 18  """ 
 19  import asyncio 
 20  import sys 
 21   
 22  import serial 
class PymataSerial:
    """
    This class encapsulates management of the serial port that communicates with the Arduino Firmata.
    It wraps the pyserial calls in coroutines so that they can be awaited from asyncio code.
    """

    def __init__(self, com_port='/dev/ttyACM0', speed=57600, sleep_tune=.001):
        """
        This is the constructor for the aio serial handler.
        It prints a progress message and opens the serial port immediately.

        :param com_port: Com port designator
        :param speed: baud rate
        :param sleep_tune: Seconds to sleep between polls of the port while waiting for incoming data
        :return: None
        """
        print('Initializing Arduino - Please wait...', end=" ")
        sys.stdout.flush()
        # Both timeouts are one second, so a single pyserial call can hold the
        # calling thread for at most about that long.
        self.my_serial = serial.Serial(com_port, speed, timeout=1, writeTimeout=1)
        self.com_port = com_port
        self.sleep_tune = sleep_tune

    def get_serial(self):
        """
        This method returns a reference to the serial port in case the user wants to call pyserial methods directly

        :return: pyserial instance
        """
        return self.my_serial

    async def write(self, data):
        """
        This is an asyncio adapted version of pyserial write. The pyserial write itself is a synchronous call
        (bounded by the write timeout set in the constructor); this coroutine makes it awaitable.

        If pyserial raises SerialException, 'Write exception' is printed, the port is closed, every other
        asyncio task is cancelled and the process exits through sys.exit(0).

        :param data: One-character string; it is converted with ord() and sent as a single byte
        :return: Number of bytes written, as reported by pyserial
        """
        try:
            return self.my_serial.write(bytes([ord(data)]))
        except serial.SerialException:
            print('Write exception')
            self.my_serial.close()
            # Cancel the remaining tasks so they can unwind. The event loop
            # must not be re-entered, stopped or closed from inside a running
            # coroutine; raising SystemExit lets it propagate out of the loop.
            this_task = asyncio.current_task()
            for task in asyncio.all_tasks():
                if task is not this_task:
                    task.cancel()
            sys.exit(0)

    async def readline(self):
        """
        This is an asyncio adapted version of pyserial readline. It polls the port and hands control back
        to the event loop between polls, then reads one line once data is waiting.

        :return: A line of data
        """
        # Yield to the event loop between polls so other tasks keep running
        # while nothing has arrived yet.
        while not self.my_serial.inWaiting():
            await asyncio.sleep(self.sleep_tune)
        return self.my_serial.readline()

    async def read(self):
        """
        This is an asyncio adapted version of pyserial read. It polls the port and hands control back
        to the event loop between polls, then reads one character once data is waiting.

        :return: One character, as its integer ordinal value
        """
        # Test to see if a character is waiting to be read.
        # If not, relinquish control back to the event loop through the short sleep.
        while not self.my_serial.inWaiting():
            await asyncio.sleep(self.sleep_tune)
        return ord(self.my_serial.read())

    async def close(self):
        """
        This is an asyncio adapted version of pyserial close. It closes the serial port.

        :return: None
        """
        self.my_serial.close()