# partition_monitor.py
from datetime import datetime
from ConnDB import ConnOracle
import pymysql
import re
import argparse
import sys
import logging
import os
import subprocess
# Skip list (/home/oracle/skip_tables.list) entry format
# oracle    -- ip:schema:tablename:oracle
# mysql     -- ip:schema:tablename:mysql
# oceanbase -- tenantname:schema:tablename:oceanbase
# Examples:
# 192.168.1.10:t1db:table1:mysql
# 192.168.1.10:t1db:table2:mysql
# 192.168.1.11:t2db:table3:oracle
# tenantname:t3db:table4:oceanbase
# Log directory and per-day log file name (the date is fixed when the module is loaded)
LOG_DIR = '/home/oracle/log'
log_filename = f'partition_monitor_{datetime.now().strftime("%Y%m%d")}.log'
log_file = os.path.join(LOG_DIR, log_filename)
# Make sure the log directory exists before the file handler opens the log file
os.makedirs(LOG_DIR, exist_ok=True)
# Configure logging to a file only, opened in append mode
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(log_file, mode='a')
    ]
)
logger = logging.getLogger(__name__)
# tnsnames.ora file used to resolve Oracle service aliases
ora_tns_file_path = '/home/oracle/.tnsnames.ora'
# Today's date as a date object, a YYYYMMDD string and a YYYYMMDD integer
current_date = datetime.today().date()
current_date_str = current_date.strftime('%Y%m%d')
current_date_int = int(current_date_str)
# Oracle
def parse_tnsnames_ora(ora_tns_file_path, service_name):
    """Look up a net service alias in a tnsnames.ora file.

    Only single-address entries of the following shape are recognised
    (any amount of whitespace, including line breaks, may separate tokens):

        adg_orcl1 = (DESCRIPTION =(ADDRESS = (PROTOCOL = TCP)(HOST = 192.168.1.111)(PORT = 1521))(CONNECT_DATA =(SERVER = DEDICATED)(SERVICE_NAME = ORCL)))

    Args:
        ora_tns_file_path: Path of the tnsnames.ora file to read.
        service_name: Alias to look up. It is matched literally and must
            start at the beginning of a line.

    Returns:
        A ``(host, port, dbname)`` tuple of strings, or ``None`` after logging
        a critical message when no matching entry exists.
    """
    with open(ora_tns_file_path, 'r') as file:
        content = file.read()
    # Escape the alias so regex metacharacters in it (for example the dot in
    # "orcl.world") cannot make the pattern match a different entry.
    alias = re.escape(service_name)
    pattern = re.compile(rf'^{alias}\s*=\s*\(\s*DESCRIPTION\s*=\s*\(\s*ADDRESS\s*=\s*\(\s*PROTOCOL\s*=\s*TCP\s*\)\s*\(\s*HOST\s*=\s*([^\)]+)\s*\)\s*\(\s*PORT\s*=\s*(\d+)\s*\)\s*\)\s*\(\s*CONNECT_DATA\s*=\s*\(\s*SERVER\s*=\s*DEDICATED\s*\)\s*\(\s*SERVICE_NAME\s*=\s*([^\)]+)\s*\)\s*\)\s*\)', re.MULTILINE | re.DOTALL)
    match = pattern.search(content)
    if match:
        host = match.group(1).strip()
        port = match.group(2).strip()
        dbname = match.group(3).strip()
        return host, port, dbname
    logger.critical(f"{service_name}: 在 {ora_tns_file_path} 中不存在 ")
    return None
def format_high_value(high_value):
    """Reduce an Oracle partition HIGH_VALUE text to a YYYYMMDD string when possible.

    Two shapes are recognised, tried in this order:

    * ``TO_DATE(' 2024-01-01 00:00:00', 'SYYYY-MM-DD HH24:MI:SS',
      'NLS_CALENDAR=GREGORIAN')`` -> the date part, reformatted as YYYYMMDD;
    * a quoted run of exactly eight digits such as ``'20240101'`` -> the digits.

    Anything else is returned unchanged.
    """
    to_date_hit = re.search(
        r"TO_DATE\s*\('\s*(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\s*', 'SYYYY-MM-DD HH24:MI:SS', 'NLS_CALENDAR=GREGORIAN'\)",
        high_value,
    )
    if to_date_hit is not None:
        parsed = datetime.strptime(to_date_hit.group(1), '%Y-%m-%d %H:%M:%S')
        return parsed.strftime('%Y%m%d')

    quoted_hit = re.search(r"'(\d{8})'", high_value)
    return quoted_hit.group(1) if quoted_hit is not None else high_value
