Skip to content

Commit

Permalink
Fix issues #13, 43,44,45,46,47,49
Browse files Browse the repository at this point in the history
  • Loading branch information
huyntsgs committed Dec 15, 2018
1 parent c4727ee commit 78d47c6
Show file tree
Hide file tree
Showing 5 changed files with 283 additions and 92 deletions.
75 changes: 40 additions & 35 deletions coinjoin/coinjoin.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ import (

pb "github.com/decred/dcrwallet/dcrtxclient/api/messages"
"github.com/decred/dcrwallet/dcrtxclient/finitefield"
"github.com/decred/dcrwallet/dcrtxclient/messages"
"github.com/decred/dcrwallet/dcrtxclient/util"
"github.com/decred/dcrwallet/dcrtxclient/messages"
)

const (
Expand Down Expand Up @@ -96,21 +95,22 @@ type (
mu sync.Mutex
Peers map[uint32]*PeerInfo
NewPeerChan chan *PeerInfo
Timeout *time.Timer
WillStart bool
}

DiceMix struct {
sessionTicker *time.Ticker
Sessions map[uint32]*JoinSession
config *Config
TimerChan <-chan time.Time
Sessions map[uint32]*JoinSession
NewPeerChan chan *PeerInfo
config *Config
WillStart bool
}

Config struct {
MinParticipants int
RandomIndex bool
JoinTicker int
RoundTimeOut int
ServerPublish bool
}
)

Expand Down Expand Up @@ -140,32 +140,21 @@ func (peer *PeerInfo) ResetData(id, sessionId uint32) {
// Run does join transaction in every 2 minutes (setting in config file).
// If there is enough peers for join transaction, creates new join session.
func (diceMix *DiceMix) Run(joinQueue *JoinQueue) {
startTime := false

for {
select {
case peer := <-joinQueue.NewPeerChan:
joinQueue.AddNewPeer(peer)

case <-diceMix.sessionTicker.C:
timeStartJoin := time.Now().Add(time.Second * time.Duration(diceMix.config.JoinTicker))
case <-diceMix.TimerChan:
joinQueue.mu.Lock()
queueSize := len(joinQueue.Peers)
if queueSize == 0 {
//log.Info("Zero participant connected")
if startTime {
log.Info("Will start next join session at", util.GetTimeString(timeStartJoin))
}
continue
}

if queueSize < diceMix.config.MinParticipants {
log.Infof("Number participants %d, will wait for minimum %d", queueSize, diceMix.config.MinParticipants)
if startTime {
log.Info("Will start next join session at", util.GetTimeString(timeStartJoin))
}
diceMix.WillStart = false
joinQueue.mu.Unlock()
continue
}
//log.Info("Will start next join session at", util.GetTimeString(timeStartJoin))
startTime = true
joinQueue.mu.Lock()
sessionId := GenId()
joinSession := NewJoinSession(sessionId, diceMix.config.RoundTimeOut)
log.Infof("Start coin join transaction - sessionId %v", sessionId)
Expand Down Expand Up @@ -193,10 +182,11 @@ func (diceMix *DiceMix) Run(joinQueue *JoinQueue) {

// Init new queue for next incoming peers
joinQueue.Peers = make(map[uint32]*PeerInfo)
diceMix.WillStart = false
joinQueue.mu.Unlock()

// Run the join session
joinSession.Config = &Config{RoundTimeOut: diceMix.config.RoundTimeOut}
joinSession.Config = &Config{RoundTimeOut: diceMix.config.RoundTimeOut, ServerPublish: diceMix.config.ServerPublish}
go joinSession.run()

}
Expand All @@ -206,13 +196,14 @@ func (diceMix *DiceMix) Run(joinQueue *JoinQueue) {
// Run does join transaction in every 2 minutes (setting in config file).
// If there is enough peers for join transaction, creates new join session.
func (joinQueue *JoinQueue) Run(cfg Config) {
joinQueue.mu.Lock()
time.Sleep(time.Second * time.Duration(cfg.JoinTicker))
queueSize := len(joinQueue.Peers)
if queueSize < cfg.MinParticipants {
if queueSize < cfg.MinParticipants {
joinQueue.mu.Unlock()
return
}

joinQueue.mu.Lock()
sessionId := GenId()
joinSession := NewJoinSession(sessionId, cfg.RoundTimeOut)
log.Infof("Start coin join transaction - sessionId %v", sessionId)
Expand All @@ -239,12 +230,11 @@ func (joinQueue *JoinQueue) Run(cfg Config) {
}

// Init new queue for next incoming peers
joinQueue.Peers = make(map[uint32]*PeerInfo)
joinQueue.WillStart = false
joinQueue.Peers = make(map[uint32]*PeerInfo)
joinQueue.mu.Unlock()

// Run the join session
joinSession.Config = &Config{RoundTimeOut: cfg.RoundTimeOut}
joinSession.Config = &Config{RoundTimeOut: cfg.RoundTimeOut, ServerPublish: cfg.ServerPublish}
go joinSession.run()
}

Expand Down Expand Up @@ -466,12 +456,12 @@ func (peer *PeerInfo) ReadMessages() {
continue
}

peer.JoinSession.mu.Lock()
message, err := messages.ParseMessage(data)
if err != nil {
log.Errorf("Can not parse data from websocket: %v", err)
continue
}
peer.JoinSession.mu.Lock()

// Check message type, forwarding the message data to corresponding channel.
switch message.MsgType {
Expand All @@ -480,12 +470,14 @@ func (peer *PeerInfo) ReadMessages() {
// Peer sent data invalid state
log.Infof("Current join session state is %s. Peer id %d has sent invalid state: StateKeyExchange",
peer.JoinSession.getStateString(), peer.Id)
peer.JoinSession.mu.Unlock()
continue
}
keyex := &pb.KeyExchangeReq{}
err := proto.Unmarshal(message.Data, keyex)
if err != nil {
log.Errorf("Can not unmarshal KeyExchangeReq: %v", err)
peer.JoinSession.mu.Unlock()
peer.JoinSession.removePeer(peer.Id)
continue
}
Expand All @@ -497,12 +489,14 @@ func (peer *PeerInfo) ReadMessages() {
// Peer sent data invalid state
log.Infof("Current join session state is %s. Peer id %d has sent invalid state: StateDcExponential",
peer.JoinSession.getStateString(), peer.Id)
peer.JoinSession.mu.Unlock()
continue
}
dcExpVector := &pb.DcExpVector{}
err := proto.Unmarshal(message.Data, dcExpVector)
if err != nil {
log.Errorf("Can not unmarshal DcExpVector: %v", err)
peer.JoinSession.mu.Unlock()
peer.JoinSession.pushMaliciousInfo([]uint32{peer.Id})
continue
}
Expand All @@ -514,12 +508,14 @@ func (peer *PeerInfo) ReadMessages() {
// Peer sent data invalid state
log.Infof("Current join session state is %s. Peer id %d has sent invalid state: StateDcXor",
peer.JoinSession.getStateString(), peer.Id)
peer.JoinSession.mu.Unlock()
continue
}
dcXorVector := &pb.DcXorVector{}
err := proto.Unmarshal(message.Data, dcXorVector)
if err != nil {
log.Errorf("Can not unmarshal DcExpVector: %v", err)
peer.JoinSession.mu.Unlock()
peer.JoinSession.pushMaliciousInfo([]uint32{peer.Id})
continue
}
Expand All @@ -531,12 +527,14 @@ func (peer *PeerInfo) ReadMessages() {
// Peer sent data invalid state
log.Infof("Current join session state is %s. Peer id %d has sent invalid state: StateTxInput",
peer.JoinSession.getStateString(), peer.Id)
peer.JoinSession.mu.Unlock()
continue
}
txins := &pb.TxInputs{}
err := proto.Unmarshal(message.Data, txins)
if err != nil {
log.Errorf("Can not unmarshal TxInputs: %v", err)
peer.JoinSession.mu.Unlock()
peer.JoinSession.pushMaliciousInfo([]uint32{peer.Id})
continue
}
Expand All @@ -548,12 +546,14 @@ func (peer *PeerInfo) ReadMessages() {
// Peer sent data invalid state
log.Infof("Current join session state is %s. Peer id %d has sent invalid state: StateTxSign",
peer.JoinSession.getStateString(), peer.Id)
peer.JoinSession.mu.Unlock()
continue
}
signTx := &pb.JoinTx{}
err := proto.Unmarshal(message.Data, signTx)
if err != nil {
log.Errorf("Can not unmarshal JoinTx: %v", err)
peer.JoinSession.mu.Unlock()
peer.JoinSession.pushMaliciousInfo([]uint32{peer.Id})
continue
}
Expand All @@ -565,35 +565,40 @@ func (peer *PeerInfo) ReadMessages() {
// Peer sent data invalid state
log.Infof("Current join session state is %s. Peer id %d has sent invalid state: StateTxPublish",
peer.JoinSession.getStateString(), peer.Id)
peer.JoinSession.mu.Unlock()
continue
}
pubResult := &pb.PublishResult{}
pubResult.PeerId = peer.Id
if peer.Id != peer.JoinSession.Publisher {
log.Debugf("peer %d is not publisher %d", peer.Id, peer.JoinSession.Publisher)
peer.JoinSession.mu.Unlock()
continue
}
err := proto.Unmarshal(message.Data, pubResult)
if err != nil {
log.Errorf("Can not unmarshal PublishResult: %v", err)
peer.JoinSession.mu.Unlock()
peer.JoinSession.pushMaliciousInfo([]uint32{peer.Id})
continue
}
peer.JoinSession.mu.Unlock()
peer.JoinSession.txPublishResultChan <- *pubResult
case messages.C_REVEAL_SECRET:
workState := peer.JoinSession.State == StateDcExponential || peer.JoinSession.State == StateDcXor ||
peer.JoinSession.State == StateTxInput || peer.JoinSession.State == StateRevealSecret
workState := (peer.JoinSession.State == StateDcExponential || peer.JoinSession.State == StateDcXor ||
peer.JoinSession.State == StateTxInput || peer.JoinSession.State == StateRevealSecret)
if !workState {
// Peer sent invalid data with state
log.Infof("Current join session state is %s. Peer id %d has sent invalid state: StateRevealSecret",
peer.JoinSession.getStateString(), peer.Id)
peer.JoinSession.mu.Unlock()
continue
}
rs := &pb.RevealSecret{}
err := proto.Unmarshal(message.Data, rs)
if err != nil {
log.Errorf("Can not unmarshal reveal secrect key: %v", err)
peer.JoinSession.mu.Unlock()
peer.JoinSession.pushMaliciousInfo([]uint32{peer.Id})
continue
}
Expand All @@ -605,6 +610,7 @@ func (peer *PeerInfo) ReadMessages() {
// Peer sent invalid data with state
log.Infof("Current join session state is %s. Peer id %d has sent invalid state: StateMsgNotFound",
peer.JoinSession.getStateString(), peer.Id)
peer.JoinSession.mu.Unlock()
continue
}
if peer.JoinSession.maliciousFinding {
Expand All @@ -614,6 +620,7 @@ func (peer *PeerInfo) ReadMessages() {
err := proto.Unmarshal(message.Data, msg)
if err != nil {
log.Errorf("Can not proto.Unmarshal: %v", err)
peer.JoinSession.mu.Unlock()
peer.JoinSession.pushMaliciousInfo([]uint32{peer.Id})
continue
}
Expand All @@ -622,8 +629,6 @@ func (peer *PeerInfo) ReadMessages() {
peer.JoinSession.maliciousFinding = true
peer.JoinSession.mu.Unlock()
peer.JoinSession.msgNotFoundChan <- *msg
default:
peer.JoinSession.mu.Unlock()
}
}
}
Expand Down
Loading

0 comments on commit 78d47c6

Please sign in to comment.