新增excel转ta申请文件的脚本

This commit is contained in:
huzhujiang 2025-06-12 10:17:46 +08:00
parent 639371daee
commit 590f7ba6a1
2 changed files with 401 additions and 0 deletions

289
GenSql.py Normal file
View File

@ -0,0 +1,289 @@
import pandas as pd
import mysql.connector
from mysql.connector import errorcode
import sys
import os

# Keyword arguments passed to mysql.connector.connect().
# Every connection value can be overridden through an environment variable so
# that credentials do not have to be edited in (or committed with) this file;
# the literals are only fallbacks that keep the script's previous behaviour.
# NOTE(review): the fallback password is still stored in plain text here --
# rotate it and drop the default once the environment variables are in use.
DB_CONFIG = {
    'user': os.environ.get('IFM50_DB_USER', 'ifm50'),  # database user name
    'password': os.environ.get('IFM50_DB_PASSWORD', 'hundsun'),  # database password
    'host': os.environ.get('IFM50_DB_HOST', '192.168.200.128'),  # database host
    'database': os.environ.get('IFM50_DB_NAME', 'ifm50'),  # schema to connect to
    'port': int(os.environ.get('IFM50_DB_PORT', '3306')),
    'raise_on_warnings': True,  # raise an exception when MySQL reports a warning
}
def connect_to_mysql():
    """Open a MySQL connection described by ``DB_CONFIG``.

    Returns:
        A ``(connection, cursor)`` tuple on success. When
        ``mysql.connector`` raises an error, a diagnostic message is printed
        and ``(None, None)`` is returned instead of propagating the error.
    """
    try:
        connection = mysql.connector.connect(**DB_CONFIG)
        db_cursor = connection.cursor()
    except mysql.connector.Error as err:
        # Messages for the error numbers that get a dedicated explanation.
        known_failures = {
            errorcode.ER_ACCESS_DENIED_ERROR: "访问被拒绝:用户名或密码错误",
            errorcode.ER_BAD_DB_ERROR: f"数据库 '{DB_CONFIG['database']}' 不存在",
        }
        generic_failure = f"连接 MySQL 时发生错误: {err}"
        print(known_failures.get(err.errno, generic_failure))
        return None, None
    print("成功连接到 MySQL 数据库")
    return connection, db_cursor
def _file_types_for_template(template):
    """Return the file-type codes that are generated for *template*."""
    is_wmdep = str(template).startswith("WMDEP")
    file_types = ["005"] if is_wmdep else ["03"]
    # Every template except "WMDEP1.1" gets a second file type.
    if template != "WMDEP1.1":
        file_types.append("052" if is_wmdep else "F3")
    return file_types


def _load_template_layouts(cursor, templates, demoStr):
    """Read the field layout of every ``"<template>-<fileType>"`` from the database.

    Each layout maps an idno (as ``str``) to the list
    ``[templet, file_type, field_no, data_type, field_len, scale, field_name,
    byte_offset]`` where ``byte_offset`` is the running sum of ``field_len``
    of the preceding fields. Combinations without a sample record in
    *demoStr*, without rows, or with fields lacking a dictionary entry are
    left out of the result.
    """
    layouts = {}
    for template in templates:
        for fileType in _file_types_for_template(template):
            key = f"{template}-{fileType}"
            if not demoStr.get(key):
                print(f"未提供[{template}+{fileType}]的正确数据!")
                continue
            expected_len = len(demoStr[key].encode("gb2312"))
            # Values are bound as parameters instead of being formatted into
            # the statement text.
            cursor.execute(
                """
                select t.templet, t.file_type , t.field_no , t2.data_type , t2.field_len , t2.scale , t2.field_name , t.idno
                from tbfieldmap t left join tbtadict t2 on t.templet = t2.templet and t.idno = t2.idno
                where t.templet = %s and file_type = %s
                order by t.templet, t.file_type , t.field_no
                """,
                (template, fileType),
            )
            rows = cursor.fetchall()
            if not rows:
                continue
            # The left join yields NULL type/length for fields that have no
            # dictionary entry; offsets cannot be computed in that case.
            if any(item[3] is None or item[4] is None for item in rows):
                print(f"模板[{key}]存在缺少字典定义的字段,已跳过!", file=sys.stderr)
                continue
            layout = {}
            offset = 0
            for item in rows:
                # 0-templet, 1-file_type, 2-field_no, 3-data_type,
                # 4-field_len, 5-scale, 6-field_name, 7-idno
                layout[str(item[7])] = [*item[:7], offset]
                offset += item[4]
            if offset != expected_len:
                print(f"模板[{template}-{fileType}]数据库长度[{offset}]不等于案例长度[{expected_len}]")
            layouts[key] = layout
    return layouts


