| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443 |
- /*
- * Copyright (c) 2000-2018, 达梦数据库有限公司.
- * All rights reserved.
- */
- package dm
- import (
- "context"
- "database/sql/driver"
- "reflect"
- "sync"
- "sync/atomic"
- "time"
- )
- type filter interface {
- DmDriverOpen(filterChain *filterChain, d *DmDriver, dsn string) (*DmConnection, error)
- DmDriverOpenConnector(filterChain *filterChain, d *DmDriver, dsn string) (*DmConnector, error)
- DmConnectorConnect(filterChain *filterChain, c *DmConnector, ctx context.Context) (*DmConnection, error)
- DmConnectorDriver(filterChain *filterChain, c *DmConnector) *DmDriver
- DmConnectionBegin(filterChain *filterChain, c *DmConnection) (*DmConnection, error)
- DmConnectionBeginTx(filterChain *filterChain, c *DmConnection, ctx context.Context, opts driver.TxOptions) (*DmConnection, error)
- DmConnectionCommit(filterChain *filterChain, c *DmConnection) error
- DmConnectionRollback(filterChain *filterChain, c *DmConnection) error
- DmConnectionClose(filterChain *filterChain, c *DmConnection) error
- DmConnectionPing(filterChain *filterChain, c *DmConnection, ctx context.Context) error
- DmConnectionExec(filterChain *filterChain, c *DmConnection, query string, args []driver.Value) (*DmResult, error)
- DmConnectionExecContext(filterChain *filterChain, c *DmConnection, ctx context.Context, query string, args []driver.NamedValue) (*DmResult, error)
- DmConnectionQuery(filterChain *filterChain, c *DmConnection, query string, args []driver.Value) (*DmRows, error)
- DmConnectionQueryContext(filterChain *filterChain, c *DmConnection, ctx context.Context, query string, args []driver.NamedValue) (*DmRows, error)
- DmConnectionPrepare(filterChain *filterChain, c *DmConnection, query string) (*DmStatement, error)
- DmConnectionPrepareContext(filterChain *filterChain, c *DmConnection, ctx context.Context, query string) (*DmStatement, error)
- DmConnectionResetSession(filterChain *filterChain, c *DmConnection, ctx context.Context) error
- DmConnectionCheckNamedValue(filterChain *filterChain, c *DmConnection, nv *driver.NamedValue) error
- DmStatementClose(filterChain *filterChain, s *DmStatement) error
- DmStatementNumInput(filterChain *filterChain, s *DmStatement) int
- DmStatementExec(filterChain *filterChain, s *DmStatement, args []driver.Value) (*DmResult, error)
- DmStatementExecContext(filterChain *filterChain, s *DmStatement, ctx context.Context, args []driver.NamedValue) (*DmResult, error)
- DmStatementQuery(filterChain *filterChain, s *DmStatement, args []driver.Value) (*DmRows, error)
- DmStatementQueryContext(filterChain *filterChain, s *DmStatement, ctx context.Context, args []driver.NamedValue) (*DmRows, error)
- DmStatementCheckNamedValue(filterChain *filterChain, s *DmStatement, nv *driver.NamedValue) error
- DmResultLastInsertId(filterChain *filterChain, r *DmResult) (int64, error)
- DmResultRowsAffected(filterChain *filterChain, r *DmResult) (int64, error)
- DmRowsColumns(filterChain *filterChain, r *DmRows) []string
- DmRowsClose(filterChain *filterChain, r *DmRows) error
- DmRowsNext(filterChain *filterChain, r *DmRows, dest []driver.Value) error
- DmRowsHasNextResultSet(filterChain *filterChain, r *DmRows) bool
- DmRowsNextResultSet(filterChain *filterChain, r *DmRows) error
- DmRowsColumnTypeScanType(filterChain *filterChain, r *DmRows, index int) reflect.Type
- DmRowsColumnTypeDatabaseTypeName(filterChain *filterChain, r *DmRows, index int) string
- DmRowsColumnTypeLength(filterChain *filterChain, r *DmRows, index int) (length int64, ok bool)
- DmRowsColumnTypeNullable(filterChain *filterChain, r *DmRows, index int) (nullable, ok bool)
- DmRowsColumnTypePrecisionScale(filterChain *filterChain, r *DmRows, index int) (precision, scale int64, ok bool)
- }
- type IDGenerator int64
- var dmDriverIDGenerator = new(IDGenerator)
- var dmConntorIDGenerator = new(IDGenerator)
- var dmConnIDGenerator = new(IDGenerator)
- var dmStmtIDGenerator = new(IDGenerator)
- var dmResultIDGenerator = new(IDGenerator)
- var dmRowsIDGenerator = new(IDGenerator)
- func (g *IDGenerator) incrementAndGet() int64 {
- return atomic.AddInt64((*int64)(g), 1)
- }
- type RWSiteEnum int
- const (
- PRIMARY RWSiteEnum = iota
- STANDBY
- ANYSITE
- )
- var (
- goMapMu sync.RWMutex
- goMap = make(map[string]goRun, 2)
- )
- type filterable struct {
- filterChain *filterChain
- rwInfo *rwInfo
- logInfo *logInfo
- recoverInfo *recoverInfo
- statInfo *statInfo
- objId int64
- idGenerator *IDGenerator
- }
- func runLog() {
- goMapMu.Lock()
- _, ok := goMap["log"]
- if !ok {
- goMap["log"] = &logWriter{
- flushQueue: make(chan []byte, LogFlushQueueSize),
- date: time.Now().Format("2006-01-02"),
- logFile: nil,
- flushFreq: LogFlushFreq,
- filePath: LogDir,
- filePrefix: "dm_go",
- buffer: Dm_build_287(),
- }
- go goMap["log"].doRun()
- }
- goMapMu.Unlock()
- }
- func runStat() {
- goMapMu.Lock()
- _, ok := goMap["stat"]
- if !ok {
- goMap["stat"] = newStatFlusher()
- go goMap["stat"].doRun()
- }
- goMapMu.Unlock()
- }
- func (f *filterable) createFilterChain(bc *DmConnector, props *Properties) {
- var filters = make([]filter, 0, 5)
- if bc != nil {
- if LogLevel != LOG_OFF {
- filters = append(filters, &logFilter{})
- f.logInfo = &logInfo{logRecord: new(LogRecord)}
- runLog()
- }
- if StatEnable {
- filters = append(filters, &statFilter{})
- f.statInfo = newStatInfo()
- goStatMu.Lock()
- if goStat == nil {
- goStat = newGoStat(1000)
- }
- goStatMu.Unlock()
- runStat()
- }
- if bc.doSwitch != DO_SWITCH_OFF {
- filters = append(filters, &reconnectFilter{})
- }
- if bc.rwSeparate {
- filters = append(filters, &rwFilter{})
- f.rwInfo = newRwInfo()
- }
- } else if props != nil {
- if ParseLogLevel(props) != LOG_OFF {
- filters = append(filters, &logFilter{})
- f.logInfo = &logInfo{logRecord: new(LogRecord)}
- runLog()
- }
- if props.GetBool("statEnable", StatEnable) {
- filters = append(filters, &statFilter{})
- f.statInfo = newStatInfo()
- goStatMu.Lock()
- if goStat == nil {
- goStat = newGoStat(1000)
- }
- goStatMu.Unlock()
- runStat()
- }
- if props.GetInt(DoSwitchKey, int(DO_SWITCH_OFF), 0, 2) != int(DO_SWITCH_OFF) {
- filters = append(filters, &reconnectFilter{})
- f.recoverInfo = newRecoverInfo()
- }
- if props.GetBool("rwSeparate", false) {
- filters = append(filters, &rwFilter{})
- f.rwInfo = newRwInfo()
- }
- }
- f.filterChain = newFilterChain(filters)
- }
- func (f *filterable) resetFilterable(src *filterable) {
- f.filterChain = src.filterChain
- f.logInfo = src.logInfo
- f.rwInfo = src.rwInfo
- f.statInfo = src.statInfo
- }
- func (f filterable) getID() int64 {
- if f.objId < 0 {
- f.objId = f.idGenerator.incrementAndGet()
- }
- return f.objId
- }
- type logInfo struct {
- logRecord *LogRecord
- lastExecuteStartNano time.Time
- }
- type rwInfo struct {
- distribute RWSiteEnum
- rwCounter *rwCounter
- connStandby *DmConnection
- connCurrent *DmConnection
- tryRecoverTs int64
- stmtStandby *DmStatement
- stmtCurrent *DmStatement
- readOnly bool
- }
- func newRwInfo() *rwInfo {
- rwInfo := new(rwInfo)
- rwInfo.distribute = PRIMARY
- rwInfo.readOnly = true
- return rwInfo
- }
- func (rwi *rwInfo) cleanup() {
- rwi.distribute = PRIMARY
- rwi.rwCounter = nil
- rwi.connStandby = nil
- rwi.connCurrent = nil
- rwi.stmtStandby = nil
- rwi.stmtCurrent = nil
- }
- func (rwi *rwInfo) toPrimary() RWSiteEnum {
- if rwi.distribute != PRIMARY {
- rwi.rwCounter.countPrimary()
- }
- rwi.distribute = PRIMARY
- return rwi.distribute
- }
- func (rwi *rwInfo) toAny() RWSiteEnum {
- rwi.distribute = rwi.rwCounter.count(ANYSITE, rwi.connStandby)
- return rwi.distribute
- }
- type recoverInfo struct {
- checkEpRecoverTs int64
- }
- func newRecoverInfo() *recoverInfo {
- recoverInfo := new(recoverInfo)
- recoverInfo.checkEpRecoverTs = 0
- return recoverInfo
- }
- type statInfo struct {
- constructNano int64
- connStat *connectionStat
- lastExecuteStartNano int64
- lastExecuteTimeNano int64
- lastExecuteType ExecuteTypeEnum
- firstResultSet bool
- lastExecuteSql string
- sqlStat *sqlStat
- sql string
- cursorIndex int
- closeCount int
- readStringLength int64
- readBytesLength int64
- openInputStreamCount int
- openReaderCount int
- }
- var (
- goStatMu sync.RWMutex
- goStat *GoStat
- )
- func newStatInfo() *statInfo {
- si := new(statInfo)
- return si
- }
- func (si *statInfo) init(conn *DmConnection) {
- si.connStat = goStat.createConnStat(conn)
- }
- func (si *statInfo) setConstructNano() {
- si.constructNano = time.Now().UnixNano()
- }
- func (si *statInfo) getConstructNano() int64 {
- return si.constructNano
- }
- func (si *statInfo) getConnStat() *connectionStat {
- return si.connStat
- }
- func (si *statInfo) getLastExecuteStartNano() int64 {
- return si.lastExecuteStartNano
- }
- func (si *statInfo) setLastExecuteStartNano(lastExecuteStartNano int64) {
- si.lastExecuteStartNano = lastExecuteStartNano
- }
- func (si *statInfo) getLastExecuteTimeNano() int64 {
- return si.lastExecuteTimeNano
- }
- func (si *statInfo) setLastExecuteTimeNano(lastExecuteTimeNano int64) {
- si.lastExecuteTimeNano = lastExecuteTimeNano
- }
- func (si *statInfo) getLastExecuteType() ExecuteTypeEnum {
- return si.lastExecuteType
- }
- func (si *statInfo) setLastExecuteType(lastExecuteType ExecuteTypeEnum) {
- si.lastExecuteType = lastExecuteType
- }
- func (si *statInfo) isFirstResultSet() bool {
- return si.firstResultSet
- }
- func (si *statInfo) setFirstResultSet(firstResultSet bool) {
- si.firstResultSet = firstResultSet
- }
- func (si *statInfo) getLastExecuteSql() string {
- return si.lastExecuteSql
- }
- func (si *statInfo) setLastExecuteSql(lastExecuteSql string) {
- si.lastExecuteSql = lastExecuteSql
- }
- func (si *statInfo) getSqlStat() *sqlStat {
- return si.sqlStat
- }
- func (si *statInfo) setSqlStat(sqlStat *sqlStat) {
- si.sqlStat = sqlStat
- }
- func (si *statInfo) setConnStat(connStat *connectionStat) {
- si.connStat = connStat
- }
- func (si *statInfo) setConstructNanoWithConstructNano(constructNano int64) {
- si.constructNano = constructNano
- }
- func (si *statInfo) afterExecute(nanoSpan int64) {
- si.lastExecuteTimeNano = nanoSpan
- }
- func (si *statInfo) beforeExecute() {
- si.lastExecuteStartNano = time.Now().UnixNano()
- }
- func (si *statInfo) getSql() string {
- return si.sql
- }
- func (si *statInfo) setSql(sql string) {
- si.sql = sql
- }
- func (si *statInfo) getCursorIndex() int {
- return si.cursorIndex
- }
- func (si *statInfo) setCursorIndex(cursorIndex int) {
- si.cursorIndex = cursorIndex
- }
- func (si *statInfo) getCloseCount() int {
- return si.closeCount
- }
- func (si *statInfo) setCloseCount(closeCount int) {
- si.closeCount = closeCount
- }
- func (si *statInfo) getReadStringLength() int64 {
- return si.readStringLength
- }
- func (si *statInfo) setReadStringLength(readStringLength int64) {
- si.readStringLength = readStringLength
- }
- func (si *statInfo) getReadBytesLength() int64 {
- return si.readBytesLength
- }
- func (si *statInfo) setReadBytesLength(readBytesLength int64) {
- si.readBytesLength = readBytesLength
- }
- func (si *statInfo) getOpenInputStreamCount() int {
- return si.openInputStreamCount
- }
- func (si *statInfo) setOpenInputStreamCount(openInputStreamCount int) {
- si.openInputStreamCount = openInputStreamCount
- }
- func (si *statInfo) getOpenReaderCount() int {
- return si.openReaderCount
- }
- func (si *statInfo) setOpenReaderCount(openReaderCount int) {
- si.openReaderCount = openReaderCount
- }
- func (si *statInfo) incrementCloseCount() {
- si.closeCount++
- }
|