/* * Copyright (c) 2000-2018, 达梦数据库有限公司. * All rights reserved. */ package dm import ( "context" "database/sql" "database/sql/driver" "errors" "io" "regexp" "strings" "time" "gitee.com/chunanyong/dm/util" ) const ( SQL_SELECT_STANDBY = "select distinct mailIni.inst_name, mailIni.INST_IP, mailIni.INST_PORT, archIni.arch_status " + "from v$arch_status archIni " + "left join (select * from V$DM_MAL_INI) mailIni on archIni.arch_dest = mailIni.inst_name " + "left join V$MAL_LINK_STATUS on CTL_LINK_STATUS = 'CONNECTED' AND DATA_LINK_STATUS = 'CONNECTED' " + "where archIni.arch_type in ('TIMELY', 'REALTIME') AND archIni.arch_status = 'VALID'" SQL_SELECT_STANDBY2 = "select distinct " + "mailIni.mal_inst_name, mailIni.mal_INST_HOST, mailIni.mal_INST_PORT, archIni.arch_status " + "from v$arch_status archIni " + "left join (select * from V$DM_MAL_INI) mailIni " + "on archIni.arch_dest = mailIni.mal_inst_name " + "left join V$MAL_LINK_STATUS " + "on CTL_LINK_STATUS = 'CONNECTED' AND DATA_LINK_STATUS = 'CONNECTED' " + "where archIni.arch_type in ('TIMELY', 'REALTIME') AND archIni.arch_status = 'VALID'" ) type rwUtil struct { } var RWUtil = rwUtil{} func (RWUtil rwUtil) connect(c *DmConnector, ctx context.Context) (*DmConnection, error) { c.loginMode = LOGIN_MODE_PRIMARY_ONLY connection, err := c.connect(ctx) if err != nil { return nil, err } connection.rwInfo.rwCounter = getRwCounterInstance(connection, connection.StandbyCount) err = RWUtil.connectStandby(connection) return connection, err } func (RWUtil rwUtil) reconnect(connection *DmConnection) error { if connection.rwInfo == nil { return nil } RWUtil.removeStandby(connection) err := connection.reconnect() if err != nil { return err } connection.rwInfo.cleanup() connection.rwInfo.rwCounter = getRwCounterInstance(connection, connection.StandbyCount) err = RWUtil.connectStandby(connection) return err } func (RWUtil rwUtil) recoverStandby(connection *DmConnection) error { if connection.closed.IsSet() || RWUtil.isStandbyAlive(connection) { return nil } ts := time.Now().UnixNano() / 1000000 freq := int64(connection.dmConnector.rwStandbyRecoverTime) if freq <= 0 || ts-connection.rwInfo.tryRecoverTs < freq { return nil } err := RWUtil.connectStandby(connection) connection.rwInfo.tryRecoverTs = ts return err } func (RWUtil rwUtil) connectStandby(connection *DmConnection) error { var err error db, err := RWUtil.chooseValidStandby(connection) if err != nil { return err } if db == nil { return nil } standbyConnectorValue := *connection.dmConnector standbyConnector := &standbyConnectorValue standbyConnector.host = db.host standbyConnector.port = db.port standbyConnector.rwStandby = true standbyConnector.group = nil standbyConnector.loginMode = LOGIN_MODE_STANDBY_ONLY standbyConnector.switchTimes = 0 connection.rwInfo.connStandby, err = standbyConnector.connectSingle(context.Background()) if err != nil { return err } if connection.rwInfo.connStandby.SvrMode != SERVER_MODE_STANDBY || connection.rwInfo.connStandby.SvrStat != SERVER_STATUS_OPEN { RWUtil.removeStandby(connection) } return nil } func (RWUtil rwUtil) chooseValidStandby(connection *DmConnection) (*ep, error) { stmt, rs, err := connection.driverQuery(SQL_SELECT_STANDBY2) if err != nil { stmt, rs, err = connection.driverQuery(SQL_SELECT_STANDBY) } defer func() { if rs != nil { rs.close() } if stmt != nil { stmt.close() } }() if err == nil { count := int32(rs.CurrentRows.getRowCount()) if count > 0 { connection.rwInfo.rwCounter = getRwCounterInstance(connection, count) i := int32(0) rowIndex := connection.rwInfo.rwCounter.random(count) dest := make([]driver.Value, 3) for err := rs.next(dest); err != io.EOF; err = rs.next(dest) { if i == rowIndex { ep := newEP(dest[1].(string), dest[2].(int32)) return ep, nil } i++ } } } if err != nil { return nil, errors.New("choose valid standby error!" + err.Error()) } return nil, nil } func (RWUtil rwUtil) afterExceptionOnStandby(connection *DmConnection, e error) { if e.(*DmError).ErrCode == ECGO_COMMUNITION_ERROR.ErrCode { RWUtil.removeStandby(connection) } } func (RWUtil rwUtil) removeStandby(connection *DmConnection) { if connection.rwInfo.connStandby != nil { connection.rwInfo.connStandby.close() connection.rwInfo.connStandby = nil } } func (RWUtil rwUtil) isCreateStandbyStmt(stmt *DmStatement) bool { return stmt != nil && stmt.rwInfo.readOnly && RWUtil.isStandbyAlive(stmt.dmConn) } func (RWUtil rwUtil) executeByConn(conn *DmConnection, query string, execute1 func() (interface{}, error), execute2 func(otherConn *DmConnection) (interface{}, error)) (interface{}, error) { if err := RWUtil.recoverStandby(conn); err != nil { return nil, err } RWUtil.distributeSqlByConn(conn, query) turnToPrimary := false ret, err := execute1() if err != nil { if conn.rwInfo.connCurrent == conn.rwInfo.connStandby { RWUtil.afterExceptionOnStandby(conn, err) turnToPrimary = true } else { return nil, err } } curConn := conn.rwInfo.connCurrent var otherConn *DmConnection if curConn != conn { otherConn = conn } else { otherConn = conn.rwInfo.connStandby } switch curConn.lastExecInfo.retSqlType { case Dm_build_1063, Dm_build_1064, Dm_build_1068, Dm_build_1075, Dm_build_1074, Dm_build_1066: { if otherConn != nil { execute2(otherConn) } } case Dm_build_1073: { sqlhead := regexp.MustCompile("[ (]").Split(strings.TrimSpace(query), 2)[0] if util.StringUtil.EqualsIgnoreCase(sqlhead, "SP_SET_PARA_VALUE") || util.StringUtil.EqualsIgnoreCase(sqlhead, "SP_SET_SESSION_READONLY") { if otherConn != nil { execute2(otherConn) } } } case Dm_build_1072: { if conn.dmConnector.rwHA && curConn == conn.rwInfo.connStandby && (curConn.lastExecInfo.rsDatas == nil || len(curConn.lastExecInfo.rsDatas) == 0) { turnToPrimary = true } } } if turnToPrimary { conn.rwInfo.toPrimary() conn.rwInfo.connCurrent = conn return execute2(conn) } return ret, nil } func (RWUtil rwUtil) executeByStmt(stmt *DmStatement, execute1 func() (interface{}, error), execute2 func(otherStmt *DmStatement) (interface{}, error)) (interface{}, error) { orgStmt := stmt.rwInfo.stmtCurrent query := stmt.nativeSql if err := RWUtil.recoverStandby(stmt.dmConn); err != nil { return nil, err } RWUtil.distributeSqlByStmt(stmt) if orgStmt != stmt.rwInfo.stmtCurrent { RWUtil.copyStatement(orgStmt, stmt.rwInfo.stmtCurrent) stmt.rwInfo.stmtCurrent.nativeSql = orgStmt.nativeSql } turnToPrimary := false ret, err := execute1() if err != nil { if stmt.rwInfo.stmtCurrent == stmt.rwInfo.stmtStandby { RWUtil.afterExceptionOnStandby(stmt.dmConn, err) turnToPrimary = true } else { return nil, err } } curStmt := stmt.rwInfo.stmtCurrent var otherStmt *DmStatement if curStmt != stmt { otherStmt = stmt } else { otherStmt = stmt.rwInfo.stmtStandby } switch curStmt.execInfo.retSqlType { case Dm_build_1063, Dm_build_1064, Dm_build_1068, Dm_build_1075, Dm_build_1074, Dm_build_1066: { if otherStmt != nil { RWUtil.copyStatement(curStmt, otherStmt) execute2(otherStmt) } } case Dm_build_1073: { var tmpsql string if query != "" { tmpsql = strings.TrimSpace(query) } else if stmt.nativeSql != "" { tmpsql = strings.TrimSpace(stmt.nativeSql) } else { tmpsql = "" } sqlhead := regexp.MustCompile("[ (]").Split(tmpsql, 2)[0] if util.StringUtil.EqualsIgnoreCase(sqlhead, "SP_SET_PARA_VALUE") || util.StringUtil.EqualsIgnoreCase(sqlhead, "SP_SET_SESSION_READONLY") { if otherStmt != nil { RWUtil.copyStatement(curStmt, otherStmt) execute2(otherStmt) } } } case Dm_build_1072: { if stmt.dmConn.dmConnector.rwHA && curStmt == stmt.rwInfo.stmtStandby && (curStmt.execInfo.rsDatas == nil || len(curStmt.execInfo.rsDatas) == 0) { turnToPrimary = true } } } if turnToPrimary { stmt.dmConn.rwInfo.toPrimary() stmt.rwInfo.stmtCurrent = stmt RWUtil.copyStatement(stmt.rwInfo.stmtStandby, stmt) return execute2(stmt) } return ret, nil } func (RWUtil rwUtil) checkReadonlyByConn(conn *DmConnection, sql string) bool { readonly := true if sql != "" && !conn.dmConnector.rwIgnoreSql { tmpsql := strings.TrimSpace(sql) sqlhead := strings.SplitN(tmpsql, " ", 2)[0] if util.StringUtil.EqualsIgnoreCase(sqlhead, "INSERT") || util.StringUtil.EqualsIgnoreCase(sqlhead, "UPDATE") || util.StringUtil.EqualsIgnoreCase(sqlhead, "DELETE") || util.StringUtil.EqualsIgnoreCase(sqlhead, "CREATE") || util.StringUtil.EqualsIgnoreCase(sqlhead, "TRUNCATE") || util.StringUtil.EqualsIgnoreCase(sqlhead, "DROP") || util.StringUtil.EqualsIgnoreCase(sqlhead, "ALTER") { readonly = false } else { readonly = true } } return readonly } func (RWUtil rwUtil) checkReadonlyByStmt(stmt *DmStatement) bool { return RWUtil.checkReadonlyByConn(stmt.dmConn, stmt.nativeSql) } func (RWUtil rwUtil) distributeSqlByConn(conn *DmConnection, query string) RWSiteEnum { var dest RWSiteEnum if !RWUtil.isStandbyAlive(conn) { dest = conn.rwInfo.toPrimary() } else if !RWUtil.checkReadonlyByConn(conn, query) { dest = conn.rwInfo.toPrimary() } else if (conn.rwInfo.distribute == PRIMARY && !conn.trxFinish) || (conn.rwInfo.distribute == STANDBY && !conn.rwInfo.connStandby.trxFinish) { dest = conn.rwInfo.distribute } else if conn.IsoLevel != int32(sql.LevelSerializable) { dest = conn.rwInfo.toAny() } else { dest = conn.rwInfo.toPrimary() } if dest == PRIMARY { conn.rwInfo.connCurrent = conn } else { conn.rwInfo.connCurrent = conn.rwInfo.connStandby } return dest } func (RWUtil rwUtil) distributeSqlByStmt(stmt *DmStatement) RWSiteEnum { var dest RWSiteEnum if !RWUtil.isStandbyAlive(stmt.dmConn) { dest = stmt.dmConn.rwInfo.toPrimary() } else if !RWUtil.checkReadonlyByStmt(stmt) { dest = stmt.dmConn.rwInfo.toPrimary() } else if (stmt.dmConn.rwInfo.distribute == PRIMARY && !stmt.dmConn.trxFinish) || (stmt.dmConn.rwInfo.distribute == STANDBY && !stmt.dmConn.rwInfo.connStandby.trxFinish) { dest = stmt.dmConn.rwInfo.distribute } else if stmt.dmConn.IsoLevel != int32(sql.LevelSerializable) { dest = stmt.dmConn.rwInfo.toAny() } else { dest = stmt.dmConn.rwInfo.toPrimary() } if dest == STANDBY && !RWUtil.isStandbyStatementValid(stmt) { var err error stmt.rwInfo.stmtStandby, err = stmt.dmConn.rwInfo.connStandby.prepare(stmt.nativeSql) if err != nil { dest = stmt.dmConn.rwInfo.toPrimary() } } if dest == PRIMARY { stmt.rwInfo.stmtCurrent = stmt } else { stmt.rwInfo.stmtCurrent = stmt.rwInfo.stmtStandby } return dest } func (RWUtil rwUtil) isStandbyAlive(connection *DmConnection) bool { return connection.rwInfo.connStandby != nil && !connection.rwInfo.connStandby.closed.IsSet() } func (RWUtil rwUtil) isStandbyStatementValid(statement *DmStatement) bool { return statement.rwInfo.stmtStandby != nil && !statement.rwInfo.stmtStandby.closed } func (RWUtil rwUtil) copyStatement(srcStmt *DmStatement, destStmt *DmStatement) { destStmt.nativeSql = srcStmt.nativeSql destStmt.serverParams = srcStmt.serverParams destStmt.bindParams = srcStmt.bindParams destStmt.paramCount = srcStmt.paramCount }