The big data team needed the historical log files that two CVMs had previously written to disk migrated onto the big data cluster, a bit over 600 GB in total, so not a huge amount of data. The plan was simply to ship them to Kafka with filebeat and let the team consume them from there. Along the way I hit a problem where the logs could not be written to the Kafka topic, even though test data sent from a script on the same machine made it into Kafka without trouble. Noting it down here. The errors encountered were as follows:
Feb 28 16:48:57 chegva_centos filebeat[26009]: 2024-02-28T16:48:57.355+0800 DEBUG [input] log/input.go:222 Start next scan {"input_id": "9d00bad2-0141-4f38-ab9f-39b9dd20e536"}
Feb 28 16:48:57 chegva_centos filebeat[26009]: 2024-02-28T16:48:57.355+0800 DEBUG [input] log/input.go:472 Check file for harvesting: /root/chegva.com.2024022816-16.log {"input_id": "9d00bad2-0141-4f38-ab9f-39b9dd20e536"}
Feb 28 16:48:57 chegva_centos filebeat[26009]: 2024-02-28T16:48:57.355+0800 DEBUG [input] log/input.go:570 Update existing file for harvesting: /root/chegva.com.2024022816-16.log, offset: 736 {"input_id": "9d00bad2-0141-4f38-ab9f-39b9dd20e536", "source": "/root/chegva.com.2024022816-16.log", "state_id": "native::475592-64769", "finished": false, "os_id": "475592-64769", "old_source": "/root/chegva.com.2024022816-16.log", "old_finished": false, "old_os_id": "475592-64769"}
Feb 28 16:48:57 chegva_centos filebeat[26009]: 2024-02-28T16:48:57.355+0800 DEBUG [input] log/input.go:623 Harvester for file is still running: /root/chegva.com.2024022816-16.log {"input_id": "9d00bad2-0141-4f38-ab9f-39b9dd20e536", "source": "/root/chegva.com.2024022816-16.log", "state_id": "native::475592-64769", "finished": false, "os_id": "475592-64769", "old_source": "/root/chegva.com.2024022816-16.log", "old_finished": false, "old_os_id": "475592-64769"}
Feb 28 16:48:57 chegva_centos filebeat[26009]: 2024-02-28T16:48:57.355+0800 DEBUG [input] log/input.go:286 input states cleaned up. Before: 1, After: 1, Pending: 0 {"input_id": "9d00bad2-0141-4f38-ab9f-39b9dd20e536"}
Feb 28 16:49:01 chegva_centos filebeat[26009]: 2024-02-28T16:49:01.396+0800 DEBUG [kafka] kafka/client.go:371 finished kafka batch
Feb 28 16:49:01 chegva_centos filebeat[26009]: 2024-02-28T16:49:01.396+0800 DEBUG [kafka] kafka/client.go:385 Kafka publish failed with: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
Feb 28 16:49:01 chegva_centos filebeat[26009]: 2024-02-28T16:49:01.396+0800 INFO [publisher] pipeline/retry.go:219 retryer: send unwait signal to consumer
Feb 28 16:49:01 chegva_centos filebeat[26009]: 2024-02-28T16:49:01.396+0800 INFO [publisher] pipeline/retry.go:223 done
Feb 28 16:49:02 chegva_centos filebeat[26009]: 2024-02-28T16:49:02.151+0800 ERROR [kafka] kafka/client.go:317 Kafka (topic=TOPIC_TEST_LOG): kafka: client has run out of available brokers to talk to (Is your cluster reachable?)

1. Dropping event: no topic could be selected

This one means no Kafka topic could be resolved for the event; fixing the filebeat configuration file takes care of it.

2. kafka/client.go:317 Kafka (topic=TOPIC_TEST_LOG): kafka: client has run out of available brokers to talk to (Is your cluster reachable?)

At first I assumed the filebeat version did not match Kafka and went out of my way to upgrade filebeat from 7.14 to 7.17, but the error persisted. It was finally fixed by setting the Kafka version in output.kafka: version: 0.10.2.1. With Tencent Cloud Kafka, the version defaults to 1.1.1 when no version is configured; elsewhere I had written to Kafka just fine without setting one, but this instance runs a version lower than 1.1.1, which is what tripped me up.

Official docs: https://www.elastic.co/guide/en/beats/filebeat/7.17/kafka-output.html#_version_3

version
Kafka version filebeat is assumed to run against. Defaults to 1.0.0. Valid values are all kafka releases in between 0.8.2.0 and 2.0.0. See Compatibility for information on supported versions.
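One aside: the kafka-python test scripts below face the same version negotiation. By default the client probes the broker to infer its API version, but it can be pinned explicitly with the api_version parameter, the kafka-python counterpart of filebeat's version setting. A minimal sketch, assuming the same placeholder broker and topic used throughout this post:

from kafka import KafkaProducer

# Pinning api_version skips broker-version probing, analogous to
# setting `version: 0.10.2.1` in filebeat's output.kafka
producer = KafkaProducer(
    bootstrap_servers='10.2.3.4:9092',
    api_version=(0, 10, 2),  # match the broker release, e.g. 0.10.2.x
)
producer.send('TOPIC_TEST_LOG', b'version-pinned test message')
producer.flush()
producer.close()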
Install kafka-python:
pip3 install -i https://pypi.tuna.tsinghua.edu.cn/simple kafka-python
kafka-producer.py script:
from kafka import KafkaProducer
import json
import time

producer = KafkaProducer(bootstrap_servers='10.2.3.4:9092',
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))

for i in range(10):
    print(f"Sending message {i}")
    data = {'message': 'Test message ' + str(i)}
    producer.send('TOPIC_TEST_LOG', value=data)
    time.sleep(1)

# Flush buffered messages before exiting so none are lost
producer.flush()

######################
# Connect with username/password:
from kafka import KafkaProducer
import json
import time

producer = KafkaProducer(bootstrap_servers='152.xx.xx.xx:9092',
                         security_protocol='SASL_PLAINTEXT',
                         sasl_mechanism='PLAIN',
                         sasl_plain_username='anzhihe',
                         sasl_plain_password='chegva.com',
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))

for i in range(10):
    print(f"Sending message {i}")
    data = {'message': 'Test message ' + str(i)}
    producer.send('chegva_op_test', value=data)
    time.sleep(1)

producer.flush()
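Note that producer.send() is asynchronous, so a script can exit before a delivery failure ever surfaces. To make a test run conclusive, you can block on the future that send() returns; a minimal sketch, using the same placeholder broker:

from kafka import KafkaProducer
from kafka.errors import KafkaError
import json

producer = KafkaProducer(bootstrap_servers='10.2.3.4:9092',
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))

# send() returns a future; get() blocks until the broker acks, or raises
future = producer.send('TOPIC_TEST_LOG', value={'message': 'delivery check'})
try:
    md = future.get(timeout=10)
    print(f'written to {md.topic} partition {md.partition} offset {md.offset}')
except KafkaError as e:
    # e.g. NoBrokersAvailable / KafkaTimeoutError when the cluster is unreachable
    print(f'delivery failed: {e}')
finally:
    producer.close()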
kafka-consumer.py script:
from kafka import KafkaConsumer
import json

def forgiving_json_deserializer(v):
    if v is None:
        return None
    try:
        return json.loads(v.decode('utf-8'))
    except json.decoder.JSONDecodeError:
        print(f'Unable to decode: {v}')
        return None

consumer = KafkaConsumer(
    'TOPIC_TEST_LOG',
    bootstrap_servers=['10.2.3.4:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='local_test1',
    value_deserializer=forgiving_json_deserializer
)

for message in consumer:
    print(message.value)

######################
# Connect with username/password:
consumer = KafkaConsumer(
    'TOPIC_TEST_LOG',
    bootstrap_servers=['10.2.3.4:9092'],
    security_protocol='SASL_PLAINTEXT',
    sasl_mechanism='PLAIN',
    sasl_plain_username='anzhihe',
    sasl_plain_password='chegva.com',
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='local_test1',
    value_deserializer=forgiving_json_deserializer
)
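For error 1 above (no topic could be selected), it also pays to confirm that the topic actually exists on the broker before digging into the filebeat config. kafka-python can list the topics the client is authorized to see; a quick sketch, same placeholder broker:

from kafka import KafkaConsumer

consumer = KafkaConsumer(bootstrap_servers=['10.2.3.4:9092'])

# topics() returns the set of topic names visible to this client
topics = consumer.topics()
print(topics)
print('TOPIC_TEST_LOG exists:', 'TOPIC_TEST_LOG' in topics)
consumer.close()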
Now to walk through the whole process. First, install and configure filebeat on CentOS:
# Install filebeat via yum
yum -y install https://mirrors.tuna.tsinghua.edu.cn/elasticstack/7.x/yum/7.17.9/filebeat-7.17.9-x86_64.rpm

vim /etc/filebeat/filebeat.yml

# Configure log collection
filebeat.inputs:
- type: log
  enabled: true
  encoding: utf-8
  tail_files: true
  paths:
    - /data/www/test/logs/chegva.com.*.log
  fields:
    type: TOPIC_TEST_LOG
- type: log
  enabled: true
  encoding: utf-8
  tail_files: true
  paths:
    - /data/www/sdk/logs/chegva.com.*.log
  fields:
    type: TOPIC_SDK_LOG

#logging.level: debug

# ============================== kafka ================================
output.kafka:
  version: 0.10.2.1
  enabled: true
  hosts: ["10.2.3.4:9092"]
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: none
  max_message_bytes: 1000000
  topic: '%{[fields.type]}'
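The topic: '%{[fields.type]}' line routes each event to the topic named in its input's fields.type, so the two inputs above land in TOPIC_TEST_LOG and TOPIC_SDK_LOG respectively. A quick way to sanity-check the routing end to end is to subscribe to both topics at once and print where each message arrived; a sketch, assuming the placeholder broker:

from kafka import KafkaConsumer

# Subscribe to both topics that filebeat routes to via fields.type
consumer = KafkaConsumer('TOPIC_TEST_LOG', 'TOPIC_SDK_LOG',
                         bootstrap_servers=['10.2.3.4:9092'],
                         auto_offset_reset='earliest')

for message in consumer:
    # message.topic shows which topic the event was routed to
    print(message.topic, message.value)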
Check the filebeat configuration file and output connectivity:
# Check whether the Filebeat configuration file is valid (defaults to /etc/filebeat/filebeat.yml)
filebeat test config

# Specify a configuration file path
filebeat test config -e -c /etc/filebeat/filebeat2.yml

# Test the output block to verify that Filebeat can connect to the Elasticsearch instance or Kafka broker
filebeat test output
filebeat test output -e -c /etc/filebeat/filebeat2.yml

# Output:
Kafka: 10.2.3.4:9092...
  parse host... OK
  dns lookup... OK
  addresses: 10.2.3.4
  dial up... OK
filebeat service start/stop commands:
systemctl enable filebeat
systemctl start filebeat
systemctl status -l filebeat
journalctl -r -u filebeat

# Logs go to /var/log/filebeat/ by default
# When troubleshooting, turn on debug logging in the config file: logging.level: debug