Configure the data flow from EdgeX to eKuiper

    Typically, there are two kinds of data flow from EdgeX to eKuiper:

    • From EdgeX app service to eKuiper
    • From EdgeX message bus directly to eKuiper

    Note that the EdgeX message bus receives messages from various services, such as device services and core data. Even in the first kind of data flow, the app service result is also published to the message bus and then consumed by eKuiper. The difference between the two kinds is whether the app service processes the message before eKuiper consumes it.

    By default, the first kind of data flow is used, which allows users to prepare (transform, enrich, filter, etc.) and groom (format, compress, encrypt, etc.) the data before sending it to the eKuiper rule engine. If users don't need to transform the data and would like to process the raw data in eKuiper to reduce overhead, they can connect to the message bus directly.

    The full property list of the EdgeX source can be found in the EdgeX source documentation. Two critical properties define the connection model: topic and messageType. Let's explore how to configure them for each connection model.

    In the default EdgeX docker compose file, a default app service, app-service-rules, is defined as the upstream of eKuiper. Its default publish topic is rules-events, which is defined in the default configuration. In the rulesengine section of the same docker compose file, there is an environment variable definition EDGEX__DEFAULT__TOPIC: rules-events. This means the default topic of the eKuiper EdgeX source is rules-events, which matches the publish topic of app-service-rules. Thus, when an edgex type stream is created with the default configuration, data flows from app-service-rules to eKuiper automatically.

    To bypass the app service and gain some performance, users can connect to the message bus directly. Besides setting up the topic, users also need to configure the messageType property.

    The EdgeX v2 message bus has multi-level topics, so consumers can filter messages by topic efficiently. Please refer to the EdgeX documentation for details.

    For example, if the rules only consider data from Random-Integer-Device, we can modify the rulesengine section of the docker compose file as below to change the topic and set the message type to request.

        ...
        ...
        ...
        environment:
          ...
          EDGEX__DEFAULT__TOPIC: edgex/events/#/Random-Integer-Device/#
          EDGEX__DEFAULT__MESSAGETYPE: request
          ...

    With this change, eKuiper detaches from the app service and connects to the message bus directly. An EdgeX stream created with the default configuration then receives only the events from Random-Integer-Device.

    In the real world, users usually have multiple rules. Some rules are only concerned with a specific profile, device, or even reading on the message bus. It is a best practice to create multiple streams that map to multiple points of interest, so that each rule only processes a subset of the messages.

        # Default conf connects to app-service-rules
        default:
          protocol: tcp
          server: localhost
          port: 5563
          type: redis
          messageType: event
        # Override the global configurations
        device_conf: # Filter only Random-Integer-Device
          topic: edgex/events/#/Random-Integer-Device/#
          messageType: request
        another_app_service_conf:
          topic: new-rules-events
        int8_conf: # Filter only Random-Integer-Device Int8 reading
          topic: edgex/events/#/Random-Integer-Device/Int8
          messageType: request

    With this configuration, users have three confkeys, in addition to default, that connect to different EdgeX data flows. For example, suppose a user has two rules: rule1 needs to process all events, while rule2 only processes the Int8 reading of Random-Integer-Device. The user can then create two streams: edgexAll and edgexInt8.
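
    As a minimal sketch, assuming the standard eKuiper CREATE STREAM syntax, the first stream could be declared as follows. The statement is schemaless and sets no CONF_KEY; the -- line is an annotation for the reader and may need to be removed before submitting the statement.

    ```sql
    -- Sketch only: schemaless stream on the edgex source, no CONF_KEY given
    CREATE STREAM edgexAll() WITH (FORMAT="JSON", TYPE="edgex");
    ```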

    The edgexAll stream does not specify a confkey, so the default confkey is used and rules using this stream receive all events.
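
    The second stream could be sketched as below, again assuming the standard eKuiper CREATE STREAM syntax. The field name Int8 and the type bigint are assumptions chosen to match the reading name in the topic; adjust them to the actual reading. The -- lines are annotations for the reader and may need to be removed before submitting the statement.

    ```sql
    -- Sketch only: CONF_KEY selects the int8_conf entry of the source configuration
    -- Assumed schema: a single integer reading named Int8
    CREATE STREAM edgexInt8(Int8 bigint) WITH (FORMAT="JSON", TYPE="edgex", CONF_KEY="int8_conf");
    ```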

    In contrast, edgexInt8 explicitly sets the confkey to int8_conf, which configures a filtered topic for the Int8 reading of Random-Integer-Device. Thus, it only receives the Int8 reading of every event, and the event structure is fixed. The stream definition can therefore define a schema instead of being schemaless.

    Similarly, users can create a stream for each confkey, and each rule can pick the streams that match its interests.

Shared instance

    When rules are running, each rule has its own source instance, separate from the others, even when the rules use the same stream definition. To reduce overhead and guarantee the same data sequence across rules, multiple rules may want to share the same source instance. This is especially common for the default edgex stream, which reads all events on the message bus, so multiple instances may add a lot of overhead.

    So for the edgexAll stream, we recommend creating a shared instance and letting all rules that need the full data use it.
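
    A minimal sketch of such a shared stream, assuming the SHARED stream property of the eKuiper CREATE STREAM statement, might look like this. The -- line is an annotation for the reader and may need to be removed before submitting the statement.

    ```sql
    -- Sketch only: SHARED="true" asks rules using this stream to reuse one source instance
    CREATE STREAM edgexAll() WITH (FORMAT="JSON", TYPE="edgex", SHARED="true");
    ```

    Rules that select from edgexAll would then read from the same source instance rather than each opening its own connection to the message bus.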