Deploying and managing Pulsar Functions

    In order to deploy and manage Pulsar Functions, you need to have a Pulsar cluster running. There are several options for this:

    • You can run a standalone cluster locally on your own machine
    • You can deploy a Pulsar cluster on platforms such as Amazon Web Services and DC/OS, among others

    If you're running a non-standalone cluster, you'll need to obtain the service URL for the cluster. How you obtain the service URL depends on how you deployed your Pulsar cluster.

    If you're going to deploy and trigger Python user-defined functions, install the Pulsar Python client first.

    Pulsar Functions are deployed and managed using the pulsar-admin functions interface, which contains commands such as create for deploying functions in cluster mode, trigger for triggering functions, list for listing deployed functions, and several others.

    Each Pulsar Function has a Fully Qualified Function Name (FQFN) that consists of three elements: the function's tenant, namespace, and function name. FQFNs look like this:

    tenant/namespace/name

    FQFNs enable you to, for example, create multiple functions with the same name provided that they're in different namespaces.
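
    To make the naming rule concrete, here is a minimal Python sketch that splits an FQFN into its three elements, assuming the slash-separated form tenant/namespace/name. The FQFN class, the parse_fqfn helper, and the "staging" namespace are hypothetical names used only for illustration; they are not part of any Pulsar library.

```python
from typing import NamedTuple


class FQFN(NamedTuple):
    """Hypothetical container for the three FQFN elements."""
    tenant: str
    namespace: str
    name: str

    def __str__(self) -> str:
        return f"{self.tenant}/{self.namespace}/{self.name}"


def parse_fqfn(text: str) -> FQFN:
    """Split 'tenant/namespace/name' into its three parts."""
    parts = text.split("/")
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"expected tenant/namespace/name, got {text!r}")
    return FQFN(*parts)


# Two functions may share a name as long as their namespaces differ.
a = parse_fqfn("public/default/myfunc")
b = parse_fqfn("public/staging/myfunc")  # "staging" is a made-up namespace
print(a.name == b.name, a == b)
```

    The two values compare as different even though the function name is the same, which is the property that lets same-named functions coexist in different namespaces.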

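    When managing Pulsar Functions, you'll need to specify a variety of information about those functions, including tenant, namespace, and input and output topics. Some parameters, however, have default values that are supplied if you omit them:

    • Function name: the class name without its package prefix (for example, MyFunction for --classname org.example.MyFunction)
    • Tenant: public
    • Namespace: default
    • Subscription type: SHARED
    • Processing guarantees: ATLEAST_ONCE
    • Pulsar service URL: pulsar://localhost:6650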

    Example use of defaults

    Take this create command:

    $ bin/pulsar-admin functions create \
      --jar my-pulsar-functions.jar \
      --classname org.example.MyFunction \
      --inputs my-function-input-topic1,my-function-input-topic2

    The created function would have default values supplied for the function name (MyFunction), tenant (public), namespace (default), subscription type (SHARED), processing guarantees (ATLEAST_ONCE), and Pulsar service URL (pulsar://localhost:6650).
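
    The way omitted settings fall back to defaults can be sketched in a few lines of Python. This is an illustration only, not how pulsar-admin is implemented: the DEFAULTS dictionary, its key names, and the with_defaults helper are assumptions made for the example, and only the defaults whose values are stated in the text are included.

```python
# Defaults as described in the text above; the dictionary keys are
# illustrative names, not pulsar-admin option names.
DEFAULTS = {
    "tenant": "public",
    "namespace": "default",
    "subscription_type": "SHARED",
    "service_url": "pulsar://localhost:6650",
}


def with_defaults(supplied: dict) -> dict:
    """Return a copy of `supplied` with omitted settings filled in."""
    config = {**DEFAULTS, **supplied}
    # The function name falls back to the class name without its package.
    config.setdefault("name", config["classname"].rsplit(".", 1)[-1])
    return config


config = with_defaults({
    "classname": "org.example.MyFunction",
    "inputs": ["my-function-input-topic1", "my-function-input-topic2"],
})
print(config["name"], config["tenant"], config["namespace"])
```

    Any value passed explicitly takes precedence over the corresponding default, which mirrors the behavior of supplying a flag on the command line.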

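    In local run mode, a Pulsar Function runs on the machine from which you issue the command rather than inside the cluster. Here's an example localrun command:

    $ bin/pulsar-admin functions localrun \
      --py myfunc.py \
      --classname myfunc.SomeFunction \
      --inputs persistent://public/default/input-1 \
      --output persistent://public/default/output-1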

    By default, the function connects to a Pulsar cluster running on the same machine, via a local broker service URL of pulsar://localhost:6650. If you'd like to use local run mode but connect the function to a non-local Pulsar cluster, you can specify a different broker URL using the --broker-service-url flag. Here's an example:

    $ bin/pulsar-admin functions localrun \
      --broker-service-url pulsar://my-cluster-host:6650 \
      # Other function parameters

    When you run a Pulsar Function in cluster mode, the function code is uploaded to a Pulsar broker and runs alongside the broker rather than in your local environment. You can run a function in cluster mode using the create command. Here's an example:

    $ bin/pulsar-admin functions create \
      --py myfunc.py \
      --classname myfunc.SomeFunction \
      --inputs persistent://public/default/input-1 \
      --output persistent://public/default/output-1
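
    If deployments are scripted, the same create command can be assembled programmatically. The following Python sketch only builds and prints the argument list, using the flags shown above; the build_create_command helper is a hypothetical name, and actually running the command (for example with subprocess.run) is left out because it requires a reachable cluster.

```python
import shlex


def build_create_command(py_file, classname, inputs, output):
    """Assemble the argument list for `pulsar-admin functions create`."""
    return [
        "bin/pulsar-admin", "functions", "create",
        "--py", py_file,
        "--classname", classname,
        "--inputs", ",".join(inputs),  # multiple topics are comma-separated
        "--output", output,
    ]


command = build_create_command(
    py_file="myfunc.py",
    classname="myfunc.SomeFunction",
    inputs=["persistent://public/default/input-1"],
    output="persistent://public/default/output-1",
)
# shlex.join renders the list as a single shell-safe command line.
print(shlex.join(command))
```

    Keeping the command as a list rather than a single string avoids quoting problems when topic names or file paths contain characters the shell treats specially.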

    You can use the update command to update a Pulsar Function running in cluster mode, such as the function created in the section above.

    Pulsar Functions run as processes called instances. When you run a Pulsar Function, it runs as a single instance by default (and in local run mode you can only run a single instance of a function).

    You can also specify the parallelism of a function, i.e. the number of instances to run, when you create the function. You can set the parallelism factor using the --parallelism flag of the create command. Here's an example:

    $ bin/pulsar-admin functions create \
      --parallelism 3 \
      # Other function info

    You can adjust the parallelism of an already created function using the update command:

    $ bin/pulsar-admin functions update \
      --parallelism 5 \
      # Other function info

    If you're specifying a function's configuration via YAML, use the parallelism parameter. Here's an example config file:

    parallelism: 3
    inputs:
      - persistent://public/default/input-1
    output: persistent://public/default/output-1

    And here's the corresponding update command:

    $ bin/pulsar-admin functions update \
      --function-config-file function-config.yaml

    When you run Pulsar Functions in cluster mode, you can specify the resources, such as CPU, RAM, and disk space, that are assigned to each function instance.

    If a Pulsar Function is running in cluster mode, you can trigger it at any time using the command line. Triggering a function means that you send a message with a specific value to the function and get the function's output (if any) via the command line.

    To show an example of function triggering, let's start with a simple Python function that returns a string based on the input:

    # myfunc.py
    def process(input):
        return "This function has been triggered with a value of {0}".format(input)
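
    Because this is a plain Python function, its logic can be checked locally before it is deployed. The sketch below repeats the function so that it runs on its own and calls it directly; the sample value "hello world" is an arbitrary choice for illustration.

```python
# Same logic as myfunc.py above, repeated so this snippet runs on its own.
def process(input):
    return "This function has been triggered with a value of {0}".format(input)


# Call the function directly with a sample value; no cluster is required.
result = process("hello world")
print(result)
```

    A check like this exercises only the function body. It says nothing about topics, subscriptions, or deployment, which still need to be tested against a running cluster.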

    Let's deploy that function in cluster mode using the create command:

    $ bin/pulsar-admin functions create \
      --tenant public \
      --namespace default \
      --name myfunc \
      --py myfunc.py \
      --classname myfunc \
      --inputs persistent://public/default/in \
      --output persistent://public/default/out

    Now let's make a consumer listen on the output topic for messages coming from the myfunc function using the pulsar-client consume command:

    $ bin/pulsar-client consume persistent://public/default/out \
      --subscription-name my-subscription \
      --num-messages 0 # Listen indefinitely

    Now let's trigger that function:

    $ bin/pulsar-admin functions trigger \
      --tenant public \
      --namespace default \
      --name myfunc \
      --trigger-value "hello world"

    The consumer listening on the output topic should then produce this in its logs: