Skip to content

Commit

Permalink
当添加AllTabls 的时候 ,支持只指定同步某些表
Browse files Browse the repository at this point in the history
  • Loading branch information
jc3wish committed Oct 6, 2023
1 parent 8a90ff3 commit f7858b6
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 58 deletions.
19 changes: 10 additions & 9 deletions admin/controller/db_detail.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,16 @@ import (
func (c *DBController) Detail() {
DbName := c.Ctx.Request.Form.Get("DbName")
dbInfo := server.GetDBObj(DbName)
o :=inputDriver.Open(dbInfo.InputType,inputDriver.InputInfo{ConnectUri:dbInfo.ConnectUri})
DataBaseList,_ := o.GetSchemaList()
o := inputDriver.Open(dbInfo.InputType, inputDriver.InputInfo{ConnectUri: dbInfo.ConnectUri})
DataBaseList, _ := o.GetSchemaList()
DataBaseList = append(DataBaseList, "AllDataBases")

c.SetData("DbName", DbName)
c.SetData("DataBaseList", DataBaseList)
c.SetData("ToServerList", toserver.GetToServerMap())
c.SetData("ChannelList:", dbInfo.ListChannel())
c.SetData("Title", DbName+" - Detail")
c.AddAdminTemplate("db.detail.html","header.html","db.detail.table.add.html","db.detail.history.add.html","footer.html")
c.AddAdminTemplate("db.detail.html", "header.html", "db.detail.table.add.html", "db.detail.history.add.html", "footer.html")
}

func (c *DBController) GetTableFields() {
Expand All @@ -46,8 +46,8 @@ func (c *DBController) GetTableFields() {
TableName = tansferTableName(TableName)

dbInfo := server.GetDBObj(DbName)
o :=inputDriver.Open(dbInfo.InputType,inputDriver.InputInfo{ConnectUri:dbInfo.ConnectUri})
TableFieldsList,_ := o.GetSchemaTableFieldList(SchemaName, TableName)
o := inputDriver.Open(dbInfo.InputType, inputDriver.InputInfo{ConnectUri: dbInfo.ConnectUri})
TableFieldsList, _ := o.GetSchemaTableFieldList(SchemaName, TableName)
c.SetJsonData(TableFieldsList)
c.StopServeJSON()
}
Expand All @@ -56,17 +56,18 @@ func (c *DBController) TableList() {
DbName := c.Ctx.Request.Form.Get("DbName")
SchemaName := c.Ctx.Request.Form.Get("SchemaName")
DBObj := server.GetDBObj(DbName)
o :=inputDriver.Open(DBObj.InputType,inputDriver.InputInfo{ConnectUri:DBObj.ConnectUri})
o := inputDriver.Open(DBObj.InputType, inputDriver.InputInfo{ConnectUri: DBObj.ConnectUri})
type ResultType struct {
TableName string
ChannelName string
AddStatus bool
TableType string
IgnoreTable string
DoTable string
}
var data []ResultType
data = make([]ResultType, 0)
TableList,_ := o.GetSchemaTableList(SchemaName)
TableList, _ := o.GetSchemaTableList(SchemaName)
TableList = append(TableList, inputDriver.TableList{TableName: "AllTables", TableType: "LIKE"})
var schemaName0, tableName0 string
schemaName0 = tansferSchemaName(SchemaName)
Expand All @@ -83,7 +84,7 @@ func (c *DBController) TableList() {
if t2 == nil {
data = append(data, ResultType{TableName: tableName, ChannelName: "", AddStatus: false, TableType: tableType})
} else {
data = append(data, ResultType{TableName: tableName, ChannelName: t2.Name, AddStatus: true, TableType: tableType, IgnoreTable: t.IgnoreTable})
data = append(data, ResultType{TableName: tableName, ChannelName: t2.Name, AddStatus: true, TableType: tableType, IgnoreTable: t.IgnoreTable, DoTable: t.DoTable})
}
}
}
Expand All @@ -107,4 +108,4 @@ func (c *DBController) TableList() {
}
c.SetJsonData(data)
c.StopServeJSON()
}
}
5 changes: 3 additions & 2 deletions admin/controller/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type TableParam struct {
SchemaName string
TableName string
IgnoreTable string
DoTable string
ChannelId int
Id int
}
Expand Down Expand Up @@ -61,7 +62,7 @@ func (c *TableController) Add() {
}()
SchemaName := tansferSchemaName(param.SchemaName)
TableName := tansferTableName(param.TableName)
err := server.AddTable(param.DbName, SchemaName, TableName, param.IgnoreTable, param.ChannelId)
err := server.AddTable(param.DbName, SchemaName, TableName, param.IgnoreTable, param.DoTable, param.ChannelId)
if err != nil {
result.Msg = err.Error()
return
Expand All @@ -78,7 +79,7 @@ func (c *TableController) Update() {
}()
SchemaName := tansferSchemaName(param.SchemaName)
TableName := tansferTableName(param.TableName)
err := server.UpdateTable(param.DbName, SchemaName, TableName, param.IgnoreTable)
err := server.UpdateTable(param.DbName, SchemaName, TableName, param.IgnoreTable, param.DoTable)
if err != nil {
result.Msg = err.Error()
return
Expand Down
22 changes: 16 additions & 6 deletions admin/view/template/db.detail.html
Original file line number Diff line number Diff line change
Expand Up @@ -215,14 +215,24 @@ <h3 class="modal-title" id="showLikeTable_Title"></h3>
</div>
</div>
<script type="text/javascript">
function ShowLikeTableNames(tableName,ignoreTables) {
function ShowLikeTableNames(tableName,ignoreTables,doTables) {
var tableNames = getTablesByLikeName(tableName);
ignoreTables = ignoreTables+",";
var newIgnoreTables = ignoreTables+",";
var newDoTables = "";
if (doTables != "") {
newDoTables = doTables + ","
}
$("#showLikeTable_Title").text(tableName);
var html = "";
for (var name of tableNames.split(";")) {
if( ignoreTables.indexOf(name+",") < 0 ){
html += "<p>"+name+"</p>";
if (newDoTables != "") {
if( newDoTables.indexOf(name+",") >= 0 ){
html += "<p>"+name+"</p>";
}
}else{
if( newIgnoreTables.indexOf(name+",") < 0 ){
html += "<p>"+name+"</p>";
}
}
}
$("#showLikeTable_Body").html(html);
Expand Down Expand Up @@ -677,11 +687,11 @@ <h3 class="modal-title" id="showLikeTable_Title"></h3>
if (v.TableType.toUpperCase().indexOf("LIKE") != -1) {
if (v.AddStatus == true){
html += '<div class="right2">';
html += '<div class="button"><button data-toggle="button" class="btn-sm btn-warning" type="button" onclick="showUpdateTable(\''+DbName+'\',\''+SchemaName+'\',\''+v.TableName+'\',\''+v.IgnoreTable+'\')">修改</button></div>';
html += '<div class="button"><button data-toggle="button" class="btn-sm btn-warning" type="button" onclick="showUpdateTable(\''+DbName+'\',\''+SchemaName+'\',\''+v.TableName+'\',\''+v.IgnoreTable+'\',\''+v.DoTable+'\')">修改</button></div>';
html += '</div>';
}
html += '<div class="right1">';
html += '<div class="button"><button data-toggle="button" class="btn-sm btn-success" type="button" onclick="ShowLikeTableNames(\''+v.TableName+'\',\''+v.IgnoreTable+'\')">多表</button></div>';
html += '<div class="button"><button data-toggle="button" class="btn-sm btn-success" type="button" onclick="ShowLikeTableNames(\''+v.TableName+'\',\''+v.IgnoreTable+'\',\''+v.DoTable+'\')">多表</button></div>';
html += '</div>';
}
html += '<div class="right">';
Expand Down
34 changes: 30 additions & 4 deletions admin/view/template/db.detail.table.add.html
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,17 @@ <h3 class="modal-title">Add New Table</h3>
</div>
</td>
</tr>
<tr>
<td align="right" height="50" width="20%">DoTables : </td>
<td style="text-indent:10px" >
<textarea type="text" name="TableName" class="form-control" value="" id="AddTableDoTable" disabled></textarea>
<div style="word-wrap: break-word;word-break: normal; width: auto;">
<p>模糊匹配的时候,指定哪些表名进行匹配,多个用 逗号 隔开</p>
<p>只对增量有效,全量任务无效,全量任务,可以自行修改不查询哪些表</p>
<p>同时配置了IgnoreTables和DoTables,DoTables优先</p>
</div>
</td>
</tr>
<tr>
<td align="right" height="50">ChannelKey : </td>
<td style="text-indent:10px">
Expand Down Expand Up @@ -115,6 +126,17 @@ <h3 class="modal-title">Update Table ignoreTables </h3>
</div>
</td>
</tr>
<tr>
<td align="right" height="50" width="20%">DoTables : </td>
<td style="text-indent:10px" >
<textarea type="text" name="TableName" class="form-control" placeholder="DoTables" value="" id="updateTableDoTable"></textarea>
<div style="word-wrap: break-word;word-break: normal; width: auto;">
<p>模糊匹配的时候,指定哪些表名进行匹配,多个用 逗号 隔开</p>
<p>只对增量有效,全量任务无效,全量任务,可以自行修改不查询哪些表</p>
<p>同时配置了IgnoreTables和DoTables,DoTables优先</p>
</div>
</td>
</tr>
</table>

</div>
Expand Down Expand Up @@ -159,6 +181,7 @@ <h3 class="modal-title">Update Table ignoreTables </h3>
$("#AddTableIsFuzzyMatching").attr("disabled","disabled");
$("#addTableName").attr("disabled","disabled");
$('#AddTableIgnoreTable').removeAttr("disabled");
$('#AddTableDoTable').removeAttr("disabled");
}else{
$('#AddTableIsFuzzyMatching').removeAttr("disabled");
}
Expand Down Expand Up @@ -196,6 +219,7 @@ <h3 class="modal-title">Update Table ignoreTables </h3>
//var TableNames_old = $("#addTableNameOld").val();
var SchemaName = $("#addTableSchema").val();
var IgnoreTable = $("#AddTableIgnoreTable").val();
var DoTable = $("#AddTableDoTable").val();
var channelid = $("#AddTableChannel").val();
if ( TableNames == "" ){
return false;
Expand Down Expand Up @@ -242,7 +266,7 @@ <h3 class="modal-title">Update Table ignoreTables </h3>
html += '<a class="list-group-item"><div class="tableDiv" title="' + TableName + '" id="' + tableDivId + '">';
html += '<h5 class="left" ' + title + ' onClick="GetTableToServerList(\'' + SchemaName + '\',\'' + TableName + '\')">' + TableName + '</h5>';
html += '<div class="right1">';
html += '<div class="button"><button data-toggle="button" class="btn-sm btn-success" type="button" onclick="ShowLikeTableNames(\''+TableName+'\',\''+IgnoreTable+'\')">多表</button></div>';
html += '<div class="button"><button data-toggle="button" class="btn-sm btn-success" type="button" onclick="ShowLikeTableNames(\''+TableName+'\',\''+IgnoreTable+'\',\''+DoTable+'\')">多表</button></div>';
html += '</div>';
html += '<div class="right">';
html += '<div class="button"><button data-toggle="button" class="btn-sm btn-danger" type="button" onClick="DelTable(\''+TableName+'\')">DEL</button></div>';
Expand All @@ -263,18 +287,19 @@ <h3 class="modal-title">Update Table ignoreTables </h3>
}
batchDelOrAddTableResultFun(TableName +" ADD Result:"+data.msg,1);
}
Ajax("POST",url,{DbName: DbName, SchemaName: SchemaName, TableName: TableName, ChannelId: parseInt(channelid),IgnoreTable:IgnoreTable},callback,false);
Ajax("POST",url,{DbName: DbName, SchemaName: SchemaName, TableName: TableName, ChannelId: parseInt(channelid),IgnoreTable:IgnoreTable,DoTable:DoTable},callback,false);
}
batchDelOrAddTableResultFun("添加完成",-2);
$(obj).text("提交");
}


function showUpdateTable(DbName,SchemaName,TableName,IgnoreTable){
function showUpdateTable(DbName,SchemaName,TableName,IgnoreTable,DoTable){
$("#updateTableDbName").val(DbName);
$("#updateTableSchema").val(SchemaName);
$("#updateTableName").val(TableName);
$("#updateTableIgnoreTable").val(IgnoreTable);
$("#updateTableDoTable").val(DoTable);
$("#updateTableDiv").modal('show');
}

Expand All @@ -283,14 +308,15 @@ <h3 class="modal-title">Update Table ignoreTables </h3>
var SchemaName = $("#updateTableSchema").val();
var TableName = $("#updateTableName").val();
var IgnoreTable = $("#updateTableIgnoreTable").val();
var DoTable = $("#updateTableDoTable").val();
var url = "/table/update";
var callback = function(data){
alert(data.msg);
DataBaseMap.delete(SchemaName);
showSchemaTableList("Schema-"+SchemaName);
$("#updateTableDiv").modal('hide');
}
Ajax("POST",url,{DbName: DbName, SchemaName: SchemaName, TableName: TableName, IgnoreTable:IgnoreTable},callback,false);
Ajax("POST",url,{DbName: DbName, SchemaName: SchemaName, TableName: TableName, IgnoreTable:IgnoreTable,DoTable:DoTable},callback,false);
}

</script>
Expand Down
8 changes: 8 additions & 0 deletions server/channel_consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,14 @@ func (This *consume_channel_obj) consumeChannel() {

func (This *consume_channel_obj) checkIgnoreTable(t *Table, TableName string) bool {
This.db.RLock()
if len(t.doTableMap) > 0 {
if _, ok := t.doTableMap[TableName]; ok {
This.db.RUnlock()
return false
}
This.db.RUnlock()
return true
}
if _, ok := t.ignoreTableMap[TableName]; ok {
This.db.RUnlock()
return true
Expand Down
12 changes: 8 additions & 4 deletions server/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,7 @@ func (db *db) IgnoreTableToMap(IgnoreTable string) map[string]bool {
return m
}

func (db *db) AddTable(schemaName string, tableName string, IgnoreTable string, ChannelKey int, LastToServerID int) bool {
func (db *db) AddTable(schemaName string, tableName string, IgnoreTable string, DoTable string, ChannelKey int, LastToServerID int) bool {
key := GetSchemaAndTableJoin(schemaName, tableName)
db.Lock()
defer db.Unlock()
Expand All @@ -644,28 +644,32 @@ func (db *db) AddTable(schemaName string, tableName string, IgnoreTable string,
ToServerList: make([]*ToServer, 0),
LastToServerID: LastToServerID,
likeTableList: make([]*Table, 0),
DoTable: DoTable,
doTableMap: db.IgnoreTableToMap(DoTable),
IgnoreTable: IgnoreTable,
ignoreTableMap: db.IgnoreTableToMap(IgnoreTable),
}
db.addLikeTable(db.tableMap[key], schemaName, tableName)
log.Println("AddTable", db.Name, schemaName, tableName, db.channelMap[ChannelKey].Name, " IgnoreTable:", IgnoreTable)
log.Println("AddTable", db.Name, schemaName, tableName, db.channelMap[ChannelKey].Name, " IgnoreTable:", IgnoreTable, "DoTable:", DoTable)
count.SetTable(db.Name, key)
}
return true
}

// 修改 模糊匹配的表规则 需要过滤哪些表不进行匹配
func (db *db) UpdateTable(schemaName string, tableName string, IgnoreTable string) bool {
func (db *db) UpdateTable(schemaName string, tableName string, IgnoreTable string, DoTable string) bool {
key := GetSchemaAndTableJoin(schemaName, tableName)
db.Lock()
defer db.Unlock()
if _, ok := db.tableMap[key]; !ok {
log.Println("UpdateTable ", db.Name, schemaName, tableName, " not exsit ")
return false
}
db.tableMap[key].DoTable = DoTable
db.tableMap[key].doTableMap = db.IgnoreTableToMap(DoTable)
db.tableMap[key].IgnoreTable = IgnoreTable
db.tableMap[key].ignoreTableMap = db.IgnoreTableToMap(IgnoreTable)
log.Println("UpdateTable", db.Name, schemaName, tableName, "IgnoreTable:", IgnoreTable)
log.Println("UpdateTable", db.Name, schemaName, tableName, "IgnoreTable:", IgnoreTable, "DoTable:", DoTable)
return true
}

Expand Down
2 changes: 1 addition & 1 deletion server/recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func recoveryData(data map[string]dbSaveInfo, isStop bool) {
}

schemaName, tableName := GetSchemaAndTableBySplit(tInfo.key)
db.AddTable(schemaName, tableName, tInfo.IgnoreTable, channelIDMap[tInfo.ChannelKey], tInfo.LastToServerID)
db.AddTable(schemaName, tableName, tInfo.IgnoreTable, tInfo.DoTable, channelIDMap[tInfo.ChannelKey], tInfo.LastToServerID)
for _, toServer := range tInfo.ToServerList {
toServerBinlogPositionFromDB, _ := getBinlogPosition(getToServerBinlogkey(db, toServer))
var toServerBinlog *PositionStruct
Expand Down
62 changes: 30 additions & 32 deletions server/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,60 +20,58 @@ import (
"sync"
)


type Table struct {
sync.RWMutex
key string // schema+table 组成的key
Name string
ChannelKey int
LastToServerID int
ToServerList []*ToServer
likeTableList []*Table // 关联了哪些 模糊匹配的配置
regexpErr bool // 是否执行正则表达式错误,如果 true,则下一次不会再执行,直接错过
IgnoreTable string // 假如是模糊匹配的时候,指定某些表不进行匹配,逗号隔开
ignoreTableMap map[string]bool // 指定某些表不进行匹配的表数据 map 格式
key string // schema+table 组成的key
Name string
ChannelKey int
LastToServerID int
ToServerList []*ToServer
likeTableList []*Table // 关联了哪些 模糊匹配的配置
regexpErr bool // 是否执行正则表达式错误,如果 true,则下一次不会再执行,直接错过
IgnoreTable string // 假如是模糊匹配的时候,指定某些表不进行匹配,逗号隔开
ignoreTableMap map[string]bool // 指定某些表不进行匹配的表数据 map 格式
DoTable string
doTableMap map[string]bool // 指定某些表进行匹配的表数据 map 格式
}


func AddTable(db ,schema ,tableName ,IgnoreTable string,channelId int) error{
if _,ok:=DbList[db];!ok{
return fmt.Errorf(db+" not exsit")
func AddTable(db, schema, tableName, IgnoreTable string, DoTable string, channelId int) error {
if _, ok := DbList[db]; !ok {
return fmt.Errorf(db + " not exsit")
}
if DbList[db].AddTable(schema,tableName,IgnoreTable,channelId,0) == true{
if DbList[db].AddTable(schema, tableName, IgnoreTable, DoTable, channelId, 0) == true {
return nil
}
return fmt.Errorf("unkown error")
}

func UpdateTable(db ,schema ,tableName ,IgnoreTable string) error{
if _,ok:=DbList[db];!ok{
return fmt.Errorf(db+" not exsit")
func UpdateTable(db, schema, tableName, IgnoreTable string, DoTable string) error {
if _, ok := DbList[db]; !ok {
return fmt.Errorf(db + " not exsit")
}
if DbList[db].UpdateTable(schema,tableName,IgnoreTable) == true{
if DbList[db].UpdateTable(schema, tableName, IgnoreTable, DoTable) == true {
return nil
}
return fmt.Errorf("unkown error")
}



func DelTable(db, schema, tableName string) error{
if _,ok:=DbList[db];!ok{
return fmt.Errorf(db+"not exsit")
func DelTable(db, schema, tableName string) error {
if _, ok := DbList[db]; !ok {
return fmt.Errorf(db + "not exsit")
}
DbList[db].DelTable(schema,tableName)
DbList[db].DelTable(schema, tableName)
return nil
}

func AddTableToServer(db ,schemaName ,tableName string,ToServerInfo ToServer) error{
if _,ok:=DbList[db];!ok{
return fmt.Errorf(db+"not exsit")
func AddTableToServer(db, schemaName, tableName string, ToServerInfo ToServer) error {
if _, ok := DbList[db]; !ok {
return fmt.Errorf(db + "not exsit")
}
key := GetSchemaAndTableJoin(schemaName,tableName)
key := GetSchemaAndTableJoin(schemaName, tableName)
if _, ok := DbList[db].tableMap[key]; !ok {
return fmt.Errorf(key+" not exsit")
return fmt.Errorf(key + " not exsit")
} else {
DbList[db].AddTableToServer(schemaName,tableName,&ToServerInfo)
DbList[db].AddTableToServer(schemaName, tableName, &ToServerInfo)
}
return nil
}
}

0 comments on commit f7858b6

Please sign in to comment.