# zsjie/ExcelToMysql.py
# (Removed scraped GitHub page chrome that was not part of the source:
#  "209 lines / 10 KiB / Python / Raw Blame History" and the
#  ambiguous-Unicode warning banners.)

import pandas as pd
from sqlalchemy import create_engine, text
import logging
import math
import re
import time

# --- Configuration (same as before) ---
# NOTE(review): database credentials are hard-coded in source; consider moving
# them to environment variables or a config file rather than committing them.
EXCEL_FILE_PATH = 'Z:\\xiaohu\\web_data40000-50000.xlsx'  # source workbook path
SHEET_NAME = 0  # sheet index to read (0 = first sheet)
DB_USER = 'zsjie'
DB_PASSWORD = 'xRekX6Cc3RRK6mBe'
DB_HOST = '111.180.203.166'
DB_PORT = 25506
DB_NAME = 'zsjie'
TABLE_NAME = 'resource'  # target MySQL table
UNIQUE_KEY_COLUMNS = ['id']  # columns that trigger the duplicate-key path
DATE_COLUMN_TO_CONVERT = 'update_date'  # column coerced digits-only -> numeric
DEFAULT_FOR_STRING = ''  # NaN replacement for string-typed columns
DEFAULT_FOR_NUMERIC = 0  # NaN replacement for numeric-typed columns
COMMIT_BATCH_SIZE = 300  # rows per transaction before a commit
# --- New: column used for the conditional update ---
CONDITION_COLUMN = 'resource_url'  # a row is updated only if this DB value is NULL/empty
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def _prepare_dataframe(df):
    """Clean *df* in place and return a per-column type map.

    Steps: rename Excel headers to DB column names, drop auto-generated
    "Unnamed:" columns, strip whitespace from headers, verify required
    columns, and coerce the date column to a number.  The returned dict maps
    each column name to 'string' | 'numeric' | 'other' so NaN cells can get a
    type-appropriate default later.

    Raises:
        ValueError: if the unique-key column 'id' or CONDITION_COLUMN is
            missing after cleanup.
    """
    # a. Map Excel headers to database column names.
    df.rename(columns={
        'URL': 'url', 'Title': 'title', 'Tags': 'tags',
        'source': 'resource_url', 'password': 'resource_pd',
    }, inplace=True)
    # b. Drop pandas' auto-generated "Unnamed:" columns (trailing blank cols).
    unnamed_cols = [col for col in df.columns if str(col).startswith('Unnamed:')]
    if unnamed_cols:
        df.drop(columns=unnamed_cols, inplace=True)
    # c. Strip stray whitespace from header names.
    df.columns = [str(col).strip() for col in df.columns]
    # d. Both the unique key and the condition column are mandatory.
    if 'id' not in df.columns:
        raise ValueError("错误:唯一键列 'id' 在 DataFrame 中未找到。")
    if CONDITION_COLUMN not in df.columns:
        raise ValueError(f"错误:条件更新所需的列 '{CONDITION_COLUMN}' 在 DataFrame 中未找到。")
    # e. Keep only the digits of the date column, then coerce to numeric
    #    (unparseable values become NaN and fall back to a default later).
    if DATE_COLUMN_TO_CONVERT in df.columns:
        digits_only = df[DATE_COLUMN_TO_CONVERT].astype(str).str.replace(r'\D', '', regex=True)
        df[DATE_COLUMN_TO_CONVERT] = pd.to_numeric(digits_only, errors='coerce')
    # f. Classify columns AFTER the date conversion so its new dtype is seen.
    col_types = {}
    for col in df.columns:
        if pd.api.types.is_string_dtype(df[col]) or pd.api.types.is_object_dtype(df[col]):
            col_types[col] = 'string'
        elif pd.api.types.is_numeric_dtype(df[col]):
            col_types[col] = 'numeric'
        else:
            col_types[col] = 'other'
    return col_types


