Python定时上传实现
定时随机上传一张图片
import requests
import os
from apscheduler.schedulers.blocking import BlockingScheduler
import random
from PIL import Image
import time
from pathlib import Path
import cv2 as cv
import time
def _copy_with_opencv(src_path, dest_path):
    """Re-encode the image at ``src_path`` into ``dest_path`` using OpenCV.

    Raises:
        OSError: If OpenCV cannot decode the source or write the destination.
    """
    # cv.imread signals a failed read by returning None instead of raising.
    image = cv.imread(src_path)
    if image is None:
        raise OSError(f"OpenCV could not read image: {src_path}")
    # cv.imwrite reports failure through its boolean return value.
    if not cv.imwrite(dest_path, image):
        raise OSError(f"OpenCV could not write image: {dest_path}")


def _copy_with_pillow(src_path, dest_path):
    """Re-encode the image at ``src_path`` into ``dest_path`` using Pillow."""
    # The context manager closes the source file once the copy is saved.
    with Image.open(src_path) as image:
        image.save(dest_path)


def _upload_image(session, url, image_path, name, timeout):
    """POST one staged image as multipart form data and print the response."""
    # Open the file in a ``with`` block so the handle is closed after the
    # request instead of leaking one descriptor per upload.
    with open(image_path, 'rb') as image_file:
        # NOTE(review): the two plain-string entries are passed through
        # ``files`` exactly as before; they look like they were meant to be
        # ordinary form fields (``data=``). Confirm against the server.
        form_parts = {
            'file': image_file,
            'Content-Disposition': 'form-data',
            'fileName': name,
        }
        response = session.post(url, files=form_parts, timeout=timeout)
    print(response.text)


def _upload_directory(url, timeout, src_dir, staging_dir, copy_image,
                      timing_label):
    """Stage and upload every regular file found directly in ``src_dir``.

    Each file is copied into ``staging_dir`` with ``copy_image`` (the copy is
    timed and the duration printed after ``timing_label``), uploaded, and
    followed by a five second pause.

    Raises:
        FileNotFoundError: If ``src_dir`` is not an existing directory.
    """
    if not os.path.isdir(src_dir):
        raise FileNotFoundError(f"Directory is not found: {src_dir}")
    # Make sure the staging directory exists before anything is written to it.
    os.makedirs(staging_dir, exist_ok=True)
    with requests.Session() as session:
        for file in os.listdir(src_dir):
            src_path = os.path.join(src_dir, file)
            # Sub-directories and other non-file entries cannot be uploaded.
            if not os.path.isfile(src_path):
                continue
            staged_path = os.path.join(staging_dir, file)
            started = time.perf_counter()
            copy_image(src_path, staged_path)
            elapsed = time.perf_counter() - started
            print(f"{timing_label}{elapsed}")
            print(staged_path)
            _upload_image(session, url, staged_path, file, timeout)
            # Pause between uploads.
            # NOTE(review): assumed to be per-file; the original indentation
            # was lost, so confirm this was not meant to run once per stage.
            time.sleep(5)


def post_url(url="", timeout=30):
    """Upload the images of two randomly chosen source directories.

    One directory is picked at random from the ``a1`` family and one from the
    ``a3`` family. Every file in the first is staged into ``./res`` with
    OpenCV, every file in the second with Pillow, and each staged file is
    POSTed to ``url``. There is a one second pause between the two stages.

    Args:
        url: Upload endpoint. The default empty string is the placeholder
            from the original script and must be replaced with a real
            endpoint for uploads to succeed.
        timeout: Seconds to wait for the server on each request, so a
            stalled connection cannot block a scheduled run forever.

    Raises:
        FileNotFoundError: If either chosen source directory does not exist.
        OSError: If an image cannot be read or written while staging.
    """
    file_a1 = ["a1", "a11", "a111", "a1111", "a11111"]
    file_a2 = ["a3", "a33", "a333", "a3333", "a33333"]
    file_res = os.path.join('.', 'res')
    path_parent = Path(file_res).parent
    print(f"path_parent:{path_parent}")

    # Stage one: OpenCV copy + upload.
    _upload_directory(url, timeout, random.choice(file_a1), file_res,
                      _copy_with_opencv, "cv_expand time:")
    time.sleep(1)
    # Stage two: Pillow copy + upload. The existence check now looks at this
    # stage's own directory rather than re-testing the first one.
    _upload_directory(url, timeout, random.choice(file_a2), file_res,
                      _copy_with_pillow, "pillow_expand time")
if __name__ == '__main__':
    # Run post_url every three seconds until the process is interrupted.
    scheduler = BlockingScheduler()
    # max_instances is raised so that a run which is still busy does not cause
    # the next scheduled run to be skipped.
    # NOTE(review): overlapping runs all stage files into the same ./res
    # directory - confirm that concurrent runs are really wanted.
    scheduler.add_job(post_url, 'interval', seconds=3, max_instances=1000)
    try:
        # start() blocks the main thread until the scheduler is shut down, so
        # the extra post_url() call that used to follow it could never run in
        # normal operation and has been dropped.
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        # Ctrl-C / interpreter exit is the normal way to stop this script.
        pass