def ora_partition_monitor(service_name):
    """Check the highest partition of every eligible partitioned table in one Oracle database.

    The alias is resolved through ``parse_tnsnames_ora``; tables listed for this
    host in /home/oracle/skip_tables.list (lines ending in ``oracle``) are
    skipped. For each remaining table the partition with the largest
    ``partition_position`` is read and its upper bound is classified:

    * not a valid date between 1900 and 2999 -> error "分区异常"
    * earlier than today                      -> error "分区已过期"
    * at most 30 calendar days ahead          -> warning "需及时建立新分区"
    * otherwise                               -> info "分区正常"

    Args:
        service_name: Net service alias to look up in the tnsnames.ora file.

    Returns:
        None. Results are written to the log; all exceptions are logged and
        swallowed so that one failing database does not stop the run.
    """
    zytj_db = None
    try:
        tns_entry = parse_tnsnames_ora(ora_tns_file_path, service_name)
        if tns_entry is None:
            # The parser already logged that the alias is missing.
            return
        host, port, dbname = tns_entry

        # Load the (schema, table) pairs to skip for this host.
        excluded_tables = set()
        try:
            with open('/home/oracle/skip_tables.list', 'r') as f:
                for line in f:
                    line = line.strip()
                    # Ignore blank lines, comments and entries for other engines.
                    if not line or line.startswith('#') or not line.endswith('oracle'):
                        continue
                    parts = line.split(':')
                    if len(parts) >= 3 and parts[0] == host:
                        excluded_tables.add((parts[1], parts[2]))
        except FileNotFoundError:
            # No skip list: inspect every table.
            pass

        zytj_db = ConnOracle('query', 'query', host, port, dbname)
        status_sql = '''
        select 1 from dual
        '''
        # Join on owner as well as table name; joining on the name alone lets
        # same-named tables in different schemas match each other's rows.
        ora_sql = '''
        select a.table_owner, a.table_name, max(partition_position)
        from dba_tab_partitions a, dba_part_tables b, dba_tables c
        where a.table_owner = b.owner
        and a.table_name = b.table_name
        and b.owner = c.owner
        and b.table_name = c.table_name
        and a.table_owner not in ('SYS', 'SYSTEM', 'MDSYS', 'AUDSYS')
        -- and b.PARTITIONING_TYPE = 'RANGE'
        and a.high_value is not null
        and b.INTERVAL is null
        and a.table_name not in ('PUB_TRAN_LOG_MID')
        and a.partition_name not like '%MAXVALUE%'
        group by a.table_owner, a.table_name
        '''
        status_db = zytj_db.select_noheader(status_sql)
        if status_db is None or len(status_db) == 0:
            logger.critical(f"{service_name}: 无法获取数据库状态")
            return

        results = zytj_db.select_noheader(ora_sql)
        if results is None or len(results) == 0:
            logger.info(f"{service_name}: 没有分区表或不存在满足条件的分区表")
            return

        filtered_patitions = [row for row in results if (row[0], row[1]) not in excluded_tables]
        for partition in filtered_patitions:
            table_owner = partition[0]
            table_name = partition[1]
            partition_position = partition[2]
            # The interpolated values come from the data dictionary query above,
            # not from user input.
            detail_sql = f'''
                    select table_owner, table_name, partition_position, partition_name, high_value
                    from dba_tab_partitions
                    where table_owner = '{table_owner}' and table_name = '{table_name}' and partition_position = {partition_position}
                    '''
            detail_results = zytj_db.select_noheader(detail_sql)
            # A failed query yields None elsewhere in this function; treat it as "no rows".
            for detail_result in detail_results or []:
                table_owner = detail_result[0]
                table_name = detail_result[1]
                partition_name = detail_result[3]
                high_value = detail_result[4]
                formatted_high_value = format_high_value(high_value)
                try:
                    formatted_high_value_int = int(formatted_high_value)
                except (TypeError, ValueError):
                    # Upper bound is not date-like (e.g. a non-numeric key); nothing to compare.
                    logger.debug(f"{host}:库名:{table_owner},表名:{table_name},分区描述:{high_value},无法解析为日期,已跳过")
                    continue

                # Convert to a real date so the 30-day window is measured in
                # calendar days; subtracting YYYYMMDD integers is wrong across
                # month and year boundaries (20250101 - 20241215 = 8886).
                high_date = None
                if 19000000 <= formatted_high_value_int <= 30000000:
                    try:
                        high_date = datetime.strptime(str(formatted_high_value_int), '%Y%m%d').date()
                    except ValueError:
                        high_date = None

                summary = f"{host}:库名:{table_owner},表名:{table_name},目前最大分区为:{partition_name},分区描述:{high_value},分区最高范围至:{formatted_high_value}"
                if high_date is None:
                    logger.error(f"{summary},分区异常")
                elif high_date < current_date:
                    logger.error(f"{summary},分区已过期")
                elif (high_date - current_date).days <= 30:
                    logger.warning(f"{summary},需及时建立新分区")
                else:
                    logger.info(f"{summary},分区正常")
    except ValueError as e:
        logger.critical(f"{service_name}: 错误: {e}")
    except Exception as e:
        logger.critical(f"{service_name}: 未知错误: {e}")
    finally:
        # Always release the connection, including on the early returns above.
        if zytj_db is not None:
            try:
                zytj_db.disconnect()
            except Exception as e:
                logger.critical(f"{service_name}: 未知错误: {e}")
