pp (Parallel Python) module notes

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# Work in progress — to be refined collaboratively
import os
import random
import re
import time

import paramiko
import pp
# This machine's IP: first token of the first line printed by `hostname -I`.
# (Relies on `os` being imported at the top of the file.)
local_ip = [line.split(' ')[0] for line in os.popen('hostname -I')][0]

# Remote worker hosts that will run ppserver.py.
# NOTE(review): plaintext root credentials in source — move to a secrets store.
host_tuple = (
    {'ip': '192.168.0.231', 'port': 22, 'username': 'root', 'password': 'e6772fc39c25'},
    {'ip': '192.168.0.227', 'port': 22, 'username': 'root', 'password': 'fa2cadd1cc81'},
)

# Shared SSH client; auto-accepting unknown host keys is convenient but not secure.
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 根据内存使用判断进程数
def ppserve_1(host_tuple):
    """For each remote host, size a ppserver worker pool from free memory
    and pick a random port for it.

    :param host_tuple: iterable of dicts with keys ip/port/username/password
    :return: (remote_cpu_nums, ports) — parallel lists, one entry per host
             that reported memory successfully
    """
    remote_cpu_nums = []
    ports = []
    for host in host_tuple:
        # Read the host's memory stats over SSH.
        ssh.connect(hostname=host['ip'], port=host['port'],
                    username=host['username'], password=host['password'])
        print(host['ip'])
        stdin, stdout, stderr = ssh.exec_command('cat /proc/meminfo')
        str_out = stdout.read().decode()
        str_err = stderr.read().decode()
        if str_err != "":
            # Could not read meminfo — skip this host.
            print(str_err)
            continue
        str_total = re.search(r'MemTotal:.*?\n', str_out).group()
        totalmem = re.search(r'\d+', str_total).group()
        str_free = re.search(r'MemFree:.*?\n', str_out).group()
        freemem = re.search(r'\d+', str_free).group()
        mem_free = round(float(freemem) / float(totalmem), 2)
        print('当前内存空闲率为:' + str(mem_free))
        # Worker count from free memory: keep a 10% reserve, then assume each
        # worker needs 1.25% of total RAM (0.0125 = 0.8 / 64-ish workers).
        # Heuristic — not suitable for every workload.
        remote_cpu_num = int((mem_free - 0.1) / 0.0125)
        if remote_cpu_num < 0:
            remote_cpu_num = 0
        print('新开进程数为:' + str(remote_cpu_num))
        remote_cpu_nums.append(remote_cpu_num)
        # Random unprivileged port for this host's ppserver.
        # NOTE(review): no collision check against ports already in use.
        port = int(random.random() * (65535 - 1024) + 1024)
        ports.append(port)
        # Launch command: -p port, -d debug, -w worker count, -P pid file
        # (used later to kill the server), -t/-k timeouts.
        # A nohup variant failed to start here — cause not yet understood.
        cmd = ('cd /usr/bin;python3 ppserver.py -p ' + str(port) + ' -d -w '
               + str(remote_cpu_num) + ' -P ppserver.pid -t 3600 -k 3600')
        # The actual exec_command(cmd) is intentionally left disabled: running
        # it here ties the server's lifetime to this SSH session (closing the
        # client stops it) and hides the debug output.
        # stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True)
        print(cmd)
        # Close this host's connection before moving to the next one
        # (original close placement was ambiguous — TODO confirm intent).
        ssh.close()
    return remote_cpu_nums, ports
# Plan/launch ppserver workers on every host and collect their ports.
remote_cpu_nums, ports = ppserve_1(host_tuple)
# Give the remote ppserver processes time to come up.
time.sleep(30)
# "ip:port" endpoint for each remote host, aligned by position.
ppservers = tuple(h['ip'] + ':' + str(p) for h, p in zip(host_tuple, ports))
# 0 local workers — run everything on the remote ppservers.
# Open question (from original author): relationship between worker count,
# process count and CPU count.
job_server = pp.Server(0, ppservers=ppservers, socket_timeout=3600)
job_server.get_active_nodes()
# job_server.submit(...) calls would go here.
job_server.print_stats()
# Shut down the client (Python) side.
job_server.destroy()
# Kill the server-side ppserver.py processes (pid recorded in ppserver.pid).
for host in host_tuple:
    ssh.connect(hostname=host['ip'], port=host['port'],
                username=host['username'], password=host['password'])
    ssh.exec_command('cd /usr/bin;kill `cat ppserver.pid`')
    ssh.close()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import pp
