Manage schema

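    If a schema passes the schema compatibility check, the Pulsar producer automatically updates this schema to the topic it produces to by default.

    AutoUpdate for producer

    For a producer, AutoUpdate happens in the following cases: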

    • If a topic doesn’t have a schema, Pulsar registers a schema automatically.

    • If a topic has a schema:

      • If a producer doesn’t carry a schema:

        • If isSchemaValidationEnforced or schemaValidationEnforced is disabled in the namespace to which the topic belongs, the producer is allowed to connect to the topic and produce data.

        • If isSchemaValidationEnforced or schemaValidationEnforced is enabled in the namespace to which the topic belongs, the producer is rejected and disconnected.

      • If a producer carries a schema:

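        The broker performs the compatibility check based on the compatibility check strategy configured for the namespace to which the topic belongs.

        • If the schema is registered, the producer is connected to a broker.

        • If the schema is not registered:

          • If isAllowAutoUpdateSchema is set to false, the producer is rejected and cannot connect to a broker.

          • If isAllowAutoUpdateSchema is set to true:

            • If the schema passes the compatibility check, the broker registers a new schema automatically for the topic and the producer is connected.

            • If the schema fails the compatibility check, the broker does not register the schema, and the producer is rejected and cannot connect to the broker.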
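    The producer rules above form a small decision tree. The sketch below encodes that tree as a pure function so each branch can be checked in isolation. The class ProducerAutoUpdateModel, its Outcome enum, and all parameter names are hypothetical illustrations, not Pulsar APIs; the broker's real logic also involves I/O and error handling that is not modeled here.

```java
// Hypothetical model of the producer AutoUpdate rules described above.
// Class, enum, and parameter names are illustrative only; they are not Pulsar APIs.
final class ProducerAutoUpdateModel {
    enum Outcome { CONNECTED, SCHEMA_REGISTERED_AND_CONNECTED, REJECTED }

    private ProducerAutoUpdateModel() {
    }

    static Outcome decide(boolean topicHasSchema,
                          boolean producerCarriesSchema,
                          boolean schemaValidationEnforced,
                          boolean schemaAlreadyRegistered,
                          boolean allowAutoUpdateSchema,
                          boolean passesCompatibilityCheck) {
        if (!topicHasSchema) {
            // A topic without a schema gets one registered automatically.
            return Outcome.SCHEMA_REGISTERED_AND_CONNECTED;
        }
        if (!producerCarriesSchema) {
            // Schema-less producers are accepted only when validation is not enforced.
            return schemaValidationEnforced ? Outcome.REJECTED : Outcome.CONNECTED;
        }
        if (schemaAlreadyRegistered) {
            return Outcome.CONNECTED;
        }
        if (!allowAutoUpdateSchema) {
            // Unregistered schema and AutoUpdate disabled: the producer is rejected.
            return Outcome.REJECTED;
        }
        // AutoUpdate enabled: register only if the namespace's compatibility check passes.
        return passesCompatibilityCheck ? Outcome.SCHEMA_REGISTERED_AND_CONNECTED : Outcome.REJECTED;
    }
}
```

    For example, decide(true, true, false, false, true, false) returns REJECTED: the producer carries a new schema and AutoUpdate is on, but the schema fails the compatibility check.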

    AutoUpdate for consumer

    For a consumer, AutoUpdate happens in the following cases:

    • If a consumer without a schema connects to a topic (that is, the consumer receives raw bytes), the consumer can connect to the topic successfully without any compatibility check.

    • If a consumer with a schema connects to a topic:

      • If the topic has none of the following: a schema, data, a local consumer, or a local producer:

        • If isAllowAutoUpdateSchema is set to true, the consumer registers a schema and is connected to a broker.

        • If isAllowAutoUpdateSchema is set to false, the consumer is rejected and cannot connect to a broker.

      • If the topic has at least one of the following: a schema, data, a local consumer, or a local producer, the schema compatibility check is performed.

        • If the schema passes the compatibility check, the consumer is connected to the broker.

        • If the schema fails the compatibility check, the consumer is rejected and cannot connect to the broker.
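    The consumer rules reduce to three questions: does the consumer carry a schema, does the topic already have a schema, data, or local clients, and does the relevant check pass. The hypothetical ConsumerAutoUpdateModel below is an illustrative encoding of those rules only; its names are not part of any Pulsar API.

```java
// Hypothetical model of the consumer AutoUpdate rules described above.
// Names are illustrative only; they are not Pulsar APIs.
final class ConsumerAutoUpdateModel {
    enum Outcome { CONNECTED, SCHEMA_REGISTERED_AND_CONNECTED, REJECTED }

    private ConsumerAutoUpdateModel() {
    }

    // topicHasAnyOf: the topic has a schema, data, a local consumer, or a local producer.
    static Outcome decide(boolean consumerCarriesSchema,
                          boolean topicHasAnyOf,
                          boolean allowAutoUpdateSchema,
                          boolean passesCompatibilityCheck) {
        if (!consumerCarriesSchema) {
            // Raw-bytes consumers connect without any compatibility check.
            return Outcome.CONNECTED;
        }
        if (!topicHasAnyOf) {
            // Per the rules above, the outcome depends only on isAllowAutoUpdateSchema here.
            return allowAutoUpdateSchema ? Outcome.SCHEMA_REGISTERED_AND_CONNECTED : Outcome.REJECTED;
        }
        // Otherwise the schema compatibility check decides.
        return passesCompatibilityCheck ? Outcome.CONNECTED : Outcome.REJECTED;
    }
}
```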

    [Figure: AutoUpdate for consumer]

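    Manage AutoUpdate strategy

    You can use the pulsar-admin command to manage the AutoUpdate strategy as below:

    Enable AutoUpdate

    To enable AutoUpdate on a namespace, you can use the pulsar-admin command.

        bin/pulsar-admin namespaces set-is-allow-auto-update-schema --enable tenant/namespace

    Disable AutoUpdate

    To disable AutoUpdate on a namespace, you can use the pulsar-admin command.

        bin/pulsar-admin namespaces set-is-allow-auto-update-schema --disable tenant/namespace

    Once AutoUpdate is disabled, you can only register a new schema using the pulsar-admin command.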

    Adjust compatibility

    To adjust the schema compatibility level on a namespace, you can use the pulsar-admin command.

        bin/pulsar-admin namespaces set-schema-compatibility-strategy --compatibility <compatibility-level> tenant/namespace

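    Schema validation

    By default, schemaValidationEnforced is disabled for producers:

    • This means a producer without a schema can produce any kind of message to a topic with schemas, which may result in trash data in the topic.

    • This allows non-Java language clients that don't support schema to produce messages to a topic with schemas.

    However, if you want a stronger guarantee on topics with schemas, you can enable schemaValidationEnforced across the whole cluster or on a per-namespace basis.

    Enable schema validation

    To enable schemaValidationEnforced on a namespace, you can use the pulsar-admin command.

        bin/pulsar-admin namespaces set-schema-validation-enforce --enable tenant/namespace

    Disable schema validation

    To disable schemaValidationEnforced on a namespace, you can use the pulsar-admin command.

        bin/pulsar-admin namespaces set-schema-validation-enforce --disable tenant/namespace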

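    To manage schemas, you can use one of the following methods: the Admin CLI, the REST API, or the Java Admin API.

    Upload a schema

    To upload (register) a new schema for a topic, you can use one of the following methods.

    • Admin CLI

    • REST API

    • Java Admin API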

    Admin CLI: use the upload subcommand.

        $ pulsar-admin schemas upload --filename <schema-definition-file> <topic-name>

    The schema-definition-file is in JSON format:

        {
            "type": "<schema-type>",
            "schema": "<an-utf8-encoded-string-of-schema-definition-data>",
            "properties": {} // the properties associated with the schema
        }

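    The schema-definition-file includes the following fields:

        Field        Description
        type         The schema type.
        schema       The schema definition data, encoded in the UTF-8 charset.
                     • If the schema is a primitive schema, this field should be blank.
                     • If the schema is a struct schema, this field should be a JSON string of the Avro schema definition.
        properties   Additional properties associated with the schema.

    Here are examples of the schema-definition-file: Example 1 defines a JSON schema, and Example 2 defines a STRING schema.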

    Example 1

        {
            "type": "JSON",
            "schema": "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.foo\",\"fields\":[{\"name\":\"file1\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"file2\",\"type\":\"string\",\"default\":null},{\"name\":\"file3\",\"type\":[\"null\",\"string\"],\"default\":\"dfdf\"}]}",
            "properties": {}
        }

    Example 2

        {
            "type": "STRING",
            "schema": "",
            "properties": {
                "key1": "value1"
            }
        }
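    The easiest field to get wrong is schema: for a struct schema it holds the Avro definition as a JSON string embedded inside the outer JSON, so every quote must be escaped, as in Example 1. The hypothetical helper below assembles a schema-definition-file from its three fields with a minimal hand-written JSON string escaper; it is an illustration only, and a real tool would more likely use a JSON library.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that builds schema-definition-file content; not part of Pulsar.
final class SchemaDefinitionFileBuilder {
    private SchemaDefinitionFileBuilder() {
    }

    // Escapes a Java string as a JSON string literal, including the surrounding quotes.
    static String jsonString(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"': sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c)); // other control characters
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }

    // schemaDefinition is "" for primitive schemas, or the Avro definition JSON for struct schemas.
    static String build(String type, String schemaDefinition, Map<String, String> properties) {
        StringBuilder sb = new StringBuilder("{\"type\":").append(jsonString(type))
                .append(",\"schema\":").append(jsonString(schemaDefinition))
                .append(",\"properties\":{");
        boolean first = true;
        for (Map.Entry<String, String> e : properties.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            sb.append(jsonString(e.getKey())).append(':').append(jsonString(e.getValue()));
            first = false;
        }
        return sb.append("}}").toString();
    }
}
```

    Calling build("STRING", "", props) with props = {key1=value1} produces a compact form of Example 2. A LinkedHashMap keeps the properties in insertion order, which makes the output predictable.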

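    REST API: send a POST request to this endpoint: POST /admin/v2/schemas/:tenant/:namespace/:topic/schema

    The post payload is in JSON format.

    The post payload includes the following fields:

        Field        Description
        type         The schema type.
        schema       The schema definition data, encoded in the UTF-8 charset.
                     • If the schema is a primitive schema, this field should be blank.
                     • If the schema is a struct schema, this field should be a JSON string of the Avro schema definition.
        properties   Additional properties associated with the schema.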
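    As a sketch of how the upload endpoint is addressed, the hypothetical class below uses Java 11's java.net.http to build, but not send, the POST request. The base URL http://localhost:8080 in the usage note is an assumption (a locally running broker's web service), authentication is omitted, and tenant, namespace, and topic names are assumed to contain only URL-safe characters, so no escaping is done.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper that builds the schema upload request; it does not send it.
final class UploadSchemaRequest {
    private UploadSchemaRequest() {
    }

    // webServiceUrl is an assumption, e.g. a local broker's web service address.
    // Path segments are not URL-encoded, so names must already be URL-safe.
    static HttpRequest build(String webServiceUrl, String tenant, String namespace,
                             String topic, String payloadJson) {
        URI uri = URI.create(webServiceUrl + "/admin/v2/schemas/"
                + tenant + "/" + namespace + "/" + topic + "/schema");
        return HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/json") // the payload is JSON
                .POST(HttpRequest.BodyPublishers.ofString(payloadJson))
                .build();
    }
}
```

    Usage: UploadSchemaRequest.build("http://localhost:8080", "my-tenant", "my-ns", "my-topic", payload) yields a request you could pass to HttpClient.send once authentication, if any, has been added.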

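    Java Admin API: use void createSchema(String topic, PostSchemaPayload schemaPayload). The PostSchemaPayload includes the same fields as the REST API post payload: type, schema, and properties.

    Here is an example of PostSchemaPayload:

        import org.apache.pulsar.client.admin.PulsarAdmin;
        import org.apache.pulsar.common.protocol.schema.PostSchemaPayload;

        PulsarAdmin admin = …;
        PostSchemaPayload payload = new PostSchemaPayload();
        payload.setType("INT8");  // a primitive schema type
        payload.setSchema("");    // primitive schemas have no definition data
        admin.schemas().createSchema("my-tenant/my-ns/my-topic", payload);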

    Get a schema (latest)

    To get the latest schema for a topic, you can use one of the following methods.

    • Admin CLI

    • REST API

    • Java Admin API

    Admin CLI: use the get subcommand.

        $ pulsar-admin schemas get <topic-name>

        {
            "version": 0,
            "type": "String",
            "timestamp": 0,
            "data": "string",
            "properties": {
                "property1": "string",
                "property2": "string"
            }
        }

    REST API: send a GET request to this endpoint: GET /admin/v2/schemas/:tenant/:namespace/:topic/schema

    Here is an example of a response, which is returned in JSON format.

        {
            "version": "<the-version-number-of-the-schema>",
            "type": "<the-schema-type>",
            "timestamp": "<the-creation-timestamp-of-the-version-of-the-schema>",
            "data": "<an-utf8-encoded-string-of-schema-definition-data>",
            "properties": {} // the properties associated with the schema
        }

    The response includes the following fields:

        Field        Description
        version      The schema version, which is a long number.
        type         The schema type.
        timestamp    The timestamp at which this version of the schema was created.
        data         The schema definition data, encoded in the UTF-8 charset.
                     • If the schema is a primitive schema, this field should be blank.
                     • If the schema is a struct schema, this field should be a JSON string of the Avro schema definition.
        properties   Additional properties associated with the schema.

    Java Admin API: use the following method.

        SchemaInfo getSchemaInfo(String topic)

    The SchemaInfo includes the following fields:

        Field        Description
        name         The schema name.
        type         The schema type.
        schema       A byte array of the schema definition data, encoded in the UTF-8 charset.
                     • If the schema is a primitive schema, this byte array should be empty.
                     • If the schema is a struct schema, this field should be a JSON string of the Avro schema definition converted to a byte array.
        properties   Additional properties associated with the schema.

    Here is an example of SchemaInfo:

        PulsarAdmin admin = …;
        SchemaInfo si = admin.schemas().getSchemaInfo("my-tenant/my-ns/my-topic");
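    Unlike the REST response's data string, SchemaInfo's schema field is a byte array. The hypothetical SchemaBytes helper below illustrates the two documented cases, an empty array for primitive schemas and the UTF-8 bytes of the Avro definition JSON for struct schemas; it is a sketch of the encoding rule, not Pulsar's own code.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper illustrating how SchemaInfo's schema bytes relate to the definition text.
final class SchemaBytes {
    private SchemaBytes() {
    }

    // Primitive schemas carry no definition, so their byte array is empty.
    static byte[] forPrimitive() {
        return new byte[0];
    }

    // Struct schemas carry the Avro schema definition JSON, encoded as UTF-8.
    static byte[] forStruct(String avroSchemaJson) {
        return avroSchemaJson.getBytes(StandardCharsets.UTF_8);
    }

    // Decodes schema bytes back into the definition text.
    static String decode(byte[] schemaBytes) {
        return new String(schemaBytes, StandardCharsets.UTF_8);
    }
}
```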

    Get a schema (specific version)

    To get a specific version of a schema, you can use one of the following methods.

    • Admin CLI

    • REST API

    • Java Admin API

    Admin CLI: use the get subcommand.

        $ pulsar-admin schemas get <topic-name> --version=<version>

    REST API: send a GET request to a schema endpoint: GET /admin/v2/schemas/:tenant/:namespace/:topic/schema/:version

    Here is an example of a response, which is returned in JSON format.

        {
            "version": "<the-version-number-of-the-schema>",
            "type": "<the-schema-type>",
            "timestamp": "<the-creation-timestamp-of-the-version-of-the-schema>",
            "data": "<an-utf8-encoded-string-of-schema-definition-data>",
            "properties": {} // the properties associated with the schema
        }

    The response includes the same fields as the response for getting the latest schema: version, type, timestamp, data, and properties.

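      Java Admin API: use the following method.

          SchemaInfo getSchemaInfo(String topic, long version)

      The SchemaInfo includes the following fields:

          Field        Description
          name         The schema name.
          type         The schema type.
          schema       A byte array of the schema definition data, encoded in the UTF-8 charset.
                       • If the schema is a primitive schema, this byte array should be empty.
                       • If the schema is a struct schema, this field should be a JSON string of the Avro schema definition converted to a byte array.
          properties   Additional properties associated with the schema.

      Here is an example of getting the SchemaInfo of a specific version:

          PulsarAdmin admin = …;
          SchemaInfo si = admin.schemas().getSchemaInfo("my-tenant/my-ns/my-topic", 1L);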

      Extract a schema

      To provide a schema via a topic, you can use the following method.

      • Admin CLI

      Admin CLI: use the extract subcommand.

          $ pulsar-admin schemas extract --classname <class-name> --jar <jar-path> --type <type-name>

      Delete a schema

      To delete a schema for a topic, you can use one of the following methods.

      • Admin CLI

      • REST API

      • Java Admin API

      Admin CLI: use the delete subcommand.

          $ pulsar-admin schemas delete <topic-name>

      REST API: send a DELETE request to a schema endpoint: DELETE /admin/v2/schemas/:tenant/:namespace/:topic/schema

      Here is an example of a response, which is returned in JSON format.

          {
              "version": "<the-latest-version-number-of-the-schema>"
          }

      The response includes the following field:

          Field        Description
          version      The schema version, which is a long number.

      Java Admin API: use the following method.

          void deleteSchema(String topic)

      Here is an example of deleting a schema.

          PulsarAdmin admin = …;
          admin.schemas().deleteSchema("my-tenant/my-ns/my-topic");

      Custom schema storage

      By default, Pulsar stores various data types of schemas in Apache BookKeeper, which is deployed alongside Pulsar.

      However, you can use another storage system if needed.

      Implementation

      To use a non-default (non-BookKeeper) storage system for Pulsar schemas, you need to implement the following Java interfaces:

      SchemaStorage interface

      The SchemaStorage interface has the following methods:

          import java.util.concurrent.CompletableFuture;

          public interface SchemaStorage {
              // How schemas are updated
              CompletableFuture<SchemaVersion> put(String key, byte[] value, byte[] hash);

              // How schemas are fetched from storage
              CompletableFuture<StoredSchema> get(String key, SchemaVersion version);

              // How schemas are deleted
              CompletableFuture<SchemaVersion> delete(String key);

              // Utility method for converting schema version bytes into a SchemaVersion object
              SchemaVersion versionFromBytes(byte[] version);

              // Startup behavior for the schema storage client
              void start() throws Exception;

              // Shutdown behavior for the schema storage client
              void close() throws Exception;
          }
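      To make the contract concrete, here is a minimal in-memory store whose methods mirror the SchemaStorage methods above. It is a sketch under stated assumptions: LongVersion and StoredEntry are hypothetical stand-ins for Pulsar's SchemaVersion and StoredSchema (so the class does not literally implement SchemaStorage), versions are simply list indexes, and using the hash to skip re-storing an identical latest schema is a design choice of this sketch, not documented Pulsar behavior.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for SchemaVersion: a long encoded as 8 big-endian bytes.
final class LongVersion {
    final long value;

    LongVersion(long value) {
        this.value = value;
    }

    byte[] bytes() {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }
}

// Hypothetical stand-in for StoredSchema: schema bytes, their hash, and their version.
final class StoredEntry {
    final byte[] data;
    final byte[] hash;
    final LongVersion version;

    StoredEntry(byte[] data, byte[] hash, LongVersion version) {
        this.data = data;
        this.hash = hash;
        this.version = version;
    }
}

// In-memory store mirroring the SchemaStorage methods: one list of versions per key.
final class InMemorySchemaStorage {
    private final Map<String, List<StoredEntry>> schemas = new HashMap<>();

    // Appends a new version unless the latest stored version has the same hash.
    synchronized CompletableFuture<LongVersion> put(String key, byte[] value, byte[] hash) {
        List<StoredEntry> history = schemas.computeIfAbsent(key, k -> new ArrayList<>());
        if (!history.isEmpty()) {
            StoredEntry latest = history.get(history.size() - 1);
            if (Arrays.equals(latest.hash, hash)) {
                return CompletableFuture.completedFuture(latest.version);
            }
        }
        LongVersion version = new LongVersion(history.size());
        history.add(new StoredEntry(value.clone(), hash.clone(), version));
        return CompletableFuture.completedFuture(version);
    }

    // Completes with null when the key or version is unknown.
    synchronized CompletableFuture<StoredEntry> get(String key, LongVersion version) {
        List<StoredEntry> history = schemas.get(key);
        if (history == null || version.value < 0 || version.value >= history.size()) {
            return CompletableFuture.completedFuture(null);
        }
        return CompletableFuture.completedFuture(history.get((int) version.value));
    }

    // Removes all versions of the key; completes with the last version, or null if absent.
    synchronized CompletableFuture<LongVersion> delete(String key) {
        List<StoredEntry> history = schemas.remove(key);
        if (history == null || history.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        return CompletableFuture.completedFuture(history.get(history.size() - 1).version);
    }

    LongVersion versionFromBytes(byte[] version) {
        return new LongVersion(ByteBuffer.wrap(version).getLong());
    }

    void start() {
        // Nothing to connect to: the data lives in this object's map.
    }

    synchronized void close() {
        schemas.clear();
    }
}
```

      Completing every future immediately keeps the sketch simple; a storage backend that talks to a remote system would complete them asynchronously instead.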

      SchemaStorageFactory interface

      The SchemaStorageFactory interface has the following method:

          public interface SchemaStorageFactory {
              @NotNull
              SchemaStorage create(PulsarService pulsar) throws Exception;
          }

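      Deployment

      To use your custom schema storage implementation, perform the following steps.

      1. Package the implementation in a JAR file.

      2. Add the JAR file to the lib folder in your Pulsar binary or source distribution.

      3. Change the schemaRegistryStorageClassName configuration in broker.conf to your custom factory class.

      4. Start Pulsar.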
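      Steps 2 and 3 work together: the JAR puts your factory class on the broker's classpath, and schemaRegistryStorageClassName names it. The sketch below shows one plausible way a class name read from broker.conf-style text can become an object, assuming reflection and a public no-argument constructor; StorageFactory, CustomStorageFactory, and StorageFactoryLoader are hypothetical stand-ins, and the real SchemaStorageFactory.create takes a PulsarService.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical stand-in for SchemaStorageFactory (the real interface takes a PulsarService).
interface StorageFactory {
    String describe();
}

// Hypothetical custom factory: the class you would package into the JAR under lib/.
final class CustomStorageFactory implements StorageFactory {
    public CustomStorageFactory() {
    }

    @Override
    public String describe() {
        return "custom schema storage";
    }
}

final class StorageFactoryLoader {
    private StorageFactoryLoader() {
    }

    // Reads schemaRegistryStorageClassName from broker.conf-style text and instantiates that class.
    static StorageFactory load(String brokerConf) throws IOException, ReflectiveOperationException {
        Properties conf = new Properties();
        conf.load(new StringReader(brokerConf));
        String className = conf.getProperty("schemaRegistryStorageClassName");
        if (className == null || className.isBlank()) {
            throw new IllegalStateException("schemaRegistryStorageClassName is not set");
        }
        Class<?> clazz = Class.forName(className.trim());
        // Throws ClassCastException if the configured class is not a StorageFactory.
        return StorageFactory.class.cast(clazz.getDeclaredConstructor().newInstance());
    }
}
```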

      Set schema compatibility check strategy

      You can set the schema compatibility check strategy at the namespace or broker level.

      • If you set the schema compatibility check strategy at both the namespace and broker levels, the strategy set at the namespace level is used.

      • If you set the schema compatibility check strategy at neither the namespace nor the broker level, the FULL strategy is used.

      • If you set the schema compatibility check strategy only at the broker level, the strategy set at the broker level is used.

      • If you set the schema compatibility check strategy only at the namespace level, the strategy set at the namespace level is used.
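      The four rules above collapse into a single precedence chain: namespace level, then broker level, then FULL. The hypothetical resolver below expresses that chain with Optional; strategies are plain strings here for brevity, whereas Pulsar itself uses an enum.

```java
import java.util.Optional;

// Hypothetical resolver for the precedence rules above; not a Pulsar API.
final class CompatibilityStrategyResolver {
    // Strategy used when neither level sets one.
    static final String DEFAULT_STRATEGY = "FULL";

    private CompatibilityStrategyResolver() {
    }

    // The namespace-level setting wins; otherwise the broker-level one; otherwise FULL.
    static String resolve(Optional<String> namespaceLevel, Optional<String> brokerLevel) {
        return namespaceLevel.or(() -> brokerLevel).orElse(DEFAULT_STRATEGY);
    }
}
```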

      Namespace

      You can set the schema compatibility check strategy at the namespace level using one of the following methods.

      • pulsar-admin

      • REST API

      • Java

      pulsar-admin: use the following command.

          pulsar-admin namespaces set-schema-compatibility-strategy options

      REST API: send a PUT request to this endpoint: PUT /admin/v2/namespaces/:tenant/:namespace

      Java: use the method.

      Broker

      You can set the schema compatibility check strategy at the broker level by setting schemaCompatibilityStrategy in broker.conf or file.

          schemaCompatibilityStrategy=ALWAYS_INCOMPATIBLE