简介
在工作中用到python来开发些功能模块,使用到一些实用的类和方法,做个记录。
SSH免密登录
import paramiko
def session(host):
try:
port = "22"
username = "root"
private_key = paramiko.RSAKey.from_private_key_file('id_rsa')
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(host,int(port),username,private_key)
print "Login %s is successful" % host
return ssh
except Exception as e:
print e.message
以上是一段用key的免密登录代码,使用密码登录可以自己去百度一下,网上很多。使用这个方法的代码如下:
#!/usr/bin/env python
from mod_ssh import session
def remote_path_check(host, pre_path):
client = session(host)
command = "ls %s" % pre_path
stdin, stdout, stderr = client.exec_command(command)
rs_err = stderr.read().decode('utf-8')
# print(rs_err)
client.close()
print(rs_err)
return rs_err
mysql操作类
import pymysql
import logging
import sys
logger = logging.getLogger("baseSpider")
formatter = logging.Formatter('%(asctime)s\
%(levelname)-8s:%(message)s')
file_handler = logging.FileHandler("baseSpider.log")
file_handler.setFormatter(formatter)
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(formatter)
logger.addHandler(file_handler)
logger.addHandler(console_handler)
logger.setLevel(logging.INFO)
class database:
def __init__(self, host='192.168.2.87', user='test', pwd='123456', db='test'):
self.host = host
self.user = user
self.pwd = pwd
self.db = db
self.conn = None
self.cur = None
def connectdatabase(self):
try:
self.conn = pymysql.connect(self.host, self.user, self.pwd, self.db, charset='utf8')
except:
logger.error("connectDatabase failed")
return False
self.cur = self.conn.cursor()
return True
def close(self):
if self.conn and self.cur:
self.cur.close()
self.conn.close()
return True
def execute(self, sql, params=None):
self.connectdatabase()
try:
if self.conn and self.cur:
self.cur.execute(sql, params)
print ("ID of inserted record is ", self.conn.insert_id())
lastid = self.conn.insert_id()
self.conn.commit()
if lastid:
return lastid
except:
logger.error("execute failed: " + sql)
logger.error("params: " + params)
return False
finally:
self.close()
return True
def fetchall(self, sql, params=None):
self.execute(sql, params)
return self.cur.fetchall()
这个写法还是比较奥妙的,增删改使用exec,查使用fetchall。以下是调用方法:
#!/usr/bin/env python
from mod_db import database
def mail_receiver_list(mail_receiver_id):
db = database()
receiver_id_list = (mail_receiver_id).split(",")
mail_list = []
for i in receiver_id_list:
sql_str = "select user_email from bx_users where user_id = %s" %i
for value in db.fetchall(sql_str):
rs = value[0]
mail_list.append(rs)
mail_str = ",".join(mail_list)
return mail_str
redis-mq类
import redis
class RedisQueue(object):
"""Simple Queue with Redis Backend"""
def __init__(self, name, namespace='queue', **redis_kwargs):
"""The default connection parameters are: host='localhost', port=6379, db=0"""
self.__db = redis.Redis(**redis_kwargs)
self.key = '%s:%s' % (namespace, name)
def qsize(self):
"""Return the approximate size of the queue."""
return self.__db.llen(self.key)
def empty(self):
"""Return True if the queue is empty, False otherwise."""
return self.qsize() == 0
def put(self, item):
"""Put item into the queue."""
self.__db.rpush(self.key, item)
def get(self, block=True, timeout=None):
"""Remove and return an item from the queue.
If optional args block is true and timeout is None (the default), block
if necessary until an item is available."""
if block:
item = self.__db.blpop(self.key, timeout=timeout)
else:
item = self.__db.lpop(self.key)
if item:
item = item[1]
return item
def get_nowait(self):
"""Equivalent to get(False)."""
return self.get(False)
使用redis-mq队列
from RedisQueue import RedisQueue
def switch_case(value):
task_queue = ("{'task_queue_%s':'rq%s'}")%(value,value)
print (task_queue)
return task_queue
def check_tag()
'''
此处省略若干行
key通过for来取值,这里key是1和2,rq = rq1 和 rq2,目的就是建立两个消息对列
'''
task_queue = eval(switch_case(key))
# print task_queue
for queue_key in task_queue:
print(queue_key, task_queue[queue_key])
rq = task_queue[queue_key]
redis_queue = RedisQueue(rq, host='192.168.2.87')
print('redis_queue', redis_queue)
print('host', host)
#task_id = "%s:%s:%s" % (Task_Name, Template_Id, file_dict)
task_mq = "{'file_dict':'%s','host':'%s','template_id':'%s','task_name':'%s','task_id':'%s'}" % (
file_dict.replace("\n",""),host,Template_Id,Task_Name,task_id)
redis_queue.put(task_mq)
content = "Task %s start." %Task_Name
email_recoard(mail_receiver,content,task_id)
从队列取数据
def long_time_task(rq_id):
rq = 'rq'+str(rq_id)
print ('rq', rq)
redis_queue = RedisQueue(rq, host='192.168.2.87')
while 1:
if redis_queue.empty():
print ("queue is empty!")
time.sleep(5)
else:
result = eval(redis_queue.get())
print('redis_queue', result)
for key,value in result.items():
print ("result key and value")
print (key,value)
host = result.get('host')
file_dict = result.get('file_dict')
Template_Id = result.get('template_id')
Task_Name = result.get('task_name')
task_id = result.get('task_id')
多进程并发方法
from mod_db import database
from multiprocessing import Pool
def data_process():
#time.sleep(2)
p = Pool(4)
db = database()
sql_str = "select rq_id from bx_rq_info;"
for value in db.fetchall(sql_str):
xxx_id = value[0]
#task_queue = eval(switch_case(rq_id))
print ('rq_id',rq_id)
p.apply_async(long_time_task, args=(rq_id,))
#long_time_task(task_queue)
print('waiting for all subprocesses done...')
p.close()
p.join()
print('all subprocesses done')
这里的rq_id是1和2,那么就是并发跑两个任务。
数值单位自动转化方法
def space_calculate(data):
count = 0
for i in data:
size = i['size']
count = count + size
print ('count,size', count, size)
try:
bytes = float(count)
#kb = bytes / 1024
G = bytes/1024/1024/1024
except:
return "Error"
return "%.2f" % (G)
'''
if kb >= 1024:
M = kb / 1024
if M >= 1024:
G = M / 1024
#return "%fG" % (G)
return int(G)
else:
#return "%fM" % (M)
return int(M)
else:
#return "%fkb" % (kb)
return int(kb)
'''
我这里是强制转换为G字节,打开注释部分,就是按K M G自动转换。