MyBatis 基础支持层:数据源与事务模块

数据源是持久层框架中最核心的组件之一,在实际工作中比较常见的数据源有 C3P0、Apache Common DBCP、Proxool 等。作为一款成熟的持久化框架,MyBatis 不仅自己提供了一套数据源实现,而且还能够方便地集成第三方数据源。

javax.sql.DataSource 是 Java 语言中用来抽象数据源的接口,其中定义了所有数据源实现的公共行为,MyBatis 自身提供的数据源实现也要实现该接口。

MyBatis 提供了两种类型的数据源实现,分别是 PooledDataSourceUnpooledDataSource,继承关系如下图所示:

1
2
3
4
5
6
7
8
9
                +------------+
                | DataSource |
                +------------+
                      ^
                      |
         +------------+-------------+
+--------+---------+     +----------+---------+
| PooledDataSource |     | UnpooledDataSource |
+------------------+     +--------------------+

针对不同的 DataSource 实现,MyBatis 提供了不同的工厂实现来进行创建,如下图所示,这是工厂方法模式的一个典型应用场景:

DataSource 类图

1 工厂方法模式

工厂方法模式中定义了 Factory 这个工厂接口,如下图所示,其中定义了 createProduct() 方法创建右侧继承树中的对象,不同的工厂接口实现类会创建右侧继承树中不同 Product 实现类(例如 ProductImpl1ProductImpl2)。

工厂方法模式

从上图中,我们可以看到工厂方法模式由四个核心角色构成:

  • Factory 接口:工厂方法模式的核心接口之一。使用方会依赖 Factory 接口创建 Product 对象实例。
  • Factory 实现类(图中的 FactoryImpl1 和 FactoryImpl2):用于创建 Product 对象。不同的 Factory 实现会根据需求创建不同的 Product 实现类。
  • Product 接口:用于定义业务类的核心功能。Factory 接口创建出来的所有对象都需要实现 Product 接口。使用方依赖 Product 接口编写其他业务实现,所以使用方关心的是 Product 接口这个抽象,而不是其中的具体实现逻辑。
  • Product 实现类(图中的 ProductImpl1 和 ProductImpl2):实现了 Product 接口中定义的方法,完成了具体的业务逻辑。

这里假设一个场景:目前我们要做一个注册中心模块,已经有了 ZookeeperImplEtcdImpl 两个业务实现类,分别支持了与 ZooKeeper 交互和与 etcd 交互,此时来了个新需求,需要支持与 Consul 交互,我们只需要添加新的 ConsulFactory 实现类和 ConsulImpl 实现类即可完成扩展。

工厂方法模式最终也是符合“开放 —— 封闭”原则的,可以通过添加新的 Factory 接口实现和 Product 接口实现来扩展整个体系的功能。另外,工厂方法模式对使用方暴露的是 FactoryProduct 这两个抽象的接口,而不是具体的实现,也就帮助使用方面向接口编程。

2 DataSourceFactory

了解了工厂方法模式的基础知识之后,我们再回到 MyBatis 的数据源实现上来。MyBatis 的数据源模块也是用到了工厂方法模式,如果需要扩展新的数据源实现时,只需要添加对应的 Factory 实现类,新的数据源就可以被 MyBatis 使用

DataSourceFactory 接口就扮演了 MyBatis 数据源实现中的 Factory 接口角色。UnpooledDataSourceFactoryPooledDataSourceFactory 实现了 DataSourceFactory 接口,也就是 Factory 接口实现类的角色。三者的继承关系如下图所示:

DataSourceFactory

DataSourceFactory 接口中最核心的方法是 getDataSource() 方法,该方法用来生成一个 DataSource 对象。

1
2
3
4
public interface DataSourceFactory {
  void setProperties(Properties props);
  DataSource getDataSource();
}

2.1 UnpooledDataSourceFactory

UnpooledDataSourceFactory 这个实现类的初始化过程中,会直接创建 UnpooledDataSource 对象,其中的 dataSource 字段会指向该 UnpooledDataSource 对象。

接下来调用的 setProperties() 方法会根据传入的配置信息,完成对该 UnpooledDataSource 对象相关属性的设置:

UnpooledDataSourceFactory 对于 getDataSource() 方法的实现就相对简单了,其中直接返回了上面创建的 UnpooledDataSource 对象。

2.2 PooledDataSourceFactory

从前面的 DataSourceFactory 继承关系图中可以看到,PooledDataSourceFactory 是通过继承 UnpooledDataSourceFactory 间接实现了 DataSourceFactory 接口。

1
2
3
4
5
public class PooledDataSourceFactory extends UnpooledDataSourceFactory {
  public PooledDataSourceFactory() {
    this.dataSource = new PooledDataSource();
  }
}

PooledDataSourceFactory 中并没有覆盖 UnpooledDataSourceFactory 中的任何方法,唯一的变化就是将 dataSource 字段指向的 DataSource 对象类型改为 PooledDataSource 类型。

3 DataSource

JDK 提供的 javax.sql.DataSource 接口在 MyBatis 数据源中扮演了 Product 接口的角色。 MyBatis 提供的数据源实现有两个,一个 UnpooledDataSource 实现,另一个 PooledDataSource 实现,它们都是 “Product” 具体实现类的角色。

3.1 UnpooledDataSource

我们先来看 UnpooledDataSource 的实现,其中的核心字段有如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
public class UnpooledDataSource implements DataSource {
  // 加载 Driver 类的类加载器
  private ClassLoader driverClassLoader;
  // 数据库连接驱动的相关配置
  private Properties driverProperties;
  // 缓存所有已注册的数据库连接驱动
  private static Map<String, Driver> registeredDrivers = new ConcurrentHashMap<>();
  // 事务隔离级别
  private Integer defaultTransactionIsolationLevel;
}

在 Java 中,几乎所有数据源实现的底层都是依赖 JDBC 操作数据库的,而使用 JDBC 的第一步就是向 DriverManager 注册 JDBC 驱动类,之后才能创建数据库连接DriverManager 中定义了 registeredDrivers 字段用于记录注册的 JDBC 驱动,这是一个 CopyOnWriteArrayList 类型的集合,是线程安全的。

MyBatis 的 UnpooledDataSource 实现中定义了如下静态代码块,从而在 UnpooledDataSource 加载时,将已在 DriverManager 中注册的 JDBC 驱动器实例复制一份到 UnpooledDataSource.registeredDrivers 集合中。

1
2
3
4
5
6
7
static {
  Enumeration<Driver> drivers = DriverManager.getDrivers();
  while (drivers.hasMoreElements()) {
    Driver driver = drivers.nextElement();
    registeredDrivers.put(driver.getClass().getName(), driver);
  }
}

getConnection() 方法中,UnpooledDataSource 会调用 doGetConnection() 方法获取数据库连接,具体实现如下:

1
2
3
4
5
6
7
8
9
private Connection doGetConnection(Properties properties) throws SQLException {
  // 初始化数据库驱动
  initializeDriver();
  // 创建数据库连接
  Connection connection = DriverManager.getConnection(url, properties);
  // 配置数据库连接
  configureConnection(connection);
  return connection;
}
  • 在调用的 initializeDriver() 方法中,完成了 JDBC 驱动的初始化,其中会创建配置中指定的 Driver 对象,并将其注册到 DriverManager 以及上面介绍的 UnpooledDataSource.registeredDrivers 集合中保存;
  • configureConnection() 方法会对数据库连接进行一系列配置,例如,数据库连接超时时长、事务是否自动提交以及使用的事务隔离级别。

3.2 PooledDataSource

PooledDataSource 中并没有直接维护数据库连接的集合,而是维护了一个 PooledState 类型的字段(state 字段),这个 PooledState 才是管理连接的地方。不过 PooledState 中维护的数据库连接并不是真正的数据库连接(不是 java.sql.Connection 对象),而是 PooledConnection 对象。

3.2.1 PooledConnection

PooledConnection 是 MyBatis 中定义的一个 InvocationHandler 接口实现类,其中封装了真正的 java.sql.Connection 对象以及相关的代理对象,这里的代理对象是通过 JDK 动态代理产生的。

