Source Extension
Developing a source for eKuiper means implementing the api.Source interface and exporting it as a Go plugin.
Before starting development, you must set up the environment for Go plugin development.
To develop a source, you must implement the Configure method. This method is called once, when the source is initialized. Its first parameter carries the DATASOURCE property of the stream (the topic, for MQTT and other messaging systems). Its second parameter is a map that contains the configuration from your yaml file. See configuration for more detail. Typically, the map holds information such as the host, port, user and password of the external system. You can use this map to initialize the source.
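As a minimal sketch of the Configure step described above, the example below reads the DATASOURCE value and two properties from the map. The struct fields, the property names host and port, and the fallback port are hypothetical and chosen only for illustration; the method shape (a string plus a map, returning an error) is assumed from the description.

```go
package main

import (
	"errors"
	"fmt"
)

// mySource is a hypothetical source; its fields are illustrative only.
type mySource struct {
	topic string
	host  string
	port  int
}

// Configure receives the stream's DATASOURCE value and the property map
// read from the yaml file, and validates what the source needs.
func (s *mySource) Configure(datasource string, props map[string]interface{}) error {
	if datasource == "" {
		return errors.New("DATASOURCE must not be empty")
	}
	s.topic = datasource

	host, ok := props["host"].(string)
	if !ok || host == "" {
		return errors.New("property host is required and must be a string")
	}
	s.host = host

	// Numbers may arrive as int or float64 depending on how the yaml was decoded.
	switch p := props["port"].(type) {
	case int:
		s.port = p
	case float64:
		s.port = int(p)
	case nil:
		s.port = 1883 // hypothetical default when the property is absent
	default:
		return fmt.Errorf("property port has unsupported type %T", p)
	}
	return nil
}
```

Returning an error here, rather than failing later in Open, lets a misconfigured source be reported before any connection is attempted.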
The main task of a source is to implement the Open method. The implementation should synchronously create a connection to the external system, then continuously receive data from it and send each received message to the consumer channel. The consumer channel accepts the SourceTuple interface, which is composed of a map for the message body and another map for optional metadata. Typically, a constructor helper is used to create a SourceTuple. The metadata can be anything worth recording, for example the qualified topic of the message. The first parameter is a StreamContext pointer, from which you can retrieve context information, the logger and so on. It is also an implementation of the Go context, so you can listen on its Done() channel to learn when the parent stream has quit. Handle any error that happens during connecting or receiving inside this method. If an error cannot be handled, send it to errCh. By default, the rule is terminated if any error is received from errCh.
// Open should be a synchronous function in the normal case. The container runs it in a goroutine.
Open(ctx StreamContext, consumer chan<- SourceTuple, errCh chan<- error)
The last method to implement is Close, which closes the connection. It is called when the stream is about to terminate. You can also do any cleanup work in this function.
func MySource() api.Source {
	return &mySource{}
}
The is a good example.
If rule checkpointing is enabled, the source must be rewindable. That means the source needs to implement both the api.Source and api.Rewindable interfaces.
A typical implementation saves an offset as a field of the source and updates it whenever a new value is read. Note that GetOffset() is called by the eKuiper system, which means the offset can be accessed from multiple goroutines, so a lock is required when reading or writing the offset.
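The locking pattern can be sketched as below. GetOffset is named in the text; the Rewind method and both signatures are assumptions about the api.Rewindable interface, and offsetSource with its next helper is hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// offsetSource is a hypothetical source that tracks how many records it has read.
type offsetSource struct {
	mu     sync.Mutex
	offset int
}

// next simulates reading one record and advancing the offset.
func (s *offsetSource) next() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.offset++
	return s.offset
}

// GetOffset may be called from another goroutine, so it takes the lock.
func (s *offsetSource) GetOffset() (interface{}, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.offset, nil
}

// Rewind restores a previously saved offset.
// The concrete type of a restored offset may depend on how it was persisted.
func (s *offsetSource) Rewind(offset interface{}) error {
	v, ok := offset.(int)
	if !ok {
		return fmt.Errorf("unsupported offset type %T", offset)
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	s.offset = v
	return nil
}
```

A single mutex covers both the read path and the accessor, which is simpler to reason about than atomics once the offset grows beyond a single integer.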
eKuiper configurations are formatted as yaml, and eKuiper provides a centralized location, /etc, to hold all of them. Inside it, the subfolder sources holds the source configurations, including those of extended sources.
eKuiper extensions are supported by a configuration system that automatically reads the configuration from the yaml file and feeds it into the Configure method of the source. If the CONF_KEY property is specified in the stream, the configuration under that key is fed. Otherwise, the default configuration is used.
- The name of your configuration file must be the same as the plugin name. For example, mySource.yaml.
- The yaml file must be located inside etc/sources
- The format of the yaml file could be found here
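The key selection described above can be illustrated with a small sketch. It assumes the parsed yaml file is a map of named sections and that the fallback section is called default; selectConf is a hypothetical helper, not an eKuiper function, and the real loader may behave differently (for example by merging sections).

```go
package main

// selectConf returns the section named by confKey, or the "default"
// section when confKey is empty or not present in the file.
func selectConf(sections map[string]map[string]interface{}, confKey string) map[string]interface{} {
	if confKey != "" {
		if conf, ok := sections[confKey]; ok {
			return conf
		}
	}
	return sections["default"]
}
```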
Common configuration fields
There are 2 common configuration fields.
- concurrency: specifies how many instances will be started to run the source.
- bufferLength: specifies the maximum number of messages to be buffered in memory. This is used to avoid excessive memory usage that would cause an out-of-memory error. Note that memory usage varies with the number of messages actually buffered. Increasing the length here won't increase the initial memory allocation, so it is safe to set a large buffer length. The default value is 102400; that is, if each payload is about 100 bytes, the maximum buffer size will be about 102400 * 100B ~= 10MB.
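The sizing estimate above is a simple product, which the following sketch makes explicit. maxBufferBytes is a hypothetical helper, and the average payload size is something you would have to measure for your own data.

```go
package main

// maxBufferBytes estimates the worst-case payload bytes held when the
// buffer is full: bufferLength messages of avgPayloadBytes each.
func maxBufferBytes(bufferLength, avgPayloadBytes int) int {
	return bufferLength * avgPayloadBytes
}
```

With the default bufferLength of 102400 and 100-byte payloads, the estimate is 10,240,000 bytes, which matches the roughly 10MB figure quoted above.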
Build the implemented source as a Go plugin and make sure the output .so file resides in the plugins/sources folder.
The customized source is specified in a stream definition. The related properties are:
- TYPE: specifies the name of the source; it must be camel case.
- CONF_KEY: specifies the key of the configuration to be used.
If you have developed a source implementation MySource, you should have:
- The compiled MySource.so file, located inside plugins/sources
- If configuration is needed, mySource.yaml placed inside etc/sources
To use it, define a stream:
CREATE STREAM demo (
FIRST_NAME STRING,
LAST_NAME STRING,
NICKNAMES ARRAY(STRING),
Gender BOOLEAN,
ADDRESS STRUCT(STREET_NAME STRING, NUMBER BIGINT),