Making your own smart ‘machine learning’ thermostat using Arduino, AWS, HBase, Spark, Raspberry PI and XBee

Previous part:
6. Using outside temperature and scenarios to control an Arduino from a Raspberry PI

7. Learning and adapting temperature scenarios in the Amazon cloud (SPARK)

7.1 Learning thermostat & temperature scenarios

By the time I finally got to the ‘learning’ part it was already May, and the cold spring that had bought me some extra time to complete this project was finally gone. Implementing temperature scenarios and uploading them to the cloud server, as described in part 6 of this blog post, had given me 40 used temperature scenarios to work with. This final installment continues with the concept of temperature scenarios.

Although described in more detail in part 6, here is a quick recap of the concept of temperature scenarios. At this moment the Raspberry PI selects a temperature scenario to heat the house based on the tempdif and the outtempdif.

  • The temperature scenario represents a pattern of how long the boiler should be on during six intervals of 10 minutes each. For example 5 4 3 2 2, in which the boiler is on for 5 minutes in the first 10-minute interval, 4 minutes in the second, 3 in the third, and so on.
  • The tempdif is the difference between the set temperature and the inside temperature at the time the Raspberry PI selected the temperature scenario.
  • The outtempdif is the difference between the set temperature and the outside temperature when the Raspberry PI selected the temperature scenario.
  • The Raspberry PI selects the best temperature scenario given a tempdif and outtempdif; the best scenario is the one with the lowest score. It also randomly selects an alternative scenario, to see whether that alternative gives a better result for this tempdif and outtempdif. A small sketch of these concepts in code follows below.
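To make the concept concrete, here is a minimal, purely illustrative Scala sketch; the names are hypothetical and do not come from the actual Raspberry PI or Spark code:

// Illustrative only: a scenario is six 'minutes on' values (one per 10-minute interval),
// the tempdif/outtempdif it was learned for, and the score it has earned so far.
case class TempScenario(minutesOn: Vector[Int], tempdif: Double, outtempdif: Double, score: Long)

// Among the candidate scenarios for the current temperature situation,
// the best scenario is simply the one with the lowest score.
def bestScenario(candidates: Seq[TempScenario]): TempScenario =
  candidates.minBy(_.score)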

7.2 Learning by improving the temperature scenarios

The learning part is implemented by learning from the used scenarios and improving them in order to make the thermostat more efficient. As stated above, by the time this part was implemented the house no longer needed to be heated, so none of the new scenarios were actually used. Nevertheless, the improvements and the learning capability of the thermostat are clearly shown in table 1. The thermostat learned that if the difference between the set temperature and the inside temperature (tempdif) is +/- 0.8 degrees Celsius and the difference between the set temperature and the outside temperature (outtempdif) is +/- 8 degrees Celsius, it only has to be on for 2 minutes in the first, third and fifth intervals.

Table 1: minutes on per run of 10 minutes – initial and improved scenario

7.3 Temperature groups

In addition to the concept of temperature scenarios, the concept of temperature groups is introduced. Each temperature group represents a difference between the set temperature and the inside temperature (tempdif) and the difference between set temperature and the outside temperature (outtempdif), as shown in Figure 15.

For example, in group J in figure 15 the difference between the outside temperature and the set temperature on the thermostat is 14 degrees and the difference between the inside temperature and the set temperature is 0.5 degrees Celsius. So this is a situation in which the thermostat has to keep the temperature roughly the same while it is cold outside. In group A, the outtempdif is only 8 degrees but the difference between the set temperature and the actual room temperature is 2.5 degrees, meaning that the thermostat has to increase the room temperature, but it is not very cold outside.

The thermostat starts with some pre-set general temperature groups. Using k-means clustering, these groups evolve toward the real temperature groups as experienced by the thermostat. The current temperature groups of the thermostat are shown in figure 16. Each group also starts with a default temperature scenario. Over time the thermostat learns what the best pattern is for each real-life temperature group.

Figure 15: initial temperature groups

Figure 16: Current temperature groups

7.4 Two-step process using Apache Spark

The creation of temperature groups and the selection of the best scenario, or pattern, for each group is implemented in two steps. The first step looks at each used scenario and scores it. The second step uses clustering to rank the temperature scenarios for a given temperature group.

Both steps are implemented using Apache Spark. The main reason for using Spark was that I wanted to gain some experience with it, particularly running Spark on top of HBase. The advantage of using Spark is that it is designed for fast in-memory processing of large amounts of data. In addition, one of its standard examples is k-means clustering.

Apache Spark is installed on the Amazon cloud server following the default tutorial on spark.apache.org. To learn how to write Spark code I watched the videos of the excellent introduction workshop from the Spark Summit 2014 by Paco Nathan. I can highly recommend watching these videos; Paco does an excellent job of explaining how to use Spark. Of course this was my first attempt at using Spark. Actually, it was the first time I used a compiled language, and I am confident a lot of things can be improved.

7.5 Step 1: scoring

The scoring step looks at every uploaded used scenario and uses the uploaded sensor data from the hour after the scenario was started on the Raspberry PI to score the scenario. The scoring is based on the following ideas:

  • The number of times the boiler goes on and off should be as low as possible.
  • The duration of the ‘on’ time should be as short as possible.
  • The reached temperature should be as close to the set temperature as possible.
  • The set temperature should be reached as quickly as possible.
  • There should be a penalty for overshooting the set temperature.

These ideas are implemented as penalty points. In the best case the set temperature would be reached in 0 minutes, the boiler would switch on only once, it would not overshoot, and the exact set temperature would be reached without fluctuation. This is of course unrealistic, and in practice scoring the used scenarios produces much higher numbers. The following formula scores a used scenario; it matches the fscore calculation in the code further below.

score =
( [number of times the boiler goes on] * 10 * 60 ) +
( [total minutes the boiler is on] * 2 ) +
( absolute([maximum reached temperature] – [set temperature]) * 150 ) +
( if [overshoot] then 3600 else 0 ) +
( [seconds until the set temperature is reached] )
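Expressed as a small Scala function the formula looks roughly like the sketch below. The parameter names are mine and purely illustrative; the actual values are derived from the sensor data in the Spark job described in the next paragraphs, and both temperatures are assumed to be in the same (sensor) units.

// Hedged sketch of the scoring formula above; parameter names are illustrative.
// Lower scores are better.
def scoreScenario(numOnOffs: Int, minutesOn: Int, maxReachedTemp: Double,
                  setTemp: Double, overshoot: Boolean, secondsToSetTemp: Long): Long =
  numOnOffs * 10L * 60L +
  minutesOn * 2L +
  math.round(math.abs(maxReachedTemp - setTemp) * 150) +
  (if (overshoot) 3600L else 0L) +
  secondsToSetTemp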

7.6 Spark implementation of scoring step

The first challenge was connecting Spark to HBase. Most important here is setting the correct .jar dependencies. Luckily Seraph Chutium wrote a very good post on this: http://www.abcn.net/2014/07/lighting-spark-with-hbase-full-edition.html . Currently I use these dependencies:


--driver-class-path /usr/local/hbase/lib/hbase-server-0.98.7-hadoop2.jar:/usr/local/hbase/lib/hbase-protocol-0.98.7-hadoop2.jar:/usr/local/hbase/lib/hbase-hadoop2-compat-0.98.7-hadoop2.jar:/usr/local/hbase/lib/hbase-client-0.98.7-hadoop2.jar:/usr/local/hbase/lib/hbase-common-0.98.7-hadoop2.jar:/usr/local/hbase/lib/htrace-core-2.04.jar:/usr/local/hbase/lib/guava-12.0.1.jar

The script takes the used scenario key as input; how this key is fed to the Spark job is explained in the workflow paragraph below. The first step is to get the used scenario information, for example what the outside temperature and the set temperature were when the thermostat started to heat the rooms. For this a regular HBase Get operation is used. The start time is included in the used temperature scenario key, so the start time and the end time an hour later can be derived from the key.
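The keys contain a reversed epoch (a large constant minus the unix epoch) so that the newest rows sort first in HBase. As a consequence, ‘one hour later’ means subtracting 3600 from the reversed value. A minimal sketch of that idea, using the same constant that appears in the clustering code further below:

// Reversed epoch: newer timestamps give smaller values, so they sort first in HBase.
val bvepoch : Long = 9999999999L
val nowEpoch : Long = System.currentTimeMillis / 1000
val revEpoch : Long = bvepoch - nowEpoch
// one hour AFTER a given reversed epoch is 3600 LOWER, not higher
val oneHourLaterRev : Long = revEpoch - (60L * 60L)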


//imports assumed for the scoring snippets below (Spark 1.x and HBase 0.98 client APIs)
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Delete, Get, HTable, Put, Scan}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes

//spark settings
val sparkconf = new SparkConf().setAppName("tempscore")
val sc = new SparkContext(sparkconf)
//get scenario information from husedscenario table
val confmrscen = HBaseConfiguration.create()
val rowval = args(0).getBytes() //used scenario key is input
val getscenvals = new Get(rowval)
val tablescenvals = new HTable(confmrscen,"husedscenario")
val resgetscenvals = tablescenvals.get(getscenvals)
val valscenariokey = resgetscenvals.getValue("fd".getBytes(), "scenariokey".getBytes())
val valsettemp : Int = Bytes.toString(resgetscenvals.getValue("fd".getBytes(), "settemp".getBytes())).toInt
val valrevepoch : Long = Bytes.toString(rowval).substring(9,19).toLong
val valmaxrevepoch : Long = valrevepoch - (60L*60L) //limit to one hour after scenario started (reversed epoch, so subtract)
tablescenvals.close()

This information is then used to get the required sensor data from the sensor HBase table. For this the script uses an HBase scan operation and the Spark Hadoop API to create an in-memory Resilient Distributed Dataset (RDD). This currently requires a convertScanToString helper that serializes the HBase Scan object to a string using Google’s protocol buffers.
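The helper itself is not shown in these snippets (the full code is on the GitHub page), but for HBase 0.98 it typically boils down to something like this sketch:

import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.Base64

//serialize a Scan to the Base64-encoded protobuf string that TableInputFormat expects
def convertScanToString(scan: Scan): String =
  Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray)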


//scan hsensvals table to retrieve information to score scenario part 1: temperature information
//this scan collects the actual inside temperature sensor data
val valstartrow : String = "40b5af01_rx000A04_" + valmaxrevepoch.toString + "_9999999"
val valstoprow : String = "40b5af01_rx000A04_" + valrevepoch.toString + "_9999999"
val conf = HBaseConfiguration.create()
val scan = new Scan()
conf.set(TableInputFormat.INPUT_TABLE, "hsensvals")
scan.setStartRow(valstartrow.getBytes())
scan.setStopRow(valstoprow.getBytes())
conf.set(TableInputFormat.SCAN, convertScanToString(scan))
val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])
val valRows = hBaseRDD.map(tuple => tuple._2).map(result => (Bytes.toString(result.getRow()).substring(18,28) , Bytes.toString(result.getValue("fd".getBytes(), "cd".getBytes())).toInt ) )
valRows.cache()

Spark uses the MapReduce paradigm https://en.wikipedia.org/wiki/MapReduce . Different map and reduce operations are used to set the various parameters of the scoring formula. For example, in the code snippet below the variables ‘maximum temperature’ and ‘after how many seconds the set temperature was reached’ are set. The latter can of course only be determined if the set temperature is actually reached within the hour. When the desired temperature is not reached within 60 minutes, the ‘temperature reached’ time is set to 70 minutes, meaning a penalty of 10 minutes is added when the set temperature is not reached at all.

The MapReduce paradigm uses keys and values to sort and group data, which is why many of the operations involve shuffling keys and values around. This is highlighted by the item.swap operation before the sortByKey that finds the highest temperature: the swap makes the temperature itself the key instead of the sensor key.
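As a toy illustration of this swap trick (the keys and readings below are made up):

// Toy example: find the highest reading by swapping (key, value) to (value, key)
// and sorting descending on the new key.
val readings = sc.parallelize(Seq(("8565999400", 195), ("8565998800", 201), ("8565998200", 198)))
val (maxTemp, atKey) = readings.map(_.swap).sortByKey(ascending = false).first()
// maxTemp == 201, atKey == "8565998800"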


val testval = (valsettemp*10) //temperature to be reached
var tempReached : Long = 70L*60L
val valFirstTempR = valRows.filter(r => r._2 >= testval)
valFirstTempR.cache()
val valFirstTempRhap : Long = valFirstTempR.count()
println("numonovermax")
println(valFirstTempRhap)
if (valFirstTempRhap > 0L) {
val valFirstTempRval : Long = (valFirstTempR.sortByKey().take(1))(0)._1.toLong
println("reached max")
println(valFirstTempRval)
tempReached = valrevepoch - valFirstTempRval //after how many seconds the set temperature was reached (reversed epochs, so subtract)
}
val mresultmax : Int = ((valRows.map(item => item.swap).sortByKey(false).take(1))(0))._1.toInt

The final score is then written as a column to the ‘husedscenario’ HBase table. As a final step the used scenario key is deleted from the ‘husedscenariotbsc’ index table. The use of this index table is described in the workflow paragraph below.


//calculate final score
val fscore = (numonoffs*10*60)+(ontime*2)+((math.abs(mresultmax-testval))*150)+(overflsocre)+(tempReached)
//put score in used scenario
val confputscore = HBaseConfiguration.create()
val tableputscore = new HTable(confputscore,"husedscenario")
val putscore = new Put(rowval)
putscore.add("fd".getBytes(), "score".getBytes(),fscore.toString.getBytes())
tableputscore.put(putscore)
//delete used scenario key from index table
val tabledelrow = new HTable(confputscore,"husedscenariotbsc")
val useddelete = new Delete(rowval)
tabledelrow.delete(useddelete)
tabledelrow.close()
tableputscore.close()

7.7 Spark scoring workflow

The tempscore Spark job runs on the Spark cluster. I have only one Amazon virtual machine, which means I have only one Spark worker node and it runs on the same machine as the ‘cluster’ manager. This of course defeats the purpose of using a distributed system like Spark, but as a learning experience it is very useful. It takes only 16 seconds to score an hour-long scenario, so I have no performance issues so far.

Because it only takes 16 seconds to score a scenario, a normal cron job can start the scoring Spark job. If the connection between the Raspberry PI and the Amazon server goes down, however, a backlog is created. The Raspberry PI currently stores up to two days of used scenario data, so there can be up to 48 scenarios waiting to be scored.

In addition to uploading the used scenario information to the ‘husedscenario’ HBase table, the Raspberry PI inserts the used scenario key into the ‘husedscenariotbsc’ table. This table acts as a list of scenarios that have not yet been scored. A Scala program on the Amazon server scans this table every 5 minutes to see if there are newly uploaded scenarios. If there are new rows, it starts a new Spark job for every scenario key using the spark-submit shell command. This creates a primitive workflow.

Because these jobs are in theory able to run in parallel, and each of them uses a different time segment of the HBase sensor table, this workflow can exploit the parallelism and fault tolerance of HBase and Apache Spark. Of course there is no need to improve the speed here and a sequential loop would have been fine. In practice I only have two cores available, so the jobs do not actually run in parallel, and I have to start them in ‘client’ mode to prevent the Spark manager from choking. A more developed workflow system would not have these issues, but I was already stretching the concept of using distributed systems like Hadoop, HBase and Spark on a single machine. The primitive workflow I built with a Scala program works fine. The runscore Scala code can be found on the GitHub page of this project.
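For illustration, here is a stripped-down sketch of what such a driver can look like. The spark-submit arguments, job class name and jar location are hypothetical placeholders (and the HBase jars needed on the driver class path, as shown in the cron examples in this post, are omitted for brevity); the real runscore code is on the GitHub page.

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Scan}
import org.apache.hadoop.hbase.util.Bytes
import scala.sys.process._

object runscoretemp {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    val table = new HTable(conf, "husedscenariotbsc")
    val scanner = table.getScanner(new Scan())
    //one spark-submit per not-yet-scored scenario key; the scoring job
    //deletes the key from this index table when it is done
    var res = scanner.next()
    while (res != null) {
      val scenarioKey = Bytes.toString(res.getRow())
      Seq("/usr/local/spark/bin/spark-submit",
          "--class", "tempscore",                 //hypothetical class name of the scoring job
          "/path/to/score-1.0-SNAPSHOT.jar",      //hypothetical jar location
          scenarioKey).!
      res = scanner.next()
    }
    scanner.close()
    table.close()
  }
}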

Also included on the GitHub page are the Maven .pom files needed to correctly compile the Scala code, including all the dependencies on HBase and Spark. The cron command to run the runscore Scala program is shown below.


5 * * * * scala -cp "/usr/local/hbase/lib/*:/usr/local/spark/conf/hbase-site.xml:/usr/local/spark/tempniek/runscore/target/score-1.0-SNAPSHOT.jar" -Dlog4j.configuration=file:////usr/local/spark/tempniek/runscore/conf/log4j.properties runscoretemp

7.8 Step 2: clustering

The clustering step looks at the entire set of uploaded used scenarios that have been scored by the previous step. It uses the tempdif and outtempdif to create temperature clusters. Both are set at the moment the Raspberry PI decides the house is too cold, turns the boiler on and starts heating the house.

The Spark program uses k-means clustering on the tempdif and outtempdif observations to create 10 clusters. K-means clustering is a simple form of clustering: it starts with a number of random centers, 10 in my case, and then iterates over the observations to refine the clusters and their centers. This is clearly explained on the Wikipedia page about k-means clustering https://en.wikipedia.org/wiki/K-means_clustering . The Spark implementation of k-means clustering is described on the Apache Spark website https://spark.apache.org/docs/latest/mllib-clustering.html .

For each cluster the best scenario is selected. After the clusters are created, the Spark program determines which scenario had the lowest score in every cluster. These scenarios, with the cluster centers as their new tempdif and outtempdif, are inserted into the ‘hactscenario’ table as the new temperature scenarios, which the Raspberry PI then downloads. The old scenarios are added as ‘alternative’ scenarios.

The result is that over time the best boiler pattern is selected for a given outside temperature and desired temperature increase. An example is shown in figure 17. Here you see that the number of minutes the boiler should be on is greatly reduced in the new scenario for the given tempdif and outtempdif. So the thermostat is ‘learning’ and getting ‘smarter’.

7.9 Spark implementation of clustering step

The Spark script starts with an HBase scan that puts the outtempdif, tempdif and score of each used scenario into a Spark RDD. It also filters out used scenarios that have not been scored yet.


//imports assumed for the clustering snippets below (Spark 1.x MLlib and HBase 0.98 client APIs)
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put, Scan}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes

//spark settings
val sparkconf = new SparkConf().setAppName("tempcluster")
val sc = new SparkContext(sparkconf)
//scan hbase used scenario table to collect used scenarios that have been scored
val confonoff = HBaseConfiguration.create()
val scanonoff = new Scan()
scanonoff.addColumn("fd".getBytes(), "outtempdif".getBytes())
scanonoff.addColumn("fd".getBytes(), "tempdif".getBytes())
scanonoff.addColumn("fd".getBytes(), "score".getBytes())
scanonoff.addColumn("fd".getBytes(), "scenariokey".getBytes())
scanonoff.setStopRow("40b5af01_9999999999_40b5af01_9999999999_9999999".getBytes())
confonoff.set(TableInputFormat.INPUT_TABLE, "husedscenario")
confonoff.set(TableInputFormat.SCAN, convertScanToString(scanonoff))
//create RDD with used scenarios
val RDDonOff = sc.newAPIHadoopRDD(confonoff, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])
val valRowsOnOff = RDDonOff.map(tuple => tuple._2
).filter(result => Bytes.toString(result.getValue("fd".getBytes(), "score".getBytes())) != null //only select scored scenarios
).map(result => ( Bytes.toString(result.getValue("fd".getBytes(), "scenariokey".getBytes())), ( Bytes.toString(result.getValue("fd".getBytes(), "outtempdif".getBytes())).toInt , Bytes.toString(result.getValue("fd".getBytes(), "tempdif".getBytes())).toInt , Bytes.toString(result.getValue("fd".getBytes(), "score".getBytes())).toLong ) )
)
valRowsOnOff.cache()

The outtempdif and the tempdif of each scenario are put into a vector and the clustering algorithm is run over the resulting vectors. For the clustering algorithm I set the number of clusters to 10 and the number of iterations to 200.


val RDDscore = valRowsOnOff.map(result => result._2).map(result => Vectors.dense(result._1,result._2))
RDDscore.cache()
//use mllib k-means clustering to create clusters (temperature groups)
val numClusters = 10
val numIterations = 200
val clusters = KMeans.train(RDDscore, numClusters, numIterations)

The next step is to determine each cluster’s best scenario. To do this, the cluster of each scenario is first determined using the predict() function of Spark’s KMeans model. Interestingly, the scenarios are not automatically assigned to a cluster when the clusters are created; two steps are necessary. The first step creates the clusters from the used temperature scenarios, and in the second step the same temperature scenarios are fed through the model to determine the cluster of each scenario.

After that, the average score per scenario in each temperature cluster is calculated, and subsequently the best (lowest) scoring scenario per temperature cluster is selected. This is shown in the code snippet below. Coming from a world dominated by SQL, the MapReduce way of calculating an average and selecting the (row) key with the best value looks much more complex to me. Of course it does make sense that an average is the sum of the values divided by the number of observations. While not as easy as ORDER BY AVG(), it definitely is more fun.
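As a toy illustration of the sum-and-count pattern used below (the keys and scores are made up):

// Toy example: average score per key via reduceByKey on (sum, count) pairs.
val scores = sc.parallelize(Seq(("scenA", 3600L), ("scenA", 4200L), ("scenB", 3000L)))
val avgPerKey = scores.mapValues(s => (s, 1L))
  .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
  .mapValues { case (sum, count) => sum / count }
// avgPerKey.collect() gives ("scenA", 3900) and ("scenB", 3000), in no particular order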


//select best scenario for each temperature cluster
val RDDtotalClustered = valRowsOnOff.map(result => ( (result._1, clusters.predict(Vectors.dense(result._2._1,result._2._2)) ), (result._2._3, 1L) )
).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)
).map(result => (result._1._2, ( result._2._1/result._2._2, result._1._1 ) )
).reduceByKey((x, y) => if (x._1 < y._1) {(x._1, x._2)} else {(y._1, y._2)}
).mapValues(result => (math.round(result._1), result._2 ))
RDDtotalClustered.cache()

When the best scoring scenario has been selected, the boiler ‘on’ pattern of all the scenarios is retrieved from the existing scenario table. The combination of best scoring scenarios and alternatives is then reinserted into the active scenario HBase table. This table contains an ‘iepoch’ column holding the unix epoch at which the scenario was generated, which acts as a version number. When the Raspberry PI sees a scenario with a higher epoch than that of its existing scenarios, it automatically downloads the new scenario. And the whole process starts all over again.


//alternative scenarios get the highest (worst) score
val maxscore : Int = ((RDDtotalClustered.map(result=> (result._2._1,1)).sortByKey(false).take(1))(0))._1 + 10
//determine cluster centers (centers are new tempdif and outtempdif)
val RDDcenters = sc.parallelize(clusters.clusterCenters)
val RDDcentersCalc = RDDcenters.map(result => (clusters.predict(result) , result ) )
RDDcentersCalc.cache()
//join best scenarios for temperature groups with cluster centers
val RDDClusterWithCenters = RDDtotalClustered.join(RDDcentersCalc
).map(result => (result._2._1._2, (result._1, result._2._1._1, math.round(result._2._2(0)), math.round(result._2._2(1)) ) ))
RDDClusterWithCenters.cache()
//scan scenario hbase table to retrieve scenario information
val bvepoch: Long = 9999999999L
val vepoch: Long = System.currentTimeMillis / 1000
val startmepoch : Long = bvepoch - vepoch
val vstartrow: String = "40b5af01_"+ startmepoch.toString
val confscenvals = HBaseConfiguration.create()
val scanscenvals = new Scan()
confscenvals.set(TableInputFormat.INPUT_TABLE, "hactscenario")
confscenvals.set(TableInputFormat.SCAN, convertScanToString(scanscenvals))
val RDDconfscenvals = sc.newAPIHadoopRDD(confscenvals, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])
val RDDscenvals = RDDconfscenvals.map(tuple => tuple._2
).map(result => ( Bytes.toString(result.getRow()) ,
(
(Bytes.toString(result.getValue("fd".getBytes(), "run0".getBytes())) ,
Bytes.toString(result.getValue("fd".getBytes(), "run1".getBytes())),
Bytes.toString(result.getValue("fd".getBytes(), "run2".getBytes())),
Bytes.toString(result.getValue("fd".getBytes(), "run3".getBytes())) ,
Bytes.toString(result.getValue("fd".getBytes(), "run4".getBytes())),
Bytes.toString(result.getValue("fd".getBytes(), "run5".getBytes()))
),
(Bytes.toString(result.getValue("fd".getBytes(), "group".getBytes())),
maxscore,
Bytes.toString(result.getValue("fd".getBytes(), "outtempdif".getBytes())),
Bytes.toString(result.getValue("fd".getBytes(), "tempdif".getBytes()))
)
)
)
)
RDDscenvals.cache()
//join newly created scenarios with all other scenarios to create the complete list of scenarios (best and alternative)
//collect all to a new array
val ClusterResSet = RDDscenvals.leftOuterJoin(RDDClusterWithCenters
).mapValues(result => if (result._2 == None) { (result._1._1,result._1._2) } else {(result._1._1,(result._2).get)}
).map(result => (vstartrow+result._1.substring(19,27), result._2 ) ).collect()
//insert new scenarios with a new version (iepoch) into the hactscenario table
val confputscen = HBaseConfiguration.create()
val tableputscen = new HTable(confputscen,"hactscenario")
for (i <- 0 until ClusterResSet.size) {
val putrun = new Put((ClusterResSet(i)._1).getBytes())
putrun.add("fd".getBytes(), "group".getBytes(), (ClusterResSet(i)._2._2._1).toString.getBytes() )
putrun.add("fd".getBytes(), "score".getBytes(), (ClusterResSet(i)._2._2._2).toString.getBytes() )
putrun.add("fd".getBytes(), "outtempdif".getBytes(), (ClusterResSet(i)._2._2._3).toString.getBytes() )
putrun.add("fd".getBytes(), "tempdif".getBytes(), (ClusterResSet(i)._2._2._4).toString.getBytes() )
putrun.add("fd".getBytes(), "iepoch".getBytes(), vepoch.toString.getBytes() )
putrun.add("fd".getBytes(), "run0".getBytes(), (ClusterResSet(i)._2._1._1).toString.getBytes() )
putrun.add("fd".getBytes(), "run1".getBytes(), (ClusterResSet(i)._2._1._2).toString.getBytes() )
putrun.add("fd".getBytes(), "run2".getBytes(), (ClusterResSet(i)._2._1._3).toString.getBytes() )
putrun.add("fd".getBytes(), "run3".getBytes(), (ClusterResSet(i)._2._1._4).toString.getBytes() )
putrun.add("fd".getBytes(), "run4".getBytes(), (ClusterResSet(i)._2._1._5).toString.getBytes() )
putrun.add("fd".getBytes(), "run5".getBytes(), (ClusterResSet(i)._2._1._6).toString.getBytes() )
tableputscen.put(putrun)
}
println("done")
tableputscen.close()

7.10 Spark cluster workflow

Similar to the scoring workflow, a simple daily cron job starts the Spark clustering program. As described in paragraph 7.7, this is not the most sophisticated workflow system, but it is definitely sufficient for this project.


10 11 * * * /usr/local/spark/bin/spark-submit --driver-class-path /usr/local/hbase/lib/hbase-server-0.98.7-hadoop2.jar:/usr/local/hbase/lib/hbase-protocol-0.98.7-hadoop2.jar:/usr/local/hbase/lib/hbase-hadoop2-compat-0.98.7-hadoop2.jar:/usr/local/hbase/lib/hbase-client-0.98.7-hadoop2.jar:/usr/local/hbase/lib/hbase-common-0.98.7-hadoop2.jar:/usr/local/hbase/lib/htrace-core-2.04.jar:/usr/local/hbase/lib/guava-12.0.1.jar --deploy-mode "cluster" --class "clustertemp" --master spark://ip-172-31-7-254:6066 /usr/local/spark/tempniek/cluster/target/cluster-1.0-SNAPSHOT.jar
