1
2
3
4
5
6
7
from clickhouse_driver import Client
import pandas as pd
import re

client = Client(host='192.168.0.246',port='9000',user='user_r', password='1q2w3e4r', database='default')
sql = "select * from test02"
df = client.execute(sql,columnar=True, with_column_types=True)

#####账号和密码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
192.168.0.246
root
密码abc.com123
popo
密码popo.com123

clickhouse
ip:192.168.0.246
tcp端口8123
http端口9000

超管账号:default
密码:123456

读:user_r
密码:1q2w3e4r

读写:user_rw
密码:1a2s3d4f

读写删:user_rwd
密码:1z2x3c4v

方法
1
2
3
4
5
6
7
8
9
10
11
12
def save_clickhouse(data,tb_name):
client = Client(host='192.168.0.246',port='9000',user='user_rwd', password='1z2x3c4v', database='default')
cols = ','.join(data.columns)
data_tup = tuple(map(tuple, data.values))
client.execute(f"DROP TABLE IF EXISTS %s;"%(tb_name))
if data.shape[1]==2:
client.execute(f"CREATE TABLE %s ( %s String, num UInt16, create_date Date DEFAULT toDate(now()) ) ENGINE = MergeTree(create_date,(%s), 8192);"%(tb_name,data.columns[0], data.columns[0]) )
else:
client.execute(f"CREATE TABLE %s ( %s String, num UInt16, label UInt16, create_date Date DEFAULT toDate(now()) ) ENGINE = MergeTree(create_date,(%s), 8192);"%(tb_name,data.columns[0], data.columns[0]) )
client.execute(f"INSERT INTO {tb_name} ({cols}) VALUES", data_tup, types_check=True)
client.disconnect()
return
1
2
3
4
5
6
7
8
t1 = datetime.now()
sql = 'select applicant_name from company_20210208'
client = Client(host='192.168.0.246', port='9000', user='user_rwd', password='1z2x3c4v', database='pre_formal_1')
df = client.execute(sql)
print(datetime.now()-t1)
t1 = datetime.now()
df = pd.DataFrame(df)
print(datetime.now()-t1)

SQL语法

1
2
3
4
5
6
7
8
tuple_data = [tuple(i) for i in df_indus.values]

client = Client(host='192.168.0.246', port='9000', user='user_rwd', password='1z2x3c4v', database='pre_formal_1')
sql_save = 'INSERT INTO industry_company_wilson_20210208 (`industryId`,`companys`,`companysee`,`applicants`,`companyee_num`,`applicant_num`,`company_num`,`industryname`) VALUES'
client.execute(sql_save,tuple_data,types_check=True)

集群方式创建表
create table kl on cluster ch_cluster(id Int32,name String)ENGINE = MergeTree()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
1.添加新字段
alter table test add column name String afer zone
alter table test add column name String Default 'xiaoming'

#实现拼接字符串和表字段
SELECT 'SELECT * FROM dwd_ptl_info where PORT_ID = \''||PORT_ID||'\';' from dwd_ptl_info

2.修改数据类型
alter table test modify column name name2 String
注意:修改类型实际调用的是类似toString的方法,如果将字符串转换为浮点数(不兼容),会报错

3.添加字段备注
alter table test comment column name '姓名'

4.删除字段
altet table test drop column if exists name

5.清空表
truncate table test

6.重命名
rename table algorithm.zl_zu_L_new_20210608 to algorithm.zl_zu_L_20210608
1
2


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
读全部字段数据
select * from company_20210208
读指定字段数据
select applicant_name from company_20210208
筛选指定字段的指定数据has:元素/hasAny:交集/hasAll:子集
select app_text,app_date from zl_zu_L_20210208 where has(applicants,'华为技术有限公司')
筛选为空/不为空的数据(empty/notEmpty)
select app_text,inventors,ceased_date from zl_zu_L_20210208 where empty(ceased_date)=1
返回数组长度
select app_text,applicants,length(applicants) from zl_zu_L_20210208 limit 10
数组拆分一行变多行
select arrayJoin(applicants) as app1 from zl_zu_L_20210208 limit 10
对数组内部去重(每一行数据)
select applicants,arrayDistinct(applicants) from zl_zu_L_20210208 limit 10
将数据转换为数组,按行
select array(applicant_name) from company_20210208 limit 10
select array(1,2,3,4)
合并两列数据
select arrayConcat(applicants,patentee_others) from zl_zu_L_20210208 limit 20
SELECT arrayConcat([1, 2], [3, 4], [5, 6]) AS res
结果:[1,2,3,4,5,6]
数组中的元素计数
select arrayCount([1,2,1])====>3,同length ,但是arrayCount统计的是非0元素个数
select arrayCount(x->x=1,[1,2,1])====>2,数组中指定元素的个数 等价于
select countEqual([1,2,1],1)=====>2
获取数组中指定索引的元素
select arrayElement(applicants,1) from zl_zu_L_20210208
数组内部排序
select arraySort(applicants) from zl_zu_L_20210208
展平数组,合并列表成一个(列表嵌套)
select arrayFlatten([[1,3,4],[2,3,4,5]])
数组中连续出现的数据去重,数组内部数据类型须一致
select arrayCompact([1,1,2,3,2])
Python中的zip操作
select arrayZip(['a','b','c'],[1,2,3])
对数据内部数据过滤arrayFill,下面是筛掉空值null
select arrayFill(x -> not isNull(x),applicants) from zl_zu_L_20210208 limit 10
对数组切片从索引1开始切0个
arraySlice(groupArray(app_text),1,0)
截取字符串(从第一个位置开始往后截取2个)
substring(x,1,2)
数组内元素拼接
arrayStringConcat(array1,',')
聚合后某个字段转为列表
select patentee,groupArray(app_text) from zl_zu group by patentee

