From f7858b6f74cfc69f9250baea44c2a858e974c9cc Mon Sep 17 00:00:00 2001 From: jc3wish <> Date: Fri, 6 Oct 2023 18:16:30 +0800 Subject: [PATCH] =?UTF-8?q?=E5=BD=93=E6=B7=BB=E5=8A=A0AllTabls=20=E7=9A=84?= =?UTF-8?q?=E6=97=B6=E5=80=99=20,=E6=94=AF=E6=8C=81=E5=8F=AA=E6=8C=87?= =?UTF-8?q?=E5=AE=9A=E5=90=8C=E6=AD=A5=E6=9F=90=E4=BA=9B=E8=A1=A8=20https:?= =?UTF-8?q?//github.com/brokercap/Bifrost/issues/253?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- admin/controller/db_detail.go | 19 +++--- admin/controller/table.go | 5 +- admin/view/template/db.detail.html | 22 +++++-- admin/view/template/db.detail.table.add.html | 34 +++++++++-- server/channel_consume.go | 8 +++ server/db.go | 12 ++-- server/recovery.go | 2 +- server/table.go | 62 ++++++++++---------- 8 files changed, 106 insertions(+), 58 deletions(-) diff --git a/admin/controller/db_detail.go b/admin/controller/db_detail.go index cb15626c..230dc6bd 100644 --- a/admin/controller/db_detail.go +++ b/admin/controller/db_detail.go @@ -25,8 +25,8 @@ import ( func (c *DBController) Detail() { DbName := c.Ctx.Request.Form.Get("DbName") dbInfo := server.GetDBObj(DbName) - o :=inputDriver.Open(dbInfo.InputType,inputDriver.InputInfo{ConnectUri:dbInfo.ConnectUri}) - DataBaseList,_ := o.GetSchemaList() + o := inputDriver.Open(dbInfo.InputType, inputDriver.InputInfo{ConnectUri: dbInfo.ConnectUri}) + DataBaseList, _ := o.GetSchemaList() DataBaseList = append(DataBaseList, "AllDataBases") c.SetData("DbName", DbName) @@ -34,7 +34,7 @@ func (c *DBController) Detail() { c.SetData("ToServerList", toserver.GetToServerMap()) c.SetData("ChannelList:", dbInfo.ListChannel()) c.SetData("Title", DbName+" - Detail") - c.AddAdminTemplate("db.detail.html","header.html","db.detail.table.add.html","db.detail.history.add.html","footer.html") + c.AddAdminTemplate("db.detail.html", "header.html", "db.detail.table.add.html", "db.detail.history.add.html", "footer.html") } func (c *DBController) GetTableFields() { @@ -46,8 +46,8 @@ func (c *DBController) GetTableFields() { TableName = tansferTableName(TableName) dbInfo := server.GetDBObj(DbName) - o :=inputDriver.Open(dbInfo.InputType,inputDriver.InputInfo{ConnectUri:dbInfo.ConnectUri}) - TableFieldsList,_ := o.GetSchemaTableFieldList(SchemaName, TableName) + o := inputDriver.Open(dbInfo.InputType, inputDriver.InputInfo{ConnectUri: dbInfo.ConnectUri}) + TableFieldsList, _ := o.GetSchemaTableFieldList(SchemaName, TableName) c.SetJsonData(TableFieldsList) c.StopServeJSON() } @@ -56,17 +56,18 @@ func (c *DBController) TableList() { DbName := c.Ctx.Request.Form.Get("DbName") SchemaName := c.Ctx.Request.Form.Get("SchemaName") DBObj := server.GetDBObj(DbName) - o :=inputDriver.Open(DBObj.InputType,inputDriver.InputInfo{ConnectUri:DBObj.ConnectUri}) + o := inputDriver.Open(DBObj.InputType, inputDriver.InputInfo{ConnectUri: DBObj.ConnectUri}) type ResultType struct { TableName string ChannelName string AddStatus bool TableType string IgnoreTable string + DoTable string } var data []ResultType data = make([]ResultType, 0) - TableList,_ := o.GetSchemaTableList(SchemaName) + TableList, _ := o.GetSchemaTableList(SchemaName) TableList = append(TableList, inputDriver.TableList{TableName: "AllTables", TableType: "LIKE"}) var schemaName0, tableName0 string schemaName0 = tansferSchemaName(SchemaName) @@ -83,7 +84,7 @@ func (c *DBController) TableList() { if t2 == nil { data = append(data, ResultType{TableName: tableName, ChannelName: "", AddStatus: false, TableType: tableType}) } else { - data = append(data, ResultType{TableName: tableName, ChannelName: t2.Name, AddStatus: true, TableType: tableType, IgnoreTable: t.IgnoreTable}) + data = append(data, ResultType{TableName: tableName, ChannelName: t2.Name, AddStatus: true, TableType: tableType, IgnoreTable: t.IgnoreTable, DoTable: t.DoTable}) } } } @@ -107,4 +108,4 @@ func (c *DBController) TableList() { } c.SetJsonData(data) c.StopServeJSON() -} \ No newline at end of file +} diff --git a/admin/controller/table.go b/admin/controller/table.go index 53e595ac..014753c2 100644 --- a/admin/controller/table.go +++ b/admin/controller/table.go @@ -30,6 +30,7 @@ type TableParam struct { SchemaName string TableName string IgnoreTable string + DoTable string ChannelId int Id int } @@ -61,7 +62,7 @@ func (c *TableController) Add() { }() SchemaName := tansferSchemaName(param.SchemaName) TableName := tansferTableName(param.TableName) - err := server.AddTable(param.DbName, SchemaName, TableName, param.IgnoreTable, param.ChannelId) + err := server.AddTable(param.DbName, SchemaName, TableName, param.IgnoreTable, param.DoTable, param.ChannelId) if err != nil { result.Msg = err.Error() return @@ -78,7 +79,7 @@ func (c *TableController) Update() { }() SchemaName := tansferSchemaName(param.SchemaName) TableName := tansferTableName(param.TableName) - err := server.UpdateTable(param.DbName, SchemaName, TableName, param.IgnoreTable) + err := server.UpdateTable(param.DbName, SchemaName, TableName, param.IgnoreTable, param.DoTable) if err != nil { result.Msg = err.Error() return diff --git a/admin/view/template/db.detail.html b/admin/view/template/db.detail.html index 32f6487f..10e541cf 100644 --- a/admin/view/template/db.detail.html +++ b/admin/view/template/db.detail.html @@ -215,14 +215,24 @@ diff --git a/server/channel_consume.go b/server/channel_consume.go index 70b8879d..fe89b15c 100755 --- a/server/channel_consume.go +++ b/server/channel_consume.go @@ -246,6 +246,14 @@ func (This *consume_channel_obj) consumeChannel() { func (This *consume_channel_obj) checkIgnoreTable(t *Table, TableName string) bool { This.db.RLock() + if len(t.doTableMap) > 0 { + if _, ok := t.doTableMap[TableName]; ok { + This.db.RUnlock() + return false + } + This.db.RUnlock() + return true + } if _, ok := t.ignoreTableMap[TableName]; ok { This.db.RUnlock() return true diff --git a/server/db.go b/server/db.go index 6f47cf05..08343b27 100755 --- a/server/db.go +++ b/server/db.go @@ -632,7 +632,7 @@ func (db *db) IgnoreTableToMap(IgnoreTable string) map[string]bool { return m } -func (db *db) AddTable(schemaName string, tableName string, IgnoreTable string, ChannelKey int, LastToServerID int) bool { +func (db *db) AddTable(schemaName string, tableName string, IgnoreTable string, DoTable string, ChannelKey int, LastToServerID int) bool { key := GetSchemaAndTableJoin(schemaName, tableName) db.Lock() defer db.Unlock() @@ -644,18 +644,20 @@ func (db *db) AddTable(schemaName string, tableName string, IgnoreTable string, ToServerList: make([]*ToServer, 0), LastToServerID: LastToServerID, likeTableList: make([]*Table, 0), + DoTable: DoTable, + doTableMap: db.IgnoreTableToMap(DoTable), IgnoreTable: IgnoreTable, ignoreTableMap: db.IgnoreTableToMap(IgnoreTable), } db.addLikeTable(db.tableMap[key], schemaName, tableName) - log.Println("AddTable", db.Name, schemaName, tableName, db.channelMap[ChannelKey].Name, " IgnoreTable:", IgnoreTable) + log.Println("AddTable", db.Name, schemaName, tableName, db.channelMap[ChannelKey].Name, " IgnoreTable:", IgnoreTable, "DoTable:", DoTable) count.SetTable(db.Name, key) } return true } // 修改 模糊匹配的表规则 需要过滤哪些表不进行匹配 -func (db *db) UpdateTable(schemaName string, tableName string, IgnoreTable string) bool { +func (db *db) UpdateTable(schemaName string, tableName string, IgnoreTable string, DoTable string) bool { key := GetSchemaAndTableJoin(schemaName, tableName) db.Lock() defer db.Unlock() @@ -663,9 +665,11 @@ func (db *db) UpdateTable(schemaName string, tableName string, IgnoreTable strin log.Println("UpdateTable ", db.Name, schemaName, tableName, " not exsit ") return false } + db.tableMap[key].DoTable = DoTable + db.tableMap[key].doTableMap = db.IgnoreTableToMap(DoTable) db.tableMap[key].IgnoreTable = IgnoreTable db.tableMap[key].ignoreTableMap = db.IgnoreTableToMap(IgnoreTable) - log.Println("UpdateTable", db.Name, schemaName, tableName, "IgnoreTable:", IgnoreTable) + log.Println("UpdateTable", db.Name, schemaName, tableName, "IgnoreTable:", IgnoreTable, "DoTable:", DoTable) return true } diff --git a/server/recovery.go b/server/recovery.go index d51a36a1..76896e3f 100755 --- a/server/recovery.go +++ b/server/recovery.go @@ -270,7 +270,7 @@ func recoveryData(data map[string]dbSaveInfo, isStop bool) { } schemaName, tableName := GetSchemaAndTableBySplit(tInfo.key) - db.AddTable(schemaName, tableName, tInfo.IgnoreTable, channelIDMap[tInfo.ChannelKey], tInfo.LastToServerID) + db.AddTable(schemaName, tableName, tInfo.IgnoreTable, tInfo.DoTable, channelIDMap[tInfo.ChannelKey], tInfo.LastToServerID) for _, toServer := range tInfo.ToServerList { toServerBinlogPositionFromDB, _ := getBinlogPosition(getToServerBinlogkey(db, toServer)) var toServerBinlog *PositionStruct diff --git a/server/table.go b/server/table.go index 9818dfd3..29e952d5 100755 --- a/server/table.go +++ b/server/table.go @@ -20,60 +20,58 @@ import ( "sync" ) - type Table struct { sync.RWMutex - key string // schema+table 组成的key - Name string - ChannelKey int - LastToServerID int - ToServerList []*ToServer - likeTableList []*Table // 关联了哪些 模糊匹配的配置 - regexpErr bool // 是否执行正则表达式错误,如果 true,则下一次不会再执行,直接错过 - IgnoreTable string // 假如是模糊匹配的时候,指定某些表不进行匹配,逗号隔开 - ignoreTableMap map[string]bool // 指定某些表不进行匹配的表数据 map 格式 + key string // schema+table 组成的key + Name string + ChannelKey int + LastToServerID int + ToServerList []*ToServer + likeTableList []*Table // 关联了哪些 模糊匹配的配置 + regexpErr bool // 是否执行正则表达式错误,如果 true,则下一次不会再执行,直接错过 + IgnoreTable string // 假如是模糊匹配的时候,指定某些表不进行匹配,逗号隔开 + ignoreTableMap map[string]bool // 指定某些表不进行匹配的表数据 map 格式 + DoTable string + doTableMap map[string]bool // 指定某些表进行匹配的表数据 map 格式 } - -func AddTable(db ,schema ,tableName ,IgnoreTable string,channelId int) error{ - if _,ok:=DbList[db];!ok{ - return fmt.Errorf(db+" not exsit") +func AddTable(db, schema, tableName, IgnoreTable string, DoTable string, channelId int) error { + if _, ok := DbList[db]; !ok { + return fmt.Errorf(db + " not exsit") } - if DbList[db].AddTable(schema,tableName,IgnoreTable,channelId,0) == true{ + if DbList[db].AddTable(schema, tableName, IgnoreTable, DoTable, channelId, 0) == true { return nil } return fmt.Errorf("unkown error") } -func UpdateTable(db ,schema ,tableName ,IgnoreTable string) error{ - if _,ok:=DbList[db];!ok{ - return fmt.Errorf(db+" not exsit") +func UpdateTable(db, schema, tableName, IgnoreTable string, DoTable string) error { + if _, ok := DbList[db]; !ok { + return fmt.Errorf(db + " not exsit") } - if DbList[db].UpdateTable(schema,tableName,IgnoreTable) == true{ + if DbList[db].UpdateTable(schema, tableName, IgnoreTable, DoTable) == true { return nil } return fmt.Errorf("unkown error") } - - -func DelTable(db, schema, tableName string) error{ - if _,ok:=DbList[db];!ok{ - return fmt.Errorf(db+"not exsit") +func DelTable(db, schema, tableName string) error { + if _, ok := DbList[db]; !ok { + return fmt.Errorf(db + "not exsit") } - DbList[db].DelTable(schema,tableName) + DbList[db].DelTable(schema, tableName) return nil } -func AddTableToServer(db ,schemaName ,tableName string,ToServerInfo ToServer) error{ - if _,ok:=DbList[db];!ok{ - return fmt.Errorf(db+"not exsit") +func AddTableToServer(db, schemaName, tableName string, ToServerInfo ToServer) error { + if _, ok := DbList[db]; !ok { + return fmt.Errorf(db + "not exsit") } - key := GetSchemaAndTableJoin(schemaName,tableName) + key := GetSchemaAndTableJoin(schemaName, tableName) if _, ok := DbList[db].tableMap[key]; !ok { - return fmt.Errorf(key+" not exsit") + return fmt.Errorf(key + " not exsit") } else { - DbList[db].AddTableToServer(schemaName,tableName,&ToServerInfo) + DbList[db].AddTableToServer(schemaName, tableName, &ToServerInfo) } return nil -} \ No newline at end of file +}