有一个注意点,就是代码放在服务器上时,会出现频繁请求数据时,eloqua会重定向url([Errno 104] Connection reset by peer)),这里处理的方法是,添加循环,重新请求,当然请求的次数限制在10次内,如果十次都没有通过,则停止请求(在windows系统上没发现这个问题)
import json
import base64
import requests
import datetime
import time
import psycopg2
import os
import configparser
# 用来操作数据库的类
class GPCommand(object):
# 类的初始化
def __init__(self):
self.hostname = 'xxxxxx'
self.username = 'xxxxxx'
self.password = 'xxxxxxxxxx'
self.database = 'xxxxx'
def connectGp(self):
try:
#链接数据库
#读取配置利用connect链接数据库
self.connect = psycopg2.connect( host=self.hostname, user=self.username, password=self.password, dbname=self.database )
#创建一个新的cursor
self.cursor = self.connect.cursor()
print("connect gp successful.")
return ('con_successful')
except psycopg2.Error:
error = 'Failed to setup Postgres environment.\n{0}'.format(sys.exc_info())
print('connect gp error.'+'\n')
return 'con_error'+ error
#关闭数据库
def closeMysql(self):
self.cursor.close()
self.connect.close()
print("database closed")
def insert_data(self,table_name,dict):
try:
dicts_values = []
# print(dict)
dict_key = list(dict.keys())
table_key = ",".join(dict_key)
ROWstr = '' # 行字段
for key in dict.keys():
# ROWstr = (ROWstr + "'%s'" + ',') % (escape_character(dict[key]))
ROWstr = (ROWstr + "'" + escape_character(dict[key]) + "'" + ',')
# print(ROWstr)
insert_sql = "insert into %s (%s) values (%s)"\
%(table_name,table_key,ROWstr[:-1])
# print(insert_sql)
self.cursor.execute(insert_sql)
self.connect.commit()
except Exception as e:
print(e)
os._exit(0)
def delete_data(self,table_name):
try:
delete_sql = "delete from %s where activitydate between '%s' and '%s'"%(table_name,befyesterday,today)
self.cursor.execute(delete_sql)
self.connect.commit()
except Exception as e:
print(e)
os._exit(0)
#去除/转义获取的数据中存在的单引号等特殊字符
def escape_character(string):
script = ''
if type(string) == type([]):
script_tmp = ",".join(string)
elif type(string) == type({}):
script_tmp = str(string)
else:
script_tmp = string
if "'" in script_tmp:
nops = []
new_loop = []
itemplist = list(script_tmp)
for i in range(len(itemplist)):
if itemplist[i] == "'":
nops.append(i)
for item in nops:
new_loop.append(item + nops.index(item))
for i in new_loop:
itemplist.insert(i, "'")
script = "".join(itemplist)
else:
script = script_tmp
return script
#读取配置文件config_data
def read_config():
with open("config_data.ini", "r") as f:
config_result = f.read() # 一次性读全部成一个字符串
return config_result
# 获取时间
today = datetime.date.today()
# yesterday = today - datetime.timedelta(days=1)
befyesterday = today - datetime.timedelta(days=2)
today = today.strftime("%Y-%m-%d")
print(today)
befyesterday = befyesterday.strftime("%Y-%m-%d")
print(befyesterday)
def main():
config = []
config_result = read_config()
config_result = config_result.split('||@@||')
# 将时间打上特殊标记,利用replace替换(因为不同的主题的时间增量字段不同,故需要设置)
for item in config_result:
reitem = item.replace('xxxx-xx-xx1',befyesterday)
reitem = reitem.replace('xxxx-xx-xx2',today)
config.append(eval(reitem))
for item in config:
config_data = item
# print(item)
gpCommand = GPCommand()
gpCommand.connectGp()
gpCommand.delete_data(config_data['table_name'])
url = 'https://login.eloqua.com/id'
str_encrypt= 'GenScript\manzu.shu:Shusu891002'
base64_encrypt = str(base64.b64encode(str_encrypt.encode('utf-8')),'utf-8')
Authorization = "Basic %s" % base64_encrypt
headers = { 'Authorization' : Authorization }
#添加循环,重新请求,当然请求的次数限制在10次内,如果十次都没有通过,则停止请求(在windows系统上没发现这个问题)
for t in range(10):
try:
r = requests.get(url,headers=headers)
base_url = 'https://secure.p04.eloqua.com/api/bulk/2.0'+ config_data['url']
# print(base_url)
baseheaders = { 'Authorization' : Authorization,'Accept':'application/json','Content-Type':'application/json'}
# baseheaders = {'Accept':'application/json'}
body = config_data['body']
print(config_data['table_name'] + "本地时间为1 :", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
time.sleep(0.1)
br = requests.post(base_url,headers=baseheaders,data=json.dumps(body))
res = br.content.decode()
# print(res)
dict_json = json.loads(res)
#获取到规则的uri
syncs_uri = dict_json['uri']
# print('Define--uri: '+syncs_uri)
#创建syncs规则
syncs_post = 'https://secure.p04.eloqua.com/api/bulk/2.0/syncs'
syncs_body = {
"syncedInstanceUri": syncs_uri
}
time.sleep(0.1)
syncs_res = requests.post(syncs_post,headers=baseheaders,data=json.dumps(syncs_body))
time.sleep(0.2)
syncs_res = syncs_res.content.decode()
syncs_dict_json = json.loads(syncs_res)
#获取到syncs的uri
getdata_uri = syncs_dict_json['uri']
# print('syncs--uri : '+ getdata_uri)
retrieve_data = 'https://secure.p04.eloqua.com/api/bulk/2.0' + getdata_uri + '/data'
# print('最终请求的url: '+ retrieve_data)
retrieve = requests.get(retrieve_data,headers=baseheaders)
time.sleep(5)
# print(retrieve.content.decode())
results = json.loads(retrieve.content.decode())
# print(results)
totalresults = results['totalResults']
print(totalresults)
if totalresults != 0:
results_items = []
results_items.append(results['items'])
# print(totalresults)
if totalresults > 1000:
for i in range(1000,totalresults,1000):
get_url = 'https://secure.p04.eloqua.com/api/bulk/2.0' + getdata_uri + '/data?offset='+ str(i)
# print(get_url)
time.sleep(0.2)
get_retrieve = requests.get(get_url, headers=baseheaders)
time.sleep(5)
get_results = json.loads(get_retrieve.content.decode())
results_items.append(get_results['items'])
result_data_temp = []
for item in results_items:
for it in item:
result_data_temp.append(it)
# print(result_data_temp)
print( "本地时间为2 :", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
for item in result_data_temp:
gpCommand.insert_data(config_data['table_name'], item)
else:
pass
gpCommand.closeMysql()
except Exception as e:
if t >= 9:
print(config_data['table_name'] + '10次重试都没有通过连接')
else:
time.sleep(0.8)
print(e)
else:
time.sleep(1)
break
main()
通过api 获取eloqua的数据
猜你喜欢
转载自blog.csdn.net/qq_22994783/article/details/84098985
今日推荐
周排行