此脚本支持 MySQL 8 和 MySQL 5 版本。
在“记录结果的ip”对应的 MySQL 实例中建表，所有巡检结果都写入该表，后续可以配合 Zabbix 做定期巡检。
-- Inspection results: one row per binlog_records*.py run for a MySQL instance.
-- binlog_list.py reads the day's latest row; binlog_event_analyze*.sh sets exist_big_trx.
create table if not exists binlog_bigtrx_records
(
    id            int         not null auto_increment,
    -- inspected instance; 45 characters fits any textual IPv4 or IPv6 address
    mysql_ip      varchar(45),
    -- first / last binlog file name covered by the run, e.g. 'mysql-bin.000123'
    min_binlog    varchar(30),
    max_binlog    varchar(30),
    record_time   timestamp   null default current_timestamp,
    update_time   timestamp   null default current_timestamp on update current_timestamp,
    -- 'Y' if a big transaction was found for the instance that day, 'N' otherwise
    exist_big_trx varchar(10),
    primary key (id),
    -- lookups filter on mysql_ip, usually together with a record_time day range
    key binlog_bigtrx_records_ip_time_idx (mysql_ip, record_time)
);
-- ===== binlog_bigtrx_show.py =====
'''
需求：
1、记录事务大小，并记录上一次巡检到的最大 binlog 号，下一次检查从该 binlog 开始；
2、增加巡检脚本。
表内容：
ip、开始 binlog、结束 binlog、库名、表名、事务大小、操作类型、操作事件
（注：当前 binlog_bigtrx_records 表只保存 ip、开始/结束 binlog 以及是否存在大事务；
大事务明细写入 big_trx_info 目录下的结果文件。）
处理流程：
1、根据最小、最大 binlog 号得出所有 binlog 号（根据 update_time 判断是否当天更新，未更新的不检查事务）；
2、遍历所有 binlog 号，将大事务记录到表中（每天更新），并生成日志（每天汇总一条）。
'''
import argparse
import sys
from datetime import datetime
import pymysql
import logging
import os
from datetime import datetime
# Logging: append to the shared per-day log file under LOG_DIR and echo every
# record to stderr as well.
LOG_DIR = '/home/mysql/mysql_big_trx_check/log'
_log_day = datetime.now().strftime("%Y%m%d")
log_filename = 'binlog_event_monitor_%s.log' % _log_day
log_file = os.path.join(LOG_DIR, log_filename)
os.makedirs(LOG_DIR, exist_ok=True)  # no error if it already exists

