Run TensorFlow Lite model with eKuiper function plugin

    TensorFlow Lite is a set of tools to help developers run TensorFlow models on mobile, embedded, and IoT devices. It enables on-device machine learning inference with low latency and a small binary size.

    By integrating eKuiper and TensorFlow Lite, users can analyze the data in stream by AI with prebuilt TensorFlow models. In this tutorial, we will walk you through building a eKuiper plugin to label pictures (binary data) produced by an edge device in stream by pre-trained image recognition TensorFlow model.

    To run the TensorFlow lite interpreter, we need a trained model. We won’t cover how to train and cover a model in this tutorial, you can check for how to do that. We can either train a new model or pick one online. In this tutorial, we will use label image model from . This repo creates a golang binding for tflite C API. We will also use it to implement our plugin.

    To integrate eKuiper with TensorFlow lite, we will develop a customized eKuiper function plugin to be used by eKuiper rules. As an example, we will create function whose input is a binary type data representing an image and the output is a string representing the label of the image. For example, LabelImage(col) will produce "peacock" if the input image has a peacock.

    To develop the function plugin, we need to:

    1. Create the plugin go file. For example, in eKuiper source code, create plugins/functions/labelImage/labelImage.go file.
    2. Create a struct that implements api.Function interface.
    3. Export the struct.

    The key part of the implementation is the Exec function. The pseudo code is like:

    Another thing to notice is the export of plugin. The function is stateless, so we will only export one struct instance. All rules use this function will share one instance to avoid overhead of creating instances and loading model. The model and label path will be specified at the instantiation.

    1. var LabelImage = labelImage{
    2. modelPath: "labelImage/mobilenet_quant_v1_224.tflite",
    3. labelPath: "labelImage/labels.txt",
    4. }

    Check for detail steps of creating eKuiper plugins. Please refer to labelImage.go for the full source code.

    To use the plugin, we need to build it in the environment where eKuiper will run and then install it in eKuiper.

    1. POST http://{{eKuiperHost:eKuiperRestPort}}/plugins/functions
    2. Content-Type: application/json
    3. {"name":"labelImage", "file": "https://www.emqx.io/downloads/kuiper-plugins/v1.1.2/debian/functions/labelImage_amd64.zip"}

    Manual build

    If you don’t run eKuiper by official eKuiper docker image, the pre-built labelImage plugin will not fit due to the limitation of golang plugin. You will need to built the plugin manually. There are 3 steps to create the plugin zip file manually:

    1. Build the TensorFlowLite C API.
    2. Build the labelImage plugin.
    3. Package the plugin with install script.

    Build the TensorFlowLite C API

    There is a very simple from the tensorflow repo about build the C API. We will expand it in detail step by step in this section. Notice that, the plugin only test against TensorFlow v2.2.0-rc3, so we will build upon this version. Take ubuntu as an example, below are the build steps:

    1. Install Python 3.

    2. Copy to your location. Install required python lib: pip3 install -r requirements.txt. The requirements are from tensorflow/tensorflow/tools/pip_package/setup.py of the corresponding TensorFlow version.

    3. Install Bazel which is the build tool for TensorFlow.

    4. Build the target .so file, the output will be in ./bazel-bin. Copy the two so to tensorflow/lib folder.

    5. Install the so files.
      1. Update ldconfig file. sudo vi /etc/ld.so.conf.d/tflite.conf.
      2. Add the path {{tensorflowPath}}/lib to tflite.conf then save and exit.
      3. Run ldconfig: sudo ldconfig.
      4. Check installation result: ldconfig -p | grep libtensorflow. Make sure the two so files are listed.

    Build the labelImage plugin

    Make sure the eKuiper github repo has cloned. The plugin source file is in extensions/functions/labelImage/labelImage.go. Export the paths of the tensorflow repo and built libraries before build the plugin.

    1. $ cd {{eKuiperRepoPath}}
    2. $ export CGO_CFLAGS=-I/root/tensorflow
    3. $ export CGO_LDFLAGS=-L/root/tensorflow/lib
    4. $ go build -trimpath -modfile extensions.mod --buildmode=plugin -o plugins/functions/LabelImage.so extensions/functions/labelImage/*.go
    5. $ cp -r extensions/functions/labelImage plugins/functions

    By these commands, the plugin is built into plugins/functions/LabelImage.so and copy all dependencies to plugins/functions/labelImage folder. For development purpose, you can restart eKuiper to load this plugin automatically and do testing. After testing complete, we should package it in a zip which is ready to use by eKuiper plugin installation API so that it can be used in another machine such as in production environment.

    Package the plugin

    • etc
      • labels.txt
      • mobilenet_quant_v1_224.tflite
    • lib
      • libtensorflowlite.so
      • libtensorflowlite_c.so
    • install.sh
    • LabelImage.so
    • tflite.conf

    Install the packaged plugin to the target system like .

    Once the plugin installed, we can use it in our rule. We will create a rule to receive image byte data from a mqtt topic and label the image by tflite model.

    Define the stream by eKuiper rest API. We create a stream named tfdemo whose format is binary and the topic is tfdemo.

    1. POST http://{{host}}/streams
    2. Content-Type: application/json
    3. {"sql":"CREATE STREAM tfdemo () WITH (DATASOURCE=\"tfdemo\", FORMAT=\"BINARY\")"}

    Define the rule

    Define the rule by eKuiper rest API. We will create a rule named ruleTf. We just read the images from tfdemo stream and run the custom function labelImage against it. The result will be the label of the image recognized by the AI.

    Here we create a go program to send image data to the tfdemo topic to be processed by the rule.

    1. package main
    2. import (
    3. "fmt"
    4. mqtt "github.com/eclipse/paho.mqtt.golang"
    5. "io/ioutil"
    6. )
    7. func main() {
    8. const TOPIC = "tfdemo"
    9. images := []string{
    10. "peacock.png",
    11. "frog.jpg",
    12. // other images you want
    13. opts := mqtt.NewClientOptions().AddBroker("tcp://yourownhost:1883")
    14. client := mqtt.NewClient(opts)
    15. if token := client.Connect(); token.Wait() && token.Error() != nil {
    16. panic(token.Error())
    17. }
    18. for _, image := range images {
    19. fmt.Println("Publishing " + image)
    20. payload, err := ioutil.ReadFile(image)
    21. if err != nil {
    22. fmt.Println(err)
    23. continue
    24. }
    25. if token := client.Publish(TOPIC, 0, false, payload); token.Wait() && token.Error() != nil {
    26. fmt.Println(token.Error())
    27. } else {
    28. fmt.Println("Published " + image)
    29. }
    30. time.Sleep(1 * time.Second)
    31. }
    32. client.Disconnect(0)
    33. }

    Run pub.go, it will start to feed images into tfdemo topic.

    Check the result

    Because our rule definition has only one sink: log so the result will be written into the log file. We feed the stream with two images peacock.png and frog.png. Check the log file, we would find:

    1. time="2021-02-05 16:23:30" level=info msg="sink result for rule ruleTf: [{\"labelImage\":\"bullfrog\"}]" file="sinks/log_sink.go:16" rule=ruleTf

    The images are labeled correctly.

    In this tutorial, we walk you through building a customized eKuiper plugin to leverage a pre-trained TensorFlowLite model. If you need to use other models, just follow the steps to create another function. Notice that, the built TensorFlow C API can be shared among all functions if running in the same environment. Enjoy the AI in edge device.