此脚本支持 MySQL 8 和 MySQL 5 版本。
在“记录结果的ip”对应的 MySQL 实例中建表，所有巡检结果都存放在该表中，后续可配合 Zabbix 做定期巡检。
-- Result table of the binlog big-transaction inspection.
-- binlog_records.py inserts (mysql_ip, min_binlog, max_binlog) rows;
-- binlog_event_analyze.sh later sets exist_big_trx to 'Y' or 'N' for the
-- rows recorded today.
create table if not exists binlog_bigtrx_records
(
    id            int         not null auto_increment,
    mysql_ip      varchar(30),                -- inspected MySQL instance
    min_binlog    varchar(30),                -- first binlog file of the range to scan
    max_binlog    varchar(30),                -- last binlog file of the range to scan
    record_time   timestamp   null default current_timestamp,
    update_time   timestamp   null default current_timestamp on update current_timestamp,
    exist_big_trx varchar(10),                -- 'Y' / 'N'
    primary key (id),
    -- every lookup made by the scripts filters on mysql_ip, most also on record_time
    key binlog_bigtrx_records_ip_record_time_idx (mysql_ip, record_time)
);
binlog_bigtrx_show.py
'''
要求:
1、将事务大小存于表中,并且记录上一次记录的最大binlog号,下一次检查从该binlog开始
2、并且增加巡检脚本
表内容:
ip、开始binlog、结束binlog、库名、表名、事务大小、操作类型、操作事件
1、根据最小最大binlog号,得出所有binlog号(根据update_time判断是否当天更新,未更新的不检查事务)
2、遍历所有binlog号,将大事务记录到表中(每天更新),生成日志(每天汇总一条)
3、
'''
import argparse
import sys
from datetime import datetime
import pymysql
import logging
import os
from datetime import datetime
# Logging: one log file per day under LOG_DIR (the same file name pattern is
# used by every script of this toolkit), appended to, plus console output.
LOG_DIR = '/home/mysql/mysql_big_trx_check/log'
log_filename = f'binlog_event_monitor_{datetime.now().strftime("%Y%m%d")}.log'
log_file = os.path.join(LOG_DIR, log_filename)
# Make sure the log directory exists before the FileHandler opens the file.
os.makedirs(LOG_DIR, exist_ok=True)
# Append mode. The encoding is explicit because the log messages contain
# Chinese text and the default encoding follows the process locale
# (e.g. ASCII under a C/POSIX locale).
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(log_file, mode='a', encoding='utf-8'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)
# Timestamp embedded in the result file names; binlog_event_analyze.sh globs
# for "event_result_<ip>_*_<YYYYMMDD>_*.log".
curr_dt = datetime.now().strftime("%Y%m%d_%H%M%S")
# Connection settings of the result database.
# NOTE(review): not referenced anywhere in this script.
db_source_config = {
    'host': '记录结果的ip',
    'port': 3306,
    'user': 'binlogxunjian',
    'password': 'binlogxunjian',
    'database': 'blxjdb'
}
def append_file(file_name, content):
    """Append ``content`` followed by a newline to ``file_name``.

    The parent directory is created when missing, and the text is written as
    UTF-8 so non-ASCII database/table names are stored safely.
    """
    parent_dir = os.path.dirname(file_name)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    # ``with`` guarantees the file is closed even if a write raises.
    with open(file_name, 'a', encoding='utf-8') as f:
        f.write(content)
        f.write('\n')
# Row-event types that count towards a transaction's size, with the label and
# SQL verb used for them in the result line (reported in this order).
_ROW_EVENT_FORMATS = (
    ('UPDATE_ROWS', 'UPD', 'UPDATE'),
    ('DELETE_ROWS', 'DLT', 'DELETE FROM'),
    ('WRITE_ROWS', 'WTE', 'INSERT INTO'),
)


def _split_table_name(tab_fname):
    """Split ``db.table`` (back-quotes optional) at the first dot into ``(db, table)``."""
    db_name, _, tab_bname = tab_fname.partition('.')
    return db_name.strip().strip('`'), tab_bname.strip().strip('`')


def _new_size_maps():
    """Return an empty ``{event_type: {table: bytes}}`` map for a new transaction."""
    return {event_type: {} for event_type, _, _ in _ROW_EVENT_FORMATS}


def _format_big_trx(mysql_ip, trx_start_pos, trx_end_pos, sizes_by_event):
    """Build the result-file line for one big transaction.

    The header is the transaction span in MB (COMMIT end position minus BEGIN
    start position); it is followed by one entry per (operation, table) that
    carries the row-event bytes of that table only.
    """
    parts = [f"{mysql_ip};{round((trx_end_pos - trx_start_pos) / 1024 / 1024, 2)} MB;"]
    for event_type, label, verb in _ROW_EVENT_FORMATS:
        for tab_fname, size in sizes_by_event[event_type].items():
            db_name, tab_bname = _split_table_name(tab_fname)
            parts.append(f"{label}-size:{size};trx_start_pos:{trx_start_pos};"
                         f"'### {verb} `{db_name}`.`{tab_bname}`';")
    return ''.join(parts)


def get_event_from_binlog_by_sql(binlog, mysql_ip, mysql_port, user_name, password, trx_min_size):
    """Scan one binlog file with SHOW BINLOG EVENTS and record big transactions.

    A transaction runs from an event whose Info starts with ``BEGIN`` to the
    next event whose Info starts with ``COMMIT``. Its size is the sum of the
    byte lengths (End_log_pos - Pos) of its Write_rows / Update_rows /
    Delete_rows events. When the size exceeds ``trx_min_size`` MB, one line is
    appended to
    ``/home/mysql/mysql_big_trx_check/big_trx_info/event_result_<ip>_<binlog>_<curr_dt>.log``.

    Args:
        binlog: binlog file name to scan, e.g. ``mysql-bin.000001``.
        mysql_ip, mysql_port, user_name, password: connection to the
            inspected instance.
        trx_min_size: reporting threshold in MB.

    Exits the process with status 1 if an event cannot be parsed. The
    connection is closed on every path.
    """
    logger.info(f"{mysql_ip}:正在解析日志: {binlog}")
    result_file = f"/home/mysql/mysql_big_trx_check/big_trx_info/event_result_{mysql_ip}_{binlog}_{curr_dt}.log"
    dbconn = pymysql.connect(host=mysql_ip, user=user_name, password=password, port=mysql_port)
    try:
        cur = dbconn.cursor()
        # Bind the file name instead of formatting it into the statement;
        # pymysql quotes the value client-side.
        cur.execute("show binlog events in %s", (binlog,))
        binlog_events = cur.fetchall()
        if not binlog_events:
            logger.error(f"{mysql_ip}:binlog file:{binlog} is invalid")
            return
        bigtrx_found = False
        # Per-transaction state, reset on every BEGIN. It is initialised here
        # so that row events seen before the first BEGIN cannot raise NameError.
        trx_start_pos = 0
        trx_size = 0
        tab_fname = ''
        sizes_by_event = _new_size_maps()
        for bv_row in binlog_events:
            try:
                _log_name, start_pos, event_type, _server_id, end_pos, info = bv_row
                event_kind = event_type.upper()
                if info[0:5].upper() == 'BEGIN':
                    logger.debug(f"Transaction started at position: {start_pos}")
                    trx_start_pos = start_pos
                    trx_size = 0
                    sizes_by_event = _new_size_maps()
                if event_kind == 'TABLE_MAP':
                    # Info is expected to look like "table_id: 92 (db.tbl)".
                    tab_fname = info.split()[2].replace('(', '').replace(')', '')
                    logger.debug(f"Mapped table: {tab_fname}")
                if event_kind in sizes_by_event:
                    # Row events belong to the table of the preceding Table_map.
                    event_size = end_pos - start_pos
                    trx_size += event_size
                    per_table = sizes_by_event[event_kind]
                    per_table[tab_fname] = per_table.get(tab_fname, 0) + event_size
                if info[0:6].upper() == 'COMMIT':
                    logger.debug(f"Transaction committed at position: {end_pos}")
                    if trx_size / 1024 / 1024 > trx_min_size:
                        bigtrx_found = True
                        append_file(result_file,
                                    _format_big_trx(mysql_ip, trx_start_pos, end_pos, sizes_by_event))
            except Exception as e:
                logger.error(f"{mysql_ip}:Parse event:{bv_row} is error, Detail is :{e}.")
                sys.exit(1)
        if bigtrx_found:
            logger.info(f"{mysql_ip}:Full event result:[ {result_file} ]")
    finally:
        dbconn.close()
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='A piece of binlog analysis code.')
    parser.add_argument('--binlog', type=str, help="binlog file(Run as [show binlog events in 'mysql-bin.00001' ).",
                        default=None)
    parser.add_argument('--mysql_ip', type=str, help='mysql database ip addr.', default=None)
    parser.add_argument('--port', type=int, help='mysql database port.', default=3306)
    parser.add_argument('--uname', type=str, help='mysql database username.', default='xjuser')
    parser.add_argument('--password', type=str,
                        help='mysql database password (the built-in default is used when omitted).',
                        default=None)
    parser.add_argument('--trx_min_size', type=int, help='need print min trx binlog size(MB).', default=1)
    args = parser.parse_args()
    if not args.binlog or not args.mysql_ip:
        logger.error("Missing required arguments: --binlog and --mysql_ip")
        parser.print_help()
        sys.exit(0)
    # --password was previously parsed and then ignored. Honour it, and keep
    # the old hard-coded value as the fallback so existing callers still work.
    password = args.password if args.password is not None else 'password'
    try:
        get_event_from_binlog_by_sql(args.binlog, args.mysql_ip, args.port, args.uname, password,
                                     args.trx_min_size)
        logger.info(f"{args.mysql_ip}:{args.binlog}解析完成!")
    except Exception as e:
        logger.error(f"{args.mysql_ip}:解析过程中出现错误: {e}")
        sys.exit(1)
binlog_list.py
import argparse
import sys
from datetime import datetime
import pymysql
import logging
import os
from datetime import datetime
# Logging: one log file per day under LOG_DIR (the same file name pattern is
# used by every script of this toolkit), appended to, plus console output.
# StreamHandler writes to stderr, so it does not mix with the binlog names
# this script prints on stdout.
LOG_DIR = '/home/mysql/mysql_big_trx_check/log'
log_filename = f'binlog_event_monitor_{datetime.now().strftime("%Y%m%d")}.log'
log_file = os.path.join(LOG_DIR, log_filename)
# Make sure the log directory exists before the FileHandler opens the file.
os.makedirs(LOG_DIR, exist_ok=True)
# Append mode. The encoding is explicit because the log messages contain
# Chinese text and the default encoding follows the process locale.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(log_file, mode='a', encoding='utf-8'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)
# Today's date as 'YYYY-MM-DD'; main() looks up the record made on this day.
curr_dt = datetime.now().strftime("%Y-%m-%d")
def binlog_min_max_query(cursor_source, mysql_ip, curr_dt):
    """Return ``(min_binlog, max_binlog)`` of the newest record made on ``curr_dt``.

    Args:
        cursor_source: DB-API cursor on the result database.
        mysql_ip: inspected instance whose record is looked up.
        curr_dt: day to look at, as a ``'YYYY-MM-DD'`` string.

    Returns:
        The two binlog names, or ``(None, None)`` when there is no record for
        that day or the query fails (the error is logged).
    """
    # A half-open range on record_time (instead of DATE(record_time) = ...)
    # lets an index on (mysql_ip, record_time) be used. id breaks ties between
    # two runs recorded within the same second.
    sql = (
        "select min_binlog, max_binlog "
        "from blxjdb.binlog_bigtrx_records "
        "where mysql_ip = %s "
        "and record_time >= %s and record_time < date_add(%s, interval 1 day) "
        "order by record_time desc, id desc limit 1"
    )
    try:
        cursor_source.execute(sql, (mysql_ip, curr_dt, curr_dt))
        result = cursor_source.fetchone()
    except Exception as e:
        logger.error(f"{mysql_ip}:查询binlog_min_max时出错: {e}")
        return None, None
    if not result:
        logger.info(f"{mysql_ip}:没有找到对应日期的binlog记录")
        return None, None
    return result[0], result[1]
def get_binlog_list(mysql_ip, min_binlog, max_binlog):
    """Print and return every binlog file name from ``min_binlog`` to ``max_binlog``.

    Names are rebuilt from the numeric suffix. They keep the prefix and the
    zero-padded width of ``min_binlog`` (e.g. ``binlog.000012``) instead of
    assuming ``mysql-bin.`` with six digits. Each name is printed on its own
    line because binlog_event_analyze.sh reads this script's stdout.

    Raises:
        ValueError: if either name has no numeric suffix.
    """
    def _split_suffix(name):
        # Walk back over the trailing digits of the file name.
        digits_start = len(name)
        while digits_start > 0 and name[digits_start - 1].isdigit():
            digits_start -= 1
        suffix = name[digits_start:]
        if not suffix:
            raise ValueError(f"{mysql_ip}:binlog name without numeric suffix: {name!r}")
        return name[:digits_start], suffix

    prefix, min_suffix = _split_suffix(min_binlog)
    _, max_suffix = _split_suffix(max_binlog)
    width = len(min_suffix)
    binlog_list = [f"{prefix}{i:0{width}d}" for i in range(int(min_suffix), int(max_suffix) + 1)]
    for binlog in binlog_list:
        print(binlog)
    logger.info(f"{mysql_ip}:待解析binlog日志列表:{binlog_list}")
    return binlog_list
def main(mysql_ip):
    """Print the binlog files recorded today for ``mysql_ip``, one per line.

    Reads today's (min_binlog, max_binlog) from blxjdb.binlog_bigtrx_records
    and hands it to get_binlog_list(). Errors are logged, not raised.
    """
    db_source_config = {
        'host': '记录结果的ip',
        'port': 3306,
        'user': 'binlogxunjian',
        'password': 'binlogxunjian',
        'database': 'blxjdb'
    }
    # Bound before the try so the finally block works even if connect() fails
    # (previously that case raised UnboundLocalError there).
    conn_source = None
    cursor_source = None
    try:
        conn_source = pymysql.connect(**db_source_config)
        cursor_source = conn_source.cursor()
        min_binlog, max_binlog = binlog_min_max_query(cursor_source, mysql_ip, curr_dt)
        if min_binlog is None or max_binlog is None:
            logger.warning(f"{mysql_ip}:未能获取有效的binlog范围。")
            return
        get_binlog_list(mysql_ip, min_binlog, max_binlog)
    except Exception as e:
        logger.error(f"{mysql_ip}:主函数执行时出错: {e}")
    finally:
        if cursor_source is not None:
            cursor_source.close()
        if conn_source is not None:
            conn_source.close()
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Get binlog list.')
    parser.add_argument('--mysql_ip', type=str, help='mysql database ip addr.', default=None)
    args = parser.parse_args()
    if not args.mysql_ip:
        # stdout is consumed by binlog_event_analyze.sh as the list of binlog
        # files, so the usage text goes to stderr.
        parser.print_help(sys.stderr)
        sys.exit(0)
    main(args.mysql_ip)
binlog_records.py
import argparse
import sys
import pymysql
import logging
import os
import re
from datetime import datetime
# Logging: one log file per day under LOG_DIR (the same file name pattern is
# used by every script of this toolkit), appended to, plus console output.
LOG_DIR = '/home/mysql/mysql_big_trx_check/log'
log_filename = f'binlog_event_monitor_{datetime.now().strftime("%Y%m%d")}.log'
log_file = os.path.join(LOG_DIR, log_filename)
# Make sure the log directory exists before the FileHandler opens the file.
os.makedirs(LOG_DIR, exist_ok=True)
# Append mode. The encoding is explicit because the log messages contain
# Chinese text and the default encoding follows the process locale.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(log_file, mode='a', encoding='utf-8'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)
def get_binlog_min_max(cursor_target, mysql_ip):
    """Return the names of the oldest and newest binlog files on the instance.

    Runs SHOW BINARY LOGS through ``cursor_target`` and orders the files by
    the numeric suffix of their names.

    Returns:
        ``(min_binlog, max_binlog)``, or ``(None, None)`` when no binlog is
        listed or the query fails (logged). The caller unpacks the result,
        so a bare ``None`` would raise TypeError there.
    """
    def binlog_sort_key(binlog_name):
        # Files without a numeric suffix sort first.
        match = re.search(r'(\d+)$', binlog_name)
        return int(match.group(1)) if match else 0

    try:
        cursor_target.execute("show binary logs;")
        binlog = cursor_target.fetchall()
        if not binlog:
            logger.error(f"{mysql_ip}:Binlog 文件不存在或无法读取。")
            return None, None
        # The first column of each SHOW BINARY LOGS row is the file name.
        sort_binlog = sorted(binlog, key=lambda x: binlog_sort_key(x[0]))
        print(sort_binlog)
        return sort_binlog[0][0], sort_binlog[-1][0]
    except Exception as e:
        logger.error(f"{mysql_ip}:获取 binlog 最小和最大值时出错: {e}")
        return None, None
def insert_binlog_record(cursor_source, mysql_ip, min_binlog, max_binlog):
    """Insert one (mysql_ip, min_binlog, max_binlog) row into the record table.

    Does not commit; the caller owns the transaction. A failure is logged and
    re-raised, so the caller does not commit and report success for a row
    that was never written.
    """
    insert = (
        "INSERT INTO blxjdb.binlog_bigtrx_records (mysql_ip, min_binlog, max_binlog) "
        "VALUES (%s, %s, %s)"
    )
    try:
        cursor_source.execute(insert, (mysql_ip, min_binlog, max_binlog))
    except Exception as e:
        logger.error(f"{mysql_ip}:插入记录时出错: {e}")
        raise
def get_last_max_binlog(cursor_source, mysql_ip):
    """Return the largest max_binlog recorded so far for ``mysql_ip``, or None.

    MAX() over zero rows still returns one row holding NULL, so a NULL value
    means "no previous record". None is also returned when the query fails
    (the error is logged).
    """
    # NOTE(review): MAX() compares the names as strings; this matches numeric
    # order only while all names share the same prefix and digit width.
    query = "SELECT max(max_binlog) FROM blxjdb.binlog_bigtrx_records WHERE mysql_ip = %s"
    try:
        cursor_source.execute(query, (mysql_ip,))
        row = cursor_source.fetchone()
    except Exception as e:
        logger.error(f"{mysql_ip}:获取上次最大值时出错: {e}")
        return None
    if not row or row[0] is None:
        logger.info(f"{mysql_ip}:没有找到上次的 binlog 最大值记录")
        return None
    max_binlog = row[0]
    logger.info(f"{mysql_ip}:上次记录的最大 binlog 位置为: {max_binlog}")
    return max_binlog
def main(mysql_ip):
    """Record the binlog range to inspect for ``mysql_ip`` in the result database.

    Reads the last recorded max_binlog from blxjdb.binlog_bigtrx_records and
    the instance's current binlog files (SHOW BINARY LOGS), then inserts:
      * no previous record   -> (oldest binlog, newest binlog);
      * last_max <= newest   -> (last_max, newest), so the last file of the
        previous range is included again;
      * otherwise nothing.
    Errors after both connections are open are logged and rolled back. Both
    connections are always closed, even if the second connect fails.
    """
    db_source_config = {
        'host': '记录结果的ip',
        'port': 3306,
        'user': 'binlogxunjian',
        'password': 'binlogxunjian'
    }

    db_target_config = {
        'host': mysql_ip,
        'port': 3306,
        'user': 'username',
        'password': 'password'
    }
    conn_source = None
    conn_target = None
    try:
        conn_source = pymysql.connect(**db_source_config)
        conn_target = pymysql.connect(**db_target_config)
        cursor_source = conn_source.cursor()
        cursor_target = conn_target.cursor()
        try:
            last_max = get_last_max_binlog(cursor_source, mysql_ip)
            # Accept both (None, None) and a bare None from get_binlog_min_max.
            min_binlog, max_binlog = get_binlog_min_max(cursor_target, mysql_ip) or (None, None)
            if min_binlog is None or max_binlog is None:
                return
            if last_max is None:
                logger.info(f"{mysql_ip}:没有记录,开始新数据插入!")
                insert_binlog_record(cursor_source, mysql_ip, min_binlog, max_binlog)
                conn_source.commit()
                logger.info(f"{mysql_ip}:新数据插入完成!")
            elif last_max <= max_binlog:
                # NOTE(review): string comparison; equals numeric order only
                # while both names share the prefix and digit width.
                insert_binlog_record(cursor_source, mysql_ip, last_max, max_binlog)
                conn_source.commit()
                logger.info(f"{mysql_ip}:binlog存在变化!")
            else:
                logger.info(f"{mysql_ip}:没有新的 binlog 变化!")
        except Exception as e:
            logger.error(f"{mysql_ip}:主函数执行时出错: {e}")
            conn_source.rollback()
        finally:
            cursor_source.close()
            cursor_target.close()
    finally:
        if conn_target is not None:
            conn_target.close()
        if conn_source is not None:
            conn_source.close()
if __name__ == "__main__":
    # Entry point: record today's binlog range for the instance given on the command line.
    parser = argparse.ArgumentParser(description='Insert Binlog Records')
    parser.add_argument('--mysql_ip', type=str, help='mysql database ip addr.', default=None)
    args = parser.parse_args()
    if not args.mysql_ip:
        parser.print_help()
        sys.exit(0)
    main(args.mysql_ip)
binlog_event_analyze.sh
#!/bin/bash
# Inspect one MySQL instance ($1) for big transactions:
#   1. binlog_records.py records today's binlog range in blxjdb.binlog_bigtrx_records;
#   2. binlog_list.py prints the binlog files of that range (one per line);
#   3. binlog_bigtrx_show.py scans each file and writes big transactions to
#      event_result_<ip>_<binlog>_<YYYYMMDD_HHMMSS>.log;
#   4. today's record is flagged exist_big_trx='Y' or 'N'.
workdir="/home/oracle/monitor/mysql_big_trx_check"
cd "$workdir" || exit 1
# Activate the Python 3 environment
source /home/oracle/py3/bin/activate
# Must be the directory binlog_bigtrx_show.py writes its result files to
# (/home/mysql/mysql_big_trx_check/big_trx_info). Otherwise the check below
# never finds them and every instance is flagged 'N'.
log_bigtrx_dir=/home/mysql/mysql_big_trx_check/big_trx_info
curr_dt=$(date +%Y%m%d)
export MYSQL_PWD=binlogxunjian
# Minimum transaction size to report, in MB
trx_min_size=90
mysql_ip=$1
if [ -z "$mysql_ip" ]; then
    echo "usage: $0 <mysql_ip>" >&2
    exit 1
fi
# Set exist_big_trx ($2) on today's records of $1 in the result database.
function update_binlog_bigtrx_records() {
    local mysql_ip=$1
    local exist_big_trx=$2
    # Host placeholder matches the one used by the Python scripts.
    mysql -ubinlogxunjian -h记录结果的ip --silent --skip-column-names -e \
        "update blxjdb.binlog_bigtrx_records set exist_big_trx='$exist_big_trx' where mysql_ip='$mysql_ip' and date_format(record_time, '%Y%m%d')='${curr_dt}';"
}
# Record today's binlog range, then fetch the list of files to scan.
python binlog_records.py --mysql_ip "$mysql_ip"
binlog_list=$(python binlog_list.py --mysql_ip "$mysql_ip")
# Scan the files one after another (word splitting on the newline-separated list).
for binlog in $binlog_list; do
    python binlog_bigtrx_show.py --mysql_ip "$mysql_ip" --binlog "$binlog" --trx_min_size "$trx_min_size"
done
# Any result file written today for this IP means a big transaction was found.
if compgen -G "${log_bigtrx_dir}/event_result_${mysql_ip}_*_${curr_dt}_*.log" > /dev/null; then
    update_binlog_bigtrx_records "$mysql_ip" 'Y'
else
    update_binlog_bigtrx_records "$mysql_ip" 'N'
fi
binlog_event_monitor.sh
MySQL 8 版本：待巡检 MySQL 的 IP 记录在 Oracle 库的表（MYSQL_HOST）中，可根据实际情况修改。
后文的 MySQL 5 版本：待巡检 MySQL 的 IP 写在文本文件（mysql_ip.list）中，可作参考。
#!/bin/bash
# Driver: runs binlog_event_analyze.sh for every MySQL host listed in the
# Oracle table MYSQL_HOST, at most MAX_PARALLEL at a time, and kills runs
# that exceed TIMEOUT seconds.
# Daily log file
curr_dt=$(date +%Y%m%d)
log_filename="/home/mysql/mysql_big_trx_check/log/binlog_event_monitor_${curr_dt}.log"
# The driver logs before any Python script gets a chance to create the directory.
mkdir -p "$(dirname "$log_filename")"
# Maximum number of concurrent tasks
MAX_PARALLEL=4
# Per-task timeout in seconds (7200 s = 120 min)
TIMEOUT=7200
# Oracle connection settings
ORACLE_USER="user"
ORACLE_PASS="password"
ORACLE_SID="orcl"
# Fetch the MySQL IP list from Oracle. The credentials are passed through the
# here-document (CONNECT) instead of the command line so they do not appear
# in the process list.
mysql_ips=$(sqlplus -S /nolog <<EOF
connect ${ORACLE_USER}/${ORACLE_PASS}@ip:1521/${ORACLE_SID}
set heading off
set feedback off
set pagesize 0
set timing off
select HOST from MYSQL_HOST;
exit;
EOF
)
# Queue of IPs still to process
queue=($mysql_ips)
# Running tasks: PID -> start time (epoch seconds)
declare -A tasks=()
# Stop all tasks on Ctrl-C / TERM
trap ctrl_c INT TERM
# Signal handler for INT/TERM: terminate every running task, log it, exit 1.
function ctrl_c() {
    for pid in "${!tasks[@]}"; do
        if kill -0 "$pid" 2>/dev/null; then
            # Stop the task's direct children (the python process the wrapper
            # is waiting on) first; killing only the wrapper shell would
            # leave them running as orphans.
            pkill -TERM -P "$pid" 2>/dev/null
            kill "$pid"
            unset tasks["$pid"]
            echo "ERROR: 终止任务PID: $pid" >> "$log_filename"
        fi
    done
    echo "ERROR: 脚本终止,所有子进程已关闭。" >> "$log_filename"
    exit 1
}
# Start binlog_event_analyze.sh for one MySQL IP in the background and record
# its PID with its start time (epoch seconds) in `tasks` and its IP in `task_ips`.
function binlog_event_analyze() {
    local mysql_ip=$1
    # Use the script from PATH when available; otherwise fall back to the
    # copy next to this driver instead of failing with "command not found".
    local worker=binlog_event_analyze.sh
    if ! command -v "$worker" >/dev/null 2>&1; then
        worker="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)/binlog_event_analyze.sh"
    fi
    echo "现在开始解析: $mysql_ip" >> "$log_filename"
    "$worker" "$mysql_ip" &
    new_pid=$!
    curr_time=$(date +%Y%m%d_%H%M%S)
    current_time=$(date +%s)
    tasks["$new_pid"]=$current_time
    # Remember which IP owns this PID for later status messages.
    task_ips["$new_pid"]=$mysql_ip
    echo "$mysql_ip: 任务PID: $new_pid, 启动时间: $curr_time" >> "$log_filename"
}
# Release finished tasks and terminate tasks running for TIMEOUT seconds or more.
function check_task_status() {
    local current_time=$(date +%s)
    local pid start_time task_ip
    for pid in "${!tasks[@]}"; do
        # The global $mysql_ip holds the most recently started IP, not the
        # one owning this PID; use the IP saved at launch when there is one.
        task_ip=${task_ips[$pid]:-unknown}
        if ! kill -0 "$pid" 2>/dev/null; then
            unset tasks["$pid"] task_ips["$pid"]
            echo "$task_ip: 任务PID: $pid 已完成或不存在,已释放资源。" >> "$log_filename"
        else
            start_time=${tasks[$pid]}
            if [ $((current_time - start_time)) -ge $TIMEOUT ]; then
                # Children first, so the running python process does not survive as an orphan.
                pkill -TERM -P "$pid" 2>/dev/null
                kill "$pid"
                unset tasks["$pid"] task_ips["$pid"]
                echo "ERROR: $task_ip: 任务PID: $pid 超时终止,已释放资源。" >> "$log_filename"
            fi
        fi
    done
}
# 主循环:遍历所有MySQL IP
log_interval=120
log_count=0
# 主循环:遍历所有MySQL IP
while [ ${#queue[@]} -gt 0 ]; do
    # 检查任务状态并处理超时
    check_task_status
    # 启动新任务
    if [ "${#tasks[@]}" -lt "$MAX_PARALLEL" ]; then
        mysql_ip=${queue[0]}
        binlog_event_analyze "$mysql_ip"
        queue=("${queue[@]:1}")  # 移除已启动的任务
        log_count=0
    else
        # 每120次(1min)循环输出一次队列状态
        log_count=$((log_count + 1))
        if [ $log_count -ge $log_interval ]; then
            echo "并行任务已达最大值$MAX_PARALLEL,请稍等..." >> $log_filename
            echo "待处理的IP队列: ${queue[@]}" >> $log_filename
            log_count=0
        fi
    fi
    # 短暂等待,避免过于频繁的进程检查
    sleep 0.5
done
# 等待所有任务完成
for pid in "${!tasks[@]}"; do
    wait "$pid" 2>/dev/null
    unset tasks["$pid"]
done
echo "所有任务已完成。" >> $log_filename以下是针对MySql5版本的脚本(binlog_list.py 复用MySql8版本的)
binlog_bigtrx_show_for_mysql5.py
import argparse
import sys
from datetime import datetime
import pymysql
import logging
import os
# Logging: one log file per day under LOG_DIR (the same file name pattern is
# used by every script of this toolkit), appended to, plus console output.
LOG_DIR = '/home/mysql/mysql_big_trx_check/log'
log_filename = f'binlog_event_monitor_{datetime.now().strftime("%Y%m%d")}.log'
log_file = os.path.join(LOG_DIR, log_filename)
# Make sure the log directory exists before the FileHandler opens the file.
os.makedirs(LOG_DIR, exist_ok=True)
# Append mode. The encoding is explicit because the log messages contain
# Chinese text and the default encoding follows the process locale.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(log_file, mode='a', encoding='utf-8'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)
# Timestamp embedded in the result file names; the analyze script globs for
# "event_result_<ip>_*_<YYYYMMDD>_*.log".
curr_dt = datetime.now().strftime("%Y%m%d_%H%M%S")
# Connection settings of the result database.
# NOTE(review): not referenced anywhere in this script.
db_source_config = {
    'host': '记录结果的ip',
    'port': 3306,
    'user': 'binlogxunjian',
    'password': 'binlogxunjian',
    'database': 'blxjdb'
}
def append_file(file_name, content):
    """Append ``content`` followed by a newline to ``file_name``.

    The parent directory is created when missing, and the text is written as
    UTF-8 so non-ASCII database/table names are stored safely.
    """
    parent_dir = os.path.dirname(file_name)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    # ``with`` guarantees the file is closed even if a write raises.
    with open(file_name, 'a', encoding='utf-8') as f:
        f.write(content)
        f.write('\n')
# Row-event types that count towards a transaction's size, with the label and
# SQL verb used for them in the result line (reported in this order).
_ROW_EVENT_FORMATS = (
    ('UPDATE_ROWS', 'UPD', 'UPDATE'),
    ('DELETE_ROWS', 'DLT', 'DELETE FROM'),
    ('WRITE_ROWS', 'WTE', 'INSERT INTO'),
)


def _split_table_name(tab_fname):
    """Split ``db.table`` (back-quotes optional) at the first dot into ``(db, table)``."""
    db_name, _, tab_bname = tab_fname.partition('.')
    return db_name.strip().strip('`'), tab_bname.strip().strip('`')


def _new_size_maps():
    """Return an empty ``{event_type: {table: bytes}}`` map for a new transaction."""
    return {event_type: {} for event_type, _, _ in _ROW_EVENT_FORMATS}


def _format_big_trx(mysql_ip, trx_start_pos, trx_end_pos, sizes_by_event):
    """Build the result-file line for one big transaction.

    The header is the transaction span in MB (COMMIT end position minus BEGIN
    start position); it is followed by one entry per (operation, table) that
    carries the row-event bytes of that table only.
    """
    parts = [f"{mysql_ip};{round((trx_end_pos - trx_start_pos) / 1024 / 1024, 2)} MB;"]
    for event_type, label, verb in _ROW_EVENT_FORMATS:
        for tab_fname, size in sizes_by_event[event_type].items():
            db_name, tab_bname = _split_table_name(tab_fname)
            parts.append(f"{label}-size:{size};trx_start_pos:{trx_start_pos};"
                         f"'### {verb} `{db_name}`.`{tab_bname}`';")
    return ''.join(parts)


def get_event_from_binlog_by_sql(binlog, mysql_ip, mysql_port, user_name, password, trx_min_size):
    """Scan one binlog file with SHOW BINLOG EVENTS and record big transactions.

    A transaction runs from an event whose Info starts with ``BEGIN`` to the
    next event whose Info starts with ``COMMIT``. Its size is the sum of the
    byte lengths (End_log_pos - Pos) of its Write_rows / Update_rows /
    Delete_rows events. When the size exceeds ``trx_min_size`` MB, one line is
    appended to
    ``/home/mysql/mysql_big_trx_check/big_trx_info/event_result_<ip>_<binlog>_<curr_dt>.log``.

    Args:
        binlog: binlog file name to scan, e.g. ``mysql-bin.000001``.
        mysql_ip, mysql_port, user_name, password: connection to the
            inspected instance.
        trx_min_size: reporting threshold in MB.

    Exits the process with status 1 if an event cannot be parsed. The
    connection is closed on every path.
    """
    logger.info(f"{mysql_ip}:正在解析日志: {binlog}")
    result_file = f"/home/mysql/mysql_big_trx_check/big_trx_info/event_result_{mysql_ip}_{binlog}_{curr_dt}.log"
    dbconn = pymysql.connect(host=mysql_ip, user=user_name, password=password, port=mysql_port)
    try:
        cur = dbconn.cursor()
        # Bind the file name instead of formatting it into the statement;
        # pymysql quotes the value client-side.
        cur.execute("show binlog events in %s", (binlog,))
        binlog_events = cur.fetchall()
        if not binlog_events:
            logger.error(f"{mysql_ip}:binlog file:{binlog} is invalid")
            return
        bigtrx_found = False
        # Per-transaction state, reset on every BEGIN. It is initialised here
        # so that row events seen before the first BEGIN cannot raise NameError.
        trx_start_pos = 0
        trx_size = 0
        tab_fname = ''
        sizes_by_event = _new_size_maps()
        for bv_row in binlog_events:
            try:
                _log_name, start_pos, event_type, _server_id, end_pos, info = bv_row
                event_kind = event_type.upper()
                if info[0:5].upper() == 'BEGIN':
                    logger.debug(f"Transaction started at position: {start_pos}")
                    trx_start_pos = start_pos
                    trx_size = 0
                    sizes_by_event = _new_size_maps()
                if event_kind == 'TABLE_MAP':
                    # Info is expected to look like "table_id: 92 (db.tbl)".
                    tab_fname = info.split()[2].replace('(', '').replace(')', '')
                    logger.debug(f"Mapped table: {tab_fname}")
                if event_kind in sizes_by_event:
                    # Row events belong to the table of the preceding Table_map.
                    event_size = end_pos - start_pos
                    trx_size += event_size
                    per_table = sizes_by_event[event_kind]
                    per_table[tab_fname] = per_table.get(tab_fname, 0) + event_size
                if info[0:6].upper() == 'COMMIT':
                    logger.debug(f"Transaction committed at position: {end_pos}")
                    if trx_size / 1024 / 1024 > trx_min_size:
                        bigtrx_found = True
                        append_file(result_file,
                                    _format_big_trx(mysql_ip, trx_start_pos, end_pos, sizes_by_event))
            except Exception as e:
                logger.error(f"{mysql_ip}:Parse event:{bv_row} is error, Detail is :{e}.")
                sys.exit(1)
        if bigtrx_found:
            logger.info(f"{mysql_ip}:Full event result:[ {result_file} ]")
    finally:
        dbconn.close()
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='A piece of binlog analysis code.')
    parser.add_argument('--binlog', type=str, help="binlog file(Run as [show binlog events in 'mysql-bin.00001' ).",
                        default=None)
    parser.add_argument('--mysql_ip', type=str, help='mysql database ip addr.', default=None)
    parser.add_argument('--port', type=int, help='mysql database port.', default=3306)
    parser.add_argument('--uname', type=str, help='mysql database username.', default='blxjuser')
    parser.add_argument('--password', type=str,
                        help='mysql database password (the built-in default is used when omitted).',
                        default=None)
    parser.add_argument('--trx_min_size', type=int, help='need print min trx binlog size(MB).', default=1)
    args = parser.parse_args()
    if not args.binlog or not args.mysql_ip:
        logger.error("Missing required arguments: --binlog and --mysql_ip")
        parser.print_help()
        sys.exit(0)
    # --password was previously parsed and then ignored. Honour it, and keep
    # the old hard-coded value as the fallback so existing callers still work.
    password = args.password if args.password is not None else 'password'
    try:
        get_event_from_binlog_by_sql(args.binlog, args.mysql_ip, args.port, args.uname, password,
                                     args.trx_min_size)
        logger.info(f"{args.mysql_ip}:{args.binlog}解析完成!")
    except Exception as e:
        logger.error(f"{args.mysql_ip}:解析过程中出现错误: {e}")
        sys.exit(1)
binlog_records_for_mysql5.py
import argparse
import sys
import pymysql
import logging
import os
import re
from datetime import datetime
# Logging: one log file per day under LOG_DIR (the same file name pattern is
# used by every script of this toolkit), appended to, plus console output.
LOG_DIR = '/home/mysql/mysql_big_trx_check/log'
log_filename = f'binlog_event_monitor_{datetime.now().strftime("%Y%m%d")}.log'
log_file = os.path.join(LOG_DIR, log_filename)
# Make sure the log directory exists before the FileHandler opens the file.
os.makedirs(LOG_DIR, exist_ok=True)
# Append mode. The encoding is explicit because the log messages contain
# Chinese text and the default encoding follows the process locale.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(log_file, mode='a', encoding='utf-8'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)
def get_binlog_min_max(cursor_target, mysql_ip):
    """Return the names of the oldest and newest binlog files on the instance.

    Runs SHOW BINARY LOGS through ``cursor_target`` and orders the files by
    the numeric suffix of their names.

    Returns:
        ``(min_binlog, max_binlog)``, or ``(None, None)`` when no binlog is
        listed or the query fails (logged). The caller unpacks the result,
        so a bare ``None`` would raise TypeError there.
    """
    def binlog_sort_key(binlog_name):
        # Files without a numeric suffix sort first.
        match = re.search(r'(\d+)$', binlog_name)
        return int(match.group(1)) if match else 0

    try:
        cursor_target.execute("show binary logs;")
        binlog = cursor_target.fetchall()
        if not binlog:
            logger.error(f"{mysql_ip}:Binlog 文件不存在或无法读取。")
            return None, None
        # The first column of each SHOW BINARY LOGS row is the file name.
        sort_binlog = sorted(binlog, key=lambda x: binlog_sort_key(x[0]))
        print(sort_binlog)
        return sort_binlog[0][0], sort_binlog[-1][0]
    except Exception as e:
        logger.error(f"{mysql_ip}:获取 binlog 最小和最大值时出错: {e}")
        return None, None
def insert_binlog_record(cursor_source, mysql_ip, min_binlog, max_binlog):
    """Insert one (mysql_ip, min_binlog, max_binlog) row into the record table.

    Does not commit; the caller owns the transaction. A failure is logged and
    re-raised, so the caller does not commit and report success for a row
    that was never written.
    """
    insert = (
        "INSERT INTO blxjdb.binlog_bigtrx_records (mysql_ip, min_binlog, max_binlog) "
        "VALUES (%s, %s, %s)"
    )
    try:
        cursor_source.execute(insert, (mysql_ip, min_binlog, max_binlog))
    except Exception as e:
        logger.error(f"{mysql_ip}:插入记录时出错: {e}")
        raise
def get_last_max_binlog(cursor_source, mysql_ip):
    """Return the largest max_binlog recorded so far for ``mysql_ip``, or None.

    The previous query had no ORDER BY and took the first row, so which
    record it used was unspecified. MAX() makes the result well-defined and
    matches the MySQL 8 variant of this script. MAX() over zero rows still
    returns one NULL row, so NULL means "no previous record". None is also
    returned when the query fails (the error is logged).
    """
    # NOTE(review): MAX() compares the names as strings; this matches numeric
    # order only while all names share the same prefix and digit width.
    query = "SELECT max(max_binlog) FROM blxjdb.binlog_bigtrx_records WHERE mysql_ip = %s"
    try:
        cursor_source.execute(query, (mysql_ip,))
        row = cursor_source.fetchone()
    except Exception as e:
        logger.error(f"{mysql_ip}:获取上次最大值时出错: {e}")
        return None
    if not row or row[0] is None:
        logger.info(f"{mysql_ip}:没有找到上次的 binlog 最大值记录")
        return None
    max_binlog = row[0]
    logger.info(f"{mysql_ip}:上次记录的最大 binlog 位置为: {max_binlog}")
    return max_binlog
def main(mysql_ip):
    """Record the binlog range to inspect for ``mysql_ip`` in the result database.

    Reads the last recorded max_binlog from blxjdb.binlog_bigtrx_records and
    the instance's current binlog files (SHOW BINARY LOGS), then inserts:
      * no previous record   -> (oldest binlog, newest binlog);
      * last_max <= newest   -> (last_max, newest), so the last file of the
        previous range is included again;
      * otherwise nothing.
    Errors after both connections are open are logged and rolled back. Both
    connections are always closed, even if the second connect fails.
    """
    db_source_config = {
        'host': '记录结果的ip',
        'port': 3306,
        'user': 'binlogxunjian',
        'password': 'binlogxunjian'
    }

    db_target_config = {
        'host': mysql_ip,
        'port': 3306,
        'user': 'blxjuser',
        'password': 'password'
    }
    conn_source = None
    conn_target = None
    try:
        conn_source = pymysql.connect(**db_source_config)
        conn_target = pymysql.connect(**db_target_config)
        cursor_source = conn_source.cursor()
        cursor_target = conn_target.cursor()
        try:
            last_max = get_last_max_binlog(cursor_source, mysql_ip)
            # Accept both (None, None) and a bare None from get_binlog_min_max.
            min_binlog, max_binlog = get_binlog_min_max(cursor_target, mysql_ip) or (None, None)
            if min_binlog is None or max_binlog is None:
                return
            if last_max is None:
                logger.info(f"{mysql_ip}:没有记录,开始新数据插入!")
                insert_binlog_record(cursor_source, mysql_ip, min_binlog, max_binlog)
                conn_source.commit()
                logger.info(f"{mysql_ip}:新数据插入完成!")
            elif last_max <= max_binlog:
                # NOTE(review): string comparison; equals numeric order only
                # while both names share the prefix and digit width.
                insert_binlog_record(cursor_source, mysql_ip, last_max, max_binlog)
                conn_source.commit()
                logger.info(f"{mysql_ip}:binlog存在变化!")
            else:
                logger.info(f"{mysql_ip}:没有新的 binlog 变化!")
        except Exception as e:
            logger.error(f"{mysql_ip}:主函数执行时出错: {e}")
            conn_source.rollback()
        finally:
            cursor_source.close()
            cursor_target.close()
    finally:
        if conn_target is not None:
            conn_target.close()
        if conn_source is not None:
            conn_source.close()
if __name__ == "__main__":
    # Entry point: record today's binlog range for the instance given on the command line.
    parser = argparse.ArgumentParser(description='Insert Binlog Records')
    parser.add_argument('--mysql_ip', type=str, help='mysql database ip addr.', default=None)
    args = parser.parse_args()
    if not args.mysql_ip:
        parser.print_help()
        sys.exit(0)
    main(args.mysql_ip)
binlog_event_analyze_for_mysql5.sh
#!/bin/bash
# Analyze the binlogs of one MySQL 5 instance for big transactions.
# Usage: binlog_event_analyze_for_mysql5.sh <mysql_ip>
workdir="/home/mysql/mysql_big_trx_check"
# The python helpers below are invoked by relative file name, so running
# them from the wrong directory must not happen: abort if cd fails.
cd "$workdir" || exit 1
# Activate the Python 3 virtualenv; abort if it cannot be loaded.
source /home/oracle/py3/bin/activate || exit 1
log_bigtrx_dir=/home/mysql/mysql_big_trx_check/big_trx_info
curr_dt=$(date +%Y%m%d)
# Password for the binlogxunjian account, picked up by the mysql client.
# NOTE(review): MYSQL_PWD is visible in the process environment and is
# deprecated in MySQL 8.0; consider an option file (--defaults-extra-file).
export MYSQL_PWD=binlogxunjian
# Minimum transaction size passed to the parser as --trx_min_size
# (unit defined by binlog_bigtrx_show_for_mysql5.py — confirm).
trx_min_size=90
mysql_ip=$1
if [ -z "$mysql_ip" ]; then
    echo "Usage: $0 <mysql_ip>" >&2
    exit 1
fi
# Set exist_big_trx on today's binlog_bigtrx_records row(s) for one instance.
# Args:
#   $1 - MySQL IP, matched against binlog_bigtrx_records.mysql_ip
#   $2 - flag value to store in exist_big_trx ('Y' or 'N')
# Reads the global curr_dt (YYYYMMDD) to select the rows recorded today.
function update_binlog_bigtrx_records() {
    local mysql_ip=$1
    local exist_big_trx=$2
    # Select "today" with a half-open range on record_time rather than
    # date_format(record_time, ...) = <unquoted number>: the comparison is
    # between like types (no string/number coercion) and record_time is not
    # wrapped in a function, so an index on it can be used.
    mysql -ubinlogxunjian -h记录结果的ip --silent --skip-column-names -e \
        "update blxjdb.binlog_bigtrx_records set exist_big_trx='${exist_big_trx}' where mysql_ip='${mysql_ip}' and record_time >= str_to_date('${curr_dt}', '%Y%m%d') and record_time < str_to_date('${curr_dt}', '%Y%m%d') + interval 1 day;"
}
# Record the current binlog range for this instance.
python binlog_records_for_mysql5.py --mysql_ip "$mysql_ip"
# binlog_list.py prints the binlog names to scan; join them into one line.
binlog_list=$(python binlog_list.py --mysql_ip "$mysql_ip" | tr '\n' ' ')
# Parse the binlogs one after another. $binlog_list is left unquoted on
# purpose so it splits into one word per binlog name.
for binlog in $binlog_list; do
    python binlog_bigtrx_show_for_mysql5.py --mysql_ip "$mysql_ip" --binlog "$binlog" --trx_min_size "$trx_min_size"
done
# Result files are expected as event_result_<ip>_<binlog>_<YYYYMMDD>_<HHMMSS>.log
# (presumably written only when a big transaction is found — confirm in the
# parser). Flag 'Y' if any such file exists for today, otherwise 'N'.
# compgen -G succeeds iff the glob matches at least one path.
if compgen -G "${log_bigtrx_dir}/event_result_${mysql_ip}_*_${curr_dt}_*.log" > /dev/null; then
    update_binlog_bigtrx_records "$mysql_ip" 'Y'
else
    update_binlog_bigtrx_records "$mysql_ip" 'N'
fi
binlog_event_monitor_for_mysql5.sh
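MySQL 5 版本:待巡检的 MySQL 实例 IP 写死在文本文件 /home/mysql/mysql_big_trx_check/mysql_ip.list 中(以空格或换行分隔),由下面的 binlog_event_monitor_for_mysql5.sh 读取。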
#!/bin/bash
# Parallel driver: runs binlog_event_analyze_for_mysql5.sh for every MySQL IP
# listed in mysql_ip.list, at most MAX_PARALLEL at a time, killing any task
# that runs TIMEOUT seconds or longer.
curr_dt=$(date +%Y%m%d)
log_dir="/home/mysql/mysql_big_trx_check/log"
log_filename="${log_dir}/binlog_event_monitor_for_mysql5_${curr_dt}.log"
# Create the log directory up front; otherwise every ">> $log_filename"
# redirect in this script fails on a host where it does not exist yet.
mkdir -p "$log_dir" || exit 1
# Maximum number of concurrently running tasks.
MAX_PARALLEL=4
# Per-task timeout in seconds (7200 s = 120 minutes).
TIMEOUT=7200
# Read the MySQL IP list (whitespace/newline separated).
ip_list_file="/home/mysql/mysql_big_trx_check/mysql_ip.list"
if [ ! -r "$ip_list_file" ]; then
    echo "ERROR: 无法读取IP列表文件: $ip_list_file" >> "$log_filename"
    exit 1
fi
mysql_ips=$(cat "$ip_list_file")
# Pending-task queue, one element per IP. read -a splits on IFS without the
# pathname globbing that an unquoted queue=($mysql_ips) would perform.
# (read returns non-zero at end of input; the array is still filled.)
read -r -d '' -a queue <<< "$mysql_ips"
# Running tasks: associative array PID -> start time (epoch seconds).
declare -A tasks=()
# On INT/TERM, stop every running task and exit. The trap names ctrl_c before
# it is defined; bash only looks the function up when a signal arrives.
trap ctrl_c INT TERM
# Signal handler: kill all tracked tasks that are still alive, log each one,
# then exit with status 1.
function ctrl_c() {
    local pid children
    for pid in "${!tasks[@]}"; do
        if kill -0 "$pid" 2>/dev/null; then
            # Each task is a bash wrapper running python in the foreground;
            # killing only the wrapper would orphan that python process.
            # Collect the children first, kill the wrapper so it cannot start
            # another one, then kill the collected children.
            children=$(pgrep -P "$pid" 2>/dev/null)
            kill "$pid"
            if [ -n "$children" ]; then
                # Unquoted on purpose: one PID per word.
                kill $children 2>/dev/null
            fi
            # Quoted so the [..] subscript is not treated as a glob pattern.
            unset 'tasks[$pid]'
            echo "ERROR: 终止任务PID: $pid" >> "$log_filename"
        fi
    done
    echo "ERROR: 脚本终止,所有子进程已关闭。" >> "$log_filename"
    exit 1
}
# PID -> MySQL IP of each launched task, so status messages can name the host
# a PID belongs to (the global mysql_ip only holds the most recent launch).
declare -A task_ips=()

# Start the analysis of one MySQL IP in the background and register it.
# Args: $1 - MySQL IP to analyze.
# Side effects: sets tasks[pid]=start epoch and task_ips[pid]=IP; logs the start.
function binlog_event_analyze_for_mysql5() {
    local mysql_ip=$1
    local new_pid curr_time current_time
    echo "现在开始解析: $mysql_ip" >> "$log_filename"
    # NOTE(review): the script is resolved through PATH (this driver never
    # cd's into the work dir) — confirm PATH includes its directory.
    binlog_event_analyze_for_mysql5.sh "$mysql_ip" &
    new_pid=$!
    curr_time=$(date +%Y%m%d_%H%M%S)
    current_time=$(date +%s)
    tasks["$new_pid"]=$current_time
    task_ips["$new_pid"]=$mysql_ip
    echo "$mysql_ip: 任务PID: $new_pid, 启动时间: $curr_time" >> "$log_filename"
}

# Reap finished tasks and enforce TIMEOUT on the ones still running.
# For each tracked PID: if the process is gone, drop it from the bookkeeping;
# if it has run TIMEOUT seconds or more, kill it (and its children) and drop it.
function check_task_status() {
    local current_time pid ip start_time children
    current_time=$(date +%s)
    for pid in "${!tasks[@]}"; do
        # Log the IP this PID was launched for, not whatever IP was started last.
        ip=${task_ips[$pid]:-$mysql_ip}
        if ! kill -0 "$pid" 2>/dev/null; then
            # Quoted so the [..] subscripts are not treated as glob patterns.
            unset 'tasks[$pid]' 'task_ips[$pid]'
            echo "$ip: 任务PID: $pid 已完成或不存在,已释放资源。" >> "$log_filename"
        else
            start_time=${tasks[$pid]}
            if [ $((current_time - start_time)) -ge "$TIMEOUT" ]; then
                # The task is a bash wrapper running python in the foreground;
                # killing only the wrapper would leave python running. Collect
                # the children, kill the wrapper (so it cannot start another),
                # then kill the collected children.
                children=$(pgrep -P "$pid" 2>/dev/null)
                kill "$pid"
                if [ -n "$children" ]; then
                    # Unquoted on purpose: one PID per word.
                    kill $children 2>/dev/null
                fi
                unset 'tasks[$pid]' 'task_ips[$pid]'
                echo "ERROR: $ip: 任务PID: $pid 超时终止,已释放资源。" >> "$log_filename"
            fi
        fi
    done
}
# Dispatch loop: launch queued IPs whenever fewer than MAX_PARALLEL tasks are
# running, until every IP in the queue has been started.
log_interval=120   # idle passes between "queue full" status messages
log_count=0
while (( ${#queue[@]} > 0 )); do
    # Reap finished tasks and kill any that exceeded TIMEOUT.
    check_task_status
    if (( ${#tasks[@]} >= MAX_PARALLEL )); then
        # Every slot is busy. One pass per 0.5 s sleep, so 120 passes is
        # roughly one minute between status messages.
        log_count=$(( log_count + 1 ))
        if (( log_count >= log_interval )); then
            echo "并行任务已达最大值$MAX_PARALLEL,请稍等..." >> $log_filename
            echo "待处理的IP队列: ${queue[@]}" >> $log_filename
            log_count=0
        fi
    else
        # A slot is free: take the head of the queue and start it.
        # mysql_ip is kept as a global; check_task_status may reference it.
        mysql_ip=${queue[0]}
        queue=("${queue[@]:1}")
        binlog_event_analyze_for_mysql5 "$mysql_ip"
        log_count=0
    fi
    # Brief pause so the loop does not spin on process checks.
    sleep 0.5
done
# Wait for the tasks still running after the queue is empty. Keep polling
# check_task_status instead of a blocking `wait`, so TIMEOUT is still enforced
# for these last tasks (a plain wait would block forever on a hung task).
while [ "${#tasks[@]}" -gt 0 ]; do
    check_task_status
    sleep 0.5
done
echo "所有任务已完成。" >> "$log_filename"