# MYSQL
def mysql_partition_monitor(cursor_db, mysql_ip):
    """Check the highest partition of every partitioned table on one MySQL instance.

    Tables listed for ``mysql_ip`` in /home/oracle/skip_tables.list (lines
    ending in ``mysql``) are skipped. For each remaining table the partition
    with the largest ordinal position (ignoring MAXVALUE partitions) is read,
    its description is normalised with ``format_partition_description`` and
    the result is classified as expired (error), expiring within 30 calendar
    days (warning) or normal (info).

    Args:
        cursor_db: Open DB-API cursor on the instance; must support
            ``execute`` with ``%s`` parameters and ``fetchall``.
        mysql_ip: Host identifier used for skip-list matching and log lines.

    Returns:
        The list of (schema, table, max ordinal position) rows that were
        inspected, or ``None`` when there are no partitioned tables or the
        query fails.
    """
    try:
        # Load the (schema, table) pairs to skip for this host.
        excluded_tables = set()
        try:
            with open('/home/oracle/skip_tables.list', 'r') as f:
                for line in f:
                    line = line.strip()
                    # Ignore blank lines, comments and entries for other engines.
                    if not line or line.startswith('#') or not line.endswith('mysql'):
                        continue
                    parts = line.split(':')
                    if len(parts) >= 3 and parts[0] == mysql_ip:
                        excluded_tables.add((parts[1], parts[2]))
        except FileNotFoundError:
            # No skip list: inspect every table.
            pass

        mysql_query_partition = """SELECT
            TABLE_SCHEMA,
            TABLE_NAME,
            MAX(PARTITION_ORDINAL_POSITION) as MAX_PARTITION_ORDINAL_POSITION
            FROM information_schema.partitions
            WHERE TABLE_SCHEMA not in ('mysql','sys','information_schema','performance_schema')
            AND PARTITION_METHOD is not null
            AND PARTITION_DESCRIPTION <> 'MAXVALUE'
            GROUP BY TABLE_SCHEMA, TABLE_NAME
            ORDER BY TABLE_SCHEMA, TABLE_NAME"""
        cursor_db.execute(mysql_query_partition)
        partitions = cursor_db.fetchall()
        if not partitions:
            logger.info(f"{mysql_ip}:不存在分区表")
            return None

        # Drop the tables that are excluded from inspection.
        filtered_patitions = [p for p in partitions if (p[0], p[1]) not in excluded_tables]

        detail_query = """SELECT TABLE_SCHEMA, TABLE_NAME, PARTITION_NAME, PARTITION_EXPRESSION, PARTITION_DESCRIPTION
                 FROM information_schema.partitions
                 WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s AND PARTITION_ORDINAL_POSITION=%s"""
        for partition in filtered_patitions:
            cursor_db.execute(detail_query, (partition[0], partition[1], partition[2]))
            detail_result = cursor_db.fetchall()
            for row_result in detail_result:
                TABLE_SCHEMA = row_result[0]
                TABLE_NAME = row_result[1]
                PARTITION_NAME = row_result[2]
                PARTITION_EXPRESSION = row_result[3]
                PARTITION_DESCRIPTION = row_result[4]
                formatted_partition_value = format_partition_description(PARTITION_EXPRESSION, PARTITION_DESCRIPTION, cursor_db)
                try:
                    formatted_partition_value_int = int(formatted_partition_value)
                except (TypeError, ValueError):
                    # Not numeric (e.g. a LIST partition) or no value at all:
                    # nothing to compare, and one such table must not abort the host.
                    continue

                summary = f"{mysql_ip}:库名:{TABLE_SCHEMA},表名:{TABLE_NAME},目前最大分区为:{PARTITION_NAME},分区描述:{PARTITION_DESCRIPTION},分区最高范围至:{formatted_partition_value}"
                if formatted_partition_value_int < current_date_int:
                    logger.error(f"{summary},分区已过期")
                    continue

                # Measure the remaining window in calendar days; subtracting
                # YYYYMMDD integers is wrong across month and year boundaries.
                try:
                    upper_bound = datetime.strptime(str(formatted_partition_value_int), '%Y%m%d').date()
                    days_left = (upper_bound - current_date).days
                except ValueError:
                    # Not a calendar date: keep the previous numeric distance.
                    days_left = formatted_partition_value_int - current_date_int

                if days_left <= 30:
                    logger.warning(f"{summary},需及时建立新分区")
                else:
                    logger.info(f"{summary},分区正常")
        return filtered_patitions
    except Exception as e:
        logger.error(f"{mysql_ip}:查询分区信息出错:{e}")
        return None
