# Date helper function
from datetime import date, timedelta
def last_n_days(current_date=None, n=0):
    """Return the day(s) leading up to ``current_date`` as date strings.

    Each date is rendered with ``str()``, i.e. in ``YYYY-MM-DD`` form.

    Args:
        current_date: Reference ``datetime.date``. When omitted (``None``),
            today's date is used, looked up on every call.
        n: Number of days to look back.

    Returns:
        When ``n`` is 0 or 1, a single string: ``current_date`` minus ``n``
        days. Otherwise a list of strings in chronological order, running
        from ``current_date - n`` days up to the day before ``current_date``
        (``current_date`` itself is not included). A negative ``n`` produces
        an empty list.
    """
    # A ``date.today()`` default in the signature is evaluated only once, when
    # the function is defined, so a long-running process would keep using a
    # stale date. Resolve the default per call instead.
    if current_date is None:
        current_date = date.today()
    if n in (0, 1):
        return str(current_date - timedelta(days=n))
    return [str(current_date - timedelta(days=x)) for x in range(n, 0, -1)]
# Build the shell command
import subprocess
import shlex

# Dates of the seven days before today; with n=7 last_n_days returns a list.
file_list = last_n_days(n=7)
mapper = "mapper.py"
reducer = "reducer.py"
# One "-input <path pattern>" option per day. The "*" wildcard is left in the
# path as-is (presumably an HDFS glob resolved by Hadoop -- confirm).
input_files = " ".join(
    "-input /dm/qq/userinfo_qq/{date}-*/qq_guid.txt".format(date=each_date)
    for each_date in file_list
)
output = '/dm/qq/merge'
# Full Hadoop Streaming command line as a single string (printed for the
# operator and split into an argument list before execution).
mr_cmd = (
    "hadoop jar /opt/cloudera/parcels/CDH-4.2.0-1.cdh4.2.0.p0.10/lib/"
    "hadoop-0.20-mapreduce/contrib/streaming/"
    "hadoop-streaming-2.0.0-mr1-cdh4.2.0.jar "
    "-output {output} "
    "-mapper 'python {mapper}' "
    "-reducer 'python {reducer}' "
    "-file {mapper} "
    "-file {reducer} "
    "{input_files}"
).format(output=output, mapper=mapper, reducer=reducer,
         input_files=input_files)

if __name__ == "__main__":
    # Parenthesised single-argument print behaves the same on Python 2 and 3.
    print(mr_cmd)
    # subprocess.call() without shell=True expects an argument list; given one
    # long string it would look for a program with that entire name.
    # shlex.split keeps the quoted 'python <script>' values as single
    # arguments, and because no shell is involved the "*" in the input paths
    # is passed through to hadoop unexpanded.
    # Exit with the child's return code so a failed job is visible to callers.
    raise SystemExit(subprocess.call(shlex.split(mr_cmd)))