mysql分布式事务01
分布式事务组成
- 资源管理器: 提供访问事务资源的方法,通常一个数据库就是一个资源管理器
- 事务管理器: 协调参与全局事务的各个事务。需要和参与全局事务中的所有资源管理器进行通信。
- 应用程序: 定义事务的边界,指定全局事务中的操作。
提交方式
两段式提交。在第一个阶段,所有参与全局事务的节点都开始准备,告诉事务管理器他们准备提交了;第二阶段,事务管理器告诉资源管理器执行commit或者时rollback,如果任意一个节点显示不能提交,则所有的节点进行回滚。
代码示例:
- MyXid.java
public class MyXid implements Xid{
public int formatId;
public byte gtrid[];
public byte bqual[];
public MyXid() {
}
public MyXid(int formatId, byte gtrid[], byte bqual[]) {
this.formatId = formatId;
this.gtrid = gtrid;
this.bqual = bqual;
}
@Override
public byte[] getBranchQualifier() {
return bqual;
}
@Override
public int getFormatId() {
return formatId;
}
@Override
public byte[] getGlobalTransactionId() {
return gtrid;
}
}
- Demo1.java
public class Demo1 {
/**
* 分布式数据库
* @param url
* @param user
* @param pass
* @return
*/
public static MysqlXADataSource getDataSource(String url, String user, String pass) {
MysqlXADataSource mysqlXADataSource = new MysqlXADataSource();
mysqlXADataSource.setUrl(url);
mysqlXADataSource.setUser(user);
mysqlXADataSource.setPassword(pass);
return mysqlXADataSource;
}
public static void main(String[] args) throws SQLException {
String connStr1 = "jdbc:mysql://192.168.200.137:3306/bank_shanghai"; //数据库1
String connStr2 = "jdbc:mysql://127.0.0.1:3306/bank_beijing"; //数据库2
MysqlXADataSource ds1 = getDataSource(connStr1,"root","123456");
MysqlXADataSource ds2 = getDataSource(connStr2, "root", "Gepoint");
XAConnection xaConnection1 = ds1.getXAConnection();
XAConnection xaConnection2 = ds2.getXAConnection();
XAResource xaResource1 = xaConnection1.getXAResource();
XAResource xaResource2 = xaConnection2.getXAResource();
Connection connection1 = xaConnection1.getConnection(); //获得连接
Connection connection2 = xaConnection2.getConnection(); //获得连接
Statement statement1 = connection1.createStatement();
Statement statement2 = connection2.createStatement();
Xid xid1 = new MyXid(100, new byte[] {0x01},new byte[] {0x02});
Xid xid2 = new MyXid(100, new byte[] {0x11}, new byte[] {0x12});
try {
xaResource1.start(xid1, XAResource.TMNOFLAGS);
statement1.execute("UPDATE money SET avail='22292' WHERE id= 12");
xaResource1.end(xid1, XAResource.TMSUCCESS);
xaResource2.start(xid2, XAResource.TMNOFLAGS);
statement2.execute("UPDATE mone SET avail='22293' WHERE id= 12");//此处错误,会导致整个全局事务回滚
xaResource2.end(xid2, XAResource.TMSUCCESS);
int ret2 = xaResource2.prepare(xid2); //第一阶段
int ret1 = xaResource1.prepare(xid1);
if(ret1==XAResource.XA_OK && ret2==XAResource.XA_OK) {
xaResource1.commit(xid1, false); //第二阶段
xaResource2.commit(xid2, false);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}