def format_partition_description(PARTITION_EXPRESSION, partition_description, cursor_db):
    """Normalise a MySQL PARTITION_DESCRIPTION to a YYYYMMDD string when possible.

    Formats handled when the partition expression does not use ``to_days``,
    tried in this order:

    * ``'2023-03-01 00:00:00'``      -> ``20230301``
    * ``'20240701'`` / ``20230318``  -> the eight digits
    * ``'202501'`` / ``202501``      -> ``20250101`` (first day of the month)
    * nine or more bare digits       -> Unix timestamp; values with 13 or more
      digits are treated as sub-second precision and only the first ten digits
      are used. The date is rendered in the local time zone.

    When the expression contains ``to_days`` the first six-digit run starting
    with ``7`` (e.g. ``738764``) is converted with MySQL's ``FROM_DAYS`` through
    ``cursor_db``.

    Args:
        PARTITION_EXPRESSION: Partitioning expression of the table.
        partition_description: Raw PARTITION_DESCRIPTION value.
        cursor_db: Open cursor, used only for the ``to_days`` conversion.

    Returns:
        The YYYYMMDD string, or the stripped input text when it cannot be
        interpreted. The function never returns ``None``.
    """
    value_str = str(partition_description).strip()

    if 'to_days' in str(PARTITION_EXPRESSION).lower():
        # Day-number partition key: let the server convert it back to a date.
        day_number = re.search(r"(\d{6})", value_str)
        if day_number is None or not day_number.group(1).startswith('7'):
            # Previously this fell off the end and returned None.
            return value_str
        try:
            sql = """select date_format(str_to_date(from_days(%s),'%%Y-%%m-%%d'),'%%Y%%m%%d') from dual"""
            cursor_db.execute(sql, (day_number.group(1),))
            result = cursor_db.fetchone()
        except Exception as e:
            logger.error(f"执行 from_days 查询失败: {e}")
            return value_str
        if result and result[0] is not None:
            return result[0]
        return value_str

    match = re.search(r"'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})'", value_str)
    if match:
        try:
            return datetime.strptime(match.group(1), '%Y-%m-%d %H:%M:%S').strftime('%Y%m%d')
        except ValueError:
            # Looks like a datetime but is not a valid one; try the other shapes.
            pass

    match = re.search(r"'(\d{8})'", value_str)
    if match:
        return match.group(1)

    match = re.search(r"^(\d{8})$", value_str)
    if match:
        return match.group(1)

    match = re.search(r"'(\d{6})'", value_str)
    if match:
        return f"{match.group(1)}01"

    match = re.search(r"^(\d{6})$", value_str)
    if match:
        return f"{match.group(1)}01"

    match = re.search(r"^(\d{9,})$", value_str)
    if match:
        timestamp = match.group(1)
        # 13+ digits carry milliseconds or finer; the first ten are the seconds.
        # Shorter values (including nine digits, which used to return None)
        # are taken as seconds.
        timestamp_seconds = int(timestamp[:10]) if len(timestamp) >= 13 else int(timestamp)
        try:
            return datetime.fromtimestamp(timestamp_seconds).strftime('%Y%m%d')
        except (ValueError, OSError, OverflowError):
            # Out of range for the platform: hand back the raw digits.
            return timestamp

    return value_str
def get_mysql_password(mysql_ip, username):
    """Fetch a MySQL password by running /home/oracle/bin/get_pswd.sh.

    The helper script is site-specific; this function is unnecessary when the
    password is hard-coded by the caller.

    Args:
        mysql_ip: Host passed to the script as its first argument.
        username: Account name passed as its second argument.

    Returns:
        The script's standard output with surrounding whitespace removed, or
        ``None`` (after logging a critical message) when the script is missing
        or exits with a non-zero status.
    """
    try:
        # check=True is what makes a non-zero exit raise CalledProcessError;
        # without it the handler below can never run and a failed lookup
        # silently yields whatever the script printed.
        result = subprocess.run(
            ['/home/oracle/bin/get_pswd.sh', mysql_ip, username],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
            check=True,
        )
        return result.stdout.strip()
    except subprocess.CalledProcessError as e:
        logger.critical(f"{mysql_ip}:获取MySQL密码失败: {e}")
        return None
    except FileNotFoundError:
        # Message now names the script that is actually invoked.
        logger.critical("找不到get_pswd.sh脚本")
        return None
#OB_MYSQL
def ob_mysql_partition_monitor(cursor_ob_db, tenant_name):
    """Check the highest partition of every partitioned table in one OceanBase MySQL tenant.

    Tables listed for ``tenant_name`` in /home/oracle/skip_tables.list (lines
    ending in ``oceanbase``) are skipped. For each remaining table the
    partition with the largest ordinal position (ignoring MAXVALUE partitions)
    is read, its description is normalised with
    ``ob_format_partition_description`` and the result is classified as expired
    (error), expiring within 30 calendar days (warning) or normal (info).

    Args:
        cursor_ob_db: Open DB-API cursor on the tenant; must support
            ``execute`` with ``%s`` parameters and ``fetchall``.
        tenant_name: Tenant identifier used for skip-list matching and log lines.

    Returns:
        The list of (schema, table, max ordinal position) rows that were
        inspected, or ``None`` when there are no partitioned tables or the
        query fails.
    """
    try:
        # Load the (schema, table) pairs to skip for this tenant.
        excluded_tables = set()
        try:
            with open('/home/oracle/skip_tables.list', 'r') as f:
                for line in f:
                    line = line.strip()
                    # Ignore blank lines, comments and entries for other engines.
                    if not line or line.startswith('#') or not line.endswith('oceanbase'):
                        continue
                    parts = line.split(':')
                    # Three fields are indexed below, so require at least three
                    # (the previous ">= 2" check allowed an IndexError).
                    if len(parts) >= 3 and parts[0] == tenant_name:
                        excluded_tables.add((parts[1], parts[2]))
        except FileNotFoundError:
            # No skip list: inspect every table.
            pass

        query_partition = """SELECT
            TABLE_SCHEMA,
            TABLE_NAME,
            MAX(PARTITION_ORDINAL_POSITION) as MAX_PARTITION_ORDINAL_POSITION
            FROM information_schema.partitions
            WHERE TABLE_SCHEMA not in ('mysql','sys','information_schema','performance_schema')
            AND PARTITION_METHOD is not null
            AND PARTITION_DESCRIPTION <> 'MAXVALUE'
            GROUP BY TABLE_SCHEMA, TABLE_NAME
            ORDER BY TABLE_SCHEMA, TABLE_NAME"""
        cursor_ob_db.execute(query_partition)
        partitions = cursor_ob_db.fetchall()
        if not partitions:
            logger.info(f"{tenant_name}:不存在分区表")
            return None

        # Drop the tables that are excluded from inspection.
        filtered_patitions = [p for p in partitions if (p[0], p[1]) not in excluded_tables]

        detail_query = """SELECT TABLE_SCHEMA, TABLE_NAME, PARTITION_NAME, PARTITION_DESCRIPTION
                 FROM information_schema.partitions
                 WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s AND PARTITION_ORDINAL_POSITION=%s"""
        for partition in filtered_patitions:
            cursor_ob_db.execute(detail_query, (partition[0], partition[1], partition[2]))
            detail_result = cursor_ob_db.fetchall()
            for row_result in detail_result:
                OB_TABLE_SCHEMA = row_result[0]
                OB_TABLE_NAME = row_result[1]
                OB_PARTITION_NAME = row_result[2]
                OB_PARTITION_DESCRIPTION = row_result[3]
                ob_formatted_partition_value = ob_format_partition_description(OB_PARTITION_DESCRIPTION)
                try:
                    ob_formatted_partition_value_int = int(ob_formatted_partition_value)
                except (TypeError, ValueError):
                    # Description is not date-like; nothing to compare.
                    continue

                summary = f"{tenant_name}:库名:{OB_TABLE_SCHEMA},表名:{OB_TABLE_NAME},目前最大分区为:{OB_PARTITION_NAME},分区描述:{OB_PARTITION_DESCRIPTION},分区最高范围至:{ob_formatted_partition_value}"
                if ob_formatted_partition_value_int < current_date_int:
                    logger.error(f"{summary},分区已过期")
                    continue

                # Measure the remaining window in calendar days; subtracting
                # YYYYMMDD integers is wrong across month and year boundaries.
                try:
                    upper_bound = datetime.strptime(str(ob_formatted_partition_value_int), '%Y%m%d').date()
                    days_left = (upper_bound - current_date).days
                except ValueError:
                    # Not a calendar date: keep the previous numeric distance.
                    days_left = ob_formatted_partition_value_int - current_date_int

                if days_left <= 30:
                    logger.warning(f"{summary},需及时建立新分区")
                else:
                    logger.info(f"{summary},分区正常")
        # Return the inspected tables, matching mysql_partition_monitor.
        return filtered_patitions
    except Exception as e:
        logger.error(f"{tenant_name}:查询分区信息出错:{e}")
        return None
