def retry_decorate(retry_count = 10, sleep_seconds = 10):
'''
让函数可以拥有重试的机会,因为计划任务启动后,需要一段时间才有可能再次触发
'''
'''
example:
@retry_decorate
def add(a, b):
return a+b
if __name__ == '__main__':
# this is ok, and just run once.
add(1, 2)
# this will raises exception, so it will run retry_count times.
add([], {}) '''
import time, traceback
def new_func(old_func):
def func(*args, **kwargs):
result = None
retry_count_tmp = retry_count
while retry_count_tmp > 0:
try:
result = old_func(*args, **kwargs)
return result
except Exception, msg:
print 'failed : %s'%msg
traceback.print_exc()
print 'retry getting after %ss '%sleep_seconds
time.sleep(sleep_seconds)
retry_count_tmp -= 1
print 'all choices for retrying have consumed. Byebye!'
return func
return new_func
def save_as(filename,
datatype = 'json',
dir_path = os.path.dirname(__file__),
encoding = 'utf-8'):
'''
保存content至文件,直接写入
'''
'''
example:
@save_as('test', datatype = 'xml')
def test_save():
return 'test!!'
'''
dir_path = os.path.join(dir_path, 'data')
import simplejson
def new_func(old_func):
def func(*args, **kwargs):
print 'getting %s'%filename
content = old_func(*args, **kwargs)
if not content:
return print 'got %s'%filename
pardir = dir_path
if not os.path.exists(pardir):
os.mkdir(pardir) t_content = None
if datatype == 'json':
t_content = simplejson.dumps(content)
elif datatype in ['plain', 'xml']:
t_content = content
elif datatype == 'csv':
t_content = content if t_content:
import codecs
file_path = os.path.join(dir_path, '%s.%s'% (filename, datatype))
print 'saving file to disk: %s'%file_path
with codecs.open(file_path, 'wb', encoding) as file:
if datatype == 'csv': # for csv format file
import csv
writer = csv.writer(file)
writer.writerows(t_content)
else:
file.write(t_content)
print 'saved file'
return file_path
return None
return func
return new_funcclass TaskManager:
'''
根据命令行输入的参数激活具体的计划任务
'''
'''
example:
taskmanager = TaskManager() @taskmanager.task_func('do_test'):
def hellworld(h, w):
print h, w
return 'hello' if __name__ == '__main__':
import sys
target = sys.argv[1]
arg1 = sys.argv[2]
arg2 = sys.argv[3]
print taskmanager(target, arg1, arg2) # to return and then print 'hello'
'''
def __init__(self):
'''
功能跟字符串的映射
'''
self.func_list = {} def task_func(self, key = None):
'''
@key 该计划任务所关联的字符串
'''
def new_func(old_func, key = key):
if not key:
key = old_func.__name__
self.func_list[key] = old_func # key-func
return old_func # do not interrupt
return new_func def __call__(self, arg_from_input, *args, **kwargs):
'''
通过命令行输入激活计划任务
@arg_from_input 从命令行输入的参数
'''
func = self.func_list.get(arg_from_input)
if func:
result = func(*args, **kwargs)
return resultperformace_log = CommLogger().getLogger(
'%s_performace_detector' %datetime.datetime.now().strftime('%Y-%m-%d'),
bPrint = False)def performace_detector_for_time(key = None, id = None, _log = None):
'''
跟踪业务函数的时间消耗。
@key: 生成log记录标识的回调方法
@id: 被跟踪的应用id 由于在线上环境下的web应用是多线程并发的,而某一个函数是希望可以跟另一个函数连贯跟踪的,
所以不能随便地修饰一个函数去跟踪,而需要做一个log_id去跟踪,
标示这些函数都是在同一次请求上的触发
'''
def func_decorate(old_func, key = key, id = id, _log = _log):
def new_func(*args, **kwargs):
# 生成日志标识log_id
if key and callable(key):
log_id = key() # 得到应用标识,用于唯一标识业务过程,如md5
if isinstance(id, int) and 0 <= id <= len(args):
app_id = args[id]
elif isinstance(id, basestring) and id in kwargs:
app_id = kwargs.get(id)
else:
app_id = None start_time = datetime.datetime.now()
# 真正的业务函数
result = old_func(*args, **kwargs)
end_time = datetime.datetime.now() consumed_time = end_time - start_time # 迭代调用栈,找出日志标识号
if 'log_id' not in locals():
for f_info in inspect.stack():
try:
f_list = inspect.getargvalues(f_info[0])
if 'log_id' in f_list[3]:
log_id = f_list[3].get('log_id')
break
except:
pass
else:
log_id = None if _log:
_log.info(u'start_time:%s, end_time:%s, consumed_time:%s, '
u'log_id:%s, app_id:%s, from func: %s' %(
start_time.strftime('%Y-%m-%d %H:%M:%S'),
end_time.strftime('%Y-%m-%d %H:%M:%S'),
consumed_time,
log_id, app_id, old_func.__name__))
return result
return new_func
return func_decorate