"""
使用Python检查远程SSH服务器上的文件是否存在
http://cn.voidcc.com/question/p-bxssuxqx-bag.html
如何使用python检查远程服务器上是否存在文件?
https://www.cnpython.com/qa/192046
Paramiko
https://www.cnblogs.com/xiao-apple36/p/9144092.html
python - paramiko怎么判断远程目录是否存在?
https://www.it1352.com/727914.html
"""
import os
import paramiko
from loguru import logger
class SSHConnection(object):
def __init__(self, host_dict):
self.host = host_dict['host']
self.port = host_dict['port']
self.username = host_dict['username']
self.pwd = host_dict['pwd']
# 获取Transport实例
self.__transport = paramiko.Transport(self.host, self.port)
# 连接SSH服务端,使用password
self.__transport.connect(username=self.username, password=self.pwd)
# 获取SFTP实例
self.sftp = paramiko.SFTPClient.from_transport(self.__transport)
# 关闭通道
def close(self):
self.sftp.close()
self.__transport.close()
# 上传文件到远程主机
def upload(self, local_path, remote_path):
self.sftp.put(local_path, remote_path)
# 从远程主机下载文件到本地
def download(self, local_path, remote_path):
self.sftp.get(remote_path, local_path)
# 在远程主机上创建目录
def mkdir(self, target_path, mode=777):
self.sftp.mkdir(target_path, mode)
# 删除远程主机上的目录
def rmdir(self, target_path):
self.sftp.rmdir(target_path)
# 查看目录下文件以及子目录(如果需要更加细粒度的文件信息建议使用listdir_attr)
def listdir(self, target_path):
return self.sftp.listdir(target_path)
# 删除文件
def remove(self, target_path):
self.sftp.remove(target_path)
# 查看目录下文件以及子目录的详细信息(包含内容和参考os.stat返回一个FSTPAttributes对象,对象的具体属性请用__dict__查看)
def listdirattr(self, target_path):
try:
list = self.sftp.listdir_attr(target_path)
except BaseException as e:
logger.error(e)
return list
# 获取文件详情
def stat(self, remote_path):
try:
stat_info = self.sftp.stat(remote_path)
return True, stat_info
except:
return False, '文件不存在'
# SSHClient输入命令远程操作主机
def cmd(self, command):
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy)
ssh._transport = self.__transport
stdin, stdout, stderr = ssh.exec_command(command)
result = stdout.read()
# logger.info(result)
return result
# 递归遍历远程目录下的所有文件
def gci(ssh, remote_path, pathlist):
for f in ssh.listdirattr(remote_path):
if str(f.longname).split(" ")[0].startswith('d'):
gci(os.path.join(remote_path, f.filename), pathlist)
else:
pathlist.append(os.path.join(remote_path, f.filename))
return pathlist
if __name__ == "__main__":
pass
host_dict = {
'host': '0.5.1.0',
'port': 22,
'username': 'ubuntuname',
'pwd': 'xxxxxx'
}
# 初始化ssh
ssh = SSHConnection(host_dict)
logger.info(f"ssh = {ssh}")
remote_path = '/opt/service/ecords.py'
# 获取远程文件的大小
res = ssh.stat(remote_path)
logger.info(f"res={res}")
# 检查文件是否存在
flag, info = ssh.stat(remote_path)
logger.info(f"flag = {flag} | info = {info}")
if flag:
logger.info(f"info = {info.st_size}")
# 创建远程文件夹
# remote_path_new = '/opt/service//test_remote_make'
# ssh.mkdir(remote_path_new, mode=777) # 创建的文件夹权限有问题 TODO
# 执行远程命令
ret = ssh.cmd('df -h')
logger.info(f"ret = {ret}")
ret = ssh.cmd('python /opt/service/test.py')
logger.info(f"ret = {ret}")
ssh.close()
Python之Paramiko方法封装
猜你喜欢
转载自blog.csdn.net/xuezhangjun0121/article/details/121905116
今日推荐
周排行