def _overlay_row(baseline, row, columns, layout):
    """Return *baseline* with the non-empty cells of *row* written into their fields.

    Processing stops at the first replacement that changes the record's
    gb2312 byte length; the (inconsistent) record built so far is still
    returned so that it can be inspected in the output file.
    """
    expected_len = len(baseline.encode("gb2312"))
    record = baseline
    for col in columns:
        column = str(col)
        if column not in layout or pd.isna(row[col]):
            continue
        field = layout[column]
        data_type, length, scale, offset = field[3], field[4], field[5], field[7]
        value = str(row[col])
        front = slice_up_to_bytes(record, offset)
        end = slice_from_byte_offset(record, offset + length)
        if data_type.lower() == 'c':
            # Character fields are left-aligned and space padded.
            content = pad_by_bytes(value, length, direction='right')
        else:
            if scale and scale > 0:
                # NOTE(review): fields with a scale are only echoed to
                # stderr; no decimal scaling is applied to the value.
                print(value, file=sys.stderr)
            # Other fields are right-aligned and zero padded.
            content = pad_by_bytes(value, length, pad_char="0")
        record = front + content + end
        actual_len = len(record.encode("gb2312"))
        if actual_len != expected_len:
            print(f"替换[{column}]后长度变化[{expected_len}]->[{actual_len}]!请检查!")
            print(baseline)
            print(record)
            break
    return record


def genTestTxtFromExcel(demoStr, excelPath, rename_map, final_file_poath):
    """Generate fixed-width TA request files from the rows of an Excel sheet.

    For every row of sheet ``Sheet2`` the seller code (column ``"121"`` after
    renaming) selects a template via table ``tbbankta`` (``"WMDEP1.1"`` when
    the seller is unknown). For each file type of that template the matching
    sample record from *demoStr* is copied, the row's non-empty cells are
    written into their fields, and the record is appended to
    ``<final_file_poath>final-<seller>-<template>-<fileType>.txt`` (gb2312).
    The first record of a seller/file-type pair truncates the file.

    Args:
        demoStr: Mapping ``"<template>-<fileType>"`` -> sample record string.
        excelPath: Path of the Excel workbook to read.
        rename_map: Mapping of Excel column titles to idno strings.
        final_file_poath: Output prefix; it is concatenated as-is, so it must
            end with a path separator.

    Returns:
        None. Nothing is generated when the database connection fails or the
        sheet has no seller-code column.
    """
    cnx, cursor = connect_to_mysql()
    if cnx is None or cursor is None:
        # connect_to_mysql() has already reported why the connection failed.
        return
    try:
        cursor.execute("""
            select t.bank_no, t.seller_code, t.templet from tbbankta t
        """)
        results = cursor.fetchall()
        bankTATemplate = {item[1]: item[2] for item in results}
        # Distinct templates in first-seen order.
        templateList = list(dict.fromkeys(item[2] for item in results))
        templateDictMap = _load_template_layouts(cursor, templateList, demoStr)

        df = pd.read_excel(excelPath, sheet_name="Sheet2", dtype='object', keep_default_na=True)
        df.rename(columns=rename_map, inplace=True)
        if "121" not in df.columns:
            print("未提供销售商代码!", file=sys.stderr)
            return

        # A column is "unused" only if no loaded layout knows its idno.
        known_ids = set()
        for layout in templateDictMap.values():
            known_ids.update(layout)
        notUseCol = [str(col) for col in df.columns if str(col) not in known_ids]
        if notUseCol:
            print(f"excel存在没有用到的数据idno{notUseCol}!")

        started_files = set()
        for _, row in df.iterrows():
            if pd.isna(row["121"]):
                print("存在未填写销售商代码的行,已跳过!", file=sys.stderr)
                continue
            # Cells are read with dtype=object, so the code may not be a str.
            seller = str(row["121"])
            template = bankTATemplate.get(seller, "WMDEP1.1")
            for fileType in _file_types_for_template(template):
                key = f"{template}-{fileType}"
                layout = templateDictMap.get(key)
                if layout is None:
                    print(f"模板[{key}]没有可用的字段定义,销售商[{seller}]的该类文件未生成!", file=sys.stderr)
                    continue
                final_file = f"{final_file_poath}final-{seller}-{template}-{fileType}.txt"
                sellerFileType = seller + "-" + fileType
                if sellerFileType not in started_files:
                    # Start from an empty file for this seller/file type.
                    with open(final_file, mode='w', encoding='gb2312') as f:
                        f.write("")
                    started_files.add(sellerFileType)
                finalStr = _overlay_row(demoStr[key], row, df.columns, layout)
                with open(final_file, mode='a', encoding='gb2312') as f:
                    f.write(finalStr + "\n")
    finally:
        # Release the cursor before the connection it belongs to.
        try:
            cursor.close()
            print("cursor 已关闭!")
        finally:
            cnx.close()
            print("cnx 已关闭!")
