• 关于使用mysql_fdw无法查询长度超过64K字段的原因分析


    LightDB 23.3 版本修复无法使用mysql_fdw查询超过64K字段的问题。

    注:笔者使用的是mysql_fdw REL-2_7_0,这个bug已在REL-2_8_0中修复,我们来复盘一下造成这个问题的原因所在。

    问题复现

    1. -- mysql上创建测试表
    2. create table fdwtest(a longtext);
    3. insert into fdwtest select repeat('a',65534);
    4. insert into fdwtest select repeat('b',65535);
    5. insert into fdwtest select repeat('c',65536);
    6. insert into fdwtest select repeat('d',65537);
    7. insert into fdwtest select repeat('e',65538);
    8. mysql> select substr(a,1,10),length(a) from fdwtest;
    9. +----------------+-----------+
    10. | substr(a,1,10) | length(a) |
    11. +----------------+-----------+
    12. | aaaaaaaaaa | 65534 |
    13. | bbbbbbbbbb | 65535 |
    14. | cccccccccc | 65536 |
    15. | dddddddddd | 65537 |
    16. | eeeeeeeeee | 65538 |
    17. +----------------+-----------+
    18. 5 rows in set (0.00 sec)
    1. -- 在pg中使用mysql_fdw查询mysql表
    2. -- load extension first time after install
    3. CREATE EXTENSION mysql_fdw;
    4. -- create server object
    5. CREATE SERVER mysql_server
    6. FOREIGN DATA WRAPPER mysql_fdw
    7. OPTIONS(host '127.0.0.1', port '3306');
    8. -- create user mapping
    9. CREATE USER MAPPING FOR lightdb
    10. SERVER mysql_server
    11. OPTIONS (username 'ludeng', password 'ludeng');
    12. -- create foreign table
    13. CREATE FOREIGN TABLE fdwtest
    14. (
    15. a text
    16. )
    17. SERVER mysql_server
    18. OPTIONS (dbname 'ludeng', table_name 'fdwtest');
    19. -- 在msyql中的长度超过64k
    20. lightdb@postgres=# select substr(a,1,10),length(a) from fdwtest;
    21. substr | length
    22. ------------+--------
    23. aaaaaaaaaa | 65534
    24. bbbbbbbbbb | 65535
    25. cccccccccc | 65536
    26. (3 rows)

    可以看到,对于长度超过64K的字段,已经无法通过mysql_fdw来查询到了。

    mysql_fdw工作原理

    我们知道mysql数据库对外提供了一套c语言的API接口libmysqlclient,相关资料可以参考官网MySQL :: MySQL 8.0 C API Developer Guide。而mysq_fdw作为postgresql的一个插件,正是使用了libmysqlclient API来实现在postgresql上直接查询mysql数据库表的目的,使得postgresql用户可以像直接查询postgresql数据库一样查询mysql数据库,大致的执行流程如下

    libmysqlclient使用

    现在我们知道了mysql_fdw的大致工作原理是通过libmysqlclient与mysql去交互,那么我们就来简单使用一下libmysqlclient提供的c语言API,对libmysqlclient有个大致的了解。由于mysql_fdw的实现是基于C API Prepared Statement Interface的,那么我们就简单使用下相关的API,细节可参考官网

    在mysql上使用c语言prepared statement API的大致步骤如下
    1、使用mysql_stmt_init创建一个prepared statement处理器,调用mysql_stmt_prepare时传入一个字符串(sql语句)来创建一个prepared statement。当调用mysql_stmt_prepare时,mysql客户端/服务端协议执行了如下操作:
    1-服务端解析(包括词法和语法解析)传入的sql语句,并返回给客户端OK状态(返回值为0),同时还返回一个statement ID(该statement ID是正整数,用于标识所有本次sql执行的各个步骤),以及参数个数、列数和结果集元数据
    2-客户端使用statement ID来进行接下来的操作,以便服务器知道这是针对那个sql语句的


    2、通过mysql_stmt_bind_param来设置参数,所有参数必须均被设置,否则执行时会报错或返回不可预期的结果。


    3、调用mysql_stmt_execute()执行sql语句,具体步骤为
    1-客户端发送sql语句的参数给服务器,即绑定参数步骤
    2-服务端用客户端提供的statment ID找到是哪个sql语句,用客户端提供的参数替换占位符并执行sql语句。若sql语句需要返回结果集,服务端返回结果数据给客户端,否则只需要返回OK状态,以及几行数据被更新/删除/插入


    4、如果sql语句是一个select语句或者其它返回结果的语句,那么可以调用mysql_stmt_result_metadata获取结果集的元数据。结果集元数据是个MYSQL_RES结构,而结果集本身也是MYSQL_RES结构,结果集元数据中包含了查询的列数以及每列的相关信息。


    5、若一个sql语句会产生结果集,需要通过调用mysql_stmt_bind_result来绑定数据缓冲区(data buffer),而数据缓冲区将用于后续获取数据。


    6、调用mysql_stmt_fetch将数据逐行获取到缓冲区,直到所有数据行均被获取到。


    7、如有必要,重复步骤3至6。通过调用mysql_stmt_bind_param更改prepared statement的参数值,可以重复mysql_stmt_execute来重新执行该语句。


    8、当sql语句执行结束时,调用mysql_stmt_close关闭处理器(statement handler),这将释放所有资源,且handler此时已处于无效状态。


    9、若使用mysql_stmt_result_metadata获取了结果集的元数据,需调用mysql_free_result来释放元数据相关资源防止内存泄露。

    在了解了API的大致使用之后,我们直接上一个hello world例子:

    1. #include
    2. #include "mysql/mysql.h"
    3. #include
    4. #include
    5. #define STRING_SIZE 50
    6. #define SELECT_SAMPLE "select length(a),substr(a,1,10) from fdwtest limit 5"
    7. #define COLUMN_NUMS 2
    8. typedef char bool;
    9. int main()
    10. {
    11. MYSQL *mysql; // connection
    12. MYSQL_STMT *stmt;
    13. MYSQL_BIND bind[COLUMN_NUMS];
    14. MYSQL_RES *prepare_meta_result;
    15. MYSQL_TIME ts;
    16. unsigned long length[COLUMN_NUMS];
    17. int param_count, column_count, row_count;
    18. short small_data;
    19. int int_data;
    20. char str_data[STRING_SIZE];
    21. bool is_null[COLUMN_NUMS];
    22. bool error[COLUMN_NUMS];
    23. // create connection to MySQL
    24. mysql = mysql_init(NULL);
    25. if(mysql_real_connect(mysql, "127.0.0.1", "ludeng", "ludeng", "ludeng", 3306, NULL, 0) == NULL) {
    26. fprintf(stderr, "sorry, no database connection ...\n");
    27. return 1;
    28. }
    29. /* Prepare a SELECT query to fetch data from test_table */
    30. stmt = mysql_stmt_init(mysql);
    31. if (!stmt)
    32. {
    33. fprintf(stderr, " mysql_stmt_init(), out of memory\n");
    34. exit(0);
    35. }
    36. if (mysql_stmt_prepare(stmt, SELECT_SAMPLE, strlen(SELECT_SAMPLE)))
    37. {
    38. fprintf(stderr, " mysql_stmt_prepare(), SELECT failed\n");
    39. fprintf(stderr, " %s\n", mysql_stmt_error(stmt));
    40. exit(0);
    41. }
    42. fprintf(stdout, " prepare, SELECT successful\n");
    43. /* Get the parameter count from the statement */
    44. param_count= mysql_stmt_param_count(stmt);
    45. fprintf(stdout, " total parameters in SELECT: %d\n", param_count);
    46. if (param_count != 0) /* validate parameter count */
    47. {
    48. fprintf(stderr, " invalid parameter count returned by MySQL\n");
    49. exit(0);
    50. }
    51. /* Execute the SELECT query */
    52. if (mysql_stmt_execute(stmt))
    53. {
    54. fprintf(stderr, " mysql_stmt_execute(), failed\n");
    55. fprintf(stderr, " %s\n", mysql_stmt_error(stmt));
    56. exit(0);
    57. }
    58. /* Fetch result set meta information */
    59. prepare_meta_result = mysql_stmt_result_metadata(stmt);
    60. if (!prepare_meta_result)
    61. {
    62. fprintf(stderr," mysql_stmt_result_metadata(), returned no meta information\n");
    63. fprintf(stderr, " %s\n", mysql_stmt_error(stmt));
    64. exit(0);
    65. }
    66. /* Get total columns in the query */
    67. column_count= mysql_num_fields(prepare_meta_result);
    68. fprintf(stdout," total columns in SELECT statement: %d\n", column_count);
    69. if (column_count != 2) /* validate column count */
    70. {
    71. fprintf(stderr, " invalid column count returned by MySQL\n");
    72. exit(0);
    73. }
    74. /* Bind the result buffers for all 2 columns before fetching them */
    75. memset(bind, 0, sizeof(bind));
    76. /* INTEGER COLUMN */
    77. bind[0].buffer_type= MYSQL_TYPE_LONG;
    78. bind[0].buffer= (char *)&int_data;
    79. bind[0].is_null= &is_null[0];
    80. bind[0].length= &length[0];
    81. bind[0].error= &error[0];
    82. /* STRING COLUMN */
    83. bind[1].buffer_type= MYSQL_TYPE_STRING;
    84. bind[1].buffer= (char *)str_data;
    85. bind[1].buffer_length= STRING_SIZE;
    86. bind[1].is_null= &is_null[1];
    87. bind[1].length= &length[1];
    88. bind[1].error= &error[1];
    89. /* Bind the result buffers */
    90. if (mysql_stmt_bind_result(stmt, bind))
    91. {
    92. fprintf(stderr, " mysql_stmt_bind_result() failed\n");
    93. fprintf(stderr, " %s\n", mysql_stmt_error(stmt));
    94. exit(0);
    95. }
    96. /* Now buffer all results to client (optional step) */
    97. if (mysql_stmt_store_result(stmt))
    98. {
    99. fprintf(stderr, " mysql_stmt_store_result() failed\n");
    100. fprintf(stderr, " %s\n", mysql_stmt_error(stmt));
    101. exit(0);
    102. }
    103. /* Fetch all rows */
    104. row_count= 0;
    105. fprintf(stdout, "Fetching results ...\n");
    106. while (!mysql_stmt_fetch(stmt))
    107. {
    108. row_count++;
    109. fprintf(stdout, " row %d\n", row_count);
    110. /* column 1 */
    111. fprintf(stdout, " column1 (integer) : ");
    112. if (is_null[0])
    113. fprintf(stdout, " NULL\n");
    114. else
    115. fprintf(stdout, " %d(%ld)\n", int_data, length[0]);
    116. /* column 2 */
    117. fprintf(stdout, " column2 (string) : ");
    118. if (is_null[1])
    119. fprintf(stdout, " NULL\n");
    120. else
    121. fprintf(stdout, " %s(%ld)\n", str_data, length[1]);
    122. }
    123. /* Validate rows fetched */
    124. fprintf(stdout, " total rows fetched: %d\n", row_count);
    125. if (row_count != 5)
    126. {
    127. fprintf(stderr, " MySQL failed to return all rows\n");
    128. exit(0);
    129. }
    130. /* Free the prepared result metadata */
    131. mysql_free_result(prepare_meta_result);
    132. /* Close the statement */
    133. if (mysql_stmt_close(stmt))
    134. {
    135. /* mysql_stmt_close() invalidates stmt, so call */
    136. /* mysql_error(mysql) rather than mysql_stmt_error(stmt) */
    137. fprintf(stderr, " failed while closing the statement\n");
    138. fprintf(stderr, " %s\n", mysql_error(mysql));
    139. exit(0);
    140. }
    141. return 0;
    142. }
    143. //cc -o testLibMysqlClientStmt testLibMysqlClientStmt.c -I/usr/include/mysql/ -L /usr/lib64/mysql/ -lmysqlclient -g
    144. [lightdb@node101 libmysqlclient]$ ./testLibMysqlClientStmt
    145. prepare, SELECT successful
    146. total parameters in SELECT: 0
    147. total columns in SELECT statement: 2
    148. Fetching results ...
    149. row 1
    150. column1 (integer) : 65534(4)
    151. column2 (string) : aaaaaaaaaa(10)
    152. row 2
    153. column1 (integer) : 65535(4)
    154. column2 (string) : bbbbbbbbbb(10)
    155. row 3
    156. column1 (integer) : 65536(4)
    157. column2 (string) : cccccccccc(10)
    158. row 4
    159. column1 (integer) : 65537(4)
    160. column2 (string) : dddddddddd(10)
    161. row 5
    162. column1 (integer) : 65538(4)
    163. column2 (string) : eeeeeeeeee(10)
    164. total rows fetched: 5

    mysql_fdw源码分析

    注:这是2-8-0版本的mysql_fdw源码

    在mysql_fdw源码中,大致可以认为在mysqlBeginForeignScan中连接mysql数据库,而在mysqlIterateForeignScan中就是从mysql数据库获取查询数据,调用栈如下

    mysqlBeginForeignScan            
        mysql_get_connection        
        mysql_stmt_init                
        mysql_stmt_prepare            
        mysql_stmt_result_metadata    
        mysql_fetch_fields            
        若查询表存在类型为text的字段, 设置festate->has_var_size_col = true #本次新增#
        mysql_bind_result            
        mysql_stmt_attr_set    设置stmt属性 #本次新增#            
        mysql_stmt_bind_result        

    mysqlIterateForeignScan        
        ExecClearTuple                
        bind_stmt_params_and_exec    
            mysql_stmt_execute        
            mysql_stmt_store_result    在客户端结果集中缓存查询返回的全量结果,这里将field->max_length置为63358 #本次新增#
            mysql_bind_result(loop)    对TEXT类型重新绑定对应的内存 #本次新增#
            mysql_stmt_bind_result    为结果集中的输出字段建立缓冲(数据和长度的缓冲) #本次新增#

        mysql_stmt_fetch             
        mysql_convert_to_pg            
        heap_form_tuple                
        ExecStoreHeapTuple            
        内存释放和返回TupleTableSlot

    1. 所以造成数据查询不到的本质就在于:
    2. rc = mysql_stmt_fetch(festate->stmt); //返回101(MYSQL_DATA_TRUNCATED),导致查询结果不再返回给客户端了
    3. if (rc == 0)
    4. {
    5. foreach(lc, festate->retrieved_attrs)
    6. {
    7. int attnum = lfirst_int(lc) - 1;
    8. Oid pgtype = TupleDescAttr(attinmeta->tupdesc, attnum)->atttypid;
    9. int32 pgtypmod = TupleDescAttr(attinmeta->tupdesc, attnum)->atttypmod;
    10. nulls[attnum] = festate->table->column[attid].is_null;
    11. if (!festate->table->column[attid].is_null)
    12. dvalues[attnum] = mysql_convert_to_pg(pgtype, pgtypmod,
    13. &festate->table->column[attid]);
    14. attid++;
    15. }
    16. ExecClearTuple(tupleSlot);
    17. if (list_length(fdw_private) >= mysqlFdwPrivateScanTList)
    18. {
    19. /* Construct tuple with whole-row references. */
    20. tup = mysql_get_tuple_with_whole_row(festate, dvalues, nulls);
    21. }
    22. else
    23. {
    24. /* Form the Tuple using Datums */
    25. tup = heap_form_tuple(attinmeta->tupdesc, dvalues, nulls);
    26. }
    27. ...
    28. }

    查看libmysqlclient官方对于mysql_stmt_attr_set的注解:If STMT_ATTR_UPDATE_MAX_LENGTH set to 1, causes mysql_stmt_store_result() to update the metadata MYSQL_FIELD->max_length value。调用mysql_stmt_attr_set将属性STMT_ATTR_UPDATE_MAX_LENGTH设置为true后,使得调用mysql_stmt_store_result后会更新结果集中字段的长度。对于我们的案例,长度为65537的字段来说,经过上述2个步骤后,会将MYSQL_FIELD->max_length value的值设置为65537,这时我们根据MYSQL_FIELD->max_length value重新申请长度为65537字节的内存,并再次将输出字段绑定到刚申请的内存即可,后续就能正常输出长度为65537的字段了。

    总结

    这个问题当然是mysql_fdw的bug,修改本身也比较简单。通过使用mysql_stmt_attr_set设置STMT_ATTR_UPDATE_MAX_LENGTH为1,调用mysql_stmt_store_result更新结果集中字段的长度即可。

    但是mysql_fdw的官方修复只是针对在pg上创建text类型的字段时的场景,并未包含对varchar和char字段的类似处理,也就是说若pg表中存在varchar和char字段,且varchar或char对应的mysql表的字段长度超过64KB,无法查询mysql_fdw表超长字段的问题依然存在。因为pg的字符类型字段最大可以支持10485760(the length must be greater than zero and cannot exceed 10485760),远超64KB。

    实际的修复也很简单,当表中存在varchar和char字段时,只要保持跟text一样的处理方式即可,在lightdb中23.3版本,我们修复了这个问题。

    因此,若lightdb用户碰到mysql fdw表字段超过64K而无法正确查询时,只需将lightdb升级到23.3即可。

  • 相关阅读:
    Day17.1:静态与非静态的详解
    vue3项目实战中的接口调用方法(一)async/await用法 对axios二次封装 实现异步请求
    单机Linux下搭建MongoDB副本集-三节点
    canvas画布绘制线条样式:粗细,圆角,拐角等
    【微前端开发环境下,加载远程子应用的实战。】
    「网页开发|前端开发|Vue」10 vuex模块化:将数据划分成不同modules分别管理
    华为浏览器风险提示 - 解决方案
    【网络安全】使用meterpreter进行远控、Mysql注入、反弹型XSS攻防
    虾皮物流价格是多少?如何计算?
    java计算机毕业设计web家教管理系统源码+mysql数据库+系统+lw文档+部署
  • 原文地址:https://blog.csdn.net/u012796067/article/details/132617318