From f6ef033a77811c5b894ac4f8de86c2b4db9868ad Mon Sep 17 00:00:00 2001 From: MingXu Date: Mon, 5 Dec 2016 19:54:59 +0800 Subject: [PATCH] Add following parameters: -n means no partion info in DDLs;\n -f means taking first column as distribution key;\n -s specifies the target schema;\n -b specifies the buffer size in KB used to sending copy data to target db, the default is 0 --- dbsync/Makefile | 14 +++- dbsync/dbsync-mysql2pgsql.c | 29 ++++++- dbsync/mysql2pgsql.c | 148 ++++++++++++++++++++++++++---------- 3 files changed, 143 insertions(+), 48 deletions(-) diff --git a/dbsync/Makefile b/dbsync/Makefile index b8b4ee5..e48a431 100755 --- a/dbsync/Makefile +++ b/dbsync/Makefile @@ -21,13 +21,19 @@ pgsql_lib_dir := $(shell $(PG_CONFIG) --libdir) PGXS := $(shell $(PG_CONFIG) --pgxs) include $(PGXS) -RPATH_LDFLAGS='-Wl,-rpath,$$ORIGIN,-rpath,$(mysql_lib_dir),-rpath,$(pgsql_lib_dir)' +RPATH_LDFLAGS='-Wl,-rpath,$$ORIGIN,-rpath,$$ORIGIN/lib,-rpath,$$ORIGIN/../lib,-rpath,$(mysql_lib_dir),-rpath,$(pgsql_lib_dir)' export RPATH_LDFLAGS all: demo.o dbsync-pgsql2pgsql.o mysql2pgsql.o dbsync-mysql2pgsql.o readcfg.o - $(CXX) $(CFLAGS) demo.o ali_recvlogical.so $(libpq_pgport) $(RPATH_LDFLAGS) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o demo - $(CXX) $(CFLAGS) readcfg.o ini.o dbsync-pgsql2pgsql.o ali_recvlogical.so $(libpq_pgport) $(RPATH_LDFLAGS) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o pgsql2pgsql - $(CXX) $(CFLAGS) readcfg.o ini.o mysql2pgsql.o dbsync-mysql2pgsql.o misc.o stringinfo.o $(libpq_pgport) $(RPATH_LDFLAGS) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -L$(mysql_lib_dir) -lmysqlclient -o mysql2pgsql + $(CXX) $(CFLAGS) demo.o ali_recvlogical.so $(libpq_pgport) $(RPATH_LDFLAGS) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o demo + $(CXX) $(CFLAGS) readcfg.o ini.o dbsync-pgsql2pgsql.o ali_recvlogical.so $(libpq_pgport) $(RPATH_LDFLAGS) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o pgsql2pgsql + $(CXX) $(CFLAGS) readcfg.o ini.o mysql2pgsql.o dbsync-mysql2pgsql.o misc.o stringinfo.o $(libpq_pgport) $(RPATH_LDFLAGS) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -L$(mysql_lib_dir) -lmysqlclient -o mysql2pgsql clean: rm -rf *.o pgsql2pgsql mysql2pgsql demo + +pkg_mysql2pg: + mkdir -p install + mkdir -p install/bin + mkdir -p install/lib + cp -fr mysql2pgsql install/bin diff --git a/dbsync/dbsync-mysql2pgsql.c b/dbsync/dbsync-mysql2pgsql.c index c579a4e..c438f94 100644 --- a/dbsync/dbsync-mysql2pgsql.c +++ b/dbsync/dbsync-mysql2pgsql.c @@ -8,6 +8,9 @@ #include extern bool get_ddl_only; +extern bool simple_wo_part; +extern bool first_col_as_dist_key; +extern int buffer_size; static int load_table_list_file(const char *filename, char*** p_tables, char*** p_queries); @@ -22,6 +25,7 @@ main(int argc, char **argv) char *sport = NULL; //char *tabname = NULL; int res_getopt = 0; + char *target_schema = NULL; char *table_list_file = NULL; char **tables = NULL, **queries = NULL; @@ -53,7 +57,7 @@ main(int argc, char **argv) src.port = atoi(sport); - while ((res_getopt = getopt(argc, argv, ":l:j:d")) != -1) + while ((res_getopt = getopt(argc, argv, ":l:j:dnfhs:b:")) != -1) { switch (res_getopt) { @@ -63,17 +67,34 @@ main(int argc, char **argv) case 'j': num_thread = atoi(optarg); break; + case 'b': + buffer_size = 1024 * atoi(optarg); + break; + case 's': + target_schema = optarg; + break; case ':': - fprintf(stderr, "No value specified for -%c", optopt); + fprintf(stderr, "No value specified for -%c\n", optopt); break; case 'd': get_ddl_only = true; break; + case 'n': + simple_wo_part = true; + break; + case 'f': + first_col_as_dist_key = true; + break; + case 'h': + fprintf(stderr, "Usage: -l -j -d -n -f -s -b -h\n"); + fprintf(stderr, " -l specifies a file with table listed;\n -j specifies number of threads to do the job;\n -d means get DDL only without fetching data;\n -n means no partion info in DDLs;\n -f means taking first column as distribution key;\n -s specifies the target schema;\n -b specifies the buffer size in KB used to sending copy data to target db, the default is 0"); + return 0; case '?': fprintf(stderr, "Unsupported option: %c", optopt); break; default: fprintf(stderr, "Parameter parsing error: %c", res_getopt); + return -1; } } @@ -94,7 +115,7 @@ main(int argc, char **argv) if (get_ddl_only) num_thread = 1; - return mysql2pgsql_sync_main(desc , num_thread, &src); + return mysql2pgsql_sync_main(desc , num_thread, &src, target_schema); } @@ -212,7 +233,7 @@ int load_table_list_file(const char *filename, char*** p_tables, char*** p_queri table_array[cur_table] = table_begin; query_array[cur_table] = query_begin; cur_table++; - fprintf(stderr, "Adding table: %s\n", table_begin); + fprintf(stderr, "-- Adding table: %s\n", table_begin); } table_begin = p + 1; diff --git a/dbsync/mysql2pgsql.c b/dbsync/mysql2pgsql.c index 7f07250..bdb0a9c 100644 --- a/dbsync/mysql2pgsql.c +++ b/dbsync/mysql2pgsql.c @@ -31,6 +31,10 @@ static volatile bool time_to_abort = false; bool get_ddl_only = false; +bool simple_wo_part = false; +bool first_col_as_dist_key = false; +/* buffer for sending copy data to target, unit is Byte */ +int buffer_size = 0; #define STMT_SHOW_TABLES "show full tables in `%s` where table_type='BASE TABLE'" @@ -38,7 +42,6 @@ bool get_ddl_only = false; static MYSQL *connect_to_mysql(mysql_conn_info* hd); static void *mysql2pgsql_copy_data(void *arg); -static Oid *fetch_colmum_info(char *tabname, MYSQL_RES *my_res, bool is_target_gp); static void quote_literal_local_withoid(StringInfo s, const char *rawstr, Oid type, PQExpBuffer buffer); static int setup_connection_from_mysql(PGconn *conn); static void sigint_handler(int signum); @@ -52,19 +55,25 @@ sigint_handler(int signum) #endif static Oid * -fetch_colmum_info(char *tabname, MYSQL_RES *my_res, bool is_target_gp) +fetch_colmum_info(char *schemaname, char *tabname, MYSQL_RES *my_res, bool is_target_gp) { MYSQL_FIELD *field; int col_num = 0; Oid *col_type = NULL; int i = 0; PQExpBuffer ddl; - bool first = true; + /* Mysql column name len should be no more than 64 */ + char first_col_name[256] = {0}; ddl = createPQExpBuffer(); - appendPQExpBufferStr(ddl, "Reference DDL to create the target table:\n"); - appendPQExpBuffer(ddl, "CREATE TABLE %s%s (", - is_target_gp ? "" : "IF NOT EXISTS ", tabname); + + if (!get_ddl_only) + { + appendPQExpBufferStr(ddl, "-- Reference DDL to create the target table:\n"); + } + + appendPQExpBuffer(ddl, "CREATE TABLE %s%s%s%s (", + is_target_gp ? "" : "IF NOT EXISTS ", schemaname ? schemaname : "", schemaname ? "." : "", tabname); col_num = mysql_num_fields(my_res); col_type = palloc0(sizeof(Oid) * col_num); @@ -72,9 +81,8 @@ fetch_colmum_info(char *tabname, MYSQL_RES *my_res, bool is_target_gp) { int type; - if (first) + if (i == 0) { - first = false; } else { @@ -82,7 +90,13 @@ fetch_colmum_info(char *tabname, MYSQL_RES *my_res, bool is_target_gp) } field = mysql_fetch_field(my_res); - type = field->type; + + if (i == 0) + { + sprintf(first_col_name, "%s", field->org_name); + } + + type = field->type; switch(type) { case MYSQL_TYPE_VARCHAR: @@ -139,6 +153,10 @@ fetch_colmum_info(char *tabname, MYSQL_RES *my_res, bool is_target_gp) appendPQExpBuffer(ddl, "%s %s", field->org_name, "numeric"); col_type[i] = NUMERICOID; break; + case MYSQL_TYPE_INT24: + appendPQExpBuffer(ddl, "%s %s", field->org_name, "int4"); + col_type[i] = INT4OID; + break; default: fprintf(stderr, "unsupported col %s type %d\n", field->org_name, type); @@ -146,9 +164,19 @@ fetch_colmum_info(char *tabname, MYSQL_RES *my_res, bool is_target_gp) } } - appendPQExpBuffer(ddl, ")%s;\n\n", (is_target_gp ? " with (APPENDONLY=true, ORIENTATION=column, CHECKSUM=true, OIDS=false) DISTRIBUTED BY ()" : "")); + if (is_target_gp) + { + appendPQExpBuffer(ddl, ") with (APPENDONLY=true, ORIENTATION=column, COMPRESSTYPE=zlib, COMPRESSLEVEL=1, BLOCKSIZE=1048576, OIDS=false) DISTRIBUTED BY (%s)", first_col_as_dist_key ? first_col_name : ""); + + if (!simple_wo_part) + appendPQExpBuffer(ddl," PARTITION BY RANGE () (START (date '') INCLUSIVE END (date '') EXCLUSIVE EVERY (INTERVAL '<1 month>' ))"); + } + else + { + appendPQExpBuffer(ddl, ")"); + } - fprintf(stderr, "%s", ddl->data); + fprintf(stderr, "%s;\n\n", ddl->data); destroyPQExpBuffer(ddl); @@ -228,7 +256,7 @@ connect_to_mysql(mysql_conn_info* hd) * Entry point for mysql2pgsql */ int -mysql2pgsql_sync_main(char *desc, int nthread, mysql_conn_info *hd) +mysql2pgsql_sync_main(char *desc, int nthread, mysql_conn_info *hd, char* target_schema) { int i = 0; Thread_hd th_hd; @@ -311,15 +339,16 @@ mysql2pgsql_sync_main(char *desc, int nthread, mysql_conn_info *hd) { row = mysql_fetch_row(my_res); th_hd.task[i].id = i; - th_hd.task[i].schemaname = NULL; + th_hd.task[i].schemaname = target_schema; th_hd.task[i].relname = pstrdup(row[0]); + th_hd.task[i].query = NULL; th_hd.task[i].count = 0; th_hd.task[i].complete = false; /* Set the former entry's link to this entry. Last entry's next feild would remain NULL */ if (i != 0) { - th_hd.task[i-1].next = &th_hd.task[i+1]; + th_hd.task[i-1].next = &th_hd.task[i]; } } mysql_free_result(my_res); @@ -339,7 +368,7 @@ mysql2pgsql_sync_main(char *desc, int nthread, mysql_conn_info *hd) for (i = 0, p = hd->tabnames; *p != NULL; p++, i++) { th_hd.task[i].id = i; - th_hd.task[i].schemaname = NULL; + th_hd.task[i].schemaname = target_schema; th_hd.task[i].relname = *p; th_hd.task[i].query = hd->queries[i]; @@ -367,7 +396,10 @@ mysql2pgsql_sync_main(char *desc, int nthread, mysql_conn_info *hd) } pthread_mutex_init(&th_hd.t_lock, NULL); - fprintf(stderr, "starting full sync\n"); + if (!get_ddl_only) + { + fprintf(stderr, "Starting data sync\n"); + } thread = (Thread *)palloc0(sizeof(Thread) * th_hd.nth); for (i = 0; i < th_hd.nth; i++) @@ -397,8 +429,16 @@ mysql2pgsql_sync_main(char *desc, int nthread, mysql_conn_info *hd) t_count += th_hd.task[i].count; } - fprintf(stderr, "number of rows migrated: %ld (number of source tables' rows: %ld) \n", s_count, t_count); - fprintf(stderr, "full sync time cost %.3f ms\n", elapsed_msec); + if (!get_ddl_only) + { + fprintf(stderr, "Number of rows migrated: %ld (number of source tables' rows: %ld) \n", s_count, t_count); + fprintf(stderr, "Data sync time cost %.3f ms\n", elapsed_msec); + } + else + { + fprintf(stderr, "-- Number of tables: %d \n", ntask); + } + if (have_err) { fprintf(stderr, "errors occured during migration\n"); @@ -453,12 +493,13 @@ mysql2pgsql_copy_data(void *arg) query = createPQExpBuffer(); if (get_ddl_only) - fprintf(stderr, "\nReference commands to create target tables %s: \n***************\n\n", + fprintf(stderr, "\n-- Reference commands to create target tables %s: \n---------------\n\n", isgp ? "(Please choose a distribution key and replace it with for each table)" : ""); while(1) { int nlist = 0; int n_col = 0; + int row_count = 0; GETTIMEOFDAY(&before); pthread_mutex_lock(&hd->t_lock); @@ -497,18 +538,25 @@ mysql2pgsql_copy_data(void *arg) //fprintf(stderr, "relname %s, query %s \n", relname, curr->query ? curr->query : ""); if (curr && curr->query) + { appendPQExpBufferStr(query, curr->query); + } else + { appendPQExpBuffer(query, STMT_SELECT, nspname, relname); + } - if (get_ddl_only) + if (get_ddl_only) + { + appendPQExpBufferStr(query, " limit 1 "); + } + else { - appendPQExpBufferStr(query, "limit 1"); + fprintf(stderr, "Query to get source data for target table %s: %s \n", relname, query->data); } - fprintf(stderr, "Query to get source data for target table %s: %s \n", relname, query->data); ret = mysql_query(origin_conn, query->data); if (ret != 0) { @@ -516,10 +564,10 @@ mysql2pgsql_copy_data(void *arg) goto exit; } my_res = mysql_use_result(origin_conn); - column_oids = fetch_colmum_info(relname, my_res, isgp); + column_oids = fetch_colmum_info(curr->schemaname, relname, my_res, isgp); if (column_oids == NULL) { - fprintf(stderr, "get table %s column type error", relname); + fprintf(stderr, "get table %s column type error\n", relname); goto exit; } @@ -534,7 +582,8 @@ mysql2pgsql_copy_data(void *arg) n_col = mysql_num_fields(my_res); resetPQExpBuffer(query); - appendPQExpBuffer(query, "COPY %s FROM stdin DELIMITERS '|' with csv QUOTE ''''", + appendPQExpBuffer(query, "COPY %s%s%s FROM stdin DELIMITERS '|' with csv QUOTE ''''", + curr->schemaname ? PQescapeIdentifier(target_conn, curr->schemaname, strlen(curr->schemaname)) : "", curr->schemaname ? "." : "", PQescapeIdentifier(target_conn, relname, strlen(relname))); @@ -546,48 +595,67 @@ mysql2pgsql_copy_data(void *arg) goto exit; } + + resetPQExpBuffer(query); while ((row = mysql_fetch_row(my_res)) != NULL) { unsigned long *lengths; - bool first = true; - resetPQExpBuffer(query); lengths = mysql_fetch_lengths(my_res); for (i = 0; i < n_col; i++) { - if (first) - { - first = false; - } - else + if (i != 0) { appendPQExpBufferStr(query, "|"); } - if(lengths[i] != 0) + /* value of the field is NULL if it is fact NULL */ + if(lengths[i] >= 0 && row[i] != NULL) { quote_literal_local_withoid(&s_tmp, row[i], column_oids[i], query); } } + appendPQExpBufferStr(query, "\n"); + row_count++; - if (PQputCopyData(target_conn, query->data, query->len) != 1) + if (query->len >= buffer_size) { - fprintf(stderr,"writing to target table failed destination connection reported: %s", + if (PQputCopyData(target_conn, query->data, query->len) != 1) + { + fprintf(stderr,"writing to target table failed destination connection reported: %s", PQerrorMessage(target_conn)); - goto exit; + goto exit; + } + + /* Reset buffer for next use */ + resetPQExpBuffer(query); } - args->count++; - curr->count++; - if (time_to_abort) { + args->count = row_count; + curr->count = row_count; fprintf(stderr, "receive shutdown sigint\n"); goto exit; } } + if (query->len > 0) + { + if (PQputCopyData(target_conn, query->data, query->len) != 1) + { + fprintf(stderr,"writing to target table failed destination connection reported: %s", + PQerrorMessage(target_conn)); + goto exit; + } + + resetPQExpBuffer(query); + } + + args->count = row_count; + curr->count = row_count; + /* Send local finish */ if (PQputCopyEnd(target_conn, NULL) != 1) { @@ -620,7 +688,7 @@ mysql2pgsql_copy_data(void *arg) args->all_ok = true; if (get_ddl_only) - fprintf(stderr, "***************\n\n"); + fprintf(stderr, "---------------\n\n"); exit: