Develop Connectors

    Pulsar IO connectors are specialized Pulsar Functions, so writing a Pulsar IO connector is as simple as writing a Pulsar function. Pulsar IO connectors come in two flavors: Source, which imports data from another system, and Sink, which exports data to another system. For example, a KinesisSink would export the messages of a Pulsar topic to a Kinesis stream, and a RabbitmqSource would import the messages of a RabbitMQ queue into a Pulsar topic.

    Develop a source connector

    To develop a source connector, you need to implement the Source interface.
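
    For reference, the Source interface in the org.apache.pulsar.io.core package has roughly the shape sketched below; the exact package and method signatures may differ between Pulsar versions, so check the release you build against.

    import java.util.Map;

    import org.apache.pulsar.functions.api.Record;

    // Approximate shape of the Pulsar IO Source interface (illustrative only).
    public interface Source<T> extends AutoCloseable {

        // Called once when the connector is initialized.
        void open(Map<String, Object> config, SourceContext sourceContext) throws Exception;

        // Returns the next record, blocking until one is available.
        Record<T> read() throws Exception;
    }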

    First, you need to implement the open method. This method is called once when the source connector is initialized. In this method, you can retrieve all the connector-specific settings through the passed-in config parameter and initialize all the necessary resources. For example, a Kafka connector can create the Kafka client in this open method.

    Besides the passed-in config object, the Pulsar runtime also provides a SourceContext for the connector to access runtime resources for tasks like collecting metrics. The implementation can save the SourceContext for further use.
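
    As a minimal sketch, an open implementation might store the SourceContext and pull its settings out of the config map as shown below. The class name, the queueCapacity setting, and the in-memory queue are all hypothetical and only illustrate the pattern; a real connector would create its external client (for example, a Kafka consumer) here instead.

    import java.util.Map;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    import org.apache.pulsar.functions.api.Record;
    import org.apache.pulsar.io.core.Source;
    import org.apache.pulsar.io.core.SourceContext;

    // Hypothetical source used only for illustration.
    public class InMemoryQueueSource implements Source<String> {

        private SourceContext sourceContext;
        private BlockingQueue<String> queue;

        @Override
        public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
            // Keep the context for later use (metrics, logging, ...).
            this.sourceContext = sourceContext;

            // Read connector-specific settings from the passed-in config map.
            // "queueCapacity" is a made-up setting for this example.
            int capacity = Integer.parseInt(config.getOrDefault("queueCapacity", "1024").toString());
            this.queue = new LinkedBlockingQueue<>(capacity);

            // In a real connector, a background thread or client callback would enqueue
            // incoming messages; a Kafka source would create its consumer here instead.
        }

        @Override
        public Record<String> read() throws Exception {
            // Block until a message is available and never return null (see below).
            final String value = queue.take();
            return new Record<String>() {
                @Override
                public String getValue() {
                    return value;
                }
            };
        }

        @Override
        public void close() throws Exception {
            // Release any resources created in open().
        }
    }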

    The main task for a Source implementor is to implement the read method:

    /**
     * Reads the next message from source.
     * If source does not have any new messages, this call should block.
     * @return next message from source. The return result should never be null
     * @throws Exception
     */
    Record<T> read() throws Exception;

    The implementation should block on this method if there is nothing to return, and it should never return null. The returned Record should encapsulate the information that is needed by the Pulsar IO runtime:

    • Topic Name: Optional. If the record originated from a Pulsar topic, it should be the Pulsar topic name.
    • Value: Required. The actual data of this record.
    • Partition Id: Optional. If the record originated from a partitioned source, return its partition id. The partition id is used as part of the unique identifier by the Pulsar IO runtime to deduplicate messages and achieve the exactly-once processing guarantee.
    • Record Sequence: Optional. If the record originated from a sequential source, return its record sequence. The record sequence is used as part of the unique identifier by the Pulsar IO runtime to deduplicate messages and achieve the exactly-once processing guarantee.
    • Properties: Optional. If the record carries user-defined properties, return those properties.

    Additionally, the implementation of the record should provide two methods: ack and fail. These two methods are used by the Pulsar IO connector to acknowledge the records that it has finished processing and to fail the records that it could not process, as sketched below.
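
    The sketch below shows what a record implementation might look like for a hypothetical partitioned, sequential source. The class name and the commit/rollback callbacks are invented for illustration; only getValue is strictly required, the other methods are optional overrides.

    import java.util.Map;
    import java.util.Optional;

    import org.apache.pulsar.functions.api.Record;

    // Hypothetical record type used only for illustration.
    public class QueueRecord implements Record<String> {

        private final String value;
        private final String partitionId;
        private final long sequence;
        private final Map<String, String> properties;
        private final Runnable commit;    // called once Pulsar IO has processed the record
        private final Runnable rollback;  // called if processing failed

        public QueueRecord(String value, String partitionId, long sequence,
                           Map<String, String> properties, Runnable commit, Runnable rollback) {
            this.value = value;
            this.partitionId = partitionId;
            this.sequence = sequence;
            this.properties = properties;
            this.commit = commit;
            this.rollback = rollback;
        }

        @Override
        public String getValue() {                   // required: the actual data
            return value;
        }

        @Override
        public Optional<String> getPartitionId() {   // optional: used for deduplication
            return Optional.ofNullable(partitionId);
        }

        @Override
        public Optional<Long> getRecordSequence() {  // optional: used for deduplication
            return Optional.of(sequence);
        }

        @Override
        public Map<String, String> getProperties() { // optional: user-defined properties
            return properties;
        }

        @Override
        public void ack() {
            commit.run();    // acknowledge back to the upstream system
        }

        @Override
        public void fail() {
            rollback.run();  // signal that this record could not be processed
        }
    }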

    KafkaSource is a good example to follow.

    Develop a sink connector

    Developing a sink connector is as easy as developing a source connector. You just need to implement the Sink interface.

    Similarly, you first need to implement the open method to initialize all the necessary resources before implementing the write method.

    The main task for a Sink implementor is to implement the write method:

    /**
     * Write a message to Sink
     * @param record record to write to sink
     * @throws Exception
     */
    void write(Record<T> record) throws Exception;

    In the implementation of the write method, the implementor can decide how to write the value and the optional key to the actual sink, and leverage all the provided information, such as Partition Id and Record Sequence, to achieve different processing guarantees. The implementor is also responsible for acknowledging records that have been written successfully and failing records that could not be written.
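
    A minimal sketch of a sink that follows this contract is shown below. The class and its in-memory "external system" are hypothetical; a real connector would create a client for the target system in open and write to it in write.

    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.CopyOnWriteArrayList;

    import org.apache.pulsar.functions.api.Record;
    import org.apache.pulsar.io.core.Sink;
    import org.apache.pulsar.io.core.SinkContext;

    // Hypothetical sink used only for illustration: "writes" to an in-memory list.
    public class InMemorySink implements Sink<String> {

        private List<String> store;

        @Override
        public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
            // A real connector would create its client (JDBC connection, Kinesis client, ...) here.
            this.store = new CopyOnWriteArrayList<>();
        }

        @Override
        public void write(Record<String> record) throws Exception {
            try {
                store.add(record.getValue());
                record.ack();   // acknowledge only after the write has succeeded
            } catch (Exception e) {
                record.fail();  // let the runtime know this record was not written
                throw e;
            }
        }

        @Override
        public void close() throws Exception {
            // Release any resources created in open().
        }

        // Exposed so a unit test can inspect what was written (see the testing sketch below).
        List<String> getStore() {
            return store;
        }
    }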

    Testing connectors can be challenging because Pulsar IO connectors interact with two systems that may be difficult to mock: Pulsar and the system the connector connects to. It is recommended to write tests that specifically exercise the functionality of the connector classes while mocking the external services.
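
    For example, a unit test for the hypothetical InMemorySink sketched above could mock the SinkContext and feed records directly, without a running Pulsar cluster. JUnit 5 and Mockito are assumed here purely for illustration.

    import java.util.Collections;

    import org.apache.pulsar.functions.api.Record;
    import org.apache.pulsar.io.core.SinkContext;
    import org.junit.jupiter.api.Test;
    import org.mockito.Mockito;

    import static org.junit.jupiter.api.Assertions.assertEquals;

    class InMemorySinkTest {

        @Test
        void writesRecordValueToStore() throws Exception {
            InMemorySink sink = new InMemorySink();

            // Mock the runtime context instead of standing up a Pulsar cluster.
            SinkContext context = Mockito.mock(SinkContext.class);
            sink.open(Collections.emptyMap(), context);

            // Feed a record whose only required piece is its value.
            sink.write(new Record<String>() {
                @Override
                public String getValue() {
                    return "hello";
                }
            });

            assertEquals(Collections.singletonList("hello"), sink.getStore());
            sink.close();
        }
    }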

    Once you've developed and tested your connector, you must package it so that it can be submitted to a Pulsar Functions cluster. The two approaches described here both work with the Pulsar Functions runtime.

    If you plan to package and distribute your connector for others to use, you are obligated to properly license and copyright your own code and to adhere to the licensing and copyrights of all libraries your code uses and that you include in your distribution. If you are using the approach described in Creating a NAR package, the NAR plugin will automatically create a DEPENDENCIES file in the generated NAR package, including the proper licensing and copyrights of all libraries of your connector.

    Creating a NAR package

    The easiest approach to packaging a Pulsar IO connector is to create a NAR package using the nifi-nar-maven-plugin.

    NAR stands for NiFi Archive. It is a custom packaging mechanism used by Apache NiFi to provide a bit of Java ClassLoader isolation. For more details, you can read this blog post to understand how NAR works. Pulsar uses the same mechanism for packaging all of its built-in connectors.

    All you need to do is include the nifi-nar-maven-plugin in the Maven project for your connector. For example:
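
    The declaration below is a sketch; the plugin version is illustrative, so check for the latest release before copying it.

    <plugins>
      <plugin>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-nar-maven-plugin</artifactId>
        <!-- Illustrative version; use the latest release of the plugin. -->
        <version>1.2.0</version>
      </plugin>
    </plugins>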

    The TwitterFirehose connector is a good example to follow.

    Creating an Uber JAR

    You can use the maven-shade-plugin to create an uber JAR. For example:

    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>3.1.1</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <filters>
              <filter>
                <artifact>*:*</artifact>
              </filter>
            </filters>
          </configuration>
        </execution>
      </executions>
    </plugin>