zc.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443
  1. /*
  2. * Copyright (c) 2000-2018, 达梦数据库有限公司.
  3. * All rights reserved.
  4. */
  5. package dm
  6. import (
  7. "context"
  8. "database/sql/driver"
  9. "reflect"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. )
  14. type filter interface {
  15. DmDriverOpen(filterChain *filterChain, d *DmDriver, dsn string) (*DmConnection, error)
  16. DmDriverOpenConnector(filterChain *filterChain, d *DmDriver, dsn string) (*DmConnector, error)
  17. DmConnectorConnect(filterChain *filterChain, c *DmConnector, ctx context.Context) (*DmConnection, error)
  18. DmConnectorDriver(filterChain *filterChain, c *DmConnector) *DmDriver
  19. DmConnectionBegin(filterChain *filterChain, c *DmConnection) (*DmConnection, error)
  20. DmConnectionBeginTx(filterChain *filterChain, c *DmConnection, ctx context.Context, opts driver.TxOptions) (*DmConnection, error)
  21. DmConnectionCommit(filterChain *filterChain, c *DmConnection) error
  22. DmConnectionRollback(filterChain *filterChain, c *DmConnection) error
  23. DmConnectionClose(filterChain *filterChain, c *DmConnection) error
  24. DmConnectionPing(filterChain *filterChain, c *DmConnection, ctx context.Context) error
  25. DmConnectionExec(filterChain *filterChain, c *DmConnection, query string, args []driver.Value) (*DmResult, error)
  26. DmConnectionExecContext(filterChain *filterChain, c *DmConnection, ctx context.Context, query string, args []driver.NamedValue) (*DmResult, error)
  27. DmConnectionQuery(filterChain *filterChain, c *DmConnection, query string, args []driver.Value) (*DmRows, error)
  28. DmConnectionQueryContext(filterChain *filterChain, c *DmConnection, ctx context.Context, query string, args []driver.NamedValue) (*DmRows, error)
  29. DmConnectionPrepare(filterChain *filterChain, c *DmConnection, query string) (*DmStatement, error)
  30. DmConnectionPrepareContext(filterChain *filterChain, c *DmConnection, ctx context.Context, query string) (*DmStatement, error)
  31. DmConnectionResetSession(filterChain *filterChain, c *DmConnection, ctx context.Context) error
  32. DmConnectionCheckNamedValue(filterChain *filterChain, c *DmConnection, nv *driver.NamedValue) error
  33. DmStatementClose(filterChain *filterChain, s *DmStatement) error
  34. DmStatementNumInput(filterChain *filterChain, s *DmStatement) int
  35. DmStatementExec(filterChain *filterChain, s *DmStatement, args []driver.Value) (*DmResult, error)
  36. DmStatementExecContext(filterChain *filterChain, s *DmStatement, ctx context.Context, args []driver.NamedValue) (*DmResult, error)
  37. DmStatementQuery(filterChain *filterChain, s *DmStatement, args []driver.Value) (*DmRows, error)
  38. DmStatementQueryContext(filterChain *filterChain, s *DmStatement, ctx context.Context, args []driver.NamedValue) (*DmRows, error)
  39. DmStatementCheckNamedValue(filterChain *filterChain, s *DmStatement, nv *driver.NamedValue) error
  40. DmResultLastInsertId(filterChain *filterChain, r *DmResult) (int64, error)
  41. DmResultRowsAffected(filterChain *filterChain, r *DmResult) (int64, error)
  42. DmRowsColumns(filterChain *filterChain, r *DmRows) []string
  43. DmRowsClose(filterChain *filterChain, r *DmRows) error
  44. DmRowsNext(filterChain *filterChain, r *DmRows, dest []driver.Value) error
  45. DmRowsHasNextResultSet(filterChain *filterChain, r *DmRows) bool
  46. DmRowsNextResultSet(filterChain *filterChain, r *DmRows) error
  47. DmRowsColumnTypeScanType(filterChain *filterChain, r *DmRows, index int) reflect.Type
  48. DmRowsColumnTypeDatabaseTypeName(filterChain *filterChain, r *DmRows, index int) string
  49. DmRowsColumnTypeLength(filterChain *filterChain, r *DmRows, index int) (length int64, ok bool)
  50. DmRowsColumnTypeNullable(filterChain *filterChain, r *DmRows, index int) (nullable, ok bool)
  51. DmRowsColumnTypePrecisionScale(filterChain *filterChain, r *DmRows, index int) (precision, scale int64, ok bool)
  52. }
  // IDGenerator is a monotonically increasing int64 counter that is advanced
  // with atomic operations.
  type IDGenerator int64

  // One independent counter per kind of driver object.
  var (
  	dmDriverIDGenerator  = new(IDGenerator)
  	dmConntorIDGenerator = new(IDGenerator)
  	dmConnIDGenerator    = new(IDGenerator)
  	dmStmtIDGenerator    = new(IDGenerator)
  	dmResultIDGenerator  = new(IDGenerator)
  	dmRowsIDGenerator    = new(IDGenerator)
  )

  // incrementAndGet atomically adds one to the counter and returns the new
  // value, so the first call on a zero-valued generator returns 1.
  func (g *IDGenerator) incrementAndGet() int64 {
  	counter := (*int64)(g)
  	return atomic.AddInt64(counter, 1)
  }
  // RWSiteEnum identifies which site a statement is routed to when read/write
  // separation is in use.
  type RWSiteEnum int

  // Routing targets. The numeric values are 0, 1 and 2 in declaration order.
  const (
  	// PRIMARY is the zero value.
  	PRIMARY RWSiteEnum = iota
  	// STANDBY has the value 1.
  	STANDBY
  	// ANYSITE has the value 2.
  	ANYSITE
  )
  69. var (
  70. goMapMu sync.RWMutex
  71. goMap = make(map[string]goRun, 2)
  72. )
  73. type filterable struct {
  74. filterChain *filterChain
  75. rwInfo *rwInfo
  76. logInfo *logInfo
  77. recoverInfo *recoverInfo
  78. statInfo *statInfo
  79. objId int64
  80. idGenerator *IDGenerator
  81. }
  82. func runLog() {
  83. goMapMu.Lock()
  84. _, ok := goMap["log"]
  85. if !ok {
  86. goMap["log"] = &logWriter{
  87. flushQueue: make(chan []byte, LogFlushQueueSize),
  88. date: time.Now().Format("2006-01-02"),
  89. logFile: nil,
  90. flushFreq: LogFlushFreq,
  91. filePath: LogDir,
  92. filePrefix: "dm_go",
  93. buffer: Dm_build_287(),
  94. }
  95. go goMap["log"].doRun()
  96. }
  97. goMapMu.Unlock()
  98. }
  99. func runStat() {
  100. goMapMu.Lock()
  101. _, ok := goMap["stat"]
  102. if !ok {
  103. goMap["stat"] = newStatFlusher()
  104. go goMap["stat"].doRun()
  105. }
  106. goMapMu.Unlock()
  107. }
  108. func (f *filterable) createFilterChain(bc *DmConnector, props *Properties) {
  109. var filters = make([]filter, 0, 5)
  110. if bc != nil {
  111. if LogLevel != LOG_OFF {
  112. filters = append(filters, &logFilter{})
  113. f.logInfo = &logInfo{logRecord: new(LogRecord)}
  114. runLog()
  115. }
  116. if StatEnable {
  117. filters = append(filters, &statFilter{})
  118. f.statInfo = newStatInfo()
  119. goStatMu.Lock()
  120. if goStat == nil {
  121. goStat = newGoStat(1000)
  122. }
  123. goStatMu.Unlock()
  124. runStat()
  125. }
  126. if bc.doSwitch != DO_SWITCH_OFF {
  127. filters = append(filters, &reconnectFilter{})
  128. }
  129. if bc.rwSeparate {
  130. filters = append(filters, &rwFilter{})
  131. f.rwInfo = newRwInfo()
  132. }
  133. } else if props != nil {
  134. if ParseLogLevel(props) != LOG_OFF {
  135. filters = append(filters, &logFilter{})
  136. f.logInfo = &logInfo{logRecord: new(LogRecord)}
  137. runLog()
  138. }
  139. if props.GetBool("statEnable", StatEnable) {
  140. filters = append(filters, &statFilter{})
  141. f.statInfo = newStatInfo()
  142. goStatMu.Lock()
  143. if goStat == nil {
  144. goStat = newGoStat(1000)
  145. }
  146. goStatMu.Unlock()
  147. runStat()
  148. }
  149. if props.GetInt(DoSwitchKey, int(DO_SWITCH_OFF), 0, 2) != int(DO_SWITCH_OFF) {
  150. filters = append(filters, &reconnectFilter{})
  151. f.recoverInfo = newRecoverInfo()
  152. }
  153. if props.GetBool("rwSeparate", false) {
  154. filters = append(filters, &rwFilter{})
  155. f.rwInfo = newRwInfo()
  156. }
  157. }
  158. f.filterChain = newFilterChain(filters)
  159. }
  160. func (f *filterable) resetFilterable(src *filterable) {
  161. f.filterChain = src.filterChain
  162. f.logInfo = src.logInfo
  163. f.rwInfo = src.rwInfo
  164. f.statInfo = src.statInfo
  165. }
  166. func (f filterable) getID() int64 {
  167. if f.objId < 0 {
  168. f.objId = f.idGenerator.incrementAndGet()
  169. }
  170. return f.objId
  171. }
  172. type logInfo struct {
  173. logRecord *LogRecord
  174. lastExecuteStartNano time.Time
  175. }
  176. type rwInfo struct {
  177. distribute RWSiteEnum
  178. rwCounter *rwCounter
  179. connStandby *DmConnection
  180. connCurrent *DmConnection
  181. tryRecoverTs int64
  182. stmtStandby *DmStatement
  183. stmtCurrent *DmStatement
  184. readOnly bool
  185. }
  186. func newRwInfo() *rwInfo {
  187. rwInfo := new(rwInfo)
  188. rwInfo.distribute = PRIMARY
  189. rwInfo.readOnly = true
  190. return rwInfo
  191. }
  192. func (rwi *rwInfo) cleanup() {
  193. rwi.distribute = PRIMARY
  194. rwi.rwCounter = nil
  195. rwi.connStandby = nil
  196. rwi.connCurrent = nil
  197. rwi.stmtStandby = nil
  198. rwi.stmtCurrent = nil
  199. }
  200. func (rwi *rwInfo) toPrimary() RWSiteEnum {
  201. if rwi.distribute != PRIMARY {
  202. rwi.rwCounter.countPrimary()
  203. }
  204. rwi.distribute = PRIMARY
  205. return rwi.distribute
  206. }
  207. func (rwi *rwInfo) toAny() RWSiteEnum {
  208. rwi.distribute = rwi.rwCounter.count(ANYSITE, rwi.connStandby)
  209. return rwi.distribute
  210. }
  // recoverInfo is the per-object state allocated alongside the reconnect
  // filter.
  type recoverInfo struct {
  	// checkEpRecoverTs starts at 0 (see newRecoverInfo).
  	checkEpRecoverTs int64
  }
  214. func newRecoverInfo() *recoverInfo {
  215. recoverInfo := new(recoverInfo)
  216. recoverInfo.checkEpRecoverTs = 0
  217. return recoverInfo
  218. }
  219. type statInfo struct {
  220. constructNano int64
  221. connStat *connectionStat
  222. lastExecuteStartNano int64
  223. lastExecuteTimeNano int64
  224. lastExecuteType ExecuteTypeEnum
  225. firstResultSet bool
  226. lastExecuteSql string
  227. sqlStat *sqlStat
  228. sql string
  229. cursorIndex int
  230. closeCount int
  231. readStringLength int64
  232. readBytesLength int64
  233. openInputStreamCount int
  234. openReaderCount int
  235. }
  236. var (
  237. goStatMu sync.RWMutex
  238. goStat *GoStat
  239. )
  240. func newStatInfo() *statInfo {
  241. si := new(statInfo)
  242. return si
  243. }
  244. func (si *statInfo) init(conn *DmConnection) {
  245. si.connStat = goStat.createConnStat(conn)
  246. }
  247. func (si *statInfo) setConstructNano() {
  248. si.constructNano = time.Now().UnixNano()
  249. }
  250. func (si *statInfo) getConstructNano() int64 {
  251. return si.constructNano
  252. }
  253. func (si *statInfo) getConnStat() *connectionStat {
  254. return si.connStat
  255. }
  256. func (si *statInfo) getLastExecuteStartNano() int64 {
  257. return si.lastExecuteStartNano
  258. }
  259. func (si *statInfo) setLastExecuteStartNano(lastExecuteStartNano int64) {
  260. si.lastExecuteStartNano = lastExecuteStartNano
  261. }
  262. func (si *statInfo) getLastExecuteTimeNano() int64 {
  263. return si.lastExecuteTimeNano
  264. }
  265. func (si *statInfo) setLastExecuteTimeNano(lastExecuteTimeNano int64) {
  266. si.lastExecuteTimeNano = lastExecuteTimeNano
  267. }
  268. func (si *statInfo) getLastExecuteType() ExecuteTypeEnum {
  269. return si.lastExecuteType
  270. }
  271. func (si *statInfo) setLastExecuteType(lastExecuteType ExecuteTypeEnum) {
  272. si.lastExecuteType = lastExecuteType
  273. }
  274. func (si *statInfo) isFirstResultSet() bool {
  275. return si.firstResultSet
  276. }
  277. func (si *statInfo) setFirstResultSet(firstResultSet bool) {
  278. si.firstResultSet = firstResultSet
  279. }
  280. func (si *statInfo) getLastExecuteSql() string {
  281. return si.lastExecuteSql
  282. }
  283. func (si *statInfo) setLastExecuteSql(lastExecuteSql string) {
  284. si.lastExecuteSql = lastExecuteSql
  285. }
  286. func (si *statInfo) getSqlStat() *sqlStat {
  287. return si.sqlStat
  288. }
  289. func (si *statInfo) setSqlStat(sqlStat *sqlStat) {
  290. si.sqlStat = sqlStat
  291. }
  292. func (si *statInfo) setConnStat(connStat *connectionStat) {
  293. si.connStat = connStat
  294. }
  295. func (si *statInfo) setConstructNanoWithConstructNano(constructNano int64) {
  296. si.constructNano = constructNano
  297. }
  298. func (si *statInfo) afterExecute(nanoSpan int64) {
  299. si.lastExecuteTimeNano = nanoSpan
  300. }
  301. func (si *statInfo) beforeExecute() {
  302. si.lastExecuteStartNano = time.Now().UnixNano()
  303. }
  304. func (si *statInfo) getSql() string {
  305. return si.sql
  306. }
  307. func (si *statInfo) setSql(sql string) {
  308. si.sql = sql
  309. }
  310. func (si *statInfo) getCursorIndex() int {
  311. return si.cursorIndex
  312. }
  313. func (si *statInfo) setCursorIndex(cursorIndex int) {
  314. si.cursorIndex = cursorIndex
  315. }
  316. func (si *statInfo) getCloseCount() int {
  317. return si.closeCount
  318. }
  319. func (si *statInfo) setCloseCount(closeCount int) {
  320. si.closeCount = closeCount
  321. }
  322. func (si *statInfo) getReadStringLength() int64 {
  323. return si.readStringLength
  324. }
  325. func (si *statInfo) setReadStringLength(readStringLength int64) {
  326. si.readStringLength = readStringLength
  327. }
  328. func (si *statInfo) getReadBytesLength() int64 {
  329. return si.readBytesLength
  330. }
  331. func (si *statInfo) setReadBytesLength(readBytesLength int64) {
  332. si.readBytesLength = readBytesLength
  333. }
  334. func (si *statInfo) getOpenInputStreamCount() int {
  335. return si.openInputStreamCount
  336. }
  337. func (si *statInfo) setOpenInputStreamCount(openInputStreamCount int) {
  338. si.openInputStreamCount = openInputStreamCount
  339. }
  340. func (si *statInfo) getOpenReaderCount() int {
  341. return si.openReaderCount
  342. }
  343. func (si *statInfo) setOpenReaderCount(openReaderCount int) {
  344. si.openReaderCount = openReaderCount
  345. }
  346. func (si *statInfo) incrementCloseCount() {
  347. si.closeCount++
  348. }