from datetime import datetime
import sys
import logging
import os
import pymysql
# Logging setup: one log file per calendar day under LOG_DIR.
LOG_DIR = '/home/oracle/log'

# Read the clock once so the log file name and current_date_str always refer
# to the same day, even when the script starts right around midnight.
_run_started_at = datetime.now()

# Today's date, and its YYYYMMDD form used as the partition cut-off value.
current_date = _run_started_at.date()
current_date_str = current_date.strftime('%Y%m%d')

log_filename = f'auto_statistics_collection_{current_date_str}.log'
log_file = os.path.join(LOG_DIR, log_filename)

# Make sure the log directory exists before the file handler opens the file.
os.makedirs(LOG_DIR, exist_ok=True)

# Append to the day's log file. The encoding is pinned to UTF-8 because the
# log messages are Chinese and the default encoding depends on the process
# locale, which is often minimal when the script is started from cron.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(log_file, mode='a', encoding='utf-8'),
    ],
)
logger = logging.getLogger(__name__)
def _is_clean_name(value):
    """Return True when *value* is non-empty and has no surrounding whitespace."""
    # Truthiness is checked first so a NULL (None) name is rejected instead of
    # raising AttributeError on .strip().
    return bool(value) and value == value.strip()


def _fetch_last_analyzed(cursor_ob_db, table_name, partition_name):
    """Return the newest LAST_ANALYZED value for one partition, or None.

    The SQL reduces the first 19 characters of LAST_ANALYZED to a compact
    string (dashes, colons and spaces removed) and takes the maximum over the
    rows of dba_part_col_statistics for the given table and partition.
    """
    last_analyzed_query = """select max(replace(replace(replace(substring(LAST_ANALYZED,1,19),'-',''),':',''),' ','')) from dba_part_col_statistics
        where OWNER='table_owner' and TABLE_NAME=%s and PARTITION_NAME=%s"""
    cursor_ob_db.execute(last_analyzed_query, (table_name, partition_name))
    row = cursor_ob_db.fetchone()
    if row and row[0]:
        return row[0]
    return None


def _collect_stats_for_table(cursor_ob_db, table_name, partition_position):
    """Gather statistics for one table partition and log the outcome.

    Resolves the partition name at *partition_position*, records the last
    analyzed time, calls dbms_stats.gather_table_stats, then compares the
    last analyzed time again to decide whether the collection took effect.
    Exceptions from the lookup queries propagate to the caller; a failure of
    the gather call itself is logged and ends processing of this table.
    """
    # Resolve the partition name for the selected position.
    query_partition_name = """select PARTITION_NAME from dba_tab_partitions where table_owner='table_owner' and table_name=%s and PARTITION_POSITION=%s"""
    logger.info(f"查询参数:{table_name},{partition_position}")
    cursor_ob_db.execute(query_partition_name, (table_name, partition_position))
    partition_name_result = cursor_ob_db.fetchone()
    if not partition_name_result:
        logger.error("未能获取到分区名,跳过该分区")
        return

    partition_name = partition_name_result[0]
    logger.info(f" 表: {table_name} , 当月分区名: {partition_name}")

    # Skip names that are empty or carry leading/trailing whitespace.
    if not (_is_clean_name(table_name) and _is_clean_name(partition_name)):
        logger.error(f"表信息存在问题,跳过该表: {table_name}")
        return

    # Last analyzed time before collecting.
    check_before_result_value = _fetch_last_analyzed(
        cursor_ob_db, table_name, partition_name
    )
    logger.info(f"最近收集时间查询完成为: {check_before_result_value}")

    # Collect statistics.
    auto_statistics_collect_sql = """
        CALL dbms_stats.gather_table_stats(ownname=>'table_owner', tabname=>%s, partname=>%s,granularity=>'APPROX_GLOBAL AND PARTITION')
        """
    logger.info(f"统计信息收集带入参数:{table_name},{partition_name}")
    try:
        cursor_ob_db.execute(auto_statistics_collect_sql, (table_name, partition_name))
        logger.info("统计信息收集执行完成")
    except Exception as e:
        logger.error(f"统计信息收集执行失败: {e}")
        return

    # Last analyzed time after collecting.
    check_after_result_value = _fetch_last_analyzed(
        cursor_ob_db, table_name, partition_name
    )
    logger.info(f"最新收集时间为:{check_after_result_value}")

    # Compare the two compact timestamp strings; with the same fixed layout a
    # plain string comparison orders them chronologically.
    if check_after_result_value and check_before_result_value:
        if check_after_result_value > check_before_result_value:
            logger.info(f"表{table_name} 统计信息收集成功,收集时间从 {check_before_result_value} 更新为 {check_after_result_value}")
        else:
            logger.error(f"表{table_name} 统计信息收集失败,当前收集时间为{check_before_result_value}")
    elif check_after_result_value:
        # No earlier statistics, but a timestamp exists now.
        logger.info(f"表{table_name} 之前没有统计信息,现在已收集")
    else:
        # No timestamp after the call (whether or not one existed before):
        # this must not be reported as a successful first collection.
        logger.error(f"表{table_name} 统计信息收集失败,无法获取新时间")


def ob_mysql_partition_statistics_collect(cursor_ob_db):
    """Collect partition statistics for every partitioned table of the owner.

    For each table the initial query selects the lowest PARTITION_POSITION
    among the partitions whose HIGH_VALUE prefix (quotes and dashes removed,
    first 10 characters) compares >= the module-level ``current_date_str``.
    Statistics are then gathered for that partition of each table.

    Args:
        cursor_ob_db: An open DB-API cursor (``execute``/``fetchone``/
            ``fetchall``) using ``%s`` parameter placeholders.

    Returns:
        The rows ``(table_name, partition_position)`` returned by the initial
        query, or ``None`` when that query fails or returns no rows. A failure
        while processing one table is logged and does not stop the others.
    """
    try:
        logger.info(f"{current_date_str}")
        # Find, per table, the partition position to collect statistics for.
        # NOTE(review): the outer derived table has no alias; stock MySQL
        # rejects that form - confirm the target OceanBase version accepts it.
        query_partition = """select table_name,min(PARTITION_POSITION) from
            (select * from (select table_owner,table_name,PARTITION_NAME,PARTITION_POSITION,
            replace(substring(replace(HIGH_VALUE,"'",''),1,10),'-','') as time_format
            from dba_tab_partitions where table_owner='table_owner') a where a.time_format >= %s) group by table_owner,table_name"""
        logger.info("执行查询分区信息SQL")
        cursor_ob_db.execute(query_partition, (current_date_str,))
        partitions = cursor_ob_db.fetchall()
        logger.info("查询当月分区信息完成")
    except Exception as e:
        logger.error(f"查询分区信息出错:{e}")
        return None

    if not partitions:
        logger.info("不存在分区表")
        return None

    logger.info(f"待处理 {len(partitions)} 个分区")
    for partition in partitions:
        # Isolate each table: one failing lookup must not prevent statistics
        # collection for the remaining tables.
        try:
            _collect_stats_for_table(cursor_ob_db, partition[0], partition[1])
        except Exception as e:
            logger.error(f"处理分区信息出错,跳过该表 {partition}: {e}")
    return partitions
if __name__ == '__main__':
    # Script entry point: connect to the OceanBase MySQL tenant, run the
    # statistics collection once, and always release the cursor/connection.
    logger.info("OB-Mysql租户table_owner数据库自动统计信息收集开始:")

    # NOTE(review): host, tenant name and password look like placeholders -
    # confirm they are replaced (ideally from a protected config or the
    # environment) before deployment.
    username = 'insp_user@租户名#obarchprd'
    db_config = {
        'host': 'ip',
        'port': 2883,
        'user': username,
        'password': 'password',
        'database': 'oceanbase',
    }

    conn_ob_mysql = None
    cursor_ob_db = None
    try:
        conn_ob_mysql = pymysql.connect(**db_config)
        cursor_ob_db = conn_ob_mysql.cursor()
        ob_mysql_partition_statistics_collect(cursor_ob_db)
    except Exception as e:
        logger.error(f"MySQL连接或查询失败: {e}")
    finally:
        # Close in reverse order of acquisition, even after a failure, so the
        # connection is not leaked. A failing close is logged, not raised.
        for resource in (cursor_ob_db, conn_ob_mysql):
            if resource is None:
                continue
            try:
                resource.close()
            except Exception as e:
                logger.error(f"MySQL连接或查询失败: {e}")

# Deployment note: write a shell script that invokes this Python script and
# schedule it with crontab to enable automatic statistics collection.