1.深入源码分析下 HIVE JDBC 的源码超时机制及其如何配置 socketTimeOut
2.MySQL JDBC 编译添加 Maven 依赖支持
3.Flink深入浅出:JDBC Connector源码分析
4.PolarDB-X 源码解读(七):私有协议连接的一生(CN篇)
5.shardingsphere源码阅读-兼容jdbc规范
6.odbc和jdbc的区别是什么
深入源码分析下 HIVE JDBC 的超时机制及其如何配置 socketTimeOut
深入源码分析下HIVE JDBC的超时机制及其配置方法,首先,解析从一个常见的源码问题出发,即当HIVE JDBC连接在操作过程中遇到SocketTimeoutException时,解析这通常意味着操作超时。源码接下来,解析纯静态源码让我们回顾JDBC超时机制的源码相关参数和接口。
在JDBC中,解析超时机制主要通过setStatementTimeout和setConnectionTimeout这两个方法实现。源码setStatementTimeout用于设置SQL语句的解析超时时间,而setConnectionTimeout用于设置整个连接的源码超时时间。它们的解析单位都是毫秒。
在HIVE JDBC中,源码由于其基于Thrift进行通信,解析因此对socket级别的源码超时管理更为复杂。HiveStatement中的thrift socket timeout是通过配置实现的,通过深入源码分析,可以发现thrift socket timeout的值被赋值给HiveStatement实例。当应用程序直接创建和管理HIVE JDBC连接时,需要在创建HiveStatement实例时设置这个属性,以确保socket级别操作的超时时间得到正确配置。
如果应用程序通过数据库连接池进行连接管理,那么配置HiveStatement中的thrift socket timeout的过程会更复杂。通常,需要在连接池的配置中,为HIVE JDBC连接指定socket级别的超时属性,然后在使用连接时确保HiveStatement实例正确引用了这些配置。
通过以上分析,我们可以总结出在不同场景下配置HIVE JDBC socket级别的超时机制的方法。对于直接管理连接的应用程序,需要在创建HiveStatement实例时直接设置socket timeout属性。而对于使用数据库连接池的应用程序,则需要在连接池的配置阶段为HIVE JDBC连接指定socket级别的超时属性,然后确保在使用连接时HiveStatement实例正确引用了这些配置。
总之,HIVE JDBC的超时机制及其配置方法涉及到多个层面的参数和接口,理解并正确配置它们对于确保应用程序的稳定性和响应速度至关重要。通过源码分析和实践操作,可以实现对HIVE JDBC socket级别的超时管理,从而优化应用程序性能。
MySQL JDBC 编译添加 Maven 依赖支持
在当前的工作项目中,需要对MySQL JDBC进行编译,即集成mysql-connector-j包。进入年,我们依然面临着手动下载和安装JAR包的传统方式,这从MySQL官方文档的JDBC源码编译指南中可见一斑。Oracle的这一做法似乎有意为之,给MySQL开发者带来了不便。
为了解决这个问题,我决定将MySQL JDBC添加Maven依赖,以下是关键步骤:
首先,确保你的贵州在线直播系统源码项目配置了JUnit 5进行单元测试,这时需要在pom.xml中加入maven-surefire-plugin插件:
xml
org.apache.maven.plugins
maven-surefire-plugin
如果你希望尽快使用,而不是等待官方更新,可以直接从我fork的库中获取,选择feat-maven-dep分支。
虽然这个过程略显繁琐,但通过这种方式,我们至少可以简化构建流程,提高开发效率。期待MySQL官方能尽快采纳这些改进。
Flink深入浅出:JDBC Connector源码分析
大数据开发中,数据分析与报表制作是日常工作中最常遇到的任务。通常,我们通过读取Hive数据来进行计算,并将结果保存到数据库中,然后通过前端读取数据库来进行报表展示。然而,使用FlinkSQL可以简化这一过程,通过一个SQL语句即可完成整个ETL流程。
在Flink中,读取Hive数据并将数据写入数据库是常见的需求。本文将重点讲解数据如何写入数据库的过程,包括刷写数据库的机制和原理。
以下是本文将讲解的几个部分,以解答在使用过程中可能产生的疑问:
1. 表的定义
2. 定义的表如何找到具体的实现类(如何自定义第三方sink)
3. 写入数据的机制原理
(本篇基于1..0源码整理而成)
1. 表的定义
Flink官网提供了SQL中定义表的示例,以下以oracle为例:
定义好这样的表后,就可以使用insert into student执行插入操作了。接下来,我们将探讨其中的技术细节。
2. 如何找到实现类
实际上,这一过程涉及到之前分享过的SPI(服务提供者接口),即DriverManager去寻找Driver的过程。在Flink SQL执行时,会通过translate方法将SQL语句转换为对应的Operation,例如insert into xxx中的xxx会转换为CatalogSinkModifyOperation。这个操作会获取表的信息,从而得到Table对象。如果这个Table对象是CatalogTable,则会进入TableFactoryService.find()方法找到对应的实现类。
寻找实现类的过程就是SPI的过程。即通过查找路径下所有TableFactory.class的实现类,加载到内存中。这个SPI的定义位于resources下面的META-INFO下,定义接口以及实现类。
加载到内存后,首先判断是否是TableFactory的实现类,然后检查必要的参数是否满足(如果不满足会抛出异常,很多人在第一次使用Flink SQL注册表时,都会遇到NoMatchingTableFactoryException异常,其实都是因为配置的属性不全或者Jar报不满足找不到对应的TableFactory实现类造成的)。
找到对应的实现类后,调用对应的各显神通源码公式createTableSink方法就能创建具体的实现类了。
3. 工厂模式+创建者模式,创建TableSink
JDBCTableSourceSinkFactory是JDBC表的具体实现工厂,它实现了stream的sinkfactory。在1..0版本中,它不能在batch模式下使用,但在1.版本中据说会支持。这个类使用了经典的工厂模式,其中createStreamTableSink负责创建真正的Table,基于创建者模式构建JDBCUpsertTableSink。
创建出TableSink之后,就可以使用Flink API,基于DataStream创建一个Sink,并配置对应的并行度。
4. 消费数据写入数据库
在消费数据的过程中,底层基于PreparedStatement进行批量提交。需要注意的是提交的时机和机制。
控制刷写触发的最大数量 'connector.write.flush.max-rows' = ''
控制定时刷写的时间 'connector.write.flush.interval' = '2s'
这两个条件先到先触发,这两个参数都是可以通过with()属性配置的。
JDBCUpsertFunction很简单,主要的工作是包装对应的Format,执行它的open和invoke方法。其中open负责开启连接,invoke方法负责消费每条数据提交。
接下来,我们来看看关键的format.open()方法:
接下来就是消费数据,执行提交了
AppendWriter很简单,只是对PreparedStatement的封装而已
5. 总结
通过研究代码,我们应该了解了以下关键问题:
1. JDBC Sink执行的机制,比如依赖哪些包?(flink-jdbc.jar,这个包提供了JDBCTableSinkFactory的实现)
2. 如何找到对应的实现?基于SPI服务发现,扫描接口实现类,通过属性过滤,最终确定对应的实现类。
3. 底层如何提交记录?目前只支持append模式,底层基于PreparedStatement的addbatch+executeBatch批量提交
4. 数据写入数据库的时机和机制?一方面定时任务定时刷新,另一方面数量超过限制也会触发刷新。
更多Flink内容参考:
PolarDB-X 源码解读(七):私有协议连接的一生(CN篇)
通过前文的介绍,大家基本了解了一条SQL在polardbx-sql中的解析和执行流程。由于polardbx-sql是无状态的计算节点,真正数据需要从存储节点传输到计算节点,这部分工作由私有协议完成。本文将详细介绍从发送请求到存储节点,接收返回数据的完整流程,重点在于私有协议连接的生命周期和关键代码解析。
概述
为了提高数据节点本地计算能力,同时减少网络数据传输量,计算节点会尽可能下推计算内容。一个逻辑表可能需要多个物理分片,因此计算节点与存储节点的请求会话数量会随着分片数增加而增加。传统MySQL协议+连接池架构已不能满足PolarDB-X的需求,因此私有协议在这一需求场景下应运而生。查询信息类网站源码
如图所示,私有协议采用连接与会话分离的RPC协议设计理念,支持多个会话在同一个TCP通道中并行运行,具备流控机制、全双工响应式工作模式和高吞吐、可扩展等特性。
更多关于私有协议解决上述问题的设计详情,可以参考《PolarDB-X私有协议设计》一文。本文主要从代码层面详细描述私有协议的工作流程。
我们将从计算节点和存储节点两个角度完整解析私有协议连接的生命周期。篇幅限制,本文仅关注计算节点上私有协议的处理,存储节点部分将在后续文章中详细说明。
计算节点
计算节点作为私有协议的客户端,负责发送下推请求,并接收返回的数据。
网络层框架
PolarDB-X私有协议网络层采用定制化Reactor框架实现,基于Java的NIO,改进自polardbx-sql中的Reactor框架。网络层初始化时,设置CPU核心数的2倍(上限为)作为NIOProcessor,每个Reactor使用独立的堆外内存池作为收发包缓冲,总缓冲内存大小限制为堆内存大小的%。
NIO接收的包直接调用注册的处理函数,发送数据仅写入send buf,网络写入由单独线程完成。线程优先写入TCP send buf,当无法写入时,注册OP_WRITE事件等待可写后再写入剩余内容。
数据包的编码和解码在NIOClient中实现。为实现最佳性能,解包流程直接在堆外内存上进行,使用protobuf对流直接解析,将结果放入堆内。堆外内存被切分为KB chunk,每个Reactor独占一个chunk,连续解析和复用,最大化接收、解析效率。对于特大包,额外构造堆内大buffer接收和解析,回退标志在定时任务中重置,连续s无超大包时释放堆内内存,恢复高性能堆外KB buffer接收。
请求发送集成在NIOClient中,writer优先尝试写入发送缓冲队列尾部的buffer,不足时新申请buffer填充并追加到队尾。buffer来自预分配的堆外缓冲池,超过chunk大小时分配堆内buf进行序列化。
同时,NIOClient负责TCP连接的洛阳代驾app源码建立和断开资源释放,作为独立的底层网络资源管理实现。
连接及会话
网络层之后,我们聚焦连接与会话分离的具体实现。通过剥离连接及收发包的具体实现,连接和会话的管理变得更加清晰简洁。
首先,一个TCP连接的逻辑抽象结构在XClient中实现,为避免误解,取名为client与JDBC中的Connection区别。该类管理TCP连接和并行运行的会话,负责TCP完整生命周期的管理、认证鉴权,并维护公共信息。其中,workingSessionMap记录了连接上并行运行的所有会话映射关系,可快速通过会话ID找到对应的会话抽象结构XSession。
XSession提供了所有会话相关的请求函数和信息存储,包括执行计划请求、SQL查询请求、SQL更新请求、TSO请求、会话变量处理、数据包处理及异步唤醒等。
连接池及全局单例管理器
为了提高性能,TCP连接和会话的复用必不可少。由于连接和会话的解绑,连接池不仅缓存了到计算节点的TCP连接,也缓存了到计算节点的会话。
XClientPool管理到一个存储节点的连接池,通过IP,端口,用户名三元组唯一确定目标存储节点,同时存储该节点的全部TCP连接(XClient)和建立的会话(XSession)。
XClientPool实现存储节点会话获取,对应JDBC接口中的getConnection,同时实现连接和会话生命周期管理、连接探活、会话预分配等功能。实现单个存储节点连接池后,XConnectionManager维护目标存储节点三元组到实例连接池的映射,管理定时任务线程池,实现定时探活、会话&连接最长生命控制以及连接池预热等功能。
JDBC兼容层
新的SQL协议层对上层使用者要求较高,为了提高开发效率,私有协议提供兼容JDBC的使用方法,实现从JDBC平滑切换至私有协议,并支持协议热切换。
JDBC兼容层代码目录在compatible目录下,Connection继承在XConnection文件中。提供包括DataSource、Connection、Statement、PreparedStatement、ResultSet、ResultSetMetaData在内的大部分常用接口函数实现,不支持的函数会明确抛出异常避免误用。
整体关系
至此,私有协议计算节点端的大部分结构已说明完成。给出一个整体的关系图。
私有协议连接的一生(CN视角)
了解了私有协议各层实现后,我们以发到存储节点的请求为例,完整梳理执行流程。绕开计算节点复杂流程,直接运行代码示例(注:需将com.alibaba.polardbx.rpc.XConfig#GALAXY_X_PROTOCOL设置为true)。
直接运行playground看到预期的select 1的结果。接下来,我们深入跟踪说明。
数据源初始化
要使用私有协议,需要初始化对应存储节点的XDataSource。构造过程中,XDataSource会到XConnectionManager注册新的实例连接池,已存在的连接池引用计数加一。
获取Connection
当需要执行查询时,首先获取会话。无论是显式开启事务还是使用auto commit事务,会话都是执行请求的最小上下文。通过XDataSource的getConnection方法获取到对应存储节点的会话。XDataSource根据存储的IP,端口,用户名三元组查找到XConnectionManager中的连接池,在最高并发检查后,会话获取逻辑在XClientPool实现。首先尝试在空闲会话池中拿会话,通过重置检查和初始化后返回给调用者。大部分场景下,ConcurrentLinkedQueue提供较好的并发性能。
在代码场景下,数据源刚新建,后台定时任务未运行,流程进入连接创建流程。会有一把大锁锁住连接池,在TCP连接未达上限且没有超时的情况下,快速新建一个XClient占坑。若超限,则进入busy waiting循环。真正的TCP connect(waitChannel)在锁外被调用,首先client以阻塞模式带超时方式connect,然后切换为非阻塞模式,round robin策略注册到NIOProcesser上,返回时,TCP连接已建立。
为了兼顾安全和性能,连接鉴权在TCP建连后只用做一次,会话创建不需要鉴权。鉴权在initClient中完成,发送SESS_AUTHENTICATE_START_VALUE包,后续校验由回调完成。认证采用标准的MySQL认证流程,server端返回challenge值,库名、用户名和加盐hash后的密码返回给MySQL即可完成认证。
至此,到存储节点的TCP连接已建立,创建会话是一个异步流程。在创建新XClient时,XConnection已new好,通过下断点跟进去可看到newXSession流程,分配session id,设置状态为init,将XSession绑定到XConnection上。
最后,XConnection经过初始化(重置auto commit状态)、重置默认DB、默认字符集(lazy操作)和统计信息记录,返回给用户使用。
发送查询请求
拿到初始化好的兼容JDBC的Connection,为了简化流程,直接调用XConnection中的execQuery。XConnection的execQuery包装了XSession的execQuery,执行前执行了设置流式模式。
首先记录调用信息进行统计,进入关键的initForRequest流程。XSession初始化流程lazy,仅分配session id,设置状态为Init,真正创建session时发送SESS_NEW给server,绑定新session和session id。如果session已复用,则状态为Ready。
执行字符集更改的lazy操作,session可能在其他请求中切换字符集,根据目标字符集和当前字符集对比,决定是否发送额外的字符集更改请求。
经过一系列变量设置、lazy DB设置和protobuf包构造,请求发送到存储节点执行。发送后,同步生成XResult负责结果解析,同时XResult按照请求顺序依次拉链表,确保结果与请求一一对应。
请求流水线结构如下图所示,处理完成前序请求后,才能解析后续结果。
接收结果集
请求已发送到存储节点执行,拿到XResult,通过XResult收集查询结果集。XResult与发送请求一一对应,存储节点处理也是在会话上排队进行,不会影响流水线上其他请求的返回,保证流水线正常工作。
首先,查看结果集处理的状态机,主要状态包括获取元数据、获取数据行、获取额外信息等,顺序固定,根据请求类型,部分环节可能被省略。报错处理贯穿整个状态机,任何报错信息都会导致状态机进入错误处理环节。
对于非流式数据读取,请求结束时主动调用finishBlockMode将所有数据读出并缓存到rows中。对于流式执行的情况,结果集状态机消费数据包队列由XResult的next函数推动,内部函数internalFetchOneObject递归调用前序XResult,消费前序请求结果,从数据包队列中消费并推动状态机流转。
对于查询,首先收到RESULTSET_COLUMN_META_DATA包,表示返回数据列定义,一个包表示一列。元数据包后,收到包含数据行的RESULTSET_ROW包,一个包对应一行。数据行传输完成后,server端发送RESULTSET_FETCH_DONE标示数据发送完成。请求结束前,NOTICE包用于告知客户端rows affected等信息。最后,SQL_STMT_EXECUTE_OK包标示请求结束。
至此,完整请求处理完成,控制台应显示查询结果。
总结
本文详细描述了私有协议连接流程中的关键点和关键数据结构,相信通过本文描述,大家掌握了私有协议连接流程的基本点,在调试和修改使用中能够更加得心应手。虽然本文篇幅较长,但实际使用中涉及更多高级特性的使用,如多请求流水线、流控、执行计划传输、chunk结果集传输等。通过本文,我们对私有协议连接流程有了深入理解,为在实际场景中应用提供坚实基础。
shardingsphere源码阅读-兼容jdbc规范
JDBC规范提供一套标准,让不同数据库厂商遵循统一接口操作数据库,从而简化应用程序开发。shardingsphere兼容此规范,通过重写接口实现兼容。
基于JDBC规范,shardingsphere采用适配器模式重写DataSource、Connection、Statement、ResultSet等关键接口,构建了一套完整的实现方案。适配器模式确保了shardingsphere能够以与JDBC规范一致的方式操作数据库,同时支持分库分表功能。
shardingsphere中,JdbcObject接口代表JDBC规范中的核心接口,包括DataSource、Connection、Statement等。通过包装器接口Wrapper以及其子类WrapperAdapter,shardingsphere实现了适配器模式,重写了这些接口的方法,同时保留了与JDBC规范的兼容性。
AbstractUnsupportedOperationJdbcObject和AbstractJdbcObjectAdapter作为抽象类,分别用于实现部分和全部接口方法。ShardingIdbcObject继承自AbstractJdbcObjectAdapter,包括ShardingDataSource、ShardingConnection、ShardingStatement等对象,这些对象都采用适配器模式重写JDBC规范接口,确保与JDBC规范无缝衔接。
以ShardingDataSource为例,其构造过程通过ShardingDataSourceFactory创建ShardingDataSource对象,将数据源、分库分表规则和属性等信息整合,同时初始化运行时上下文和静态代码块加载路由、SQL重写、结果集引擎等组件。ShardingDataSource内部的WrapperAdapter类维护方法调用信息,通过recordMethodInvocation和replayMethodsInvocation方法记录和回放方法调用。
AbstractDataSourceAdapter作为数据源适配器的抽象类,封装公共属性和方法,减少重复代码。此类中的dataSourceMap和databaseType属性分别保存数据源信息和数据库类型,getRuntimeContext方法用于获取分库分表的运行时上下文。
综上所述,shardingsphere通过适配器模式重写JDBC规范接口,实现了与JDBC规范的兼容性。不论使用sharding-jdbc还是原生JDBC,操作数据库的方式和流程保持一致,只是在实现细节上支持了分库分表功能,为开发者提供了一种灵活且高效的数据库管理方案。
odbc和jdbc的区别是什么
odbc和jdbc的区别是JDBC比ODBC更容易理解;JDBC的移植性要比ODBC要好;JDBC数据库驱动程序是面向对象的。
1.JDBC比ODBC更容易理解。
在ODBC中一个的简单的查询,也需求分为好几块内容;而在ODBC驱动程序内部再去整合,做一些复杂的操作。这不仅降低了数据库启动程序的性能,而且也给程序开发者开发实际运用程序带来了确定的负面效果。而JDBC数据库启动程序在设计的时间就包含了大部份基本数据操作功能,为此在编写一些常规的数据库操作语句时,如查询、更新等等,其所需求的源代码比ODBC要少的多。故从这方面来说,JDBC数据库启动程序要比ODBC简易理解。
2.JDBC数据库驱动程序是面向对象的。
JDBC完全遵循Java语言的优良特性。通常情况下,只要有Java功能需设计基础的用户都能在最短时间内了解JDBC驱动程序的架构,较量简易上手,能轻而易举的开发出强悍的数据库实际运用程序。而ODBC的话,由于其内部功能复杂,源代码编写要求高。为此即使是一个的C语言的高手,仍然需求花费不少的时间去了解那个数据库启动程序;在编写源代码的时间,还离不开有关的参考书本。
3.JDBC的移植性要比ODBC要好。
通常情况下,安装完ODBC驱动程序之后,还需求经过确定的配置才能够应用。而不相同的配置在不相同数据库服务器之间不能够通用。也那是说,装一次需求配置一次。但是JDBC数据库驱动程序则不相同。假如采用JDBC数据库驱动程序的话,则只需要选取适当的 JDBC数据库驱动程序,就不需要额外的配置。在安装过程中,JDBC数据库驱动程序会自己完成有关的配置。为此JDBC的移植性要比ODBC要好。