python 监控binlog实现hue一个小插件

2025-10-28

python 监控binlog实现hue一个小插件

代码Python可以使用

  1from pymysqlreplication import BinLogStreamReader  
  2from pymysqlreplication.row_event import (  
  3DeleteRowsEvent,  
  4UpdateRowsEvent,  
  5WriteRowsEvent,  
  6)  
  7import threading  
  8import paramiko  
  9import logging.handlers  
 10  
 11LOG_FILE = r'hue_auto_create_user.log'  
 12  
 13handler = logging.handlers.RotatingFileHandler(LOG_FILE, maxBytes=1024 * 1024, backupCount=5, encoding='utf-8') # 实例化handler  
 14fmt = '%(asctime)s - %(levelname)s - %(message)s'  
 15  
 16formatter = logging.Formatter(fmt) # 实例化formatter  
 17handler.setFormatter(formatter) # 为handler添加formatter  
 18  
 19logger = logging.getLogger('hue_auto_create_user') # 获取名为hue_auto_create_user的logger  
 20logger.addHandler(handler) # 为logger添加handler  
 21logger.setLevel(logging.DEBUG)  
 22  
 23  
 24MYSQL_SETTINGS = {  
 25"host": "cdh-m1.temp.online",  
 26"port": 3306,  
 27"user": "root",  
 28"passwd": "12345"  
 29}  
 30  
 31HOST_ARRAY = [  
 32'10.50.40.1',  
 33'10.50.40.2',  
 34'10.50.40.3',  
 35'10.50.40.4',  
 36'10.50.40.5',  
 37'10.50.40.6',  
 38'10.50.40.7',  
 39'10.50.40.8',  
 40'10.50.40.9',  
 41'10.50.40.10',  
 42]  
 43  
 44  
 45def ssh_host_createUserAndGroup(ip, username, passwd, cmd):  
 46try:  
 47# 指定本地的RSA私钥文件,如果建立密钥对时设置的有密码,password为设定的密码,如无不用指定password参数  
 48# pkey = paramiko.RSAKey.from_private_key_file('/home/super/.ssh/id_rsa')  
 49ssh = paramiko.SSHClient()  
 50ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())  
 51ssh.connect(ip, 22, username, passwd, timeout=5)  
 52for m in cmd:  
 53stdin, stdout, stderr = ssh.exec_command(m)  
 54out = stdout.readlines()  
 55# 屏幕输出  
 56for o in out:  
 57print(o)  
 58logger.info(u"ssh 远程执行 {} Success".format(ip))  
 59ssh.close()  
 60except:  
 61logger.error(u"ssh 远程执行 {} Error".format(ip))  
 62  
 63  
 64def startTask(user):  
 65cmd = ['useradd {}'.format(user), 'groupadd {}'.format(user),  
 66'usermod -a -G {} {}'.format(user, user)] # 你要执行的命令列表  
 67username = "root" # 用户名  
 68passwd = "123456" # 密码  
 69  
 70for ip in HOST_ARRAY:  
 71a = threading.Thread(target=ssh_host_createUserAndGroup, args=(ip, username, passwd, cmd))  
 72logger.info(u"Host {} 执行命令 {}".format(ip,cmd))  
 73a.start()  
 74  
 75  
 76def updateMysql(uid,userName):  
 77import pymysql  
 78cnx = pymysql.connect(user='root',  
 79password='123456',  
 80host='cdh-m1.temop.online',  
 81database='hue',  
 82port=3306,  
 83charset='utf8'  
 84)  
 85cursor = cnx.cursor()  
 86try:  
 87cursor.execute("update auth_user set is_superuser=0 where id={}".format(uid))  
 88#INSERT IGNORE INTO  
 89cursor.execute("insert INTO hue.auth_group ( id, name) SELECT (auth_group.id+1), '{}' FROM hue.auth_group order by id DESC limit 1".format(userName))  
 90except Exception:  
 91logger.error("已经存在 Group {}".format(userName))  
 92print("已经存在 Group {}".format(userName))  
 93  
 94  
 95cursor.execute("insert INTO hue.auth_user_groups(user_id,group_id) SELECT {},auth_group.id FROM hue.auth_group where hue.auth_group.name='{}'".format(uid, userName))  
 96cnx.commit()  
 97cnx.close()  
 98  
 99def main():  
100stream = BinLogStreamReader(  
101connection_settings=MYSQL_SETTINGS,  
102server_id=100,  
103blocking=True,  
104resume_stream=True,  
105only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent])  
106  
107for binlogevent in stream:  
108e_start_pos, last_pos = stream.log_pos, stream.log_pos  
109for row in binlogevent.rows:  
110event = {"schema": binlogevent.schema,  
111"table": binlogevent.table,  
112"type": type(binlogevent).__name__,  
113"row": row  
114};  
115if binlogevent.table == "auth_user" and type(binlogevent).__name__ == "DeleteRowsEvent":  
116logger.info(u"DELETE User " + row['values']['username'])  
117print(u"DELETE User " + row['values']['username'])  
118  
119if binlogevent.table == "auth_user" and type(binlogevent).__name__ == "WriteRowsEvent":  
120userName = row['values']['username'];  
121uid = row['values']['id'];  
122print(u"INSERT User {} Uid {}".format(row['values']['username'],uid))  
123  
124startTask(userName)  
125logger.info(u"[INFO] 添加用户到集群所有节点 -> {}".format(userName) + "\n")  
126updateMysql(uid,userName)  
127logger.info(u"[INFO] 去掉Hue SuperUser {} Uid={}".format(userName, uid) + "\n")  
128  
129  
130if __name__ == "__main__":  
131main()  
132  
133