def _build_upsert_sql(all_columns):
    """Build the parameterized upsert statement for *all_columns*.

    When non-key columns exist, emits INSERT ... ON DUPLICATE KEY UPDATE in
    which each column is overwritten only if the stored row's
    CONDITION_COLUMN is NULL or empty; otherwise the stored value is kept.
    With only key columns configured, falls back to INSERT IGNORE.
    """
    update_columns = [col for col in all_columns if col not in UNIQUE_KEY_COLUMNS]
    cols_str = ", ".join(f"`{col}`" for col in all_columns)
    placeholders_str = ", ".join(f":{col}" for col in all_columns)
    if not update_columns:
        logging.info("仅配置了唯一键,将使用 INSERT IGNORE 模式。")
        return f"INSERT IGNORE INTO `{TABLE_NAME}` ({cols_str}) VALUES ({placeholders_str})"
    # Core conditional logic: if the existing resource_url is NULL/empty,
    # take the incoming value (VALUES(...)); otherwise keep the old one.
    update_str = ", ".join(
        f"`{col}` = IF(`{CONDITION_COLUMN}` IS NULL OR `{CONDITION_COLUMN}` = '', "
        f"VALUES(`{col}`), `{col}`)"
        for col in update_columns
    )
    logging.info(f"将使用带条件 (基于数据库 '{CONDITION_COLUMN}' 值) 的 INSERT ... ON DUPLICATE KEY UPDATE 模式。")
    return f"""
INSERT INTO `{TABLE_NAME}` ({cols_str})
VALUES ({placeholders_str})
ON DUPLICATE KEY UPDATE {update_str}
"""


