Manage schema

    如果某个 schema 通过了 schema 兼容性检查,Pulsar producer 就会自动将此 schema 更新为 topic 默认创建的 schema。

    Producer 会在以下情况中:

    • If a topic doesn’t have a schema, Pulsar registers a schema automatically.

    • If a topic has a schema:

      • If a producer doesn’t carry a schema:

        • If isSchemaValidationEnforced or schemaValidationEnforced is enabled in the namespace to which the topic belongs, the producer is rejected and disconnected.

      • If a producer carries a schema:

        Broker 根据 topic 所属命名空间中已配置的兼容性检查策略执行兼容性检查。

        • 已注册 schema,producer 将连接到 broker。

        • 未注册 schema:

          • If isAllowAutoUpdateSchema sets to false, the producer is rejected to connect to a broker.

          • If isAllowAutoUpdateSchema sets to true:

            • Schema 通过了兼容性检查,则 broker 主动为该 topic 注册一个新的 schema,并连接到 producer。

            • Schema 未通过兼容性检查,则 broker 不会注册 schema,并且 producer 连接到 broker 的请求被拒。

    Consumer 的自动更新

    Consumer 会在以下情况中自动更新

    • If a consumer connects to a topic without a schema (which means the consumer receiving raw bytes), the consumer can connect to the topic successfully without doing any compatibility check.

    • If a consumer connects to a topic with a schema.

      • Topic 中不同时包含 schema / 数据 / 本地 consumer 和本地 producer:

        • If isAllowAutoUpdateSchema sets to true, then the consumer registers a schema and it is connected to a broker.

        • If isAllowAutoUpdateSchema sets to false, then the consumer is rejected to connect to a broker.

      • Topic 中包含 schema / 数据 / 本地 consumer 和本地 producer 中的一个,则执行 schema 兼容性检查。

        • Schema 通过兼容性检查,则 consumer 连接到 broker。

        • Schema 未通过兼容性检查,则拒绝 consumer 连接到 broker。

    Consumer 自动更新

    管理自动更新策略

    可以使用 pulsar-admin 命令来管理 自动更新 策略,如下所示:

    启用自动更新

    可以使用 pulsar-admin命令在命名空间上启用 自动更新

    禁用自动更新

    要在名称空间上禁用AutoUpdate,可以使用pulsar-admin命令。

    1. bin/pulsar-admin namespaces set-is-allow-auto-update-schema --disable tenant/namespace

    禁用AutoUpdate后,你只能使用pulsar-admin命令注册新的架构。

    调整兼容性

    要调整名称空间上的架构兼容性级别,可以使用pulsar-admin命令。

    1. bin/pulsar-admin namespaces set-schema-compatibility-strategy --compatibility <compatibility-level> tenant/namespace

    By default, schemaValidationEnforced is disabled for producers:

    • 没有 schema 的 producer 可以使用 schema 向 topic 中生成任何类型的消息,这可能会导致 topic 中有垃圾数据。

    • 这样,不支持 schema 的非 Java 语言客户端就可以使用 schema 向 topic 生成消息。

    但是,如果你希望对具有架构的主题有更强的保证,则可以在整个群集中或在每个命名空间的基础上启用schemaValidationEnforced

    启用 Schema 验证

    要在名称空间上启用schemaValidationEnforced,可以使用pulsar-admin命令。

    1. bin/pulsar-admin namespaces set-schema-validation-enforce --enable tenant/namespace

    禁用 Schema 验证

    要在名称空间上禁用schemaValidationEnforced,可以使用pulsar-admin命令。

    1. bin/pulsar-admin namespaces set-schema-validation-enforce --disable tenant/namespace

    若要管理架构,可以使用以下方法之一。

    更新 Schema

    Admin CLI

    REST API

    Java Admin API

    Use the upload subcommand.

    1. $ pulsar-admin schemas upload --filename <schema-definition-file> <topic-name>

    The schema-definition-file is in JSON format.

    1. { "type": "<schema-type>", "schema": "<an-utf8-encoded-string-of-schema-definition-data>", "properties": {} // the properties associated with the schema}

    The includes the following fields:

    字段Description

    type

    Schema 类型。

    schema

    Schema 定义数据以 UTF 8 字符集编码。

    • 如果 schema 是 privitive schema,则此字段应为空。

    • 如果 schema 为 struct schema,则此字段应为 Avro schema 定义的 JSON 字符串。

    properties

    与 schema 相关的其他属性。

    Here are examples of the schema-definition-file for a JSON schema.

    示例 1

    1. { "type": "JSON", "schema": "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.foo\",\"fields\":[{\"name\":\"file1\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"file2\",\"type\":\"string\",\"default\":null},{\"name\":\"file3\",\"type\":[\"null\",\"string\"],\"default\":\"dfdf\"}]}", "properties": {}}

    示例 2

    1. { "type": "STRING", "schema": "", "properties": { "key1": "value1" }}

    Send a POST request to this endpoint: POST /admin/v2/schemas/:tenant/:namespace/:topic/schema

    The post payload is in JSON format.

    The post payload includes the following fields:

    字段Description

    type

    Schema 类型。

    schema

    Schema 定义数据以 UTF 8 字符集编码。

    • 如果 schema 为 struct schema,则此字段应为 Avro schema 定义的 JSON 字符串。

    properties

    与 schema 相关的其他属性。
    1. void createSchema(String topic, PostSchemaPayload schemaPayload)

    The PostSchemaPayload includes the following fields:

    Here is an example of PostSchemaPayload:

    1. PulsarAdmin admin = …;PostSchemaPayload payload = new PostSchemaPayload();payload.setType("INT8");payload.setSchema("");admin.createSchema("my-tenant/my-ns/my-topic", payload);

    获取 Schema(最新版本)

    To get the latest schema for a topic, you can use one of the following methods.

    Admin CLI

    REST API

    Java Admin API

    使用 获得 子命令。

    1. $ pulsar-admin schemas get <topic-name>{ "version": 0, "type": "String", "timestamp": 0, "data": "string", "properties": { "property1": "string", "property2": "string" }}

    Send a GET request to this endpoint: GET /admin/v2/schemas/:tenant/:namespace/:topic/schema

    Here is an example of a response, which is returned in JSON format.

    1. { "version": "<the-version-number-of-the-schema>", "type": "<the-schema-type>", "timestamp": "<the-creation-timestamp-of-the-version-of-the-schema>", "data": "<an-utf8-encoded-string-of-schema-definition-data>", "properties": {} // the properties associated with the schema}

    The response includes the following fields:

    字段Description

    version

    Schema 版本,是一个长数。

    type

    Schema 类型。

    timestamp

    创建此版本 schema 的时间戳。

    data

    Schema 定义数据以 UTF 8 字符集编码。

    • 如果 schema 是 privitive schema,则此字段应为空。

    • 如果 schema 为 struct schema,则此字段应为 Avro schema 定义的 JSON 字符串。

    properties

    与 schema 相关的其他属性。
    1. SchemaInfo createSchema(String topic)

    The SchemaInfo includes the following fields:

    字段Description

    name

    Schema 名称。

    type

    Schema 类型。

    schema

    Schema 定义数据的字节数组以 UTF 8 字符集编码。

    如果该 schema 是“primitive” schema,则此字节数组应为空。

    如果该 schema 是 struct ** schema,则此字段应为由 Avro schema 定义的 JSON 字符串转换成的字节数组。

    1. properties
      </td>
      <td> schema 相关的其他属性。</td>

    Here is an example of SchemaInfo:

    To get a specific version of a schema, you can use one of the following methods.

    Admin CLI

    REST API

    Java Admin API

    使用 获得 子命令。

    1. $ pulsar-admin schemas get <topic-name> --version=<version>

    Send a GET request to a schema endpoint:

    Here is an example of a response, which is returned in JSON format.

    The response includes the following fields:

    1. SchemaInfo createSchema(String topic, long version)

    The SchemaInfo includes the following fields:

    字段Description

    name

    Schema 名称。

    type

    Schema 类型。

    schema

    Schema 定义数据的字节数组以 UTF 8 字符集编码。

    如果该 schema 是“primitive” schema,则此字节数组应为空。

    如果该 schema 是 struct ** schema,则此字段应为由 Avro schema 定义的 JSON 字符串转换成的字节数组。

    1. properties
      </td>
      <td> schema 相关的其他属性。</td>

    Here is an example of SchemaInfo:

    1. PulsarAdmin admin = …;SchemaInfo si = admin.getSchema("my-tenant/my-ns/my-topic", 1L);

    解压缩 schema

    To provide a schema via a topic, you can use the following method.

    Admin CLI

    Use the extract subcommand.

    1. $ pulsar-admin schemas extract --classname <class-name> --jar <jar-path> --type <type-name>

    删除 schema

    To delete a schema for a topic, you can use one of the following methods.

    Admin CLI

    REST API

    Java Admin API

    使用 删除 子命令。

    1. $ pulsar-admin schemas delete <topic-name>

    Send a DELETE request to a schema endpoint: DELETE /admin/v2/schemas/:tenant/:namespace/:topic/schema

    Here is an example of a response, which is returned in JSON format.

    1. { "version": "<the-latest-version-number-of-the-schema>",}

    The response includes the following field:

    字段Description
    versionSchema 版本,是一个长数。
    1. void deleteSchema(String topic)

    Here is an example of deleting a schema.

    1. PulsarAdmin admin = …;admin.deleteSchema("my-tenant/my-ns/my-topic");

    By default, Pulsar stores various data types of schemas in deployed alongside Pulsar.

    However, you can use another storage system if needed.

    To use a non-default (non-BookKeeper) storage system for Pulsar schemas, you need to implement the following Java interfaces:

    SchemaStorage 接口

    SchemaStorage接口包含以下方法:

    SchemaStorageFactory接口

    The SchemaStorageFactory interface has the following method:

    1. public interface SchemaStorageFactory {
    2. @NotNull
    3. SchemaStorage create(PulsarService pulsar) throws Exception;
    4. }

    部署

    To use your custom schema storage implementation, perform the following steps.

    1. 将实现打包到 文件中。

    2. 将 JAR 文件添加到 Pulsar 二进制或 source 分布中的 lib 文件夹中。

    3. 启动 Pulsar。