================================高阶函数==================
x -> 2 * x, str -> str != Referer
对数组每一个元素处理
arrayMap(x -> substring(x,1,2), groupArray(app_text))

对数组过滤
SELECT arrayFilter(x -> x LIKE '%World%', ['Hello', 'abc World']) AS res

拆分数组为多个数组
SELECT arraySplit((x, y) -> y, [1, 2, 3, 4, 5], [1, 0, 1, 1, 0]) AS res
返回数组中最小数
select arrayMin([1,2,3])
返回数组中最大数
select arrayMax([1,2,3])
返回数组的求和
select arraySum([1,2,3])
返回数组均值
select arrayAvg([1,2,3])
对数组聚合
select arrayReduce('max',[1,2,3])
公募基金的底层持仓是不是在持仓表
字符串条件判断处理
multiIf(grant_date='','999999',grant_date)
合并字符串concat
concat(a,b,c,',',toString(d))
将字符串转换成数组
select applicant_name,splitByString(',',`groups`) from algorithm_dis.patent_num_L_20210608

按照两列explode
SELECT s, `nest.x`, `nest.y` FROM nested_test ARRAY JOIN nest;
SELECT s, `nest.x`, `nest.y` FROM nested_test ARRAY JOIN `nest.x`, `nest.y`;

其他方法:https://blog.csdn.net/u012111465/article/details/85250030

①DECIMAL做除法运算的时候,分子的精度不能小于分母的精度

创建分布式表:
sql = "CREATE TABLE IF NOT EXISTS algorithm_local.`{}` on cluster cluster_3shards_0replicas (`app_text` String, `pub_text` String,`app_date` Nullable(Date), `pub_date` Nullable(Date), `title` String, `inventors` Array(String), `applicants` Array(String),`patentee_others` Array(String), `ipc` Array(String),`group_name` Array(String),`grant_date` String, `ceased_date` String,`patent_type` String,`current_status` String,`main_family` Array(String),`zl_num_zong` Int64,`app_num_zong` Int64, `count_num` Int64,`paiming` String,`average` Float64,`sm` Float64,`xingji` Int8,`fwdcits` Array(String), `create_date` Date DEFAULT toDate(now())) ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/ch170/{{shard}}/{}','{{replica}}') PRIMARY KEY app_text ORDER BY app_text SETTINGS index_granularity = 8192".format(zl_zu_L,zl_zu_L)
client.execute(sql)
sql = "CREATE TABLE IF NOT EXISTS algorithm_dis.`{}` (`app_text` String, `pub_text` String,`app_date` Nullable(Date), `pub_date` Nullable(Date), `title` String, `inventors` Array(String), `applicants` Array(String),`patentee_others` Array(String), `ipc` Array(String),`group_name` Array(String),`grant_date` String, `ceased_date` String,`patent_type` String,`current_status` String,`main_family` Array(String),`zl_num_zong` Int64,`app_num_zong` Int64, `count_num` Int64,`paiming` String,`average` Float64,`sm` Float64,`xingji` Int8,`fwdcits` Array(String), `create_date` Date DEFAULT toDate(now())) ENGINE = Distributed('cluster_3shards_0replicas','algorithm_local','{}',halfMD5(app_text))".format(zl_zu_L,zl_zu_L)
client.execute(sql)

#####dataframe写入clickhouse

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
========================================1===================================
from clickhouse_driver import Client

client = Client('localhost')
df = pandas.DataFrame.from_records([
{'year': 1994, 'first_name': 'Vova'},
{'year': 1995, 'first_name': 'Anja'},
{'year': 1996, 'first_name': 'Vasja'},
{'year': 1997, 'first_name': 'Petja'},
])
# df processing blablabla...
client.execute("INSERT INTO your_table VALUES", df.to_dict('records'))

========================================2===================================
import pandahouse as ph

pdf = pd.DataFrame.from_records([
{'year': '1994', 'first_name': 'Vova'},
{'year': '1995', 'first_name': 'Anja'},
{'year': '1996', 'first_name': 'Vasja'},
{'year': '1997', 'first_name': 'Petja'},
])

connection = dict(database='pre_formal_1',
host='http://192.168.0.246:8123',
user='user_rwd',
password='1z2x3c4v',
)
ph.to_clickhouse(pdf, 'test_humans2', index=False, chunksize=100000, connection=connection)


======================================3======================================
client = Client(host='192.168.0.170', port='9000', user='algorithm', password='1a2s3d4f', database='algorithm_dis',settings={'use_numpy': True})

sql_insert = 'insert into industryid_name_20211208 values'
client.insert_dataframe(sql_insert,df)

1
2
分区键:通常按照日期,存在不同的文件.我们这用不到,当版本数据存在一个表的时候可以用到
分片:sharding,数据存储在不同的节点,那个distributed那种

##client.query_dataframe(sql) 读成dataframe

####写dataframe到ch

1
2
3
4
client = Client(host='192.168.0.170', port='9000', user='algorithm', password='1a2s3d4f', database='algorithm_dis',settings={'use_numpy': True})
df['create_date'] = '20220111'
sql_insert = 'insert into industryid_name_20211208 values'
client.insert_dataframe(sql_insert,df)
1
2
3
systemctl start clickhouse-server
systemctl stop clickhouse-server
systemctl status clickhouse-server

https://github.com/mymarilyn/clickhouse-driver