def slice_up_to_bytes(text: str, max_bytes: int, encoding: str = 'gb2312') -> str:
    """Return the longest prefix of *text* that encodes to at most *max_bytes* bytes.

    Characters are never split: a character that would push the encoded
    length past *max_bytes* is excluded together with everything after it.
    """
    used_bytes = 0
    kept_chars = 0
    for ch in text:
        used_bytes += len(ch.encode(encoding))
        if used_bytes > max_bytes:
            break
        kept_chars += 1
    return text[:kept_chars]
def slice_from_byte_offset(text: str, start_byte: int, encoding: str = 'gb2312') -> str:
    """Return the suffix of *text* that starts at encoded byte offset *start_byte*.

    The suffix begins at the first character whose starting byte position is
    at or beyond *start_byte*. When the offset falls inside a character,
    ``"!!!"`` is printed and the suffix starts at the following character.
    An offset at or past the end of the text yields an empty string.
    """
    consumed = 0
    for position, ch in enumerate(text):
        if consumed < start_byte:
            consumed += len(ch.encode(encoding))
            continue
        if consumed != start_byte:
            # The requested offset pointed into the previous character.
            print("!!!")
        return text[position:]
    return ""
def truncate_by_bytes(text: str, max_bytes: int, encoding: str = 'gb2312') -> str:
    """Cut *text* so that its encoded form is at most *max_bytes* bytes long.

    Helper for over-long values. Text that already fits is returned
    unchanged; otherwise the encoded bytes are cut and decoded again with
    undecodable bytes (such as a character split by the cut) dropped.
    """
    raw = text.encode(encoding)
    if len(raw) > max_bytes:
        clipped = raw[:max_bytes]
        return clipped.decode(encoding, 'ignore')
    return text