下面来看 PooledConnection 中的核心字段:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class PooledConnection implements InvocationHandler {
  /*
  * 记录当前 PooledConnection 对象归属的 PooledDataSource 对象。
  * 也就是说,当前的 PooledConnection 是由该 PooledDataSource 对象创建的;
  * 在通过 close() 方法关闭当前 PooledConnection 的时候,当前 PooledConnection 会被返还给该 PooledDataSource 对象。
  */
  private final PooledDataSource dataSource;
  // 当前 PooledConnection 底层的真正数据库连接对象
  private final Connection realConnection;
  // 指向了 realConnection 数据库连接的代理对象
  private final Connection proxyConnection;
  // 使用方从连接池中获取连接的时间戳
  private long checkoutTimestamp;
  // 连接创建的时间戳
  private long createdTimestamp;
  // 连接最后一次被使用的时间戳
  private long lastUsedTimestamp;
  // 数据库连接的标识。该标识是由数据库 URL、username 和 password 三部分组合计算出来的 hash 值,主要用于连接对象确认归属的连接池
  private int connectionTypeCode;
  /*
  * 用于标识 PooledConnection 对象是否有效。该字段的主要目的是防止使用方将连接归还给连接池之后,
  *  依然保留该 PooledConnection 对象的引用并继续通过该 PooledConnection 对象操作数据库。
  */
  private boolean valid;
}

下面来看 PooledConnection 的构造方法,其中会初始化上述字段:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
public PooledConnection(Connection connection, PooledDataSource dataSource) {
  this.hashCode = connection.hashCode();
  this.realConnection = connection;
  this.dataSource = dataSource;
  this.createdTimestamp = System.currentTimeMillis();
  this.lastUsedTimestamp = System.currentTimeMillis();
  this.valid = true;
  this.proxyConnection = (Connection) Proxy
    .newProxyInstance(Connection.class.getClassLoader(), IFACES, this);
}

构造方法中我们重点关注 proxyConnection,它是通过 JDK 动态代理的方式生成的,其中传入的 InvocationHandler 实现正是 PooledConnection 自身。PooledConnection.invoke() 方法中只对 close() 方法进行了拦截,具体实现如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  String methodName = method.getName();
  if (CLOSE.equals(methodName)) {
    // 如果调用 close() 方法,并没有直接关闭底层连接,而是将其归还给关联的连接池
    dataSource.pushConnection(this);
    return null;
  }
  try {
    if (!Object.class.equals(method.getDeclaringClass())) {
      // 只要不是 Object 的方法,都需要检测当前 PooledConnection 是否可用
      // 如果连接不可用,就抛出异常
      checkConnection();
    }
    // 如果连接可用,调用 realConnection 的对应方法
    return method.invoke(realConnection, args);
  } // catch ...
}

3.2.2 PoolState

接下来看 PoolState 这个类,它负责管理连接池中所有 PooledConnection 对象的状态,维护了两个 ArrayList <PooledConnection> 集合,按照 PooledConnection 对象的状态分类存储,其中:

  • idleConnections 集合用来存储空闲状态的 PooledConnection 对象;
  • activeConnections 集合用来存储活跃状态的 PooledConnection 对象。

另外,PoolState 中还定义了多个 long 类型的统计字段:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
public class PoolState {
  // 请求数据库连接的次数
  protected long requestCount = 0;
  // 获取连接的累积耗时
  protected long accumulatedRequestTime = 0;
  // 所有连接的 checkoutTime 累加。PooledConnection 中有一个 checkoutTime 属性,
  //  表示的是使用方从连接池中取出连接到归还连接的总时长,也就是连接被使用的时长
  protected long accumulatedCheckoutTime = 0;
  // 当连接长时间未归还给连接池时,会被认为该连接超时,该字段记录了超时的连接个数
  protected long claimedOverdueConnectionCount = 0;
  // 记录了累积超时时间
  protected long accumulatedCheckoutTimeOfOverdueConnections = 0;
  // 当连接池全部连接已经被占用之后,新的请求会阻塞等待,该字段就记录了累积的阻塞等待总时间
  protected long accumulatedWaitTime = 0;
  // 记录了阻塞等待总次数
  protected long hadToWaitCount = 0;
  // 无效的连接数
  protected long badConnectionCount = 0;
}

3.2.3 获取连接

在了解了 PooledConnectionPooledState 的核心实现之后,我们再来看 PooledDataSource 实现,这里按照使用方的逻辑依次分析 PooledDataSource 的核心方法。

