g.go 53 KB


  1. /*
  2. * Copyright (c) 2000-2018, 达梦数据库有限公司.
  3. * All rights reserved.
  4. */
  5. package dm
  6. import (
  7. "fmt"
  8. "math"
  9. "os"
  10. "sort"
  11. "strconv"
  12. "strings"
  13. "sync"
  14. "sync/atomic"
  15. "time"
  16. "gitee.com/chunanyong/dm/util"
  17. )
// ExecuteTypeEnum enumerates the statement execution kinds. It is consulted by
// sqlStat.addExecuteTimeAndResultHoldTimeHistogramRecord, which compares the
// supplied value against ExecuteQuery.
type ExecuteTypeEnum int

const (
	// Execute is the zero value (0).
	Execute ExecuteTypeEnum = iota
	// ExecuteQuery has the value 1.
	ExecuteQuery
	// ExecuteUpdate has the value 2.
	ExecuteUpdate
)
  24. var idGenerator int64 = 0
  25. func generateId() string {
  26. return time.Now().String() + strconv.Itoa(int(atomic.AddInt64(&idGenerator, 1)))
  27. }
  28. func getInt64(counter *int64, reset bool) int64 {
  29. if reset {
  30. return atomic.SwapInt64(counter, 0)
  31. }
  32. return atomic.LoadInt64(counter)
  33. }
// SqlStatValue is a point-in-time copy of a sqlStat, filled in by
// sqlStat.getValue and turned into a map by getData.
type SqlStatValue struct {
	// Identity of the statement and the data source it belongs to.
	id string
	sql string
	sqlHash int64
	dataSource string
	dataSourceId string
	// Execution counters. The *Nano* fields are divided by 1000*1000 by the
	// *Millis getters, i.e. they are treated as nanoseconds.
	executeLastStartTime int64
	executeBatchSizeTotal int64
	executeBatchSizeMax int64
	executeSuccessCount int64
	executeSpanNanoTotal int64
	executeSpanNanoMax int64
	runningCount int64
	concurrentMax int64
	resultSetHoldTimeNano int64
	executeAndResultSetHoldTime int64
	executeNanoSpanMaxOccurTime int64
	// Error bookkeeping.
	// NOTE(review): no visible code assigns the Message/StackTrace fields — confirm.
	executeErrorCount int64
	executeErrorLast error
	executeErrorLastMessage string
	executeErrorLastStackTrace string
	executeErrorLastTime int64
	// Row counters.
	updateCount int64
	updateCountMax int64
	fetchRowCount int64
	fetchRowCountMax int64
	inTransactionCount int64
	lastSlowParameters string
	// LOB / stream counters.
	clobOpenCount int64
	blobOpenCount int64
	readStringLength int64
	readBytesLength int64
	inputStreamOpenCount int64
	readerOpenCount int64
	// Execute-time histogram buckets (returned in this order by getHistogramValues).
	histogram_0_1 int64
	histogram_1_10 int64
	histogram_10_100 int64
	histogram_100_1000 int64
	histogram_1000_10000 int64
	histogram_10000_100000 int64
	histogram_100000_1000000 int64
	histogram_1000000_more int64
	// Execute-plus-result-hold-time histogram buckets.
	executeAndResultHoldTime_0_1 int64
	executeAndResultHoldTime_1_10 int64
	executeAndResultHoldTime_10_100 int64
	executeAndResultHoldTime_100_1000 int64
	executeAndResultHoldTime_1000_10000 int64
	executeAndResultHoldTime_10000_100000 int64
	executeAndResultHoldTime_100000_1000000 int64
	executeAndResultHoldTime_1000000_more int64
	// Fetched-row-count histogram buckets.
	fetchRowCount_0_1 int64
	fetchRowCount_1_10 int64
	fetchRowCount_10_100 int64
	fetchRowCount_100_1000 int64
	fetchRowCount_1000_10000 int64
	fetchRowCount_10000_more int64
	// Update-count histogram buckets.
	updateCount_0_1 int64
	updateCount_1_10 int64
	updateCount_10_100 int64
	updateCount_100_1000 int64
	updateCount_1000_10000 int64
	updateCount_10000_more int64
}
  97. func newSqlStatValue() *SqlStatValue {
  98. ssv := new(SqlStatValue)
  99. return ssv
  100. }
  101. func (ssv *SqlStatValue) getExecuteHistogram() []int64 {
  102. return []int64{
  103. ssv.histogram_0_1,
  104. ssv.histogram_1_10,
  105. ssv.histogram_10_100,
  106. ssv.histogram_100_1000,
  107. ssv.histogram_1000_10000,
  108. ssv.histogram_10000_100000,
  109. ssv.histogram_100000_1000000,
  110. ssv.histogram_1000000_more,
  111. }
  112. }
  113. func (ssv *SqlStatValue) getExecuteAndResultHoldHistogram() []int64 {
  114. return []int64{ssv.executeAndResultHoldTime_0_1,
  115. ssv.executeAndResultHoldTime_1_10,
  116. ssv.executeAndResultHoldTime_10_100,
  117. ssv.executeAndResultHoldTime_100_1000,
  118. ssv.executeAndResultHoldTime_1000_10000,
  119. ssv.executeAndResultHoldTime_10000_100000,
  120. ssv.executeAndResultHoldTime_100000_1000000,
  121. ssv.executeAndResultHoldTime_1000000_more,
  122. }
  123. }
  124. func (ssv *SqlStatValue) getFetchRowHistogram() []int64 {
  125. return []int64{ssv.fetchRowCount_0_1,
  126. ssv.fetchRowCount_1_10,
  127. ssv.fetchRowCount_10_100,
  128. ssv.fetchRowCount_100_1000,
  129. ssv.fetchRowCount_1000_10000,
  130. ssv.fetchRowCount_10000_more,
  131. }
  132. }
  133. func (ssv *SqlStatValue) getUpdateHistogram() []int64 {
  134. return []int64{ssv.updateCount_0_1,
  135. ssv.updateCount_1_10,
  136. ssv.updateCount_10_100,
  137. ssv.updateCount_100_1000,
  138. ssv.updateCount_1000_10000,
  139. ssv.updateCount_10000_more,
  140. }
  141. }
  142. func (ssv *SqlStatValue) getExecuteCount() int64 {
  143. return ssv.executeErrorCount + ssv.executeSuccessCount
  144. }
  145. func (ssv *SqlStatValue) getExecuteMillisMax() int64 {
  146. return ssv.executeSpanNanoMax / (1000 * 1000)
  147. }
  148. func (ssv *SqlStatValue) getExecuteMillisTotal() int64 {
  149. return ssv.executeSpanNanoTotal / (1000 * 1000)
  150. }
  151. func (ssv *SqlStatValue) getHistogramValues() []int64 {
  152. return []int64{
  153. ssv.histogram_0_1,
  154. ssv.histogram_1_10,
  155. ssv.histogram_10_100,
  156. ssv.histogram_100_1000,
  157. ssv.histogram_1000_10000,
  158. ssv.histogram_10000_100000,
  159. ssv.histogram_100000_1000000,
  160. ssv.histogram_1000000_more,
  161. }
  162. }
  163. func (ssv *SqlStatValue) getFetchRowCountHistogramValues() []int64 {
  164. return []int64{
  165. ssv.fetchRowCount_0_1,
  166. ssv.fetchRowCount_1_10,
  167. ssv.fetchRowCount_10_100,
  168. ssv.fetchRowCount_100_1000,
  169. ssv.fetchRowCount_1000_10000,
  170. ssv.fetchRowCount_10000_more,
  171. }
  172. }
  173. func (ssv *SqlStatValue) getUpdateCountHistogramValues() []int64 {
  174. return []int64{
  175. ssv.updateCount_0_1,
  176. ssv.updateCount_1_10,
  177. ssv.updateCount_10_100,
  178. ssv.updateCount_100_1000,
  179. ssv.updateCount_1000_10000,
  180. ssv.updateCount_10000_more,
  181. }
  182. }
  183. func (ssv *SqlStatValue) getExecuteAndResultHoldTimeHistogramValues() []int64 {
  184. return []int64{
  185. ssv.executeAndResultHoldTime_0_1,
  186. ssv.executeAndResultHoldTime_1_10,
  187. ssv.executeAndResultHoldTime_10_100,
  188. ssv.executeAndResultHoldTime_100_1000,
  189. ssv.executeAndResultHoldTime_1000_10000,
  190. ssv.executeAndResultHoldTime_10000_100000,
  191. ssv.executeAndResultHoldTime_100000_1000000,
  192. ssv.executeAndResultHoldTime_1000000_more,
  193. }
  194. }
  195. func (ssv *SqlStatValue) getResultSetHoldTimeMilis() int64 {
  196. return ssv.resultSetHoldTimeNano / (1000 * 1000)
  197. }
  198. func (ssv *SqlStatValue) getExecuteAndResultSetHoldTimeMilis() int64 {
  199. return ssv.executeAndResultSetHoldTime / (1000 * 1000)
  200. }
  201. func (ssv *SqlStatValue) getData() map[string]interface{} {
  202. m := make(map[string]interface{})
  203. m[idConstStr] = ssv.id
  204. m[dataSourceConstStr] = ssv.dataSource
  205. m["DataSourceId"] = ssv.dataSourceId
  206. m[sqlConstStr] = ssv.sql
  207. m[executeCountConstStr] = ssv.getExecuteCount()
  208. m[errorCountConstStr] = ssv.executeErrorCount
  209. m[totalTimeConstStr] = ssv.getExecuteMillisTotal()
  210. m["LastTime"] = ssv.executeLastStartTime
  211. m[maxTimespanConstStr] = ssv.getExecuteMillisMax()
  212. m["LastError"] = ssv.executeErrorLast
  213. m[effectedRowCountConstStr] = ssv.updateCount
  214. m[fetchRowCountConstStr] = ssv.fetchRowCount
  215. m["MaxTimespanOccurTime"] = ssv.executeNanoSpanMaxOccurTime
  216. m["BatchSizeMax"] = ssv.executeBatchSizeMax
  217. m["BatchSizeTotal"] = ssv.executeBatchSizeTotal
  218. m[concurrentMaxConstStr] = ssv.concurrentMax
  219. m[runningCountConstStr] = ssv.runningCount
  220. if ssv.executeErrorLastMessage != "" {
  221. m["LastErrorMessage"] = ssv.executeErrorLastMessage
  222. m["LastErrorStackTrace"] = ssv.executeErrorLastStackTrace
  223. m["LastErrorTime"] = ssv.executeErrorLastTime
  224. } else {
  225. m["LastErrorMessage"] = ""
  226. m["LastErrorClass"] = ""
  227. m["LastErrorStackTrace"] = ""
  228. m["LastErrorTime"] = ""
  229. }
  230. m[urlConstStr] = ""
  231. m[inTransactionCountConstStr] = ssv.inTransactionCount
  232. m["Histogram"] = ssv.getHistogramValues()
  233. m["LastSlowParameters"] = ssv.lastSlowParameters
  234. m["ResultSetHoldTime"] = ssv.getResultSetHoldTimeMilis()
  235. m["ExecuteAndResultSetHoldTime"] = ssv.getExecuteAndResultSetHoldTimeMilis()
  236. m[fetchRowCountConstStr] = ssv.getFetchRowCountHistogramValues()
  237. m[effectedRowCountHistogramConstStr] = ssv.getUpdateCountHistogramValues()
  238. m[executeAndResultHoldTimeHistogramConstStr] = ssv.getExecuteAndResultHoldTimeHistogramValues()
  239. m["EffectedRowCountMax"] = ssv.updateCountMax
  240. m["FetchRowCountMax"] = ssv.fetchRowCountMax
  241. m[clobOpenCountConstStr] = ssv.clobOpenCount
  242. m[blobOpenCountConstStr] = ssv.blobOpenCount
  243. m["ReadStringLength"] = ssv.readStringLength
  244. m["ReadBytesLength"] = ssv.readBytesLength
  245. m["InputStreamOpenCount"] = ssv.inputStreamOpenCount
  246. m["ReaderOpenCount"] = ssv.readerOpenCount
  247. m["HASH"] = ssv.sqlHash
  248. m[executeHoldTimeHistogramConstStr] = ssv.getExecuteHistogram()
  249. return m
  250. }
// sqlStat accumulates live statistics for one SQL text. Its int64 counters
// are updated through sync/atomic by the methods below; getValue copies them
// into a SqlStatValue.
//
// NOTE(review): sync/atomic requires 64-bit alignment of atomically accessed
// int64 fields on 32-bit platforms — verify the layout before reordering or
// inserting fields.
type sqlStat struct {
	Sql string
	SqlHash int64
	Id string
	ExecuteLastStartTime int64
	ExecuteBatchSizeTotal int64
	ExecuteBatchSizeMax int64
	ExecuteSuccessCount int64
	ExecuteSpanNanoTotal int64
	ExecuteSpanNanoMax int64
	// RunningCount is not cleared by reset or by getValue(true).
	RunningCount int64
	ConcurrentMax int64
	ResultSetHoldTimeNano int64
	ExecuteAndResultSetHoldTime int64
	DataSource string
	File string
	ExecuteNanoSpanMaxOccurTime int64
	ExecuteErrorCount int64
	ExecuteErrorLast error
	ExecuteErrorLastTime int64
	UpdateCount int64
	UpdateCountMax int64
	FetchRowCount int64
	FetchRowCountMax int64
	InTransactionCount int64
	LastSlowParameters string
	// Removed is set to 1 by connectionStat.getSqlStatMapAndReset when the
	// entry is dropped from the map.
	Removed int64
	ClobOpenCount int64
	BlobOpenCount int64
	ReadStringLength int64
	ReadBytesLength int64
	InputStreamOpenCount int64
	ReaderOpenCount int64
	// Execute-time histogram, bucketed by milliseconds in histogramRecord.
	Histogram_0_1 int64
	Histogram_1_10 int64
	Histogram_10_100 int64
	Histogram_100_1000 int64
	Histogram_1000_10000 int64
	Histogram_10000_100000 int64
	Histogram_100000_1000000 int64
	Histogram_1000000_more int64
	// Execute-plus-result-hold-time histogram, bucketed by milliseconds in
	// executeAndResultHoldTimeHistogramRecord.
	ExecuteAndResultHoldTime_0_1 int64
	ExecuteAndResultHoldTime_1_10 int64
	ExecuteAndResultHoldTime_10_100 int64
	ExecuteAndResultHoldTime_100_1000 int64
	ExecuteAndResultHoldTime_1000_10000 int64
	ExecuteAndResultHoldTime_10000_100000 int64
	ExecuteAndResultHoldTime_100000_1000000 int64
	ExecuteAndResultHoldTime_1000000_more int64
	// Fetched-row-count histogram, maintained by addFetchRowCount.
	FetchRowCount_0_1 int64
	FetchRowCount_1_10 int64
	FetchRowCount_10_100 int64
	FetchRowCount_100_1000 int64
	FetchRowCount_1000_10000 int64
	FetchRowCount_10000_more int64
	// Update-count histogram, maintained by addUpdateCount.
	UpdateCount_0_1 int64
	UpdateCount_1_10 int64
	UpdateCount_10_100 int64
	UpdateCount_100_1000 int64
	UpdateCount_1000_10000 int64
	UpdateCount_10000_more int64
	DataSourceId string
}
  314. func NewSqlStat(sql string) *sqlStat {
  315. s := new(sqlStat)
  316. s.Sql = sql
  317. s.Id = "SQL" + generateId()
  318. return s
  319. }
  320. func (s *sqlStat) reset() {
  321. s.ExecuteLastStartTime = 0
  322. s.ExecuteBatchSizeTotal = 0
  323. s.ExecuteBatchSizeMax = 0
  324. s.ExecuteSuccessCount = 0
  325. s.ExecuteSpanNanoTotal = 0
  326. s.ExecuteSpanNanoMax = 0
  327. s.ExecuteNanoSpanMaxOccurTime = 0
  328. s.ConcurrentMax = 0
  329. s.ExecuteErrorCount = 0
  330. s.ExecuteErrorLast = nil
  331. s.ExecuteErrorLastTime = 0
  332. s.UpdateCount = 0
  333. s.UpdateCountMax = 0
  334. s.FetchRowCount = 0
  335. s.FetchRowCountMax = 0
  336. s.Histogram_0_1 = 0
  337. s.Histogram_1_10 = 0
  338. s.Histogram_10_100 = 0
  339. s.Histogram_100_1000 = 0
  340. s.Histogram_1000_10000 = 0
  341. s.Histogram_10000_100000 = 0
  342. s.Histogram_100000_1000000 = 0
  343. s.Histogram_1000000_more = 0
  344. s.LastSlowParameters = ""
  345. s.InTransactionCount = 0
  346. s.ResultSetHoldTimeNano = 0
  347. s.ExecuteAndResultSetHoldTime = 0
  348. s.FetchRowCount_0_1 = 0
  349. s.FetchRowCount_1_10 = 0
  350. s.FetchRowCount_10_100 = 0
  351. s.FetchRowCount_100_1000 = 0
  352. s.FetchRowCount_1000_10000 = 0
  353. s.FetchRowCount_10000_more = 0
  354. s.UpdateCount_0_1 = 0
  355. s.UpdateCount_1_10 = 0
  356. s.UpdateCount_10_100 = 0
  357. s.UpdateCount_100_1000 = 0
  358. s.UpdateCount_1000_10000 = 0
  359. s.UpdateCount_10000_more = 0
  360. s.ExecuteAndResultHoldTime_0_1 = 0
  361. s.ExecuteAndResultHoldTime_1_10 = 0
  362. s.ExecuteAndResultHoldTime_10_100 = 0
  363. s.ExecuteAndResultHoldTime_100_1000 = 0
  364. s.ExecuteAndResultHoldTime_1000_10000 = 0
  365. s.ExecuteAndResultHoldTime_10000_100000 = 0
  366. s.ExecuteAndResultHoldTime_100000_1000000 = 0
  367. s.ExecuteAndResultHoldTime_1000000_more = 0
  368. s.BlobOpenCount = 0
  369. s.ClobOpenCount = 0
  370. s.ReadStringLength = 0
  371. s.ReadBytesLength = 0
  372. s.InputStreamOpenCount = 0
  373. s.ReaderOpenCount = 0
  374. }
