Pulsar Encryption

    Pulsar使用动态生成的对称AES秘钥来加密消息(数据)。 你可以使用应用程序提供的 ECDSA/RSA 密钥对来加密 AES 密钥(数据密钥), 所以你不必与大家分享秘密。

    Key is a public and private key pair used for encryption or decryption. The producer key is the public key of the key pair, and the consumer key is the private key of the key pair.

    应用程序用公钥配置生产者。 你可以使用此密钥来加密 AES 数据密钥。 加密数据秘钥作为消息头部的一部分发送。 只有拥有私钥的实体 (在这种情况下是消费者) 才能解密用于解密消息的数据密钥。

    You can encrypt a message with more than one key. Any one of the keys used for encrypting the message is sufficient to decrypt the message.

    Pulsar does not store the encryption key anywhere in the Pulsar service. If you lose or delete the private key, your message is irretrievably lost, and is unrecoverable.

    生产者(Producer)

    alt text

    开始

    1. 将公钥和私钥添加到秘钥管理中,并且配置你的生产者去得到公钥,消费者去得到私钥。

    2. 向生产者 builder 添加加密密钥名称: PulsarClient.newProducer().addEncryptionKey(“myapp.key”)。

    3. 配置 到生产者、消费者或观察者。

    Java

    C++

    Python

    Node.JS

    1. PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();String topic = "persistent://my-tenant/my-ns/my-topic";// RawFileKeyReader 只是 Pulsar 未提供的示例实现CryptoKeyReader keyReader = new RawFileKeyReader("test_ecdsa_pubkey.pem", "test_ecdsa_privkey.pem");Producer<byte[]> producer = pulsarClient.newProducer() .topic(topic) .cryptoKeyReader(keyReader) .addEncryptionKey(“myappkey”) .create();Consumer<byte[]> consumer = pulsarClient.newConsumer() .topic(topic) .subscriptionName("my-subscriber-name") .cryptoKeyReader(keyReader) .subscribe();Reader<byte[]> reader = pulsarClient.newReader() .topic(topic) .startMessageId(MessageId.earliest) .cryptoKeyReader(keyReader) .create();
    1. from pulsar import Client, CryptoKeyReaderclient = Client('pulsar://localhost:6650')topic = 'persistent://my-tenant/my-ns/my-topic'# CryptoKeyReader 是一个从文件中读取公钥和私钥的内置实现key_reader = CryptoKeyReader('test_ecdsa_pubkey.pem', 'test_ecdsa_privkey.pem')producer = client.create_producer( topic=topic, encryption_key='myappkey', crypto_key_reader=key_reader)consumer = client.subscribe( topic=topic, subscription_name='my-subscriber-name', crypto_key_reader=key_reader)reader = client.create_reader( topic=topic, start_message_id=MessageId.earliest, crypto_key_reader=key_reader)client.close()
    1. Below is an example of a customized CryptoKeyReader implementation.

    Java

    C++

    Node.JS

    1. class RawFileKeyReader implements CryptoKeyReader { String publicKeyFile = ""; String privateKeyFile = ""; RawFileKeyReader(String pubKeyFile, String privKeyFile) { publicKeyFile = pubKeyFile; privateKeyFile = privKeyFile; } @Override public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) { EncryptionKeyInfo keyInfo = new EncryptionKeyInfo(); try { keyInfo.setKey(Files.readAllBytes(Paths.get(publicKeyFile))); } catch (IOException e) { System.out.println("ERROR: Failed to read public key from file " + publicKeyFile); e.printStackTrace(); } return keyInfo; } @Override public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) { EncryptionKeyInfo keyInfo = new EncryptionKeyInfo(); try { keyInfo.setKey(Files.readAllBytes(Paths.get(privateKeyFile))); } catch (IOException e) { System.out.println("ERROR: Failed to read private key from file " + privateKeyFile); e.printStackTrace(); } return keyInfo; }}

    Besides, you can use the default implementation of by specifying the paths of private key and public key.

    Currently, customized CryptoKeyReader implementation is not supported in Python. However, you can use the default implementation by specifying the path of private key and .

    Currently, customized CryptoKeyReader implementation is not supported in Node.JS. However, you can use the default implementation by specifying the path of private key and public key.

    Pulsar 每隔 4 小时或在发布一定数量的消息后生成一个新的 AES 数据密钥。 生产者通过调用 CryptoKeyReader.getPublicKey() 获取最新版本,以此来每隔 4 小时获取一次不对称公钥。

    在生产者应用程序中启用加密

    如果生产和消费的程序不是同一个, 你需要确保消费者能够访问能够解密消息的私钥。 You can do this in two ways:

    1. 你授予访问权限给生产者使用的密钥对中的其中一个私钥。

    When producers want to encrypt the messages with multiple keys, producers add all such keys to the config. Consumer can decrypt the message as long as the consumer has access to at least one of the keys.

    如果你需要使用两个键(myapp.messagekey1和myapp.messagekey2)加密消息,请参阅下面的例子。

    1. PulsarClient.newProducer().addEncryptionKey("myapp.messagekey1").addEncryptionKey("myapp.messagekey2");

    处理失败

    • 生产者/消费者无法访问秘钥
      • 生产者操作失败指明失败的原因。 在这种情况下,应用可以选择继续发送未加密的消息。 调用 PulsarClient.newProducer().cryptoFailureAction(ProducerCryptoFailureAction) 控制生产者的行为。 默认的行为是请求失败。
      • 如果消费因解密失败或消费者缺失密钥而失败,应用程序可以选择消费加密的消息或丢弃它。 调用 PulsarClient.newConsumer().cryptoFailureAction(ConsumerCryptoFailureAction) 控制消费者的行为。 默认的行为是请求失败。 如果私钥永久丢失,应用程序永远无法解密消息。
    • 批量消息
      • 如果解密失败且消息包含批量消息,客户端会无法在批次中检索单独的消息, 因此,即使将 cryptoFailureAction() 设置为 ConsumerCryptoFailureAction.CONSUME,消息消费还是会失败。
    • 如果解密失败,消息消费会停止,应用程序除了在客户端日志中记录解密失败消息外还会通知积压增加。 如果应用程序不能访问私钥来解密消息,唯一的选项是跳过或丢弃已经积压的消息。