import pandas
import pymysql
import sqlalchemy
import time
import json

# pymysql connection settings for the source database (192.168.0.251).
db_251_pre2 = dict(
    host='192.168.0.251',
    port=3306,
    user='user_rw',
    password='1a2s3d4f',
    db='pre_formal_2',
    charset='utf8mb4',
)

# pymysql connection settings for the destination database (192.168.0.222).
db_222_pre2 = dict(
    host='192.168.0.222',
    port=3306,
    user='user_rw',
    password='1a2s3d4f',
    db='pre_formal_2',
    charset='utf8mb4',
)

def df2_mysql(df_data, table_name, **kwargs):
    """Append a DataFrame to a MySQL table.

    :param df_data: DataFrame to insert
    :param table_name: destination table name, type is string
    :param kwargs: connection parameters (user, password, host, port, db,
                   charset), type is dict
    """
    engine = sqlalchemy.create_engine(
        'mysql+pymysql://{}:{}@{}:{}/{}?charset={}'.format(
            kwargs['user'], kwargs['password'], kwargs['host'],
            kwargs['port'], kwargs['db'], kwargs['charset']))
    try:
        # Public DataFrame.to_sql instead of the private pandas.io.sql path.
        # if_exists: 'replace' or 'append'
        df_data.to_sql(table_name, engine, index=False, if_exists='append',
                       chunksize=10000)
    finally:
        # Dispose even if the insert raises (original leaked the engine).
        engine.dispose()


def main_func(industry_id, db_251_pre2, db_222_pre2, result_tb):
    """Copy one industry's rows of patent_layout_20200722 from the 251
    database into table `result_tb` on the 222 database.

    :param industry_id: industry identifier used to filter the source table
    :param db_251_pre2: pymysql connection kwargs for the source database
    :param db_222_pre2: pymysql connection kwargs for the destination database
    :param result_tb: destination table name
    """
    print(industry_id, '开始', '********************')
    print(f'industry[{industry_id}] reading...')
    # NOTE(review): industry_id is interpolated straight into the SQL text —
    # acceptable for trusted ids, but switch to a parameterized query if the
    # ids can ever come from untrusted input.
    sql1 = f'select * from patent_layout_20200722 where industry_id="{industry_id}"'
    conn = pymysql.connect(**db_251_pre2)
    try:
        df_data = pandas.read_sql(sql1, conn)
    finally:
        # Close even if read_sql raises (original leaked the connection).
        conn.close()
    if 'id' in df_data.columns:
        # Drop the source autoincrement id so the destination assigns its own.
        df_data = df_data.drop('id', axis=1)
    print(f'industry[{industry_id}] insert into mysql...')
    # Insert in small batches to keep individual statements bounded.
    batch_num = 250
    for batch_i in range(0, len(df_data), batch_num):
        df2_mysql(df_data[batch_i: batch_i + batch_num], result_tb, **db_222_pre2)
    print(f'industry[{industry_id}] is done! \n', '=' * 50)


if __name__ == "__main__":
    start_time = time.time()
    # industry_id -> name mapping, stored as JSON.
    with open('industries_idname.txt', 'r', encoding='utf-8') as f:
        industries = json.load(f)
    result_tb = 'patent_layout_pptest200727'
    # Setup (done once, manually):
    #   * pick one server as the pp client and locate ppserver.py on workers;
    #   * start it there: python ppserver.py -p 3505 -w 5 -i 192.168.0.231 -s "123456"
    #   * then run this script on the client machine.
    # NOTE: code shipped to ppserver may only use plain `import`;
    #       `import ... as` and `from ... import` do not survive the transfer.
    # On the server, find pp processes with: ps -ef | grep python | grep pp
    ppservers = ('192.168.0.231:3505',)  # remote endpoints; empty = local only
    # ncpus: number of local worker processes.
    job_server = pp.Server(ncpus=5, ppservers=ppservers, secret='123456')
    jobs = [job_server.submit(main_func,
                              (industry_id, db_251_pre2, db_222_pre2, result_tb),
                              depfuncs=(df2_mysql,),
                              modules=("pymysql", "time", "sqlalchemy", "pandas"))
            for industry_id in industries]
    for job in jobs:
        job()  # block until this job finishes
    job_server.destroy()
    # start_time was previously unused — report total wall time.
    print(f'elapsed: {time.time() - start_time:.1f}s')