+-
Kafka Protobuf:C序列化到java
我开发了一些C应用程序来生成和使用Kafka消息(使用cppkafka)嵌入Protobuf3消息.两者都很好.制片人的相关代码是:

std::string kafkaString;
cppkafka::MessageBuilder *builder;
...
solidList->SerializeToString(&kafkaString);
builder->payload(kafkaString);

Protobuf对象被序列化为字符串并作为Kafka有效负载插入.到目前为止,一切正常.现在,我正在尝试用Java开发一个消费者.相关代码应为:

KafkaConsumer<Long, String> consumer=new KafkaConsumer<Long, String>(properties);
....
ConsumerRecords<Long, String> records = consumer.poll(100);
  for (ConsumerRecord<Long, String> record : records) {
    SolidList solidList = SolidList.parseFrom(record.value());
    ...

但是在编译时失败了:parseFrom抱怨:Solidlist.SolidList类型中的方法parseFrom(ByteBuffer)不适用于参数(String).所以,我尝试使用ByteBuffer:

KafkaConsumer<Long, ByteBuffer> consumer=new KafkaConsumer<Long, ByteBuffer>(properties);
....
ConsumerRecords<Long, ByteBuffer> records = consumer.poll(100);
  for (ConsumerRecord<Long, ByteBuffer> record : records) {
    SolidList solidList = SolidList.parseFrom(record.value());
    ...

现在,错误是执行时间,仍然在parseFrom()上:线程“main”中的异常java.lang.ClassCastException:java.lang.String无法强制转换为java.nio.ByteBuffer.我知道这是一个java.lang.String !!!所以,我回到原来的,并尝试将其用作字节数组:

    SolidList solidList = SolidList.parseFrom(record.value().getBytes());

现在,错误是执行时间:线程“main”中的异常com.google.protobuf.InvalidProtocolBufferException $InvalidWireTypeException:协议消息标记具有无效的线类型..

protobuf文档说明了C序列化:bool SerializeToString(string output)const;:序列化消息并将字节存储在给定的字符串中.请注意,字节是二进制的,而不是文本;我们只使用字符串类作为方便的容器.*

TL; DR:因此,我应该如何解释Java中的protobuf C“二进制字节”?

这似乎是相关的(相反)但没有帮助:Protobuf Java To C++ Serialization [Binary]

提前致谢.

最佳答案
尝试实现 Deserializer并将其作为值反序列化器传递给 KafkaConsumer构造函数.它可能看起来像这样:

class SolidListDeserializer implements Deserializer<SolidList> {
  public SolidList deserialize(final String topic, byte[] data) {
    return SolidList.parseFrom(data);
  }
  ...
}

...

KafkaConsumer<Long, SolidList> consumer = new KafkaConsumer<>(props, new LongDeserializer(), new SolidListDeserializer())
点击查看更多相关文章

转载注明原文:Kafka Protobuf:C序列化到java - 乐贴网