用 Python 实现的 Windows 到 Linux 文件夹同步工具。解决自动同步问题,加快大量小文件的传输速度,并提供更丰富的文件上传过滤设置。

现有工具不好用:之前用的是 PyCharm 的自动同步,但对于 git 拉下来的新文件不能自动上传到 Linux,只有自己编辑过或者手动 Ctrl + S 保存的文件才会自动同步。这导致为了不遗漏文件,经常需要全量上传,速度非常慢。

由于经常需要在 Windows 的 PyCharm 上直接使用 Linux 解释器做快速测试,频繁地在本机和 Linux 之间用 git push / pull 很不方便。测试环境虽然用的是 git,但开发的时候还是直接映射文件夹同步比使用 git 更方便。

采用了连接池的方式,与单线程、单个 Linux 连接逐个上传体积很小的碎片文件相比,文件上传速度提高了数十倍。

"""
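Automatically synchronize a local folder to a Linux machine.
This version uses a connection pool together with a thread pool, which greatly speeds up uploading large numbers of small files.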
"""
import json
import os
import queue
import re
import time
from collections import OrderedDict
from pathlib import Path
from typing import Union
import paramiko
from paramiko import SSHException

from app.utils_ydf import decorators, time_util, LoggerMixinDefaultWithFileHandler, nb_print, BoundedThreadPoolExecutor


@decorators.flyweight
class Uploader(LoggerMixinDefaultWithFileHandler):
    """Upload local files to a Linux host through pooled SFTP and SSH connections.

    ``build_connect`` submits connection-building jobs to an executor; each
    finished job parks its connection in a queue. ``upload`` borrows one SFTP
    client per attempt and ``_do_mkdir_operation`` borrows one SSH client to
    create a missing remote directory.

    NOTE(review): the class is wrapped by ``decorators.flyweight``. If that
    shares one instance per constructor-argument set, ``set_other_attr`` calls
    made for different folder mappings on the same host would overwrite each
    other -- confirm against the decorator's implementation.
    """

    # Maximum number of attempts ``upload`` makes for one file before giving up.
    _MAX_UPLOAD_ATTEMPTS = 10

    def __init__(self, host, port, username, password):
        """Store the credentials, create the pools and start opening connections.

        :param host: address of the Linux machine.
        :param port: SSH port of the Linux machine.
        :param username: login user.
        :param password: login password.
        """
        self.logger.warning('初始化文件上传类')
        self._host = host
        self._port = port
        self._username = username
        self._password = password
        self._queue_sftp_free = queue.Queue(100)
        # Not used anywhere inside this class; kept so existing attribute access keeps working.
        self._queue_connnect_busy = queue.Queue(100)
        self._queue_ssh_free = queue.Queue()
        self.build_connect()

    # noinspection PyAttributeOutsideInit
    def set_other_attr(self, local_dir, remote_dir):
        """Record the local root (with forward slashes) and the remote root used to map paths."""
        self._local_dir = str(local_dir).replace('\\', '/')
        self._remote_dir = remote_dir

    @decorators.keep_circulating(5, exit_if_function_run_sucsess=True)
    def _build_sftp(self):
        """Open one SFTP session and put it into the free-SFTP pool."""
        self.logger.warning('建立linux sftp连接中。。。')
        t_start = time.time()
        # noinspection PyTypeChecker
        transport = paramiko.Transport((self._host, self._port))
        try:
            transport.connect(username=self._username, password=self._password)
            sftp = paramiko.SFTPClient.from_transport(transport)
        except Exception:
            # Do not leak the already-opened transport when login or channel setup fails.
            transport.close()
            raise
        self._queue_sftp_free.put(sftp)
        self.logger.warning(f'建立linux sftp连接耗时 {round(time.time() - t_start, 2)}')

    @decorators.keep_circulating(5, exit_if_function_run_sucsess=True)
    def _bulid_ssh(self):
        """Open one SSH client and put it into the free-SSH pool."""
        self.logger.warning('建立linux ssh连接中。。。。')
        t_start = time.time()
        ssh = paramiko.SSHClient()
        ssh.load_system_host_keys()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            ssh.connect(self._host, port=self._port, username=self._username, password=self._password, compress=True)
        except Exception:
            # Release whatever the failed connect attempt left open before the caller retries.
            ssh.close()
            raise
        self._queue_ssh_free.put(ssh)
        self.logger.warning(f'建立linux ssh连接耗时 {round(time.time() - t_start, 2)}')

    def build_connect(self):
        """Submit 50 SFTP and 50 SSH connection jobs, pausing 0.5 s between each pair."""
        connect_num = 50
        executor = BoundedThreadPoolExecutor(connect_num * 2 + 10)
        for _ in range(connect_num):
            time.sleep(0.5)
            executor.submit(self._build_sftp)
            executor.submit(self._bulid_ssh)

    def _close_ssh_quietly(self, ssh):
        """Close a broken SSH client; a failure to close is logged, never raised."""
        try:
            ssh.close()
        except Exception as e:
            self.logger.debug(f'closing broken ssh connection failed: {e!r}')

    def _close_sftp_quietly(self, sftp):
        """Close a broken SFTP client and its transport; a failure to close is logged, never raised."""
        try:
            channel = sftp.get_channel()
            transport = channel.get_transport() if channel is not None else None
            sftp.close()
            if transport is not None:
                transport.close()
        except Exception as e:
            self.logger.debug(f'closing broken sftp connection failed: {e!r}')

    def _do_mkdir_operation(self, file_remote):
        """Create the remote parent directory of ``file_remote`` with ``mkdir -p``.

        The SSH client is returned to the pool on success. On any failure it is
        closed and replaced by a fresh connection so the pool never shrinks.

        :param file_remote: remote path of the file about to be uploaded.
        """
        import shlex  # local import: only this method needs it

        remote_parent = str(Path(file_remote).parent).replace('\\', '/')
        # Quote the path so directories containing spaces or shell metacharacters work.
        cmd = 'mkdir -p ' + shlex.quote(remote_parent)
        self.logger.info(cmd)
        ssh = self._queue_ssh_free.get()
        try:
            _stdin, _stdout, stderr = ssh.exec_command(cmd)
            stderr_bytes = stderr.read()
        except SSHException:
            self._close_ssh_quietly(ssh)
            self._bulid_ssh()
        except Exception as e:
            self.logger.exception(e)
            # The connection state is unknown here; replace it instead of silently losing a pool slot.
            self._close_ssh_quietly(ssh)
            self._bulid_ssh()
        else:
            if stderr_bytes != b'':
                self.logger.debug(stderr_bytes)
            self._queue_ssh_free.put(ssh)

    @decorators.tomorrow_threads(50)
    def upload(self, file: str):
        """Upload one local file to its mirrored location under the remote root.

        A ``FileNotFoundError`` from ``put`` is treated as a missing remote
        directory (it is created and the upload retried) unless the local file
        itself has disappeared, in which case the upload is skipped. Other
        ``OSError`` / ``SSHException`` failures replace the SFTP connection and
        retry.

        :param file: absolute local path using forward slashes, starting with the local root.
        """
        self.logger.debug(f'sftp空闲链接数量  {self._queue_sftp_free.qsize()},  ssh空闲链接数量 {self._queue_ssh_free.qsize()}')
        # Replace only the leading root; a later occurrence of the same text must stay untouched.
        file_remote = file.replace(self._local_dir, self._remote_dir, 1)

        for _ in range(self._MAX_UPLOAD_ATTEMPTS):
            sftp = self._queue_sftp_free.get()
            try:
                time_start = time.time()
                sftp.put(file, file_remote)
                self.logger.debug(f'{file_remote} 上传成功,大小是 {round(os.path.getsize(file) / 1024)} kb,上传时间是 {round(time.time() - time_start, 2)}')
                self._queue_sftp_free.put(sftp)
                return
            except FileNotFoundError:
                # Give the healthy connection back first so a mkdir failure cannot lose it.
                self._queue_sftp_free.put(sftp)
                if not os.path.isfile(file):
                    # The local file vanished after the scan; retrying cannot succeed.
                    self.logger.warning(f'{file} no longer exists locally, upload skipped')
                    return
                self._do_mkdir_operation(file_remote)
            except (OSError, SSHException) as e:  # e.g. OSError: Socket is closed
                self.logger.exception(e)
                self._close_sftp_quietly(sftp)
                self._build_sftp()
        self.logger.error(f'{file_remote} upload failed after {self._MAX_UPLOAD_ATTEMPTS} attempts')


class LinuxSynchronizer(LoggerMixinDefaultWithFileHandler):
    """Repeatedly scan a local directory and upload new or recently modified files to a Linux host."""

    # Files modified within this many seconds are uploaded again on every scan, even if already seen.
    _RECENT_MODIFY_WINDOW_SECONDS = 10 * 60

    def __init__(self, host, port, username, password, local_dir, remote_dir, file_suffix_tuple_exluded=('.pyc', '.log', '.gz'), file_volume_limit=1000 * 1000,
                 path_pattern_exluded_tuple=('/.git/', '/.idea/'), only_upload_within_the_last_modify_time='7 * 24 * 60 * 60', cycle_interval=10, ):
        """
        :param host: address of the Linux machine.
        :param port: SSH port of the Linux machine.
        :param username: login user.
        :param password: login password.
        :param local_dir: local root directory to scan.
        :param remote_dir: remote root directory that mirrors ``local_dir``.
        :param file_suffix_tuple_exluded: files whose path ends with one of these suffixes are skipped.
        :param file_volume_limit: size limit in bytes (number or arithmetic string); files of this size or larger are skipped.
        :param path_pattern_exluded_tuple: regular expressions; a file whose full path matches any of them is skipped.
        :param only_upload_within_the_last_modify_time: only files modified within this many seconds of now are
            uploaded (number or arithmetic string).
        :param cycle_interval: interval passed to ``decorators.keep_circulating`` for the scan loop
            (documented by the author as seconds between scans).
        """
        self._host = host
        self._port = port
        self._username = username
        self._password = password
        self._local_dir = str(local_dir).replace('\\', '/')
        self._remote_dir = remote_dir
        self._file_suffix_tuple_exluded = file_suffix_tuple_exluded
        self._path_pattern_exluded_tuple = path_pattern_exluded_tuple
        self._only_upload_within_the_last_modify_time = self._compute_result(only_upload_within_the_last_modify_time)
        self._cycle_interval = cycle_interval
        self._file_volume_limit = self._compute_result(file_volume_limit)
        self.filename__filesize_map = dict()
        self.filename__st_mtime_map = dict()
        self.uploader = Uploader(host, port, username, password)
        self.uploader.set_other_attr(local_dir, remote_dir)

    @staticmethod
    def _compute_result(sth: Union[str, int]):
        """Return a number unchanged, or evaluate an arithmetic string such as ``'7 * 24 * 60 * 60'``."""
        if isinstance(sth, (int, float)):
            return sth
        # SECURITY NOTE(review): eval() runs arbitrary Python taken from the configuration.
        # Only load configuration files you fully trust, or replace this with a restricted parser.
        return eval(sth)

    def _judge_need_filter_a_file(self, filename: str):
        """Return True when ``filename`` must be excluded from upload.

        A file is excluded when its path ends with one of the configured
        suffixes (multi-part suffixes such as ``.tar.gz`` are supported) or
        when any configured regular expression matches the path.
        """
        # Empty strings are dropped because every path "ends with" the empty string.
        suffixes = tuple(suffix for suffix in self._file_suffix_tuple_exluded if suffix)
        if filename.endswith(suffixes):
            return True
        return any(re.search(pattern, filename) for pattern in self._path_pattern_exluded_tuple)

    def find_all_files_meet_the_conditions(self):
        """Rebuild ``filename__filesize_map`` with the files that need uploading in this cycle.

        A file qualifies when it passes the exclusion filters, was modified
        within the configured age limit, is smaller than the size limit, and
        is either unseen so far, has a modification time different from the one
        recorded, or was modified within the last 10 minutes. The resulting
        map is ordered from oldest to newest modification time.
        """
        t_start = time.time()
        total_volume = 0
        candidates = []  # (mtime, full path, info dict) for every file selected in this scan
        for parent, _dirnames, filenames in os.walk(self._local_dir):
            for filename in filenames:
                file_full_name = os.path.join(parent, filename).replace('\\', '/')
                if self._judge_need_filter_a_file(file_full_name):
                    continue
                try:
                    file_stat = os.stat(file_full_name)
                except OSError as e:
                    # The file may be deleted or locked between the walk and the stat; skip it, keep scanning.
                    self.logger.debug(f'skip {file_full_name}: {e!r}')
                    continue
                file_st_mtime = file_stat.st_mtime
                volume = file_stat.st_size
                age = time.time() - file_st_mtime
                if age >= self._only_upload_within_the_last_modify_time or volume >= self._file_volume_limit:
                    continue
                # .get() yields None for unseen files, which never equals a real mtime.
                mtime_changed = self.filename__st_mtime_map.get(file_full_name) != file_st_mtime
                if not mtime_changed and age >= self._RECENT_MODIFY_WINDOW_SECONDS:
                    continue
                info = {'volume': volume, 'last_modify_time': time_util.DatetimeConverter(file_st_mtime).datetime_str}
                candidates.append((file_st_mtime, file_full_name, info))
                self.filename__st_mtime_map[file_full_name] = file_st_mtime
                total_volume += volume
        # Sort on the numeric mtime so the order does not depend on how the time string is formatted.
        candidates.sort(key=lambda candidate: candidate[0])
        self.filename__filesize_map = OrderedDict((name, info) for _mtime, name, info in candidates)
        self.logger.warning(f'需要上传的所有文件数量是 {len(self.filename__filesize_map)} ,总大小是 {round(total_volume / 1024, 2)} kb ,'
                            f'查找文件耗时 {round(time.time() - t_start, 2)} 秒,文件分别是 {json.dumps(self.filename__filesize_map, indent=4)}')

    @decorators.tomorrow_threads(10)
    def start_upload_files(self):
        """Run ``_start_upload_files`` under ``decorators.keep_circulating`` with the configured interval."""
        decorators.keep_circulating(self._cycle_interval)(self._start_upload_files)()

    def _start_upload_files(self):
        """Perform one cycle: scan for files and hand every selected file to the uploader."""
        with decorators.TimerContextManager():
            self.find_all_files_meet_the_conditions()
            for file in self.filename__filesize_map:
                self.uploader.upload(file)
            # Logger.warn is deprecated; use warning like the rest of the file.
            self.logger.warning('完成')


# noinspection PyPep8
if __name__ == '__main__':
    # The configuration file has the following format; several folder mappings can be synchronized at once.
    # [
    #   {
    #     "host": "112.xx.xx.16",
    #     "port": 10345,
    #     "username": "roxx",
    #     "password": "@0^Lc97MewIxxxx",
    #     "local_dir": "D:\\Users\\ydf\\Desktop\\oschina\\coding\\hotel_fares",
    #     "remote_dir": "/home/ydf/hotelf15",
    #     "file_suffix_tuple_exluded": [
    #       ".pyc",
    #       ".log",
    #       ".gz"
    #     ],
    #     "path_pattern_exluded_tuple": [
    #       "/.git/",
    #       "/.idea/",
    #       "cnbooking_cn_all.json"
    #     ],
    #     "only_upload_within_the_last_modify_time": "365 * 24 * 3600",
    #     "file_volume_limit": "2 * 1000 * 1000",
    #     "cycle_interval": 10
    #   }
    # ]
    # Read the whole configuration first so the file handle is closed before any synchronizer starts.
    with Path('/windows_to_linux_syn_config.json').open() as config_file:
        config_items = json.load(config_file)
    for config_item in config_items:
        nb_print(json.dumps(config_item))
        LinuxSynchronizer(**config_item).start_upload_files()

    # sc create PythonApp6 binPath= "D:\Users\ydf\Desktop\oschina\coding\hotel_fares\dist\windows_to_linux_syn2\windows_to_linux_syn2.exe"
    # pyinstaller --distpath=D:\Users\ydf\Desktop\oschina\pyinstallerdir --workpath=D:\Users\ydf\Desktop\oschina\pyinstallerdir --specpath=D:\Users\ydf\Desktop\oschina\specify_pyinstaller --icon="D:\Users\ydf\Desktop\oschina\coding\hotel_fares\app\utils_ydf\windows_to_linux_syn.ico" D:\Users\ydf\Desktop\oschina\coding\hotel_fares\app\utils_ydf\windows_to_linux_syn3.py
    # This file can be packaged with pyinstaller: add the PYTHONPATH variable first, then run the command below from another folder.
    # pyinstaller --icon="D:\Users\ydf\Desktop\oschina\coding\hotel_fares\app\utils_ydf\windows_to_linux_syn.ico" D:\Users\ydf\Desktop\oschina\coding\hotel_fares\app\utils_ydf\windows_to_linux_syn3.py

第一次运行时,会对指定的最晚修改时间之内的文件进行全量上传;之后每隔 10 秒(可动态配置)检查一次,将最近 10 分钟之内发生变化的文件上传到 Linux。

猜你喜欢

转载自www.cnblogs.com/ydf0509/p/10924130.html