// getValueAndReset returns a snapshot of the statistics and clears the live
// counters; it is shorthand for getValue(true).
func (s *sqlStat) getValueAndReset() *SqlStatValue {
	return s.getValue(true)
}
  378. func (s *sqlStat) getValue(reset bool) *SqlStatValue {
  379. ssv := newSqlStatValue()
  380. ssv.dataSource = s.DataSource
  381. ssv.dataSourceId = s.DataSourceId
  382. ssv.sql = s.Sql
  383. ssv.sqlHash = s.SqlHash
  384. ssv.id = s.Id
  385. ssv.executeLastStartTime = s.ExecuteLastStartTime
  386. if reset {
  387. s.ExecuteLastStartTime = 0
  388. }
  389. ssv.executeBatchSizeTotal = getInt64(&s.ExecuteBatchSizeTotal, reset)
  390. ssv.executeBatchSizeMax = getInt64(&s.ExecuteBatchSizeMax, reset)
  391. ssv.executeSuccessCount = getInt64(&s.ExecuteSuccessCount, reset)
  392. ssv.executeSpanNanoTotal = getInt64(&s.ExecuteSpanNanoTotal, reset)
  393. ssv.executeSpanNanoMax = getInt64(&s.ExecuteSpanNanoMax, reset)
  394. ssv.executeNanoSpanMaxOccurTime = s.ExecuteNanoSpanMaxOccurTime
  395. if reset {
  396. s.ExecuteNanoSpanMaxOccurTime = 0
  397. }
  398. ssv.runningCount = s.RunningCount
  399. ssv.concurrentMax = getInt64(&s.ConcurrentMax, reset)
  400. ssv.executeErrorCount = getInt64(&s.ExecuteErrorCount, reset)
  401. ssv.executeErrorLast = s.ExecuteErrorLast
  402. if reset {
  403. s.ExecuteErrorLast = nil
  404. }
  405. ssv.executeErrorLastTime = s.ExecuteErrorLastTime
  406. if reset {
  407. ssv.executeErrorLastTime = 0
  408. }
  409. ssv.updateCount = getInt64(&s.UpdateCount, reset)
  410. ssv.updateCountMax = getInt64(&s.UpdateCountMax, reset)
  411. ssv.fetchRowCount = getInt64(&s.FetchRowCount, reset)
  412. ssv.fetchRowCountMax = getInt64(&s.FetchRowCountMax, reset)
  413. ssv.histogram_0_1 = getInt64(&s.Histogram_0_1, reset)
  414. ssv.histogram_1_10 = getInt64(&s.Histogram_1_10, reset)
  415. ssv.histogram_10_100 = getInt64(&s.Histogram_10_100, reset)
  416. ssv.histogram_100_1000 = getInt64(&s.Histogram_100_1000, reset)
  417. ssv.histogram_1000_10000 = getInt64(&s.Histogram_1000_10000, reset)
  418. ssv.histogram_10000_100000 = getInt64(&s.Histogram_10000_100000, reset)
  419. ssv.histogram_100000_1000000 = getInt64(&s.Histogram_100000_1000000, reset)
  420. ssv.histogram_1000000_more = getInt64(&s.Histogram_1000000_more, reset)
  421. ssv.lastSlowParameters = s.LastSlowParameters
  422. if reset {
  423. s.LastSlowParameters = ""
  424. }
  425. ssv.inTransactionCount = getInt64(&s.InTransactionCount, reset)
  426. ssv.resultSetHoldTimeNano = getInt64(&s.ResultSetHoldTimeNano, reset)
  427. ssv.executeAndResultSetHoldTime = getInt64(&s.ExecuteAndResultSetHoldTime, reset)
  428. ssv.fetchRowCount_0_1 = getInt64(&s.FetchRowCount_0_1, reset)
  429. ssv.fetchRowCount_1_10 = getInt64(&s.FetchRowCount_1_10, reset)
  430. ssv.fetchRowCount_10_100 = getInt64(&s.FetchRowCount_10_100, reset)
  431. ssv.fetchRowCount_100_1000 = getInt64(&s.FetchRowCount_100_1000, reset)
  432. ssv.fetchRowCount_1000_10000 = getInt64(&s.FetchRowCount_1000_10000, reset)
  433. ssv.fetchRowCount_10000_more = getInt64(&s.FetchRowCount_10000_more, reset)
  434. ssv.updateCount_0_1 = getInt64(&s.UpdateCount_0_1, reset)
  435. ssv.updateCount_1_10 = getInt64(&s.UpdateCount_1_10, reset)
  436. ssv.updateCount_10_100 = getInt64(&s.UpdateCount_10_100, reset)
  437. ssv.updateCount_100_1000 = getInt64(&s.UpdateCount_100_1000, reset)
  438. ssv.updateCount_1000_10000 = getInt64(&s.UpdateCount_1000_10000, reset)
  439. ssv.updateCount_10000_more = getInt64(&s.UpdateCount_10000_more, reset)
  440. ssv.executeAndResultHoldTime_0_1 = getInt64(&s.ExecuteAndResultHoldTime_0_1, reset)
  441. ssv.executeAndResultHoldTime_1_10 = getInt64(&s.ExecuteAndResultHoldTime_1_10, reset)
  442. ssv.executeAndResultHoldTime_10_100 = getInt64(&s.ExecuteAndResultHoldTime_10_100, reset)
  443. ssv.executeAndResultHoldTime_100_1000 = getInt64(&s.ExecuteAndResultHoldTime_100_1000, reset)
  444. ssv.executeAndResultHoldTime_1000_10000 = getInt64(&s.ExecuteAndResultHoldTime_1000_10000, reset)
  445. ssv.executeAndResultHoldTime_10000_100000 = getInt64(&s.ExecuteAndResultHoldTime_10000_100000, reset)
  446. ssv.executeAndResultHoldTime_100000_1000000 = getInt64(&s.ExecuteAndResultHoldTime_100000_1000000, reset)
  447. ssv.executeAndResultHoldTime_1000000_more = getInt64(&s.ExecuteAndResultHoldTime_1000000_more, reset)
  448. ssv.blobOpenCount = getInt64(&s.BlobOpenCount, reset)
  449. ssv.clobOpenCount = getInt64(&s.ClobOpenCount, reset)
  450. ssv.readStringLength = getInt64(&s.ReadStringLength, reset)
  451. ssv.readBytesLength = getInt64(&s.ReadBytesLength, reset)
  452. ssv.inputStreamOpenCount = getInt64(&s.InputStreamOpenCount, reset)
  453. ssv.readerOpenCount = getInt64(&s.ReaderOpenCount, reset)
  454. return ssv
  455. }
  456. func (s *sqlStat) addUpdateCount(delta int64) {
  457. if delta > 0 {
  458. atomic.AddInt64(&s.UpdateCount, delta)
  459. }
  460. for {
  461. max := atomic.LoadInt64(&s.UpdateCountMax)
  462. if delta <= max {
  463. break
  464. }
  465. if atomic.CompareAndSwapInt64(&s.UpdateCountMax, max, delta) {
  466. break
  467. }
  468. }
  469. if delta < 1 {
  470. atomic.AddInt64(&s.UpdateCount_0_1, 1)
  471. } else if delta < 10 {
  472. atomic.AddInt64(&s.UpdateCount_1_10, 1)
  473. } else if delta < 100 {
  474. atomic.AddInt64(&s.UpdateCount_10_100, 1)
  475. } else if delta < 1000 {
  476. atomic.AddInt64(&s.UpdateCount_100_1000, 1)
  477. } else if delta < 10000 {
  478. atomic.AddInt64(&s.UpdateCount_1000_10000, 1)
  479. } else {
  480. atomic.AddInt64(&s.UpdateCount_10000_more, 1)
  481. }
  482. }
// incrementClobOpenCount atomically adds 1 to ClobOpenCount.
func (s *sqlStat) incrementClobOpenCount() {
	atomic.AddInt64(&s.ClobOpenCount, 1)
}
// incrementBlobOpenCount atomically adds 1 to BlobOpenCount.
func (s *sqlStat) incrementBlobOpenCount() {
	atomic.AddInt64(&s.BlobOpenCount, 1)
}
// addStringReadLength atomically adds length to ReadStringLength.
func (s *sqlStat) addStringReadLength(length int64) {
	atomic.AddInt64(&s.ReadStringLength, length)
}
// addReadBytesLength atomically adds length to ReadBytesLength.
func (s *sqlStat) addReadBytesLength(length int64) {
	atomic.AddInt64(&s.ReadBytesLength, length)
}
// addReaderOpenCount atomically adds count to ReaderOpenCount.
func (s *sqlStat) addReaderOpenCount(count int64) {
	atomic.AddInt64(&s.ReaderOpenCount, count)
}
// addInputStreamOpenCount atomically adds count to InputStreamOpenCount.
func (s *sqlStat) addInputStreamOpenCount(count int64) {
	atomic.AddInt64(&s.InputStreamOpenCount, count)
}
  501. func (s *sqlStat) addFetchRowCount(delta int64) {
  502. atomic.AddInt64(&s.FetchRowCount, delta)
  503. for {
  504. max := atomic.LoadInt64(&s.FetchRowCountMax)
  505. if delta <= max {
  506. break
  507. }
  508. if atomic.CompareAndSwapInt64(&s.FetchRowCountMax, max, delta) {
  509. break
  510. }
  511. }
  512. if delta < 1 {
  513. atomic.AddInt64(&s.FetchRowCount_0_1, 1)
  514. } else if delta < 10 {
  515. atomic.AddInt64(&s.FetchRowCount_1_10, 1)
  516. } else if delta < 100 {
  517. atomic.AddInt64(&s.FetchRowCount_10_100, 1)
  518. } else if delta < 1000 {
  519. atomic.AddInt64(&s.FetchRowCount_100_1000, 1)
  520. } else if delta < 10000 {
  521. atomic.AddInt64(&s.FetchRowCount_1000_10000, 1)
  522. } else {
  523. atomic.AddInt64(&s.FetchRowCount_10000_more, 1)
  524. }
  525. }
  526. func (s *sqlStat) addExecuteBatchCount(batchSize int64) {
  527. atomic.AddInt64(&s.ExecuteBatchSizeTotal, batchSize)
  528. for {
  529. current := atomic.LoadInt64(&s.ExecuteBatchSizeMax)
  530. if current < batchSize {
  531. if atomic.CompareAndSwapInt64(&s.ExecuteBatchSizeMax, current, batchSize) {
  532. break
  533. } else {
  534. continue
  535. }
  536. } else {
  537. break
  538. }
  539. }
  540. }
// incrementExecuteSuccessCount atomically adds 1 to ExecuteSuccessCount.
func (s *sqlStat) incrementExecuteSuccessCount() {
	atomic.AddInt64(&s.ExecuteSuccessCount, 1)
}
  544. func (s *sqlStat) incrementRunningCount() {
  545. val := atomic.AddInt64(&s.RunningCount, 1)
  546. for {
  547. max := atomic.LoadInt64(&s.ConcurrentMax)
  548. if val > max {
  549. if atomic.CompareAndSwapInt64(&s.ConcurrentMax, max, val) {
  550. break
  551. } else {
  552. continue
  553. }
  554. } else {
  555. break
  556. }
  557. }
  558. }
// decrementRunningCount atomically subtracts 1 from RunningCount.
func (s *sqlStat) decrementRunningCount() {
	atomic.AddInt64(&s.RunningCount, -1)
}
  562. func (s *sqlStat) addExecuteTimeAndResultHoldTimeHistogramRecord(executeType ExecuteTypeEnum, firstResultSet bool, nanoSpan int64, parameters string) {
  563. s.addExecuteTime(nanoSpan, parameters)
  564. if ExecuteQuery != executeType && !firstResultSet {
  565. s.executeAndResultHoldTimeHistogramRecord(nanoSpan)
  566. }
  567. }
  568. func (s *sqlStat) executeAndResultHoldTimeHistogramRecord(nanoSpan int64) {
  569. millis := nanoSpan / 1000 / 1000
  570. if millis < 1 {
  571. atomic.AddInt64(&s.ExecuteAndResultHoldTime_0_1, 1)
  572. } else if millis < 10 {
  573. atomic.AddInt64(&s.ExecuteAndResultHoldTime_1_10, 1)
  574. } else if millis < 100 {
  575. atomic.AddInt64(&s.ExecuteAndResultHoldTime_10_100, 1)
  576. } else if millis < 1000 {
  577. atomic.AddInt64(&s.ExecuteAndResultHoldTime_100_1000, 1)
  578. } else if millis < 10000 {
  579. atomic.AddInt64(&s.ExecuteAndResultHoldTime_1000_10000, 1)
  580. } else if millis < 100000 {
  581. atomic.AddInt64(&s.ExecuteAndResultHoldTime_10000_100000, 1)
  582. } else if millis < 1000000 {
  583. atomic.AddInt64(&s.ExecuteAndResultHoldTime_100000_1000000, 1)
  584. } else {
  585. atomic.AddInt64(&s.ExecuteAndResultHoldTime_1000000_more, 1)
  586. }
  587. }
  588. func (s *sqlStat) histogramRecord(nanoSpan int64) {
  589. millis := nanoSpan / 1000 / 1000
  590. if millis < 1 {
  591. atomic.AddInt64(&s.Histogram_0_1, 1)
  592. } else if millis < 10 {
  593. atomic.AddInt64(&s.Histogram_1_10, 1)
  594. } else if millis < 100 {
  595. atomic.AddInt64(&s.Histogram_10_100, 1)
  596. } else if millis < 1000 {
  597. atomic.AddInt64(&s.Histogram_100_1000, 1)
  598. } else if millis < 10000 {
  599. atomic.AddInt64(&s.Histogram_1000_10000, 1)
  600. } else if millis < 100000 {
  601. atomic.AddInt64(&s.Histogram_10000_100000, 1)
  602. } else if millis < 1000000 {
  603. atomic.AddInt64(&s.Histogram_100000_1000000, 1)
  604. } else {
  605. atomic.AddInt64(&s.Histogram_1000000_more, 1)
  606. }
  607. }
  608. func (s *sqlStat) addExecuteTime(nanoSpan int64, parameters string) {
  609. atomic.AddInt64(&s.ExecuteSpanNanoTotal, nanoSpan)
  610. for {
  611. current := atomic.LoadInt64(&s.ExecuteSpanNanoMax)
  612. if current < nanoSpan {
  613. if atomic.CompareAndSwapInt64(&s.ExecuteSpanNanoMax, current, nanoSpan) {
  614. s.ExecuteNanoSpanMaxOccurTime = time.Now().UnixNano()
  615. s.LastSlowParameters = parameters
  616. break
  617. } else {
  618. continue
  619. }
  620. } else {
  621. break
  622. }
  623. }
  624. s.histogramRecord(nanoSpan)
  625. }
  626. func (s *sqlStat) getExecuteMillisTotal() int64 {
  627. return s.ExecuteSpanNanoTotal / (1000 * 1000)
  628. }
  629. func (s *sqlStat) getExecuteMillisMax() int64 {
  630. return s.ExecuteSpanNanoMax / (1000 * 1000)
  631. }
