From f2eb11a979a350140aef957e68673d3f89f192f2 Mon Sep 17 00:00:00 2001 From: Pavlo Mishchenko Date: Wed, 21 Feb 2024 16:57:38 +0200 Subject: [PATCH] Rewrite insert statement in batches Summary: If the paramset size is too big, we get the error `Query cannot be completed because of Parameter Array capacity of 1048576`. In this diff we limit the number of parameters sent with one INSERT statement. Test Plan: https://app.circleci.com/pipelines/github/memsql/singlestore-odbc-connector/2986/workflows/fb4486df-8052-45cd-a571-766d723774c1 Reviewers: amakarovych-ua Reviewed By: amakarovych-ua Subscribers: engineering-list JIRA Issues: PLAT-6976 Differential Revision: https://grizzly.internal.memcompute.com/D66924 --- ma_statement.c | 164 +++++++++++++++++++++++++------------------------ 1 file changed, 84 insertions(+), 80 deletions(-) diff --git a/ma_statement.c b/ma_statement.c index ad3ff51e..f78bc49f 100644 --- a/ma_statement.c +++ b/ma_statement.c @@ -20,6 +20,11 @@ #include #define MADB_MIN_QUERY_LEN 5 +// SingleStore imposes a limit on the number of parameters sent in one statement. It is defined by +// the global var parametrizer_query_max_params which defaults to 1048576.q +// TODO: PLAT-6999 respect parametrizer_query_max_params value +#define S2_MAX_PARAMS 1024000 + struct st_ma_stmt_methods MADB_StmtMethods; /* declared at the end of file */ @@ -1898,7 +1903,7 @@ SQLRETURN MADB_StmtExecute(MADB_Stmt *Stmt, BOOL ExecDirect) unsigned int ErrorCount= 0; unsigned int StatementNr; unsigned int ParamOffset= 0; /* for multi statements */ - SQLULEN j; + SQLULEN j, batchNum; /* For multistatement direct execution */ char *CurQuery= Stmt->Query.RefinedText, *QueriesEnd= Stmt->Query.RefinedText + Stmt->Query.RefinedLength; @@ -1945,12 +1950,6 @@ SQLRETURN MADB_StmtExecute(MADB_Stmt *Stmt, BOOL ExecDirect) continue; /* bad statement - skip it */ } - MADB_DynString all_params_query; - MADB_InitDynamicString(&all_params_query, "", 1024, 1024); - - MADB_DynString combined_insert_query; - MADB_InitDynamicString(&combined_insert_query, "", 1024, 1024); - int numParamRows = Stmt->ParamCount > 0 ? Stmt->Apd->Header.ArraySize : 1; // We want to send queries of the form INSERT INTO [] VALUES (?,...) // as one query with a list of VALUES tuples, e.g. INSERT INTO t VALUES (1, 'i'), (2, 'j'), (3, 'k') @@ -1963,37 +1962,86 @@ SQLRETURN MADB_StmtExecute(MADB_Stmt *Stmt, BOOL ExecDirect) !MADB_FindToken(&Stmt->Query, "ON DUPLICATE KEY UPDATE") && \ !QUERY_IS_MULTISTMT(Stmt->Query); - // when combineQueries is true and rewriteInsert is false, a query of the form - // INSERT INTO [] VALUES (?,...) will be sent as a multi-statement of the form - // INSERT INTO t VALUES (1, 'i'); INSERT INTO t VALUES (2, 'j'); INSERT INTO t VALUES (3, 'k') - // TODO: combineQueries can be true when Stmt->Query.ReturnsResult is true as well, needs testing - my_bool combineQueries = numParamRows > 1 && !(Stmt->Query.ReturnsResult) && DSN_OPTION(Stmt->Connection, MADB_OPT_FLAG_MULTI_STATEMENTS); - unsigned int valuesOffest = MADB_FindToken(&Stmt->Query, "VALUES") + 6 /* strlen(VALUES) */; - if (rewriteInsert) MADB_DynstrAppendMem(&combined_insert_query, MADB_Token(&Stmt->Query, 0), valuesOffest); - for (j = 0; j < numParamRows; ++j) + if (rewriteInsert) { - MADB_DynString final_query; - MADB_InitDynamicString(&final_query, "", 1024, 1024); - - const CspsControlFlowResult InitParamsRes - = CspsInitStatementParams(Stmt, &final_query, &ErrorCount, &ret, CurQuery, ParamOffset, j); - switch(InitParamsRes) { - case CCFR_OK: - break; - case CCFR_CONTINUE: - MADB_DynstrFree(&final_query); - continue; - case CCFR_ERROR: - MADB_DynstrFree(&final_query); - MADB_DynstrFree(&all_params_query); - MADB_DynstrFree(&combined_insert_query); - goto end; - default: - assert(0); - break; + long rowsInBatch = S2_MAX_PARAMS / MADB_STMT_PARAM_COUNT(Stmt); + unsigned int valuesOffest = MADB_FindToken(&Stmt->Query, "VALUES") + 6 /* strlen(VALUES) */; + // we have to send not more than S2_MAX_PARAMS parameters in one query + for (batchNum = 0; batchNum < (numParamRows + rowsInBatch - 1) / rowsInBatch; ++batchNum) + { + MADB_DynString combined_insert_query; + MADB_InitDynamicString(&combined_insert_query, "", 1024, 1024); + MADB_DynstrAppendMem(&combined_insert_query, MADB_Token(&Stmt->Query, 0), valuesOffest); + for (j = batchNum * rowsInBatch; j < numParamRows && j < batchNum * rowsInBatch + rowsInBatch; ++j) + { + MADB_DynString final_query; + MADB_InitDynamicString(&final_query, "", 1024, 1024); + + const CspsControlFlowResult InitParamsRes + = CspsInitStatementParams(Stmt, &final_query, &ErrorCount, &ret, CurQuery, ParamOffset, j); + switch(InitParamsRes) { + case CCFR_OK: + break; + case CCFR_CONTINUE: + MADB_DynstrFree(&final_query); + continue; + case CCFR_ERROR: + MADB_DynstrFree(&final_query); + MADB_DynstrFree(&combined_insert_query); + goto end; + default: + assert(0); + break; + } + if (final_query.length > 0) + { + if (j % rowsInBatch != 0) MADB_DynstrAppend(&combined_insert_query, ","); + MADB_DynstrAppend(&combined_insert_query, final_query.str + valuesOffest); + } + MADB_DynstrFree(&final_query); } - if (!combineQueries && !rewriteInsert) + MDBUG_C_PRINT(Stmt->Connection, "Running INSERT statement with %d rows", j - batchNum * rowsInBatch); + ret = CspsRunStatementQuery(Stmt, &combined_insert_query, &ErrorCount, ParamOffset); + if (Stmt->Ipd->Header.ArrayStatusPtr) + { + for (j = batchNum * rowsInBatch; j < numParamRows && j < batchNum * rowsInBatch + rowsInBatch; ++j) + { + if (!Stmt->Apd->Header.ArrayStatusPtr || Stmt->Apd->Header.ArrayStatusPtr[j] != SQL_PARAM_IGNORE) + { + Stmt->Ipd->Header.ArrayStatusPtr[j] = SQL_SUCCEEDED(ret) ? SQL_PARAM_SUCCESS : SQL_PARAM_ERROR; + } + } + } + MADB_DynstrFree(&combined_insert_query); + } + Stmt->AffectedRows = mysql_stmt_affected_rows(Stmt->stmt); + if (ret == SQL_ERROR) + { + ErrorCount = numParamRows; + } + } + else + { + for (j = 0; j < numParamRows; ++j) { + MADB_DynString final_query; + MADB_InitDynamicString(&final_query, "", 1024, 1024); + + const CspsControlFlowResult InitParamsRes + = CspsInitStatementParams(Stmt, &final_query, &ErrorCount, &ret, CurQuery, ParamOffset, j); + switch(InitParamsRes) { + case CCFR_OK: + break; + case CCFR_CONTINUE: + MADB_DynstrFree(&final_query); + continue; + case CCFR_ERROR: + MADB_DynstrFree(&final_query); + goto end; + default: + assert(0); + break; + } ret = CspsRunStatementQuery(Stmt, &final_query, &ErrorCount, ParamOffset); if (Stmt->Ipd->Header.ArrayStatusPtr) @@ -2011,54 +2059,10 @@ SQLRETURN MADB_StmtExecute(MADB_Stmt *Stmt, BOOL ExecDirect) SQL_PARAM_DIAG_UNAVAILABLE; } } + MADB_DynstrFree(&final_query); } - else - { - if (rewriteInsert && final_query.length > 0) - { - if (j > 0) MADB_DynstrAppend(&combined_insert_query, ","); - MADB_DynstrAppend(&combined_insert_query, final_query.str + valuesOffest); - } - else if (combineQueries) - { - MADB_DynstrAppend(&all_params_query, final_query.str); - MADB_DynstrAppend(&all_params_query, ";"); - } - } - MADB_DynstrFree(&final_query); - } - if (rewriteInsert) - { - MDBUG_C_PRINT(Stmt->Connection, "Running INSERT statement with %d rows", numParamRows); - ret = CspsRunStatementQuery(Stmt, &combined_insert_query, &ErrorCount, ParamOffset); - CspsReceiveStatementResults(Stmt, ret); - if (Stmt->Ipd->Header.ArrayStatusPtr) - { - for (j = 0; j < numParamRows; ++j) - { - if (!Stmt->Apd->Header.ArrayStatusPtr || Stmt->Apd->Header.ArrayStatusPtr[j] != SQL_PARAM_IGNORE) - { - Stmt->Ipd->Header.ArrayStatusPtr[j] = SQL_SUCCEEDED(ret) ? SQL_PARAM_SUCCESS : SQL_PARAM_ERROR; - } - } - } - if (ret == SQL_ERROR) - { - ErrorCount = numParamRows; - } - } - else if (combineQueries) - { - MDBUG_C_PRINT(Stmt->Connection, "Running combined query with %d entries", numParamRows); - ret = CspsRunStatementQuery(Stmt, &all_params_query, &ErrorCount, ParamOffset); - CspsReceiveStatementResultsMulti(Stmt, &ErrorCount, ret); - } - else - { CspsReceiveStatementResults(Stmt, ret); } - MADB_DynstrFree(&all_params_query); - MADB_DynstrFree(&combined_insert_query); // Move forward to the next subquery in the multistatement. if (QUERY_IS_MULTISTMT(Stmt->Query))