

发布于 2026-01-05 12:00419浏览import xbot
from xbot import print, sleep
from .import package
from .package import variables as glv
import mysql.connector
import requests
import time
import logging
# === 配置区 ===
MYSQL_CONFIG = {
'user': '*****',#数据库账户
'password': '********', #数据库密码
'host': '*******************',#数据库ip
'database': '测试1' #数据库 名称
}
FEISHU_APP_ID = '*******'#飞书APP_ID
FEISHU_APP_SECRET = '************'#飞书APP_SECRET
APP_TOKEN = 'DCozboLGxawiHisF7CWcYZ59n7b'#飞书APP_TOKEN
TABLE_ID = 'tblM2M0fRgbalbBG'#飞书TABLE_ID
# 所需要导入的字段,映射:MySQL 列名 → 飞书多维表格字段名
FIELD_MAPPING = {
#数据库列名 飞书字段名
'***': '***',
'***': '***',
'**': '**'
}
# === 日志配置 ===
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# === 获取飞书 tenant_access_token ===
def get_tenant_access_token():
url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal"
payload = {"app_id": FEISHU_APP_ID, "app_secret": FEISHU_APP_SECRET}
resp = requests.post(url, json=payload)
if resp.status_code == 200:
data = resp.json()
if data.get("code") == 0:
return data["tenant_access_token"]
else:
logging.error(f"获取 token 失败: {data}")
raise Exception("Failed to get access token")
else:
raise Exception(f"HTTP {resp.status_code}: {resp.text}")
# === 从 MySQL 读取数据(全量 + 限流测试)===
def fetch_mysql_data():
cnx = mysql.connector.connect(**MYSQL_CONFIG)
cursor = cnx.cursor(dictionary=True)
# 先只取 10 条测试!避免写入太多重复数据
cursor.execute("""
SELECT
`***`,
`***`,
`**`
FROM !! #所需要查找的数据 !!表名
LIMIT 10 #编写sql查询语句
""")
rows = cursor.fetchall()
cursor.close()
cnx.close()
return rows
# === 向多维表格写入记录(仅新增,无去重)===
def sync_to_feishu(rows, token):
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json; charset=utf-8"
}
url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{APP_TOKEN}/tables/{TABLE_ID}/records"
for row in rows:
fields = {}
for mysql_col, feishu_field in FIELD_MAPPING.items():
value = row.get(mysql_col) # 使用 .get() 避免 KeyError
fields[feishu_field] = value if value is not None else ""
payload = {"fields": fields}
resp = requests.post(url, headers=headers, json=payload)
if resp.status_code == 200:
logging.info(f" 成功同步: {fields['**']}")
else:
logging.error(f"同步失败 {fields.get('**', 'N/A')}: {resp.text}")
time.sleep(0.1) # 避免触发飞书 API 频率限制
# === 主函数 ===
def main(args=None):
try:
logging.info("开始同步 MySQL → 飞书多维表格(测试模式)")
token = get_tenant_access_token()
data = fetch_mysql_data()
logging.info(f"共获取 {len(data)} 条测试数据")
if not data:
logging.warning("未查询到任何数据,请检查表名和字段名是否正确!")
return
sync_to_feishu(data, token)
logging.info(" 测试同步完成!")
except Exception as e:
logging.exception(" 同步过程中出错")
raise
if __name__ == "__main__":
main()