// incrementInTransactionCount atomically adds 1 to InTransactionCount.
func (s *sqlStat) incrementInTransactionCount() {
	atomic.AddInt64(&s.InTransactionCount, 1)
}
  635. func (s *sqlStat) getExecuteCount() int64 {
  636. return s.ExecuteErrorCount + s.ExecuteSuccessCount
  637. }
  638. func (s *sqlStat) getData() map[string]interface{} {
  639. return s.getValue(false).getData()
  640. }
  641. func (s *sqlStat) getHistogramValues() []int64 {
  642. return []int64{
  643. s.Histogram_0_1,
  644. s.Histogram_1_10,
  645. s.Histogram_10_100,
  646. s.Histogram_100_1000,
  647. s.Histogram_1000_10000,
  648. s.Histogram_10000_100000,
  649. s.Histogram_100000_1000000,
  650. s.Histogram_1000000_more,
  651. }
  652. }
  653. func (s *sqlStat) getHistogramSum() int64 {
  654. values := s.getHistogramValues()
  655. var sum int64 = 0
  656. for i := 0; i < len(values); i++ {
  657. sum += values[i]
  658. }
  659. return sum
  660. }
  661. func (s *sqlStat) error(err error) {
  662. atomic.AddInt64(&s.ExecuteErrorCount, 1)
  663. s.ExecuteErrorLastTime = time.Now().UnixNano()
  664. s.ExecuteErrorLast = err
  665. }
  666. func (s *sqlStat) getResultSetHoldTimeMilis() int64 {
  667. return s.ResultSetHoldTimeNano / (1000 * 1000)
  668. }
  669. func (s *sqlStat) getExecuteAndResultSetHoldTimeMilis() int64 {
  670. return s.ExecuteAndResultSetHoldTime / (1000 * 1000)
  671. }
  672. func (s *sqlStat) getFetchRowCountHistogramValues() []int64 {
  673. return []int64{
  674. s.FetchRowCount_0_1,
  675. s.FetchRowCount_1_10,
  676. s.FetchRowCount_10_100,
  677. s.FetchRowCount_100_1000,
  678. s.FetchRowCount_1000_10000,
  679. s.FetchRowCount_10000_more,
  680. }
  681. }
  682. func (s *sqlStat) getUpdateCountHistogramValues() []int64 {
  683. return []int64{
  684. s.UpdateCount_0_1,
  685. s.UpdateCount_1_10,
  686. s.UpdateCount_10_100,
  687. s.UpdateCount_100_1000,
  688. s.UpdateCount_1000_10000,
  689. s.UpdateCount_10000_more,
  690. }
  691. }
  692. func (s *sqlStat) getExecuteAndResultHoldTimeHistogramValues() []int64 {
  693. return []int64{
  694. s.ExecuteAndResultHoldTime_0_1,
  695. s.ExecuteAndResultHoldTime_1_10,
  696. s.ExecuteAndResultHoldTime_10_100,
  697. s.ExecuteAndResultHoldTime_100_1000,
  698. s.ExecuteAndResultHoldTime_1000_10000,
  699. s.ExecuteAndResultHoldTime_10000_100000,
  700. s.ExecuteAndResultHoldTime_100000_1000000,
  701. s.ExecuteAndResultHoldTime_1000000_more,
  702. }
  703. }
  704. func (s *sqlStat) getExecuteAndResultHoldTimeHistogramSum() int64 {
  705. values := s.getExecuteAndResultHoldTimeHistogramValues()
  706. var sum int64 = 0
  707. for i := 0; i < len(values); i++ {
  708. sum += values[i]
  709. }
  710. return sum
  711. }
// addResultSetHoldTimeNano atomically adds nano to ResultSetHoldTimeNano.
func (s *sqlStat) addResultSetHoldTimeNano(nano int64) {
	atomic.AddInt64(&s.ResultSetHoldTimeNano, nano)
}
  715. func (s *sqlStat) addResultSetHoldTimeNano2(statementExecuteNano int64, resultHoldTimeNano int64) {
  716. atomic.AddInt64(&s.ResultSetHoldTimeNano, resultHoldTimeNano)
  717. atomic.AddInt64(&s.ExecuteAndResultSetHoldTime, statementExecuteNano+resultHoldTimeNano)
  718. s.executeAndResultHoldTimeHistogramRecord((statementExecuteNano + resultHoldTimeNano) / 1000 / 1000)
  719. atomic.AddInt64(&s.UpdateCount_0_1, 1)
  720. }
// connectionStatValue is a plain copy of a connectionStat's counters; getData
// turns it into a map.
type connectionStatValue struct {
	id string
	url string
	// Connection counters.
	connCount int64
	activeConnCount int64
	maxActiveConnCount int64
	// Execution counters.
	executeCount int64
	errorCount int64
	// Statement counters.
	stmtCount int64
	activeStmtCount int64
	maxActiveStmtCount int64
	// Transaction counters.
	commitCount int64
	rollbackCount int64
	// LOB counters.
	clobOpenCount int64
	blobOpenCount int64
	properties string
}
  738. func newConnectionStatValue() *connectionStatValue {
  739. csv := new(connectionStatValue)
  740. return csv
  741. }
  742. func (csv *connectionStatValue) getData() map[string]interface{} {
  743. m := make(map[string]interface{})
  744. m[idConstStr] = csv.id
  745. m[urlConstStr] = csv.url
  746. m[connCountConstStr] = csv.connCount
  747. m[activeConnCountConstStr] = csv.activeConnCount
  748. m[maxActiveConnCountConstStr] = csv.maxActiveConnCount
  749. m[stmtCountConstStr] = csv.stmtCount
  750. m[activeStmtCountConstStr] = csv.activeStmtCount
  751. m[maxActiveStmtCountConstStr] = csv.maxActiveStmtCount
  752. m[executeCountConstStr] = csv.executeCount
  753. m[errorCountConstStr] = csv.errorCount
  754. m[commitCountConstStr] = csv.commitCount
  755. m[rollbackCountConstStr] = csv.rollbackCount
  756. m[clobOpenCountConstStr] = csv.clobOpenCount
  757. m[blobOpenCountConstStr] = csv.blobOpenCount
  758. m[propertiesConstStr] = csv.properties
  759. return m
  760. }
// connectionStat accumulates live statistics for one connection URL together
// with the per-SQL statistics recorded against it. The int64 counters are
// updated through sync/atomic.
//
// NOTE(review): sync/atomic requires 64-bit alignment of atomically accessed
// int64 fields on 32-bit platforms — verify the layout before reordering or
// inserting fields.
type connectionStat struct {
	id string
	url string
	connCount int64
	activeConnCount int64
	maxActiveConnCount int64
	executeCount int64
	errorCount int64
	stmtCount int64
	activeStmtCount int64
	maxActiveStmtCount int64
	commitCount int64
	rollbackCount int64
	clobOpenCount int64
	blobOpenCount int64
	// sqlStatMap maps SQL text to its statistics; createSqlStat and the
	// getSqlStatMap* methods access it while holding lock.
	sqlStatMap map[string]*sqlStat
	// maxSqlSize caps len(sqlStatMap); putSqlStat only enforces it when > 0.
	maxSqlSize int
	// skipSqlCount is incremented by putSqlStat when a statement with recorded
	// activity is evicted or rejected.
	skipSqlCount int64
	lock sync.RWMutex
	properties string
}
  782. func newConnectionStat(url string) *connectionStat {
  783. cs := new(connectionStat)
  784. cs.maxSqlSize = StatSqlMaxCount
  785. cs.id = "DS" + generateId()
  786. cs.url = url
  787. cs.sqlStatMap = make(map[string]*sqlStat, 200)
  788. return cs
  789. }
  790. func (cs *connectionStat) createSqlStat(sql string) *sqlStat {
  791. cs.lock.Lock()
  792. defer cs.lock.Unlock()
  793. sqlStat, ok := cs.sqlStatMap[sql]
  794. if !ok {
  795. sqlStat := NewSqlStat(sql)
  796. sqlStat.DataSource = cs.url
  797. sqlStat.DataSourceId = cs.id
  798. if cs.putSqlStat(sqlStat) {
  799. return sqlStat
  800. } else {
  801. return nil
  802. }
  803. }
  804. return sqlStat
  805. }
  806. func (cs *connectionStat) putSqlStat(sqlStat *sqlStat) bool {
  807. if cs.maxSqlSize > 0 && len(cs.sqlStatMap) == cs.maxSqlSize {
  808. if StatSqlRemoveMode == STAT_SQL_REMOVE_OLDEST {
  809. removeSqlStat := cs.eliminateSqlStat()
  810. if removeSqlStat.RunningCount > 0 || removeSqlStat.getExecuteCount() > 0 {
  811. atomic.AddInt64(&cs.skipSqlCount, 1)
  812. }
  813. cs.sqlStatMap[sqlStat.Sql] = sqlStat
  814. return true
  815. } else {
  816. if sqlStat.RunningCount > 0 || sqlStat.getExecuteCount() > 0 {
  817. atomic.AddInt64(&cs.skipSqlCount, 1)
  818. }
  819. return false
  820. }
  821. } else {
  822. cs.sqlStatMap[sqlStat.Sql] = sqlStat
  823. return true
  824. }
  825. }
  826. func (cs *connectionStat) eliminateSqlStat() *sqlStat {
  827. if cs.maxSqlSize > 0 && len(cs.sqlStatMap) == cs.maxSqlSize {
  828. if StatSqlRemoveMode == STAT_SQL_REMOVE_OLDEST {
  829. for s, item := range cs.sqlStatMap {
  830. if item != nil {
  831. delete(cs.sqlStatMap, s)
  832. return item
  833. }
  834. }
  835. }
  836. }
  837. return nil
  838. }
  839. func (cs *connectionStat) getSqlStatMap() map[string]*sqlStat {
  840. m := make(map[string]*sqlStat, len(cs.sqlStatMap))
  841. cs.lock.Lock()
  842. defer cs.lock.Unlock()
  843. for s, item := range cs.sqlStatMap {
  844. m[s] = item
  845. }
  846. return m
  847. }
  848. func (cs *connectionStat) getSqlStatMapAndReset() []*SqlStatValue {
  849. stats := make([]*sqlStat, 0, len(cs.sqlStatMap))
  850. cs.lock.Lock()
  851. defer cs.lock.Unlock()
  852. for s, stat := range cs.sqlStatMap {
  853. if stat.getExecuteCount() == 0 && stat.RunningCount == 0 {
  854. stat.Removed = 1
  855. delete(cs.sqlStatMap, s)
  856. } else {
  857. stats = append(stats, stat)
  858. }
  859. }
  860. values := make([]*SqlStatValue, 0, len(stats))
  861. for _, stat := range stats {
  862. value := stat.getValueAndReset()
  863. if value.getExecuteCount() == 0 && value.runningCount == 0 {
  864. continue
  865. }
  866. values = append(values, value)
  867. }
  868. return values
  869. }
  870. func (cs *connectionStat) incrementConn() {
  871. atomic.AddInt64(&cs.connCount, 1)
  872. atomic.AddInt64(&cs.activeConnCount, 1)
  873. count := atomic.LoadInt64(&cs.activeConnCount)
  874. if count > atomic.LoadInt64(&cs.maxActiveConnCount) {
  875. atomic.StoreInt64(&cs.maxActiveConnCount, count)
  876. }
  877. }