def pad_by_bytes(
    text: str,
    total_byte_length: int,
    encoding: str = 'gb2312',
    pad_char: str = ' ',
    direction: str = 'left'
) -> str:
    """Fit *text* into exactly *total_byte_length* encoded bytes.

    Shorter text is padded with *pad_char*; longer text is truncated (a
    message is printed to stderr) and, if the cut removed part of a
    multi-byte character, padded back up so that the result always has the
    requested byte length.

    :param text: value to fit; non-string values are converted with ``str()``
    :param total_byte_length: target length in encoded bytes
    :param encoding: encoding used to measure byte lengths
    :param pad_char: padding character; must encode to a single byte
    :param direction: side on which padding is added, ``'left'`` or ``'right'``
    :return: the padded or truncated string
    :raises ValueError: if *pad_char* is not a single-byte character or
        *direction* is neither ``'left'`` nor ``'right'``
    """
    if len(pad_char.encode(encoding)) != 1:
        raise ValueError(f"补全字符 '{pad_char}' 在编码 '{encoding}' 下不是单字节字符。")
    if direction not in ('left', 'right'):
        raise ValueError("补全方向 (direction) 必须是 'left' 或 'right'")
    if not isinstance(text, str):
        # Spreadsheet cells may arrive as int/float objects.
        text = str(text)

    encoded = text.encode(encoding)
    if len(encoded) > total_byte_length:
        print(text + "超过限制长度:" + str(total_byte_length), file=sys.stderr)
        # Cut on the byte level and drop a character split by the cut; this
        # can leave the text shorter than the target, hence the padding below.
        text = encoded[:total_byte_length].decode(encoding, 'ignore')
        encoded = text.encode(encoding)

    padding_string = pad_char * (total_byte_length - len(encoded))
    if direction == 'right':
        return text + padding_string
    return padding_string + text
if __name__ == "__main__":
    # Script entry point: defines the sample records, the input/output
    # locations and the column mapping, then generates the files.
    # spide()  (disabled call; no spide() is defined in the visible part of this file)
    # Sample record per "<template>-<fileType>" key. genTestTxtFromExcel()
    # copies the matching record and overwrites individual fields in it.
    # NOTE(review): these are fixed-width records, so the amount of
    # whitespace inside each string is significant -- confirm it against the
    # original file before relying on this listing.
    demoStr = {
        "BZD21-03": "202504010100000000000001w130001 20250401120000jyzhsellercod0001033 00000000000000000000000030000000020 00000 156 2025040100000000 00000000 00000 0000000000 0000000000000000测试认购 0000000000000000000000000 000000000 00000000 110000 00 00000 0000000000000000 00000 0000000000000000000000000"
        ,"BZD21-F3": "033 0012025040101002025040100010140802199205188210 01 C101021 0"
        ,"WMDEP1.0-005": "202504010100000000000001000000 0000 156 0000000000000000 130001 0 2025040120250401120000 0 00011630000000048020 000000000000000000000000000000000000000030000000022NY0000000048 000000 0000000000000000 00000 00000测试申购 0 000 000000000000000000000 00000 0000000000000000000000000 110000 "
        ,"WMDEP1.0-052": "2025040101000000000000010 130001 E90000001 00011630000000048020 NY00000000486214855862774691 110000C20250401000000000000000000001 01T20250401000000000000000000001C100020001 交通银行武汉支行 "
        ,"WMDEP1.1-005": "202504010100000000000001000000 0000 156 0000000000000000 130001 0 2025040120250401120000 0 00011630000000048028 000000000000000000000000000000000000000030000000022NY0000000048 000000 0000000000000000 00000 00000测试申购 0 000 000000000000000000000 00000 0000000000000000000000000 110000 6214855862774691 110000C20250401000000000000000000001 01T20250401000000000000000000001C100020001 交通银行武汉支行 "
    }
    # Input workbook and output prefix; both are machine-specific absolute
    # paths. The prefix ends with a separator because it is concatenated
    # directly with the generated file name.
    excelPath = "C:\\Users\\huzhujiang\\Desktop\\测试数据\\测试造数excel.xlsx"
    final_file_poath = "C:\\Users\\huzhujiang\\Desktop\\测试数据\\"
    # Excel column title -> idno string used to look the field up in the
    # template layout. "121" (seller) is required by genTestTxtFromExcel().
    rename_map = {
        "流水号": "8",
        "销售商": "121",
        "客户名称": "85",
        "个人/机构标志": "98",
        "理财账号": "136",
        "交易账号": "120",
        "证件类型": "27",
        "证件号码": "72",
        "个人养老金账户": "11103",
        "个人养老金资金账户": "11104",
        "理财行业平台养老金账户": "11105",
        "投资者类别": "1012",
        "其他证件名称": "1013",
        "合格投资者标识": "1150",
        "SPV资金托管账户开户行": "1151",
        "其他资金托管账户开户行": "1152",
        "核心客户号": "9904",
        "关联活期存款账号": "9902",
        "关联活期存款账号开户行代码": "10003",
        "关联活期存款账号开户行名称": "10004",
        "交易日期": "92",
        "性别": "126",
        "投资者身份日期": "23",
        "个人机构类型": "325",
        "投资者户名简称": "122",
        "产品代码": "67",
        "交易码": "135",
    }
    genTestTxtFromExcel(demoStr, excelPath, rename_map, final_file_poath)

