266 lines
9.2 KiB
Python
266 lines
9.2 KiB
Python
|
|
import ibm_db
|
|||
|
|
import os
|
|||
|
|
import datetime
|
|||
|
|
from log_config import d
|
|||
|
|
import pandas as pd
|
|||
|
|
import json
|
|||
|
|
# 连接信息L2本地
|
|||
|
|
dbn_database = "pltcm" #数据库名称
|
|||
|
|
dbn_uid = "app" #用户名
|
|||
|
|
dbn_pwd = "appapp1" #密码
|
|||
|
|
dbn_driver = "{IBM DB2 ODBC DRIVER}"
|
|||
|
|
dbn_port = "50000" #端口
|
|||
|
|
dbn_protocol = "TCPIP" #方式
|
|||
|
|
|
|||
|
|
# 创建本地数据库的 dbn 连接字符串
|
|||
|
|
dbn = (
|
|||
|
|
"DATABASE={0};UID={1};PWD={2};DRIVER={3};PORT={4};PROTOCOL={5};"
|
|||
|
|
.format(dbn_database, dbn_uid, dbn_pwd, dbn_driver, dbn_port, dbn_protocol)
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
# 数字钢卷数据库连接信息
|
|||
|
|
dsn_hostname = "10.25.101.64"
|
|||
|
|
dsn_uid = "dsc"
|
|||
|
|
dsn_pwd = "dscdsc1"
|
|||
|
|
dsn_driver = "{IBM DB2 ODBC DRIVER}"
|
|||
|
|
dsn_database = "appdb"
|
|||
|
|
dsn_port = "50000" # DB2通常使用端口50000,如果不是,需要更改为你的端口
|
|||
|
|
dsn_protocol = "TCPIP"
|
|||
|
|
|
|||
|
|
# 创建DSN连接字符串
|
|||
|
|
dsn = (
|
|||
|
|
"DRIVER={0};"
|
|||
|
|
"DATABASE={1};"
|
|||
|
|
"HOSTNAME={2};"
|
|||
|
|
"PORT={3};"
|
|||
|
|
"PROTOCOL={4};"
|
|||
|
|
"UID={5};"
|
|||
|
|
"PWD={6};").format(dsn_driver, dsn_database, dsn_hostname, dsn_port, dsn_protocol, dsn_uid, dsn_pwd)
|
|||
|
|
|
|||
|
|
|
|||
|
|
# MODFSPRE_ExplainDBWrite(MUID,前滑结果数据),
|
|||
|
|
# 写入本地数据库MODFSPRE,写入数字钢卷服务器MODFSPRE
|
|||
|
|
def MODFSPRE_PreDBWrite(MUID,SF_list):
|
|||
|
|
# 建立连接
|
|||
|
|
try:
|
|||
|
|
conn = ibm_db.connect(dbn, "", "")
|
|||
|
|
d.info("Connected to local database ")
|
|||
|
|
except Exception as e:
|
|||
|
|
d.error("Unable to connect local: ", str(e))
|
|||
|
|
|
|||
|
|
# 第二步,根据MUID检查数据是否存在
|
|||
|
|
muid_to_check = MUID
|
|||
|
|
#print(muid_to_check)
|
|||
|
|
check_sql = "SELECT COUNT(*) FROM MODFSPRE WHERE COIL_ID=?"
|
|||
|
|
stmt = ibm_db.prepare(conn, check_sql)
|
|||
|
|
ibm_db.bind_param(stmt, 1, muid_to_check)
|
|||
|
|
ibm_db.execute(stmt)
|
|||
|
|
result = ibm_db.fetch_tuple(stmt)
|
|||
|
|
|
|||
|
|
# 第三步,根据查询结果决定是插入还是更新
|
|||
|
|
SF_COEF1=SF_list[0]
|
|||
|
|
SF_COEF2=SF_list[1]
|
|||
|
|
SF_COEF3=SF_list[2]
|
|||
|
|
SF_COEF4=SF_list[3]
|
|||
|
|
SF_COEF5=SF_list[4]
|
|||
|
|
|
|||
|
|
if result[0] > 0:
|
|||
|
|
# 更新操作
|
|||
|
|
update_sql = "UPDATE MODFSPRE SET SF_COEF1=?, SF_COEF2=?, SF_COEF3=?, SF_COEF4=?, SF_COEF5=? WHERE COIL_ID=?"
|
|||
|
|
update_params = (SF_COEF1, SF_COEF2, SF_COEF3, SF_COEF4, SF_COEF5, muid_to_check)
|
|||
|
|
stmt = ibm_db.prepare(conn, update_sql)
|
|||
|
|
if ibm_db.execute(stmt, update_params):
|
|||
|
|
d.info("local Data updated successfully")
|
|||
|
|
else:
|
|||
|
|
d.error("Error updating local data:", ibm_db.stmt_errormsg())
|
|||
|
|
else:
|
|||
|
|
# 插入操作
|
|||
|
|
insert_sql = "INSERT INTO MODFSPRE (COIL_ID, SF_COEF1, SF_COEF2, SF_COEF3, SF_COEF4, SF_COEF5) VALUES (?, ?, ?, ?, ?, ?)"
|
|||
|
|
insert_params = (muid_to_check, SF_COEF1, SF_COEF2, SF_COEF3, SF_COEF4, SF_COEF5)
|
|||
|
|
stmt = ibm_db.prepare(conn, insert_sql)
|
|||
|
|
if ibm_db.execute(stmt, insert_params):
|
|||
|
|
d.info("local Data inserted successfully")
|
|||
|
|
else:
|
|||
|
|
d.error("Error inserting local data:", ibm_db.stmt_errormsg())
|
|||
|
|
|
|||
|
|
# 第四步,关闭数据库连接
|
|||
|
|
ibm_db.close(conn)
|
|||
|
|
|
|||
|
|
# 建立连接
|
|||
|
|
try:
|
|||
|
|
conn = ibm_db.connect(dsn, "", "")
|
|||
|
|
d.info("Connected to database")
|
|||
|
|
except Exception as e:
|
|||
|
|
d.error("Unable to connect: ", str(e))
|
|||
|
|
|
|||
|
|
# 第二步,根据MUID检查数据是否存在
|
|||
|
|
check_sql = "SELECT COUNT(*) FROM T_MODFSPRE WHERE COIL_ID=?"
|
|||
|
|
stmt = ibm_db.prepare(conn, check_sql)
|
|||
|
|
ibm_db.bind_param(stmt, 1, muid_to_check)
|
|||
|
|
ibm_db.execute(stmt)
|
|||
|
|
result = ibm_db.fetch_tuple(stmt)
|
|||
|
|
|
|||
|
|
# 第三步,根据查询结果决定是插入还是更新
|
|||
|
|
|
|||
|
|
if result[0] > 0:
|
|||
|
|
# 更新操作
|
|||
|
|
update_sql = "UPDATE T_MODFSPRE SET SF_COEF1=?, SF_COEF2=?, SF_COEF3=?, SF_COEF4=?, SF_COEF5=? WHERE COIL_ID=?"
|
|||
|
|
update_params = (SF_COEF1, SF_COEF2, SF_COEF3, SF_COEF4, SF_COEF5, muid_to_check)
|
|||
|
|
stmt = ibm_db.prepare(conn, update_sql)
|
|||
|
|
if ibm_db.execute(stmt, update_params):
|
|||
|
|
d.info("Data updated successfully")
|
|||
|
|
else:
|
|||
|
|
d.error("Error updating data:", ibm_db.stmt_errormsg())
|
|||
|
|
else:
|
|||
|
|
# 插入操作
|
|||
|
|
insert_sql = "INSERT INTO T_MODFSPRE (COIL_ID, SF_COEF1, SF_COEF2, SF_COEF3, SF_COEF4, SF_COEF5) VALUES (?, ?, ?, ?, ?, ?)"
|
|||
|
|
insert_params = (muid_to_check, SF_COEF1, SF_COEF2, SF_COEF3, SF_COEF4, SF_COEF5)
|
|||
|
|
stmt = ibm_db.prepare(conn, insert_sql)
|
|||
|
|
if ibm_db.execute(stmt, insert_params):
|
|||
|
|
d.info("Data inserted successfully")
|
|||
|
|
else:
|
|||
|
|
d.error("Error inserting data:", ibm_db.stmt_errormsg())
|
|||
|
|
|
|||
|
|
# 第四步,关闭数据库连接
|
|||
|
|
ibm_db.close(conn)
|
|||
|
|
|
|||
|
|
# MODFSPRE_ExplainDBWrite(MUID,写入内容,机架号),
|
|||
|
|
# 写入本地数据库MODFSPRE,写入数字钢卷服务器MODFSPRE
|
|||
|
|
def MODFSPRE_ExplainDBWrite(MUID,json_data,num):
|
|||
|
|
# 建立连接
|
|||
|
|
# try:
|
|||
|
|
# conn = ibm_db.connect(dbn, "", "")
|
|||
|
|
# d.info("Connected to database")
|
|||
|
|
# except Exception as e:
|
|||
|
|
# d.error("Unable to connect: ", str(e))
|
|||
|
|
|
|||
|
|
# 第二步,根据MUID检查数据是否存在
|
|||
|
|
muid_to_check = MUID
|
|||
|
|
# check_sql = "SELECT COUNT(*) FROM MODFSPRE WHERE COIL_ID=?"
|
|||
|
|
# stmt = ibm_db.prepare(conn, check_sql)
|
|||
|
|
# ibm_db.bind_param(stmt, 1, muid_to_check)
|
|||
|
|
# ibm_db.execute(stmt)
|
|||
|
|
# result = ibm_db.fetch_tuple(stmt)
|
|||
|
|
|
|||
|
|
# 第三步,根据查询结果决定是插入还是更新
|
|||
|
|
EXPLAIN=json_data
|
|||
|
|
|
|||
|
|
# if result[0] > 0:
|
|||
|
|
# 更新操作
|
|||
|
|
# update_sql = "UPDATE MODFSPRE SET EXPLAIN%d=? WHERE COIL_ID=?"%num
|
|||
|
|
# update_params = (EXPLAIN, muid_to_check)
|
|||
|
|
# stmt = ibm_db.prepare(conn, update_sql)
|
|||
|
|
# if ibm_db.execute(stmt, update_params):
|
|||
|
|
# d.info("Data updated successfully")
|
|||
|
|
# else:
|
|||
|
|
# d.error("Error updating data:", ibm_db.stmt_errormsg())
|
|||
|
|
# else:
|
|||
|
|
# 插入操作
|
|||
|
|
# insert_sql = "INSERT INTO MODFSPRE (COIL_ID, EXPLAIN%d) VALUES (?, ?)"%num
|
|||
|
|
# insert_params = (muid_to_check, EXPLAIN)
|
|||
|
|
# stmt = ibm_db.prepare(conn, insert_sql)
|
|||
|
|
# if ibm_db.execute(stmt, insert_params):
|
|||
|
|
# d.info("Data inserted successfully")
|
|||
|
|
# else:
|
|||
|
|
# d.error("Error inserting data:", ibm_db.stmt_errormsg())
|
|||
|
|
|
|||
|
|
# 第四步,关闭数据库连接
|
|||
|
|
# ibm_db.close(conn)
|
|||
|
|
|
|||
|
|
|
|||
|
|
# 建立连接
|
|||
|
|
try:
|
|||
|
|
conn = ibm_db.connect(dsn, "", "")
|
|||
|
|
d.info("Connected to database")
|
|||
|
|
except Exception as e:
|
|||
|
|
d.error("Unable to connect: ", str(e))
|
|||
|
|
|
|||
|
|
# 第二步,根据MUID检查数据是否存在
|
|||
|
|
check_sql = "SELECT COUNT(*) FROM T_MODFSPRE WHERE COIL_ID=?"
|
|||
|
|
stmt = ibm_db.prepare(conn, check_sql)
|
|||
|
|
ibm_db.bind_param(stmt, 1, muid_to_check)
|
|||
|
|
ibm_db.execute(stmt)
|
|||
|
|
result = ibm_db.fetch_tuple(stmt)
|
|||
|
|
|
|||
|
|
# 第三步,根据查询结果决定是插入还是更新
|
|||
|
|
|
|||
|
|
if result[0] > 0:
|
|||
|
|
# 更新操作
|
|||
|
|
update_sql = "UPDATE T_MODFSPRE SET EXPLAIN%d=? WHERE COIL_ID=?"%num
|
|||
|
|
update_params = (EXPLAIN, muid_to_check)
|
|||
|
|
stmt = ibm_db.prepare(conn, update_sql)
|
|||
|
|
if ibm_db.execute(stmt, update_params):
|
|||
|
|
d.info("Data updated successfully")
|
|||
|
|
else:
|
|||
|
|
d.error("Error updating data:", ibm_db.stmt_errormsg())
|
|||
|
|
else:
|
|||
|
|
# 插入操作
|
|||
|
|
insert_sql = "INSERT INTO T_MODFSPRE (COIL_ID, EXPLAIN%d) VALUES (?, ?)"%num
|
|||
|
|
insert_params = (muid_to_check, EXPLAIN)
|
|||
|
|
stmt = ibm_db.prepare(conn, insert_sql)
|
|||
|
|
if ibm_db.execute(stmt, insert_params):
|
|||
|
|
d.info("Data inserted successfully")
|
|||
|
|
else:
|
|||
|
|
d.error("Error inserting data:", ibm_db.stmt_errormsg())
|
|||
|
|
|
|||
|
|
# 第四步,关闭数据库连接
|
|||
|
|
ibm_db.close(conn)
|
|||
|
|
|
|||
|
|
# ReadEntireTable(数据库表名,数据库列名)
|
|||
|
|
# 直接读取整张数据库表的名字,写入csv中
|
|||
|
|
def ReadEntireTable(Tbl_name):
|
|||
|
|
d.info(f"ReadEntireTable: {Tbl_name} start")
|
|||
|
|
try:
|
|||
|
|
conn = ibm_db.connect(dbn, "", "")
|
|||
|
|
d.info("Connected to local database ")
|
|||
|
|
except Exception as e:
|
|||
|
|
d.error("Unable to connect local: ", str(e))
|
|||
|
|
# 准备SQL查询语句
|
|||
|
|
sql = "SELECT * FROM %s"%Tbl_name
|
|||
|
|
# 执行SQL查询
|
|||
|
|
stmt = ibm_db.exec_immediate(conn, sql)
|
|||
|
|
# 从结果中获取所有数据并转换为DataFrame
|
|||
|
|
columns = [ibm_db.field_name(stmt, i) for i in range(ibm_db.num_fields(stmt))]
|
|||
|
|
data = []
|
|||
|
|
row = ibm_db.fetch_tuple(stmt)
|
|||
|
|
while row:
|
|||
|
|
data.append(row)
|
|||
|
|
row = ibm_db.fetch_tuple(stmt)
|
|||
|
|
|
|||
|
|
df = pd.DataFrame(data, columns=columns)
|
|||
|
|
d.info(f"len(df) = '{len(df)}' ")
|
|||
|
|
# 获取当前日期
|
|||
|
|
i = datetime.datetime.now()
|
|||
|
|
ModFSSavePath = "/users/app/src/ModFSPre/"+'%d/' % i.year
|
|||
|
|
# 检查目录是否存在
|
|||
|
|
if not os.path.exists(ModFSSavePath):
|
|||
|
|
# 如果目录不存在,则创建目录
|
|||
|
|
os.makedirs(ModFSSavePath)
|
|||
|
|
d.info(f"Directory '{ModFSSavePath}' created successfully.")
|
|||
|
|
else:
|
|||
|
|
d.info(f"Directory '{ModFSSavePath}' already exists.")
|
|||
|
|
# 将DataFrame保存为 CSV 文件
|
|||
|
|
df.to_csv(ModFSSavePath +'%d' % i.month+'%d' % i.day+'%s_data.csv'%Tbl_name, index=False)
|
|||
|
|
# 关闭数据库连接
|
|||
|
|
ibm_db.close(conn)
|
|||
|
|
|
|||
|
|
|
|||
|
|
|
|||
|
|
if __name__ == "__main__":
|
|||
|
|
#test MODFSPRE_PreDBWrite
|
|||
|
|
#MUID = "1111"
|
|||
|
|
#SF_list = [0.3, 0.4, 0.5, 0.6, 0.7]
|
|||
|
|
#MODFSPRE_PreDBWrite(MUID,SF_list)
|
|||
|
|
|
|||
|
|
#test MODFSPRE_ExplainDBWrite
|
|||
|
|
MUID = "62"
|
|||
|
|
json_data = [{
|
|||
|
|
"key1": 1,
|
|||
|
|
"key2": 2
|
|||
|
|
}]
|
|||
|
|
json_string = json.dumps(json_data)
|
|||
|
|
MODFSPRE_ExplainDBWrite(MUID,json_string,2)
|
|||
|
|
|
|||
|
|
#test ReadEntireTable
|
|||
|
|
#ReadEntireTable('MODAD')
|