

Postgres High Availability: Building and Using a Citus Distributed Cluster

Posted by admin on 2022-07-02 15:15:11


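Citus cluster overview

Citus is an open-source extension to Postgres that turns Postgres into a distributed database. It distributes data and queries across the nodes of a cluster and provides sharding, a distributed SQL engine, reference tables, and distributed tables.

Because Citus is an extension to Postgres (not a separate fork), using Citus means you are still using Postgres, and you can take advantage of the latest Postgres features, tooling, and ecosystem.

Citus is an open-source distributed database built on PostgreSQL. It inherits PostgreSQL's SQL support and application ecosystem: compatibility covers not only the client protocol but also server-side extensions and management tools. Citus is a PostgreSQL extension (not a fork) with a shared-nothing architecture: nodes share no data, and one coordinator node plus a set of worker nodes form a database cluster. It targets high-performance HTAP workloads.

Compared with single-node PostgreSQL, Citus can use more CPU cores and more memory, and can store more data. The database scales out by adding nodes to the cluster. The biggest difference from other PostgreSQL-based distributed systems, such as Greenplum and PostgreSQL-XL, is that Citus is a PostgreSQL extension rather than a separate fork. This lets Citus follow new PostgreSQL releases quickly and at low cost, while preserving the stability and compatibility of the database as far as possible.

Citus supports the features of new PostgreSQL versions and stays compatible with existing tools. It uses sharding and replication to scale PostgreSQL horizontally across multiple machines. Its query engine parallelizes SQL execution across these servers to deliver real-time (sub-second) responses on large data sets.

A Citus cluster consists of one central coordinator node (CN) and a number of worker nodes.

coordinator: the coordinator node, usually called the CN. It stores all metadata and no actual data. It is the node exposed directly to users, and it acts as a client towards the workers.
worker: a worker node. It stores no metadata, only actual data, and executes the queries sent by the coordinator. It is normally not exposed directly to users.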
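To make the division of labour concrete, here is a toy shell sketch of how a coordinator might answer a count(*): one task per shard runs in parallel, and the partial counts are added up. The function names are hypothetical, and the per-shard figures are placeholders borrowed from the planner row estimates in the EXPLAIN output later in this article; no real worker is contacted.

```shell
#!/usr/bin/env bash
# Toy model of a distributed count(*): each "worker task" returns a partial
# count for one shard, and the "coordinator" adds the partial results.

# Hypothetical per-shard row counts (stand-ins for real worker queries).
shard_rows() {
  case "$1" in
    test_102008) echo 119 ;;
    test_102009) echo 138 ;;
    test_102010) echo 118 ;;
    test_102011) echo 125 ;;
    *) echo 0 ;;
  esac
}

# Coordinator side: fan out one task per shard in parallel, then combine.
distributed_count() {
  local tmpdir shard total=0 n
  tmpdir=$(mktemp -d)
  for shard in "$@"; do
    shard_rows "$shard" > "$tmpdir/$shard" &   # one background task per shard
  done
  wait                                         # block until every task has finished
  for shard in "$@"; do
    read -r n < "$tmpdir/$shard"
    total=$((total + n))
  done
  rm -rf "$tmpdir"
  echo "$total"
}

distributed_count test_102008 test_102009 test_102010 test_102011
```

The real coordinator does considerably more (planning, connection management, pushing the aggregate down as SQL), but the fan-out and combine shape is the same one the EXPLAIN plans in this article show.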

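Environment

 Hostname   | IP          | Role        | Port | Notes
------------+-------------+-------------+------+----------------------------------
 coordinate | 172.72.6.80 | coordinator | 5432 | PG 13.3 + Citus 10.2.4 installed
 worker1    | 172.72.6.81 | worker      | 5432 | PG 13.3 + Citus 10.2.4 installed
 worker2    | 172.72.6.82 | worker      | 5432 | PG 13.3 + Citus 10.2.4 installed
 worker3    | 172.72.6.83 | worker      | 5432 | PG 13.3 + Citus 10.2.4 installed
 worker4    | 172.72.6.84 | worker      | 5432 | PG 13.3 + Citus 10.2.4 installed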
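Each host in the environment table is created as a Docker container in the next step, with one near-identical docker run command per node. As a sketch, the repetition can be generated from the last octet of each IP. The helper name docker_run_cmd is hypothetical; the image, network name, container names, and port mapping are the ones this article uses. The script only prints commands and runs nothing.

```shell
#!/usr/bin/env bash
# Dry-run generator: prints one `docker run` command per node in the table.
# Nothing is executed; review the output first, then pipe it to `sh` if it looks right.

# Image and network name as used in this article.
IMAGE="lhrbest/lhrpgall:2.0"
NETWORK="pg-network"

# Print the `docker run` command for one node.
#   $1 = role prefix (cn for the coordinator, dn for a worker)
#   $2 = last octet of the IP, reused in the container name and the host port
docker_run_cmd() {
  local role=$1 octet=$2
  local name="lhrpgcitus${role}${octet}"
  printf 'docker run -d --name %s -h %s --net=%s --ip 172.72.6.%s -p 643%s:5432 -v /sys/fs/cgroup:/sys/fs/cgroup --privileged=true %s /usr/sbin/init\n' \
    "$name" "$name" "$NETWORK" "$octet" "$octet" "$IMAGE"
}

docker_run_cmd cn 80
for octet in 81 82 83 84; do
  docker_run_cmd dn "$octet"
done
```

Printing instead of executing keeps the sketch safe to run anywhere and makes it easy to compare the generated commands against the hand-written ones.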

# Docker network
docker network create --subnet=172.72.6.0/24 pg-network

# PG coordinator (CN)
docker rm -f lhrpgcituscn80
docker run -d --name lhrpgcituscn80 -h lhrpgcituscn80 \
  --net=pg-network --ip 172.72.6.80 \
  -p 64380:5432 \
  -v /sys/fs/cgroup:/sys/fs/cgroup \
  --privileged=true lhrbest/lhrpgall:2.0 \
  /usr/sbin/init

docker rm -f lhrpgcitusdn81
docker run -d --name lhrpgcitusdn81 -h lhrpgcitusdn81 \
  --net=pg-network --ip 172.72.6.81 \
  -p 64381:5432 \
  -v /sys/fs/cgroup:/sys/fs/cgroup \
  --privileged=true lhrbest/lhrpgall:2.0 \
  /usr/sbin/init

docker rm -f lhrpgcitusdn82
docker run -d --name lhrpgcitusdn82 -h lhrpgcitusdn82 \
  --net=pg-network --ip 172.72.6.82 \
  -p 64382:5432 \
  -v /sys/fs/cgroup:/sys/fs/cgroup \
  --privileged=true lhrbest/lhrpgall:2.0 \
  /usr/sbin/init

docker rm -f lhrpgcitusdn83
docker run -d --name lhrpgcitusdn83 -h lhrpgcitusdn83 \
  --net=pg-network --ip 172.72.6.83 \
  -p 64383:5432 \
  -v /sys/fs/cgroup:/sys/fs/cgroup \
  --privileged=true lhrbest/lhrpgall:2.0 \
  /usr/sbin/init

docker rm -f lhrpgcitusdn84
docker run -d --name lhrpgcitusdn84 -h lhrpgcitusdn84 \
  --net=pg-network --ip 172.72.6.84 \
  -p 64384:5432 \
  -v /sys/fs/cgroup:/sys/fs/cgroup \
  --privileged=true lhrbest/lhrpgall:2.0 \
  /usr/sbin/init

[root@docker35 ~]# docker ps
CONTAINER ID   IMAGE                  COMMAND            CREATED          STATUS          PORTS                                         NAMES
0183e7a9704a   lhrbest/lhrpgall:2.0   '/usr/sbin/init'   6 seconds ago    Up 3 seconds    0.0.0.0:64384->5432/tcp, :::64384->5432/tcp   lhrpgcitusdn84
877d897a5a76   lhrbest/lhrpgall:2.0   '/usr/sbin/init'   8 seconds ago    Up 6 seconds    0.0.0.0:64383->5432/tcp, :::64383->5432/tcp   lhrpgcitusdn83
98dafcefc505   lhrbest/lhrpgall:2.0   '/usr/sbin/init'   10 seconds ago   Up 7 seconds    0.0.0.0:64382->5432/tcp, :::64382->5432/tcp   lhrpgcitusdn82
04510e0bfa96   lhrbest/lhrpgall:2.0   '/usr/sbin/init'   11 seconds ago   Up 10 seconds   0.0.0.0:64381->5432/tcp, :::64381->5432/tcp   lhrpgcitusdn81
8cf991b0633f   lhrbest/lhrpgall:2.0   '/usr/sbin/init'   13 seconds ago   Up 11 seconds   0.0.0.0:64380->5432/tcp, :::64380->5432/tcp   lhrpgcituscn80

Firewall changes

pg_hba.conf entry on the coordinator node:

cat >> /var/lib/pgsql/13/data/pg_hba.conf <<'EOF'
host all all 0.0.0.0/0 md5
EOF

pg_hba.conf entry on the worker nodes:

cat >> /var/lib/pgsql/13/data/pg_hba.conf <<'EOF'
host all all 172.72.6.0/24 trust
EOF

Installing Citus

Install Citus on every node, both the CN and the DNs. The Citus source code can be downloaded from:

/

The latest version at the time of writing is 10.2.4. Install it as follows:

# Install directly with yum
yum list | grep citus
yum install -y citus_13

su - postgres
psql
create database lhrdb;
\c lhrdb
alter system set shared_preload_libraries='citus';
select * from pg_available_extensions where name='citus';
\q

# Back in the shell: restart so that shared_preload_libraries takes effect
pg_ctl restart

psql -d lhrdb
create extension citus;
\dx
\dx+ citus

Installation session:

[postgres@lhrpgcituscn80 ~]$ psql
psql (13.3)
Type 'help' for help.

postgres=# create database lhrdb;
CREATE DATABASE
postgres=# \c lhrdb;
You are now connected to database 'lhrdb' as user 'postgres'.
lhrdb=# create extension citus;
ERROR:  Citus can only be loaded via shared_preload_libraries
HINT:  Add citus to shared_preload_libraries configuration variable in postgresql.conf in master and workers. Note that citus should be at the beginning of shared_preload_libraries.
lhrdb=# show shared_preload_libraries;
 shared_preload_libraries
--------------------------

(1 row)

lhrdb=# alter system set shared_preload_libraries='citus';
ALTER SYSTEM
lhrdb=# select pg_reload_conf();
 pg_reload_conf
----------------
 t
(1 row)

lhrdb=# select * from pg_available_extensions where name='citus';
 name  | default_version | installed_version |          comment
-------+-----------------+-------------------+----------------------------
 citus | 10.2-4          |                   | Citus distributed database
(1 row)

lhrdb=# exit
[postgres@lhrpgcituscn80 ~]$ pg_ctl restart
waiting for server to shut down.... done
server stopped
waiting for server to start....2022-02-17 10:27:34.354 CST [1589] LOG:  number of prepared transactions has not been configured, overriding
2022-02-17 10:27:34.354 CST [1589] DETAIL:  max_prepared_transactions is now set to 200
2022-02-17 10:27:34.415 CST [1589] LOG:  redirecting log output to logging collector process
2022-02-17 10:27:34.415 CST [1589] HINT:  Future log output will appear in directory 'pg_log'.
 done
server started
[postgres@lhrpgcituscn80 ~]$ psql
psql (13.3)
Type 'help' for help.

postgres=# \c lhrdb
You are now connected to database 'lhrdb' as user 'postgres'.
lhrdb=# show shared_preload_libraries;
 shared_preload_libraries
--------------------------
 citus
(1 row)

lhrdb=# create extension citus;
CREATE EXTENSION
lhrdb=# \dx
                 List of installed extensions
  Name   | Version |   Schema   |         Description
---------+---------+------------+------------------------------
 citus   | 10.2-4  | pg_catalog | Citus distributed database
 plpgsql | 1.0     | pg_catalog | PL/pgSQL procedural language
(2 rows)

lhrdb=# \dx+ citus
 Objects in extension 'citus'
 Object description
-----------------------------------------------------------------------------------------------------------------
 access method columnar
 event trigger citus_cascade_to_partition
 function alter_columnar_table_reset(regclass,boolean,boolean,boolean,boolean)
 function alter_columnar_table_set(regclass,integer,integer,name,integer)
 function alter_distributed_table(regclass,text,integer,text,boolean)
 function alter_old_partitions_set_access_method(regclass,timestamp with time zone,name)
 function alter_role_if_exists(text,text)
 function alter_table_set_access_method(regclass,text)
 function any_value_agg(anyelement,anyelement)
 function any_value(anyelement)
 function array_cat_agg(anyarray)
 function assign_distributed_transaction_id(integer,bigint,timestamp with time zone)
 function authinfo_valid(text)
 function broadcast_intermediate_result(text,text)
 function check_distributed_deadlocks()
 function citus_activate_node(text,integer)
 function citus_add_inactive_node(text,integer,integer,noderole,name)
 function citus_add_local_table_to_metadata(regclass,boolean)
 function citus_add_node(text,integer,integer,noderole,name)
 function citus_add_rebalance_strategy(name,regproc,regproc,regproc,real,real,real)
 function citus_add_secondary_node(text,integer,text,integer,name)
 function citus_blocking_pids(integer)
 function citus_cleanup_orphaned_shards()
 function citus_conninfo_cache_invalidate()
 function citus_copy_shard_placement(bigint,text,integer,text,integer,boolean,citus.shard_transfer_mode)
 function citus_create_restore_point(text)
 function citus_disable_node(text,integer)
 function citus_dist_local_group_cache_invalidate()
 function citus_dist_node_cache_invalidate()
 function citus_dist_object_cache_invalidate()
 function citus_dist_partition_cache_invalidate()
 function citus_dist_placement_cache_invalidate()
 function citus_dist_shard_cache_invalidate()
 function citus_dist_stat_activity()
 function citus_drain_node(text,integer,citus.shard_transfer_mode,name)
 function citus_drop_all_shards(regclass,text,text,boolean)
 function citus_drop_trigger()
 function citus_executor_name(integer)
 function citus_extradata_container(internal)
 function citus_finish_pg_upgrade()
 function citus_get_active_worker_nodes()
 function citus_internal_add_partition_metadata(regclass,'char',text,integer,'char')
 function citus_internal_add_placement_metadata(bigint,integer,bigint,integer,bigint)
 function citus_internal_add_shard_metadata(regclass,bigint,'char',text,text)
 function citus_internal.columnar_ensure_am_depends_catalog()
 function citus_internal_delete_shard_metadata(bigint)
 function citus_internal.downgrade_columnar_storage(regclass)
 function citus_internal.find_groupid_for_node(text,integer)
 function citus_internal.pg_dist_node_trigger_func()
 function citus_internal.pg_dist_rebalance_strategy_trigger_func()
 function citus_internal.pg_dist_shard_placement_trigger_func()
 function citus_internal.refresh_isolation_tester_prepared_statement()
 function citus_internal.replace_isolation_tester_func()
 function citus_internal.restore_isolation_tester_func()
 function citus_internal_update_placement_metadata(bigint,integer,integer)
 function citus_internal_update_relation_colocation(oid,integer)
 function citus_internal.upgrade_columnar_storage(regclass)
 function citus_isolation_test_session_is_blocked(integer,integer[])
 function citus_jsonb_concatenate_final(jsonb)
 function citus_jsonb_concatenate(jsonb,jsonb)
 function citus_json_concatenate_final(json)
 function citus_json_concatenate(json,json)
 function citus_local_disk_space_stats()
 function citus_move_shard_placement(bigint,text,integer,text,integer,citus.shard_transfer_mode)
 function citus_node_capacity_1(integer)
 function citus_prepare_pg_upgrade()
 function citus_query_stats()
 function citus_relation_size(regclass)
 function citus_remote_connection_stats()
 function citus_remove_node(text,integer)
 function citus_server_id()
 function citus_set_coordinator_host(text,integer,noderole,name)
 function citus_set_default_rebalance_strategy(text)
 function citus_set_node_property(text,integer,text,boolean)
 function citus_shard_allowed_on_node_true(bigint,integer)
 function citus_shard_cost_1(bigint)
 function citus_shard_cost_by_disk_size(bigint)
 function citus_shard_sizes()
 function citus_stat_statements()
 function citus_stat_statements_reset()
 function citus_table_is_visible(oid)
 function citus_table_size(regclass)
 function citus_text_send_as_jsonb(text)
 function citus_total_relation_size(regclass,boolean)
 function citus_truncate_trigger()
 function citus_unmark_object_distributed(oid,oid,integer)
 function citus_update_node(integer,text,integer,boolean,integer)
 function citus_update_shard_statistics(bigint)
 function citus_update_table_statistics(regclass)
 function citus_validate_rebalance_strategy_functions(regproc,regproc,regproc)
 function citus_version()
 function citus_worker_stat_activity()
 function columnar.columnar_handler(internal)
 function column_name_to_column(regclass,text)
 function column_to_column_name(regclass,text)
 function coord_combine_agg_ffunc(internal,oid,cstring,anyelement)
 function coord_combine_agg(oid,cstring,anyelement)
 function coord_combine_agg_sfunc(internal,oid,cstring,anyelement)
 function create_distributed_function(regprocedure,text,text)
 function create_distributed_table(regclass,text,citus.distribution_type,text,integer)
 function create_intermediate_result(text,text)
 function create_reference_table(regclass)
 function create_time_partitions(regclass,interval,timestamp with time zone,timestamp with time zone)
 function distributed_tables_colocated(regclass,regclass)
 function drop_old_time_partitions(regclass,timestamp with time zone)
 function dump_global_wait_edges()
 function dump_local_wait_edges()
 function fetch_intermediate_results(text[],text,integer)
 function fix_all_partition_shard_index_names()
 function fix_partition_shard_index_names(regclass)
 function fix_pre_citus10_partitioned_table_constraint_names()
 function fix_pre_citus10_partitioned_table_constraint_names(regclass)
 function get_all_active_transactions()
 function get_colocated_shard_array(bigint)
 function get_colocated_table_array(regclass)
 function get_current_transaction_id()
 function get_global_active_transactions()
 function get_missing_time_partition_ranges(regclass,interval,timestamp with time zone,timestamp with time zone)
 function get_rebalance_progress()
 function get_rebalance_table_shards_plan(regclass,real,integer,bigint[],boolean,name,real)
 function get_shard_id_for_distribution_column(regclass,'any')
 function isolate_tenant_to_new_shard(regclass,'any',text)
 function jsonb_cat_agg(jsonb)
 function json_cat_agg(json)
 function lock_relation_if_exists(text,text)
 function lock_shard_metadata(integer,bigint[])
 function lock_shard_resources(integer,bigint[])
 function master_activate_node(text,integer)
 function master_add_inactive_node(text,integer,integer,noderole,name)
 function master_add_node(text,integer,integer,noderole,name)
 function master_add_secondary_node(text,integer,text,integer,name)
 function master_append_table_to_shard(bigint,text,text,integer)
 function master_apply_delete_command(text)
 function master_copy_shard_placement(bigint,text,integer,text,integer,boolean,citus.shard_transfer_mode)
 function master_create_empty_shard(text)
 function master_disable_node(text,integer)
 function master_drain_node(text,integer,citus.shard_transfer_mode,name)
 function master_get_active_worker_nodes()
 function master_get_new_placementid()
 function master_get_new_shardid()
 function master_get_table_ddl_events(text)
 function master_get_table_metadata(text)
 function master_move_shard_placement(bigint,text,integer,text,integer,citus.shard_transfer_mode)
 function master_remove_distributed_table_metadata_from_workers(regclass,text,text)
 function master_remove_node(text,integer)
 function master_remove_partition_metadata(regclass,text,text)
 function master_run_on_worker(text[],integer[],text[],boolean)
 function master_set_node_property(text,integer,text,boolean)
 function master_unmark_object_distributed(oid,oid,integer)
 function master_update_node(integer,text,integer,boolean,integer)
 function master_update_shard_statistics(bigint)
 function master_update_table_statistics(regclass)
 function notify_constraint_dropped()
 function poolinfo_valid(text)
 function read_intermediate_results(text[],citus_copy_format)
 function read_intermediate_result(text,citus_copy_format)
 function rebalance_table_shards(regclass,real,integer,bigint[],citus.shard_transfer_mode,boolean,name)
 function recover_prepared_transactions()
 function relation_is_a_known_shard(regclass)
 function remove_local_tables_from_metadata()
 function replicate_reference_tables()
 function replicate_table_shards(regclass,integer,integer,bigint[],citus.shard_transfer_mode)
 function role_exists(name)
 function run_command_on_colocated_placements(regclass,regclass,text,boolean)
 function run_command_on_placements(regclass,text,boolean)
 function run_command_on_shards(regclass,text,boolean)
 function run_command_on_workers(text,boolean)
 function shard_name(regclass,bigint)
 function start_metadata_sync_to_node(text,integer)
 function stop_metadata_sync_to_node(text,integer,boolean)
 function time_partition_range(regclass)
 function truncate_local_data_after_distributing_table(regclass)
 function undistribute_table(regclass,boolean)
 function update_distributed_table_colocation(regclass,text)
 function worker_append_table_to_shard(text,text,text,integer)
 function worker_apply_inter_shard_ddl_command(bigint,text,bigint,text,text)
 function worker_apply_sequence_command(text)
 function worker_apply_sequence_command(text,regtype)
 function worker_apply_shard_ddl_command(bigint,text)
 function worker_apply_shard_ddl_command(bigint,text,text)
 function worker_change_sequence_dependency(regclass,regclass,regclass)
 function worker_cleanup_job_schema_cache()
 function worker_create_or_alter_role(text,text,text)
 function worker_create_or_replace_object(text)
 function worker_create_schema(bigint,text)
 function worker_create_truncate_trigger(regclass)
 function worker_drop_distributed_table(text)
 function worker_fetch_foreign_file(text,text,bigint,text[],integer[])
 function worker_fetch_partition_file(bigint,integer,integer,integer,text,integer)
 function worker_fix_partition_shard_index_names(regclass,text,text)
 function worker_fix_pre_citus10_partitioned_table_constraint_names(regclass,bigint,text)
 function worker_hash('any')
 function worker_hash_partition_table(bigint,integer,text,text,oid,anyarray)
 function worker_last_saved_explain_analyze()
 function worker_merge_files_into_table(bigint,integer,text[],text[])
 function worker_nextval(regclass)
 function worker_partial_agg_ffunc(internal)
 function worker_partial_agg(oid,anyelement)
 function worker_partial_agg_sfunc(internal,oid,anyelement)
 function worker_partitioned_relation_size(regclass)
 function worker_partitioned_relation_total_size(regclass)
 function worker_partitioned_table_size(regclass)
 function worker_partition_query_result(text,text,integer,citus.distribution_type,text[],text[],boolean)
 function worker_range_partition_table(bigint,integer,text,text,oid,anyarray)
 function worker_record_sequence_dependency(regclass,regclass,name)
 function worker_repartition_cleanup(bigint)
 function worker_save_query_explain_analyze(text,jsonb)
 schema citus
 schema citus_internal
 schema columnar
 sequence columnar.storageid_seq
 sequence pg_dist_colocationid_seq
 sequence pg_dist_groupid_seq
 sequence pg_dist_node_nodeid_seq
 sequence pg_dist_placement_placementid_seq
 sequence pg_dist_shardid_seq
 table citus.pg_dist_object
 table columnar.chunk
 table columnar.chunk_group
 table columnar.options
 table columnar.stripe
 table pg_dist_authinfo
 table pg_dist_colocation
 table pg_dist_local_group
 table pg_dist_node
 table pg_dist_node_metadata
 table pg_dist_partition
 table pg_dist_placement
 table pg_dist_poolinfo
 table pg_dist_rebalance_strategy
 table pg_dist_shard
 table pg_dist_transaction
 type citus_copy_format
 type citus.distribution_type
 type citus.shard_transfer_mode
 type noderole
 view citus_dist_stat_activity
 view citus_lock_waits
 view citus_shard_indexes_on_worker
 view citus_shards
 view citus_shards_on_worker
 view citus_stat_statements
 view citus_tables
 view citus_worker_stat_activity
 view pg_dist_shard_placement
 view time_partitions
(246 rows)

Cluster configuration

Adding worker nodes on the coordinator

Management operations are performed only on the coordinator node (CN):

[postgres@lhrpgcituscn80 ~]$ psql -d lhrdb
psql (13.3)
Type 'help' for help.

lhrdb=# -- A node can be given as an IP address or a DNS name
SELECT * from master_add_node('172.72.6.81', 5432);
SELECT * from master_add_node('172.72.6.82', 5432);
SELECT * from master_add_node('172.72.6.83', 5432);
SELECT * from master_add_node('172.72.6.84', 5432);

-- List the worker nodes:
lhrdb=# SELECT * FROM master_get_active_worker_nodes();
  node_name  | node_port
-------------+-----------
 172.72.6.81 |      5432
 172.72.6.83 |      5432
 172.72.6.84 |      5432
 172.72.6.82 |      5432
(4 rows)

lhrdb=# select * from pg_dist_node;
 nodeid | groupid |  nodename   | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards
--------+---------+-------------+----------+----------+-------------+----------+----------+-------------+----------------+------------------
      1 |       1 | 172.72.6.81 |     5432 | default  | f           | t        | primary  | default     | f              | t
      2 |       2 | 172.72.6.82 |     5432 | default  | f           | t        | primary  | default     | f              | t
      3 |       3 | 172.72.6.83 |     5432 | default  | f           | t        | primary  | default     | f              | t
      4 |       4 | 172.72.6.84 |     5432 | default  | f           | t        | primary  | default     | f              | t
(4 rows)

Creating a distributed table

lhrdb=# create table test(id int primary key ,name varchar);

-- Configure the sharding strategy
-- Set the shard count: with 4 hosts, use 4 shards
lhrdb=# set citus.shard_count=4;

-- Set the number of replicas
lhrdb=# set citus.shard_replication_factor=2;
lhrdb=# SELECT create_distributed_table('test', 'id', 'hash');
lhrdb=# insert into test select id,md5(random()::text) from generate_series(1,500) as id;

-- Check how the shards are distributed
lhrdb=# select * from citus_tables;
 table_name | citus_table_type | distribution_column | colocation_id | table_size | shard_count | table_owner | access_method
------------+------------------+---------------------+---------------+------------+-------------+-------------+---------------
 test       | distributed      | id                  |             1 | 384 kB     |           4 | postgres    | heap
(1 row)

lhrdb=# select * from master_get_table_metadata('test');
 logical_relid | part_storage_type | part_method | part_key | part_replica_count | part_max_size | part_placement_policy
---------------+-------------------+-------------+----------+--------------------+---------------+-----------------------
         16995 | t                 | h           | id       |                  2 |    1073741824 |                     2
(1 row)

lhrdb=# select * from pg_dist_placement where shardid in (select shardid from pg_dist_shard where logicalrelid='test'::regclass);
 placementid | shardid | shardstate | shardlength | groupid
-------------+---------+------------+-------------+---------
           1 |  102008 |          1 |           0 |       1
           2 |  102008 |          1 |           0 |       2
           3 |  102009 |          1 |           0 |       2
           4 |  102009 |          1 |           0 |       3
           5 |  102010 |          1 |           0 |       3
           6 |  102010 |          1 |           0 |       4
           7 |  102011 |          1 |           0 |       4
           8 |  102011 |          1 |           0 |       1
(8 rows)

lhrdb=# SELECT * from pg_dist_shard_placement order by shardid, placementid;
 shardid | shardstate | shardlength |  nodename   | nodeport | placementid
---------+------------+-------------+-------------+----------+-------------
  102008 |          1 |           0 | 172.72.6.81 |     5432 |           1
  102008 |          1 |           0 | 172.72.6.82 |     5432 |           2
  102009 |          1 |           0 | 172.72.6.82 |     5432 |           3
  102009 |          1 |           0 | 172.72.6.83 |     5432 |           4
  102010 |          1 |           0 | 172.72.6.83 |     5432 |           5
  102010 |          1 |           0 | 172.72.6.84 |     5432 |           6
  102011 |          1 |           0 | 172.72.6.84 |     5432 |           7
  102011 |          1 |           0 | 172.72.6.81 |     5432 |           8
(8 rows)

lhrdb=# select count(*) from test;
 count
-------
   500
(1 row)

-- Inspect the shard tables on each node
[postgres@lhrpgcitusdn84 ~]$ psql -U postgres -h 172.72.6.80 -d lhrdb -c '\dt';
        List of relations
 Schema | Name | Type  |  Owner
--------+------+-------+----------
 public | test | table | postgres
(1 row)

[postgres@lhrpgcitusdn84 ~]$ psql -U postgres -h 172.72.6.81 -d lhrdb -c '\dt';
           List of relations
 Schema |    Name     | Type  |  Owner
--------+-------------+-------+----------
 public | test_102008 | table | postgres
 public | test_102011 | table | postgres
(2 rows)

[postgres@lhrpgcitusdn84 ~]$ psql -U postgres -h 172.72.6.82 -d lhrdb -c '\dt';
           List of relations
 Schema |    Name     | Type  |  Owner
--------+-------------+-------+----------
 public | test_102008 | table | postgres
 public | test_102009 | table | postgres
(2 rows)

[postgres@lhrpgcitusdn84 ~]$ psql -U postgres -h 172.72.6.83 -d lhrdb -c '\dt';
           List of relations
 Schema |    Name     | Type  |  Owner
--------+-------------+-------+----------
 public | test_102009 | table | postgres
 public | test_102010 | table | postgres
(2 rows)

[postgres@lhrpgcitusdn84 ~]$ psql -U postgres -h 172.72.6.84 -d lhrdb -c '\dt';
           List of relations
 Schema |    Name     | Type  |  Owner
--------+-------------+-------+----------
 public | test_102010 | table | postgres
 public | test_102011 | table | postgres
(2 rows)

There are 4 workers, so the data is split into 4 shards, and each shard has two replicas.
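For a hash-distributed table, each shard owns a contiguous slice of the signed 32-bit hash space, and a row goes to the shard whose slice contains the hash of its distribution column. The sketch below computes those slices for a given shard count, assuming an even split; the function names are hypothetical, and the result can be compared against the shardminvalue and shardmaxvalue columns of pg_dist_shard on a real cluster. It does not reproduce the hash function PostgreSQL applies to the column value.

```shell
#!/usr/bin/env bash
# Split the signed 32-bit hash space into `count` contiguous ranges,
# printing "index min max" per shard. This models the range bookkeeping only.

shard_ranges() {
  local count=$1 i=0 min max
  local lo=-2147483648 hi=2147483647
  local step=$(( 4294967296 / count ))
  while [ "$i" -lt "$count" ]; do
    min=$(( lo + i * step ))
    if [ "$i" -eq $(( count - 1 )) ]; then
      max=$hi                      # assumption: the last shard absorbs any remainder
    else
      max=$(( min + step - 1 ))
    fi
    echo "$i $min $max"
    i=$(( i + 1 ))
  done
}

# Print the index of the shard whose range contains hash value $1,
# for a table with $2 shards.
shard_for_hash() {
  local hash=$1 count=$2
  shard_ranges "$count" | while read -r idx min max; do
    if [ "$hash" -ge "$min" ] && [ "$hash" -le "$max" ]; then
      echo "$idx"
      break
    fi
  done
}

shard_ranges 4
```

With shard_count=4 as above, index 0 would correspond to shard 102008 and index 3 to shard 102011, assuming shard IDs are handed out in range order.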

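The shard placement shows that shard 102008 lives on 172.72.6.81 and 172.72.6.82, and likewise shard 102009 lives on 172.72.6.82 and 172.72.6.83. Suppose the 6.81 machine goes down: requests for shard 102008 that previously went to 6.81 are now served automatically from the copy on 6.82. In other words, the cluster keeps working when a single data node fails, and a higher replica count lets it tolerate the failure of several nodes.

Processes on the CN node: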
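The placement table above follows a round-robin pattern: shard i is stored on worker i and on the next worker, wrapping around at the end. A minimal sketch of that pattern and of the fallback described here, assuming round-robin placement and that the first surviving replica is chosen (the function names are hypothetical, and workers are numbered 1 to 4 for 172.72.6.81 to 172.72.6.84):

```shell
#!/usr/bin/env bash
# Round-robin replica placement, reproducing the layout in this article:
# shard i gets `factor` copies on consecutive workers, wrapping around.

# Print the 1-based worker numbers holding shard index $1 (0-based),
# given $2 workers and a replication factor of $3.
placement_nodes() {
  local shard=$1 workers=$2 factor=$3 r=0 out=""
  while [ "$r" -lt "$factor" ]; do
    out="$out $(( (shard + r) % workers + 1 ))"
    r=$(( r + 1 ))
  done
  echo "${out# }"
}

# Print the first worker in the placement list that is not the failed one ($4).
# Prints nothing and returns 1 when no replica survives.
surviving_node() {
  local failed=$4 node
  for node in $(placement_nodes "$1" "$2" "$3"); do
    if [ "$node" -ne "$failed" ]; then
      echo "$node"
      return 0
    fi
  done
  return 1
}

# 4 shards, 4 workers, 2 replicas: the layout shown by pg_dist_shard_placement.
i=0
while [ "$i" -lt 4 ]; do
  echo "shard $(( 102008 + i )): workers $(placement_nodes "$i" 4 2)"
  i=$(( i + 1 ))
done

# Worker 1 (172.72.6.81) is down: shard 102008 is served by worker 2.
echo "shard 102008 falls back to worker $(surviving_node 0 4 2 1)"
```

With a replication factor of 1, surviving_node returns nothing for the shard on the failed worker, which is the case the replica setting protects against.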

[root@lhrpgcituscn80 /]# ps -ef|grep post
postgres  1589     0  0 10:27 ?  00:00:00 /usr/pgsql-13/bin/postgres -D /var/lib/pgsql/13/data/
postgres  1590  1589  0 10:27 ?  00:00:00 postgres: logger
postgres  1592  1589  0 10:27 ?  00:00:00 postgres: checkpointer
postgres  1593  1589  0 10:27 ?  00:00:00 postgres: background writer
postgres  1594  1589  0 10:27 ?  00:00:00 postgres: walwriter
postgres  1595  1589  0 10:27 ?  00:00:00 postgres: autovacuum launcher
postgres  1596  1589  0 10:27 ?  00:00:00 postgres: stats collector
postgres  1597  1589  0 10:27 ?  00:00:00 postgres: logical replication launcher
postgres  1641  1589  0 10:28 ?  00:00:03 postgres: Citus Maintenance Daemon: 16430/10

Processes on a DN node:

[root@lhrpgcitusdn81 /]# ps -ef|grep post
postgres  8661     0  0 11:09 ?  00:00:00 /usr/pgsql-13/bin/postgres -D /var/lib/pgsql/13/data/
postgres  8662  8661  0 11:09 ?  00:00:00 postgres: logger
postgres  8665  8661  0 11:09 ?  00:00:00 postgres: checkpointer
postgres  8666  8661  0 11:09 ?  00:00:00 postgres: background writer
postgres  8667  8661  0 11:09 ?  00:00:00 postgres: walwriter
postgres  8668  8661  0 11:09 ?  00:00:00 postgres: autovacuum launcher
postgres  8669  8661  0 11:09 ?  00:00:00 postgres: stats collector
postgres  8670  8661  0 11:09 ?  00:00:00 postgres: logical replication launcher
postgres  8710  8661  0 11:10 ?  00:00:00 postgres: Citus Maintenance Daemon: 13255/10
postgres  8720  8661  0 11:10 ?  00:00:00 postgres: Citus Maintenance Daemon: 16430/10
postgres  9591  8661  0 11:25 ?  00:00:00 postgres: postgres lhrdb 172.72.6.80(58852) idle
postgres 13145  8661  0 12:27 ?  00:00:00 postgres: postgres lhrdb 172.72.6.80(58998) idle

To list all Citus configuration variables, use tab completion in psql:

lhrdb=# set citus.
citus.all_modifications_commutative
citus.count_distinct_error_rate
citus.enable_binary_protocol
citus.enable_local_execution
citus.enable_repartition_joins
citus.explain_analyze_sort_method
citus.local_hostname
citus.log_remote_commands
citus.max_cached_connection_lifetime
citus.max_intermediate_result_size
citus.multi_shard_modify_mode
citus.node_connection_timeout
citus.propagate_set_commands
citus.shard_count
citus.shard_placement_policy
citus.task_assignment_policy
citus.values_materialization_threshold
citus.writable_standby_coordinator
citus.coordinator_aggregation_strategy
citus.defer_drop_after_shard_move
citus.enable_deadlock_prevention
citus.enable_local_reference_table_foreign_keys
citus.explain_all_tasks
citus.limit_clause_row_fetch_count
citus.local_table_join_policy
citus.max_adaptive_executor_pool_size
citus.max_cached_conns_per_worker
citus.multi_shard_commit_protocol
citus.multi_task_query_log_level
citus.partition_buffer_size
citus.remote_task_check_interval
citus.shard_max_size
citus.shard_replication_factor
citus.task_executor_type
citus.worker_min_messages
lhrdb=# set citus.shard_
citus.shard_count
citus.shard_max_size
citus.shard_placement_policy
citus.shard_replication_factor
lhrdb=# show citus.shard_count ;
 citus.shard_count
-------------------
 32
(1 row)

Viewing all execution plans

By default, EXPLAIN in Citus shows the plan of only one task and omits the rest, because the plans on the different nodes are mostly identical. To see the complete plan for every task, set citus.explain_all_tasks in the session, as the second query below does:

lhrdb=# explain select count(*) from test;
                                        QUERY PLAN
------------------------------------------------------------------------------------------
 Aggregate  (cost=250.00..250.02 rows=1 width=8)
   ->  Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=100000 width=8)
         Task Count: 4
         Tasks Shown: One of 4
         ->  Task
               Node: host=172.72.6.81 port=5432 dbname=lhrdb
               ->  Aggregate  (cost=2.49..2.50 rows=1 width=8)
                     ->  Seq Scan on test_102008 test  (cost=0.00..2.19 rows=119 width=0)
(8 rows)

lhrdb=# SET citus.explain_all_tasks = 'TRUE';
SET
lhrdb=# explain select count(*) from test;
                                        QUERY PLAN
------------------------------------------------------------------------------------------
 Aggregate  (cost=250.00..250.02 rows=1 width=8)
   ->  Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=100000 width=8)
         Task Count: 4
         Tasks Shown: All
         ->  Task
               Node: host=172.72.6.81 port=5432 dbname=lhrdb
               ->  Aggregate  (cost=2.49..2.50 rows=1 width=8)
                     ->  Seq Scan on test_102008 test  (cost=0.00..2.19 rows=119 width=0)
         ->  Task
               Node: host=172.72.6.82 port=5432 dbname=lhrdb
               ->  Aggregate  (cost=3.73..3.73 rows=1 width=8)
                     ->  Seq Scan on test_102009 test  (cost=0.00..3.38 rows=138 width=0)
         ->  Task
               Node: host=172.72.6.83 port=5432 dbname=lhrdb
               ->  Aggregate  (cost=2.47..2.48 rows=1 width=8)
                     ->  Seq Scan on test_102010 test  (cost=0.00..2.18 rows=118 width=0)
         ->  Task
               Node: host=172.72.6.84 port=5432 dbname=lhrdb
               ->  Aggregate  (cost=3.56..3.57 rows=1 width=8)
                     ->  Seq Scan on test_102011 test  (cost=0.00..3.25 rows=125 width=0)
(20 rows)

Performance test

Reference:

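Here I run a simple stress test: create a table of 10 million rows, first as a local table and then as a distributed table, and run a read/write test against each for 5 minutes with threads=100: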

-- Local table
alter system set max_connections=1000;

# In the shell: restart, then prepare the data and run the test
pg_ctl restart

sysbench /usr/share/sysbench/oltp_common.lua --db-driver=pgsql \
  --pgsql-host=172.72.6.80 --pgsql-port=5432 --pgsql-user=postgres \
  --pgsql-password=lhr --pgsql-db=lhrdb --table-size=10000000 --tables=1 \
  --threads=100 --events=999999999 --time=300 prepare

sysbench /usr/share/sysbench/oltp_read_write.lua --db-driver=pgsql \
  --pgsql-host=172.72.6.80 --pgsql-port=5432 --pgsql-user=postgres \
  --pgsql-password=lhr --pgsql-db=lhrdb --table-size=100000 --tables=1 \
  --threads=100 --events=999999999 --time=300 --report-interval=10 \
  --db-ps-mode=disable --forced-shutdown=1 run

-- Distributed table
-- Use 16 shards and 2 replicas
set citus.shard_count=16;
set citus.shard_replication_factor=2;

-- Convert the table to a distributed table
SELECT create_distributed_table('sbtest1', 'id', 'hash');

Test run

Local table:

[postgres@lhrpgcituscn80 ~]$ sysbench /usr/share/sysbench/oltp_common.lua --db-driver=pgsql --pgsql-host=172.72.6.80 --pgsql-port=5432 --pgsql-user=postgres --pgsql-password=lhr --pgsql-db=lhrdb --table-size=10000000 --tables=1 --threads=100 --events=999999999 --time=300 prepare
sysbench 1.0.17 (using system LuaJIT 2.0.4)

Initializing worker threads...

Creating table 'sbtest1'...
Inserting 10000000 records into 'sbtest1'
Creating a secondary index on 'sbtest1'...

[postgres@lhrpgcituscn80 ~]$ sysbench /usr/share/sysbench/oltp_read_write.lua --db-driver=pgsql --pgsql-host=172.72.6.80 --pgsql-port=5432 --pgsql-user=postgres --pgsql-password=lhr --pgsql-db=lhrdb --table-size=100000 --tables=1 --threads=100 --events=999999999 --time=300 --report-interval=10 --db-ps-mode=disable --forced-shutdown=1 run
sysbench 1.0.17 (using system LuaJIT 2.0.4)

Running the test with following options:
Number of threads: 100
Report intermediate results every 10 second(s)
Initializing random number generator from current time

Forcing shutdown in 301 seconds

Initializing worker threads...

Threads started!

[ 10s ] thds: 100 tps: 2058.53 qps: 42459.02 (r/w/o: 29828.31/8252.63/4378.07) lat (ms,95%): 77.19 err/s: 64.58 reconn/s: 0.00
[ 20s ] thds: 100 tps: 1943.48 qps: 39815.96 (r/w/o: 27945.47/7764.92/4105.57) lat (ms,95%): 77.19 err/s: 53.10 reconn/s: 0.00
[ 30s ] thds: 100 tps: 1829.76 qps: 37566.41 (r/w/o: 26366.44/7325.95/3874.02) lat (ms,95%): 108.68 err/s: 53.70 reconn/s: 0.00
[ 40s ] thds: 100 tps: 2072.87 qps: 42495.29 (r/w/o: 29829.24/8280.29/4385.75) lat (ms,95%): 74.46 err/s: 55.60 reconn/s: 0.00
[ 50s ] thds: 100 tps: 2086.30 qps: 42791.05 (r/w/o: 30033.43/8337.41/4420.20) lat (ms,95%): 75.82 err/s: 58.00 reconn/s: 0.00
[ 60s ] thds: 100 tps: 2017.70 qps: 41377.51 (r/w/o: 29048.81/8059.00/4269.70) lat (ms,95%): 81.48 err/s: 57.70 reconn/s: 0.00
[ 70s ] thds: 100 tps: 1865.25 qps: 38286.44 (r/w/o: 26876.63/7451.91/3957.90) lat (ms,95%): 92.42 err/s: 56.70 reconn/s: 0.00
[ 80s ] thds: 100 tps: 1828.59 qps: 37540.92 (r/w/o: 26361.20/7307.47/3872.26) lat (ms,95%): 73.13 err/s: 52.79 reconn/s: 0.00
[ 90s ] thds: 100 tps: 1921.21 qps: 39479.24 (r/w/o: 27713.54/7691.54/4074.15) lat (ms,95%): 84.47 err/s: 59.31 reconn/s: 0.00
[ 100s ] thds: 100 tps: 2134.06 qps: 43828.35 (r/w/o: 30767.48/8538.54/4522.33) lat (ms,95%): 73.13 err/s: 63.90 reconn/s: 0.00
[ 110s ] thds: 100 tps: 1305.49 qps: 26910.18 (r/w/o: 18892.82/5241.78/2775.59) lat (ms,95%): 104.84 err/s: 41.60 reconn/s: 0.00
[ 120s ] thds: 100 tps: 1997.63 qps: 41037.51 (r/w/o: 28815.43/7980.83/4241.26) lat (ms,95%): 89.16 err/s: 62.90 reconn/s: 0.00
[ 130s ] thds: 100 tps: 2062.18 qps: 42315.11 (r/w/o: 29703.23/8244.11/4367.77) lat (ms,95%): 80.03 err/s: 61.10 reconn/s: 0.00
[ 140s ] thds: 100 tps: 1705.73 qps: 34983.78 (r/w/o: 24558.25/6815.21/3610.32) lat (ms,95%): 99.33 err/s: 48.69 reconn/s: 0.00
[ 150s ] thds: 100 tps: 1689.21 qps: 34688.18 (r/w/o: 24355.48/6750.55/3582.15) lat (ms,95%): 89.16 err/s: 48.21 reconn/s: 0.00
[ 160s ] thds: 100 tps: 1886.56 qps: 38698.17 (r/w/o: 27155.82/7537.53/4004.82) lat (ms,95%): 90.78 err/s: 56.20 reconn/s: 0.00
[ 170s ] thds: 100 tps: 1928.81 qps: 39732.12 (r/w/o: 27913.78/7717.92/4100.41) lat (ms,95%): 87.56 err/s: 61.20 reconn/s: 0.00
[ 180s ] thds: 100 tps: 2000.75 qps: 41118.15 (r/w/o: 28863.46/8007.89/4246.79) lat (ms,95%): 82.96 err/s: 62.50 reconn/s: 0.00
[ 190s ] thds: 100 tps: 2015.51 qps: 41414.15 (r/w/o: 29080.08/8059.95/4274.13) lat (ms,95%): 82.96 err/s: 61.70 reconn/s: 0.00
[ 200s ] thds: 100 tps: 2035.94 qps: 41807.51 (r/w/o: 29344.44/8149.18/4313.89) lat (ms,95%): 81.48 err/s: 59.90 reconn/s: 0.00
[ 210s ] thds: 100 tps: 1479.80 qps: 30401.46 (r/w/o: 21349.37/5911.79/3140.30) lat (ms,95%): 155.80 err/s: 44.70 reconn/s: 0.00
[ 220s ] thds: 100 tps: 1889.21 qps: 38913.33 (r/w/o: 27323.68/7575.64/4014.01) lat (ms,95%): 89.16 err/s: 61.80 reconn/s: 0.00
[ 230s ] thds: 100 tps: 1990.00 qps: 40848.46 (r/w/o: 28681.08/7950.48/4216.90) lat (ms,95%): 84.47 err/s: 58.80 reconn/s: 0.00
[ 240s ] thds: 100 tps: 2036.04 qps: 41779.80 (r/w/o: 29321.86/8146.27/4311.68) lat (ms,95%): 81.48 err/s: 59.70 reconn/s: 0.00
[ 250s ] thds: 100 tps: 1710.84 qps: 35211.32 (r/w/o: 24722.28/6850.06/3638.98) lat (ms,95%): 95.81 err/s: 53.20 reconn/s: 0.00
[ 260s ] thds: 100 tps: 1956.01 qps: 40131.82 (r/w/o: 28176.76/7813.74/4141.32) lat (ms,95%): 90.78 err/s: 57.90 reconn/s: 0.00
[ 270s ] thds: 100 tps: 1948.48 qps: 40056.11 (r/w/o: 28122.56/7796.60/4136.95) lat (ms,95%): 86.00 err/s: 60.30 reconn/s: 0.00
[ 280s ] thds: 100 tps: 1983.93 qps: 40732.06 (r/w/o: 28594.99/7927.81/4209.26) lat (ms,95%): 80.03 err/s: 58.60 reconn/s: 0.00
[ 290s ] thds: 100 tps: 1992.00 qps: 40948.79 (r/w/o: 28743.49/7980.10/4225.20) lat (ms,95%): 82.96 err/s: 58.70 reconn/s: 0.00
[ 300s ] thds: 100 tps: 2014.59 qps: 41323.30 (r/w/o: 29017.03/8036.08/4270.19) lat (ms,95%): 81.48 err/s: 60.40 reconn/s: 0.00
SQL statistics:
    queries performed:
        read:                            8275582
        write:                           2295354
        other:                           1216960
        total:                           11787896
    transactions:                        573971 (1910.48 per sec.)
    queries:                             11787896 (39236.46 per sec.)
    ignored errors:                      17142  (57.06 per sec.)
    reconnects:                          0      (0.00 per sec.)

General statistics:
    total time:                          300.4295s
    total number of events:              573971

Latency (ms):
         min:                                    8.06
         avg:                                   52.30
         max:                                 2816.03
         95th percentile:                       86.00
         sum:                             30017266.14

Threads fairness:
    events (avg/stddev):           5739.7100/129.92
    execution time (avg/stddev):   300.1727/0.15

Distributed table:

lhrdb=# set citus.shard_count=16;
SET
lhrdb=#
lhrdb=# set citus.shard_replication_factor=2;
SET
lhrdb=#
lhrdb=# SELECT create_distributed_table('sbtest1', 'id', 'hash');
NOTICE:  Copying data from local table...
NOTICE:  copying the data has completed
DETAIL:  The local data in the table is no longer visible, but is still on disk.
HINT:  To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$public.sbtest1$$)
 create_distributed_table
--------------------------

(1 row)

lhrdb=# SELECT truncate_local_data_after_distributing_table($$public.sbtest1$$);
 truncate_local_data_after_distributing_table
----------------------------------------------

(1 row)

[postgres@lhrpgcituscn80 ~]$ sysbench /usr/share/sysbench/oltp_read_write.lua \
    --db-driver=pgsql --pgsql-host=172.72.6.80 --pgsql-port=5432 \
    --pgsql-user=postgres --pgsql-password=lhr --pgsql-db=lhrdb \
    --table-size=100000 --tables=1 --threads=100 \
    --events=999999999 --time=300 --report-interval=10 \
    --db-ps-mode=disable --forced-shutdown=1 run
sysbench 1.0.17 (using system LuaJIT 2.0.4)

Running the test with following options:
Number of threads: 100
Report intermediate results every 10 second(s)
Initializing random number generator from current time

Forcing shutdown in 301 seconds

Initializing worker threads...

Threads started!

[ 10s ] thds: 100 tps: 2.00 qps: 300.64 (r/w/o: 265.85/13.69/21.09) lat (ms,95%): 8955.74 err/s: 7.00 reconn/s: 0.00
[ 20s ] thds: 100 tps: 0.20 qps: 155.41 (r/w/o: 141.41/3.70/10.30) lat (ms,95%): 15650.42 err/s: 9.90 reconn/s: 0.00
[ 30s ] thds: 100 tps: 0.80 qps: 161.90 (r/w/o: 143.80/6.90/11.20) lat (ms,95%): 29926.15 err/s: 9.60 reconn/s: 0.00
[ 40s ] thds: 100 tps: 2.20 qps: 201.60 (r/w/o: 173.60/13.50/14.50) lat (ms,95%): 36481.47 err/s: 10.10 reconn/s: 0.00
[ 50s ] thds: 100 tps: 1.10 qps: 162.70 (r/w/o: 143.10/8.30/11.30) lat (ms,95%): 48662.51 err/s: 9.10 reconn/s: 0.00
[ 60s ] thds: 100 tps: 2.10 qps: 205.50 (r/w/o: 177.50/13.20/14.80) lat (ms,95%): 58263.41 err/s: 10.60 reconn/s: 0.00
[ 70s ] thds: 100 tps: 0.80 qps: 138.40 (r/w/o: 122.20/6.70/9.50) lat (ms,95%): 69758.52 err/s: 7.90 reconn/s: 0.00
[ 80s ] thds: 100 tps: 0.40 qps: 171.60 (r/w/o: 155.40/4.70/11.50) lat (ms,95%): 74968.33 err/s: 10.70 reconn/s: 0.00
[ 90s ] thds: 100 tps: 1.50 qps: 200.50 (r/w/o: 176.40/10.00/14.10) lat (ms,95%): 89759.24 err/s: 11.10 reconn/s: 0.00
[ 100s ] thds: 100 tps: 0.90 qps: 114.30 (r/w/o: 99.00/7.30/8.00) lat (ms,95%): 96462.77 err/s: 6.20 reconn/s: 0.00
[ 110s ] thds: 100 tps: 1.60 qps: 167.20 (r/w/o: 144.40/10.90/11.90) lat (ms,95%): 100000.00 err/s: 8.70 reconn/s: 0.00
[ 120s ] thds: 100 tps: 2.10 qps: 181.30 (r/w/o: 155.30/12.80/13.20) lat (ms,95%): 100000.00 err/s: 9.00 reconn/s: 0.00
[ 130s ] thds: 100 tps: 1.40 qps: 211.00 (r/w/o: 185.10/11.30/14.60) lat (ms,95%): 100000.00 err/s: 11.80 reconn/s: 0.00
[ 140s ] thds: 100 tps: 0.20 qps: 130.20 (r/w/o: 117.60/4.00/8.60) lat (ms,95%): 43679.10 err/s: 8.20 reconn/s: 0.00
[ 150s ] thds: 100 tps: 1.90 qps: 192.00 (r/w/o: 165.20/13.10/13.70) lat (ms,95%): 100000.00 err/s: 9.90 reconn/s: 0.00
[ 160s ] thds: 100 tps: 1.70 qps: 147.40 (r/w/o: 126.00/10.70/10.70) lat (ms,95%): 100000.00 err/s: 7.30 reconn/s: 0.00
[ 170s ] thds: 100 tps: 1.80 qps: 172.50 (r/w/o: 148.40/11.70/12.40) lat (ms,95%): 100000.00 err/s: 8.80 reconn/s: 0.00
[ 180s ] thds: 100 tps: 0.70 qps: 156.30 (r/w/o: 138.60/7.10/10.60) lat (ms,95%): 69758.52 err/s: 9.20 reconn/s: 0.00
[ 190s ] thds: 100 tps: 0.90 qps: 143.80 (r/w/o: 127.00/6.80/10.00) lat (ms,95%): 100000.00 err/s: 8.20 reconn/s: 0.00
[ 200s ] thds: 100 tps: 0.80 qps: 156.70 (r/w/o: 138.80/7.20/10.70) lat (ms,95%): 88157.45 err/s: 9.10 reconn/s: 0.00
[ 210s ] thds: 100 tps: 1.70 qps: 150.90 (r/w/o: 128.60/11.40/10.90) lat (ms,95%): 100000.00 err/s: 7.50 reconn/s: 0.00
[ 220s ] thds: 100 tps: 1.30 qps: 135.20 (r/w/o: 116.60/9.00/9.60) lat (ms,95%): 100000.00 err/s: 7.00 reconn/s: 0.00
[ 230s ] thds: 100 tps: 0.50 qps: 85.40 (r/w/o: 74.20/5.40/5.80) lat (ms,95%): 100000.00 err/s: 4.80 reconn/s: 0.00
[ 240s ] thds: 100 tps: 1.50 qps: 171.20 (r/w/o: 148.40/10.70/12.10) lat (ms,95%): 100000.00 err/s: 9.10 reconn/s: 0.00
[ 250s ] thds: 100 tps: 1.30 qps: 195.00 (r/w/o: 170.80/10.70/13.50) lat (ms,95%): 100000.00 err/s: 10.90 reconn/s: 0.00
[ 260s ] thds: 100 tps: 0.60 qps: 104.40 (r/w/o: 92.40/4.80/7.20) lat (ms,95%): 100000.00 err/s: 6.00 reconn/s: 0.00
[ 270s ] thds: 100 tps: 0.80 qps: 133.60 (r/w/o: 117.60/6.80/9.20) lat (ms,95%): 100000.00 err/s: 7.60 reconn/s: 0.00
[ 280s ] thds: 100 tps: 2.40 qps: 195.70 (r/w/o: 166.60/14.80/14.30) lat (ms,95%): 100000.00 err/s: 9.50 reconn/s: 0.00
[ 290s ] thds: 100 tps: 1.70 qps: 106.80 (r/w/o: 89.20/9.50/8.10) lat (ms,95%): 100000.00 err/s: 4.70 reconn/s: 0.00
[ 300s ] thds: 100 tps: 0.90 qps: 190.60 (r/w/o: 168.00/9.70/12.90) lat (ms,95%): 100000.00 err/s: 11.10 reconn/s: 0.00
FATAL: The --max-time limit has expired, forcing shutdown...
SQL statistics:
    queries performed:
        read:                            43336
        write:                           2772
        other:                           3475
        total:                           49583
    transactions:                        378    (1.26 per sec.)
    queries:                             49583  (164.73 per sec.)
    ignored errors:                      2618   (8.70 per sec.)
    reconnects:                          0      (0.00 per sec.)

Number of unfinished transactions on forced shutdown: 100

General statistics:
    total time:                          301.0003s
    total number of events:              378

Latency (ms):
         min:                                  101.71
         avg:                                59858.70
         max:                               282897.47
         95th percentile:                   100000.00
         sum:                             22626588.73

Threads fairness:
    events (avg/stddev):           4.7800/1.83
    execution time (avg/stddev):   226.2659/74.67

Results

| Environment   | Test type  | TPS     | QPS      |
|---------------|------------|---------|----------|
| Local table   | Read/write | 1910.48 | 39236.46 |
| Sharded table | Read/write | 1.26    | 164.73   |
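To put the gap between the two runs into numbers, the per-second rates can be pulled from the sysbench summary lines and divided. The following is only a minimal sketch: the helper names `parse_rates` and `slowdown` are hypothetical, and the two summary snippets are copied by hand from the sysbench output of the local-table and sharded-table runs.

```python
import re

# Summary lines copied from the two sysbench runs in this post.
LOCAL_SUMMARY = """
    transactions:                        573971 (1910.48 per sec.)
    queries:                             11787896 (39236.46 per sec.)
"""
SHARDED_SUMMARY = """
    transactions:                        378    (1.26 per sec.)
    queries:                             49583  (164.73 per sec.)
"""

# Matches "transactions: N (X per sec.)" and "queries: N (X per sec.)".
# It does not match the "queries performed:" header line.
RATE_RE = re.compile(
    r"^\s*(transactions|queries):\s+\d+\s+\(([\d.]+) per sec\.\)",
    re.MULTILINE,
)


def parse_rates(summary: str) -> dict:
    """Return {'transactions': TPS, 'queries': QPS} from a sysbench summary."""
    return {name: float(rate) for name, rate in RATE_RE.findall(summary)}


def slowdown(local: dict, sharded: dict) -> dict:
    """Return how many times slower the sharded run is, per metric."""
    return {name: local[name] / sharded[name] for name in local}


local = parse_rates(LOCAL_SUMMARY)
sharded = parse_rates(SHARDED_SUMMARY)
ratios = slowdown(local, sharded)

for name, ratio in ratios.items():
    print(f"{name}: local={local[name]} sharded={sharded[name]} ratio={ratio:.1f}x")
```

With the figures from this test, the sharded table comes out roughly 1500 times slower in TPS and roughly 240 times slower in QPS.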

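The sharded table performed far worse than the local table: TPS dropped from 1910.48 to 1.26 and QPS from 39236.46 to 164.73. This is probably related to the test environment, because the whole cluster here is deployed on a single host. I will rerun the test once dedicated servers are available.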
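For a later rerun, it may be useful to compare the 10-second interval lines as well as the final summary, since they show whether throughput is stable or latency keeps climbing during the run. The sketch below parses sysbench 1.0 interval lines in the format shown in this post. The function names `parse_intervals` and `intervals_over` and the 10-second latency threshold are assumptions made for illustration; the three sample lines are copied from the sharded-table run.

```python
import re
from statistics import mean

# One sysbench 1.0 interval line, as printed with --report-interval.
INTERVAL_RE = re.compile(
    r"\[\s*(?P<t>\d+)s\s*\]\s+thds:\s+\d+\s+tps:\s+(?P<tps>[\d.]+)\s+"
    r"qps:\s+(?P<qps>[\d.]+).*?lat \(ms,95%\):\s+(?P<p95>[\d.]+)\s+"
    r"err/s:\s+(?P<err>[\d.]+)"
)


def parse_intervals(log: str) -> list:
    """Extract one dict per interval line found in the sysbench output."""
    return [
        {
            "t": int(m["t"]),
            "tps": float(m["tps"]),
            "qps": float(m["qps"]),
            "p95_ms": float(m["p95"]),
            "err_per_s": float(m["err"]),
        }
        for m in INTERVAL_RE.finditer(log)
    ]


def intervals_over(rows: list, p95_limit_ms: float) -> list:
    """Return the timestamps whose 95th-percentile latency reaches the limit."""
    return [r["t"] for r in rows if r["p95_ms"] >= p95_limit_ms]


# Three interval lines copied from the sharded-table run.
SAMPLE = """
[ 10s ] thds: 100 tps: 2.00 qps: 300.64 (r/w/o: 265.85/13.69/21.09) lat (ms,95%): 8955.74 err/s: 7.00 reconn/s: 0.00
[ 20s ] thds: 100 tps: 0.20 qps: 155.41 (r/w/o: 141.41/3.70/10.30) lat (ms,95%): 15650.42 err/s: 9.90 reconn/s: 0.00
[ 110s ] thds: 100 tps: 1.60 qps: 167.20 (r/w/o: 144.40/10.90/11.90) lat (ms,95%): 100000.00 err/s: 8.70 reconn/s: 0.00
"""

rows = parse_intervals(SAMPLE)
slow = intervals_over(rows, p95_limit_ms=10_000.0)  # threshold is arbitrary
print(f"intervals parsed: {len(rows)}")
print(f"mean tps: {mean(r['tps'] for r in rows):.2f}")
print(f"intervals with p95 >= 10 s: {slow}")
```

Feeding the complete output of both runs through the same function makes it easy to compare them interval by interval.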