使用 Python 监控 MySQL binlog,实现 Hue 自动创建用户的一个小插件
代码Python可以使用
import logging.handlers
import shlex
import threading

import paramiko
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
    DeleteRowsEvent,
    UpdateRowsEvent,
    WriteRowsEvent,
)
10
# Logging setup: everything goes to a size-rotated file next to the script
# (1 MiB per file, 5 backups kept), at DEBUG level and above.
LOG_FILE = r'hue_auto_create_user.log'
fmt = '%(asctime)s - %(levelname)s - %(message)s'

formatter = logging.Formatter(fmt)
handler = logging.handlers.RotatingFileHandler(
    LOG_FILE,
    maxBytes=1024 * 1024,
    backupCount=5,
    encoding='utf-8',
)
handler.setFormatter(formatter)

# Named logger shared by every function in this module.
logger = logging.getLogger('hue_auto_create_user')
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)
22
23
# Connection settings handed to BinLogStreamReader in main().
# NOTE(review): credentials are stored in plain text, and the host/password
# here differ from the ones hard-coded in updateMysql() — confirm which
# values are correct.
MYSQL_SETTINGS = dict(
    host="cdh-m1.temp.online",
    port=3306,
    user="root",
    passwd="12345",
)
30
# Hosts on which startTask() runs its commands: 10.50.40.1 through
# 10.50.40.10, in ascending order.
HOST_ARRAY = ['10.50.40.{}'.format(last_octet) for last_octet in range(1, 11)]
43
44
def ssh_host_createUserAndGroup(ip, username, passwd, cmd):
    """Run a sequence of shell commands on one host over SSH (port 22).

    The function is best-effort: any exception is logged with its traceback
    and swallowed, so a single unreachable host never raises into the caller.
    The SSH connection is always closed, including on failure.

    Args:
        ip: Address of the remote host.
        username: SSH login name.
        passwd: SSH password for ``username``.
        cmd: Iterable of shell command strings, executed in order.
    """
    ssh = None
    try:
        # To authenticate with a local RSA private key instead of a password,
        # load it with paramiko.RSAKey.from_private_key_file(...) and pass it
        # to connect() as pkey (plus password= if the key is encrypted).
        ssh = paramiko.SSHClient()
        # Unknown host keys are accepted automatically.
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(ip, 22, username, passwd, timeout=5)
        for command in cmd:
            _stdin, stdout, stderr = ssh.exec_command(command)
            # Echo the remote command's standard output to the console.
            for line in stdout.readlines():
                print(line)
            # A non-zero exit status does not raise, so report it explicitly;
            # otherwise failed commands would go completely unnoticed.
            exit_status = stdout.channel.recv_exit_status()
            if exit_status != 0:
                logger.warning(
                    u"ssh {} command {!r} exited with status {}: {}".format(
                        ip, command, exit_status,
                        u"".join(stderr.readlines()).strip()))
        # "Success" means the session ran every command, not that each
        # command exited with status 0 (see the warning above).
        logger.info(u"ssh 远程执行 {} Success".format(ip))
    except Exception:
        logger.exception(u"ssh 远程执行 {} Error".format(ip))
    finally:
        if ssh is not None:
            ssh.close()
62
63
def startTask(user):
    """Create an OS user and a same-named group on every host in HOST_ARRAY.

    For each host a thread is started that runs ``useradd``, ``groupadd`` and
    ``usermod -a -G`` through ssh_host_createUserAndGroup. The threads are
    started and not joined, so this function returns before the remote
    commands have finished.

    Args:
        user: Name of the account to create. It is shell-quoted before being
            placed in the commands because it comes from a database row and
            the commands run as root.
    """
    # Quoting leaves ordinary names (letters, digits, "_", "-", ".") unchanged
    # and neutralises shell metacharacters in anything else.
    safe_user = shlex.quote(user)
    cmd = [
        'useradd {}'.format(safe_user),
        'groupadd {}'.format(safe_user),
        'usermod -a -G {} {}'.format(safe_user, safe_user),
    ]  # commands to execute on each host, in order
    # NOTE(review): root credentials are hard-coded in plain text.
    username = "root"  # SSH login name
    passwd = "123456"  # SSH password

    for ip in HOST_ARRAY:
        worker = threading.Thread(
            target=ssh_host_createUserAndGroup,
            args=(ip, username, passwd, cmd))
        logger.info(u"Host {} 执行命令 {}".format(ip, cmd))
        worker.start()