def ob_format_partition_description(OB_PARTITION_DESCRIPTION):
    """Normalise an OceanBase (MySQL mode) PARTITION_DESCRIPTION to YYYYMMDD.

    Recognised shapes are ``'2021-01-01 00:00:00'`` and ``'2021-01-01'``
    (single-quoted). Anything else, including text that has one of these
    shapes but is not a valid calendar date, is returned stripped but
    otherwise unchanged.
    """
    value_str = str(OB_PARTITION_DESCRIPTION).strip()
    candidates = (
        (r"'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})'", '%Y-%m-%d %H:%M:%S'),
        (r"'(\d{4}-\d{2}-\d{2})'", '%Y-%m-%d'),
    )
    for pattern, date_format in candidates:
        match = re.search(pattern, value_str)
        if not match:
            continue
        try:
            return datetime.strptime(match.group(1), date_format).strftime('%Y%m%d')
        except ValueError:
            # Right shape, impossible date: fall through to the next shape
            # instead of raising into the caller (the date-only branch used
            # to be unguarded).
            continue
    return value_str
if __name__ == '__main__':
    # Script entry point: run the Oracle, MySQL and OceanBase inspections in
    # turn. Each section is isolated so a failure in one does not stop the others.
    logger.info(f"分区巡检开始,当前日期: {current_date_str}")
    logger.info(f"分区巡检日志路径:{log_file}")

    # ---- Oracle ----
    service_names = ['adg_orcl1','adg_orcl2']
    logger.info("Oracle分区巡检开始:")
    for service_name in service_names:
        # Guard each service separately. The previous handler referenced
        # cx_Oracle, which this file never imports, so reaching it would have
        # raised NameError instead of logging.
        try:
            ora_partition_monitor(service_name)
        except Exception as e:
            logger.critical(f"{service_name}: {e}")

    # ---- MySQL ----
    zytj_db = None
    try:
        logger.info("Mysql分区巡检开始:")

        # The MySQL host list is stored in an Oracle table here; it could also
        # be hard-coded in this file, depending on requirements.
        zytj_db = ConnOracle('query', 'password', '带入ip', 1521, 'xjdb')
        mysql_ips_sql='''select HOST from MYSQL_HOST'''
        mysql_ips=zytj_db.select_noheader(mysql_ips_sql)
        if mysql_ips is None or len(mysql_ips) == 0:
            logger.critical("No MySQL IPs found in the database.")
        else:
            for row in mysql_ips:
                mysql_ip = row[0]
                try:
                    # The password comes from a site-specific helper; if every
                    # inspected host shares one account it can be hard-coded.
                    password = get_mysql_password(mysql_ip, 'query')
                    db_config = {
                        'host': mysql_ip,
                        'port': 3306,
                        'user': 'query',
                        'password': password
                    }
                    conn_db = pymysql.connect(**db_config)
                    try:
                        cursor_db = conn_db.cursor()
                        try:
                            # Inspect the partitions of this host.
                            mysql_partition_monitor(cursor_db, mysql_ip)
                        finally:
                            cursor_db.close()
                    finally:
                        # Close the connection even when the inspection fails.
                        conn_db.close()
                except Exception as e:
                    logger.error(f"MySQL连接或查询失败 {mysql_ip}: {e}")
    except ValueError as e:
        logger.critical(f"错误: {e}")
    except Exception as e:
        logger.critical(f"未知错误: {e}")
    finally:
        # The metadata connection was previously never released.
        if zytj_db is not None:
            try:
                zytj_db.disconnect()
            except Exception as e:
                logger.critical(f"未知错误: {e}")

    # ---- OceanBase (MySQL mode) ----
    try:
        logger.info("OB-Mysql分区巡检开始:")
        # NOTE(review): tenant_info is not defined anywhere in the visible file.
        # It is presumably the result of a subprocess.run(...) call that lists
        # tenants (lines of the form "MYSQL <tenant> ..."); confirm and restore
        # that call. Until then this section logs a NameError through the
        # handler below and inspects no tenants.
        if tenant_info.returncode == 0:
            lines = tenant_info.stdout.strip().split('\n')
            mysql_tenants = []
            for line in lines:
                if line.startswith('MYSQL') and not line.endswith('sys'):
                    parts = line.split()
                    if len(parts) >= 2:
                        mysql_tenants.append(parts[1])
            logger.info(f"获取MYSQL租户:{mysql_tenants}")
            for tenant_name in mysql_tenants:
                try:
                    # Substitute the real tenant/cluster names; the inspection
                    # user must be created in OCP beforehand.
                    username = f'xj_user@{tenant_name}#obtestprd'
                    db_config = {
                        'host': 'ob_ip',
                        'port': 2883,
                        'user': username,
                        'password': 'password'
                    }
                    conn_ob_mysql = pymysql.connect(**db_config)
                    try:
                        cursor_ob_db = conn_ob_mysql.cursor()
                        try:
                            # Inspect the partitions of this tenant.
                            ob_mysql_partition_monitor(cursor_ob_db, tenant_name)
                        finally:
                            cursor_ob_db.close()
                    finally:
                        conn_ob_mysql.close()
                except Exception as e:
                    logger.error(f"MySQL连接或查询失败 {tenant_name}: {e}")
        else:
            # A failed tenant listing used to be ignored silently.
            logger.critical(f"获取OB租户列表失败,返回码: {tenant_info.returncode}")
    except ValueError as e:
        logger.critical(f"错误: {e}")
    except Exception as e:
        logger.critical(f"未知错误: {e}")