V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
InDom
V2EX  ›  Logstash

使用 logstash 消费 avro 数据出现部分字节被替换为 ef bf bd 导致解析出错。

  •  
  •   InDom · 232 天前 · 508 次点击
    这是一个创建于 232 天前的主题,其中的信息可能已经有所发展或是发生改变。

    消费渠道 kafka 与 redis 均出现相同问题,直接以 plain => { charset => "BINARY" } 依然出现此问题。

    avro 数据原文( 16 进制)

    00000000: 00 80 40 00 cc e1 85 98 0e 00 3a 32 30 32 34 2d  ..@.......:2024-
    00000010: 30 34 2d 31 38 54 30 36 3a 33 32 3a 30 34 2e 30  04-18T06:32:04.0
    00000020: 30 30 2b 30 38 3a 30 30                          00+08:00
    

    logstash 获取到的内容

    00000000: 00 ef bf bd 40 00 ef bf bd ef bf bd ef bf bd ef  ....@...........
    00000010: bf bd 0e 00 3a 32 30 32 34 2d 30 34 2d 31 38 54  ....:2024-04-18T
    00000020: 30 36 3a 33 32 3a 30 34 2e 30 30 30 2b 30 38 3a  06:32:04.000+08:
    00000030: 30 30 0a                                         00.
    

    logstash 是通过 docker 起的 8.13.0 ,以下是容器相关配置

    docker-compose.yml

      logstash:
        image: logstash:8.13.0
        volumes:
          - /root/kafka-2-es/logstash.conf:/usr/share/logstash/pipeline/logstash.conf
        environment:
          - "XPACK_MONITORING_ENABLED=false"
          - "KAFKA_GROUP_ID"
    

    logstash.conf

    input {
      kafka {
        bootstrap_servers => "kafka:9092"
        topics => ["urllog-test-01"]
        codec => plain {
          charset => "BINARY"
        }
        group_id => "${KAFKA_GROUP_ID}"
        auto_offset_reset => "earliest"
      }
      redis {
        host => "192.168.4.101"
        port => 6379
        password => "Fle7YuG22qIh7ZNPkceopo3oZb1UFZrX"
        data_type => "list"
        key => "urllog-test-01"
        codec => plain {
          charset => "BINARY"
        }
      }
    }
    
    output {
      stdout {
        codec => rubydebug
      }
      elasticsearch {
        hosts => ["https://192.168.1.4:9200"]
        index => "urllog-test-01"
        user => "elastic"
        password => "123456"
        ssl => true
        ssl_certificate_verification => false
        manage_template => true
      }
    }
    

    已经确认的是:

    1. 使用 go 从 kafka 中消费获取到的二进制内容是正确的。
    2. 使用 logstash 从 kafka 与 redis 消费相同的内容,均出现特殊字符被替换为 ef bf bd 的情况
    3. 使用 kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic urllog-test-01 --from-beginning >> /tmp/data-1.txt 直接从 kafka 中得到的内容是正确的

    有没有大佬知道是这么回事,给指条路呗?万分感谢。

    第 1 条附言  ·  231 天前

    果真是不发帖就找不到答案,官方文档没找到任何有用的线索,搜索关键词也找不到答案,通过遍历所有的 github 上相关的 issue,被发现了一个早前忽视的东西。

    https://github.com/revpoint/logstash-codec-avro_schema_registry?tab=readme-ov-file#basic-usage-with-kafka-input-and-output

    在这里看到了一个选项: value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"

    一周前就看到了这个库,但没有时间仔细阅读代码段,不然一周前就应该解决这个问题了。

    最后贴一下结果,希望下一个“我”可以看到。

    input {
      kafka {
        bootstrap_servers => "kafka:9092"
        topics => ["test-01"]
        codec => avro {
          schema_uri => "/app/test.avsc"
          tag_on_failure => true
          encoding => binary
        }
        value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
        group_id => "${KAFKA_GROUP_ID}"
        auto_offset_reset => "earliest"
      }
    }
    output {
      stdout {
        codec => rubydebug
      }
    }
    

    我的一周啊,因为这个破事,甚至险些破坏了我的五一假期。

    1 条回复    2024-05-07 11:22:31 +08:00
    InDom
        1
    InDom  
    OP
       231 天前
    另外,贴一个有意思的东西,相同的问题,解决前和解决后。





    对,已经指向这个帖子了。
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   3096 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 20ms · UTC 12:57 · PVG 20:57 · LAX 04:57 · JFK 07:57
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.