diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index b16c0cc19445..ae2076a9b076 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -660,8 +660,15 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { if srv.Cfg.EnableLeaseCheckpoint { // setting checkpointer enables lease checkpoint feature. - srv.lessor.SetCheckpointer(func(ctx context.Context, cp *pb.LeaseCheckpointRequest) { + srv.lessor.SetCheckpointer(func(ctx context.Context, cp *pb.LeaseCheckpointRequest) error { + if !srv.ensureLeadership() { + srv.lg.Warn("Ignore the checkpoint request because current member isn't a leader", + zap.Uint64("local-member-id", uint64(srv.ID()))) + return lease.ErrNotPrimary + } + srv.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseCheckpoint: cp}) + return nil }) } @@ -1145,7 +1152,19 @@ func (s *EtcdServer) run() { func (s *EtcdServer) revokeExpiredLeases(leases []*lease.Lease) { s.GoAttach(func() { + // We shouldn't revoke any leases if current member isn't a leader, + // because the operation should only be performed by the leader. When + // the leader gets blocked on the raft loop, such as writing WAL entries, + // it can't process any events or messages from raft. It may think it + // is still the leader even the leader has already changed. + // Refer to https://github.com/etcd-io/etcd/issues/15247 lg := s.Logger() + if !s.ensureLeadership() { + lg.Warn("Ignore the lease revoking request because current member isn't a leader", + zap.Uint64("local-member-id", uint64(s.ID()))) + return + } + // Increases throughput of expired leases deletion process through parallelization c := make(chan struct{}, maxPendingRevokes) for _, curLease := range leases { @@ -1178,6 +1197,29 @@ func (s *EtcdServer) revokeExpiredLeases(leases []*lease.Lease) { }) } +// ensureLeadership checks whether current member is still the leader. +func (s *EtcdServer) ensureLeadership() bool { + lg := s.Logger() + + ctx, cancel := context.WithTimeout(s.ctx, s.Cfg.ReqTimeout()) + defer cancel() + if err := s.linearizableReadNotify(ctx); err != nil { + lg.Warn("Failed to check current member's leadership", + zap.Error(err)) + return false + } + + newLeaderId := s.raftStatus().Lead + if newLeaderId != uint64(s.ID()) { + lg.Warn("Current member isn't a leader", + zap.Uint64("local-member-id", uint64(s.ID())), + zap.Uint64("new-lead", newLeaderId)) + return false + } + + return true +} + // Cleanup removes allocated objects by EtcdServer.NewServer in // situation that EtcdServer::Start was not called (that takes care of cleanup). func (s *EtcdServer) Cleanup() { diff --git a/server/etcdserver/v3_server.go b/server/etcdserver/v3_server.go index 9f69b86b9b10..b6e7a8067970 100644 --- a/server/etcdserver/v3_server.go +++ b/server/etcdserver/v3_server.go @@ -297,6 +297,17 @@ func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) { if s.isLeader() { + // If s.isLeader() returns true, but we fail to ensure the current + // member's leadership, there are a couple of possibilities: + // 1. current member gets stuck on writing WAL entries; + // 2. current member is in network isolation status; + // 3. current member isn't a leader anymore (possibly due to #1 above). + // In such case, we just return error to client, so that the client can + // switch to another member to continue the lease keep-alive operation. + if !s.ensureLeadership() { + return -1, lease.ErrNotPrimary + } + if err := s.waitAppliedIndex(); err != nil { return 0, err } diff --git a/server/lease/lessor.go b/server/lease/lessor.go index ff9cb2ca5e6e..abeeb09bf401 100644 --- a/server/lease/lessor.go +++ b/server/lease/lessor.go @@ -77,7 +77,7 @@ type RangeDeleter func() TxnDelete // Checkpointer permits checkpointing of lease remaining TTLs to the consensus log. Defined here to // avoid circular dependency with mvcc. -type Checkpointer func(ctx context.Context, lc *pb.LeaseCheckpointRequest) +type Checkpointer func(ctx context.Context, lc *pb.LeaseCheckpointRequest) error type LeaseID int64 @@ -423,7 +423,9 @@ func (le *lessor) Renew(id LeaseID) (int64, error) { // By applying a RAFT entry only when the remainingTTL is already set, we limit the number // of RAFT entries written per lease to a max of 2 per checkpoint interval. if clearRemainingTTL { - le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: []*pb.LeaseCheckpoint{{ID: int64(l.ID), Remaining_TTL: 0}}}) + if err := le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: []*pb.LeaseCheckpoint{{ID: int64(l.ID), Remaining_TTL: 0}}}); err != nil { + return -1, err + } } le.mu.Lock() @@ -659,7 +661,9 @@ func (le *lessor) checkpointScheduledLeases() { le.mu.Unlock() if len(cps) != 0 { - le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: cps}) + if err := le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: cps}); err != nil { + return + } } if len(cps) < maxLeaseCheckpointBatchSize { return diff --git a/server/lease/lessor_test.go b/server/lease/lessor_test.go index 0edebdadde80..06c2a8a96647 100644 --- a/server/lease/lessor_test.go +++ b/server/lease/lessor_test.go @@ -250,10 +250,11 @@ func TestLessorRenewWithCheckpointer(t *testing.T) { defer os.RemoveAll(dir) le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL}) - fakerCheckerpointer := func(ctx context.Context, cp *pb.LeaseCheckpointRequest) { + fakerCheckerpointer := func(ctx context.Context, cp *pb.LeaseCheckpointRequest) error { for _, cp := range cp.GetCheckpoints() { le.Checkpoint(LeaseID(cp.GetID()), cp.GetRemaining_TTL()) } + return nil } defer le.Stop() // Set checkpointer @@ -540,7 +541,7 @@ func TestLessorCheckpointScheduling(t *testing.T) { defer le.Stop() le.minLeaseTTL = 1 checkpointedC := make(chan struct{}) - le.SetCheckpointer(func(ctx context.Context, lc *pb.LeaseCheckpointRequest) { + le.SetCheckpointer(func(ctx context.Context, lc *pb.LeaseCheckpointRequest) error { close(checkpointedC) if len(lc.Checkpoints) != 1 { t.Errorf("expected 1 checkpoint but got %d", len(lc.Checkpoints)) @@ -549,6 +550,7 @@ func TestLessorCheckpointScheduling(t *testing.T) { if c.Remaining_TTL != 1 { t.Errorf("expected checkpoint to be called with Remaining_TTL=%d but got %d", 1, c.Remaining_TTL) } + return nil }) _, err := le.Grant(1, 2) if err != nil { diff --git a/tests/integration/v3_lease_test.go b/tests/integration/v3_lease_test.go index 27e8621ff134..e11c6f24692a 100644 --- a/tests/integration/v3_lease_test.go +++ b/tests/integration/v3_lease_test.go @@ -757,7 +757,7 @@ func TestV3LeaseFailover(t *testing.T) { // send keep alive to old leader until the old leader starts // to drop lease request. - var expectedExp time.Time + expectedExp := time.Now().Add(5 * time.Second) for { if err = lac.Send(lreq); err != nil { break