// decrementConn atomically decreases the active connection counter by one.
func (cs *connectionStat) decrementConn() {
	atomic.AddInt64(&cs.activeConnCount, -1)
}
  881. func (cs *connectionStat) incrementStmt() {
  882. atomic.AddInt64(&cs.stmtCount, 1)
  883. atomic.AddInt64(&cs.activeStmtCount, 1)
  884. count := atomic.LoadInt64(&cs.activeStmtCount)
  885. if count > atomic.LoadInt64(&cs.maxActiveStmtCount) {
  886. atomic.StoreInt64(&cs.maxActiveStmtCount, count)
  887. }
  888. }
// decrementStmt atomically decreases the active statement counter by one.
func (cs *connectionStat) decrementStmt() {
	atomic.AddInt64(&cs.activeStmtCount, -1)
}
// decrementStmtByActiveStmtCount atomically subtracts activeStmtCount from the
// active statement counter in a single step.
func (cs *connectionStat) decrementStmtByActiveStmtCount(activeStmtCount int64) {
	atomic.AddInt64(&cs.activeStmtCount, -activeStmtCount)
}
// incrementExecuteCount atomically adds one to the execute counter.
func (cs *connectionStat) incrementExecuteCount() {
	atomic.AddInt64(&cs.executeCount, 1)
}
// incrementErrorCount atomically adds one to the error counter.
func (cs *connectionStat) incrementErrorCount() {
	atomic.AddInt64(&cs.errorCount, 1)
}
// incrementCommitCount atomically adds one to the commit counter.
func (cs *connectionStat) incrementCommitCount() {
	atomic.AddInt64(&cs.commitCount, 1)
}
// incrementRollbackCount atomically adds one to the rollback counter.
func (cs *connectionStat) incrementRollbackCount() {
	atomic.AddInt64(&cs.rollbackCount, 1)
}
  907. func (cs *connectionStat) getValue(reset bool) *connectionStatValue {
  908. val := newConnectionStatValue()
  909. val.id = cs.id
  910. val.url = cs.url
  911. val.connCount = getInt64(&cs.connCount, reset)
  912. val.activeConnCount = getInt64(&cs.activeConnCount, false)
  913. val.maxActiveConnCount = getInt64(&cs.maxActiveConnCount, false)
  914. val.stmtCount = getInt64(&cs.stmtCount, reset)
  915. val.activeStmtCount = getInt64(&cs.activeStmtCount, false)
  916. val.maxActiveStmtCount = getInt64(&cs.maxActiveStmtCount, false)
  917. val.commitCount = getInt64(&cs.commitCount, reset)
  918. val.rollbackCount = getInt64(&cs.rollbackCount, reset)
  919. val.executeCount = getInt64(&cs.executeCount, reset)
  920. val.errorCount = getInt64(&cs.errorCount, reset)
  921. val.blobOpenCount = getInt64(&cs.blobOpenCount, reset)
  922. val.clobOpenCount = getInt64(&cs.clobOpenCount, reset)
  923. val.properties = cs.properties
  924. return val
  925. }
// getData returns the map form of a non-resetting snapshot, i.e. the result of
// getValue(false).getData().
func (cs *connectionStat) getData() map[string]interface{} {
	return cs.getValue(false).getData()
}
// getValueAndReset returns a snapshot taken with getValue(true), which resets
// the cumulative counters as they are read.
func (cs *connectionStat) getValueAndReset() *connectionStatValue {
	return cs.getValue(true)
}
// GoStat is the registry of connection statistics, one connectionStat per
// data source.
type GoStat struct {
	// connStatMap holds the registered statistics; createConnStat keys it by "host:port".
	connStatMap map[string]*connectionStat
	// lock is held by createConnStat and getConnStatMap while they touch connStatMap.
	lock sync.RWMutex
	// maxConnSize is the configured size limit; newGoStat defaults it to 1000.
	maxConnSize int
	// skipConnCount is incremented atomically by createConnStat.
	skipConnCount int64
}
  938. func newGoStat(maxConnSize int) *GoStat {
  939. gs := new(GoStat)
  940. if maxConnSize > 0 {
  941. gs.maxConnSize = maxConnSize
  942. } else {
  943. gs.maxConnSize = 1000
  944. }
  945. gs.connStatMap = make(map[string]*connectionStat, 16)
  946. return gs
  947. }
  948. func (gs *GoStat) createConnStat(conn *DmConnection) *connectionStat {
  949. url := conn.dmConnector.host + ":" + strconv.Itoa(int(conn.dmConnector.port))
  950. gs.lock.Lock()
  951. defer gs.lock.Unlock()
  952. connstat, ok := gs.connStatMap[url]
  953. if !ok {
  954. connstat = newConnectionStat(url)
  955. remove := len(gs.connStatMap) > gs.maxConnSize
  956. if remove && connstat.activeConnCount > 0 {
  957. atomic.AddInt64(&gs.skipConnCount, 1)
  958. }
  959. gs.connStatMap[url] = connstat
  960. }
  961. return connstat
  962. }
  963. func (gs *GoStat) getConnStatMap() map[string]*connectionStat {
  964. m := make(map[string]*connectionStat, len(gs.connStatMap))
  965. gs.lock.Lock()
  966. defer gs.lock.Unlock()
  967. for s, stat := range gs.connStatMap {
  968. m[s] = stat
  969. }
  970. return m
  971. }
// sqlRowField lists, in display order, the columns rendered for each row of the
// SQL statistics tables (high-frequency and slow SQL).
var sqlRowField = []string{rowNumConstStr, dataSourceConstStr, sqlConstStr, executeCountConstStr,
	totalTimeConstStr, maxTimespanConstStr, inTransactionCountConstStr, errorCountConstStr, effectedRowCountConstStr,
	fetchRowCountConstStr, runningCountConstStr, concurrentMaxConstStr, executeHoldTimeHistogramConstStr,
	executeAndResultHoldTimeHistogramConstStr, fetchRowCountHistogramConstStr, effectedRowCountHistogramConstStr}
// sqlColField lists SQL statistic column names. It is not referenced by the
// reader code in this part of the file.
var sqlColField = []string{"ID", "DataSource", "SQL", "ExecuteCount",
	"ErrorCount", "TotalTime", "LastTime", "MaxTimespan", "LastError", "EffectedRowCount",
	"FetchRowCount", "MaxTimespanOccurTime", "BatchSizeMax", "BatchSizeTotal", "ConcurrentMax",
	"RunningCount", "Name", "File", "LastErrorMessage", "LastErrorClass", "LastErrorStackTrace",
	"LastErrorTime", "DbType", "URL", "InTransactionCount", "Histogram", "LastSlowParameters",
	"ResultSetHoldTime", "ExecuteAndResultSetHoldTime", "FetchRowCountHistogram",
	"EffectedRowCountHistogram", "ExecuteAndResultHoldTimeHistogram", "EffectedRowCountMax",
	"FetchRowCountMax", "ClobOpenCount"}
// Keys used in the statistic data maps and as column titles of the rendered
// tables (see sqlRowField and dsRowField).
const (
	rowNumConstStr = "rowNum"
	idConstStr = "ID"
	urlConstStr = "Url"
	connCountConstStr = "ConnCount"
	activeConnCountConstStr = "ActiveConnCount"
	maxActiveConnCountConstStr = "MaxActiveConnCount"
	stmtCountConstStr = "StmtCount"
	activeStmtCountConstStr = "ActiveStmtCount"
	maxActiveStmtCountConstStr = "MaxActiveStmtCount"
	executeCountConstStr = "ExecuteCount"
	errorCountConstStr = "ErrorCount"
	commitCountConstStr = "CommitCount"
	rollbackCountConstStr = "RollbackCount"
	clobOpenCountConstStr = "ClobOpenCount"
	blobOpenCountConstStr = "BlobOpenCount"
	propertiesConstStr = "Properties"
	dataSourceConstStr = "DataSource"
	sqlConstStr = "SQL"
	totalTimeConstStr = "TotalTime"
	maxTimespanConstStr = "MaxTimespan"
	inTransactionCountConstStr = "InTransactionCount"
	effectedRowCountConstStr = "EffectedRowCount"
	fetchRowCountConstStr = "FetchRowCount"
	runningCountConstStr = "RunningCount"
	concurrentMaxConstStr = "ConcurrentMax"
	executeHoldTimeHistogramConstStr = "ExecuteHoldTimeHistogram"
	executeAndResultHoldTimeHistogramConstStr = "ExecuteAndResultHoldTimeHistogram"
	fetchRowCountHistogramConstStr = "FetchRowCountHistogram"
	effectedRowCountHistogramConstStr = "EffectedRowCountHistogram"
)
// dsRowField lists, in display order, the columns rendered for each row of the
// connection (data source) statistics table.
var dsRowField = []string{rowNumConstStr, urlConstStr, activeConnCountConstStr,
	maxActiveConnCountConstStr, activeStmtCountConstStr, maxActiveStmtCountConstStr, executeCountConstStr, errorCountConstStr,
	commitCountConstStr, rollbackCountConstStr}
// dsColField lists data source statistic column names. It is not referenced by
// the reader code in this part of the file.
var dsColField = []string{"ID", "ConnCount", "ActiveConnCount",
	"MaxActiveConnCount", "StmtCount", "ActiveStmtCount", "MaxActiveStmtCount", "ExecuteCount",
	"ErrorCount", "CommitCount", "RollbackCount", "ClobOpenCount", "BlobOpenCount"}
// Request parameter names, service URL prefixes, result codes and paging /
// ordering defaults used by StatReader.
const (
	// PROP_NAME_* are the parameter keys read from and written to *Properties.
	PROP_NAME_SORT = "sort"
	PROP_NAME_SORT_FIELD = "field"
	PROP_NAME_SORT_TYPE = "direction"
	PROP_NAME_SEARCH = "search"
	PROP_NAME_PAGE_NUM = "pageNum"
	PROP_NAME_PAGE_SIZE = "pageSize"
	PROP_NAME_PAGE_COUNT = "pageCount"
	PROP_NAME_TOTAL_ROW_COUNT = "totalRowCount"
	PROP_NAME_FLUSH_FREQ = "flushFreq"
	PROP_NAME_DATASOURCE_ID = "dataSourceId"
	PROP_NAME_SQL_ID = "sqlId"
	// URL_* are the prefixes StatReader.service dispatches on.
	URL_SQL = "sql"
	URL_SQL_DETAIL = "sqlDetail"
	URL_DATASOURCE = "dataSource"
	URL_DATASOURCE_DETAIL = "dataSourceDetail"
	RESULT_CODE_SUCCESS = 1
	RESULT_CODE_ERROR = -1
	// DEFAULT_* are used by comparatorOrderBy when a parameter is absent or unparsable.
	DEFAULT_PAGE_NUM = 1
	DEFAULT_PAGE_SIZE = int(INT32_MAX)
	DEFAULT_ORDER_TYPE = "asc"
	DEFAULT_ORDERBY = "DataSourceId"
)
// StatReader collects statistics and renders them as text tables. For each of
// the three reports it caches the rows not yet returned together with the
// column widths computed for the whole report, so that a report can be read in
// chunks by repeated read* calls.
type StatReader struct {
	// connStat holds the connection report rows still to be returned; nil means no read is in progress.
	connStat []map[string]interface{}
	// connStatColLens holds the column widths of the connection report.
	connStatColLens []int
	// highFreqSqlStat holds the pending rows of the high-frequency SQL report.
	highFreqSqlStat []map[string]interface{}
	// highFreqSqlStatColLens holds the column widths of the high-frequency SQL report.
	highFreqSqlStatColLens []int
	// slowSqlStat holds the pending rows of the slow SQL report.
	slowSqlStat []map[string]interface{}
	// slowSqlStatColLens holds the column widths of the slow SQL report.
	slowSqlStatColLens []int
}
  1052. func newStatReader() *StatReader {
  1053. sr := new(StatReader)
  1054. return sr
  1055. }
  1056. func (sr *StatReader) readConnStat(retList []string, maxCount int) (bool, []string) {
  1057. fields := dsRowField
  1058. isAppend := false
  1059. if sr.connStat == nil {
  1060. sr.connStat = sr.getConnStat("", fields)
  1061. sr.connStatColLens = calcColLens(sr.connStat, fields, COL_MAX_LEN)
  1062. isAppend = false
  1063. } else {
  1064. isAppend = true
  1065. }
  1066. var retContent []map[string]interface{}
  1067. if maxCount > 0 && len(sr.connStat) > maxCount {
  1068. retContent = sr.connStat[0:maxCount]
  1069. sr.connStat = sr.connStat[maxCount:len(sr.connStat)]
  1070. } else {
  1071. retContent = sr.connStat
  1072. sr.connStat = nil
  1073. }
  1074. retList = append(retList, sr.getFormattedOutput(retContent, fields, sr.connStatColLens, isAppend))
  1075. return sr.connStat != nil, retList
  1076. }
  1077. func (sr *StatReader) readHighFreqSqlStat(retList []string, maxCount int) (bool, []string) {
  1078. isAppend := false
  1079. if sr.highFreqSqlStat == nil {
  1080. sr.highFreqSqlStat = sr.getHighFreqSqlStat(StatHighFreqSqlCount, -1, sqlRowField)
  1081. sr.highFreqSqlStatColLens = calcColLens(sr.highFreqSqlStat, sqlRowField, COL_MAX_LEN)
  1082. isAppend = false
  1083. } else {
  1084. isAppend = true
  1085. }
  1086. var retContent []map[string]interface{}
  1087. if maxCount > 0 && len(sr.highFreqSqlStat) > maxCount {
  1088. retContent = sr.highFreqSqlStat[0:maxCount]
  1089. sr.highFreqSqlStat = sr.highFreqSqlStat[maxCount:len(sr.highFreqSqlStat)]
  1090. } else {
  1091. retContent = sr.highFreqSqlStat
  1092. sr.highFreqSqlStat = nil
  1093. }
  1094. retList = append(retList, sr.getFormattedOutput(retContent, sqlRowField, sr.highFreqSqlStatColLens, isAppend))
  1095. return sr.highFreqSqlStat != nil, retList
  1096. }
  1097. func (sr *StatReader) getHighFreqSqlStat(topCount int, sqlId int,
  1098. fields []string) []map[string]interface{} {
  1099. var content []map[string]interface{}
  1100. if topCount != 0 {
  1101. parameters := NewProperties()
  1102. parameters.Set(PROP_NAME_SORT_FIELD, "ExecuteCount")
  1103. parameters.Set(PROP_NAME_SORT_TYPE, "desc")
  1104. parameters.Set(PROP_NAME_PAGE_NUM, "1")
  1105. parameters.Set(PROP_NAME_PAGE_SIZE, strconv.Itoa(topCount))
  1106. content = sr.service(URL_SQL, parameters)
  1107. if sqlId != -1 {
  1108. matchedContent := make([]map[string]interface{}, 0)
  1109. for _, sqlStat := range content {
  1110. idStr := sqlStat["ID"]
  1111. if idStr == sqlId {
  1112. matchedContent = append(matchedContent, sqlStat)
  1113. break
  1114. }
  1115. }
  1116. content = matchedContent
  1117. }
  1118. }
  1119. if content == nil {
  1120. content = make([]map[string]interface{}, 0)
  1121. } else {
  1122. i := 1
  1123. for _, m := range content {
  1124. m[rowNumConstStr] = i
  1125. i++
  1126. }
  1127. }
  1128. content = addTitles(content, fields)
  1129. return content
  1130. }
  1131. func (sr *StatReader) readSlowSqlStat(retList []string, maxCount int) (bool, []string) {
  1132. isAppend := false
  1133. if sr.slowSqlStat == nil {
  1134. sr.slowSqlStat = sr.getSlowSqlStat(StatSlowSqlCount, -1, sqlRowField)
  1135. sr.slowSqlStatColLens = calcColLens(sr.slowSqlStat, sqlRowField,
  1136. COL_MAX_LEN)
  1137. isAppend = false
  1138. } else {
  1139. isAppend = true
  1140. }
  1141. var retContent []map[string]interface{}
  1142. if maxCount > 0 && len(sr.slowSqlStat) > maxCount {
  1143. retContent = sr.slowSqlStat[0:maxCount]
  1144. sr.slowSqlStat = sr.slowSqlStat[maxCount:len(sr.slowSqlStat)]
  1145. } else {
  1146. retContent = sr.slowSqlStat
  1147. sr.slowSqlStat = nil
  1148. }
  1149. retList = append(retList, sr.getFormattedOutput(retContent, sqlRowField, sr.slowSqlStatColLens, isAppend))
  1150. return sr.slowSqlStat != nil, retList
  1151. }
  1152. func (sr *StatReader) getSlowSqlStat(topCount int, sqlId int, fields []string) []map[string]interface{} {
  1153. var content []map[string]interface{}
  1154. if topCount != 0 {
  1155. parameters := NewProperties()
  1156. parameters.Set(PROP_NAME_SORT_FIELD, "MaxTimespan")
  1157. parameters.Set(PROP_NAME_SORT_TYPE, "desc")
  1158. parameters.Set(PROP_NAME_PAGE_NUM, "1")
  1159. parameters.Set(PROP_NAME_PAGE_SIZE, strconv.Itoa(topCount))
  1160. content = sr.service(URL_SQL, parameters)
  1161. if sqlId != -1 {
  1162. matchedContent := make([]map[string]interface{}, 0)
  1163. for _, sqlStat := range content {
  1164. idStr := sqlStat["ID"]
  1165. if idStr == sqlId {
  1166. matchedContent = append(matchedContent, sqlStat)
  1167. break
  1168. }
  1169. }
  1170. content = matchedContent
  1171. }
  1172. }
  1173. if content == nil {
  1174. content = make([]map[string]interface{}, 0)
  1175. } else {
  1176. i := 1
  1177. for _, m := range content {
  1178. m["rowNum"] = i
  1179. i++
  1180. }
  1181. }
  1182. content = addTitles(content, fields)
  1183. return content
  1184. }
  1185. func (sr *StatReader) getConnStat(connId string, fields []string) []map[string]interface{} {
  1186. content := sr.service(URL_DATASOURCE, nil)
  1187. if connId != "" {
  1188. matchedContent := make([]map[string]interface{}, 0)
  1189. for _, dsStat := range content {
  1190. idStr := dsStat["Identity"]
  1191. if connId == idStr {
  1192. matchedContent = append(matchedContent, dsStat)
  1193. break
  1194. }
  1195. }
  1196. content = matchedContent
  1197. }
  1198. if content == nil {
  1199. content = make([]map[string]interface{}, 0)
  1200. } else {
  1201. i := 1
  1202. for _, m := range content {
  1203. m["rowNum"] = i
  1204. i++
  1205. }
  1206. }
  1207. content = addTitles(content, fields)
  1208. return content
  1209. }
// getFormattedOutput renders content as a text table via toTable with showAll
// enabled. isAppend is forwarded to toTable, which then omits the leading
// separator line.
func (sr *StatReader) getFormattedOutput(content []map[string]interface{}, fields []string, colLens []int,
	isAppend bool) string {
	return toTable(content, fields, colLens, true, isAppend)
}
  1214. func (sr *StatReader) parseUrl(url string) *Properties {
  1215. parameters := NewProperties()
  1216. if url == "" || len(strings.TrimSpace(url)) == 0 {
  1217. return parameters
  1218. }
  1219. parametersStr := util.StringUtil.SubstringBetween(url, "?", "")
  1220. if parametersStr == "" || len(parametersStr) == 0 {
  1221. return parameters
  1222. }
  1223. parametersArray := strings.Split(parametersStr, "&")
  1224. for _, parameterStr := range parametersArray {
  1225. index := strings.Index(parametersStr, "=")
  1226. if index <= 0 {
  1227. continue
  1228. }
  1229. name := parameterStr[0:index]
  1230. value := parameterStr[index+1:]
  1231. parameters.Set(name, value)
  1232. }
  1233. return parameters
  1234. }
  1235. func (sr *StatReader) service(url string, params *Properties) []map[string]interface{} {
  1236. if params != nil {
  1237. params.SetProperties(sr.parseUrl(url))
  1238. } else {
  1239. params = sr.parseUrl(url)
  1240. }
  1241. if strings.Index(url, URL_SQL) == 0 {
  1242. array := sr.getSqlStatList(params)
  1243. array = sr.comparatorOrderBy(array, params)
  1244. params.Set(PROP_NAME_FLUSH_FREQ, strconv.Itoa(StatFlushFreq))
  1245. return array
  1246. } else if strings.Index(url, URL_SQL_DETAIL) == 0 {
  1247. array := sr.getSqlStatDetailList(params)
  1248. return array
  1249. } else if strings.Index(url, URL_DATASOURCE) == 0 {
  1250. array := sr.getConnStatList(params)
  1251. array = sr.comparatorOrderBy(array, params)
  1252. params.Set(PROP_NAME_FLUSH_FREQ, strconv.Itoa(StatFlushFreq))
  1253. return array
  1254. } else if strings.Index(url, URL_DATASOURCE_DETAIL) == 0 {
  1255. array := sr.getConnStatDetailList(params)
  1256. return array
  1257. } else {
  1258. return nil
  1259. }
  1260. }
  1261. func (sr *StatReader) getSqlStatList(params *Properties) []map[string]interface{} {
  1262. array := make([]map[string]interface{}, 0)
  1263. connStatMap := goStat.getConnStatMap()
  1264. var sqlStatMap map[string]*sqlStat
  1265. for _, connStat := range connStatMap {
  1266. sqlStatMap = connStat.getSqlStatMap()
  1267. for _, sqlStat := range sqlStatMap {
  1268. data := sqlStat.getData()
  1269. executeCount := data[executeCountConstStr]
  1270. runningCount := data[runningCountConstStr]
  1271. if executeCount == 0 && runningCount == 0 {
  1272. continue
  1273. }
  1274. array = append(array, data)
  1275. }
  1276. }
  1277. return array
  1278. }
  1279. func (sr *StatReader) getSqlStatDetailList(params *Properties) []map[string]interface{} {
  1280. array := make([]map[string]interface{}, 0)
  1281. connStatMap := goStat.getConnStatMap()
  1282. var data *sqlStat
  1283. sqlId := ""
  1284. dsId := ""
  1285. if v := params.GetString(PROP_NAME_SQL_ID, ""); v != "" {
  1286. sqlId = v
  1287. }
  1288. if v := params.GetString(PROP_NAME_DATASOURCE_ID, ""); v != "" {
  1289. dsId = v
  1290. }
  1291. if sqlId != "" && dsId != "" {
  1292. for _, connStat := range connStatMap {
  1293. if dsId != connStat.id {
  1294. continue
  1295. } else {
  1296. sqlStatMap := connStat.getSqlStatMap()
  1297. for _, sqlStat := range sqlStatMap {
  1298. if sqlId == sqlStat.Id {
  1299. data = sqlStat
  1300. break
  1301. }
  1302. }
  1303. }
  1304. break
  1305. }
  1306. }
  1307. if data != nil {
  1308. array = append(array, data.getData())
  1309. }
  1310. return array
  1311. }
  1312. func (sr *StatReader) getConnStatList(params *Properties) []map[string]interface{} {
  1313. array := make([]map[string]interface{}, 0)
  1314. connStatMap := goStat.getConnStatMap()
  1315. id := ""
  1316. if v := params.GetString(PROP_NAME_DATASOURCE_ID, ""); v != "" {
  1317. id = v
  1318. }
  1319. for _, connStat := range connStatMap {
  1320. data := connStat.getData()
  1321. connCount := data["ConnCount"]
  1322. if connCount == 0 {
  1323. continue
  1324. }
  1325. if id != "" {
  1326. if id == connStat.id {
  1327. array = append(array, data)
  1328. break
  1329. } else {
  1330. continue
  1331. }
  1332. } else {
  1333. array = append(array, data)
  1334. }
  1335. }
  1336. return array
  1337. }
  1338. func (sr *StatReader) getConnStatDetailList(params *Properties) []map[string]interface{} {
  1339. array := make([]map[string]interface{}, 0)
  1340. var data *connectionStat
  1341. connStatMap := goStat.getConnStatMap()
  1342. id := ""
  1343. if v := params.GetString(PROP_NAME_DATASOURCE_ID, ""); v != "" {
  1344. id = v
  1345. }
  1346. if id != "" {
  1347. for _, connStat := range connStatMap {
  1348. if id == connStat.id {
  1349. data = connStat
  1350. break
  1351. }
  1352. }
  1353. }
  1354. if data != nil {
  1355. dataValue := data.getValue(false)
  1356. m := make(map[string]interface{}, 2)
  1357. m["name"] = "数据源"
  1358. m["value"] = dataValue.url
  1359. array = append(array, m)
  1360. m = make(map[string]interface{}, 2)
  1361. m["name"] = "总会话数"
  1362. m["value"] = dataValue.connCount
  1363. array = append(array, m)
  1364. m = make(map[string]interface{}, 2)
  1365. m["name"] = "活动会话数"
  1366. m["value"] = dataValue.activeConnCount
  1367. array = append(array, m)
  1368. m = make(map[string]interface{}, 2)
  1369. m["name"] = "活动会话数峰值"
  1370. m["value"] = dataValue.maxActiveStmtCount
  1371. array = append(array, m)
  1372. m = make(map[string]interface{}, 2)
  1373. m["name"] = "总句柄数"
  1374. m["value"] = dataValue.stmtCount
  1375. array = append(array, m)
  1376. m = make(map[string]interface{}, 2)
  1377. m["name"] = "活动句柄数"
  1378. m["value"] = dataValue.activeStmtCount
  1379. array = append(array, m)
  1380. m = make(map[string]interface{}, 2)
  1381. m["name"] = "活动句柄数峰值"
  1382. m["value"] = dataValue.maxActiveStmtCount
  1383. array = append(array, m)
  1384. m = make(map[string]interface{}, 2)
  1385. m["name"] = "执行次数"
  1386. m["value"] = dataValue.executeCount
  1387. array = append(array, m)
  1388. m = make(map[string]interface{}, 2)
  1389. m["name"] = "执行出错次数"
  1390. m["value"] = dataValue.errorCount
  1391. array = append(array, m)
  1392. m = make(map[string]interface{}, 2)
  1393. m["name"] = "提交次数"
  1394. m["value"] = dataValue.commitCount
  1395. array = append(array, m)
  1396. m = make(map[string]interface{}, 2)
  1397. m["name"] = "回滚次数"
  1398. m["value"] = dataValue.rollbackCount
  1399. array = append(array, m)
  1400. }
  1401. return array
  1402. }
// mapSlice adapts a slice of data maps to sort.Interface, ordering the maps by
// the value stored under a single key.
type mapSlice struct {
	// m is the slice being sorted in place.
	m []map[string]interface{}
	// isDesc records whether descending order was requested.
	isDesc bool
	// orderByKey is the map key whose values are compared.
	orderByKey string
}
  1408. func newMapSlice(m []map[string]interface{}, isDesc bool, orderByKey string) *mapSlice {
  1409. ms := new(mapSlice)
  1410. ms.m = m
  1411. ms.isDesc = isDesc
  1412. ms.orderByKey = orderByKey
  1413. return ms
  1414. }
// Len returns the number of maps in the slice (sort.Interface).
func (ms mapSlice) Len() int { return len(ms.m) }
  1416. func (ms mapSlice) Less(i, j int) bool {
  1417. m1 := ms.m[i]
  1418. m2 := ms.m[j]
  1419. v1 := m1[ms.orderByKey]
  1420. v2 := m2[ms.orderByKey]
  1421. if v1 == nil {
  1422. return true
  1423. } else if v2 == nil {
  1424. return false
  1425. }
  1426. switch v1.(type) {
  1427. case int64:
  1428. return v1.(int64) < v2.(int64)
  1429. case float64:
  1430. return v1.(float64) < v2.(float64)
  1431. default:
  1432. return true
  1433. }
  1434. }
// Swap exchanges elements i and j of the underlying slice (sort.Interface).
func (ms mapSlice) Swap(i, j int) {
	ms.m[i], ms.m[j] = ms.m[j], ms.m[i]
}
  1438. func (sr *StatReader) comparatorOrderBy(array []map[string]interface{}, params *Properties) []map[string]interface{} {
  1439. if array == nil {
  1440. array = make([]map[string]interface{}, 0)
  1441. }
  1442. orderBy := DEFAULT_ORDERBY
  1443. orderType := DEFAULT_ORDER_TYPE
  1444. pageNum := DEFAULT_PAGE_NUM
  1445. pageSize := DEFAULT_PAGE_SIZE
  1446. if params != nil {
  1447. if v := params.GetTrimString(PROP_NAME_SORT_FIELD, ""); v != "" {
  1448. orderBy = v
  1449. }
  1450. if v := params.GetTrimString(PROP_NAME_SORT_TYPE, ""); v != "" {
  1451. orderType = v
  1452. }
  1453. if v := params.GetTrimString(PROP_NAME_PAGE_NUM, ""); v != "" {
  1454. var err error
  1455. pageNum, err = strconv.Atoi(v)
  1456. if err != nil {
  1457. pageNum = DEFAULT_PAGE_NUM
  1458. }
  1459. }
  1460. if v := params.GetTrimString(PROP_NAME_PAGE_SIZE, ""); v != "" {
  1461. var err error
  1462. pageSize, err = strconv.Atoi(v)
  1463. if err != nil {
  1464. pageSize = DEFAULT_PAGE_SIZE
  1465. }
  1466. }
  1467. }
  1468. rowCount := len(array)
  1469. pageCount := int(math.Ceil(float64(rowCount * 1.0 / pageSize)))
  1470. if pageCount < 1 {
  1471. pageCount = 1
  1472. }
  1473. if pageNum > pageCount {
  1474. pageNum = pageCount
  1475. }
  1476. if len(array) > 0 {
  1477. if orderBy != "" {
  1478. sort.Sort(newMapSlice(array, !(DEFAULT_ORDER_TYPE == orderType), orderBy))
  1479. }
  1480. fromIndex := (pageNum - 1) * pageSize
  1481. toIndex := pageNum * pageSize
  1482. if toIndex > rowCount {
  1483. toIndex = rowCount
  1484. }
  1485. array = array[fromIndex:toIndex]
  1486. }
  1487. sr.resetPageInfo(params, rowCount, pageCount, pageNum)
  1488. return array
  1489. }
  1490. func (sr *StatReader) resetPageInfo(params *Properties, rowCount int, pageCount int, pageNum int) {
  1491. if params != nil {
  1492. v := params.GetString(PROP_NAME_PAGE_SIZE, "")
  1493. if v != "" {
  1494. params.Set(PROP_NAME_PAGE_COUNT, strconv.Itoa(pageCount))
  1495. params.Set(PROP_NAME_TOTAL_ROW_COUNT, strconv.Itoa(rowCount))
  1496. params.Set(PROP_NAME_PAGE_NUM, strconv.Itoa(pageNum))
  1497. }
  1498. }
  1499. }
