网站首页 > 技术文章 正文
大家好,我是老李。在日常数据存储和查询时,很多小伙伴都喜欢用ES做索引,很多还把ES当成数据库来用。诚然ES的读写性能非常优秀,但是大家有没有遇到过ES丢数据的问题?也就是说数据库和ES的数据不一致。今天老李正好看在公众号铭毅天下Elasticsearch上看到一篇介绍这个问题的文章,里面的内容写的非常的清楚,把对数据的方法和思路全都理了出来。下面把文章分享给大家,希望能够使大家在日常工作中少踩一点坑。当然了,能用来填坑就更好了。
1、实战线上问题
- Q1:Logstash 同步 postgreSQL 到 Elasticsearch 数据不一致。
在使用 Logstash 从 pg 库中将一张表导入到 ES 中时,发现 ES 中的数据量和 PG 库中的这张表的数据量存在较大差距。如何快速比对哪些数据没有插入?导入过程中,Logstash 日志没有异常。PG 中这张表有 7600W。
- Q2:mq 异步双写数据库、es 的方案中,如何保证数据库数据和 es 数据的一致性?
2、推荐解决方案之一——ID 比较法
如下示例,仅拿问题1举例验证,问题2原理一致。
2.1 方案探讨
要找出哪些数据没有插入到 Elasticsearch 中,可以采用以下方法:
- 确保 Logstash 配置文件中的 input 插件的 JDBC 驱动程序正确配置,以便从 PostgreSQL 数据库中提取所有数据。注意 statement 参数,确保它选择了所有需要的数据。
- 检查 Logstash 配置文件的 output 插件,确保正确配置了 Elasticsearch 的连接参数。同时,检查是否有过滤器在导入过程中过滤掉了部分数据。
- 在 Logstash 配置文件中添加一个 stdout 插件,将从 PostgreSQL 数据库中读取的数据记录到文件中。
例如,可以添加以下内容:
output {
elasticsearch {
...Elasticsearch 配置...
}
stdout {
codec => json_lines
path => "/path/to/logstash_output.log"
}
}
将 Logstash 输出文件与 PostgreSQL 数据库中的原始数据进行比较,以找出未导入的数据。可以使用 Python、Shell 脚本或其他编程语言编写一个简单的脚本来执行此操作。
如果 Logstash 输出文件中的记录数与 PostgreSQL 数据库中的记录数一致,但 Elasticsearch 中的记录数不一致,请检查 Elasticsearch 集群的健康状况和日志。确认集群是否在接收和索引数据时遇到问题。
如果问题仍然存在,尝试将批量操作的大小减小,以减轻 Elasticsearch 和 Logstash 的负担。可以通过在 Logstash 配置文件的 output 插件中设置 flush_size 和 idle_flush_time 参数来实现。
处理大量数据时,可能需要调整 Logstash 和 Elasticsearch 的性能和资源配置。根据硬件和网络条件,可能需要优化批量操作、JVM 设置、线程池大小等方面的设置。
2.2 比较脚本的实现
以下是一个简单的 Shell 脚本示例,用于比较 Logstash 输出文件(JSON 格式)和 PostgreSQL 数据库中的数据。该脚本将比较特定字段(如 id)以确定哪些数据可能未导入到 Elasticsearch。
首先,从 PostgreSQL 数据库中导出数据,将其保存为 CSV 文件:
COPY (SELECT id FROM your_table) TO '/path/to/postgres_data.csv' WITH
接下来,创建一个名为 compare.sh 的 Shell 脚本:
#!/bin/bash
# 将 JSON 文件中的 ID 提取到一个文件中
jq '.id' /path/to/logstash_output.log > logstash_ids.txt
# 删除 JSON 中的双引号
sed -i 's/"//g' logstash_ids.txt
# 对 Logstash 和 PostgreSQL 的 ID 文件进行排序
sort -n logstash_ids.txt > logstash_ids_sorted.txt
sort -n /path/to/postgres_data.csv > postgres_ids_sorted.txt
# 使用 comm 比较两个已排序的 ID 文件
comm -23 postgres_ids_sorted.txt logstash_ids_sorted.txt > missing_ids.txt
# 输出结果
echo "以下 ID 在 Logstash 输出文件中未找到:"
cat missing_ids.txt
为脚本添加可执行权限并运行:
chmod +x compare.sh
./compare.sh
此脚本会比较 logstash_output.log 和 postgres_data.csv 文件中的 ID。如果发现缺失的 ID,它们将被保存在 missing_ids.txt 文件中,并输出到控制台。请注意,该脚本假设已经安装了 jq(一个命令行 JSON 处理器)。如果没有,请先安装 jq。
3、推荐方案二——Redis 加速对比
在这种情况下,可以使用 Redis 的集合数据类型来存储 PostgreSQL 数据库和 Logstash 输出文件中的 ID。接下来,可以使用 Redis 提供的集合操作来找到缺失的 ID。
以下是一个使用 Redis 实现加速比对的示例:
首先,从 PostgreSQL 数据库中导出数据,将其保存为 CSV 文件:
COPY (SELECT id FROM your_table) TO '/path/to/postgres_data.csv' WITH CSV HEADER;
安装并启动 Redis。
使用 Python 脚本将 ID 数据加载到 Redis:
import redis
import csv
# 连接到 Redis
r = redis.StrictRedis(host='localhost', port=6379, db=0)
# 从 PostgreSQL 导出的 CSV 文件中加载数据
with open('/path/to/postgres_data.csv', newline='') as csvfile:
csv_reader = csv.reader(csvfile)
next(csv_reader) # 跳过表头
for row in csv_reader:
r.sadd('postgres_ids', row[0])
# 从 Logstash 输出文件中加载数据
with open('/path/to/logstash_output.log', newline='') as logstash_file:
for line in logstash_file:
id = line.split('"id":')[1].split(',')[0].strip()
r.sadd('logstash_ids', id)
# 计算差集
missing_ids = r.sdiff('postgres_ids', 'logstash_ids')
# 输出缺失的 ID
print("以下 ID 在 Logstash 输出文件中未找到:")
for missing_id in missing_ids:
print(missing_id)
这个 Python 脚本使用 Redis 集合数据类型存储 ID,然后计算它们之间的差集以找到缺失的 ID。需要先安装 Python 的 Redis 库。可以使用以下命令安装:
pip install redis
这个脚本是一个基本示例,可以根据需要修改和扩展它。使用 Redis 的优点是它能在内存中快速处理大量数据,而不需要在磁盘上读取和写入临时文件。
4、小结
方案一:使用 Shell 脚本和 grep 命令
- 优点:
(1)简单,易于实现。
(2)不需要额外的库或工具。
- 缺点:
(1)速度较慢,因为它需要在磁盘上读写临时文件。
(2)对于大数据量的情况,可能会导致较高的磁盘 I/O 和内存消耗。
方案二:使用 Redis 实现加速比对
- 优点:
(1)速度更快,因为 Redis 是基于内存的数据结构存储。
(2)可扩展性较好,可以处理大量数据。
- 缺点:
(1)实现相对复杂,需要编写额外的脚本。
(2)需要安装和运行 Redis 服务器。
根据需求和数据量,可以选择合适的方案。如果处理的数据量较小,且对速度要求不高,可以选择方案一,使用 Shell 脚本和 grep 命令。这种方法简单易用,但可能在大数据量下表现不佳。
如果需要处理大量数据,建议选择方案二,使用 Redis 实现加速比对。这种方法速度更快,能够有效地处理大数据量。然而,这种方法需要额外的设置和配置,例如安装 Redis 服务器和编写 Python 脚本。
在实际应用中,可能需要根据具体需求进行权衡,以选择最适合的解决方案。
猜你喜欢
- 2024-10-21 (建议收藏)小白视角总结分布式搜索组件elasticsearch《二》
- 2024-10-21 RabbitMQ消息服务用户手册(rabbitmq消息id)
- 2024-10-21 索引生命周期管理ILM看完不懂你锤我
- 2024-10-21 Elasticsearch技术问答系列-NO3(elasticsearch curator)
- 2024-10-21 从裸机到700亿参数大模型,这里有份教程,还有现成可用的脚本
- 2024-10-21 「一文搞懂」Nacos健康检查机制(nacos修改健康检查模式)
- 2024-10-21 「ceph-deploy」CentOS7部署Ceph-nautilus 14.2.18版本集群学习
- 2024-10-21 Kibana 最常见的“启动报错”的故障原因及解决方案汇总
- 2024-10-21 二进制部署Kubernetes V1.18.X(etcd集群篇)
- 2024-10-21 「超级详细」Nacos健康检查源码解析
- 最近发表
- 标签列表
-
- cmd/c (57)
- c++中::是什么意思 (57)
- sqlset (59)
- ps可以打开pdf格式吗 (58)
- phprequire_once (61)
- localstorage.removeitem (74)
- routermode (59)
- vector线程安全吗 (70)
- & (66)
- java (73)
- org.redisson (64)
- log.warn (60)
- cannotinstantiatethetype (62)
- js数组插入 (83)
- resttemplateokhttp (59)
- gormwherein (64)
- linux删除一个文件夹 (65)
- mac安装java (72)
- reader.onload (61)
- outofmemoryerror是什么意思 (64)
- flask文件上传 (63)
- eacces (67)
- 查看mysql是否启动 (70)
- java是值传递还是引用传递 (58)
- 无效的列索引 (74)