eis/TestProject/pyml/pydb_test.py

266 lines
9.2 KiB
Python
Raw Permalink Normal View History

import ibm_db
import os
import datetime
from log_config import d
import pandas as pd
import json
# 连接信息L2本地
dbn_database = "pltcm" #数据库名称
dbn_uid = "app" #用户名
dbn_pwd = "appapp1" #密码
dbn_driver = "{IBM DB2 ODBC DRIVER}"
dbn_port = "50000" #端口
dbn_protocol = "TCPIP" #方式
# 创建本地数据库的 dbn 连接字符串
dbn = (
"DATABASE={0};UID={1};PWD={2};DRIVER={3};PORT={4};PROTOCOL={5};"
.format(dbn_database, dbn_uid, dbn_pwd, dbn_driver, dbn_port, dbn_protocol)
)
# 数字钢卷数据库连接信息
dsn_hostname = "10.25.101.64"
dsn_uid = "dsc"
dsn_pwd = "dscdsc1"
dsn_driver = "{IBM DB2 ODBC DRIVER}"
dsn_database = "appdb"
dsn_port = "50000" # DB2通常使用端口50000如果不是需要更改为你的端口
dsn_protocol = "TCPIP"
# 创建DSN连接字符串
dsn = (
"DRIVER={0};"
"DATABASE={1};"
"HOSTNAME={2};"
"PORT={3};"
"PROTOCOL={4};"
"UID={5};"
"PWD={6};").format(dsn_driver, dsn_database, dsn_hostname, dsn_port, dsn_protocol, dsn_uid, dsn_pwd)
# MODFSPRE_ExplainDBWrite(MUID,前滑结果数据)
# 写入本地数据库MODFSPRE写入数字钢卷服务器MODFSPRE
def MODFSPRE_PreDBWrite(MUID,SF_list):
# 建立连接
try:
conn = ibm_db.connect(dbn, "", "")
d.info("Connected to local database ")
except Exception as e:
d.error("Unable to connect local: ", str(e))
# 第二步根据MUID检查数据是否存在
muid_to_check = MUID
#print(muid_to_check)
check_sql = "SELECT COUNT(*) FROM MODFSPRE WHERE COIL_ID=?"
stmt = ibm_db.prepare(conn, check_sql)
ibm_db.bind_param(stmt, 1, muid_to_check)
ibm_db.execute(stmt)
result = ibm_db.fetch_tuple(stmt)
# 第三步,根据查询结果决定是插入还是更新
SF_COEF1=SF_list[0]
SF_COEF2=SF_list[1]
SF_COEF3=SF_list[2]
SF_COEF4=SF_list[3]
SF_COEF5=SF_list[4]
if result[0] > 0:
# 更新操作
update_sql = "UPDATE MODFSPRE SET SF_COEF1=?, SF_COEF2=?, SF_COEF3=?, SF_COEF4=?, SF_COEF5=? WHERE COIL_ID=?"
update_params = (SF_COEF1, SF_COEF2, SF_COEF3, SF_COEF4, SF_COEF5, muid_to_check)
stmt = ibm_db.prepare(conn, update_sql)
if ibm_db.execute(stmt, update_params):
d.info("local Data updated successfully")
else:
d.error("Error updating local data:", ibm_db.stmt_errormsg())
else:
# 插入操作
insert_sql = "INSERT INTO MODFSPRE (COIL_ID, SF_COEF1, SF_COEF2, SF_COEF3, SF_COEF4, SF_COEF5) VALUES (?, ?, ?, ?, ?, ?)"
insert_params = (muid_to_check, SF_COEF1, SF_COEF2, SF_COEF3, SF_COEF4, SF_COEF5)
stmt = ibm_db.prepare(conn, insert_sql)
if ibm_db.execute(stmt, insert_params):
d.info("local Data inserted successfully")
else:
d.error("Error inserting local data:", ibm_db.stmt_errormsg())
# 第四步,关闭数据库连接
ibm_db.close(conn)
# 建立连接
try:
conn = ibm_db.connect(dsn, "", "")
d.info("Connected to database")
except Exception as e:
d.error("Unable to connect: ", str(e))
# 第二步根据MUID检查数据是否存在
check_sql = "SELECT COUNT(*) FROM T_MODFSPRE WHERE COIL_ID=?"
stmt = ibm_db.prepare(conn, check_sql)
ibm_db.bind_param(stmt, 1, muid_to_check)
ibm_db.execute(stmt)
result = ibm_db.fetch_tuple(stmt)
# 第三步,根据查询结果决定是插入还是更新
if result[0] > 0:
# 更新操作
update_sql = "UPDATE T_MODFSPRE SET SF_COEF1=?, SF_COEF2=?, SF_COEF3=?, SF_COEF4=?, SF_COEF5=? WHERE COIL_ID=?"
update_params = (SF_COEF1, SF_COEF2, SF_COEF3, SF_COEF4, SF_COEF5, muid_to_check)
stmt = ibm_db.prepare(conn, update_sql)
if ibm_db.execute(stmt, update_params):
d.info("Data updated successfully")
else:
d.error("Error updating data:", ibm_db.stmt_errormsg())
else:
# 插入操作
insert_sql = "INSERT INTO T_MODFSPRE (COIL_ID, SF_COEF1, SF_COEF2, SF_COEF3, SF_COEF4, SF_COEF5) VALUES (?, ?, ?, ?, ?, ?)"
insert_params = (muid_to_check, SF_COEF1, SF_COEF2, SF_COEF3, SF_COEF4, SF_COEF5)
stmt = ibm_db.prepare(conn, insert_sql)
if ibm_db.execute(stmt, insert_params):
d.info("Data inserted successfully")
else:
d.error("Error inserting data:", ibm_db.stmt_errormsg())
# 第四步,关闭数据库连接
ibm_db.close(conn)
# MODFSPRE_ExplainDBWrite(MUID,写入内容,机架号)
# 写入本地数据库MODFSPRE写入数字钢卷服务器MODFSPRE
def MODFSPRE_ExplainDBWrite(MUID,json_data,num):
# 建立连接
# try:
# conn = ibm_db.connect(dbn, "", "")
# d.info("Connected to database")
# except Exception as e:
# d.error("Unable to connect: ", str(e))
# 第二步根据MUID检查数据是否存在
muid_to_check = MUID
# check_sql = "SELECT COUNT(*) FROM MODFSPRE WHERE COIL_ID=?"
# stmt = ibm_db.prepare(conn, check_sql)
# ibm_db.bind_param(stmt, 1, muid_to_check)
# ibm_db.execute(stmt)
# result = ibm_db.fetch_tuple(stmt)
# 第三步,根据查询结果决定是插入还是更新
EXPLAIN=json_data
# if result[0] > 0:
# 更新操作
# update_sql = "UPDATE MODFSPRE SET EXPLAIN%d=? WHERE COIL_ID=?"%num
# update_params = (EXPLAIN, muid_to_check)
# stmt = ibm_db.prepare(conn, update_sql)
# if ibm_db.execute(stmt, update_params):
# d.info("Data updated successfully")
# else:
# d.error("Error updating data:", ibm_db.stmt_errormsg())
# else:
# 插入操作
# insert_sql = "INSERT INTO MODFSPRE (COIL_ID, EXPLAIN%d) VALUES (?, ?)"%num
# insert_params = (muid_to_check, EXPLAIN)
# stmt = ibm_db.prepare(conn, insert_sql)
# if ibm_db.execute(stmt, insert_params):
# d.info("Data inserted successfully")
# else:
# d.error("Error inserting data:", ibm_db.stmt_errormsg())
# 第四步,关闭数据库连接
# ibm_db.close(conn)
# 建立连接
try:
conn = ibm_db.connect(dsn, "", "")
d.info("Connected to database")
except Exception as e:
d.error("Unable to connect: ", str(e))
# 第二步根据MUID检查数据是否存在
check_sql = "SELECT COUNT(*) FROM T_MODFSPRE WHERE COIL_ID=?"
stmt = ibm_db.prepare(conn, check_sql)
ibm_db.bind_param(stmt, 1, muid_to_check)
ibm_db.execute(stmt)
result = ibm_db.fetch_tuple(stmt)
# 第三步,根据查询结果决定是插入还是更新
if result[0] > 0:
# 更新操作
update_sql = "UPDATE T_MODFSPRE SET EXPLAIN%d=? WHERE COIL_ID=?"%num
update_params = (EXPLAIN, muid_to_check)
stmt = ibm_db.prepare(conn, update_sql)
if ibm_db.execute(stmt, update_params):
d.info("Data updated successfully")
else:
d.error("Error updating data:", ibm_db.stmt_errormsg())
else:
# 插入操作
insert_sql = "INSERT INTO T_MODFSPRE (COIL_ID, EXPLAIN%d) VALUES (?, ?)"%num
insert_params = (muid_to_check, EXPLAIN)
stmt = ibm_db.prepare(conn, insert_sql)
if ibm_db.execute(stmt, insert_params):
d.info("Data inserted successfully")
else:
d.error("Error inserting data:", ibm_db.stmt_errormsg())
# 第四步,关闭数据库连接
ibm_db.close(conn)
# ReadEntireTable(数据库表名,数据库列名)
# 直接读取整张数据库表的名字写入csv中
def ReadEntireTable(Tbl_name):
d.info(f"ReadEntireTable: {Tbl_name} start")
try:
conn = ibm_db.connect(dbn, "", "")
d.info("Connected to local database ")
except Exception as e:
d.error("Unable to connect local: ", str(e))
# 准备SQL查询语句
sql = "SELECT * FROM %s"%Tbl_name
# 执行SQL查询
stmt = ibm_db.exec_immediate(conn, sql)
# 从结果中获取所有数据并转换为DataFrame
columns = [ibm_db.field_name(stmt, i) for i in range(ibm_db.num_fields(stmt))]
data = []
row = ibm_db.fetch_tuple(stmt)
while row:
data.append(row)
row = ibm_db.fetch_tuple(stmt)
df = pd.DataFrame(data, columns=columns)
d.info(f"len(df) = '{len(df)}' ")
# 获取当前日期
i = datetime.datetime.now()
ModFSSavePath = "/users/app/src/ModFSPre/"+'%d/' % i.year
# 检查目录是否存在
if not os.path.exists(ModFSSavePath):
# 如果目录不存在,则创建目录
os.makedirs(ModFSSavePath)
d.info(f"Directory '{ModFSSavePath}' created successfully.")
else:
d.info(f"Directory '{ModFSSavePath}' already exists.")
# 将DataFrame保存为 CSV 文件
df.to_csv(ModFSSavePath +'%d' % i.month+'%d' % i.day+'%s_data.csv'%Tbl_name, index=False)
# 关闭数据库连接
ibm_db.close(conn)
if __name__ == "__main__":
#test MODFSPRE_PreDBWrite
#MUID = "1111"
#SF_list = [0.3, 0.4, 0.5, 0.6, 0.7]
#MODFSPRE_PreDBWrite(MUID,SF_list)
#test MODFSPRE_ExplainDBWrite
MUID = "62"
json_data = [{
"key1": 1,
"key2": 2
}]
json_string = json.dumps(json_data)
MODFSPRE_ExplainDBWrite(MUID,json_string,2)
#test ReadEntireTable
#ReadEntireTable('MODAD')