def excel_to_mysql_upsert_conditional_on_url():
    """Load an Excel sheet and upsert it into MySQL in batches.

    Rows are inserted with INSERT ... ON DUPLICATE KEY UPDATE; on a duplicate
    key, non-key columns are updated only when the existing row's
    CONDITION_COLUMN ('resource_url') is NULL or empty, so rows that already
    have a resource URL are left untouched.  Commits every COMMIT_BATCH_SIZE
    rows; all progress and timings are logged.  Errors abort the run (the
    current transaction is rolled back) but are logged rather than re-raised
    to the caller.
    """
    engine = None
    connection = None
    transaction = None
    rows_processed = 0
    rows_since_last_commit = 0
    total_inserted = 0
    # NOTE: counts duplicate-key matches that ran the UPDATE clause, which may
    # include rows whose values did not actually change (see rowcount notes).
    total_updated = 0
    start_time = time.time()
    try:
        # 1. Create the engine (pool_recycle avoids stale pooled connections).
        engine_url = f"mysql+mysqlconnector://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
        engine = create_engine(engine_url, pool_recycle=3600)
        logging.info(f"数据库引擎创建成功: {DB_HOST}:{DB_PORT}/{DB_NAME}")
        # 2. Read the Excel sheet; dtype='object' preserves raw cell values.
        read_start_time = time.time()
        logging.info(f"开始读取 Excel: {EXCEL_FILE_PATH} (Sheet: {SHEET_NAME})")
        df = pd.read_excel(EXCEL_FILE_PATH, sheet_name=SHEET_NAME, dtype='object', keep_default_na=True)
        logging.info(f"Excel 读取完成,共 {len(df)} 行。耗时: {time.time() - read_start_time:.2f} 秒。")
        # 3. Clean/convert the data and classify column types.
        transform_start_time = time.time()
        logging.info("开始清理和转换数据...")
        col_types = _prepare_dataframe(df)
        logging.info(f"数据清理转换完成。耗时: {time.time() - transform_start_time:.2f} 秒。")
        # Build the (possibly conditional) upsert statement once, up front.
        sql_template = _build_upsert_sql(df.columns.tolist())
        # 4. Row-by-row execution with periodic commits.
        db_interaction_start_time = time.time()
        connection = engine.connect()
        logging.info("数据库连接成功。")
        transaction = connection.begin()
        logging.info("已开始第一个事务。")
        logging.info(f"开始处理 {len(df)} 行数据 (每 {COMMIT_BATCH_SIZE} 行提交一次)...")
        for record_original in df.to_dict(orient='records'):
            rows_processed += 1
            record_processed = {}
            # Replace NaN with a type-appropriate default.  Key columns are
            # deliberately left as None so a missing id can be detected below.
            for col_name, value in record_original.items():
                processed_value = value
                if pd.isna(processed_value):
                    if col_name not in UNIQUE_KEY_COLUMNS:
                        column_type = col_types.get(col_name, 'other')
                        if column_type == 'string':
                            processed_value = DEFAULT_FOR_STRING
                        elif column_type == 'numeric':
                            processed_value = DEFAULT_FOR_NUMERIC
                        else:
                            processed_value = None
                record_processed[col_name] = processed_value
            # A row without a usable unique key cannot be upserted: skip it.
            if record_processed.get('id') is None:
                logging.warning(f"跳过第 {rows_processed} 行,因为 'id' 列为空或无效: {record_original}")
                continue
            rows_since_last_commit += 1
            try:
                result = connection.execute(text(sql_template), record_processed)
                # MySQL rowcount semantics for ON DUPLICATE KEY UPDATE:
                #   1 -> a new row was inserted
                #   2 -> duplicate key matched and the UPDATE clause ran
                #        (even if every IF kept the old value)
                #   0 -> duplicate matched but no actual change, on some
                #        server versions/configurations
                if result.rowcount == 1:
                    total_inserted += 1
                elif result.rowcount == 2:
                    total_updated += 1
            except Exception as row_error:
                logging.error(f"处理行数据时出错 (行号约 {rows_processed}):\n 原始: {record_original}\n 处理后: {record_processed}\n 错误: {row_error}")
                if transaction:
                    try:
                        transaction.rollback()
                    except Exception as rb_err:
                        logging.error(f"回滚事务时也出错: {rb_err}")
                    # FIX: mark the transaction handled so the finally block
                    # does not attempt a second rollback.
                    transaction = None
                raise
            # Periodic commit keeps transactions (and server undo logs) small.
            if rows_since_last_commit >= COMMIT_BATCH_SIZE:
                try:
                    commit_start = time.time()
                    transaction.commit()
                    commit_duration = time.time() - commit_start
                    logging.info(f"已提交 {rows_since_last_commit} 行 (处理总数: {rows_processed})。本次提交耗时: {commit_duration:.2f} 秒。")
                    transaction = connection.begin()
                    rows_since_last_commit = 0
                except Exception as commit_error:
                    logging.error(f"提交事务时出错 (行号约 {rows_processed}): {commit_error}")
                    raise
        # Final commit for the tail batch, if any rows remain unflushed.
        if rows_since_last_commit > 0:
            try:
                final_commit_start = time.time()
                logging.info(f"准备提交最后 {rows_since_last_commit} 行...")
                transaction.commit()
                final_commit_duration = time.time() - final_commit_start
                logging.info(f"最后 {rows_since_last_commit} 行已成功提交。耗时: {final_commit_duration:.2f} 秒。")
            except Exception as final_commit_error:
                logging.error(f"提交最后批次事务时出错: {final_commit_error}")
                raise
        # All work is committed; nothing should be rolled back in finally.
        transaction = None
        total_db_time = time.time() - db_interaction_start_time
        logging.info(f"数据库交互完成。总耗时: {total_db_time:.2f} 秒。")
        logging.info(f"处理完成。总处理行: {rows_processed}, 总插入: {total_inserted}, 总匹配重复键(尝试更新): {total_updated}.")
    except ValueError as ve:
        logging.error(f"配置或数据错误: {ve}")
    except Exception as e:
        logging.error(f"发生严重错误,脚本已停止: {e}", exc_info=False)
    finally:
        # FIX: explicitly roll back any transaction left open by an error
        # path so the connection is returned clean (the original relied on
        # connection.close() to discard it implicitly).
        if transaction is not None:
            try:
                transaction.rollback()
            except Exception:
                pass
        if connection:
            connection.close()
            logging.info("数据库连接已关闭。")
        # FIX: dispose the engine to release the connection pool — the
        # original created the engine but never disposed it.
        if engine is not None:
            engine.dispose()
        total_script_time = time.time() - start_time
        logging.info(f"脚本总运行时间: {total_script_time:.2f} 秒。")
# Script entry point: run the conditional Excel -> MySQL upsert directly.
if __name__ == "__main__":
    excel_to_mysql_upsert_conditional_on_url()