Making your own smart ‘machine learning’ thermostat using Arduino, AWS, HBase, Spark, Raspberry PI and XBee

Previous part:
2. Reading data from a ‘dumb’ thermostat and various temperature sensors (Arduino)

3. Sending data, at 1,000 values per second, to a Raspberry PI (Python)

3.1 1,000 values per second

When I started the project I had no idea at what resolution I had to collect data in order to analyze the workings of a normal thermostat. In my job I had worked on a project that involved sensor data from industrial equipment installed in a natural gas field. In this project collecting data from analogue sensors presented a problem because of the huge amount of values per second that had to be stored and processed. So for my own thermostat project I wanted to collect sensor information at the sub-second level as well.

3.2 In memory database on a Raspberry PI

I set my goal on collecting one value every millisecond. In this scenario the Arduino would send 1,000 values per second to the Raspberry PI. That meant the Pi would have to be able to ingest and process values at that rate or faster.

For storing data on the Raspberry PI I wanted to use a database, mostly because I am familiar with them and because a database makes averaging data very simple. I was very disappointed when I read the performance reviews of common database systems like MySQL or PostgreSQL on the Raspberry PI. My initial experiments with SQLite showed that it would not meet the performance requirements either. That changed when I stored the SQLite database in memory. This was more than fast enough to ingest and process the sensor values at 1,000 per second. Because I wanted to use a cloud server for permanent storage, the Raspberry PI did not have to store the data for a long period of time.
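A rough way to check the ingest rate of an in-memory SQLite database is to time a batch of single-row inserts. The sketch below uses Python's built-in sqlite3 module instead of the APSW wrapper used in this project, and the simplified table layout and the function name measure_ingest are assumptions made for illustration. The rate it reports depends entirely on the hardware it runs on.

```python
import sqlite3
import time

def measure_ingest(n_rows=1000):
    """Insert n_rows values one by one into an in-memory table.

    Returns (rows stored, rows per second).
    """
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE sensvals (vkey TEXT, vvalue INT)")
    start = time.perf_counter()
    for i in range(n_rows):
        conn.execute(
            "INSERT INTO sensvals (vkey, vvalue) VALUES (?, ?)",
            ("key_%d" % i, i),
        )
    conn.commit()
    # Guard against a zero interval on very coarse timers.
    elapsed = max(time.perf_counter() - start, 1e-9)
    count = conn.execute("SELECT COUNT(*) FROM sensvals").fetchone()[0]
    conn.close()
    return count, n_rows / elapsed

count, rate = measure_ingest()
print("%d rows inserted at about %.0f rows per second" % (count, rate))
```

Any rate comfortably above 1,000 rows per second on the target device would be in line with the observation above.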

Storing the sensor data in an in-memory database requires that the different processes access that same in-memory database. The only method I got working (in Python) used a multi-threaded Python script accessing the in-memory database. All threads have to be started from a single Python script to be able to access the same database. I used Roger Binns’ Another Python SQLite Wrapper (APSW) to support multithreading.
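The reason for keeping everything in one script can be illustrated with a small sketch. It uses the standard sqlite3 module rather than APSW, so the check_same_thread flag and the explicit lock are specifics of this example and not of the project code. Four threads write through one shared connection, while a second connection to ':memory:' turns out to be a separate, empty database.

```python
import sqlite3
import threading

# One shared connection; check_same_thread=False lets other threads use it.
conn = sqlite3.connect(":memory:", check_same_thread=False)
conn.execute("CREATE TABLE sensvals (vkey TEXT, vvalue INT)")
lock = threading.Lock()  # serialise access to the shared connection

def writer(thread_no, n_rows):
    """Insert n_rows values through the shared connection."""
    for i in range(n_rows):
        with lock:
            conn.execute(
                "INSERT INTO sensvals VALUES (?, ?)",
                ("t%d_%d" % (thread_no, i), i),
            )

threads = [threading.Thread(target=writer, args=(no, 100)) for no in range(4)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

with lock:
    shared_count = conn.execute("SELECT COUNT(*) FROM sensvals").fetchone()[0]

# A second connection to ':memory:' opens a new, empty database.
other = sqlite3.connect(":memory:")
other_tables = other.execute("SELECT name FROM sqlite_master").fetchall()

print(shared_count)  # 400 rows, written by four threads
print(other_tables)  # [] because the second database has no tables
```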

3.3 Datamodel sensor information Raspberry PI database

The data model of the database is a key-value store. In addition, the key is split into different fields to allow indexing of different parts of the key. The key design is discussed in the next part. Values are stored as 64-bit integers.


import logging
import apsw #Another Python SQLite Wrapper
# SQLite settings
vuri = ':memory:'
dbc = apsw.Connection(vuri)
def dbstart():
    """
    Initial tables
    Creates required in-memory tables and indexes.
    Runs once when script is started.
    """
    try:
        db1 = dbc.cursor()
        db1.execute("DROP TABLE IF EXISTS sensvals")
        db1.execute("CREATE TABLE sensvals(vkey TEXT, vepoch INT, vsub INT, vsource TEXT, vport TEXT, vprekey TEXT, vvalue INT)")
        db1.execute("CREATE UNIQUE INDEX isens1 on sensvals(vkey)")
        db1.execute("CREATE INDEX isens2 on sensvals(vepoch)")
        db1.execute("CREATE INDEX isens3 on sensvals(vsource)")
        db1.execute("CREATE INDEX isens4 on sensvals(vport)")
        db1.execute("CREATE INDEX isens5 on sensvals(vprekey)")
    except Exception:
        logging.exception("dbstart")

3.4 Connecting the XBee to the Raspberry PI

The schematic and breadboard layout of the Arduino and XBee were discussed in the previous part. Connecting the XBee to the Raspberry PI is explained in Michael Bouvy’s excellent blog post Raspberry PI + XBee: UART / Serial howto http://michael.bouvy.net/blog/en/2013/04/02/raspberry-pi-xbee-uart-serial-howto/ .

3.5 XBee Arduino code

Both the Arduino’s and the Raspberry PI’s XBee run in API mode. This allows bi-directional data transfer and makes future extensions possible because it can handle more than two XBee nodes in the network. The downside is that XBees in API mode are more difficult to program.

On the Arduino side I followed the blog post of Desert Home on sending data using an XBee 2 in API mode http://www.desert-home.com/2013/02/using-xbee-library-part-3.html . I immediately found out that the XBee 2 is not capable of sending 1,000 values per second and that it requires a pause between sends. Because I was sending different sensor values, I made a carousel that sends the value of a different sensor at every send interval. All sensors have an identifier starting with A followed by a two-digit integer. The carousel passes the sensor id and the value to a function that actually sends the value to the Raspberry PI through the XBee.

The code below shows the Arduino program I used. I did not have stability issues, but when evaluating the code, several problems popped up. First, I believe you are not supposed to use character arrays in combination with functions the way I did. Second, the variable ‘previousMillisSend’ is a long whereas the variable ‘currentMillis’ is an unsigned long. On the Arduino a signed 32-bit long overflows after about 24.9 days of uptime, whereas an unsigned long only rolls over after about 49.7 days. This probably causes problems when previousMillisSend rolls over and currentMillis does not.


#include <XBee.h>
#include <LiquidCrystal.h>
#include <SoftwareSerial.h>
// note: controllPin and the sensor variables (tempValue, sensorValue, ...) are not declared in this excerpt
// create a software serial port for the XBee
SoftwareSerial Serial3(6, 7);
//XBee objects and constants
XBee xbee = XBee();
XBeeAddress64 addr64 = XBeeAddress64(0x0013A200, 0x40B7FB33); //address of the XBee connected to the Raspberry PI
// LCD connection & screen size
LiquidCrystal lcd(12, 11, 5, 4, 3, 2);
#define WIDTH 16
//char buffer for the XBee send
char sendValue[10];
//carousel position deciding which sensor value to send over XBee to the Raspberry PI
int sendp = 0;
long previousMillisSend = 0; //previous send
long intervalSend = 100; //interval between XBee sends
void setup() {
  // start display
  pinMode(controllPin, OUTPUT);
  lcd.begin(WIDTH, 2);
  lcd.clear();
  lcd.print("Smart Therm");
  lcd.setCursor(0,1);
  lcd.print("v1.00");
  delay(1000);
  //Start XBee software serial
  Serial3.begin(9600);
  xbee.setSerial(Serial3);
}
void loop() {
  unsigned long currentMillis = millis();
  //carousel to send one sensor value over XBee each send interval
  if(currentMillis - previousMillisSend > intervalSend) {
    previousMillisSend = currentMillis;
    if(sendp==0) {
      fxbesend("A01_",tempValue);
    } else if(sendp==1) {
      fxbesend("A02_",sensorValue);
    } else if(sendp==2) {
      fxbesend("A03_",boilerValue);
    } else if(sendp==3) {
      fxbesend("A04_",thermistorValue);
    } else if(sendp==4) {
      fxbesend("A05_",thermistor2Value);
    } else if(sendp==5) {
      fxbesend("A07_",boilerStat);
    }
    if(sendp<5) {
      sendp = sendp + 1;
    } else {
      sendp = 0;
    }
  }
}
//XBee send function
void fxbesend(char fport[4],int fvalue ) {
  sprintf(sendValue,"%s%i",fport,fvalue);
  ZBTxRequest zbtxt = ZBTxRequest(addr64, (uint8_t *)sendValue, strlen(sendValue));
  xbee.send(zbtxt);
}
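The rollover concern raised before the listing can be explored without hardware. The Python sketch below mimics 32-bit unsigned arithmetic by masking the subtraction, assuming a 32-bit counter as returned by millis() on AVR-based Arduinos. The helper names elapsed_ms and due are made up for this example.

```python
MASK32 = 0xFFFFFFFF  # assumption: 32-bit counter, as millis() returns on AVR-based Arduinos

def elapsed_ms(current, previous):
    """Elapsed time using 32-bit unsigned wrap-around subtraction."""
    return (current - previous) & MASK32

def due(current, previous, interval=100):
    """True when more than `interval` ms have passed, even across a rollover."""
    return elapsed_ms(current, previous) > interval

# Normal case: 150 ms after the previous send.
print(due(1150, 1000))                # True

# Rollover case: previous send 16 ms before the wrap, now 200 ms after it.
previous = MASK32 - 15                # 0xFFFFFFF0
current = 200
print(elapsed_ms(current, previous))  # 216
print(due(current, previous))         # True

# A plain, non-wrapping subtraction gives a large negative number instead.
print(current - previous)             # -4294967080
```

On the Arduino itself the usual way to get this wrap-around behaviour is to declare both timestamps and the interval as unsigned long, so that the subtraction is done entirely in unsigned arithmetic.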

3.6 Stability issues when sending more than 20 values per second

The code above also highlights another issue. I was never able to send more than 20 values per second without stability issues. When the Arduino was both receiving and sending data from and to the Raspberry PI, I had to increase the interval to 100 ms to prevent crashes. The XBee 2 manual states that 50 Hz is the maximum sample rate, so this may not be a coincidence.

To see how far I could push the number of values sent per second, I included the possibility to handle an array of values on the Raspberry PI side. The payload of the XBee can be a maximum of 72 bytes. Subtracting 4 bytes for the sensor id leaves 68 bytes for sensor values. Every sensor value has a maximum of 4 digits. Including a comma as separation character, about 12 values can be sent in each payload. The Arduino code to actually send an array of values was not implemented until recently and has not been running stably, most likely due to the issues discussed above, especially the use of character arrays in combination with functions.
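The payload arithmetic can be written out as a short calculation. The sketch below takes the 72-byte limit, the 4-byte sensor id and the 4-digit values from the text. The function names max_values and pack_values are hypothetical and only illustrate the payload layout. Under exactly these figures 13 values fit, slightly more than the rough estimate above.

```python
MAX_PAYLOAD = 72  # maximum XBee payload in bytes, as stated above
ID_LENGTH = 4     # sensor id such as "A01_"
DIGITS = 4        # maximum number of digits per sensor value

def max_values(max_payload=MAX_PAYLOAD, id_length=ID_LENGTH, digits=DIGITS):
    """Largest n such that n values plus n-1 commas fit after the sensor id."""
    room = max_payload - id_length
    # n * digits + (n - 1) <= room  gives  n <= (room + 1) / (digits + 1)
    return (room + 1) // (digits + 1)

def pack_values(sensor_id, values):
    """Build one payload string: sensor id followed by comma-separated values."""
    payload = sensor_id + ",".join(str(v) for v in values)
    if len(payload) > MAX_PAYLOAD:
        raise ValueError("payload exceeds %d bytes" % MAX_PAYLOAD)
    return payload

n = max_values()
payload = pack_values("A01_", [1023] * n)
print(n, len(payload))  # 13 68
```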

3.7 XBee Raspberry PI Python Code

On the Raspberry PI side the Python code was relatively easy to implement. The basis is a thread that receives the data from the XBee and puts it in the in-memory database for further processing by other threads. By default the function receiving the data takes a regular sensor payload from an XBee. A special case is made for receiving data from the serial port of the other XBee. In this case it is assumed the other XBee is connected to an Arduino and the serial port is not the actual sensor identifier. Instead the first 3 characters of the payload, as explained in the Arduino part, are added to the sensor identifier and the rest of the payload is treated as an array of values.
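The special case for Arduino payloads comes down to a few string operations. The sketch below isolates them from the receiving thread; the function name parse_arduino_payload is made up for this example, and the payload strings are sample values rather than recorded data.

```python
def parse_arduino_payload(payload):
    """Split a payload such as 'A01_512,513' into a sensor id and a list of integers."""
    # The first 3 characters identify the sensor; pad to 6 characters and prefix 'rx'.
    sensor_id = "rx" + payload[0:3].zfill(6)
    # Skip the 4-character id ('A01_') and split the remainder on commas.
    values = [int(v) for v in payload[4:].split(",")]
    return sensor_id, values

print(parse_arduino_payload("A01_512,513,514"))  # ('rx000A01', [512, 513, 514])
print(parse_arduino_payload("A07_1"))            # ('rx000A07', [1])
```

A payload with a single value is handled by the same code path, because splitting a string without commas yields a one-element list.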

For inserting the values in the database I made an insert function that correctly sets the different parts of the key and finally the key itself, as will be explained in the next part, Storing data in the Amazon Cloud.

A second thread creates a rolling average for each of the sensor values. Every second it calculates the average measured temperature of the last 20 seconds. This value is also inserted into the main sensor value table so that the average values are also uploaded to the cloud server. A third thread runs every 5 seconds and deletes all entries in the sensor table older than 2 minutes, keeping two minutes of data on the Raspberry PI.

Thread 1: receiving sensor data and inserting in database


import threading
from xbee import ZigBee #xbee 2
import apsw #Another Python SQLite Wrapper
import serial
import struct
import codecs
import sys
import time
import datetime
import logging
#basic logging information change to log file
logging.basicConfig(filename='/usr/local/tempniek/18log.log',level=logging.INFO)
logging.info('I told you so')
try:
    #xbee connection
    ser = serial.Serial('/dev/ttyAMA0', 9600, timeout=5)
    xbee = ZigBee(ser,escaped=True)
    #classes used for multithreading
    #(one class for every function that runs as a separate thread)
    class myThreadInsert (threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
        def run(self):
            try:
                xinsert()
            except Exception:
                logging.exception("xinsert")
    def fins(fvcurtime,fvsource,fvid,fvdata):
        """
        This function inserts data into the sensvals table.
        Sets rowkey used by HBase and separate key parts for indexing.
        Key format: <device id>_<port id>_<reverse epoch>_<reverse milliseconds>.
        For example: 40b5af01_rx000A01_8571346790_9184822.
        Used by xinsert() function.
        """
        bvepoch = 9999999999
        bvsub = 9999999
        fvepoch = int(fvcurtime) #remove sub seconds from epoch
        fkepoch = bvepoch - fvepoch
        fvsub = bvsub - int(datetime.datetime.fromtimestamp(fvcurtime).strftime('%f')) #sub-second part of the epoch ('%f' yields microseconds)
        fvprekey = fvsource + '_' + fvid + '_'
        fvkey = fvprekey + str(fkepoch) + '_' + str(fvsub)
        dbi = dbc.cursor()
        dbi.execute("INSERT INTO sensvals (vkey,vepoch,vsub,vsource,vport,vprekey,vvalue) VALUES(?,?,?,?,?,?,?)",(fvkey,fvepoch,fvsub,fvsource,fvid,fvprekey,fvdata))
        dbi.close()
    def xinsert():
        """
        Receives and sends data from and to the XBee.
        Sending and receiving combined in one thread because only one thread can use GPIO port.
        Receiving is done as fast as possible.
        Sending only at 1 second interval.
        Runs as a separate thread.
        """
        ficurtime = 0
        fivepoch = 0
        while True:
            logging.debug("insert started")
            logging.debug(datetime.datetime.utcnow())
            try:
                #start with receive part
                ficurtime = time.time()
                fivepoch = int(ficurtime)
                fimintime = fivepoch - 5
                response = xbee.wait_read_frame()
                logging.debug(response)
                vid = (response['id'])
                if vid != 'tx_status':
                    vsource = (codecs.decode(codecs.encode(response['source_addr_long'],'hex'),'utf-8')[8:])
                    logging.debug(vsource)
                    logging.debug(vid)
                    vdata = codecs.decode(response['rf_data'],'utf-8')
                    logging.debug(vdata)
                    if vid == 'rx': #special case if port is rx. Assumes arduino is sending data.
                        vid = 'rx'+(vdata[0:3].zfill(6)) #first part of payload is sensor ID
                        vdata = vdata[4:]
                        vdatas = vdata.split(',') #assumes array of values
                        for vdatasv in vdatas:
                            vdatasv = int(vdatasv)
                            fins(time.time(),vsource,vid,vdatasv) #use fins() function to actually insert data in database
                    else: #case of normal xbee payload
                        vid = vid.zfill(8)
                        vdata = int(vdata)
                        fins(time.time(),vsource,vid,vdata) #use fins() function to actually insert data in database
            except Exception:
                logging.exception("fxinsert")
            time.sleep(0.001)
    def fmain():
        """
        Main thread function.
        """
        try:
            time.sleep(1)
            logging.info("hallostart")
            dbstart() #start initial SQLite
            hbasestart() #start initial HBase
            time.sleep(3)
            #start the functions that run as separate threads
            threadInsert = myThreadInsert()
            threadInsert.start()
        except Exception:
            logging.exception("fmain")
    if __name__=="__main__":
        fmain()
except Exception:
    logging.exception("main")
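The key format described in the fins() docstring can be tried out in isolation. The sketch below reuses the two constants from fins() but takes the seconds and the sub-second part as separate integers, which is a simplification for the example; the function name make_key is hypothetical. One visible effect of subtracting the timestamp from a large constant is that newer readings sort before older ones. The reasoning behind the key design is left to the next part.

```python
MAX_EPOCH = 9999999999  # same value as bvepoch in fins()
MAX_SUB = 9999999       # same value as bvsub in fins()

def make_key(source, port, epoch_seconds, microseconds):
    """Build <device id>_<port id>_<reverse epoch>_<reverse sub-second part>."""
    rev_epoch = MAX_EPOCH - epoch_seconds
    rev_sub = MAX_SUB - microseconds
    return "%s_%s_%d_%d" % (source, port, rev_epoch, rev_sub)

older = make_key("40b5af01", "rx000A01", 1428653209, 815177)
newer = make_key("40b5af01", "rx000A01", 1428653210, 100)

print(older)          # 40b5af01_rx000A01_8571346790_9184822
print(newer < older)  # True: the newer reading sorts first
```

The first key reproduces the example from the docstring, which corresponds to a reading taken at epoch second 1428653209.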

Thread 2: creating rolling average


def fcurrent():
    """
    Calculates rolling averages for different sensor values.
    Inserts average values in senscur table and as new values in sensvals table.
    Runs as a separate thread.
    """
    time.sleep(2)
    while True:
        try:
            logging.debug("current")
            fvcurtime = time.time()
            logging.debug(fvcurtime)
            fvepoch = int(fvcurtime)
            fminvepochtemp = fvepoch - 20
            fminvepochset = fvepoch - 1
            cursource = '40b5af01' #id of Raspberry PI device
            dbd3 = dbc.cursor()
            dbd3.execute("DELETE FROM senscur")
            dbd3.execute("INSERT INTO senscur (vepoch, vsource, vport, vvalue) SELECT MAX(vepoch), vsource, vport, round(AVG(vvalue)) FROM sensvals WHERE vepoch BETWEEN %i and %i AND vprekey in ('40b5af00_rx000A01_','40b5af00_rx000A04_','40b5af00_rx000A05_') GROUP BY vsource,vport" % (fminvepochtemp,fvepoch))
            dbd3.execute("INSERT INTO senscur (vepoch, vsource, vport, vvalue) SELECT MAX(vepoch), vsource, vport, round(AVG(vvalue)) FROM sensvals WHERE vepoch BETWEEN %i and %i AND vprekey in ('40b5af00_rx000A02_') GROUP BY vsource,vport" % (fminvepochset,fvepoch))
            dbd3.execute("INSERT INTO senscur (vepoch, vsource, vport, vvalue) SELECT vepoch, vsource, vport, vvalue FROM sensvals WHERE vepoch = %i AND vprekey in ('40b5af00_rx000A03_','40b5af00_rx000A07_') ORDER BY vsub DESC LIMIT 1" % (fvepoch))
            dbd3.execute("SELECT * FROM senscur")
            rows = dbd3.fetchall()
            for row in rows:
                fins(fvcurtime,cursource,row[2],row[3])
                logging.debug(row)
            dbd3.close()
        except Exception:
            logging.exception("fcurrent")
        time.sleep(1)
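The core of the rolling average is a single GROUP BY query over a time window. The sketch below shows that query on a reduced table using the standard sqlite3 module and parameter binding instead of string formatting. The table layout, the sample rows and the function name rolling_average are assumptions for illustration only.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sensvals (vepoch INT, vport TEXT, vvalue INT)")

now = 1000  # hypothetical 'current' epoch second for the example
rows = [
    (now - 30, "rx000A01", 100),  # older than 20 seconds, so ignored
    (now - 10, "rx000A01", 200),
    (now - 5, "rx000A01", 210),
    (now, "rx000A01", 220),
    (now - 3, "rx000A04", 300),
]
conn.executemany("INSERT INTO sensvals VALUES (?, ?, ?)", rows)

def rolling_average(conn, now, window=20):
    """Average value per port over the last `window` seconds."""
    cur = conn.execute(
        "SELECT vport, MAX(vepoch), ROUND(AVG(vvalue)) FROM sensvals "
        "WHERE vepoch BETWEEN ? AND ? GROUP BY vport ORDER BY vport",
        (now - window, now),
    )
    return cur.fetchall()

for port, last_epoch, average in rolling_average(conn, now):
    print(port, last_epoch, average)
```

With these sample rows the first port averages 200, 210 and 220 to 210, and the reading that is 30 seconds old does not count.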

Thread 3: deleting old data


def fdel():
    """
    Deletes old data from local tables at Raspberry PI.
    Runs as a separate thread.
    """
    while True:
        time.sleep(5)
        try:
            logging.debug("del started")
            fvcurtime = time.time()
            fdvepoch = int(fvcurtime) - 120 #keep sensor values for two minutes
            dbd = dbc.cursor()
            dbd.execute("DELETE FROM sensvals WHERE vepoch < %i" % fdvepoch)
            dbd.close()
        except Exception:
            logging.exception("fdel")
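The two-minute retention rule can be checked on a handful of rows. This is a minimal sketch with the standard sqlite3 module; the reduced table and the function name delete_older_than are assumptions for the example.

```python
import sqlite3

def delete_older_than(conn, now, keep_seconds=120):
    """Delete rows older than keep_seconds and return how many were removed."""
    cur = conn.execute(
        "DELETE FROM sensvals WHERE vepoch < ?", (now - keep_seconds,)
    )
    return cur.rowcount

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sensvals (vepoch INT, vvalue INT)")
now = 10000  # hypothetical current epoch second
conn.executemany(
    "INSERT INTO sensvals VALUES (?, ?)",
    [(now - 300, 1), (now - 121, 2), (now - 120, 3), (now - 1, 4)],
)

removed = delete_older_than(conn, now)
remaining = conn.execute("SELECT COUNT(*) FROM sensvals").fetchone()[0]
print(removed, remaining)  # 2 2
```

A row that is exactly 120 seconds old is kept, because the comparison is strictly less than the cut-off.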

Next part:
4. Storing data in the Amazon Cloud (HBase)
