filebeat写入数据到kafka topic失败问题排查

大数据组有个需求,需要将两台cvm之前落盘的历史日志文件迁移到大数据集群上,一共600多个G,数据量不是很大,想着直接使用filebeat采集到kafka上让他们消费即可。中途遇到日志无法写入到kafka topic的问题,但是在机器上用脚本测试数据是能通过filebeat写入kafka的,Mark一下。遇到的报错如下:

Feb 28 16:48:57 chegva_centos filebeat[26009]: 2024-02-28T16:48:57.355+0800        DEBUG        [input]        log/input.go:222        Start next scan        {"input_id": "9d00bad2-0141-4f38-ab9f-39b9dd20e536"}
Feb 28 16:48:57 chegva_centos filebeat[26009]: 2024-02-28T16:48:57.355+0800        DEBUG        [input]        log/input.go:472        Check file for harvesting: /root/chegva.com.2024022816-16.log        {"input_id": "9d00bad2-0141-4f38-ab9f-39b9dd20e536"}
Feb 28 16:48:57 chegva_centos filebeat[26009]: 2024-02-28T16:48:57.355+0800        DEBUG        [input]        log/input.go:570        Update existing file for harvesting: /root/chegva.com.2024022816-16.log, offset: 736        {"input_id": "9d00bad2-0141-4f38-ab9f-39b9dd20e536", "source": "/root/chegva.com.2024022816-16.log", "state_id": "native::475592-64769", "finished": false, "os_id": "475592-64769", "old_source": "/root/chegva.com.2024022816-16.log", "old_finished": false, "old_os_id": "475592-64769"}
Feb 28 16:48:57 chegva_centos filebeat[26009]: 2024-02-28T16:48:57.355+0800        DEBUG        [input]        log/input.go:623        Harvester for file is still running: /root/chegva.com.2024022816-16.log        {"input_id": "9d00bad2-0141-4f38-ab9f-39b9dd20e536", "source": "/root/chegva.com.2024022816-16.log", "state_id": "native::475592-64769", "finished": false, "os_id": "475592-64769", "old_source": "/root/chegva.com.2024022816-16.log", "old_finished": false, "old_os_id": "475592-64769"}
Feb 28 16:48:57 chegva_centos filebeat[26009]: 2024-02-28T16:48:57.355+0800        DEBUG        [input]        log/input.go:286        input states cleaned up. Before: 1, After: 1, Pending: 0        {"input_id": "9d00bad2-0141-4f38-ab9f-39b9dd20e536"}
Feb 28 16:49:01 chegva_centos filebeat[26009]: 2024-02-28T16:49:01.396+0800        DEBUG        [kafka]        kafka/client.go:371        finished kafka batch
Feb 28 16:49:01 chegva_centos filebeat[26009]: 2024-02-28T16:49:01.396+0800        DEBUG        [kafka]        kafka/client.go:385        Kafka publish failed with: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
Feb 28 16:49:01 chegva_centos filebeat[26009]: 2024-02-28T16:49:01.396+0800        INFO        [publisher]        pipeline/retry.go:219        retryer: send unwait signal to consumer
Feb 28 16:49:01 chegva_centos filebeat[26009]: 2024-02-28T16:49:01.396+0800        INFO        [publisher]        pipeline/retry.go:223          done
Feb 28 16:49:02 chegva_centos filebeat[26009]: 2024-02-28T16:49:02.151+0800        ERROR        [kafka]        kafka/client.go:317        Kafka (topic=TOPIC_TEST_LOG): kafka: client has run out of available brokers to talk to (Is your cluster reachable?)

1.Dropping event: no topic could be selected
这个问题是没有找到kafka topic,改filebeat配置文件即可

2.kafka/client.go:317  Kafka (topic=TOPIC_TEST_LOG): kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
这个问题起初我以为是filebeat版本与kafka不匹配,特意将filebeat从7.14升级到7.17,还是报错
最后在output.kafka中配置上kafka版本后解决:version: 0.10.2.1
腾讯云kafka不加version配置时的kafka版本默认是1.1.1,之前在别的地方使用没设置version版本也能正常写入,这个实例版本比1.1.1要低,坑了一把
官方说明:https://www.elastic.co/guide/en/beats/filebeat/7.17/kafka-output.html#_version_3
versionedit
Kafka version filebeat is assumed to run against. Defaults to 1.0.0.
Valid values are all kafka releases in between 0.8.2.0 and 2.0.0.
See Compatibility for information on supported versions.

