linux 下利用ls grep 和正则表达式
926 2023-04-03 02:51:09
概览:
以下主要使用Golang作为编程语言
我觉得使用连接池最大的一个好处就是减少连接的创建和关闭,增加系统负载能力, 之前就有遇到一个问题:TCP TIME_WAIT连接数过多导致服务不可用,因为未开启数据库连接池,再加上mysql并发较大,导致需要频繁的创建链接,最终产生了上万的TIME_WAIT的tcp链接,影响了系统性能。
链接池中的的功能主要是管理一堆的链接,包括创建和关闭,所以自己在fatih/pool基础上,改造了一下:https://github.com/silenceper/pool ,使得更加通用一些,增加的一些功能点如下:
net.Conn
,变为了interface{}
(池中存储自己想要的格式)主要是用到了channel
来管理连接,并且能够很好的利用管道的顺序性,当需要使用的时候Get
一个连接,使用完毕之后Put
放回channel
中。
使用连接池之后就不再是短连接,而是长连接了,就引发了一些问题:
因为网络环境是复杂的,中间可能因为防火墙等原因,导致长时间空闲的连接会断开,所以可以通过两个方法来解决:
在https://github.com/silenceper/pool就增加了一个这样最大空闲时间的参数,在连接创建或者连接被重新返回连接池中时重置,给每个连接都增加了一个连接的创建时间,在取出的时候对时间进行比较:https://github.com/silenceper/pool/blob/master/channel.go#L85
远程服务端很有可能重启,那么之前创建的链接就失效了。客户端在使用的时候就需要判断这些失效的连接并丢弃,在database/sql
中就判断了这些失效的连接,使用这种错误表示var ErrBadConn = errors.New("driver: bad connection")
另外值得一提的就是在database/sql
对这种ErrBadConn
错误进行了重试,默认重试次数是两次,所以能够保证即便是链接失效或者断开了,本次的请求能够正常响应(继续往下看就是分析了)。
连接失效的特征
EOF
错误write tcp 127.0.0.1:52089->127.0.0.1:8002: write: broken pipe
错误在database/sql
中使用连接连接池很简单,主要涉及下面这些配置:
db.SetMaxIdleConns(10) //连接池中最大空闲连接数db.SetMaxOpenConns(20) //打开的最大连接数db.SetConnMaxLifetime(300*time.Second)//连接的最大空闲时间(可选)
注:如果
MaxIdleConns
大于0并且MaxOpenConns
小于MaxIdleConns
,那么会将MaxIdleConns
置为MaxIdleConns
来看下db这个结构,以及字段相关说明:
type DB struct {//具体的数据库实现的interface{},//例如https://github.com/go-sql-driver/mysql 就注册并并实现了driver.Open方法,主要是在里面实现了一些鉴权的操作driver driver.Driver //dsn连接dsn string//在prepared statement中用到numClosed uint64mu sync.Mutex // protects following fields//可使用的空闲的链接freeConn []*driverConn//用来传递连接请求的管道connRequests []chan connRequest//当前打开的连接数numOpen int//当需要创建新的链接的时候,往这个管道中发送一个struct数据,//因为在Open数据库的就启用了一个goroutine执行connectionOpener方法读取管道中的数据openerCh chan struct{}//数据库是否已经被关闭closed bool//用来保证锁被正确的关闭dep map[finalCloser]depSet//stacktrace of last conn's put; debug onlylastPut map[*driverConn]string //最大空闲连接maxIdle int //最大打开的连接maxOpen int //连接的最大空闲时间maxLifetime time.Duration //定时清理空闲连接的管道cleanerCh chan struct{}}
看一个查询数据库的例子:
rows, err := db.Query("select * from table1")
在调用db.Query
方法如下:
func (db *DB) Query(query string, args ...interface{}) (*Rows, error) {var rows *Rowsvar err error//这里就做了对失效的链接的重试操作for i := 0; i < maxBadConnRetries; i++ {rows, err = db.query(query, args, cachedOrNewConn)if err != driver.ErrBadConn {break}}if err == driver.ErrBadConn {return db.query(query, args, alwaysNewConn)}return rows, err}
在什么情况下会返回,可以从这里看到: readPack,writePack
继续跟进去就到了
func (db *DB) conn(strategy connReuseStrategy) (*driverConn, error) {
方法主要是创建tcp连接,并判断了连接的生存时间lifetime,以及连接数的一些限制,如果超过的设定的最大打开链接数限制等待connRequest
管道中有连接产生(在putConn
释放链接的时候就会往这个管道中写入数据)
何时释放链接?
当我们调用rows.Close()
的时候,就会把当前正在使用的链接重新放回freeConn
或者写入到db.connRequests
管道中
//putConnDBLocked 方法//如果有db.connRequests有在等待连接的话,就把当前连接给它用if c := len(db.connRequests); c > 0 {req := db.connRequests[0]// This copy is O(n) but in practice faster than a linked list.// TODO: consider compacting it down less often and// moving the base instead?copy(db.connRequests, db.connRequests[1:])db.connRequests = db.connRequests[:c-1]if err == nil {dc.inUse = true}req <- connRequest{conn: dc,err: err,}return true} else if err == nil && !db.closed && db.maxIdleConnsLocked() > len(db.freeConn) {//没人需要我这个链接,我就把他重新返回`freeConn`连接池中db.freeConn = append(db.freeConn, dc)db.startCleanerLocked()return true}
这里是使用连接池https://github.com/silenceper/pool,如何构建一个thrift链接
客户端创建Thrift的代码:
type Client struct {*user.UserClient}//创建Thrift客户端链接的方法factory := func() (interface{}, error) {protocolFactory := thrift.NewTBinaryProtocolFactoryDefault()transportFactory := thrift.NewTTransportFactory()var transport thrift.TTransportvar err errortransport, err = thrift.NewTSocket(rpcConfig.Listen)if err != nil {panic(err)}transport = transportFactory.GetTransport(transport)//defer transport.Close()if err := transport.Open(); err != nil {panic(err)}rpcClient := user.NewUserClientFactory(transport, protocolFactory)//在连接池中直接放置Client对象return &Client{UserClient: rpcClient}, nil}//关闭连接的方法close := func(v interface{}) error {v.(*Client).Transport.Close()return nil}//创建了一个 初始化连接是poolConfig := &pool.PoolConfig{InitialCap: 10,MaxCap: 20,Factory: factory,Close: close,IdleTimeout: 300 * time.Second,}p, err := pool.NewChannelPool(poolConfig)if err != nil {panic(err)}//取得链接conn, err := p.Get()if err != nil {return nil, err}v, ok := conn.(*Client)...使用连接调用远程方法//将连接重新放回连接池中p.Put(conn)
pool连接池代码地址:https://github.com/silenceper/pool
原文地址:http://silenceper.com/blog/201611/%E8%81%8A%E8%81%8Atcp%E8%BF%9E%E6%8E%A5%E6%B1%A0/