前言
学校开了一门密码学的课程,唯一比较感兴趣的是老师关于比特币的一些介绍,旷工、矿池、攻击,以及挖矿的原理,听起来真是不亦乐乎。快要结课了来了一个作业,让我们用Python是实现五种常用的加密方式,有对称加密如DES、SM4,也有非对称加密如RSA。要求就是简单实现,供用户使用的web前端是扩展,但是想着这个作业占20分的期末总分,还是卷一卷吧。于是想到之前用uni-app与flask搭建的智能监控,前端提供用户可以交互的界面,后端通过FLask设置的路由执行相应的操作,这个项目当然也可以按这个思路来。
技术栈
前端:Vue 3
组件:Element Plus
后端:Flask
语言:Python、Typescript、H5
功能库:Axios(http请求)、CORS(跨域)、Gmmsl(SM4加密库)、pyDes(DES加密库)
效果展示
前端
主页
文件上传 加密算法选择
操作按钮
后端
实现函数以及两个路由
原文:
DES加密:
移位加密:
一次一密:
RSA:
SM4:
代码
前端
主页
<template>
<div class="container">
<div class="container header">
<span>文件加密软件</span>
</div>
<div class="container main">
<el-upload
class="upload-demo"
drag
action="https://run.mocky.io/v3/9d059bf9-4660-45f2-925d-ce80ad6c4d15"
multiple
:headers="headers"
:file-list="fileList"
:on-change="handleChange"
:auto-upload="false"
>
<el-icon class="el-icon--upload"><upload-filled /></el-icon>
<div class="el-upload__text">推动文件到这里或者<em>点击我</em></div>
<template #tip>
<div class="el-upload__tip">
jpg/png files with a size less than 500kb
</div>
</template>
</el-upload>
<div class="slect">
<span>选择加密的算法</span>
<el-select v-model="optionValue" placeholder="Select">
<el-option
v-for="item in options"
:key="item.value"
:label="item.label"
:value="item.value"
:disabled="item.disabled"
/>
</el-select>
</div>
<div class="button">
<el-button type="danger" @click="dialogOfUpload = false"
>取 消</el-button
>
<el-button type="primary" @click="confirmUpload()">确 认</el-button>
<el-button @click="downloadFile()">下 载</el-button>
</div>
</div>
</div>
</template>
<script setup lang="ts">
import { VueElement, reactive, ref, resolveComponent } from "vue";
import { UploadFilled } from "@element-plus/icons-vue";
// import getItem from "@/api";
import axios from "axios";
import { ElMessage } from "element-plus";
// 设置axios的基准地址
axios.defaults.baseURL = "http://127.0.0.1:5000";
const fileList = reactive([]);
const headers = reactive({
"Content-Type": "multipart/form-data",
});
// 接受上传文件的列表
const handleChange = (file, newFileList) => {
fileList.value = newFileList;
};
const dialogOfUpload = ref(false);
const optionValue = ref("");
const options = [
{
value: "ONCECryption",
label: "ONCECryption",
},
{
value: "MOVECryption",
label: "MOVECryption",
},
{
value: "DESCryption",
label: "DESCryption",
},
{
value: "SM4Cryption",
label: "SM4Cryption",
},
{
value: "RSACryption",
label: "RSACryption",
},
];
// 处理文件下载操作
const downloadFile = () => {
return new Promise((resolve, reject) => {
// 配置axios参数
axios({
url: "/api/download",
method: "get",
params: { operation: optionValue.value },
responseType: "blob",
})
// 请求成功
.then((res) => {
try {
// 使用 blob 接受后端数据
const blob = new Blob([res.data], {
type: res.headers["Content-Type"],
});
console.log("blob", blob);
// 创建一个链接并设置内容
const url = window.URL.createObjectURL(blob);
// 创建一个链接元素以创建下载
const a = document.createElement("a");
a.style.display = "none";
a.href = url;
a.download = "sample.txt";
document.body.append(a);
// 触发下载
a.click();
// 清理
window.URL.revokeObjectURL(url);
} catch (e) {
console.log("下载错误", e);
}
})
.catch((err) => {
console.error(err);
});
});
};
// 请求后端对上传的文件进行加密
const confirmUpload = () => {
// 文件 需要使用FormData对象来存储
var param = new FormData();
fileList.value.forEach((val, index) => {
param.append("file", val.raw);
param.append("optionValue", optionValue.value);
});
axios
.post("/api/encryption", param)
.then((res) => {
console.log("response", res);
ElMessage({
message: "加密成功,请点击下载",
duration: 5000,
type: "success",
});
})
.catch((err) => {
ElMessage({
message: "加密失败,请查看路径是否正确",
duration: 1000,
type: "error",
});
console.log("err", err);
});
};
</script>
<style lang="scss" scoped>
@import "./index.scss";
</style>
样式
.container {
width: 100%;
height: 95vh;
// background: url("./images/bgNew.jpg") no-repeat;
background-color: rgb(241, 242, 244);
display: flex;
align-items: center;
// justify-content: center;
flex-direction: column;
padding: 10px;
.header {
font-family: YouSheBiaoTiHei;
font-size: 48px;
line-height: 78px;
width: 95%;
height: 15%;
border: 1px solid rgb(233, 231, 231);
border-radius: 6px;
background-color: #ffffff;
margin-top: 10px;
box-shadow: 0 0 12px rgba(0, 0, 0, 0.5);
}
.main {
width: 95%;
height: 80%;
border: 1px solid rgb(233, 231, 231);
border-radius: 6px;
background-color: #ffffff;
margin-top: 10px;
box-shadow: 0 0 12px rgba(0, 0, 0, 0.5);
align-items: center;
justify-content: center;
.slect {
display: flex;
flex-direction: column;
margin-top: 10px;
margin-bottom: 10px;
// width: 100%;
height: 100px;
}
}
}
后端
# 编写程序实现加密算法
import binascii
import io
import os
import random
from gmssl.sm4 import CryptSM4, SM4_ENCRYPT, SM4_DECRYPT
from pyDes import des, PAD_PKCS5, ECB
from flask import Flask, request, send_file
from flask_cors import CORS
app = Flask(__name__)
CORS(app, resources=r'/*')
raw_binary_list = []
content = ""
# 查看是否会将‘\n’ 也计算进去
# binary_dic = dict()
# for binary in binary_content.split(" "):
# raw_str = chr(int(binary, 2))
# if raw_str not in binary_dic.keys():
# binary_dic[raw_str] = 1
# else:
# binary_dic[raw_str] += 1
# print(binary_dic.items())
# 一次一密
def generate_binary(length):
return ''.join(random.choice('01') for _ in range(length))
def once_oneCryption():
# print("原始文本为:", " ".join(raw_binary_list))
# 获取每个字符编码后的长度,并生成相同长度的随机文件作为密钥
random_binary_list = [generate_binary(len(cha)) for cha in raw_binary_list]
# print("密钥为:", " ".join(random_binary_list))
with open("ONCE_key.txt", "w", encoding="utf-8") as file_write:
file_write.write(" ".join(random_binary_list))
print('ONCE 密钥存储成功')
# 与原始文件按位异或,实现加密
ciphertext = " ".join([str(bin(int(raw_binary_list[index], 2) ^ int(random_binary_list[index], 2))).replace('0b', '') for index in range(len(random_binary_list))])
# 打印密文
with open("ONCE_encryption.txt", "w", encoding="utf-8") as file_write:
file_write.write(ciphertext)
print("ONCE 加密成功")
# 将密文与密钥按位异或,实现解密
cryption_list = ciphertext.split(" ")
# 得到原文二进制文件
progress_list = [int(cryption_list[index], 2) ^ int(random_binary_list[index], 2)for index in range(len(cryption_list))]
# print("progress_list", progress_list)
# 恢复二进制为字符
raw_text = "".join([str(chr(binary)) for binary in progress_list])
# 打印文件内容
with open("ONCE_decryption.txt", "w", encoding="utf-8") as file_write:
file_write.write(raw_text)
print("ONCE 解密成功")
# 移位加密
def move_cryption():
# 生成随机密钥1~255
key = random.randint(1, 255)
# key = 200
with open('MOVE_key.txt', mode='w', encoding='utf-8') as writer:
writer.write(str(key))
print('MOVE 密钥存储成功')
# 原始文本
raw_value_list = [int(binary, 2) for binary in raw_binary_list]
# print("原始文本", raw_value_list)
# 加密
cryption_list = []
for binary in raw_binary_list:
if int(binary, 2) + key > 255:
cryption_list.append(str(int(binary, 2) + key - 255))
else:
cryption_list.append(str(int(binary, 2) + key))
with open('MOVE_encryption.txt', mode='w', encoding='utf-8') as writer:
writer.write(''.join(cryption_list))
print('MOVE 加密成功')
# print("密文为:", cryption_list)
# 解密
raw_text_list = "".join([str(chr(int(value) - key + 255)) if (int(value) - key) < 255 else str(chr(int(value) - key)) for value in cryption_list])
with open('MOVE_decryption.txt', mode='w', encoding='utf-8') as writer:
writer.write(raw_text_list)
print('MOVE 解密成功')
def DES():
# 随机字符串作为密钥 长度未为8
key_length = 8
key = "".join([random.choice('01') for index in range(key_length)])
with open('DES_key.txt', mode='w', encoding='utf-8') as writer:
writer.write(key)
print('DES 密钥存储成功')
# 使用pyDes 产生密钥 对原文进行加密
# 创建加密器
k = des(key, ECB, padmode=PAD_PKCS5)
# print("原文:", content)
# 将文本内容编码为 utf-8
encoded_content = content.encode('utf-8')
encryption = k.encrypt(encoded_content)
with open('DES_encryption.txt', mode='w', encoding='utf-8') as writer:
writer.write(str(encryption))
print('DES 加密成功')
# 使用pyDes 密钥 对原文进行解密
raw_text_encoded = k.decrypt(encryption)
raw_text = raw_text_encoded.decode('utf-8')
with open('DES_decryption.txt', mode='w', encoding='utf-8') as writer:
writer.write(raw_text)
print('DES 解密成功')
def SM4_encrypt(crypt_sm4, key, value):
"""
加密
:param crypt: 加密器
:param key: 密钥
:param value: 原文
:return:密文
"""
crypt_sm4.set_key(key, SM4_ENCRYPT)
encrypt_value = crypt_sm4.crypt_ecb(value)
return encrypt_value.hex()
def str_to_hexStr(hex_str):
"""
字符串转hex
:param hex_str:
:return:
"""
hex_data = hex_str.encode('utf-8')
str_bin = binascii.unhexlify(hex_data)
return str_bin.decode('utf-8')
def SM4_decrypt(crypt_sm4, key, value):
"""
解密
:param crypt_sm4: 解密器
:param key: 密钥
:param value: 密文
:return: 原文
"""
crypt_sm4.set_key(key, SM4_DECRYPT)
# 将十六进制系列转换成字节序列
decrypt_value = crypt_sm4.crypt_ecb(bytes.fromhex(value))
return str_to_hexStr(decrypt_value.hex())
def SM4():
# 每次读取4个字节 32位 文本
# 将文本进行encode
encoded_content = content.encode('utf-8')
# print(encoded_content)
# 每次读入4个字
chunk_size = 128
start = 0
end = start + chunk_size
# 创建加密器
crypt_sm4 = CryptSM4()
# 随机生成一个密钥
key = os.urandom(16)
with open('SM4_key.txt', mode='w', encoding='utf-8') as writer:
writer.write(f'{key}')
print('SM4 密钥存储成功')
encrypt_list = []
decrypt_list = []
# 加密
while True:
input = encoded_content[start: end - 1]
start = end + chunk_size
end = start + chunk_size
if end > len(encoded_content):
break
# 加密
encrypt_value = SM4_encrypt(crypt_sm4, key, input)
encrypt_list.append(encrypt_value)
with open('SM4_encryption.txt', mode='w', encoding='utf-8') as writer:
writer.write(''.join(encrypt_list))
print('SM4 加密成功')
# 解密
for item in encrypt_list:
decrypt_value = SM4_decrypt(crypt_sm4, key, item)
decrypt_list.append(decrypt_value)
with open('SM4_decryption.txt', mode='w', encoding='utf-8') as writer:
writer.write(''.join(decrypt_list))
print('SM4 解密成功')
def gcd(a, b):
if b == 0:
return a
else:
return gcd(b, a % b)
def ext_gcd(a, b):
if b == 0:
x1 = 1
y1 = 0
x = x1
y = y1
r = a
return r, x, y
else:
r, x1, y1 = ext_gcd(b, a % b)
x = y1
y = x1 - a // b * y1
return r, x, y
def get_key():
# 输入两个素数
p = 11
q = 13
# 计算n = P X Q \ Z = ( P -1 ) X ( Q - 1)
n = p * q
z = (p - 1) * (q - 1)
# 计算公私密钥 e X d = 1 (mod n) 通过欧几里得与扩展的欧几里得算法得到
e = z # 让e与z互素 且 d > 0
while e > 1:
e -= 1
r, x, y = ext_gcd(e, z)
if (gcd(e, z) == 1) & (x > 0):
break
d = x
return (n, e), (n, d)
def RSA_encrypt(x, key):
n = key[0]
e = key[1]
y = x ** e % n
return y
def RSA_decrypt(y, key):
n = key[0]
d = key[1]
x = y ** d % n
return x
def RSA():
# 生成公私钥
pub_k, pri_k = get_key()
with open('RSA_key.txt', mode='w', encoding='utf-8') as writer:
writer.write(f'{pub_k}, {pri_k}')
print('RSA 密钥存储成功')
# 每次读取4个字节 32位 文本
# 将文本进行encode
encoded_content = content.encode('utf-8')
# print(encoded_content)
# 每次读入4个字
start = 0
encrypt_list = []
decrypt_list = []
# 加密
while True:
input = content[start]
start += 1
if start >= len(content):
break
# print(input)
# 加密
y = RSA_encrypt(ord(input), pub_k)
encrypt_list.append(f'{y}')
# print('加密后', encrypt_list)
with open('RSA_encryption.txt', mode='w', encoding='utf-8') as writer:
writer.write(''.join(encrypt_list))
print("RSA 加密成功")
# 解密
for item in encrypt_list:
# 解密
x = RSA_decrypt(int(item), pri_k)
decrypt_list.append(chr(x))
with open('RSA_decryption.txt', mode='w', encoding='utf') as writer:
writer.write(''.join(decrypt_list))
print('RSA 解密成功')
def read_file(file_name):
# 文件路径
# file_name = "practice_record.txt"
global content
# 读取文件
with open(file_name, mode="r", encoding="utf-8") as file_reader:
content = file_reader.read()
# 将文件字符串进行编码 返回字符串的unicode编码 A -> 65
global raw_binary_list
raw_binary_list = [bin(ord(item)).replace('0b', '') for item in content]
@app.route('/api/download', methods=['GET'])
def down_file():
operation = request.args.get('operation')
print(operation)
content = ''
# 一次一密
if operation == "ONCECryption":
# 以二进制模式打开文本
with open('ONCE_encryption.txt', 'rb') as reader:
content = reader.read()
# 将文本内容转化为字节流
stream = io.BytesIO(content)
return send_file(stream, as_attachment=True, attachment_filename="ONCE_encryption.txt", mimetype='application/octet-stream')
elif operation == "MOVECryption":
# 以二进制模式打开文本
with open('MOVE_encryption.txt', 'rb') as reader:
content = reader.read()
# 将文本内容转化为字节流
stream = io.BytesIO(content)
return send_file(stream, as_attachment=True, attachment_filename="MOVE_encryption.txt",
mimetype='application/octet-stream')
elif operation == "DESCryption":
# 以二进制模式打开文本
with open('DES_encryption.txt', 'rb') as reader:
content = reader.read()
# 将文本内容转化为字节流
stream = io.BytesIO(content)
return send_file(stream, as_attachment=True, attachment_filename="DES_encryption.txt",
mimetype='application/octet-stream')
elif operation == "SM4Cryption":
# 以二进制模式打开文本
with open('SM4_encryption.txt', 'rb') as reader:
content = reader.read()
# 将文本内容转化为字节流
stream = io.BytesIO(content)
return send_file(stream, as_attachment=True, attachment_filename="SM4_encryption.txt",
mimetype='application/octet-stream')
elif operation == "RSACryption":
# 以二进制模式打开文本
with open('RSA_encryption.txt', 'rb') as reader:
content = reader.read()
# 将文本内容转化为字节流
stream = io.BytesIO(content)
return send_file(stream, as_attachment=True, attachment_filename="RSA_encryption.txt",
mimetype='application/octet-stream')
else:
return "error"
@app.route('/api/encryption', methods=['POST', 'GET'])
def handle_cryption():
# 接受上传的文件
fileList = request.files.getlist("file")
# 接受需要执行的加密算法
cryption_type = request.form.get("optionValue")
file_name = ""
for file in fileList:
# 读取数据
file_stream = file.stream.read()
with open(file.filename, 'wb') as writer:
writer.write(file_stream)
file_name = file.filename
print(fileList[0], cryption_type)
# 读取文件 得到原始文本和二进制的列表
read_file(file_name)
if cryption_type == "ONCECryption":
once_oneCryption()
elif cryption_type == "MOVECryption":
move_cryption()
elif cryption_type == "DESCryption":
DES()
elif cryption_type == "SM4Cryption":
SM4()
elif cryption_type == "RSACryption":
RSA()
else:
return "error"
return "data received"
if __name__ == '__main__':
# once_oneCryption()
# move_cryption()
# DES()
# SM4()
# RSA()
# print(raw_binary_list)
app.run(debug=True)
技术实现
界面设计
该项目对界面美观要求不高,功能也相对简单。采用一个上下占比2:8的布局,整体背景使用浅灰色,上下两个部分单独使用div包裹,对边框进行圆润以及添加的阴影使其与背景分离,简单地形成一定的层次感。
文件上传使用element plus中提供的组件,支持拖拽等功能,将其放置在最上面,紧接着下方放置一个多选框,用户可以选择自己需要使用的加密算法,最后提供三个按钮,取消、确定、下载。
功能分析
文件上传:elemeng plus在原生upload上进行开发,提供了可拖拽功能。为了能够获取到上传的文件,先初始化一个变量fileList,在el-upload中使用:file-list动态获取上传的文件,当上传多个文件时,文件会以列表的形式存储,而后使用axios讲文件封装成formdata格式的数据进行传输。
算法选择:使用el-elect组件,配置选项参数。初始化一个optionValue变量,将该变量放置在el-elect组件中使用:optionValue动态接受选项的值。
传输文件与选择的算法:文件使用变量filelist存储、算法使用变量optionValue存储。在后端已经启动了一个flask服务,地址为http://localhost,并且配置了加密路由与下载路由。配置axios目标地址、请求方法、传输数据,向后端发送。后端接受请求的参数,执行不同的加密算法。
下载文件:前端使用axios向后端下载路由发送需要下载的加密后的文件,后端以字节流的形式发送到前端,前端使用blob存放数据,创建一个链接并放置blob数据,再创建一个<a>标签用于触发下载事件。
接受文件并加密:后端使用flask启动一个web服务,将接受文件与执行加密操作放置在函数中,并为该函数配置一个路由地址,使得前端能够通过后端ip以及路由地址请求该函数。后端使用request函数接收前端传来的参数,根据参数调用不同的函数执行不同的加密算法。密钥与密文都会存储在本地的项目的根目录下。
返回密文文件:接收到前端发来的下载密文文件的请求,根据参数读取相应算法加密后的密文,读取文件时要配置open函数中mode参数为‘rb’,因为需要将文件以二进制读取,而后再通过send_file将二进制以字节流的方式发往前端。
问题与解决
1. 前后发送request请求,报了bad request的错误,大概率是因为前后发送与接收数据的方式不一致,比如前端使用下面的方式发送params,后端需要使用request.args.get()的方式接收
axios({
url: "/api/download",
method: "get",
params: { operation: optionValue.value },
responseType: "blob",
})
2. 针对跨域问题,在后端使用flask框架时,可以直接在后端使用CORS库解决这个问题,不需要在前端进行配置。
3. 针对前端下载文件这种任务,后端不能直接传文件,需要将文件以二进制的形式读取,再通过字节流的形式发送。