74
75
def updateMysql(uid, userName):
    """Demote a Hue user from superuser and put them in a same-named group.

    Runs three statements against the ``hue`` database in one transaction:

    1. clear ``is_superuser`` on the ``auth_user`` row with id ``uid``;
    2. insert an ``auth_group`` row named ``userName`` whose id is the
       current highest group id plus one;
    3. link the user to that group in ``auth_user_groups``.

    Any error from steps 1-2 is logged as "group already exists" and ignored
    (presumably the usual cause is a duplicate group name — the table
    constraints are not visible here). An error in step 3 propagates; the
    connection is closed without commit in that case.

    Args:
        uid: Primary key of the user in ``auth_user``.
        userName: The user's name, also used as the group name.
    """
    import pymysql

    # NOTE(review): this host and password differ from MYSQL_SETTINGS
    # ("temop" vs "temp", "123456" vs "12345") — confirm which are correct.
    cnx = pymysql.connect(user='root',
                          password='123456',
                          host='cdh-m1.temop.online',
                          database='hue',
                          port=3306,
                          charset='utf8')
    try:
        cursor = cnx.cursor()
        try:
            # Values are passed as query parameters so the driver escapes
            # them; userName originates from a database row and must not be
            # spliced into the SQL text.
            cursor.execute(
                "update auth_user set is_superuser=0 where id=%s", (uid,))
            cursor.execute(
                "insert INTO hue.auth_group ( id, name) "
                "SELECT (auth_group.id+1), %s FROM hue.auth_group "
                "order by id DESC limit 1",
                (userName,))
        except Exception:
            # Best-effort by design; the traceback is logged so a failure
            # other than "group exists" can still be diagnosed.
            logger.exception("已经存在 Group {}".format(userName))
            print("已经存在 Group {}".format(userName))

        cursor.execute(
            "insert INTO hue.auth_user_groups(user_id,group_id) "
            "SELECT %s,auth_group.id FROM hue.auth_group "
            "where hue.auth_group.name=%s",
            (uid, userName))
        cnx.commit()
    finally:
        # Always release the connection, even when a statement raised.
        cnx.close()
96cnx.commit()
97cnx.close()
98
def main():
    """Follow the MySQL binlog and react to changes of the auth_user table.

    Blocks indefinitely reading row events. For every row inserted into a
    table named ``auth_user`` it creates the account on all cluster hosts
    (startTask) and then adjusts the user's Hue permissions (updateMysql).
    Deleted rows are only logged; updated rows are ignored. The binlog stream
    is closed when the loop ends for any reason.
    """
    stream = BinLogStreamReader(
        connection_settings=MYSQL_SETTINGS,
        server_id=100,
        blocking=True,
        resume_stream=True,
        only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent])

    try:
        for binlogevent in stream:
            # NOTE(review): only the table name is checked, not the schema,
            # so an auth_user table in any database triggers these actions —
            # confirm whether this should be limited to the hue schema.
            if binlogevent.table != "auth_user":
                continue

            if isinstance(binlogevent, DeleteRowsEvent):
                for row in binlogevent.rows:
                    logger.info(u"DELETE User " + row['values']['username'])
                    print(u"DELETE User " + row['values']['username'])

            elif isinstance(binlogevent, WriteRowsEvent):
                for row in binlogevent.rows:
                    userName = row['values']['username']
                    uid = row['values']['id']
                    print(u"INSERT User {} Uid {}".format(userName, uid))

                    startTask(userName)
                    logger.info(u"[INFO] 添加用户到集群所有节点 -> {}".format(userName) + "\n")
                    updateMysql(uid, userName)
                    logger.info(u"[INFO] 去掉Hue SuperUser {} Uid={}".format(userName, uid) + "\n")
    finally:
        # Release the replication connection on exit or error.
        stream.close()


if __name__ == "__main__":
    main()
132
133