首先是 getConnection() 方法,其中先是依赖 popConnection() 方法获取 PooledConnection 对象,然后从 PooledConnection 中获取数据库连接的代理对象(即前面介绍的 proxyConnection 字段)。这里调用的 popConnection() 方法是从连接池中获取数据库连接的核心

其核心实现总结为如下步骤:

  • 检测当前连接池中是否有空闲的有效连接,如果有,则直接返回连接;如果没有,则继续执行下一步。
  • 检查连接池当前的活跃连接数是否已经达到上限值,如果未达到,则尝试创建一个新的数据库连接,并在创建成功之后,返回新建的连接;如果已达到最大上限,则往下执行。
  • 检查活跃连接中是否有连接超时,如果有,则将超时的连接从活跃连接集合中移除,并重复步骤 2;如果没有,则执行下一步。
  • 当前请求数据库连接的线程阻塞等待,并定期执行前面三步检测相应的分支是否可能获取连接。

3.2.4 释放连接

前面介绍 PooledConnection 的时候,我们提到当调用 proxyConnection 对象的 close() 方法时,连接并没有真正关闭,而是通过 PooledDataSource.pushConnection() 方法将 PooledConnection 归还给了关联的 PooledDataSource

其核心实现总结为如下步骤:

  1. 从活跃连接集合(即前面提到的 activeConnections 集合)中删除传入的 PooledConnection 对象。
  2. 检测该 PooledConnection 对象是否可用。如果连接已不可用,则递增 badConnectionCount 字段进行统计,之后,直接丢弃 PooledConnection 对象即可。如果连接依旧可用,则执行下一步。
  3. 检测当前 PooledDataSource 连接池中的空闲连接是否已经达到上限值。如果达到上限值,则 PooledConnection 无法放回到池中,正常关闭其底层的数据库连接即可。如果未达到上限值,则继续执行下一步。
  4. 将底层连接重新封装成 PooledConnection 对象,并添加到空闲连接集合(也就是前面提到的 idleConnections 集合),然后唤醒所有阻塞等待空闲连接的线程。

3.2.5 检测连接可用性

通过对上述 pushConnection() 方法和 popConnection() 方法的分析,我们大致了解了 PooledDataSource 的核心实现。正如我们看到的那样,这两个方法都需要检测一个数据库连接是否可用,这是通过 PooledConnection.isValid() 方法实现的,在该方法中会检测三个方面:

  • valid 字段值为 true
  • realConnection 字段值不为空;
  • 执行 PooledDataSource.pingConnection() 方法,返回值为 true

只有这三个条件都成立,才认为这个 PooledConnection 对象可用。其中,PooledDataSource.pingConnection() 方法会尝试请求数据库,并执行一条测试 SQL 语句,检测是否真的能够访问到数据库,该方法的核心代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public boolean isValid() {
  return valid && realConnection != null && dataSource.pingConnection(this);
}

protected boolean pingConnection(PooledConnection conn) {
  boolean result = true; // 记录此次 ping 操作是否成功完成

  try {
      // 检测底层数据库连接是否已经关闭
      result = !conn.getRealConnection().isClosed();
  } catch (SQLException e) {
      result = false;
  }

  // 如果底层与数据库的网络连接没断开,则需要检测 poolPingEnabled 字段的配置,决定
  // 是否能执行 ping 操作。另外,ping 操作不能频繁执行,只有超过一定时长
  // (超过 poolPingConnectionsNotUsedFor 指定的时长)未使用的连接,才需要 ping
  // 操作来检测数据库连接是否正常
  if (result && poolPingEnabled && poolPingConnectionsNotUsedFor >= 0
    && conn.getTimeElapsedSinceLastUse() > poolPingConnectionsNotUsedFor) {
    try {
      // 执行 poolPingQuery 字段中记录的测试 SQL 语句
      Connection realConn = conn.getRealConnection();
      try (Statement statement = realConn.createStatement()) {
        statement.executeQuery(poolPingQuery).close();
      }
      if (!realConn.getAutoCommit()) {
        realConn.rollback();
      }
      result = true; // 不抛异常,即为成功
    } catch (Exception e) {
      try {
        // 关闭真实连接
        conn.getRealConnection().close();
      } catch (Exception e2) {
        // ignore
      }
      result = false; // 抛异常,即为失败
    }
  }
  return result;
}

