工程目录结构
jenkinsrole.py
import requests import json class JenkinsRole: def __init__(self, host, username, port=8080, password=None, token=None, ssl=False): """ password和token使用其中一个即可 :param host: Jenkins主机 :param username: 管理员用户 :param port: Jenkins端口 :param password: 管理员密码 :param token: 管理员的Token :param ssl: Jenkins地址是否是https协议 """ self.host = host self.username = username self.port = port self.password = password self.token = token self.ssl = ssl @property def pwd_or_token(self): if self.password and self.token: raise ConnectionError("password与token填写一个即可") return self.password if self.password else self.token @property def proto(self): return 'https' if self.ssl else 'http' def get_crumb(self) -> dict: res = requests.get( f'{self.proto}://{self.username}:{self.pwd_or_token}@{self.host}:{self.port}/crumbIssuer/api/xml?' f'xpath=concat(//crumbRequestField,":",//crumb)') return {res.text.split(':')[0]: res.text.split(':')[1]} def add_role(self, role_type, role_name, permissions: str, role_pattern=None, overwrite=True): """ 添加角色 如果添加的权限不属于对应的角色类型,两种情况: 1、添加的权限都不属于对应的角色类型,则会添加一个空权限的角色 比如向projectRoles中添加视图权限hudson.model.View.Create命名为p1, 则在projectRoles列表中依然会添加p1角色,但是该角色没有任何权限 2、添加的权限部分不属于对应的角色类型,则会将属于该角色类型的权限添加上 :param role_type: 只能是globalRoles或projectRoles或slaveRoles :param role_name: 角色名称 :param permissions: 角色ID,多个角色ID使用 , 号隔开,比如:'hudson.model.Hudson.Read,hudson.model.Computer.Build' :param role_pattern: 角色模式,支持正则表达式,当添加的是项目角色时需要指定 :param overwrite: 如果新增的权限已经存在是否覆盖,如果选择不覆盖,即使权限已经存在,也不会返回任何报错 :return: """ if role_type not in ('globalRoles', 'projectRoles', 'slaveRoles'): raise AttributeError("role_type必须是'globalRoles', 'projectRoles', 'slaveRoles' 其中一个") if role_type in ('projectRoles', 'slaveRoles') and not role_pattern: raise AttributeError("如果增加项目权限或节点权限,必须指定role_pattern,否则将匹配 .* ") role_data = { "type": role_type, "roleName": role_name, "permissionIds": permissions, "overwrite": overwrite, "pattern": role_pattern } headers = self.get_crumb() res = requests.post(f'{self.proto}://{self.host}:{self.port}/role-strategy/strategy/addRole', data=role_data, headers=headers, auth=(self.username, self.pwd_or_token)) return res.status_code def get_role(self, role_type, role_name): """ 获取指定角色的详细,返回结果示例: {'permissionIds': {'hudson.model.Computer.Build': True}, 'sids': ['admin']} :param role_type: :param role_name: :return: """ if role_type not in ('globalRoles', 'projectRoles', 'slaveRoles'): raise AttributeError("role_type必须是'globalRoles', 'projectRoles', 'slaveRoles' 其中一个") params = { "type": role_type, "roleName": role_name } res = requests.get(f'{self.proto}://{self.host}:{self.port}/role-strategy/strategy/getRole', params=params, auth=(self.username, self.pwd_or_token)) return res.json() def remove_roles(self, role_type, role_names: str): """ 删除权限 :param role_type: :param role_names: 多个角色用 , 号隔开 :return: """ if role_type not in ('globalRoles', 'projectRoles', 'slaveRoles'): raise AttributeError("role_type必须是'globalRoles', 'projectRoles', 'slaveRoles' 其中一个") data = { 'type': role_type, 'roleNames': role_names } headers = self.get_crumb() res = requests.post(f'{self.proto}://{self.host}:{self.port}/role-strategy/strategy/removeRoles', data=data, headers=headers, auth=(self.username, self.pwd_or_token)) return res.status_code def assign_role(self, role_type, role_name, sid): """ 将某个角色赋予某个用户 注意:如果赋予用户某个不存在的权限也不会报错 :param role_type: :param role_name: (单个角色) :param sid: 用户名称(单个用户) :return: """ if role_type not in ('globalRoles', 'projectRoles', 'slaveRoles'): raise AttributeError("role_type必须是'globalRoles', 'projectRoles', 'slaveRoles' 其中一个") data = { 'type': role_type, 'roleName': role_name, 'sid': sid } headers = self.get_crumb() res = requests.post(f'{self.proto}://{self.host}:{self.port}/role-strategy/strategy/assignRole', data=data, headers=headers, auth=(self.username, self.pwd_or_token)) return res.status_code def delete_roles_from_sid(self, role_type, sid): """ 删除指定用户所有的相关权限 注意:如果指定了一个不存在的用户,也不会报错 :param role_type: :param sid: 单个用户 :return: """ if role_type not in ('globalRoles', 'projectRoles', 'slaveRoles'): raise AttributeError("role_type必须是'globalRoles', 'projectRoles', 'slaveRoles' 其中一个") data = { 'type': role_type, 'sid': sid } headers = self.get_crumb() res = requests.post(f'{self.proto}://{self.host}:{self.port}/role-strategy/strategy/deleteSid', data=data, headers=headers, auth=(self.username, self.pwd_or_token)) return res.status_code def unassign_role(self, role_type, role_name, sid): """ 删除指定用户的某个权限 注意:即使指定一个不存在的用户或不存在的role,也不会返回错误 :param role_type: :param role_name: :param sid: :return: """ if role_type not in ('globalRoles', 'projectRoles', 'slaveRoles'): raise AttributeError("role_type必须是'globalRoles', 'projectRoles', 'slaveRoles' 其中一个") data = { 'type': role_type, 'roleName': role_name, 'sid': sid } headers = self.get_crumb() res = requests.post(f'{self.proto}://{self.host}:{self.port}/role-strategy/strategy/unassignRole', data=data, headers=headers, auth=(self.username, self.pwd_or_token)) return res.status_code def get_all_roles(self, role_type): """ 获取指定类型角色下的所有角色以及角色下的用户 返回结果示例:{"p1":[],"p2":["zm"],"test":["zm"]} :param role_type: :return: """ if role_type not in ('globalRoles', 'projectRoles', 'slaveRoles'): raise AttributeError("role_type必须是'globalRoles', 'projectRoles', 'slaveRoles' 其中一个") params = { "type": role_type } res = requests.get(f'{self.proto}://{self.host}:{self.port}/role-strategy/strategy/getAllRoles', params=params, auth=(self.username, self.pwd_or_token)) return res.json()
ldapuser.py
from ldap3 import Server, Connection, SAFE_SYNC import time import re def ldap_get_uid(ldap_server, ldap_user, ldap_pwd, ldap_search_base): uidlist = [] server = Server(ldap_server) conn = Connection(server, ldap_user, ldap_pwd, client_strategy=SAFE_SYNC, auto_bind=True) status, result, response, _ = conn.search(ldap_search_base, '(objectclass=*)', attributes=['*']) for i in response: searchObj = re.search('uid=\d+', i['dn']) if searchObj: uidObj = re.search('\d+', searchObj.group()) if uidObj: uidlist.append(uidObj.group()) return uidlist
jenkins_user_grant.py
import myjenkins from myjenkins import jenkinsrole import myldap from myldap import ldapuser import json import re #获取role信息 def get_role_info(role_type, role_name): jekins_role_json = j.get_role(role_type, role_name) js_fomat = json.dumps(jekins_role_json, sort_keys=True, indent=4) print(js_fomat) #将某个角色赋予某个用户 def role_to_user(role_type, role_name, username): res = j.assign_role(role_type, role_name, username) print('%s to %s %d' %(role_name, username, res)) #删除指定用户的某个权限 def role_cancel_user(role_type, role_name, username): res = j.unassign_role(role_type, role_name, username) print('%s del %s %d' %(username, role_name, res)) j = jenkinsrole.JenkinsRole('192.168.41.29', '5010905') j.host = "192.168.41.29" j.port = 8080 j.username = "5010905" j.token = '1130c5e4c3968bfe4c64edb9f0a8ecc5c5' ldap_server = '192.168.41.13' ldap_user = 'cn=admin,cn=manager,dc=pre,dc=venusgroup,dc=com,dc=cn' ldap_pwd = 'root@123' ldap_search_base = 'ou=6334,ou=4474,ou=4260,ou=4259,ou=employee,dc=pre,dc=venusgroup,dc=com,dc=cn' user_list = ldapuser.ldap_get_uid(ldap_server, ldap_user, ldap_pwd, ldap_search_base) role_type = 'projectRoles' role_name = 'item_other' # for user in user_list: # role_to_user(role_type, role_name, user) # role_cancel_user(role_type, role_name, user) role_list = j.get_all_roles(role_type) # for role in role_list.keys(): # if not role == 'item_admin': # role_to_user(role_type, role, '5010958') # role_cancel_user(role_type, role, '5010958') # for role in role_list.keys(): # if re.search('xxl-job', role): # role_to_user(role_type, role, '5010958') # role_cancel_user(role_type, role, '5010958') for role in role_list.keys(): if re.findall(r'alm|hrm|common', role): role_to_user(role_type, role, '5010958') # role_cancel_user(role_type, role, '5010958')