import paramiko
class SshClass:
"""
ssh连接对象
本对象提供了密钥连接、密码连接、命令执行、关闭连接
"""
ip = ''
port = 22
username = ''
timeout = 0
ssh = None
def __init__(self, ip, username, port=22, timeout=30):
"""
初始化ssh对象
:param ip: str 主机IP
:param username: str 登录用户名
:param port: int ssh端口
:param timeout: int 连接超时
"""
self.ip = ip
self.username = username
self.port = port
self.timeout = timeout
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.ssh = ssh
def conn_by_key(self, key):
"""
密钥连接
:param key: str rsa密钥路径
:return: ssh连接对象
"""
rsa_key = paramiko.RSAKey.from_private_key_file(key)
self.ssh.connect(hostname=self.ip, port=self.port, username=self.username, pkey=rsa_key, timeout=self.timeout)
if self.ssh:
return self.ssh
else:
self.close()
raise Exception("密钥连接失败.")
def conn_by_pwd(self, pwd):
"""
密码连接
:param pwd: str 登录密码
:return: ssh连接对象
"""
self.ssh.connect(hostname=self.ip, port=self.port, username=self.username, password=pwd)
if self.ssh:
return self.ssh
else:
self.close()
raise Exception("密码连接失败.")
def exec_command(self, command):
"""
命令控制
:param command: str 命令
:return: dict 命令执行的返回结果
"""
if command:
stdin, stdout, stderr = self.ssh.exec_command(command)
return {
"stdin": stdin.read(),
"stdout": stdout.read(),
"stderr": stderr.read()
}
else:
self.close()
raise Exception("命令不能为空字符串.")
def close(self):
"""
关闭当前连接
:return:
"""
if self.ssh:
self.ssh.close()
else:
raise Exception("ssh关闭失败,当前对象并没有ssh连接.")
if __name__ == '__main__':
SSH = SshClass("192.168.1.33", "root")
ssh = SSH.conn_by_pwd("123456")
python实现ssh连接远程服务器(密码和密钥)
猜你喜欢
转载自blog.csdn.net/xiaoxin_OK/article/details/119783073
今日推荐
周排行