# -*- coding: utf-8 -*-
# author: qinshixu
import requests
import logging
class Dingding(object):
"""
dingding controller include get_user_info send_message
"""
ding_api = "https://oapi.dingtalk.com/"
def __init__(self, copy_id=None, copy_secret=None, agent_id=None):
self.logger = logging.getLogger("dingding")
self.copy_id = copy_id
self.copy_secret = copy_secret
self.agent_id = agent_id
self._init_token()
def _init_token(self):
get_token_url = "%s%s?corpid=%s&corpsecret=%s" % (
self.ding_api, "gettoken", self.copy_id, self.copy_secret)
try:
res = requests.get(get_token_url)
if res.status_code != 200:
self.access_token = None
self.logger.error(
"Get dingding access_token error: msg:%s" % res.text)
elif res.json()["errcode"] != 0:
self.access_token = None
self.logger.error(
"Get dingding access_token error: msg:%s" % res.json()["errmsg"])
else:
self.access_token = res.json()["access_token"]
except Exception as e:
self.logger.error(str(e))
def department_list(self):
get_department_url = "%sdepartment/list?access_token=%s" % (self.ding_api, self.access_token)
try:
res = requests.get(get_department_url)
if res.status_code != 200:
self.logger.error(
"Get dingding department error: msg:%s" % res.text)
elif res.json()["errcode"] != 0:
self.logger.error("Get dingding department error: msg:%s" % res.json()["errmsg"])
for i in res.json()["department"]:
parentid = i["parentid"] if not i["createDeptGroup"] else None
department = {
"name": i["name"],
"id": i["id"],
"parentid": parentid
}
yield department
except Exception as e:
self.logger.error(str(e))
return None
def get_department_users(self, dep_id=None):
get_users_url = "%s%s?access_token=%s&department_id=%s&offset=0&size=100" % (
self.ding_api, "user/listbypage", self.access_token, dep_id)
res = requests.get(get_users_url)
if res.status_code != 200:
self.logger.error("Get depart users error: msg:%s" % res.text)
return None
elif res.json()["errcode"] != 0:
self.logger.error("Get depart users error: msg:%s" %
res.json()["errmsg"])
return None
else:
dep_users = res.json()["userlist"]
return dep_users
def get_all_users(self):
for i in self.department_list():
for j in self.get_department_users(i["id"]):
userinfo = {}
userinfo["mobile"] = j.get("mobile")
userinfo["position"] = j.get("position")
userinfo["userid"] = j.get("userid")
userinfo["name"] = j.get("name")
userinfo["isLeader"] = j.get("isLeader")
userinfo["openId"] = j.get("openId")
userinfo["department"] = sorted(j.get("department"))[-1]
userinfo["email"] = j.get("orgEmail")
if j.get("orgEmail") is None:
userinfo["email"] = j.get("email")
if not userinfo["email"]:
print("%s no config email" % j.get("name"))
continue
yield userinfo
def get_user_info(self, username):
for userinfo in self.get_all_users():
if userinfo.email.split("@")[0] == username:
return userinfo
return None
# example
# if __name__ == "__main__":
# ding = Dingding("dingk*************", "****", "****")
# for i in ding.get_all_users():
# print(i)
同步钉钉通讯录
猜你喜欢
转载自blog.51cto.com/12113362/2540833
今日推荐
周排行