_handlers = [logging.FileHandler(log_file, mode='a')]
_handlers.append(logging.StreamHandler())
logging.basicConfig(
    handlers=_handlers,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Run timestamp; embedded in every result file name written by this process.
curr_dt = datetime.now().strftime("%Y%m%d_%H%M%S")

# Results-database connection settings (placeholder values).
# NOTE(review): not referenced elsewhere in this script as shown.
db_source_config = dict(
    host='记录结果的ip',
    port=3306,
    user='binlogxunjian',
    password='binlogxunjian',
    database='blxjdb',
)
def append_file(file_name, content):
    """Append ``content`` followed by a newline to ``file_name``.

    The parent directory is created when missing, so the first result written
    on a fresh host does not fail with FileNotFoundError.
    """
    parent = os.path.dirname(file_name)
    if parent:
        os.makedirs(parent, exist_ok=True)
    # 'with' closes the file even if a write raises; explicit UTF-8 keeps
    # non-ASCII schema/table names independent of the host locale.
    with open(file_name, 'a', encoding='utf-8') as f:
        f.write(content)
        f.write('\n')
# Row-event kinds that count toward a transaction's size, as
# (event-type prefix, output tag, SQL verb). Prefix matching also accepts
# variants such as MySQL 8 ``Update_rows_partial``. The tuple order fixes the
# order of the UPD / DLT / WTE sections in each output line.
_ROW_EVENT_KINDS = (
    ('UPDATE_ROWS', 'UPD', 'UPDATE'),
    ('DELETE_ROWS', 'DLT', 'DELETE FROM'),
    ('WRITE_ROWS', 'WTE', 'INSERT INTO'),
)


def _new_trx_sizes():
    """Return empty per-table byte counters, one dict per output tag."""
    return {tag: {} for _, tag, _ in _ROW_EVENT_KINDS}


def _format_big_trx(mysql_ip, trx_start_pos, trx_end_pos, trx_sizes):
    """Build the ';'-separated result line for one big transaction.

    Each table touched contributes
    ``<TAG>-size:<bytes>;trx_start_pos:<pos>;'### <VERB> `db`.`tbl`';``
    where <bytes> is the row-event size for that table only.
    """
    size_mb = round((trx_end_pos - trx_start_pos) / 1024 / 1024, 2)
    output_str = f"{mysql_ip};{size_mb} MB;"
    for _, tag, verb in _ROW_EVENT_KINDS:
        for tab_fname, size in trx_sizes[tag].items():
            db_name, _, tab_bname = tab_fname.partition('.')
            db_name = db_name.strip().strip('`')
            tab_bname = tab_bname.strip().strip('`')
            output_str += (f"{tag}-size:{size};trx_start_pos:{trx_start_pos};"
                           f"'### {verb} `{db_name}`.`{tab_bname}`';")
    return output_str


def get_event_from_binlog_by_sql(binlog, mysql_ip, mysql_port, user_name, password, trx_min_size):
    """Scan one binlog file with SHOW BINLOG EVENTS and record big transactions.

    A transaction runs from a query event whose info starts with ``BEGIN`` to
    the next event whose info starts with ``COMMIT``. Its size is the sum of
    its row-event sizes (end_pos - start_pos). When that exceeds
    ``trx_min_size`` MB, one line is appended to
    ``big_trx_info/event_result_<ip>_<binlog>_<curr_dt>.log``.

    Args:
        binlog: binlog file name, e.g. 'mysql-bin.000001'.
        mysql_ip, mysql_port, user_name, password: connection to the
            inspected instance.
        trx_min_size: threshold in MB.

    Exits the process with status 1 if an event cannot be parsed. The
    connection is closed on every path.
    """
    logger.info(f"{mysql_ip}:正在解析日志: {binlog}")
    result_file = f"/home/mysql/mysql_big_trx_check/big_trx_info/event_result_{mysql_ip}_{binlog}_{curr_dt}.log"
    dbconn = pymysql.connect(host=mysql_ip, user=user_name, password=password, port=mysql_port)
    try:
        with dbconn.cursor() as cur:
            # Bound parameter: pymysql quotes the file name client-side, so the
            # --binlog value cannot inject SQL.
            cur.execute("show binlog events in %s", (binlog,))
            binlog_events = cur.fetchall()
        if not binlog_events:
            logger.error(f"{mysql_ip}:binlog file:{binlog} is invalid")
            return

        bigtrx_found = False
        trx_start_pos = None   # None until the first BEGIN in this file
        trx_size = 0
        trx_sizes = _new_trx_sizes()
        tab_fname = None       # table named by the most recent Table_map event
        for bv_row in binlog_events:
            try:
                _, start_pos, event_type, _, end_pos, info = bv_row
                info = info or ''
                event_kind = event_type.upper()
                if info[0:5].upper() == 'BEGIN':
                    logger.debug(f"Transaction started at position: {start_pos}")
                    trx_start_pos = start_pos
                    trx_size = 0
                    trx_sizes = _new_trx_sizes()
                if event_kind == 'TABLE_MAP':
                    # Parsing assumes info like "table_id: 108 (db.tbl)".
                    tab_fname = info.split()[2].replace('(', '').replace(')', '')
                    logger.debug(f"Mapped table: {tab_fname}")
                for prefix, tag, _ in _ROW_EVENT_KINDS:
                    if event_kind.startswith(prefix):
                        event_size = end_pos - start_pos
                        trx_size += event_size
                        per_table = trx_sizes[tag]
                        per_table[tab_fname] = per_table.get(tab_fname, 0) + event_size
                        break
                if info[0:6].upper() == 'COMMIT':
                    logger.debug(f"Transaction committed at position: {end_pos}")
                    # A COMMIT with no BEGIN seen yet in this file is skipped.
                    if trx_start_pos is not None and trx_size / 1024 / 1024 > trx_min_size:
                        bigtrx_found = True
                        append_file(result_file,
                                    _format_big_trx(mysql_ip, trx_start_pos, end_pos, trx_sizes))
            except Exception as e:
                logger.error(f"{mysql_ip}:Parse event:{bv_row} is error, Detail is :{e}.")
                sys.exit(1)
        if bigtrx_found:
            logger.info(f"{mysql_ip}:Full event result:[ {result_file} ]")
    finally:
        # Also runs when sys.exit(1) above raises SystemExit.
        dbconn.close()
if __name__ == '__main__':
    # Scan one binlog file of one instance; see get_event_from_binlog_by_sql.
    parser = argparse.ArgumentParser(description='A piece of binlog analysis code.')
    parser.add_argument('--binlog', type=str, help="binlog file(Run as [show binlog events in 'mysql-bin.00001' ).",
                        default=None)
    parser.add_argument('--mysql_ip', type=str, help='mysql database ip addr.', default=None)
    parser.add_argument('--port', type=int, help='mysql database port.', default=3306)
    parser.add_argument('--uname', type=str, help='mysql database username.', default='xjuser')
    parser.add_argument('--password', type=str, help='mysql database password.', default=None)
    parser.add_argument('--trx_min_size', type=int, help='need print min trx binlog size(MB).', default=1)
    args = parser.parse_args()
    if not args.binlog or not args.mysql_ip:
        logger.error("Missing required arguments: --binlog and --mysql_ip")
        parser.print_help()
        # Non-zero: a missing argument is an error, not a successful run.
        sys.exit(1)
    # Use --password when given; the literal is only a fallback placeholder.
    password = args.password if args.password is not None else 'password'
    try:
        get_event_from_binlog_by_sql(args.binlog, args.mysql_ip, args.port, args.uname, password,
                                     args.trx_min_size)
        logger.info(f"{args.mysql_ip}:{args.binlog}解析完成!")
    except Exception as e:
        logger.error(f"{args.mysql_ip}:解析过程中出现错误: {e}")
        sys.exit(1)
# ===== binlog_list.py =====
import argparse
import sys
from datetime import datetime
import pymysql
import logging
import os
from datetime import datetime
# Logging: append to the shared per-day log file under LOG_DIR and to stderr.
# stderr matters here because stdout carries the binlog names for the caller.
LOG_DIR = '/home/mysql/mysql_big_trx_check/log'
log_filename = 'binlog_event_monitor_{}.log'.format(datetime.now().strftime("%Y%m%d"))
log_file = os.path.join(LOG_DIR, log_filename)
os.makedirs(LOG_DIR, exist_ok=True)

_file_handler = logging.FileHandler(log_file, mode='a')
_stderr_handler = logging.StreamHandler()
logging.basicConfig(
    handlers=[_file_handler, _stderr_handler],
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Day whose inspection record is read, formatted 'YYYY-MM-DD'.
curr_dt = datetime.now().strftime("%Y-%m-%d")
def binlog_min_max_query(cursor_source, mysql_ip, curr_dt):
    """Return (min_binlog, max_binlog) from the latest record of ``mysql_ip`` on ``curr_dt``.

    Args:
        cursor_source: cursor on the results database.
        mysql_ip: inspected instance.
        curr_dt: day as 'YYYY-MM-DD'.

    Returns:
        The pair from the newest matching row, or (None, None) when there is
        no record for that day or the query fails (the error is logged).
    """
    try:
        # Half-open day range instead of DATE(record_time) = ... so an index on
        # (mysql_ip, record_time) stays usable; id breaks ties between rows
        # recorded in the same second.
        sql = ("select min_binlog,max_binlog from blxjdb.binlog_bigtrx_records "
               "where mysql_ip = %s and record_time >= %s "
               "and record_time < date_add(%s, interval 1 day) "
               "order by record_time desc, id desc limit 1")
        cursor_source.execute(sql, (mysql_ip, curr_dt, curr_dt))
        result = cursor_source.fetchone()
        if result:
            return result[0], result[1]
        logger.info(f"{mysql_ip}:没有找到对应日期的binlog记录")
        return None, None
    except Exception as e:
        logger.error(f"{mysql_ip}:查询binlog_min_max时出错: {e}")
        return None, None
def get_binlog_list(mysql_ip, min_binlog, max_binlog):
    """Print and return every binlog file name from ``min_binlog`` to ``max_binlog``.

    The base name and zero-padding width come from ``min_binlog`` (for example
    'mysql-bin.000012' gives 'mysql-bin.' and 6). Instances whose log_bin
    basename is not 'mysql-bin' (MySQL 8 defaults to 'binlog') therefore get
    correct names. Each name is printed on its own line to stdout, which the
    calling shell script reads as the list of files to scan.

    Returns an empty list when max < min. Raises ValueError if either name has
    no numeric suffix.
    """
    def _split(name):
        # Split 'prefix000123' into ('prefix', '000123').
        cut = len(name)
        while cut > 0 and name[cut - 1].isdigit():
            cut -= 1
        return name[:cut], name[cut:]

    prefix, min_digits = _split(min_binlog)
    _, max_digits = _split(max_binlog)
    width = len(min_digits)
    binlog_list = [f"{prefix}{num:0{width}d}"
                   for num in range(int(min_digits), int(max_digits) + 1)]
    for binlog in binlog_list:
        print(binlog)
    logger.info(f"{mysql_ip}:待解析binlog日志列表:{binlog_list}")
    return binlog_list
def main(mysql_ip):
    """Print the binlog files to scan today for ``mysql_ip``.

    Reads today's latest record (written by binlog_records*.py) from the
    results database and prints every binlog name in its [min, max] range via
    get_binlog_list. Errors are logged, not raised.
    """
    db_source_config = {
        'host': '记录结果的ip',
        'port': 3306,
        'user': 'binlogxunjian',
        'password': 'binlogxunjian',
        'database': 'blxjdb'
    }
    # Pre-bind so the finally clause works even if connect()/cursor() raises.
    conn_source = None
    cursor_source = None
    try:
        conn_source = pymysql.connect(**db_source_config)
        cursor_source = conn_source.cursor()
        min_binlog, max_binlog = binlog_min_max_query(cursor_source, mysql_ip, curr_dt)
        if min_binlog is None or max_binlog is None:
            logger.warning(f"{mysql_ip}:未能获取有效的binlog范围。")
            return
        # get_binlog_list prints the names; its return value is not needed here.
        get_binlog_list(mysql_ip, min_binlog, max_binlog)
    except Exception as e:
        logger.error(f"{mysql_ip}:主函数执行时出错: {e}")
    finally:
        if cursor_source is not None:
            cursor_source.close()
        if conn_source is not None:
            conn_source.close()
if __name__ == "__main__":
    # Command line: --mysql_ip <address of the inspected instance>
    cli = argparse.ArgumentParser(description='Get binlog list.')
    cli.add_argument('--mysql_ip', default=None, type=str, help='mysql database ip addr.')
    cli_args = cli.parse_args()
    if cli_args.mysql_ip:
        main(cli_args.mysql_ip)
    else:
        # No target given: show usage and exit with status 0.
        cli.print_help()
        sys.exit(0)
# ===== binlog_records.py =====
import argparse
import sys
import pymysql
import logging
import os
import re
from datetime import datetime
# Logging: append to the shared per-day log file under LOG_DIR and to stderr.
LOG_DIR = '/home/mysql/mysql_big_trx_check/log'
log_filename = 'binlog_event_monitor_' + datetime.now().strftime("%Y%m%d") + '.log'
log_file = os.path.join(LOG_DIR, log_filename)
os.makedirs(LOG_DIR, exist_ok=True)


def _log_handlers(path):
    """Handlers for this script: append to ``path`` and echo to stderr."""
    return [logging.FileHandler(path, mode='a'), logging.StreamHandler()]


logging.basicConfig(
    handlers=_log_handlers(log_file),
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
def get_binlog_min_max(cursor_target, mysql_ip):
    """Return (oldest, newest) binlog file names on the inspected instance.

    Runs SHOW BINARY LOGS and orders the names by their numeric suffix
    (names without one sort first). Returns (None, None) when the list is
    empty or the query fails, so callers can always unpack two values.
    """
    try:
        cursor_target.execute("show binary logs;")
        binlog = cursor_target.fetchall()
        if not binlog:
            logger.error(f"{mysql_ip}:Binlog 文件不存在或无法读取。")
            return None, None

        def binlog_sort_key(row):
            # Numeric suffix of the file name in the first column; 0 if none.
            match = re.search(r'(\d+)$', row[0])
            return int(match.group(1)) if match else 0

        sort_binlog = sorted(binlog, key=binlog_sort_key)
        print(sort_binlog)  # diagnostic dump of the sorted list to stdout
        return sort_binlog[0][0], sort_binlog[-1][0]
    except Exception as e:
        logger.error(f"{mysql_ip}:获取 binlog 最小和最大值时出错: {e}")
        return None, None
def insert_binlog_record(cursor_source, mysql_ip, min_binlog, max_binlog):
    """Insert one (mysql_ip, min_binlog, max_binlog) row into the records table.

    The caller commits. A failed insert is logged and re-raised so the caller
    rolls back instead of committing and reporting success for a row that was
    never written.
    """
    insert = ("INSERT INTO blxjdb.binlog_bigtrx_records (mysql_ip, min_binlog, max_binlog) "
              "VALUES (%s, %s, %s)")
    try:
        cursor_source.execute(insert, (mysql_ip, min_binlog, max_binlog))
    except Exception as e:
        logger.error(f"{mysql_ip}:插入记录时出错: {e}")
        raise
def get_last_max_binlog(cursor_source, mysql_ip):
    """Return max_binlog of the most recent record for ``mysql_ip``, or None.

    "Most recent" means the row with the highest auto-increment id. This avoids
    a lexical MAX() over file names, which misorders them once the numeric
    suffix gains a digit. Returns None when there is no record or the query
    fails (the error is logged).
    """
    try:
        query = ("SELECT max_binlog FROM blxjdb.binlog_bigtrx_records "
                 "WHERE mysql_ip = %s ORDER BY id DESC LIMIT 1")
        cursor_source.execute(query, (mysql_ip,))
        row = cursor_source.fetchone()
        if row is None or row[0] is None:
            logger.info(f"{mysql_ip}:没有找到上次的 binlog 最大值记录")
            return None
        max_binlog = row[0]
        logger.info(f"{mysql_ip}:上次记录的最大 binlog 位置为: {max_binlog}")
        return max_binlog
    except Exception as e:
        logger.error(f"{mysql_ip}:获取上次最大值时出错: {e}")
        return None
def main(mysql_ip):
    """Record today's binlog range of ``mysql_ip`` in the results table.

    The new row runs from the max_binlog of the previous record (or from the
    oldest binlog on a first run) to the newest binlog currently on the
    instance. When the previous max compares greater than the current newest
    binlog, nothing is inserted. Errors are logged and the insert is rolled
    back; both connections are always closed.
    """
    db_source_config = {
        'host': '记录结果的ip',
        'port': 3306,
        'user': 'binlogxunjian',
        'password': 'binlogxunjian'
    }
    db_target_config = {
        'host': mysql_ip,
        'port': 3306,
        'user': 'username',
        'password': 'password'
    }
    conn_source = None
    conn_target = None
    try:
        conn_source = pymysql.connect(**db_source_config)
        conn_target = pymysql.connect(**db_target_config)
        with conn_source.cursor() as cursor_source, conn_target.cursor() as cursor_target:
            last_max = get_last_max_binlog(cursor_source, mysql_ip)
            # Accept both a bare None and (None, None) from the helper.
            bounds = get_binlog_min_max(cursor_target, mysql_ip)
            min_binlog, max_binlog = bounds if bounds else (None, None)
            if min_binlog is None or max_binlog is None:
                return
            if last_max is None:
                logger.info(f"{mysql_ip}:没有记录,开始新数据插入!")
                insert_binlog_record(cursor_source, mysql_ip, min_binlog, max_binlog)
                conn_source.commit()
                logger.info(f"{mysql_ip}:新数据插入完成!")
            elif last_max <= max_binlog:
                # Equal is included, presumably so the still-growing newest
                # binlog is rescanned on every run.
                insert_binlog_record(cursor_source, mysql_ip, last_max, max_binlog)
                conn_source.commit()
                logger.info(f"{mysql_ip}:binlog存在变化!")
            else:
                logger.info(f"{mysql_ip}:没有新的 binlog 变化!")
    except Exception as e:
        logger.error(f"{mysql_ip}:主函数执行时出错: {e}")
        if conn_source is not None and conn_source.open:
            conn_source.rollback()
    finally:
        for conn in (conn_source, conn_target):
            if conn is not None:
                conn.close()
if __name__ == "__main__":
    # Command line: --mysql_ip <address of the inspected instance>
    cli = argparse.ArgumentParser(description='Insert Binlog Records')
    cli.add_argument('--mysql_ip', default=None, type=str, help='mysql database ip addr.')
    cli_args = cli.parse_args()
    if cli_args.mysql_ip:
        main(cli_args.mysql_ip)
    else:
        # No target given: show usage and exit with status 0.
        cli.print_help()
        sys.exit(0)
# ===== binlog_event_analyze.sh =====
#!/bin/bash
# Inspect one MySQL instance for big transactions.
# Usage: binlog_event_analyze.sh <mysql_ip>
#   1. binlog_records.py     - record today's binlog range for the instance
#   2. binlog_list.py        - print the binlog file names in that range
#   3. binlog_bigtrx_show.py - scan each file, writing big transactions to
#                              event_result_<ip>_<binlog>_<timestamp>.log
#   4. set exist_big_trx to Y/N on today's record(s) for the instance
workdir="/home/oracle/monitor/mysql_big_trx_check"
cd "$workdir" || { echo "ERROR: cannot cd to $workdir" >&2; exit 1; }
# Python 3 virtualenv
source /home/oracle/py3/bin/activate
# Must match the directory binlog_bigtrx_show.py writes its result files to.
log_bigtrx_dir=/home/mysql/mysql_big_trx_check/big_trx_info
curr_dt=$(date +%Y%m%d)
export MYSQL_PWD=binlogxunjian
# Minimum transaction size to report, in MB
trx_min_size=90
mysql_ip=$1
# mysql_ip is interpolated into SQL and file globs: allow host/IP characters only.
case "$mysql_ip" in
    ''|*[!0-9A-Za-z.:_-]*)
        echo "Usage: $0 <mysql_ip>" >&2
        exit 1
        ;;
esac

function update_binlog_bigtrx_records() {
    local mysql_ip=$1
    local exist_big_trx=$2
    # Half-open day range: no function is applied to record_time.
    mysql -ubinlogxunjian -h记录结果的ip --silent --skip-column-names -e \
        "update blxjdb.binlog_bigtrx_records set exist_big_trx='${exist_big_trx}' where mysql_ip='${mysql_ip}' and record_time >= '${curr_dt}' and record_time < date_add('${curr_dt}', interval 1 day);"
}

# Record today's binlog range
python binlog_records.py --mysql_ip "$mysql_ip"
# One binlog name per line on stdout (the Python logging goes to stderr)
binlog_list=$(python binlog_list.py --mysql_ip "$mysql_ip")
for binlog in $binlog_list; do
    python binlog_bigtrx_show.py --mysql_ip "$mysql_ip" --binlog "$binlog" --trx_min_size "$trx_min_size"
done
# Any result file from today for this instance means a big transaction was found
if ls "${log_bigtrx_dir}"/event_result_"${mysql_ip}"_*_"${curr_dt}"_*.log >/dev/null 2>&1; then
    update_binlog_bigtrx_records "$mysql_ip" 'Y'
else
    update_binlog_bigtrx_records "$mysql_ip" 'N'
fi
# ===== binlog_event_monitor.sh =====
MySQL 8 版本中，待巡检 MySQL 实例的 IP 记录在 Oracle 库的 MYSQL_HOST 表中，可根据实际需要修改。
后文 MySQL 5 版本的待巡检 IP 则写在文本文件 mysql_ip.list 中，可作参考。
#!/bin/bash
# Run binlog_event_analyze.sh for every MySQL host listed in the Oracle table
# MYSQL_HOST, with at most MAX_PARALLEL runs at once and a per-run TIMEOUT.
curr_dt=$(date +%Y%m%d)
log_filename="/home/mysql/mysql_big_trx_check/log/binlog_event_monitor_${curr_dt}.log"
mkdir -p "$(dirname "$log_filename")"
# Maximum number of concurrent analysis runs
MAX_PARALLEL=4
# Per-run timeout in seconds (7200 s = 120 minutes)
TIMEOUT=7200
# Oracle connection that holds the host list (placeholder values)
ORACLE_USER="user"
ORACLE_PASS="password"
ORACLE_SID="orcl"

# Fetch the host list. -L makes sqlplus give up after one failed logon instead
# of prompting again (the prompt would consume this here-document), and
# WHENEVER SQLERROR makes a failed query exit non-zero, so error text is never
# mistaken for host names.
mysql_ips=$(sqlplus -S -L "${ORACLE_USER}/${ORACLE_PASS}@ip:1521/${ORACLE_SID}" <<EOF
whenever sqlerror exit failure
set heading off
set feedback off
set pagesize 0
set timing off
select HOST from MYSQL_HOST;
exit;
EOF
)
if [ $? -ne 0 ]; then
    echo "ERROR: 从Oracle获取MySQL IP列表失败: ${mysql_ips}" >> "$log_filename"
    exit 1
fi

# Hosts still to be started, in order
queue=($mysql_ips)
# Running tasks: PID -> start time (epoch seconds), and PID -> MySQL IP
declare -A tasks=()
declare -A task_ips=()

# Stop a task and the command it is currently running. The task is frozen
# first so it cannot start its next python step between the two kills.
function kill_task() {
    local pid=$1
    kill -STOP "$pid" 2>/dev/null
    pkill -TERM -P "$pid" 2>/dev/null
    kill -TERM "$pid" 2>/dev/null
    kill -CONT "$pid" 2>/dev/null
}

# INT/TERM handler: stop every running task, then exit.
function ctrl_c() {
    local pid
    for pid in "${!tasks[@]}"; do
        if kill -0 "$pid" 2>/dev/null; then
            kill_task "$pid"
            echo "ERROR: 终止任务PID: $pid" >> "$log_filename"
        fi
        unset 'tasks[$pid]' 'task_ips[$pid]'
    done
    echo "ERROR: 脚本终止,所有子进程已关闭。" >> "$log_filename"
    exit 1
}
trap ctrl_c INT TERM

# Start the analysis of one host in the background and register it.
# binlog_event_analyze.sh is resolved through PATH.
function binlog_event_analyze() {
    local mysql_ip=$1
    local new_pid
    echo "现在开始解析: $mysql_ip" >> "$log_filename"
    binlog_event_analyze.sh "$mysql_ip" &
    new_pid=$!
    tasks["$new_pid"]=$(date +%s)
    task_ips["$new_pid"]=$mysql_ip
    echo "$mysql_ip: 任务PID: $new_pid, 启动时间: $(date +%Y%m%d_%H%M%S)" >> "$log_filename"
}

# Drop finished tasks and kill those running longer than TIMEOUT.
function check_task_status() {
    local now pid ip start_time
    now=$(date +%s)
    for pid in "${!tasks[@]}"; do
        ip=${task_ips[$pid]}
        if ! kill -0 "$pid" 2>/dev/null; then
            unset 'tasks[$pid]' 'task_ips[$pid]'
            echo "$ip: 任务PID: $pid 已完成或不存在,已释放资源。" >> "$log_filename"
            continue
        fi
        start_time=${tasks[$pid]}
        if [ $((now - start_time)) -ge "$TIMEOUT" ]; then
            kill_task "$pid"
            unset 'tasks[$pid]' 'task_ips[$pid]'
            echo "ERROR: $ip: 任务PID: $pid 超时终止,已释放资源。" >> "$log_filename"
        fi
    done
}

# While all slots are busy, log the queue every log_interval polls
# (120 x 0.5 s = 1 min).
log_interval=120
log_count=0
while [ ${#queue[@]} -gt 0 ]; do
    check_task_status
    if [ "${#tasks[@]}" -lt "$MAX_PARALLEL" ]; then
        binlog_event_analyze "${queue[0]}"
        queue=("${queue[@]:1}")
        log_count=0
    else
        log_count=$((log_count + 1))
        if [ "$log_count" -ge "$log_interval" ]; then
            echo "并行任务已达最大值$MAX_PARALLEL,请稍等..." >> "$log_filename"
            echo "待处理的IP队列: ${queue[*]}" >> "$log_filename"
            log_count=0
        fi
    fi
    sleep 0.5
done
# Queue drained: keep polling instead of a blocking wait, so TIMEOUT still
# applies to the last batch of tasks.
while [ ${#tasks[@]} -gt 0 ]; do
    check_task_status
    sleep 0.5
done
echo "所有任务已完成。" >> "$log_filename"
# The scripts below are the MySQL 5 variants (binlog_list.py is reused from the MySQL 8 version).
binlog_bigtrx_show_for_mysql5.py
import argparse
import sys
from datetime import datetime
import pymysql
import logging
import os
# Logging: append to the shared per-day log file under LOG_DIR and echo every
# record to stderr as well.
LOG_DIR = '/home/mysql/mysql_big_trx_check/log'
_log_day = datetime.now().strftime("%Y%m%d")
log_filename = 'binlog_event_monitor_%s.log' % _log_day
log_file = os.path.join(LOG_DIR, log_filename)
os.makedirs(LOG_DIR, exist_ok=True)  # no error if it already exists

_handlers = [logging.FileHandler(log_file, mode='a')]
_handlers.append(logging.StreamHandler())
logging.basicConfig(
    handlers=_handlers,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Run timestamp; embedded in every result file name written by this process.
curr_dt = datetime.now().strftime("%Y%m%d_%H%M%S")

# Results-database connection settings (placeholder values).
# NOTE(review): not referenced elsewhere in this script as shown.
db_source_config = dict(
    host='记录结果的ip',
    port=3306,
    user='binlogxunjian',
    password='binlogxunjian',
    database='blxjdb',
)
def append_file(file_name, content):
    """Append ``content`` followed by a newline to ``file_name``.

    The parent directory is created when missing, so the first result written
    on a fresh host does not fail with FileNotFoundError.
    """
    parent = os.path.dirname(file_name)
    if parent:
        os.makedirs(parent, exist_ok=True)
    # 'with' closes the file even if a write raises; explicit UTF-8 keeps
    # non-ASCII schema/table names independent of the host locale.
    with open(file_name, 'a', encoding='utf-8') as f:
        f.write(content)
        f.write('\n')
# Row-event kinds that count toward a transaction's size, as
# (event-type prefix, output tag, SQL verb). Prefix matching also accepts
# variants such as ``Update_rows_partial``. The tuple order fixes the order of
# the UPD / DLT / WTE sections in each output line.
_ROW_EVENT_KINDS = (
    ('UPDATE_ROWS', 'UPD', 'UPDATE'),
    ('DELETE_ROWS', 'DLT', 'DELETE FROM'),
    ('WRITE_ROWS', 'WTE', 'INSERT INTO'),
)


def _new_trx_sizes():
    """Return empty per-table byte counters, one dict per output tag."""
    return {tag: {} for _, tag, _ in _ROW_EVENT_KINDS}


def _format_big_trx(mysql_ip, trx_start_pos, trx_end_pos, trx_sizes):
    """Build the ';'-separated result line for one big transaction.

    Each table touched contributes
    ``<TAG>-size:<bytes>;trx_start_pos:<pos>;'### <VERB> `db`.`tbl`';``
    where <bytes> is the row-event size for that table only.
    """
    size_mb = round((trx_end_pos - trx_start_pos) / 1024 / 1024, 2)
    output_str = f"{mysql_ip};{size_mb} MB;"
    for _, tag, verb in _ROW_EVENT_KINDS:
        for tab_fname, size in trx_sizes[tag].items():
            db_name, _, tab_bname = tab_fname.partition('.')
            db_name = db_name.strip().strip('`')
            tab_bname = tab_bname.strip().strip('`')
            output_str += (f"{tag}-size:{size};trx_start_pos:{trx_start_pos};"
                           f"'### {verb} `{db_name}`.`{tab_bname}`';")
    return output_str


def get_event_from_binlog_by_sql(binlog, mysql_ip, mysql_port, user_name, password, trx_min_size):
    """Scan one binlog file with SHOW BINLOG EVENTS and record big transactions.

    A transaction runs from a query event whose info starts with ``BEGIN`` to
    the next event whose info starts with ``COMMIT``. Its size is the sum of
    its row-event sizes (end_pos - start_pos). When that exceeds
    ``trx_min_size`` MB, one line is appended to
    ``big_trx_info/event_result_<ip>_<binlog>_<curr_dt>.log``.

    Args:
        binlog: binlog file name, e.g. 'mysql-bin.000001'.
        mysql_ip, mysql_port, user_name, password: connection to the
            inspected instance.
        trx_min_size: threshold in MB.

    Exits the process with status 1 if an event cannot be parsed. The
    connection is closed on every path.
    """
    logger.info(f"{mysql_ip}:正在解析日志: {binlog}")
    result_file = f"/home/mysql/mysql_big_trx_check/big_trx_info/event_result_{mysql_ip}_{binlog}_{curr_dt}.log"
    dbconn = pymysql.connect(host=mysql_ip, user=user_name, password=password, port=mysql_port)
    try:
        with dbconn.cursor() as cur:
            # Bound parameter: pymysql quotes the file name client-side, so the
            # --binlog value cannot inject SQL.
            cur.execute("show binlog events in %s", (binlog,))
            binlog_events = cur.fetchall()
        if not binlog_events:
            logger.error(f"{mysql_ip}:binlog file:{binlog} is invalid")
            return

        bigtrx_found = False
        trx_start_pos = None   # None until the first BEGIN in this file
        trx_size = 0
        trx_sizes = _new_trx_sizes()
        tab_fname = None       # table named by the most recent Table_map event
        for bv_row in binlog_events:
            try:
                _, start_pos, event_type, _, end_pos, info = bv_row
                info = info or ''
                event_kind = event_type.upper()
                if info[0:5].upper() == 'BEGIN':
                    logger.debug(f"Transaction started at position: {start_pos}")
                    trx_start_pos = start_pos
                    trx_size = 0
                    trx_sizes = _new_trx_sizes()
                if event_kind == 'TABLE_MAP':
                    # Parsing assumes info like "table_id: 108 (db.tbl)".
                    tab_fname = info.split()[2].replace('(', '').replace(')', '')
                    logger.debug(f"Mapped table: {tab_fname}")
                for prefix, tag, _ in _ROW_EVENT_KINDS:
                    if event_kind.startswith(prefix):
                        event_size = end_pos - start_pos
                        trx_size += event_size
                        per_table = trx_sizes[tag]
                        per_table[tab_fname] = per_table.get(tab_fname, 0) + event_size
                        break
                if info[0:6].upper() == 'COMMIT':
                    logger.debug(f"Transaction committed at position: {end_pos}")
                    # A COMMIT with no BEGIN seen yet in this file is skipped.
                    if trx_start_pos is not None and trx_size / 1024 / 1024 > trx_min_size:
                        bigtrx_found = True
                        append_file(result_file,
                                    _format_big_trx(mysql_ip, trx_start_pos, end_pos, trx_sizes))
            except Exception as e:
                logger.error(f"{mysql_ip}:Parse event:{bv_row} is error, Detail is :{e}.")
                sys.exit(1)
        if bigtrx_found:
            logger.info(f"{mysql_ip}:Full event result:[ {result_file} ]")
    finally:
        # Also runs when sys.exit(1) above raises SystemExit.
        dbconn.close()
if __name__ == '__main__':
    # Scan one binlog file of one instance; see get_event_from_binlog_by_sql.
    parser = argparse.ArgumentParser(description='A piece of binlog analysis code.')
    parser.add_argument('--binlog', type=str, help="binlog file(Run as [show binlog events in 'mysql-bin.00001' ).",
                        default=None)
    parser.add_argument('--mysql_ip', type=str, help='mysql database ip addr.', default=None)
    parser.add_argument('--port', type=int, help='mysql database port.', default=3306)
    parser.add_argument('--uname', type=str, help='mysql database username.', default='blxjuser')
    parser.add_argument('--password', type=str, help='mysql database password.', default=None)
    parser.add_argument('--trx_min_size', type=int, help='need print min trx binlog size(MB).', default=1)
    args = parser.parse_args()
    if not args.binlog or not args.mysql_ip:
        logger.error("Missing required arguments: --binlog and --mysql_ip")
        parser.print_help()
        # Non-zero: a missing argument is an error, not a successful run.
        sys.exit(1)
    # Use --password when given; the literal is only a fallback placeholder.
    password = args.password if args.password is not None else 'password'
    try:
        get_event_from_binlog_by_sql(args.binlog, args.mysql_ip, args.port, args.uname, password,
                                     args.trx_min_size)
        logger.info(f"{args.mysql_ip}:{args.binlog}解析完成!")
    except Exception as e:
        logger.error(f"{args.mysql_ip}:解析过程中出现错误: {e}")
        sys.exit(1)
# ===== binlog_records_for_mysql5.py =====
import argparse
import sys
import pymysql
import logging
import os
import re
from datetime import datetime
# Logging: append to the shared per-day log file under LOG_DIR and to stderr.
LOG_DIR = '/home/mysql/mysql_big_trx_check/log'
log_filename = 'binlog_event_monitor_' + datetime.now().strftime("%Y%m%d") + '.log'
log_file = os.path.join(LOG_DIR, log_filename)
os.makedirs(LOG_DIR, exist_ok=True)


def _log_handlers(path):
    """Handlers for this script: append to ``path`` and echo to stderr."""
    return [logging.FileHandler(path, mode='a'), logging.StreamHandler()]


logging.basicConfig(
    handlers=_log_handlers(log_file),
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
def get_binlog_min_max(cursor_target, mysql_ip):
    """Return (oldest, newest) binlog file names on the inspected instance.

    Runs SHOW BINARY LOGS and orders the names by their numeric suffix
    (names without one sort first). Returns (None, None) when the list is
    empty or the query fails, so callers can always unpack two values.
    """
    try:
        cursor_target.execute("show binary logs;")
        binlog = cursor_target.fetchall()
        if not binlog:
            logger.error(f"{mysql_ip}:Binlog 文件不存在或无法读取。")
            return None, None

        def binlog_sort_key(row):
            # Numeric suffix of the file name in the first column; 0 if none.
            match = re.search(r'(\d+)$', row[0])
            return int(match.group(1)) if match else 0

        sort_binlog = sorted(binlog, key=binlog_sort_key)
        print(sort_binlog)  # diagnostic dump of the sorted list to stdout
        return sort_binlog[0][0], sort_binlog[-1][0]
    except Exception as e:
        logger.error(f"{mysql_ip}:获取 binlog 最小和最大值时出错: {e}")
        return None, None
def insert_binlog_record(cursor_source, mysql_ip, min_binlog, max_binlog):
    """Insert one (mysql_ip, min_binlog, max_binlog) row into the records table.

    The caller commits. A failed insert is logged and re-raised so the caller
    rolls back instead of committing and reporting success for a row that was
    never written.
    """
    insert = ("INSERT INTO blxjdb.binlog_bigtrx_records (mysql_ip, min_binlog, max_binlog) "
              "VALUES (%s, %s, %s)")
    try:
        cursor_source.execute(insert, (mysql_ip, min_binlog, max_binlog))
    except Exception as e:
        logger.error(f"{mysql_ip}:插入记录时出错: {e}")
        raise
def get_last_max_binlog(cursor_source, mysql_ip):
    """Return max_binlog of the most recent record for ``mysql_ip``, or None.

    "Most recent" means the row with the highest auto-increment id. Without an
    ORDER BY the server may return any matching row, typically the oldest.
    Returns None when there is no record or the query fails (the error is
    logged).
    """
    try:
        query = ("SELECT max_binlog FROM blxjdb.binlog_bigtrx_records "
                 "WHERE mysql_ip = %s ORDER BY id DESC LIMIT 1")
        cursor_source.execute(query, (mysql_ip,))
        row = cursor_source.fetchone()
        if row is None or row[0] is None:
            logger.info(f"{mysql_ip}:没有找到上次的 binlog 最大值记录")
            return None
        max_binlog = row[0]
        logger.info(f"{mysql_ip}:上次记录的最大 binlog 位置为: {max_binlog}")
        return max_binlog
    except Exception as e:
        logger.error(f"{mysql_ip}:获取上次最大值时出错: {e}")
        return None
def main(mysql_ip):
    """Record today's binlog range of ``mysql_ip`` in the results table.

    The new row runs from the max_binlog of the previous record (or from the
    oldest binlog on a first run) to the newest binlog currently on the
    instance. When the previous max compares greater than the current newest
    binlog, nothing is inserted. Errors are logged and the insert is rolled
    back; both connections are always closed.
    """
    db_source_config = {
        'host': '记录结果的ip',
        'port': 3306,
        'user': 'binlogxunjian',
        'password': 'binlogxunjian'
    }
    db_target_config = {
        'host': mysql_ip,
        'port': 3306,
        'user': 'blxjuser',
        'password': 'password'
    }
    conn_source = None
    conn_target = None
    try:
        conn_source = pymysql.connect(**db_source_config)
        conn_target = pymysql.connect(**db_target_config)
        with conn_source.cursor() as cursor_source, conn_target.cursor() as cursor_target:
            last_max = get_last_max_binlog(cursor_source, mysql_ip)
            # Accept both a bare None and (None, None) from the helper.
            bounds = get_binlog_min_max(cursor_target, mysql_ip)
            min_binlog, max_binlog = bounds if bounds else (None, None)
            if min_binlog is None or max_binlog is None:
                return
            if last_max is None:
                logger.info(f"{mysql_ip}:没有记录,开始新数据插入!")
                insert_binlog_record(cursor_source, mysql_ip, min_binlog, max_binlog)
                conn_source.commit()
                logger.info(f"{mysql_ip}:新数据插入完成!")
            elif last_max <= max_binlog:
                # Equal is included, presumably so the still-growing newest
                # binlog is rescanned on every run.
                insert_binlog_record(cursor_source, mysql_ip, last_max, max_binlog)
                conn_source.commit()
                logger.info(f"{mysql_ip}:binlog存在变化!")
            else:
                logger.info(f"{mysql_ip}:没有新的 binlog 变化!")
    except Exception as e:
        logger.error(f"{mysql_ip}:主函数执行时出错: {e}")
        if conn_source is not None and conn_source.open:
            conn_source.rollback()
    finally:
        for conn in (conn_source, conn_target):
            if conn is not None:
                conn.close()
if __name__ == "__main__":
    # Command line: --mysql_ip <address of the inspected instance>
    cli = argparse.ArgumentParser(description='Insert Binlog Records')
    cli.add_argument('--mysql_ip', default=None, type=str, help='mysql database ip addr.')
    cli_args = cli.parse_args()
    if cli_args.mysql_ip:
        main(cli_args.mysql_ip)
    else:
        # No target given: show usage and exit with status 0.
        cli.print_help()
        sys.exit(0)
# ===== binlog_event_analyze_for_mysql5.sh =====
#!/bin/bash
# Inspect one MySQL 5 instance for big transactions.
# Usage: binlog_event_analyze_for_mysql5.sh <mysql_ip>
#   1. binlog_records_for_mysql5.py     - record today's binlog range
#   2. binlog_list.py                   - print the binlog file names in that range
#   3. binlog_bigtrx_show_for_mysql5.py - scan each file, writing big transactions
#                                         to event_result_<ip>_<binlog>_<timestamp>.log
#   4. set exist_big_trx to Y/N on today's record(s) for the instance
workdir="/home/mysql/mysql_big_trx_check"
cd "$workdir" || { echo "ERROR: cannot cd to $workdir" >&2; exit 1; }
# Python 3 virtualenv
source /home/oracle/py3/bin/activate
# Must match the directory binlog_bigtrx_show_for_mysql5.py writes its result files to.
log_bigtrx_dir=/home/mysql/mysql_big_trx_check/big_trx_info
curr_dt=$(date +%Y%m%d)
export MYSQL_PWD=binlogxunjian
# Minimum transaction size to report, in MB
trx_min_size=90
mysql_ip=$1
# mysql_ip is interpolated into SQL and file globs: allow host/IP characters only.
case "$mysql_ip" in
    ''|*[!0-9A-Za-z.:_-]*)
        echo "Usage: $0 <mysql_ip>" >&2
        exit 1
        ;;
esac

function update_binlog_bigtrx_records() {
    local mysql_ip=$1
    local exist_big_trx=$2
    # Half-open day range: no function is applied to record_time.
    mysql -ubinlogxunjian -h记录结果的ip --silent --skip-column-names -e \
        "update blxjdb.binlog_bigtrx_records set exist_big_trx='${exist_big_trx}' where mysql_ip='${mysql_ip}' and record_time >= '${curr_dt}' and record_time < date_add('${curr_dt}', interval 1 day);"
}

# Record today's binlog range
python binlog_records_for_mysql5.py --mysql_ip "$mysql_ip"
# One binlog name per line on stdout (the Python logging goes to stderr)
binlog_list=$(python binlog_list.py --mysql_ip "$mysql_ip")
for binlog in $binlog_list; do
    python binlog_bigtrx_show_for_mysql5.py --mysql_ip "$mysql_ip" --binlog "$binlog" --trx_min_size "$trx_min_size"
done
# Any result file from today for this instance means a big transaction was found
if ls "${log_bigtrx_dir}"/event_result_"${mysql_ip}"_*_"${curr_dt}"_*.log >/dev/null 2>&1; then
    update_binlog_bigtrx_records "$mysql_ip" 'Y'
else
    update_binlog_bigtrx_records "$mysql_ip" 'N'
fi
# ===== binlog_event_monitor_for_mysql5.sh =====
MySQL 5 版本的待巡检 MySQL IP 写在文本文件 /home/mysql/mysql_big_trx_check/mysql_ip.list 中（每行一个，以空白分隔即可）。
#!/bin/bash
# Run binlog_event_analyze_for_mysql5.sh for every host in mysql_ip.list, with
# at most MAX_PARALLEL runs at once and a per-run TIMEOUT.
curr_dt=$(date +%Y%m%d)
log_filename="/home/mysql/mysql_big_trx_check/log/binlog_event_monitor_for_mysql5_${curr_dt}.log"
mkdir -p "$(dirname "$log_filename")"
# Maximum number of concurrent analysis runs
MAX_PARALLEL=4
# Per-run timeout in seconds (7200 s = 120 minutes)
TIMEOUT=7200

# Whitespace-separated list of MySQL hosts to inspect
ip_list_file=/home/mysql/mysql_big_trx_check/mysql_ip.list
if [ ! -r "$ip_list_file" ]; then
    echo "ERROR: 无法读取IP列表文件: $ip_list_file" >> "$log_filename"
    exit 1
fi
mysql_ips=$(cat "$ip_list_file")

# Hosts still to be started, in order
queue=($mysql_ips)
# Running tasks: PID -> start time (epoch seconds), and PID -> MySQL IP
declare -A tasks=()
declare -A task_ips=()

# Stop a task and the command it is currently running. The task is frozen
# first so it cannot start its next python step between the two kills.
function kill_task() {
    local pid=$1
    kill -STOP "$pid" 2>/dev/null
    pkill -TERM -P "$pid" 2>/dev/null
    kill -TERM "$pid" 2>/dev/null
    kill -CONT "$pid" 2>/dev/null
}

# INT/TERM handler: stop every running task, then exit.
function ctrl_c() {
    local pid
    for pid in "${!tasks[@]}"; do
        if kill -0 "$pid" 2>/dev/null; then
            kill_task "$pid"
            echo "ERROR: 终止任务PID: $pid" >> "$log_filename"
        fi
        unset 'tasks[$pid]' 'task_ips[$pid]'
    done
    echo "ERROR: 脚本终止,所有子进程已关闭。" >> "$log_filename"
    exit 1
}
trap ctrl_c INT TERM

# Start the analysis of one host in the background and register it.
# binlog_event_analyze_for_mysql5.sh is resolved through PATH.
function binlog_event_analyze_for_mysql5() {
    local mysql_ip=$1
    local new_pid
    echo "现在开始解析: $mysql_ip" >> "$log_filename"
    binlog_event_analyze_for_mysql5.sh "$mysql_ip" &
    new_pid=$!
    tasks["$new_pid"]=$(date +%s)
    task_ips["$new_pid"]=$mysql_ip
    echo "$mysql_ip: 任务PID: $new_pid, 启动时间: $(date +%Y%m%d_%H%M%S)" >> "$log_filename"
}

# Drop finished tasks and kill those running longer than TIMEOUT.
function check_task_status() {
    local now pid ip start_time
    now=$(date +%s)
    for pid in "${!tasks[@]}"; do
        ip=${task_ips[$pid]}
        if ! kill -0 "$pid" 2>/dev/null; then
            unset 'tasks[$pid]' 'task_ips[$pid]'
            echo "$ip: 任务PID: $pid 已完成或不存在,已释放资源。" >> "$log_filename"
            continue
        fi
        start_time=${tasks[$pid]}
        if [ $((now - start_time)) -ge "$TIMEOUT" ]; then
            kill_task "$pid"
            unset 'tasks[$pid]' 'task_ips[$pid]'
            echo "ERROR: $ip: 任务PID: $pid 超时终止,已释放资源。" >> "$log_filename"
        fi
    done
}

# While all slots are busy, log the queue every log_interval polls
# (120 x 0.5 s = 1 min).
log_interval=120
log_count=0
while [ ${#queue[@]} -gt 0 ]; do
    check_task_status
    if [ "${#tasks[@]}" -lt "$MAX_PARALLEL" ]; then
        binlog_event_analyze_for_mysql5 "${queue[0]}"
        queue=("${queue[@]:1}")
        log_count=0
    else
        log_count=$((log_count + 1))
        if [ "$log_count" -ge "$log_interval" ]; then
            echo "并行任务已达最大值$MAX_PARALLEL,请稍等..." >> "$log_filename"
            echo "待处理的IP队列: ${queue[*]}" >> "$log_filename"
            log_count=0
        fi
    fi
    sleep 0.5
done
# Queue drained: keep polling instead of a blocking wait, so TIMEOUT still
# applies to the last batch of tasks.
while [ ${#tasks[@]} -gt 0 ]; do
    check_task_status
    sleep 0.5
done
echo "所有任务已完成。" >> "$log_filename"