代码Python可以使用
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
DeleteRowsEvent,
UpdateRowsEvent,
WriteRowsEvent,
)
import threading
import paramiko
import logging.handlers
LOG_FILE = r'hue_auto_create_user.log'
handler = logging.handlers.RotatingFileHandler(LOG_FILE, maxBytes=1024 * 1024, backupCount=5, encoding='utf-8') # 实例化handler
fmt = '%(asctime)s - %(levelname)s - %(message)s'
formatter = logging.Formatter(fmt) # 实例化formatter
handler.setFormatter(formatter) # 为handler添加formatter
logger = logging.getLogger('hue_auto_create_user') # 获取名为hue_auto_create_user的logger
logger.addHandler(handler) # 为logger添加handler
logger.setLevel(logging.DEBUG)
MYSQL_SETTINGS = {
"host": "cdh-m1.temp.online",
"port": 3306,
"user": "root",
"passwd": "12345"
}
HOST_ARRAY = [
'10.50.40.1',
'10.50.40.2',
'10.50.40.3',
'10.50.40.4',
'10.50.40.5',
'10.50.40.6',
'10.50.40.7',
'10.50.40.8',
'10.50.40.9',
'10.50.40.10',
]
def ssh_host_createUserAndGroup(ip, username, passwd, cmd):
try:
# 指定本地的RSA私钥文件,如果建立密钥对时设置的有密码,password为设定的密码,如无不用指定password参数
# pkey = paramiko.RSAKey.from_private_key_file('/home/super/.ssh/id_rsa')
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(ip, 22, username, passwd, timeout=5)
for m in cmd:
stdin, stdout, stderr = ssh.exec_command(m)
out = stdout.readlines()
# 屏幕输出
for o in out:
print(o)
logger.info(u"ssh 远程执行 {} Success".format(ip))
ssh.close()
except:
logger.error(u"ssh 远程执行 {} Error".format(ip))
def startTask(user):
cmd = ['useradd {}'.format(user), 'groupadd {}'.format(user),
'usermod -a -G {} {}'.format(user, user)] # 你要执行的命令列表
username = "root" # 用户名
passwd = "123456" # 密码
for ip in HOST_ARRAY:
a = threading.Thread(target=ssh_host_createUserAndGroup, args=(ip, username, passwd, cmd))
logger.info(u"Host {} 执行命令 {}".format(ip,cmd))
a.start()
def updateMysql(uid,userName):
import pymysql
cnx = pymysql.connect(user='root',
password='123456',
host='cdh-m1.temop.online',
database='hue',
port=3306,
charset='utf8'
)
cursor = cnx.cursor()
try:
cursor.execute("update auth_user set is_superuser=0 where id={}".format(uid))
#INSERT IGNORE INTO
cursor.execute("insert INTO hue.auth_group ( id, name) SELECT (auth_group.id+1), '{}' FROM hue.auth_group order by id DESC limit 1".format(userName))
except Exception:
logger.error("已经存在 Group {}".format(userName))
print("已经存在 Group {}".format(userName))
cursor.execute("insert INTO hue.auth_user_groups(user_id,group_id) SELECT {},auth_group.id FROM hue.auth_group where hue.auth_group.name='{}'".format(uid, userName))
cnx.commit()
cnx.close()
def main():
stream = BinLogStreamReader(
connection_settings=MYSQL_SETTINGS,
server_id=100,
blocking=True,
resume_stream=True,
only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent])
for binlogevent in stream:
e_start_pos, last_pos = stream.log_pos, stream.log_pos
for row in binlogevent.rows:
event = {"schema": binlogevent.schema,
"table": binlogevent.table,
"type": type(binlogevent).__name__,
"row": row
};
if binlogevent.table == "auth_user" and type(binlogevent).__name__ == "DeleteRowsEvent":
logger.info(u"DELETE User " + row['values']['username'])
print(u"DELETE User " + row['values']['username'])
if binlogevent.table == "auth_user" and type(binlogevent).__name__ == "WriteRowsEvent":
userName = row['values']['username'];
uid = row['values']['id'];
print(u"INSERT User {} Uid {}".format(row['values']['username'],uid))
startTask(userName)
logger.info(u"[INFO] 添加用户到集群所有节点 -> {}".format(userName) + "\n")
updateMysql(uid,userName)
logger.info(u"[INFO] 去掉Hue SuperUser {} Uid={}".format(userName, uid) + "\n")
if __name__ == "__main__":
main()
转载请注明出处,本文采用 CC4.0 协议授权