Compare commits
2 Commits
ae5bfa04c3
...
0641dcecfd
Author | SHA1 | Date | |
---|---|---|---|
0641dcecfd | |||
dc908089fc |
209
ExcelToMysql.py
Normal file
209
ExcelToMysql.py
Normal file
@ -0,0 +1,209 @@
|
|||||||
|
import pandas as pd
|
||||||
|
from sqlalchemy import create_engine, text
|
||||||
|
import logging
|
||||||
|
import math
|
||||||
|
import re
|
||||||
|
import time
|
||||||
|
|
||||||
|
# --- 配置 (与之前相同) ---
|
||||||
|
EXCEL_FILE_PATH = 'Z:\\xiaohu\\web_data40000-50000.xlsx'
|
||||||
|
SHEET_NAME = 0
|
||||||
|
DB_USER = 'zsjie'
|
||||||
|
DB_PASSWORD = 'xRekX6Cc3RRK6mBe'
|
||||||
|
DB_HOST = '111.180.203.166'
|
||||||
|
DB_PORT = 25506
|
||||||
|
DB_NAME = 'zsjie'
|
||||||
|
TABLE_NAME = 'resource'
|
||||||
|
UNIQUE_KEY_COLUMNS = ['id']
|
||||||
|
DATE_COLUMN_TO_CONVERT = 'update_date'
|
||||||
|
DEFAULT_FOR_STRING = ''
|
||||||
|
DEFAULT_FOR_NUMERIC = 0
|
||||||
|
COMMIT_BATCH_SIZE = 300
|
||||||
|
# --- 新增:用于条件判断的列名 ---
|
||||||
|
CONDITION_COLUMN = 'resource_url' # 基于此列在数据库中的值来决定是否更新
|
||||||
|
|
||||||
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||||||
|
|
||||||
|
def excel_to_mysql_upsert_conditional_on_url():
|
||||||
|
engine = None
|
||||||
|
connection = None
|
||||||
|
transaction = None
|
||||||
|
rows_processed = 0
|
||||||
|
rows_since_last_commit = 0
|
||||||
|
total_inserted = 0
|
||||||
|
total_updated = 0 # 注意:此计数现在可能包含实际未更改但匹配了重复键的行
|
||||||
|
|
||||||
|
start_time = time.time()
|
||||||
|
|
||||||
|
try:
|
||||||
|
# 1. 创建引擎 (带 pool_recycle)
|
||||||
|
engine_url = f"mysql+mysqlconnector://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
|
||||||
|
engine = create_engine(engine_url, pool_recycle=3600)
|
||||||
|
logging.info(f"数据库引擎创建成功: {DB_HOST}:{DB_PORT}/{DB_NAME}")
|
||||||
|
|
||||||
|
# 2. 读取 Excel
|
||||||
|
read_start_time = time.time()
|
||||||
|
logging.info(f"开始读取 Excel: {EXCEL_FILE_PATH} (Sheet: {SHEET_NAME})")
|
||||||
|
df = pd.read_excel(EXCEL_FILE_PATH, sheet_name=SHEET_NAME, dtype='object', keep_default_na=True)
|
||||||
|
logging.info(f"Excel 读取完成,共 {len(df)} 行。耗时: {time.time() - read_start_time:.2f} 秒。")
|
||||||
|
|
||||||
|
# --- 3. 数据清理和准备 ---
|
||||||
|
transform_start_time = time.time()
|
||||||
|
logging.info("开始清理和转换数据...")
|
||||||
|
# a. 重命名列
|
||||||
|
rename_map = {
|
||||||
|
'URL': 'url', 'Title': 'title', 'Tags': 'tags',
|
||||||
|
'source': 'resource_url', 'password': 'resource_pd',
|
||||||
|
}
|
||||||
|
df.rename(columns=rename_map, inplace=True)
|
||||||
|
# b. 删除 Unnamed 列
|
||||||
|
unnamed_cols = [col for col in df.columns if str(col).startswith('Unnamed:')]
|
||||||
|
if unnamed_cols: df.drop(columns=unnamed_cols, inplace=True)
|
||||||
|
# c. 清理列名空格
|
||||||
|
df.columns = [str(col).strip() for col in df.columns]
|
||||||
|
# d. 检查 'id' 和 条件列 存在
|
||||||
|
if 'id' not in df.columns: raise ValueError("错误:唯一键列 'id' 在 DataFrame 中未找到。")
|
||||||
|
if CONDITION_COLUMN not in df.columns:
|
||||||
|
# 如果条件列必须存在,则报错
|
||||||
|
raise ValueError(f"错误:条件更新所需的列 '{CONDITION_COLUMN}' 在 DataFrame 中未找到。")
|
||||||
|
# 如果条件列是可选的,可以只记录警告并构建不带条件的SQL
|
||||||
|
# logging.warning(f"警告: 列 '{CONDITION_COLUMN}' 未找到,将执行无条件 Upsert。")
|
||||||
|
# build_conditional_sql = False # 控制下方 SQL 构建逻辑
|
||||||
|
# e. update_date 转换
|
||||||
|
if DATE_COLUMN_TO_CONVERT in df.columns:
|
||||||
|
date_series = df[DATE_COLUMN_TO_CONVERT].astype(str)
|
||||||
|
date_series_numeric_str = date_series.str.replace(r'\D', '', regex=True)
|
||||||
|
df[DATE_COLUMN_TO_CONVERT] = pd.to_numeric(date_series_numeric_str, errors='coerce')
|
||||||
|
# f. 确定列类型
|
||||||
|
col_types = {}
|
||||||
|
for col in df.columns:
|
||||||
|
if pd.api.types.is_string_dtype(df[col]) or pd.api.types.is_object_dtype(df[col]): col_types[col] = 'string'
|
||||||
|
elif pd.api.types.is_numeric_dtype(df[col]): col_types[col] = 'numeric'
|
||||||
|
else: col_types[col] = 'other'
|
||||||
|
logging.info(f"数据清理转换完成。耗时: {time.time() - transform_start_time:.2f} 秒。")
|
||||||
|
|
||||||
|
|
||||||
|
# g. *** 修改:构建带条件的 SQL 模板 ***
|
||||||
|
all_columns = df.columns.tolist()
|
||||||
|
update_columns = [col for col in all_columns if col not in UNIQUE_KEY_COLUMNS]
|
||||||
|
cols_str = ", ".join([f"`{col}`" for col in all_columns])
|
||||||
|
placeholders_str = ", ".join([f":{col}" for col in all_columns])
|
||||||
|
|
||||||
|
# 构建 ON DUPLICATE KEY UPDATE 部分
|
||||||
|
if update_columns:
|
||||||
|
update_clause_list = []
|
||||||
|
for col in update_columns:
|
||||||
|
# --- 核心条件逻辑 ---
|
||||||
|
# 如果数据库现有的 resource_url 是 NULL 或空, 则更新为新值, 否则保持旧值
|
||||||
|
update_clause = f"`{col}` = IF(`{CONDITION_COLUMN}` IS NULL OR `{CONDITION_COLUMN}` = '', VALUES(`{col}`), `{col}`)"
|
||||||
|
update_clause_list.append(update_clause)
|
||||||
|
# --- 结束核心条件逻辑 ---
|
||||||
|
|
||||||
|
update_str = ", ".join(update_clause_list)
|
||||||
|
sql_template = f"""
|
||||||
|
INSERT INTO `{TABLE_NAME}` ({cols_str})
|
||||||
|
VALUES ({placeholders_str})
|
||||||
|
ON DUPLICATE KEY UPDATE {update_str}
|
||||||
|
"""
|
||||||
|
logging.info(f"将使用带条件 (基于数据库 '{CONDITION_COLUMN}' 值) 的 INSERT ... ON DUPLICATE KEY UPDATE 模式。")
|
||||||
|
else:
|
||||||
|
# 如果只有 id 列,则使用 INSERT IGNORE
|
||||||
|
sql_template = f"INSERT IGNORE INTO `{TABLE_NAME}` ({cols_str}) VALUES ({placeholders_str})"
|
||||||
|
logging.info("仅配置了唯一键,将使用 INSERT IGNORE 模式。")
|
||||||
|
# --- 结束 SQL 构建 ---
|
||||||
|
|
||||||
|
|
||||||
|
# --- 4. 数据库交互与周期性提交 (与之前类似) ---
|
||||||
|
db_interaction_start_time = time.time()
|
||||||
|
connection = engine.connect()
|
||||||
|
logging.info("数据库连接成功。")
|
||||||
|
transaction = connection.begin()
|
||||||
|
logging.info("已开始第一个事务。")
|
||||||
|
|
||||||
|
logging.info(f"开始处理 {len(df)} 行数据 (每 {COMMIT_BATCH_SIZE} 行提交一次)...")
|
||||||
|
for record_original in df.to_dict(orient='records'):
|
||||||
|
rows_processed += 1
|
||||||
|
record_processed = {}
|
||||||
|
|
||||||
|
# --- 应用默认值 (与之前相同) ---
|
||||||
|
for col_name, value in record_original.items():
|
||||||
|
processed_value = value
|
||||||
|
if pd.isna(processed_value):
|
||||||
|
if col_name not in UNIQUE_KEY_COLUMNS:
|
||||||
|
column_type = col_types.get(col_name, 'other')
|
||||||
|
if column_type == 'string': processed_value = DEFAULT_FOR_STRING
|
||||||
|
elif column_type == 'numeric': processed_value = DEFAULT_FOR_NUMERIC
|
||||||
|
else: processed_value = None
|
||||||
|
record_processed[col_name] = processed_value
|
||||||
|
|
||||||
|
# --- 检查 'id' (与之前相同) ---
|
||||||
|
if record_processed.get('id') is None:
|
||||||
|
logging.warning(f"跳过第 {rows_processed} 行,因为 'id' 列为空或无效: {record_original}")
|
||||||
|
continue # 跳过该行
|
||||||
|
|
||||||
|
# 增加有效行批次计数器
|
||||||
|
rows_since_last_commit += 1
|
||||||
|
|
||||||
|
# --- 执行 SQL (使用新的条件模板) ---
|
||||||
|
try:
|
||||||
|
result = connection.execute(text(sql_template), record_processed)
|
||||||
|
# --- 解释 rowcount (可能有歧义) ---
|
||||||
|
# 1: 插入了新行
|
||||||
|
# 2: 匹配了重复键,并执行了 UPDATE 子句(即使所有 IF 条件都为 false,导致无实际更改)
|
||||||
|
# 0: 匹配了重复键,但某些 MySQL 版本/配置下,无实际更改的 UPDATE 可能报告 0
|
||||||
|
if result.rowcount == 1:
|
||||||
|
total_inserted += 1
|
||||||
|
elif result.rowcount == 2: # 认为匹配了重复键并尝试了更新
|
||||||
|
total_updated += 1
|
||||||
|
# 注意:total_updated 不再精确代表“实际发生值改变的更新行数”
|
||||||
|
|
||||||
|
except Exception as row_error:
|
||||||
|
logging.error(f"处理行数据时出错 (行号约 {rows_processed}):\n 原始: {record_original}\n 处理后: {record_processed}\n 错误: {row_error}")
|
||||||
|
if transaction:
|
||||||
|
try: transaction.rollback()
|
||||||
|
except Exception as rb_err: logging.error(f"回滚事务时也出错: {rb_err}")
|
||||||
|
raise
|
||||||
|
|
||||||
|
# --- 周期性提交 (与之前相同) ---
|
||||||
|
if rows_since_last_commit >= COMMIT_BATCH_SIZE:
|
||||||
|
try:
|
||||||
|
commit_start = time.time()
|
||||||
|
transaction.commit()
|
||||||
|
commit_duration = time.time() - commit_start
|
||||||
|
logging.info(f"已提交 {rows_since_last_commit} 行 (处理总数: {rows_processed})。本次提交耗时: {commit_duration:.2f} 秒。")
|
||||||
|
transaction = connection.begin()
|
||||||
|
rows_since_last_commit = 0
|
||||||
|
except Exception as commit_error:
|
||||||
|
logging.error(f"提交事务时出错 (行号约 {rows_processed}): {commit_error}")
|
||||||
|
raise
|
||||||
|
|
||||||
|
# --- 循环结束后的最终提交 (与之前相同) ---
|
||||||
|
if rows_since_last_commit > 0:
|
||||||
|
try:
|
||||||
|
final_commit_start = time.time()
|
||||||
|
logging.info(f"准备提交最后 {rows_since_last_commit} 行...")
|
||||||
|
transaction.commit()
|
||||||
|
final_commit_duration = time.time() - final_commit_start
|
||||||
|
logging.info(f"最后 {rows_since_last_commit} 行已成功提交。耗时: {final_commit_duration:.2f} 秒。")
|
||||||
|
except Exception as final_commit_error:
|
||||||
|
logging.error(f"提交最后批次事务时出错: {final_commit_error}")
|
||||||
|
raise
|
||||||
|
|
||||||
|
total_db_time = time.time() - db_interaction_start_time
|
||||||
|
logging.info(f"数据库交互完成。总耗时: {total_db_time:.2f} 秒。")
|
||||||
|
logging.info(f"处理完成。总处理行: {rows_processed}, 总插入: {total_inserted}, 总匹配重复键(尝试更新): {total_updated}.")
|
||||||
|
|
||||||
|
except ValueError as ve:
|
||||||
|
logging.error(f"配置或数据错误: {ve}")
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f"发生严重错误,脚本已停止: {e}", exc_info=False)
|
||||||
|
finally:
|
||||||
|
if connection:
|
||||||
|
connection.close()
|
||||||
|
logging.info("数据库连接已关闭。")
|
||||||
|
total_script_time = time.time() - start_time
|
||||||
|
logging.info(f"脚本总运行时间: {total_script_time:.2f} 秒。")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
excel_to_mysql_upsert_conditional_on_url()
|
12
ddl.mysql.sql
Normal file
12
ddl.mysql.sql
Normal file
@ -0,0 +1,12 @@
|
|||||||
|
CREATE TABLE resource (
|
||||||
|
id int not null default 0 comment '资源ID',
|
||||||
|
url varchar(100) binary not null default ' ' comment '资源URL',
|
||||||
|
title varchar(100) binary not null default ' ' comment '标题',
|
||||||
|
tags varchar(50) binary not null default ' ' comment '标签',
|
||||||
|
update_date int not null default 0 comment '更新日期',
|
||||||
|
resource_url varchar(100) binary not null default ' ' comment '资源地址',
|
||||||
|
resource_pd varchar(10) binary not null default ' ' comment '资源密码',
|
||||||
|
alias_name varchar(100) binary not null default ' ' comment '资源别名',
|
||||||
|
available_flag varchar(1) binary not null default ' ' comment '是否可用',
|
||||||
|
constraint pk_resource primary key (id)
|
||||||
|
)COMMENT='资源表';
|
55
zsjieSpider.py
Normal file
55
zsjieSpider.py
Normal file
@ -0,0 +1,55 @@
|
|||||||
|
from dbm import error
|
||||||
|
|
||||||
|
import requests
|
||||||
|
from bs4 import BeautifulSoup
|
||||||
|
import openpyxl
|
||||||
|
import re
|
||||||
|
|
||||||
|
|
||||||
|
range_start = 40000
|
||||||
|
range_end = 50000
|
||||||
|
# 创建Excel工作簿和工作表对象
|
||||||
|
workbook = openpyxl.Workbook()
|
||||||
|
sheet = workbook.active
|
||||||
|
sheet.title = 'Web Data'
|
||||||
|
sheet._current_row = 1
|
||||||
|
rownum = 1
|
||||||
|
sheet.append(['id', 'URL', 'Title', 'Tags', 'update_date', 'source', 'password']) # 标题行
|
||||||
|
|
||||||
|
# 定义起始URL模板和范围(假设是五位数字)
|
||||||
|
url_template = 'https://www.zsjie.com/¥num.html'
|
||||||
|
num_range = range(range_start, range_end) # 从10000到99999的五个数字组合范围
|
||||||
|
|
||||||
|
headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.99 Safari/537.36"}
|
||||||
|
|
||||||
|
# 循环遍历所有可能的URL并获取数据
|
||||||
|
for num in num_range:
|
||||||
|
url = url_template.replace('¥num', str(num)) # 构建URL
|
||||||
|
print(url)
|
||||||
|
try:
|
||||||
|
response = requests.get(url, headers=headers) # 发起网络请求获取网页内容
|
||||||
|
if response.status_code == 200: # 检查请求是否成功
|
||||||
|
soup = BeautifulSoup(response.content, 'html.parser') # 解析HTML内容
|
||||||
|
# print(soup)
|
||||||
|
# 使用CSS选择器定位目标元素,以下CSS选择器需根据实际情况更改
|
||||||
|
el = soup.select('.entry-title')
|
||||||
|
if len(el) < 1:
|
||||||
|
continue
|
||||||
|
title = soup.select('.entry-title')[0].text.replace('\n', '').replace(' ', '').replace('\'', '')
|
||||||
|
tags = soup.select('.meta-category > a')[0].text
|
||||||
|
update_date = soup.select('.meta-date > time')[0].text.replace('\n', '').replace(' ', '').replace('\'',
|
||||||
|
'')
|
||||||
|
rownum += 1
|
||||||
|
sheet._current_row = rownum
|
||||||
|
sheet.append([num, url, title, tags, update_date])
|
||||||
|
else:
|
||||||
|
print(url, response.status_code)
|
||||||
|
except Exception as e:
|
||||||
|
print(url)
|
||||||
|
print(e)
|
||||||
|
continue
|
||||||
|
|
||||||
|
workbook.save('web_data' + str(range_start) + '-' + str(range_end) + '.xlsx') # 请注意更改此路径为你的输出路径及文件名需求。如果存在同名文件将被覆盖。需要小心操作以防止意外覆盖原有文件。 最后请确保脚本运行在有足够权限的环境下,并且正确处理了可能出现的异常。注意检查网页结构是否稳定,因为网页结构的变化可能导致脚本失效。同时,请遵守网站的爬虫政策以避免不必要的麻烦。
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
x
Reference in New Issue
Block a user