Previous part:
2. Reading data form a ‘dumb’ thermostat and various temperature sensors (Arduino)
3. Sending data, at 1,000 values per second, to a Raspberry PI (Python)
3.1 1,000 values per second
When I started the project I had no idea at what resolution I had to collect data in order to analyze the workings of a normal thermostat. In my job I had worked on a project that involved sensor data from industrial equipment installed in a natural gas field. In this project collecting data from analogue sensors presented a problem because of the huge amount of values per second that had to be stored and processed. So for my own thermostat project I wanted to collect sensor information at the sub-second level as well.
3.2 In memory database on a Raspberry PI
I set my goal on collecting one value every millisecond. In this scenario the Arduino would send 1,000 values per second to the Raspberry PI. That meant the pi would have to be able to ingest and process at at least the same rate.
For storing data on the Raspberry PI I wanted to use a database, mostly because I am familiar with these and that it makes averaging data very simple. I was very disappointed when I read all the performance reviews of using a common database system like MySQL or PostgreSQL on the Raspberry PI. My initial experimentation with SQLite showed it also would not meet the required performance requirements. That changed when I stored the SQLite database in memory. This more than allowed for ingesting and processing the sensor values at 1,000 per second. I wanted to use a cloud server for permanent storage, so the Raspberry PI did not have to store the data for a long period of time.
Storing the sensor data in a in-memory database requires different processes accessing that same in memory database. The only method I got working (in Python) used a multi-threaded Python script accessing the in-memory database. All threads have to be started from a single Python script for all the processes to be able access the same database. I used Robert Binns’ Another Python SQLite Wrapper (APSW) to support multithreading.
3.3 Datamodel sensor information Rapsberry PI database
The datamodel of the database is a key value store. In addition the key is split in different fields to allow indexing of different parts of the key. The key design is discussed in the next part. Values are stored as 64bit integers.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import apsw #alternative python sqlite wrapper | |
# sqllite settings | |
vuri = ':memory:' | |
dbc = apsw.Connection(vuri) | |
def dbstart(): | |
""" | |
Initial tables | |
Creates required in memmory tables and indexes. | |
Runs once when script is started. | |
""" | |
try: | |
db1 = dbc.cursor() | |
db1.execute("DROP TABLE IF EXISTS sensvals") | |
db1.execute("CREATE TABLE sensvals(vkey TEXT, vepoch INT, vsub INT, vsource TEXT, vport TEXT, vprekey TEXT, vvalue INT)") | |
db1.execute("CREATE UNIQUE INDEX isens1 on sensvals(vkey)") | |
db1.execute("CREATE INDEX isens2 on sensvals(vepoch)") | |
db1.execute("CREATE INDEX isens3 on sensvals(vsource)") | |
db1.execute("CREATE INDEX isens4 on sensvals(vport)") | |
db1.execute("CREATE INDEX isens5 on sensvals(vprekey)") | |
except Exception: | |
logging.exception("dbstart") |
3.4 Connecting the XBee to the Raspberry PI
The schematic and breadboard layout of the Arduino and XBee is already discussed in the previous chapter. Connecting the XBee to the Raspberry PI is explained in Michael Bouvy’s excellent blog post Raspberry PI + XBee: UART / Serial howto http://michael.bouvy.net/blog/en/2013/04/02/raspberry-pi-xbee-uart-serial-howto/ .
3.5 XBee Arduino code
Both the Arduino’s and Raspberry PI’s XBee run in API mode. This allows bi-directional data transfer and makes future extensions possible because it can handle more than two XBee nodes in the network. Using the XBee’s in API mode makes is more difficult to program.
On the Arduino side I followed the blog post of Desert Home on sending data using an Xbee 2 in API mode http://www.desert-home.com/2013/02/using-xbee-library-part-3.html . Immediately I found out that the XBee 2 is not capable of sending 1,000 values per second and that it requires a pause between the sends. Because I was sending different sensor values I made a carousel that sends the value of a different sensor at every send interval. All sensors have an identifier starting with A followed by a two digit integer. The Arduino code is shown below. The carousel sends the sensor id and the value to a function that actually sends the value to the Raspberry PI through the XBee.
The code below shows the used Arduino program. I did not have stability issues, but when evaluating the code, several problems popped up. First I believe you are not supposed to use character arrays in combination with functions. Second the variable ‘previousMillisSend’ is a long whereas the variable ‘currentMillis’ is an unsigned long. This probably causes problems when the previousMillisSend rolls over and the currentMillis does not.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <XBee.h> | |
#include <LiquidCrystal.h> | |
#include <SoftwareSerial.h> | |
// create a software serial port for the XBee | |
SoftwareSerial Serial3(6, 7); | |
//XBee objects and constants | |
XBee xbee = XBee(); | |
XBeeAddress64 addr64 = XBeeAddress64(0x0013A200, 0x40B7FB33); //XBee adress of XBee connected to Raspberry PI | |
// LCD Connection & screen size | |
LiquidCrystal lcd(12, 11, 5, 4, 3, 2); | |
#define WIDTH 16 | |
… | |
//char varaibles for XBseend | |
char sendValue[10]; | |
//variable for carousell deciding which sensor value to send over xbee to raspberry pi | |
int sendp = 0; | |
long previousMillisSend = 0; //previous send | |
long intervalSend = 100; //interval between xbee sends | |
… | |
void setup() { | |
… | |
// start display | |
pinMode(controllPin, OUTPUT); | |
lcd.begin(WIDTH, 2); | |
lcd.clear(); | |
lcd.print("Smart Therm"); | |
lcd.setCursor(0,1); | |
lcd.print("v1.00"); | |
delay(1000); | |
//Start XBee software serial | |
Serial3.begin(9600); | |
xbee.setSerial(Serial3); | |
} | |
void loop() { | |
unsigned long currentMillis = millis(); | |
… | |
//carousell to send one sensor value over xbee each send interval | |
if(currentMillis – previousMillisSend > intervalSend) { | |
previousMillisSend = currentMillis; | |
if(sendp==0) { | |
fxbesend("A01_",tempValue); | |
} else if(sendp==1) { | |
fxbesend("A02_",sensorValue); | |
} else if(sendp==2) { | |
fxbesend("A03_",boilerValue); | |
} else if(sendp==3) { | |
fxbesend("A04_",thermistorValue); | |
} else if(sendp==4) { | |
fxbesend("A05_",thermistor2Value); | |
} else if(sendp==5) { | |
fxbesend("A07_",boilerStat); | |
} | |
if(sendp<5) { | |
sendp = sendp + 1; | |
} else { | |
sendp = 0; | |
} | |
} | |
} | |
//XBee send function | |
void fxbesend(char fport[4],int fvalue ) { | |
sprintf(sendValue,"%s%i",fport,fvalue); | |
ZBTxRequest zbtxt = ZBTxRequest(addr64, (uint8_t *)sendValue, strlen(sendValue)); | |
xbee.send(zbtxt); | |
} |
3.6 Stability issues when sending more than 20 values per second
The code above also highlights another issue. I was never able to send more than 20 values per second without stability issues. When the Arduino was both receiving and sending data from and to the Raspberry PI, I had to increase the interval to 100ms to prevent crashes. The XBee 2 manual states that 50hz is the maximum sample rate so it is possible that this is not a coincidence.
To see how far I could stress the amount of values sent per second, I included the possibility to handle an array of values on the Raspberry PI side. The payload of the XBee can be a maximum of 72 bytes. Subtracting 4 bytes for the sensor id leaves 68 bytes for sensor values. Every sensor value has a maximum of 4 digits. Including a comma as separation character about 12 values could be sent in each payload. The Arduino code to actually send an array of values, as shown below, was not implemented until recently and has not been running stable, most likely due to the issues discussed above, especially the use of character arrays in combination with functions.
3.7 XBee Raspberry PI Python Code
At the Raspberry PI side the python code was relatively easy to implement. The basis is a thread that receives the data from the XBee and puts it in the in-memory database for further processing by other threads. By default the function receiving the data takes a regular sensor payload from an XBee. A special case is made for receiving data from the serial port of the other XBee. In this case it is assumed the other XBee is connected to an Arduino and the serial port is not the actual sensor identifier. Instead the first 3 characters of the payload, as explained it the Arduino part, are added to the sensor identifier and the rest of the payload is seen as an array of values.
For inserting the values in the database I made an insert function that correctly sets the different parts of the key and finally the key itself, as will be explained in the next part Storing data in the Amazon Cloud.
A second thread creates a rolling average for each of the sensor values. Every second it calculates the average measured temperature of the last 20 seconds. This value is also inserted into the main sensor value table so that the average values are also uploaded to the cloud server. A third thread runs every 5 seconds and deletes all entries in the sensor table older than 2 minutes, keeping two minutes of data on the Raspberry PI.
Thread 1: receiving sensor data and inserting in database
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import threading | |
from xbee import ZigBee #xbee 2 | |
import apsw #alternative python sqlite wrapper | |
import serial | |
import struct | |
import codecs | |
import sys | |
import time | |
import datetime | |
import logging | |
… | |
#basic logging information change to log file | |
logging.basicConfig(filename='/usr/local/tempniek/18log.log',level=logging.INFO) | |
logging.info('I told you so') | |
try: | |
#xbee connection | |
ser = serial.Serial('/dev/ttyAMA0', 9600, timeout=5) | |
xbee = ZigBee(ser,escaped=True) | |
#classes used for multithreading | |
#(one class for every function that runs as a separate thread) | |
class myThreadInsert (threading.Thread): | |
def __init__(self): | |
threading.Thread.__init__(self) | |
def run(self): | |
try: | |
xinsert() | |
except Exception: | |
logging.exception("xinsert") | |
… | |
def fins(fvcurtime,fvsource,fvid,fvdata): | |
""" | |
This function inserts data into the sensvals table. | |
Sets rowkey used by HBASE and seperate key parts for indexing. | |
Key format: <device id>_<port id>_<reverse epoch>_<reverse milliseconds>. | |
For example: 40b5af01_rx000A01_8571346790_9184822. | |
Used by xinsert() function. | |
""" | |
bvepoch = 9999999999 | |
bvsub = 9999999 | |
fvepoch = int(fvcurtime) #remove sub seconds from epoch | |
fkepoch = bvepoch – fvepoch | |
fvsub = bvsub – int(datetime.datetime.fromtimestamp(fvcurtime).strftime('%f')) #only milisecond part from epoch | |
fvprekey = fvsource + '_' + fvid + '_' | |
fvkey = fvprekey + str(fkepoch) + '_' + str(fvsub) | |
dbi = dbc.cursor() | |
dbi.execute("INSERT INTO sensvals (vkey,vepoch,vsub,vsource,vport,vprekey,vvalue) VALUES(?,?,?,?,?,?,?)",(fvkey,fvepoch,fvsub,fvsource,fvid,fvprekey,fvdata)) | |
dbi.close() | |
def xinsert(): | |
""" | |
Receives and sends data from and to the XBee. | |
Sending and receiving combined in one thread because only one tread can use GPIO port. | |
Recieving is done as fast as possible. | |
Sinding only at 1 second interval. | |
Runs as a sperate thread. | |
""" | |
ficurtime = 0 | |
fivepoch = 0 | |
while True: | |
logging.debug("insert started") | |
logging.debug(datetime.datetime.utcnow()) | |
try: | |
#start with receive part | |
ficurtime = time.time() | |
fivepoch = int(ficurtime) | |
fimintime = fivepoch – 5 | |
response = xbee.wait_read_frame() | |
logging.debug(response) | |
vid = (response['id']) | |
if vid != 'tx_status': | |
vsource = (codecs.decode(codecs.encode(response['source_addr_long'],'hex'),'utf-8')[8:]) | |
logging.debug(vsource) | |
logging.debug(vid) | |
vdata = codecs.decode(response['rf_data'],'utf-8') | |
logging.debug(vdata) | |
if vid == 'rx': #special case if port is rx. Assumes arduino is sending data. | |
vid = 'rx'+(vdata[0:3].zfill(6)) #first part of payload is sendor ID | |
vdata = vdata[4:] | |
vdatas = vdata.split(',') #assumes array of values | |
for vdatasv in vdatas: | |
vdatasv = int(vdatasv) | |
fins(time.time(),vsource,vid,vdatasv) #use fins() function to actuall insert data in database | |
else: #case of normal xbee payload | |
vid = vid.zfill(8) | |
vdata = int(vdata) | |
fins(time.time(),vsource,vid,vdata) #use fins() function to actuall insert data in database | |
except Exception: | |
logging.exception("fxinsert") | |
time.sleep(0.001) | |
def fmain(): | |
""" | |
Main thread function. | |
""" | |
try: | |
time.sleep(1) | |
logging.info("hallostart") | |
dbstart() #start initial SQLite | |
hbasestart() #start initial HBase | |
time.sleep(3) | |
#start the functions that run as seperate threads | |
threadInsert = myThreadInsert() | |
threadInsert.start() | |
… | |
except Exception: | |
logging.exception("fmain") | |
if __name__=="__main__": | |
fmain() | |
except Exception: | |
logging.exception("main") |
Thread 2: creating rolling average
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def fcurrent(): | |
""" | |
Caclulcates roling avarages for diferent sensor values. | |
Inserts avarage balues in senscur tables and as new values in sensvals tables. | |
Runs as a sperate thread. | |
""" | |
time.sleep(2) | |
while True: | |
try: | |
logging.debug("current") | |
fvcurtime = time.time() | |
logging.debug(fvcurtime) | |
fvepoch = int(fvcurtime) | |
fminvepochtemp = fvepoch – 20 | |
fminvepochset = fvepoch – 1 | |
cursource = '40b5af01' #id of Rapsberry PI device | |
dbd3 = dbc.cursor() | |
dbd3.execute("DELETE FROM senscur") | |
dbd3.execute("INSERT INTO senscur (vepoch, vsource, vport, vvalue) SELECT MAX(vepoch), vsource, vport, round(AVG(vvalue)) FROM sensvals WHERE vepoch BETWEEN %i and %i AND vprekey in ('40b5af00_rx000A01_','40b5af00_rx000A04_','40b5af00_rx000A05_') GROUP BY vsource,vport" % (fminvepochtemp,fvepoch)) | |
dbd3.execute("INSERT INTO senscur (vepoch, vsource, vport, vvalue) SELECT MAX(vepoch), vsource, vport, round(AVG(vvalue)) FROM sensvals WHERE vepoch BETWEEN %i and %i AND vprekey in ('40b5af00_rx000A02_') GROUP BY vsource,vport" % (fminvepochset,fvepoch)) | |
dbd3.execute("INSERT INTO senscur (vepoch, vsource, vport, vvalue) SELECT vepoch, vsource, vport, vvalue FROM sensvals WHERE vepoch = %i AND vprekey in ('40b5af00_rx000A03_','40b5af00_rx000A07_') ORDER BY vsub DESC LIMIT 1" % (fvepoch)) | |
dbd3.execute("SELECT * FROM senscur") | |
rows = dbd3.fetchall() | |
for row in rows: | |
fins(fvcurtime,cursource,row[2],row[3]) | |
logging.debug(row) | |
dbd3.close() | |
except Exception: | |
logging.exception("fcurrent") | |
time.sleep(1) |
Thread 3: deleting old data
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def fdel(): | |
""" | |
Deletes old data from local tables at Raspberry PI. | |
Runs as a sperate thread. | |
""" | |
while True: | |
time.sleep(5) | |
try: | |
logging.debug("del started") | |
fvcurtime = time.time() | |
fdvepoch = int(fvcurtime) – 120 #keep sensor values for two minues | |
dbd = dbc.cursor() | |
dbd.execute("DELETE FROM sensvals WHERE vepoch < %i" % fdvepoch) | |
dbd.close() | |
except Exception: | |
logging.exception("fdel") |
Next part:
4. Storing data in the Amazon Cloud (HBase)
Reblogged this on Dinesh Ram Kali..
Pingback: Enabling technologies: how to build your own NEST | SmartDomus