partition_monitor.py
from datetime import datetime
from ConnDB import ConnOracle
import pymysql
import re
import argparse
import sys
import logging
import os
import subprocess
# 过滤表 (/home/oracle/skip_tables.list) 录入格式
# oracle -- ip:schema:tablename:oracle
# mysql -- ip:schema:tablename:mysql
# oceanbase -- tenantname:schema:tablename:oceanbase
# 例如:
# 192.168.1.10:t1db:table1:mysql
# 192.168.1.10:t1db:table2:mysql
# 192.168.1.11:t2db:table3:oracle
# tenantname:t3db:table4:oceanbase
# 配置日志格式和文件名
LOG_DIR = '/home/oracle/log'
log_filename = f'partition_monitor_{datetime.now().strftime("%Y%m%d")}.log'
log_file = os.path.join(LOG_DIR, log_filename)
# 确保日志目录存在
os.makedirs(LOG_DIR, exist_ok=True)
# 配置日志,设置为追加模式
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_file, mode='a')
]
)
logger = logging.getLogger(__name__)
ora_tns_file_path = '/home/oracle/.tnsnames.ora'
# 获取今天的日期
current_date = datetime.today().date()
current_date_str = current_date.strftime('%Y%m%d')
current_date_int = int(current_date_str)
# Oracle
def parse_tnsnames_ora(ora_tns_file_path, service_name):
with open(ora_tns_file_path, 'r') as file:
content = file.read()
# 使用正则表达式匹配服务名
# adg_orcl1 = (DESCRIPTION =(ADDRESS = (PROTOCOL = TCP)(HOST = 192.168.1.111)(PORT = 1521))(CONNECT_DATA =(SERVER = DEDICATED)(SERVICE_NAME = ORCL)))
pattern = re.compile(rf'^{service_name}\s*=\s*\(\s*DESCRIPTION\s*=\s*\(\s*ADDRESS\s*=\s*\(\s*PROTOCOL\s*=\s*TCP\s*\)\s*\(\s*HOST\s*=\s*([^\)]+)\s*\)\s*\(\s*PORT\s*=\s*(\d+)\s*\)\s*\)\s*\(\s*CONNECT_DATA\s*=\s*\(\s*SERVER\s*=\s*DEDICATED\s*\)\s*\(\s*SERVICE_NAME\s*=\s*([^\)]+)\s*\)\s*\)\s*\)', re.MULTILINE | re.DOTALL)
match = pattern.search(content)
if match:
host = match.group(1).strip()
port = match.group(2).strip()
dbname = match.group(3).strip()
return host, port, dbname
else:
logger.critical(f"{service_name}: 在 {ora_tns_file_path} 中不存在 ")
def format_high_value(high_value):
date_pattern1 = re.compile(r"TO_DATE\s*\('\s*(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\s*', 'SYYYY-MM-DD HH24:MI:SS', 'NLS_CALENDAR=GREGORIAN'\)")
match = date_pattern1.search(high_value)
if match:
date_str = match.group(1)
date_obj = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S')
return date_obj.strftime('%Y%m%d')
else:
date_pattern2 = re.compile(r"'(\d{8})'")
match = date_pattern2.search(high_value)
if match:
return match.group(1)
else:
return high_value
def ora_partition_monitor(service_name):
try:
host, port, dbname = parse_tnsnames_ora(ora_tns_file_path, service_name)
#读取跳过表的列表
excluded_tables = set()
try:
with open('/home/oracle/skip_tables.list', 'r') as f:
for line in f:
line = line.strip()
# 忽略空行和注释的行
if line and not line.startswith('#') and line.endswith('oracle'):
parts = line.split(':')
if len(parts) >= 3:
ip, schema, table = parts[0], parts[1], parts[2]
# 只处理当前ip
if ip == host:
excluded_tables.add((schema, table))
except FileNotFoundError:
# 查不到则使用默认配置
pass
zytj_db = ConnOracle('query','query',host,port,dbname)
# 执行 SQL 查询
status_sql = '''
select 1 from dual
'''
ora_sql = '''
select a.table_owner, a.table_name, max(partition_position)
from dba_tab_partitions a, dba_part_tables b, dba_tables c
where a.table_name = b.table_name
and b.table_name = c.table_name
and a.table_owner not in ('SYS', 'SYSTEM', 'MDSYS', 'AUDSYS')
-- and b.PARTITIONING_TYPE = 'RANGE'
and a.high_value is not null
and b.INTERVAL is null
and a.table_name not in ('PUB_TRAN_LOG_MID')
and a.partition_name not like '%MAXVALUE%'
group by a.table_owner, a.table_name
'''
status_db = zytj_db.select_noheader(status_sql)
if status_db is None or len(status_db) == 0:
logger.critical(f"{service_name}: 无法获取数据库状态")
return
else:
results = zytj_db.select_noheader(ora_sql)
if results is None or len(results) == 0:
logger.info(f"{service_name}: 没有分区表或不存在满足条件的分区表")
return
else:
filtered_patitions = []
for result in results:
table_owner = result[0]
table_name = result[1]
# 如果需要跳过,则跳过
if (table_owner, table_name) in excluded_tables:
continue
filtered_patitions.append(result)
for partition in filtered_patitions:
table_owner = partition[0]
table_name = partition[1]
partition_position = partition[2]
detail_sql = f'''
select table_owner, table_name, partition_position, partition_name, high_value
from dba_tab_partitions
where table_owner = '{table_owner}' and table_name = '{table_name}' and partition_position = {partition_position}
'''
detail_results = zytj_db.select_noheader(detail_sql)
for detail_result in detail_results:
table_owner = detail_result[0]
table_name = detail_result[1]
partition_position = detail_result[2]
partition_name = detail_result[3]
high_value = detail_result[4]
formatted_high_value = format_high_value(high_value)
# 将格式化后的 high_value 转换为整数以便比较,设置为30内过期就告警
try:
formatted_high_value_int = int(formatted_high_value)
if formatted_high_value_int < 19000000 or formatted_high_value_int > 30000000:
logger.error(f"{host}:库名:{table_owner},表名:{table_name},目前最大分区为:{partition_name},分区描述:{high_value},分区最高范围至:{formatted_high_value},分区异常")
elif formatted_high_value_int > 19000000 and formatted_high_value_int < current_date_int:
logger.error(f"{host}:库名:{table_owner},表名:{table_name},目前最大分区为:{partition_name},分区描述:{high_value},分区最高范围至:{formatted_high_value},分区已过期")
elif formatted_high_value_int - current_date_int <= 30:
logger.warning(f"{host}:库名:{table_owner},表名:{table_name},目前最大分区为:{partition_name},分区描述:{high_value},分区最高范围至:{formatted_high_value},需及时建立新分区")
else:
logger.info(f"{host}:库名:{table_owner},表名:{table_name},目前最大分区为:{partition_name},分区描述:{high_value},分区最高范围至:{formatted_high_value},分区正常")
except ValueError:
formatted_high_value_int = 0 # 如果无法转换为整数,则设为 0
zytj_db.disconnect()
except ValueError as e:
logger.critical(f"{service_name}: 错误: {e}")
except Exception as e:
logger.critical(f"{service_name}: 未知错误: {e}")
# MYSQL
def mysql_partition_monitor(cursor_db, mysql_ip):
try:
#读取跳过表的列表
excluded_tables = set()
try:
with open('/home/oracle/skip_tables.list', 'r') as f:
for line in f:
line = line.strip()
# 忽略空行和注释的行
if line and not line.startswith('#') and line.endswith('mysql'):
parts = line.split(':')
if len(parts) >= 3:
ip, schema, table = parts[0], parts[1], parts[2]
# 只处理当前ip
if ip == mysql_ip:
excluded_tables.add((schema, table))
except FileNotFoundError:
# 查不到则使用默认配置
pass
mysql_query_partition = """SELECT
TABLE_SCHEMA,
TABLE_NAME,
MAX(PARTITION_ORDINAL_POSITION) as MAX_PARTITION_ORDINAL_POSITION
FROM information_schema.partitions
WHERE TABLE_SCHEMA not in ('mysql','sys','information_schema','performance_schema')
AND PARTITION_METHOD is not null
AND PARTITION_DESCRIPTION <> 'MAXVALUE'
GROUP BY TABLE_SCHEMA, TABLE_NAME
ORDER BY TABLE_SCHEMA, TABLE_NAME"""
cursor_db.execute(mysql_query_partition)
partitions = cursor_db.fetchall()
if partitions is None or not partitions:
logger.info(f"{mysql_ip}:不存在分区表")
return None
else:
# 跳过无需巡检的分区表
filtered_patitions = []
for partition in partitions:
schema_name = partition[0]
table_name = partition[1]
# 如果需要跳过,则跳过
if (schema_name, table_name) in excluded_tables:
continue
filtered_patitions.append(partition)
for partition in filtered_patitions:
detail_query = """SELECT TABLE_SCHEMA, TABLE_NAME, PARTITION_NAME, PARTITION_EXPRESSION, PARTITION_DESCRIPTION
FROM information_schema.partitions
WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s AND PARTITION_ORDINAL_POSITION=%s"""
cursor_db.execute(detail_query, (partition[0], partition[1], partition[2]))
detail_result = cursor_db.fetchall()
for row_result in detail_result:
TABLE_SCHEMA = row_result[0]
TABLE_NAME = row_result[1]
PARTITION_NAME = row_result[2]
PARTITION_EXPRESSION = row_result[3]
PARTITION_DESCRIPTION = row_result[4]
formatted_partition_value = format_partition_description(PARTITION_EXPRESSION, PARTITION_DESCRIPTION, cursor_db)
# 将格式化后的 PARTITION_DESCRIPTION 转换为整数以便比较
try:
formatted_partition_value_int = int(formatted_partition_value)
if formatted_partition_value_int < current_date_int:
logger.error(f"{mysql_ip}:库名:{TABLE_SCHEMA},表名:{TABLE_NAME},目前最大分区为:{PARTITION_NAME},分区描述:{PARTITION_DESCRIPTION},分区最高范围至:{formatted_partition_value},分区已过期")
elif formatted_partition_value_int - current_date_int <= 30:
logger.warning(f"{mysql_ip}:库名:{TABLE_SCHEMA},表名:{TABLE_NAME},目前最大分区为:{PARTITION_NAME},分区描述:{PARTITION_DESCRIPTION},分区最高范围至:{formatted_partition_value},需及时建立新分区")
else:
logger.info(f"{mysql_ip}:库名:{TABLE_SCHEMA},表名:{TABLE_NAME},目前最大分区为:{PARTITION_NAME},分区描述:{PARTITION_DESCRIPTION},分区最高范围至:{formatted_partition_value},分区正常")
except ValueError:
formatted_partition_value_int = 0 # 如果无法转换为整数,则设为 0
return filtered_patitions
except Exception as e:
logger.error(f"{mysql_ip}:查询分区信息出错:{e}")
return None
# mysql可能存在的格式'2023-03-01 00:00:00','20240701',20230318,1754582400000,1727712000,'202501',202501,738764(to_days)
def format_partition_description(PARTITION_EXPRESSION, partition_description, cursor_db):
value_str = str(partition_description).strip()
if 'to_days' not in str(PARTITION_EXPRESSION).lower():
date_pattern1 = re.compile(r"'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})'")
match = date_pattern1.search(value_str)
if match:
try:
date_str = match.group(1)
date_obj = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S')
return date_obj.strftime('%Y%m%d')
except ValueError:
pass
date_pattern2 = re.compile(r"'(\d{8})'")
match = date_pattern2.search(value_str)
if match:
return match.group(1)
date_pattern3 = re.compile(r"^(\d{8})$")
match = date_pattern3.search(value_str)
if match:
return match.group(1)
date_pattern4 = re.compile(r"'(\d{6})'")
match = date_pattern4.search(value_str)
if match:
date_str = match.group(1)
return f"{date_str}01"
date_pattern5 = re.compile(r"^(\d{6})$")
match = date_pattern5.search(value_str)
if match:
date_str = match.group(1)
return f"{date_str}01"
date_pattern6 = re.compile(r"^(\d{9,})$")
match = date_pattern6.search(value_str)
if match:
timestamp = match.group(1)
try:
if len(timestamp) >= 13:
timestamp_seconds = int(timestamp[:10])
date_obj = datetime.fromtimestamp(timestamp_seconds)
# print(date_obj.strftime('%Y-%m-%d %H:%M:%S'))
return date_obj.strftime('%Y%m%d')
elif len(timestamp) >= 10:
timestamp_seconds = int(timestamp)
date_obj = datetime.fromtimestamp(timestamp_seconds)
# print(date_obj.strftime('%Y-%m-%d %H:%M:%S'))
return date_obj.strftime('%Y%m%d')
except (ValueError, OSError):
# 如果时间戳解析失败,返回原值
return timestamp
else:
return value_str
else:
# 针对to_days函数做分区条件的转化
date_pattern7 = re.compile(r"(\d{6})")
date_format_current = date_pattern7.search(value_str)
if date_format_current:
date_format_current_str = date_format_current.group(1)
if len(date_format_current_str) == 6 and date_format_current_str.startswith('7'):
try:
sql = """select date_format(str_to_date(from_days(%s),'%%Y-%%m-%%d'),'%%Y%%m%%d') from dual"""
cursor_db.execute(sql, (date_format_current_str,))
result = cursor_db.fetchone()
if result:
return result[0]
else:
return value_str
except Exception as e:
logger.error(f"执行 from_days 查询失败: {e}")
return value_str
else:
return value_str
# 密码通过特殊处理,若密码写死则不需要该方法
def get_mysql_password(mysql_ip, username):
try:
result = subprocess.run(['/home/oracle/bin/get_pswd.sh', mysql_ip, username], stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
return result.stdout.strip()
except subprocess.CalledProcessError as e:
logger.critical(f"{mysql_ip}:获取MySQL密码失败: {e}")
return None
except FileNotFoundError:
logger.critical("找不到get_user_pswd.sh脚本")
return None
#OB_MYSQL
def ob_mysql_partition_monitor(cursor_ob_db, tenant_name):
try:
#读取跳过表的列表
excluded_tables = set()
try:
with open('/home/oracle/skip_tables.list', 'r') as f:
for line in f:
line = line.strip()
# 忽略空行和注释的行
if line and not line.startswith('#') and line.endswith('oceanbase'):
parts = line.split(':')
if len(parts) >= 2:
tenantname, schema, table = parts[0], parts[1], parts[2]
if tenantname == tenant_name:
excluded_tables.add((schema, table))
except FileNotFoundError:
# 查不到则使用默认配置
pass
query_partition = """SELECT
TABLE_SCHEMA,
TABLE_NAME,
MAX(PARTITION_ORDINAL_POSITION) as MAX_PARTITION_ORDINAL_POSITION
FROM information_schema.partitions
WHERE TABLE_SCHEMA not in ('mysql','sys','information_schema','performance_schema')
AND PARTITION_METHOD is not null
AND PARTITION_DESCRIPTION <> 'MAXVALUE'
GROUP BY TABLE_SCHEMA, TABLE_NAME
ORDER BY TABLE_SCHEMA, TABLE_NAME"""
cursor_ob_db.execute(query_partition)
partitions = cursor_ob_db.fetchall()
if partitions is None or not partitions:
logger.info(f"{tenant_name}:不存在分区表")
return None
else:
# 跳过无需巡检的分区表
filtered_patitions = []
for partition in partitions:
schema_name = partition[0]
table_name = partition[1]
# 如果需要跳过,则跳过
if (schema_name, table_name) in excluded_tables:
continue
filtered_patitions.append(partition)
for partition in filtered_patitions:
detail_query = """SELECT TABLE_SCHEMA, TABLE_NAME, PARTITION_NAME, PARTITION_DESCRIPTION
FROM information_schema.partitions
WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s AND PARTITION_ORDINAL_POSITION=%s"""
cursor_ob_db.execute(detail_query, (partition[0], partition[1], partition[2]))
detail_result = cursor_ob_db.fetchall()
for row_result in detail_result:
OB_TABLE_SCHEMA = row_result[0]
OB_TABLE_NAME = row_result[1]
OB_PARTITION_NAME = row_result[2]
OB_PARTITION_DESCRIPTION = row_result[3]
ob_formatted_partition_value = ob_format_partition_description(OB_PARTITION_DESCRIPTION)
# 将格式化后的 PARTITION_DESCRIPTION 转换为整数以便比较
try:
ob_formatted_partition_value_int = int(ob_formatted_partition_value)
if ob_formatted_partition_value_int < current_date_int:
logger.error(f"{tenant_name}:库名:{OB_TABLE_SCHEMA},表名:{OB_TABLE_NAME},目前最大分区为:{OB_PARTITION_NAME},分区描述:{OB_PARTITION_DESCRIPTION},分区最高范围至:{ob_formatted_partition_value},分区已过期")
elif ob_formatted_partition_value_int - current_date_int <= 30:
logger.warning(f"{tenant_name}:库名:{OB_TABLE_SCHEMA},表名:{OB_TABLE_NAME},目前最大分区为:{OB_PARTITION_NAME},分区描述:{OB_PARTITION_DESCRIPTION},分区最高范围至:{ob_formatted_partition_value},需及时建立新分区")
else:
logger.info(f"{tenant_name}:库名:{OB_TABLE_SCHEMA},表名:{OB_TABLE_NAME},目前最大分区为:{OB_PARTITION_NAME},分区描述:{OB_PARTITION_DESCRIPTION},分区最高范围至:{ob_formatted_partition_value},分区正常")
except ValueError:
ob_formatted_partition_value_int = 0 # 如果无法转换为整数,则设为 0
return partitions
except Exception as e:
logger.error(f"{tenant_name}:查询分区信息出错:{e}")
return None
# ob-mysql可能存在的格式'2021-01-01 00:00:00','2021-01-01'
def ob_format_partition_description(OB_PARTITION_DESCRIPTION):
value_str = str(OB_PARTITION_DESCRIPTION).strip()
date_pattern1 = re.compile(r"'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})'")
match = date_pattern1.search(value_str)
if match:
try:
date_str = match.group(1)
date_obj = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S')
return date_obj.strftime('%Y%m%d')
except ValueError:
pass
date_pattern2 = re.compile(r"'(\d{4}-\d{2}-\d{2})'")
match = date_pattern2.search(value_str)
if match:
date_obj = datetime.strptime(match.group(1), '%Y-%m-%d')
return date_obj.strftime('%Y%m%d')
else:
return value_str
if __name__ == '__main__':
logger.info(f"分区巡检开始,当前日期: {current_date_str}")
logger.info(f"分区巡检日志路径:{log_file}")
service_names = ['adg_orcl1','adg_orcl2']
try:
logger.info(f"Oracle分区巡检开始:")
for service_name in service_names:
ora_partition_monitor(service_name)
except cx_Oracle.DatabaseError as e:
logger.critical(f"{service_name}: {e}")
except ValueError as e:
logger.critical(f"{service_name}: {e}")
try:
logger.info(f"Mysql分区巡检开始:")
# 此处的mysql_ip存储在oracle表中,也可以写死在文件当中,根据需求修改
zytj_db = ConnOracle('query', 'password', '带入ip', 1521, 'xjdb')
mysql_ips_sql='''select HOST from MYSQL_HOST'''
mysql_ips=zytj_db.select_noheader(mysql_ips_sql)
if mysql_ips is None or len(mysql_ips) == 0:
logger.critical("No MySQL IPs found in the database.")
else:
for row in mysql_ips:
mysql_ip = row[0]
try:
# 此处密码通过特殊处理,也可以保持待巡检机器的账号密码相同,只要在脚本中写死即可
password = get_mysql_password(mysql_ip, 'query')
db_config = {
'host': mysql_ip,
'port': 3306,
'user': 'query',
'password': password
}
conn_db = pymysql.connect(**db_config)
cursor_db = conn_db.cursor()
#获取分区信息
partitions = mysql_partition_monitor(cursor_db, mysql_ip)
cursor_db.close()
conn_db.close()
except Exception as e:
logger.error(f"MySQL连接或查询失败 {mysql_ip}: {e}")
except ValueError as e:
logger.critical(f"错误: {e}")
except Exception as e:
logger.critical(f"未知错误: {e}")
try:
logger.info(f"OB-Mysql分区巡检开始:")
if tenant_info.returncode == 0:
lines = tenant_info.stdout.strip().split('\n')
mysql_tenants = []
for line in lines:
if line.startswith('MYSQL') and not line.endswith('sys'):
parts = line.split()
if len(parts) >= 2:
tenant_name = parts[1]
mysql_tenants.append(tenant_name)
logger.info(f"获取MYSQL租户:{mysql_tenants}")
for tenant_name in mysql_tenants:
try:
# 根据实际情况带入租户名,提前在ocp建好巡检用户
username = f'xj_user@{tenant_name}#obtestprd'
db_config = {
'host': 'ob_ip',
'port': 2883,
'user': username,
'password': 'password'
}
conn_ob_mysql = pymysql.connect(**db_config)
cursor_ob_db = conn_ob_mysql.cursor()
#获取分区信息
partitions = ob_mysql_partition_monitor(cursor_ob_db, tenant_name)
cursor_ob_db.close()
conn_ob_mysql.close()
except Exception as e:
logger.error(f"MySQL连接或查询失败 {tenant_name}: {e}")
except ValueError as e:
logger.critical(f"错误: {e}")
except Exception as e:
logger.critical(f"未知错误: {e}")