4 事务接口

介绍完 MyBatis 对数据源的实现之后,我们接下来看与数据源紧密关联的另一个概念——事务。

当我们从数据源中得到一个可用的数据库连接之后,就可以开启一个数据库事务了,事务成功开启之后,我们才能修改数据库中的数据。在修改完成之后,我们需要提交事务,完成整个事务内的全部修改操作,如果修改过程中出现异常,我们也可以回滚事务,放弃整个事务中的全部修改操作。

可见,控制事务在一个以数据库为基础的服务中,是一件非常重要的工作。为此,MyBatis 专门抽象出来一个 Transaction 接口,好在相较于我们上面讲述的数据源,这部分内容还是比较简单、比较好理解的。

org.apache.ibatis.transaction.Transaction 接口是 MyBatis 中对数据库事务的抽象,其中定义了提交事务、回滚事务,以及获取事务底层数据库连接的方法。同时, MyBatis 自带的两个 Transaction 接口实现 JdbcTransactionManagedTransaction,这里也使用到了工厂方法模式,如下图所示:

Transaction

4.1 TransactionFactory

TransactionFactory 是用于创建 Transaction 的工厂接口,其中最核心的方法是 newTransaction() 方法,它会根据数据库连接或数据源创建 Transaction 对象。

JdbcTransactionFactoryManagedTransactionFactoryTransactionFactory 的两个实现类,分别用来创建 JdbcTransaction 对象和 ManagedTransaction 对象,本质也是工厂方法模式的实现。

这两个工厂的实现特别简单,以 JdbcTransactionFactory 为例看下他的源码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class JdbcTransactionFactory implements TransactionFactory {

  private boolean skipSetAutoCommitOnClose;

  @Override
  public void setProperties(Properties props) {
    if (props == null) {
      return;
    }
    String value = props.getProperty("skipSetAutoCommitOnClose");
    if (value != null) {
      skipSetAutoCommitOnClose = Boolean.parseBoolean(value);
    }
  }

  @Override
  public Transaction newTransaction(Connection conn) {
    return new JdbcTransaction(conn);
  }

  @Override
  public Transaction newTransaction(DataSource ds, TransactionIsolationLevel level, boolean autoCommit) {
    return new JdbcTransaction(ds, level, autoCommit, skipSetAutoCommitOnClose);
  }
}

4.2 JdbcTransaction

接下来,我们看一下 JdbcTransaction 的实现,其中维护了事务关联的数据库连接以及数据源对象,同时还记录了事务自身的属性,例如“事务隔离级别”和“是否自动提交”等。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
public class JdbcTransaction implements Transaction {
  // 数据库连接
  protected Connection connection;
  // 数据源
  protected DataSource dataSource;
  // 隔离级别
  protected TransactionIsolationLevel level;
  // 是否自动提交
  protected boolean autoCommit;
}

在日常使用数据库事务的时候,我们最常用的操作就是提交和回滚事务,Transaction 接口将这两个操作抽象为 commit() 方法和 rollback() 方法。在 commit() 方法和 rollback() 方法中,JdbcTransaction 都是通过 java.sql.Connection 的同名方法实现事务的提交和回滚的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
@Override
public void commit() throws SQLException {
  if (connection != null && !connection.getAutoCommit()) {
    connection.commit();
  }
}

@Override
public void rollback() throws SQLException {
  if (connection != null && !connection.getAutoCommit()) {
    connection.rollback();
  }
}

4.3 ManagedTransaction

ManagedTransaction 的实现相较于 JdbcTransaction 来说,有些许类似,也是依赖关联的 DataSource 获取数据库连接,但其 commit()rollback() 方法都是空实现,事务的提交和回滚都是依靠容器管理的,这也是它被称为 ManagedTransaction 的原因。

1
2
3
4
5
6
7
8
9
@Override
public void commit() throws SQLException {
  // Does nothing
}

@Override
public void rollback() throws SQLException {
  // Does nothing
}

另外,与 JdbcTransaction 不同的是,ManagedTransaction 会根据初始化时传入的 closeConnection 值确定是否在事务关闭时,同时关闭关联的数据库连接(即调用其 close() 方法)。


欢迎关注我的公众号,第一时间获取文章更新:

微信公众号

相关内容