安装kafka-python:

pip3 install -i https://pypi.tuna.tsinghua.edu.cn/simple kafka-python

kafka-producer.py 脚本:

from kafka import KafkaProducer
import json
import time

producer = KafkaProducer(bootstrap_servers='10.2.3.4:9092',
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))

for i in range(10):
    print(f"发送数据{i}")
    data = {'message': 'Test message ' + str(i)}
    producer.send('TOPIC_TEST_LOG', value=data)
    time.sleep(1)
    
######################
# 输入账号名/密码连接:
from kafka import KafkaProducer
import json
import time

producer = KafkaProducer(bootstrap_servers='152.xx.xx.xx:9092',
                         security_protocol='SASL_PLAINTEXT',
                         sasl_mechanism='PLAIN',
                         sasl_plain_username='anzhihe',
                         sasl_plain_password='chegva.com',
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))

for i in range(10):
    print(f"发送数据{i}")
    data = {'message': 'Test message ' + str(i)}
    producer.send('chegva_op_test', value=data)
    time.sleep(1)

kafka-consumer.py 脚本:

from kafka import KafkaConsumer
from json

def forgiving_json_deserializer(v):
    if v is None:
        return None
    try:
        return json.loads(v.decode('utf-8'))
    except json.decoder.JSONDecodeError:
        print(f'Unable to decode: {v}', )
        return None

consumer = KafkaConsumer(
    'TOPIC_TEST_LOG',
    bootstrap_servers=['10.2.3.4:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='local_test1',
    value_deserializer=forgiving_json_deserializer
)

for message in consumer:
    print(message.value)
    
######################
# 输入账号名/密码连接:
consumer = KafkaConsumer(
    'TOPIC_TEST_LOG',
    bootstrap_servers=['10.2.3.4:9092'],
    security_protocol='SASL_PLAINTEXT',
    sasl_mechanism='PLAIN',
    sasl_plain_username='anzhihe',
    sasl_plain_password='chegva.com',
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='local_test1',
    value_deserializer=forgiving_json_deserializer
)

接下来理下整个过程,首先在centos上安装并配置filebeat:

# yum安装filebeat
yum -y install https://mirrors.tuna.tsinghua.edu.cn/elasticstack/7.x/yum/7.17.9/filebeat-7.17.9-x86_64.rpm

vim /etc/filebeat/filebeat.yml 
# 配置日志收集
filebeat.inputs:
- type: log
  enabled: true
  encoding: utf-8
  tail_files: true
  paths:
    - /data/www/test/logs/chegva.com.*.log
  fields:
    type: TOPIC_TEST_LOG
- type: log
  enabled: true
  encoding: utf-8
  tail_files: true
  paths:
    - /data/www/sdk/logs/chegva.com.*.log
  fields:
    type: TOPIC_SDK_LOG

#logging.level: debug 
# ============================== kafka ================================
output.kafka:
  version: 0.10.2.1
  enabled: true
  hosts: ["10.2.3.4:9092"]
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: none
  max_message_bytes: 1000000
  topic: '%{[fields.type]}'

检察filebeat配置文件及输出连接性:

# 使用以下命令来检查Filebeat配置文件是否正常(默认/etc/filebeat/filebeat.yml)
filebeat test config

# 指定配置文件路径
filebeat test config -e -c /etc/filebeat/filebeat2.yml

# 使用以下命令来测试输出块,以验证Filebeat是否能够连接到Elasticsearch实例或Kafka代理
filebeat test output
filebeat test output -e -c /etc/filebeat/filebeat2.yml

# 输出如下:
Kafka: 10.2.3.4:9092...
  parse host... OK
  dns lookup... OK
  addresses: 10.2.3.4
  dial up... OK

filebeat服务启停命令:

systemctl enable filebeat
systemctl start filebeat
systemctl status -l filebeat
journalctl -r -u filebeat

# 默认日志在 /var/log/filebeat/目录下
# 排查问题时可以在配置文件中把Debug日志打开:logging.level: debug


参考:

anzhihe 安志合个人博客,版权所有 丨 如未注明,均为原创 丨 转载请注明转自:https://chegva.com/5953.html | ☆★★每天进步一点点,加油!★★☆ | 

您可能还感兴趣的文章!

发表评论

电子邮件地址不会被公开。 必填项已用*标注