112
zsjieSpiderComplete.py Normal file
View File

@ -0,0 +1,112 @@
import mysql.connector
from mysql.connector import errorcode
import requests
from bs4 import BeautifulSoup
import os

# Keyword arguments passed to mysql.connector.connect().
# Every connection value can be overridden through an environment variable so
# that credentials do not have to be edited in (or committed with) this file;
# the literals are only fallbacks that keep the script's previous behaviour.
# NOTE(review): the fallback password for this remote host is still stored in
# plain text here -- rotate it and drop the default once the environment
# variables are in use.
DB_CONFIG = {
    'user': os.environ.get('ZSJIE_DB_USER', 'zsjie'),  # database user name
    'password': os.environ.get('ZSJIE_DB_PASSWORD', 'xRekX6Cc3RRK6mBe'),  # database password
    'host': os.environ.get('ZSJIE_DB_HOST', '111.180.203.166'),  # database host
    'database': os.environ.get('ZSJIE_DB_NAME', 'zsjie'),  # schema to connect to
    'port': int(os.environ.get('ZSJIE_DB_PORT', '25506')),
    'raise_on_warnings': True,  # raise an exception when MySQL reports a warning
}
def connect_to_mysql():
    """Open a MySQL connection described by ``DB_CONFIG``.

    Returns:
        A ``(connection, cursor)`` tuple on success. When
        ``mysql.connector`` raises an error, a diagnostic message is printed
        and ``(None, None)`` is returned instead of propagating the error.
    """
    try:
        connection = mysql.connector.connect(**DB_CONFIG)
        db_cursor = connection.cursor()
    except mysql.connector.Error as err:
        # Messages for the error numbers that get a dedicated explanation.
        known_failures = {
            errorcode.ER_ACCESS_DENIED_ERROR: "访问被拒绝:用户名或密码错误",
            errorcode.ER_BAD_DB_ERROR: f"数据库 '{DB_CONFIG['database']}' 不存在",
        }
        generic_failure = f"连接 MySQL 时发生错误: {err}"
        print(known_failures.get(err.errno, generic_failure))
        return None, None
    print("成功连接到 MySQL 数据库")
    return connection, db_cursor