// COL_MAX_LEN is the upper bound passed to calcColLens for a table column's
// width; longer values are wrapped or truncated by formateLine.
const COL_MAX_LEN = 32
  1501. func calcColLens(objList []map[string]interface{}, fields []string, maxColLen int) []int {
  1502. colLen := 0
  1503. colVal := ""
  1504. colLens := make([]int, len(fields))
  1505. for _, obj := range objList {
  1506. for i := 0; i < len(fields); i++ {
  1507. colVal = getColValue(obj[fields[i]])
  1508. colLen = len(colVal)
  1509. if colLen > colLens[i] {
  1510. colLens[i] = colLen
  1511. }
  1512. }
  1513. }
  1514. if maxColLen > 0 {
  1515. for i := 0; i < len(fields); i++ {
  1516. if colLens[i] > maxColLen {
  1517. colLens[i] = maxColLen
  1518. }
  1519. }
  1520. }
  1521. return colLens
  1522. }
  1523. func addTitles(objList []map[string]interface{}, fields []string) []map[string]interface{} {
  1524. titleMap := make(map[string]interface{}, len(fields))
  1525. for i := 0; i < len(fields); i++ {
  1526. titleMap[fields[i]] = fields[i]
  1527. }
  1528. dst := append(objList, titleMap)
  1529. copy(dst[1:], dst[:len(dst)-1])
  1530. dst[0] = titleMap
  1531. return dst
  1532. }
  1533. func toTable(objList []map[string]interface{}, fields []string, colLens []int,
  1534. showAll bool, append bool) string {
  1535. if fields == nil || objList == nil {
  1536. return ""
  1537. }
  1538. if colLens == nil {
  1539. colLens = calcColLens(objList, fields, COL_MAX_LEN)
  1540. }
  1541. output := &strings.Builder{}
  1542. if !append {
  1543. sepLine(output, colLens)
  1544. }
  1545. for _, obj := range objList {
  1546. objMore := obj
  1547. for objMore != nil {
  1548. objMore = formateLine(output, objMore, fields, colLens, showAll)
  1549. }
  1550. sepLine(output, colLens)
  1551. }
  1552. return output.String()
  1553. }
  1554. func formateLine(output *strings.Builder, obj map[string]interface{}, fields []string, colLens []int,
  1555. showAll bool) map[string]interface{} {
  1556. hasMore := false
  1557. objMore := make(map[string]interface{})
  1558. colLen := 0
  1559. colVal := ""
  1560. for i := 0; i < len(fields); i++ {
  1561. colVal = getColValue(obj[fields[i]])
  1562. colLen = len(colVal)
  1563. if colLen <= colLens[i] {
  1564. output.WriteString("|")
  1565. output.WriteString(colVal)
  1566. blanks(output, colLens[i]-colLen)
  1567. if showAll {
  1568. objMore[fields[i]] = ""
  1569. }
  1570. } else {
  1571. output.WriteString("|")
  1572. if showAll {
  1573. output.WriteString(colVal[0:colLens[i]])
  1574. objMore[fields[i]] = colVal[colLens[i]:]
  1575. hasMore = true
  1576. } else {
  1577. output.WriteString(colVal[0:colLens[i]-3] + "...")
  1578. }
  1579. }
  1580. }
  1581. output.WriteString("|")
  1582. output.WriteString(util.StringUtil.LineSeparator())
  1583. if hasMore {
  1584. return objMore
  1585. } else {
  1586. return nil
  1587. }
  1588. }
  1589. func sepLine(output *strings.Builder, colLens []int) {
  1590. output.WriteString("+")
  1591. for _, colLen := range colLens {
  1592. for i := 0; i < colLen; i++ {
  1593. output.WriteString("+")
  1594. }
  1595. output.WriteString("+")
  1596. }
  1597. output.WriteString(util.StringUtil.LineSeparator())
  1598. }
  1599. func blanks(output *strings.Builder, count int) {
  1600. for count > 0 {
  1601. output.WriteString(" ")
  1602. count--
  1603. }
  1604. }
  1605. func getColValue(colObj interface{}) string {
  1606. var colVal string
  1607. if colObj == nil {
  1608. colVal = ""
  1609. } else {
  1610. colVal = fmt.Sprint(colObj)
  1611. }
  1612. colVal = strings.Replace(colVal, "\t", "", -1)
  1613. colVal = strings.Replace(colVal, "\n", "", -1)
  1614. colVal = strings.Replace(colVal, "\r", "", -1)
  1615. return colVal
  1616. }
const (
	// READ_MAX_SIZE is the maximum number of table rows statFlusher.doRun
	// requests from a StatReader per read call.
	READ_MAX_SIZE = 100
)
// statFlusher periodically renders the collected statistics and writes them to
// a log file.
type statFlusher struct {
	// sr produces the rendered statistic tables.
	sr *StatReader
	// logList is the reusable list of text fragments pending to be written.
	logList []string
	// date is the day ("2006-01-02" layout) the current log file was created on.
	date string
	// logFile is the currently open log file, or nil.
	logFile *os.File
	// flushFreq is initialised from StatFlushFreq by newStatFlusher.
	flushFreq int
	// filePath is the directory for log files, taken from StatDir.
	filePath string
	// filePrefix is the leading part of every log file name.
	filePrefix string
	// buffer collects bytes before they are written to logFile.
	// NOTE(review): Dm_build_283 is declared elsewhere; presumably a byte buffer — confirm.
	buffer *Dm_build_283
}
  1630. func newStatFlusher() *statFlusher {
  1631. sf := new(statFlusher)
  1632. sf.sr = newStatReader()
  1633. sf.logList = make([]string, 0, 32)
  1634. sf.date = time.Now().Format("2006-01-02")
  1635. sf.flushFreq = StatFlushFreq
  1636. sf.filePath = StatDir
  1637. sf.filePrefix = "dm_go_stat"
  1638. sf.buffer = Dm_build_287()
  1639. return sf
  1640. }
// isConnStatEnabled reports whether the connection report is written; it
// returns the global StatEnable switch.
func (sf *statFlusher) isConnStatEnabled() bool {
	return StatEnable
}
// isSlowSqlStatEnabled reports whether the slow SQL report is written; it
// returns the global StatEnable switch.
func (sf *statFlusher) isSlowSqlStatEnabled() bool {
	return StatEnable
}
// isHighFreqSqlStatEnabled reports whether the high-frequency SQL report is
// written; it returns the global StatEnable switch.
func (sf *statFlusher) isHighFreqSqlStatEnabled() bool {
	return StatEnable
}
  1650. func (sf *statFlusher) doRun() {
  1651. for {
  1652. if len(goStat.connStatMap) > 0 {
  1653. sf.logList = append(sf.logList, time.Now().String())
  1654. if sf.isConnStatEnabled() {
  1655. sf.logList = append(sf.logList, "#connection stat")
  1656. hasMore := true
  1657. for hasMore {
  1658. hasMore, sf.logList = sf.sr.readConnStat(sf.logList, READ_MAX_SIZE)
  1659. sf.writeAndFlush(sf.logList, 0, len(sf.logList))
  1660. sf.logList = sf.logList[0:0]
  1661. }
  1662. }
  1663. if sf.isHighFreqSqlStatEnabled() {
  1664. sf.logList = append(sf.logList, "#top "+strconv.Itoa(StatHighFreqSqlCount)+" high freq sql stat")
  1665. hasMore := true
  1666. for hasMore {
  1667. hasMore, sf.logList = sf.sr.readHighFreqSqlStat(sf.logList, READ_MAX_SIZE)
  1668. sf.writeAndFlush(sf.logList, 0, len(sf.logList))
  1669. sf.logList = sf.logList[0:0]
  1670. }
  1671. }
  1672. if sf.isSlowSqlStatEnabled() {
  1673. sf.logList = append(sf.logList, "#top "+strconv.Itoa(StatSlowSqlCount)+" slow sql stat")
  1674. hasMore := true
  1675. for hasMore {
  1676. hasMore, sf.logList = sf.sr.readSlowSqlStat(sf.logList, READ_MAX_SIZE)
  1677. sf.writeAndFlush(sf.logList, 0, len(sf.logList))
  1678. sf.logList = sf.logList[0:0]
  1679. }
  1680. }
  1681. sf.logList = append(sf.logList, util.StringUtil.LineSeparator())
  1682. sf.logList = append(sf.logList, util.StringUtil.LineSeparator())
  1683. sf.writeAndFlush(sf.logList, 0, len(sf.logList))
  1684. sf.logList = sf.logList[0:0]
  1685. time.Sleep(time.Duration(StatFlushFreq) * time.Second)
  1686. }
  1687. }
  1688. }
  1689. func (sf *statFlusher) writeAndFlush(logs []string, startOff int, l int) {
  1690. var bytes []byte
  1691. for i := startOff; i < startOff+l; i++ {
  1692. bytes = []byte(logs[i] + util.StringUtil.LineSeparator())
  1693. sf.buffer.Dm_build_309(bytes, 0, len(bytes))
  1694. if sf.buffer.Dm_build_288() >= FLUSH_SIZE {
  1695. sf.doFlush(sf.buffer)
  1696. }
  1697. }
  1698. if sf.buffer.Dm_build_288() > 0 {
  1699. sf.doFlush(sf.buffer)
  1700. }
  1701. }
  1702. func (sf *statFlusher) doFlush(buffer *Dm_build_283) {
  1703. if sf.needCreateNewFile() {
  1704. sf.closeCurrentFile()
  1705. sf.logFile = sf.createNewFile()
  1706. }
  1707. if sf.logFile != nil {
  1708. buffer.Dm_build_303(sf.logFile, buffer.Dm_build_288())
  1709. }
  1710. }
  1711. func (sf *statFlusher) closeCurrentFile() {
  1712. if sf.logFile != nil {
  1713. sf.logFile.Close()
  1714. sf.logFile = nil
  1715. }
  1716. }
  1717. func (sf *statFlusher) createNewFile() *os.File {
  1718. sf.date = time.Now().Format("2006-01-02")
  1719. fileName := sf.filePrefix + "_" + sf.date + "_" + strconv.Itoa(time.Now().Nanosecond()) + ".txt"
  1720. sf.filePath = StatDir
  1721. if len(sf.filePath) > 0 {
  1722. if _, err := os.Stat(sf.filePath); err != nil {
  1723. os.MkdirAll(sf.filePath, 0755)
  1724. }
  1725. if _, err := os.Stat(sf.filePath + fileName); err != nil {
  1726. logFile, err := os.Create(sf.filePath + fileName)
  1727. if err != nil {
  1728. fmt.Println(err)
  1729. return nil
  1730. }
  1731. return logFile
  1732. }
  1733. }
  1734. return nil
  1735. }
  1736. func (sf *statFlusher) needCreateNewFile() bool {
  1737. now := time.Now().Format("2006-01-02")
  1738. fileInfo, err := sf.logFile.Stat()
  1739. return now != sf.date || err != nil || sf.logFile == nil || fileInfo.Size() > int64(MAX_FILE_SIZE)
  1740. }