mysql分布式事务01

分布式事务组成

  1. 资源管理器: 提供访问事务资源的方法,通常一个数据库就是一个资源管理器
  2. 事务管理器: 协调参与全局事务的各个事务。需要和参与全局事务中的所有资源管理器进行通信。
  3. 应用程序: 定义事务的边界,指定全局事务中的操作。

提交方式

两段式提交。在第一个阶段,所有参与全局事务的节点都开始准备,告诉事务管理器他们准备提交了;第二阶段,事务管理器告诉资源管理器执行commit或者时rollback,如果任意一个节点显示不能提交,则所有的节点进行回滚。

代码示例:

  1. MyXid.java
public class MyXid implements Xid{

    public int formatId;
    public byte gtrid[];
    public byte bqual[];

    public MyXid() {

    }

    public MyXid(int formatId, byte gtrid[], byte bqual[]) {
        this.formatId = formatId;
        this.gtrid = gtrid;
        this.bqual = bqual;
    }

    @Override
    public byte[] getBranchQualifier() {
        return bqual;
    }

    @Override
    public int getFormatId() {
        return formatId;
    }

    @Override
    public byte[] getGlobalTransactionId() {
        return gtrid;
    }

}
  1. Demo1.java

    ```java public class Demo1 {

    /**

    • 分布式数据库

    • @param url

    • @param user

    • @param pass

    • @return */ public static MysqlXADataSource getDataSource(String url, String user, String pass) { MysqlXADataSource mysqlXADataSource = new MysqlXADataSource(); mysqlXADataSource.setUrl(url); mysqlXADataSource.setUser(user); mysqlXADataSource.setPassword(pass); return mysqlXADataSource; }

      public static void main(String[] args) throws SQLException { String connStr1 = “jdbc:mysql://192.168.200.137:3306/bank_shanghai”; //数据库1 String connStr2 = “jdbc:mysql://127.0.0.1:3306/bank_beijing”; //数据库2

      MysqlXADataSource ds1 = getDataSource(connStr1,“root”,“123456”); MysqlXADataSource ds2 = getDataSource(connStr2, “root”, “Gepoint”);

      XAConnection xaConnection1 = ds1.getXAConnection(); XAConnection xaConnection2 = ds2.getXAConnection();

      XAResource xaResource1 = xaConnection1.getXAResource(); XAResource xaResource2 = xaConnection2.getXAResource();

      Connection connection1 = xaConnection1.getConnection(); //获得连接 Connection connection2 = xaConnection2.getConnection(); //获得连接

      Statement statement1 = connection1.createStatement(); Statement statement2 = connection2.createStatement();

      Xid xid1 = new MyXid(100, new byte[] {0x01},new byte[] {0x02}); Xid xid2 = new MyXid(100, new byte[] {0x11}, new byte[] {0x12});

      try { xaResource1.start(xid1, XAResource.TMNOFLAGS); statement1.execute(“UPDATE money SET avail='22292’ WHERE id= 12”); xaResource1.end(xid1, XAResource.TMSUCCESS); xaResource2.start(xid2, XAResource.TMNOFLAGS); statement2.execute(“UPDATE mone SET avail='22293’ WHERE id= 12”);//此处错误,会导致整个全局事务回滚 xaResource2.end(xid2, XAResource.TMSUCCESS); int ret2 = xaResource2.prepare(xid2); //第一阶段 int ret1 = xaResource1.prepare(xid1); if(ret1==XAResource.XA_OK && ret2==XAResource.XA_OK) { xaResource1.commit(xid1, false); //第二阶段 xaResource2.commit(xid2, false); }

      } catch (Exception e) { e.printStackTrace(); }

}

} ```