您的位置:首页 > 编程语言 > Python开发

python3-通过pyCryptodome加解密sqlite3数据

2020-07-01 20:40 13 查看

通过AES加解密sqlite3数据库数据(CBC模式),可保存账号密码等隐私数据。目前该示例代码仍有许多不完善之处,抽空再完善。

代码示例如下:

from Crypto.Cipher import AES
from Crypto.Util.Padding import pad
from Crypto.Util.Padding import unpad
from Crypto.Random import get_random_bytes
import sqlite3

class Interface:
def __init__(self):
self.calc_man = AESdata()
self.text1 = 'Please input number:'
self.text2 = '1. En/Decrypt Center\n2. Database Center'
self.text3 = '1. Encrypt\n2. Decrypt'
self.text4 = '1. Create a new DB\n2. Select current DB'
self.text5 = '1. Insert\n2. update\n3. read\n4. delete\n5. move'
print('--------------------------------')
print('|           Database            |')
print('|      Infomation Center        |')
print('--------------------------------')

def select(self, show_info):
print(show_info)
num = input(self.text1)
return int(num)

def run(self):
num1 = self.select(self.text2)
if num1 == 1:
num2 = self.select(self.text3)
if num2 == 1:
path = input('File path: ')
self.calc_man.encrypt_data(path)
elif num2 == 2:
path = input('File path: ')
self.calc_man.decrypt_data(path)
elif num1 == 2:
num3 = self.select(self.text4)
if num3 == 1:
db_man = MyDB(input('Please input file name: ')).cDB()
elif num3 == 2:
db_man = MyDB(input('Please input file name: '))
num4 = self.select(self.text5)
if num4 == 1:
db_man.wDB()
elif num4 == 2:
db_man.uDB()
elif num4 == 3:
db_man.rDB()
elif num4 == 4:
db_man.dDB()
elif num4 == 5:
db_man.mDB()

class AESdata:
def __init__(self):
# 密钥key 长度必须为16(AES-128)、24(AES-192)或 32(AES-256)字节
self.key = b'1234567890ABCDEF'

def encrypt_data(self, path):
# 实例化加密套件,使用CBC模式
encrypt_cipher = AES.new(self.key, AES.MODE_CBC)
with open(path, 'rb') as f:
data = f.read()
# 对内容进行加密,pad函数用于分组和填充
encrypted_data = encrypt_cipher.encrypt(pad(data, AES.block_size))
new_path = F"{path}.bin"
with open(new_path, 'wb') as h:
# 在文件中依次写入iv和密文encrypted_data
[h.write(content) for content in (encrypt_cipher.iv, encrypted_data)]
print('Encrypted.')

def decrypt_data(self, path):
with open(path, 'rb') as f:
# 依次读取iv和密文encrypted_data,最后的-1则表示读取到文件末尾
iv, encrypted_data = [f.read(content) for content in (AES.block_size, -1)]
# 实例化加密套件
decrypt_cipher = AES.new(self.key, AES.MODE_CBC, iv)
# 解密
try:
decrypted_data = unpad(decrypt_cipher.decrypt(encrypted_data), AES.block_size)
new_path = F"{path.replace('.bin', '')}"
with open(new_path, 'wb') as h:
h.write(decrypted_data)
print('Decrypted.')
except ValueError as reason:
print(reason)

class MyDB:
def __init__(self, name):
self.name = name

def cDB(self):
# create a database
table_num = int(input('How many table would you want: '))
count_1 = 1
while count_1 <= table_num:
table= input(F"Please input table_{count_1}'s name: ")
columns = []
columns_num = int(input('How many columns would you want: '))
count_2 = 1
while count_2 <= columns_num:
text = F"Please complete column_{count_2} [format: name type (primary key)]:\n==>"
columns.append(input(text))
count_2 += 1
conn = sqlite3.connect(self.name)
man = conn.cursor()
columns_new = ""
for each in columns:
columns_new += F"{each}, "
add_time = "date timestamp not null default(datetime('now', 'localtime'))"
command = F"CREATE TABLE {table}({columns_new}{add_time})"
try:
man.execute(command)
conn.commit()
print(F'TABLE {table} created.')
except sqlite3.OperationalError as reason:
print(reason)
conn.close()
count_1 += 1
print(F'{self.name} created.')

def wDB(self):
# write values into database
self.rDB(0)
table = input("Which table would you want to write: ")
self.rDB(1, table)
content = input('Please input values [format: col_1 val_1, col_2 val_2, ...]:\n==>')
col_val = content.split(', ')
columns = ''
values = ''
conn = sqlite3.connect(self.name)
man = conn.cursor()
for each in col_val:
columns += F"{each.split(' ')[0]}, "
values += F"{each.split(' ')[1]}, "
columns = columns[:-2]
values = values[:-2]
command = F"INSERT INTO {table}({columns}) VALUES({values})"
try:
man.execute(command)
conn.commit()
print('Data Inserted.')
except sqlite3.IntegrityError as reason:
print(reason)
conn.close()

def uDB(self):
# update data in database with primary key
conn = sqlite3.connect(self.name)
man = conn.cursor()
self.rDB(0)
table = input("Which table would you want to update: ")
self.rDB(1, table)
content = input("Please input values [format: key='val'|col_1='val_1', col_2='val_2', ...]:\n==>")
key = content.split('|')[0]
values = content.split('|')[1]
command = F"UPDATE {table} set {values} where {key}"
try:
man.execute(command)
conn.commit()
print('Data Updated.')
except sqlite3.OperationalError as reason:
print(reason)
conn.close()

def rDB(self, num=0, table=''):
# read data from database
conn = sqlite3.connect(self.name)
man = conn.cursor()
if num == 0:
# show all tables
command = "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name"
elif num == 1:
# show a table's volumns
command = F"pragma table_info({table})"
elif num == -1:
self.rDB(0)
table = input("Which table would you want to show: ")
command = F"SELECT * FROM {table}"
try:
for each in man.execute(command):
print(F'{each}')
except sqlite3.OperationalError as reason:
print(reason)
conn.close()

def dDB(self, table='example'):
# delete a database
conn = sqlite3.connect(self.name)
man = conn.cursor()
command = F"DROP TABLE '{table}'"
try:
man.execute(command)
conn.commit()
print('Delete successfully')
except sqlite3.OperationalError as reason:
print(reason)
conn.close()

def mDB(self):
# move data from one to another
pass

if __name__ == '__main__':
iron_man = Interface()
iron_man.run()

参考:加解密部分代码来源

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: