From 0641dcecfd0b9d6621df6cf88256476e274ab404 Mon Sep 17 00:00:00 2001
From: huzhujiang
Date: Tue, 15 Apr 2025 18:15:57 +0800
Subject: [PATCH] Add table schema + Excel import into the database
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 ExcelToMysql.py | 209 ++++++++++++++++++++++++++++++++++++++++++++++++
 ddl.mysql.sql   |  12 +++
 2 files changed, 221 insertions(+)
 create mode 100644 ExcelToMysql.py
 create mode 100644 ddl.mysql.sql

diff --git a/ExcelToMysql.py b/ExcelToMysql.py
new file mode 100644
index 0000000..7599b7e
--- /dev/null
+++ b/ExcelToMysql.py
@@ -0,0 +1,209 @@
+import pandas as pd
+from sqlalchemy import create_engine, text
+import logging
+import math
+import re
+import time
+
+# --- Configuration (same as before) ---
+EXCEL_FILE_PATH = 'Z:\\xiaohu\\web_data40000-50000.xlsx'
+SHEET_NAME = 0
+DB_USER = 'zsjie'
+DB_PASSWORD = 'xRekX6Cc3RRK6mBe'
+DB_HOST = '111.180.203.166'
+DB_PORT = 25506
+DB_NAME = 'zsjie'
+TABLE_NAME = 'resource'
+UNIQUE_KEY_COLUMNS = ['id']
+DATE_COLUMN_TO_CONVERT = 'update_date'
+DEFAULT_FOR_STRING = ''
+DEFAULT_FOR_NUMERIC = 0
+COMMIT_BATCH_SIZE = 300
+# --- New: column used for the conditional check ---
+CONDITION_COLUMN = 'resource_url'  # whether a row is updated depends on this column's existing value in the database
+
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
+
+
+def excel_to_mysql_upsert_conditional_on_url():
+    engine = None
+    connection = None
+    transaction = None
+    rows_processed = 0
+    rows_since_last_commit = 0
+    total_inserted = 0
+    total_updated = 0  # Note: this count may now include rows that matched a duplicate key but were not actually changed
+
+    start_time = time.time()
+
+    try:
+        # 1. Create the engine (with pool_recycle)
+        engine_url = f"mysql+mysqlconnector://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
+        engine = create_engine(engine_url, pool_recycle=3600)
+        logging.info(f"Database engine created: {DB_HOST}:{DB_PORT}/{DB_NAME}")
+
+        # 2. Read the Excel file
+        read_start_time = time.time()
+        logging.info(f"Reading Excel file: {EXCEL_FILE_PATH} (sheet: {SHEET_NAME})")
+        df = pd.read_excel(EXCEL_FILE_PATH, sheet_name=SHEET_NAME, dtype='object', keep_default_na=True)
+        logging.info(f"Excel read complete, {len(df)} rows. Elapsed: {time.time() - read_start_time:.2f} s.")
+
+        # --- 3. Data cleaning and preparation ---
+        transform_start_time = time.time()
+        logging.info("Cleaning and transforming data...")
+        # a. Rename columns
+        rename_map = {
+            'URL': 'url', 'Title': 'title', 'Tags': 'tags',
+            'source': 'resource_url', 'password': 'resource_pd',
+        }
+        df.rename(columns=rename_map, inplace=True)
+        # b. Drop 'Unnamed:' columns
+        unnamed_cols = [col for col in df.columns if str(col).startswith('Unnamed:')]
+        if unnamed_cols: df.drop(columns=unnamed_cols, inplace=True)
+        # c. Strip whitespace from column names
+        df.columns = [str(col).strip() for col in df.columns]
+        # d. Check that 'id' and the condition column exist
+        if 'id' not in df.columns: raise ValueError("Error: unique-key column 'id' was not found in the DataFrame.")
+        if CONDITION_COLUMN not in df.columns:
+            # If the condition column is mandatory, raise an error
+            raise ValueError(f"Error: column '{CONDITION_COLUMN}', required for the conditional update, was not found in the DataFrame.")
+            # If the condition column were optional, log a warning instead and build the SQL without the condition:
+            # logging.warning(f"Warning: column '{CONDITION_COLUMN}' not found; performing an unconditional upsert.")
+            # build_conditional_sql = False  # would control the SQL-building logic below
+        # e. Convert update_date
+        if DATE_COLUMN_TO_CONVERT in df.columns:
+            date_series = df[DATE_COLUMN_TO_CONVERT].astype(str)
+            date_series_numeric_str = date_series.str.replace(r'\D', '', regex=True)
+            df[DATE_COLUMN_TO_CONVERT] = pd.to_numeric(date_series_numeric_str, errors='coerce')
+        # f. Determine column types
+        col_types = {}
+        for col in df.columns:
+            if pd.api.types.is_string_dtype(df[col]) or pd.api.types.is_object_dtype(df[col]): col_types[col] = 'string'
+            elif pd.api.types.is_numeric_dtype(df[col]): col_types[col] = 'numeric'
+            else: col_types[col] = 'other'
+        logging.info(f"Data cleaning and transformation finished. Elapsed: {time.time() - transform_start_time:.2f} s.")
+
+        # g. *** Changed: build the SQL template with the condition ***
+        all_columns = df.columns.tolist()
+        update_columns = [col for col in all_columns if col not in UNIQUE_KEY_COLUMNS]
+        cols_str = ", ".join([f"`{col}`" for col in all_columns])
+        placeholders_str = ", ".join([f":{col}" for col in all_columns])
+
+        # Build the ON DUPLICATE KEY UPDATE clause
+        if update_columns:
+            update_clause_list = []
+            for col in update_columns:
+                # --- Core conditional logic ---
+                # If the existing resource_url in the database is NULL or empty, take the new value; otherwise keep the old one
+                update_clause = f"`{col}` = IF(`{CONDITION_COLUMN}` IS NULL OR `{CONDITION_COLUMN}` = '', VALUES(`{col}`), `{col}`)"
+                update_clause_list.append(update_clause)
+                # --- End of core conditional logic ---
+
+            update_str = ", ".join(update_clause_list)
+            sql_template = f"""
+                INSERT INTO `{TABLE_NAME}` ({cols_str})
+                VALUES ({placeholders_str})
+                ON DUPLICATE KEY UPDATE {update_str}
+            """
+            logging.info(f"Using conditional INSERT ... ON DUPLICATE KEY UPDATE (based on the database value of '{CONDITION_COLUMN}').")
+        else:
+            # If only the id column is present, fall back to INSERT IGNORE
+            sql_template = f"INSERT IGNORE INTO `{TABLE_NAME}` ({cols_str}) VALUES ({placeholders_str})"
+            logging.info("Only the unique key is configured; using INSERT IGNORE mode.")
+        # --- End of SQL construction ---
+
+        # --- 4. Database interaction with periodic commits (similar to before) ---
+        db_interaction_start_time = time.time()
+        connection = engine.connect()
+        logging.info("Database connection established.")
+        transaction = connection.begin()
+        logging.info("First transaction started.")
+
+        logging.info(f"Processing {len(df)} rows (committing every {COMMIT_BATCH_SIZE} rows)...")
+        for record_original in df.to_dict(orient='records'):
+            rows_processed += 1
+            record_processed = {}
+
+            # --- Apply default values (same as before) ---
+            for col_name, value in record_original.items():
+                processed_value = value
+                if pd.isna(processed_value):
+                    if col_name not in UNIQUE_KEY_COLUMNS:
+                        column_type = col_types.get(col_name, 'other')
+                        if column_type == 'string': processed_value = DEFAULT_FOR_STRING
+                        elif column_type == 'numeric': processed_value = DEFAULT_FOR_NUMERIC
+                        else: processed_value = None
+                record_processed[col_name] = processed_value
+
+            # --- Check 'id' (same as before) ---
+            if pd.isna(record_processed.get('id')):  # pd.isna catches NaN from empty Excel cells as well as None
+                logging.warning(f"Skipping row {rows_processed} because the 'id' column is empty or invalid: {record_original}")
+                continue  # skip this row
+
+            # Count this row towards the current commit batch
+            rows_since_last_commit += 1
+
+            # --- Execute the SQL (using the new conditional template) ---
+            try:
+                result = connection.execute(text(sql_template), record_processed)
+                # --- Interpreting rowcount (can be ambiguous) ---
+                # 1: a new row was inserted
+                # 2: a duplicate key matched and the UPDATE clause ran (even if every IF kept the old value, i.e. no real change)
+                # 0: a duplicate key matched, but on some MySQL versions/configurations an UPDATE that changes nothing may report 0
+                if result.rowcount == 1:
+                    total_inserted += 1
+                elif result.rowcount == 2:  # treated as: duplicate key matched and an update was attempted
+                    total_updated += 1
+                # Note: total_updated no longer means exactly "rows whose values actually changed"
+
+            except Exception as row_error:
+                logging.error(f"Error while processing a row (around row {rows_processed}):\n  original: {record_original}\n  processed: {record_processed}\n  error: {row_error}")
+                if transaction:
+                    try: transaction.rollback()
+                    except Exception as rb_err: logging.error(f"Rolling back the transaction also failed: {rb_err}")
+                raise
+
+            # --- Periodic commit (same as before) ---
+            if rows_since_last_commit >= COMMIT_BATCH_SIZE:
+                try:
+                    commit_start = time.time()
+                    transaction.commit()
+                    commit_duration = time.time() - commit_start
+                    logging.info(f"Committed {rows_since_last_commit} rows (total processed: {rows_processed}). Commit took {commit_duration:.2f} s.")
+                    transaction = connection.begin()
+                    rows_since_last_commit = 0
+                except Exception as commit_error:
+                    logging.error(f"Error while committing the transaction (around row {rows_processed}): {commit_error}")
+                    raise
+
+        # --- Final commit after the loop (same as before) ---
+        if rows_since_last_commit > 0:
+            try:
+                final_commit_start = time.time()
+                logging.info(f"Committing the final {rows_since_last_commit} rows...")
+                transaction.commit()
+                final_commit_duration = time.time() - final_commit_start
+                logging.info(f"Final {rows_since_last_commit} rows committed successfully. Elapsed: {final_commit_duration:.2f} s.")
+            except Exception as final_commit_error:
+                logging.error(f"Error while committing the final batch: {final_commit_error}")
+                raise
+
+        total_db_time = time.time() - db_interaction_start_time
+        logging.info(f"Database interaction complete. Total elapsed: {total_db_time:.2f} s.")
+        logging.info(f"Done. Rows processed: {rows_processed}, inserted: {total_inserted}, duplicate-key matches (update attempted): {total_updated}.")
+
+    except ValueError as ve:
+        logging.error(f"Configuration or data error: {ve}")
+    except Exception as e:
+        logging.error(f"A fatal error occurred, the script has stopped: {e}", exc_info=False)
+    finally:
+        if connection:
+            connection.close()
+            logging.info("Database connection closed.")
+        total_script_time = time.time() - start_time
+        logging.info(f"Total script runtime: {total_script_time:.2f} s.")
+
+
+if __name__ == "__main__":
+    excel_to_mysql_upsert_conditional_on_url()
\ No newline at end of file
diff --git a/ddl.mysql.sql b/ddl.mysql.sql
new file mode 100644
index 0000000..ab17b50
--- /dev/null
+++ b/ddl.mysql.sql
@@ -0,0 +1,12 @@
+CREATE TABLE resource (
+    id             int          not null default 0   comment 'resource ID',
+    url            varchar(100) binary not null default ' ' comment 'resource URL',
+    title          varchar(100) binary not null default ' ' comment 'title',
+    tags           varchar(50)  binary not null default ' ' comment 'tags',
+    update_date    int          not null default 0   comment 'update date',
+    resource_url   varchar(100) binary not null default ' ' comment 'resource address',
+    resource_pd    varchar(10)  binary not null default ' ' comment 'resource password',
+    alias_name     varchar(100) binary not null default ' ' comment 'resource alias',
+    available_flag varchar(1)   binary not null default ' ' comment 'availability flag',
+    constraint pk_resource primary key (id)
+) COMMENT='resource table';
\ No newline at end of file
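
Note on the generated statement: with the columns defined in ddl.mysql.sql, the conditional template built in ExcelToMysql.py expands to roughly the following (abridged here to three non-key columns; the real statement lists every non-key column the same way, and :id, :url, ... are the per-row parameters bound by SQLAlchemy):

    INSERT INTO `resource` (`id`, `url`, `title`, `resource_url`)
    VALUES (:id, :url, :title, :resource_url)
    ON DUPLICATE KEY UPDATE
        `url`          = IF(`resource_url` IS NULL OR `resource_url` = '', VALUES(`url`), `url`),
        `title`        = IF(`resource_url` IS NULL OR `resource_url` = '', VALUES(`title`), `title`),
        `resource_url` = IF(`resource_url` IS NULL OR `resource_url` = '', VALUES(`resource_url`), `resource_url`);

On a duplicate id, every column therefore keeps its stored value unless the stored resource_url is NULL or empty. VALUES() inside ON DUPLICATE KEY UPDATE is deprecated in newer MySQL 8.0 releases in favor of row aliases (INSERT ... AS new ... ON DUPLICATE KEY UPDATE `url` = new.`url`), but it is still accepted.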