def getHttpContext(url):
    """POST two admin-ajax actions for post 61007 to *url* and collect its goto links.

    The first request sends ``add_post_views_num``, the second
    ``get_async_shop_down``; both response bodies are printed. The second
    response is parsed as HTML and every ``<a href>`` inside a
    ``div.btn-group.btn-block.mt-2`` whose target starts with
    ``https://www.zsjie.com/goto?`` is collected.

    Args:
        url: Endpoint to post to (the script uses the site's admin-ajax.php).

    Returns:
        List of matching href strings (empty when none are found).

    Raises:
        requests.exceptions.RequestException: on connection errors or when a
            request exceeds the timeout.
    """
    headers = {
        # NOTE(review): 'authority', 'method', 'path' and 'scheme' look like
        # HTTP/2 pseudo-headers copied from browser dev tools; here they are
        # sent as ordinary headers -- confirm whether they are needed.
        'authority': 'www.zsjie.com',
        'method': 'POST',
        'path': '/wp-admin/admin-ajax.php',
        'scheme': 'https',
        'Accept': 'text/html,*/*;q=0.01',
        'Accept-Encoding': 'gzip, deflate, br, zstd',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
        'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
        # NOTE(review): this is a captured login session. It is bound to one
        # account, may already be expired, and should not live in source
        # control -- replace it with a valid cookie supplied at run time.
        'Cookie': 'wordpress_012aa954c15bd0242a65bd94f397e616=xiaohu123%7C1750330106%7CFksFn5yihGolM16iilCGsndgKLMG7gJNsDOfHU2ubT0%7C61270247574c98cc268948c3c7c816a59384f854f68a4628f7cf28eac5cbe494; wordpress_sec_012aa954c15bd0242a65bd94f397e616=xiaohu123%7C1750330106%7CrTY9VDsB6sR3epotTH0AkYKpC58mmV6mQL1zF8aD92m%7Cb0bccbd512648a4019f6cdd38fd79dd95d2ee24b9c5e15f4f32682699306209e; PHPSESSID=5eb3ga6pcdliincbicktc0tvbe; wordpress_logged_in_012aa954c15bd0242a65bd94f397e616=xiaohu123%7C1750330106%7CrTY9VDsB6sR3epotTH0AkYKpC58mmV6mQL1zF8aD92m%7C516a19884972ca894cbaef06c0b5008763ba65df9d1118c1bccdba93d14368c6',
        'Origin': 'https://www.zsjie.com',
        'Priority': 'u=1, i',
        'Referer': 'https://www.zsjie.com/61007.html',  # page the request claims to come from
        'Sec-Ch-Ua': '"Chromium";v="136", "Microsoft Edge";v="136", "Not.A/Brand";v="99"',
        'Sec-Ch-Ua-Mobile': '?0',  # ?0 means "not a mobile device"
        'Sec-Ch-Ua-Platform': '"Windows"',
        'Sec-Fetch-Dest': 'empty',
        'Sec-Fetch-Mode': 'cors',
        'Sec-Fetch-Site': 'same-origin',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36 Edg/136.0.0.0',
        'X-Requested-With': 'XMLHttpRequest'  # marks the request as AJAX
    }
    payload1 = {'action': 'add_post_views_num', 'id': '61007'}
    payload = {'action': 'get_async_shop_down', 'post_id': '61007'}
    # Without a timeout requests waits indefinitely for an unresponsive server.
    request_timeout = 30
    # NOTE(review): verify=False disables TLS certificate verification; it is
    # kept because removing it may break the request against this host, but
    # it should be dropped if the certificate validates.
    response = requests.post(url=url, headers=headers, params=payload1,
                             verify=False, timeout=request_timeout)
    print(response.text)
    response = requests.post(url=url, headers=headers, params=payload,
                             verify=False, timeout=request_timeout)
    print(response.text)

    soup = BeautifulSoup(response.text, 'html.parser')
    div_elements = soup.find_all('div', class_='btn-group btn-block mt-2')
    target_link_prefix = "https://www.zsjie.com/goto?"
    extracted_links = []
    for div in div_elements:
        # href=True restricts the search to anchors that carry an href.
        for a_tag in div.find_all('a', href=True):
            href = a_tag['href']
            if href.startswith(target_link_prefix):
                extracted_links.append(href)
    return extracted_links
def spide():
    """Fetch pending rows from table ``resource`` and request each one.

    Selects up to 50 rows whose ``resource_url`` is ``' '`` and whose
    ``available_flag`` is not ``'0'`` (newest id first), prints each row and
    passes its second column to ``getHttpContext``. Does nothing when the
    database connection cannot be established.
    """
    cnx, cursor = connect_to_mysql()
    if cnx is None or cursor is None:
        # connect_to_mysql() has already reported why the connection failed.
        return
    try:
        cursor.execute("select * from resource where resource_url =' ' and available_flag !='0' order by id desc limit 0,50 ")
        for item in cursor.fetchall():
            print(item)
            # presumably column 1 holds the URL to request -- verify against the table schema
            getHttpContext(item[1])
    finally:
        # Release the cursor before the connection it belongs to.
        try:
            cursor.close()
            print("cursor 已关闭!")
        finally:
            cnx.close()
            print("cnx 已关闭!")
if __name__ == "__main__":
    # The database-driven crawl (spide) is disabled; only the fixed
    # admin-ajax endpoint is requested.
    ajax_endpoint = 'https://www.zsjie.com/wp-admin/admin-ajax.php'
    getHttpContext(ajax_endpoint)