// Event loop: on every iteration handle exactly one of a timer tick, a Ready
// batch from the raft node, or a shutdown signal.
for {
	select {
	case <-s.Ticker:
		n.Tick()
	case <-s.done:
		return
	case ready := <-s.Node.Ready():
		// Order matters: persist, then send, then apply, then Advance.
		saveToStorage(ready.HardState, ready.Entries, ready.Snapshot)
		send(ready.Messages)
		if !raft.IsEmptySnap(ready.Snapshot) {
			processSnapshot(ready.Snapshot)
		}
		for _, committed := range ready.CommittedEntries {
			process(committed)
			if committed.Type != raftpb.EntryConfChange {
				continue
			}
			// Configuration changes must additionally be handed back to the node.
			// NOTE(review): the error returned by Unmarshal is discarded here.
			var change raftpb.ConfChange
			change.Unmarshal(committed.Data)
			s.Node.ApplyConfChange(change)
		}
		s.Node.Advance()
	}
}
// raftNode combines a raftNodeConfig with the channels, ticker and stop
// signalling used by the goroutine launched in start.
type raftNode struct {
// lg is the logger start uses for its warning and fatal messages.
lg *zap.Logger
// NOTE(review): tickMu is not used in the visible code; presumably it
// serializes calls that tick the raft node — confirm against tick().
tickMu *sync.Mutex
// raftNodeConfig is embedded, so its fields (including the raft.Node
// methods such as Ready and Advance) are reachable directly on raftNode.
raftNodeConfig
// a chan to send/receive snapshot
msgSnapC chan raftpb.Message
// a chan to send out apply
applyc chan apply
// a chan to send out readState
readStateC chan raft.ReadState
// utility
// ticker.C is read in start's select loop; each receive calls r.tick().
ticker *time.Ticker
// contention detectors for raft heartbeat message
td *contention.TimeoutDetector
// stopped is received from in start's select statements; a successful
// receive makes the goroutine return.
stopped chan struct{}
// NOTE(review): done is not referenced in the visible code; presumably it is
// signalled once the goroutine has exited — confirm against onStop().
done chan struct{}
}
// raftNodeConfig holds the dependencies a raftNode is built from: the raft
// node itself, its storage layers and the peer transport.
type raftNodeConfig struct {
lg *zap.Logger
// to check if msg receiver is removed from cluster
isIDRemoved func(id uint64) bool
// raft.Node is embedded, promoting its methods onto the config (and onto
// any struct that embeds the config).
raft.Node
// raftStorage receives ApplySnapshot and Append calls from raftNode.start
// after the corresponding data has been saved through storage.
raftStorage *raft.MemoryStorage
// storage is what raftNode.start calls Save, SaveSnap, Sync and Release on.
storage Storage
heartbeat time.Duration // for logging
// transport specifies the transport to send and receive msgs to members.
// Sending messages MUST NOT block. It is okay to drop messages, since
// clients should timeout and reissue their messages.
// If transport is nil, server will panic.
transport rafthttp.Transporter
}
// start prepares and starts raftNode in a new goroutine. It is no longer safe
// to modify the fields after it has been started.
//
// The goroutine loops over three events: a tick from r.ticker, a Ready batch
// from the raft node, and a receive on r.stopped (which ends the goroutine).
// r.onStop is deferred, so it runs on every exit path of the goroutine.
//
// rh supplies the callbacks used to read and publish the current leader and
// leadership changes, and is also passed to updateCommittedIndex.
func (r *raftNode) start(rh *raftReadyHandler) {
// Upper bound on how long a read state send may block the loop.
internalTimeout := time.Second
go func() {
defer r.onStop()
// islead is only refreshed when a Ready carries a SoftState; it keeps its
// previous value across Ready batches that do not.
islead := false
for {
select {
case <-r.ticker.C:
r.tick()
case rd := <-r.Ready():
if rd.SoftState != nil {
// A leader change is counted only when the new leader is known
// (not raft.None) and differs from the one rh currently reports.
newLeader := rd.SoftState.Lead != raft.None && rh.getLead() != rd.SoftState.Lead
if newLeader {
leaderChanges.Inc()
}
if rd.SoftState.Lead == raft.None {
hasLeader.Set(0)
} else {
hasLeader.Set(1)
}
rh.updateLead(rd.SoftState.Lead)
islead = rd.RaftState == raft.StateLeader
if islead {
isLeader.Set(1)
} else {
isLeader.Set(0)
}
rh.updateLeadership(newLeader)
r.td.Reset()
}
if len(rd.ReadStates) != 0 {
// Only the last read state of the batch is forwarded. The send gives
// up after internalTimeout (logging a warning) or when stopped fires.
select {
case r.readStateC <- rd.ReadStates[len(rd.ReadStates)-1]:
case <-time.After(internalTimeout):
r.lg.Warn("timed out sending read state", zap.Duration("timeout", internalTimeout))
case <-r.stopped:
return
}
}
// notifyc has capacity 1: the first send below never blocks, a second
// send blocks until the receiver has drained the first.
notifyc := make(chan struct{}, 1)
ap := apply{
entries: rd.CommittedEntries,
snapshot: rd.Snapshot,
notifyc: notifyc,
}
updateCommittedIndex(&ap, rh)
// When shouldWaitWALSync reports true, hard state and entries are saved
// before the apply is handed off; otherwise they are saved further down,
// after the snapshot.
waitWALSync := shouldWaitWALSync(rd)
if waitWALSync {
// gofail: var raftBeforeSaveWaitWalSync struct{}
if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err))
}
}
// Hand the committed entries and snapshot to the consumer of applyc,
// unless the node is being stopped.
select {
case r.applyc <- ap:
case <-r.stopped:
return
}
// the leader can write to its disk in parallel with replicating to the followers and them
// writing to their disks.
// For more details, check raft thesis 10.2.1
if islead {
// gofail: var raftBeforeLeaderSend struct{}
r.transport.Send(r.processMessages(rd.Messages))
}
// Must save the snapshot file and WAL snapshot entry before saving any other entries or hardstate to
// ensure that recovery after a snapshot restore is possible.
if !raft.IsEmptySnap(rd.Snapshot) {
// gofail: var raftBeforeSaveSnap struct{}
if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
r.lg.Fatal("failed to save Raft snapshot", zap.Error(err))
}
// gofail: var raftAfterSaveSnap struct{}
}
if !waitWALSync {
// gofail: var raftBeforeSave struct{}
if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err))
}
}
if !raft.IsEmptyHardState(rd.HardState) {
proposalsCommitted.Set(float64(rd.HardState.Commit))
}
// gofail: var raftAfterSave struct{}
if !raft.IsEmptySnap(rd.Snapshot) {
// Force WAL to fsync its hard state before Release() releases
// old data from the WAL. Otherwise could get an error like:
// panic: tocommit(107) is out of range [lastIndex(84)]. Was the raft log corrupted, truncated, or lost?
// See https://github.com/etcd-io/etcd/issues/10219 for more details.
if err := r.storage.Sync(); err != nil {
r.lg.Fatal("failed to sync Raft snapshot", zap.Error(err))
}
// etcdserver now claim the snapshot has been persisted onto the disk
notifyc <- struct{}{}
// gofail: var raftBeforeApplySnap struct{}
// NOTE(review): the value returned by ApplySnapshot is not checked.
r.raftStorage.ApplySnapshot(rd.Snapshot)
r.lg.Info("applied incoming Raft snapshot", zap.Uint64("snapshot-index", rd.Snapshot.Metadata.Index))
// gofail: var raftAfterApplySnap struct{}
if err := r.storage.Release(rd.Snapshot); err != nil {
r.lg.Fatal("failed to release Raft wal", zap.Error(err))
}
// gofail: var raftAfterWALRelease struct{}
}
// NOTE(review): the value returned by Append is not checked.
r.raftStorage.Append(rd.Entries)
if !islead {
// finish processing incoming messages before we signal raftdone chan
msgs := r.processMessages(rd.Messages)
// now unblocks 'applyAll' that waits on Raft log disk writes before triggering snapshots
notifyc <- struct{}{}
// Candidate or follower needs to wait for all pending configuration
// changes to be applied before sending messages.
// Otherwise we might incorrectly count votes (e.g. votes from removed members).
// Also slow machine's follower raft-layer could proceed to become the leader
// on its own single-node cluster, before apply-layer applies the config change.
// We simply wait for ALL pending entries to be applied for now.
// We might improve this later on if it causes unnecessary long blocking issues.
waitApply := false
for _, ent := range rd.CommittedEntries {
if ent.Type == raftpb.EntryConfChange {
waitApply = true
break
}
}
if waitApply {
// blocks until 'applyAll' calls 'applyWait.Trigger'
// to be in sync with scheduled config-change job
// (assume notifyc has cap of 1)
select {
case notifyc <- struct{}{}:
case <-r.stopped:
return
}
}
// gofail: var raftBeforeFollowerSend struct{}
r.transport.Send(msgs)
} else {
// leader already processed 'MsgSnap' and signaled
notifyc <- struct{}{}
}
// Tell the raft node this Ready has been fully handled.
r.Advance()
case <-r.stopped:
return
}
}
}()
}
// Server loop: schedules apply batches, revokes expired leases, reacts to
// fatal errors and sync ticks, and exits on stop.
for {
	select {
	case pending := <-s.r.apply():
		// Applying is deferred to the scheduler so this loop is not blocked.
		sched.Schedule(func(context.Context) { s.applyAll(&ep, &pending) })
	case expired := <-expiredLeaseC:
		s.revokeExpiredLeases(expired)
	case srvErr := <-s.errorc:
		lg.Warn("server error", zap.Error(srvErr))
		lg.Warn("data-dir used by this member must be removed")
		return
	case <-getSyncC():
		if !s.v2store.HasTTLKeys() {
			continue
		}
		s.sync(s.Cfg.ReqTimeout())
	case <-s.stop:
		return
	}
}
// Transporter is the interface for exchanging raft messages and snapshots
// with remote peers and for managing the set of peers and remotes the
// transport knows about.
type Transporter interface {
// Start starts the given Transporter.
// Start MUST be called before calling other functions in the interface.
Start() error
// Handler returns the HTTP handler of the transporter.
// A transporter HTTP handler handles the HTTP requests
// from remote peers.
// The handler MUST be used to handle RaftPrefix(/raft)
// endpoint.
Handler() http.Handler
// Send sends out the given messages to the remote peers.
// Each message has a To field, which is an id that maps
// to an existing peer in the transport.
// If the id cannot be found in the transport, the message
// will be ignored.
Send(m []raftpb.Message)
// SendSnapshot sends out the given snapshot message to a remote peer.
// The behavior of SendSnapshot is similar to Send.
SendSnapshot(m snap.Message)
// AddRemote adds a remote with given peer urls into the transport.
// A remote helps newly joined member to catch up the progress of cluster,
// and will not be used after that.
// It is the caller's responsibility to ensure the urls are all valid,
// or it panics.
AddRemote(id types.ID, urls []string)
// AddPeer adds a peer with given peer urls into the transport.
// It is the caller's responsibility to ensure the urls are all valid,
// or it panics.
// Peer urls are used to connect to the remote peer.
AddPeer(id types.ID, urls []string)
// RemovePeer removes the peer with given id.
RemovePeer(id types.ID)
// RemoveAllPeers removes all the existing peers in the transport.
RemoveAllPeers()
// UpdatePeer updates the peer urls of the peer with the given id.
// It is the caller's responsibility to ensure the urls are all valid,
// or it panics.
UpdatePeer(id types.ID, urls []string)
// ActiveSince returns the time that the connection with the peer
// of the given id becomes active.
// If the connection is active since peer was added, it returns the adding time.
// If the connection is currently inactive, it returns zero time.
ActiveSince(id types.ID) time.Time
// ActivePeers returns the number of active peers.
ActivePeers() int
// Stop closes the connections and stops the transporter.
Stop()
}
// Transport implements Transporter interface. It provides the functionality
// to send raft messages to peers, and receive raft messages from peers.
// User should call Handler method to get a handler to serve requests
// received from peerURLs.
// User needs to call Start before calling other functions, and call
// Stop when the Transport is no longer used.
//
// The exported fields are configuration supplied by the user; the unexported
// fields are internal state.
type Transport struct {
// NOTE(review): Logger is presumably the logger for the transport's own
// messages; its use is not visible here.
Logger *zap.Logger
DialTimeout time.Duration // maximum duration before timing out dial of the request
// DialRetryFrequency defines the frequency of streamReader dial retrial attempts;
// a distinct rate limiter is created per every peer (default value: 10 events/sec)
DialRetryFrequency rate.Limit
TLSInfo transport.TLSInfo // TLS information used when creating connection
ID types.ID // local member ID
URLs types.URLs // local peer URLs
ClusterID types.ID // raft cluster ID for request validation
Raft Raft // raft state machine, to which the Transport forwards received messages and reports status
// NOTE(review): Snapshotter is presumably used to store and load snapshots
// exchanged with peers; its use is not visible here.
Snapshotter *snap.Snapshotter
ServerStats *stats.ServerStats // used to record general transportation statistics
// used to record transportation statistics with followers when
// performing as leader in raft protocol
LeaderStats *stats.LeaderStats
// ErrorC is used to report detected critical errors, e.g.,
// the member has been permanently removed from the cluster
// When an error is received from ErrorC, user should stop raft state
// machine and thus stop the Transport.
ErrorC chan error
streamRt http.RoundTripper // roundTripper used by streams
pipelineRt http.RoundTripper // roundTripper used by pipelines
mu sync.RWMutex // protect the remote and peer map
remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up
peers map[types.ID]Peer // peers map
// NOTE(review): the two probers presumably monitor peer health over the
// pipeline and stream round trippers respectively — confirm against usage.
pipelineProber probing.Prober
streamProber probing.Prober
}
// Raft is the view of the raft state machine that a Transport depends on:
// Transport.Raft is documented as the state machine to which the Transport
// forwards received messages and reports status.
type Raft interface {
// Process hands a received raft message to the state machine.
Process(ctx context.Context, m raftpb.Message) error
// IsIDRemoved reports whether the member with the given id has been removed.
IsIDRemoved(id uint64) bool
// ReportUnreachable reports that the member with the given id is unreachable.
ReportUnreachable(id uint64)
// ReportSnapshot reports the status of a snapshot sent to the member with
// the given id.
ReportSnapshot(id uint64, status raft.SnapshotStatus)
}
// RaftHandler returns the HTTP handler provided by the server's raft
// transport.
func (s *EtcdServer) RaftHandler() http.Handler {
	handler := s.r.transport.Handler()
	return handler
}
// Process feeds a raft message received from a peer into the server's raft
// node via Step, passing ctx through so the caller's deadline or cancellation
// is honoured.
//
// Messages whose sender has been removed from the cluster are rejected with
// an HTTP 403 error and never reach the raft node. Append requests are
// recorded in the server statistics before being stepped.
func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
	logger := s.Logger()
	sender := types.ID(m.From)

	if s.cluster.IsIDRemoved(sender) {
		logger.Warn(
			"rejected Raft message from removed member",
			zap.String("local-member-id", s.ID().String()),
			zap.String("removed-member-id", sender.String()),
		)
		return httptypes.NewHTTPError(http.StatusForbidden, "cannot process message from removed member")
	}

	if m.Type == raftpb.MsgApp {
		s.stats.RecvAppendReq(sender.String(), m.Size())
	}

	return s.r.Step(ctx, m)
}
// IsIDRemoved reports whether the cluster considers the member with the given
// raw id to be removed.
func (s *EtcdServer) IsIDRemoved(id uint64) bool {
	memberID := types.ID(id)
	return s.cluster.IsIDRemoved(memberID)
}
func (s *EtcdServer) ReportUnreachable(id uint64) { s.r.ReportUnreachable(id) }
}
// ReportSnapshot reports snapshot sent status to the raft state machine by
// forwarding id and status to the server's raft node.
//
// NOTE(review): this comment previously also said the method clears the used
// snapshot from the snapshot store; nothing in this method does that
// directly — confirm whether s.r.ReportSnapshot is responsible for it.
func (s *EtcdServer) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
s.r.ReportSnapshot(id, status)
}
// newStreamRoundTripper builds the roundTripper used for stream requests to
// the rafthttp listener of remote peers.
//
// The underlying timeout transport is created with ConnReadTimeout and
// ConnWriteTimeout so that a broken connection is noticed promptly, keeping
// the number of messages sent on a broken connection small.
func newStreamRoundTripper(tlsInfo transport.TLSInfo, dialTimeout time.Duration) (http.RoundTripper, error) {
	rt, err := transport.NewTimeoutTransport(
		tlsInfo,
		dialTimeout,
		ConnReadTimeout,
		ConnWriteTimeout,
	)
	return rt, err
}
// NewTimeoutTransport returns a transport created using the given TLS info.
// If read/write on the created connection blocks longer than its time limit,
// it will return timeout error.
// If read/write timeout is set, transport will not be able to reuse connection.
//
// dialtimeoutd bounds connection establishment; rdtimeoutd and wtimeoutd are
// passed to the rwTimeoutDialer that creates the connections. An error is
// returned only when NewTransport fails.
func NewTimeoutTransport(info TLSInfo, dialtimeoutd, rdtimeoutd, wtimeoutd time.Duration) (*http.Transport, error) {
	tr, err := NewTransport(info, dialtimeoutd)
	if err != nil {
		return nil, err
	}

	if rdtimeoutd != 0 || wtimeoutd != 0 {
		// the timed out connection will timeout soon after it is idle.
		// it should not be put back to http transport as an idle connection for future usage.
		tr.MaxIdleConnsPerHost = -1
	} else {
		// allow more idle connections between peers to avoid unnecessary port allocation.
		tr.MaxIdleConnsPerHost = 1024
	}

	// NewTransport populates DialContext, and net/http gives DialContext
	// priority over Dial when both are set. Clear it so that the
	// timeout-aware dialer installed below is the one actually used;
	// otherwise the read/write timeouts would never take effect.
	tr.DialContext = nil
	tr.Dial = (&rwTimeoutDialer{
		Dialer: net.Dialer{
			Timeout:   dialtimeoutd,
			KeepAlive: 30 * time.Second,
		},
		rdtimeoutd: rdtimeoutd,
		wtimeoutd:  wtimeoutd,
	}).Dial
	return tr, nil
}
// NewTransport returns an HTTP transport configured with the client TLS
// config derived from info and a dial timeout of dialtimeoutd.
//
// The "unix" and "unixs" schemes are registered on the returned transport and
// served by a second, internal transport that always dials a unix socket at
// the request address. An error is returned only when the TLS client config
// cannot be built.
func NewTransport(info TLSInfo, dialtimeoutd time.Duration) (*http.Transport, error) {
	tlsCfg, err := info.ClientConfig()
	if err != nil {
		return nil, err
	}

	// Both values are taken from http.DefaultTransport.
	const (
		keepAlive        = 30 * time.Second
		handshakeTimeout = 10 * time.Second
	)

	tcpDialer := &net.Dialer{
		Timeout:   dialtimeoutd,
		KeepAlive: keepAlive,
	}
	mainTr := &http.Transport{
		Proxy:               http.ProxyFromEnvironment,
		DialContext:         tcpDialer.DialContext,
		TLSHandshakeTimeout: handshakeTimeout,
		TLSClientConfig:     tlsCfg,
	}

	unixDialer := &net.Dialer{
		Timeout:   dialtimeoutd,
		KeepAlive: keepAlive,
	}
	unixTr := &http.Transport{
		Proxy: http.ProxyFromEnvironment,
		// The requested network is ignored: this transport only ever dials
		// unix sockets.
		DialContext: func(ctx context.Context, _, addr string) (net.Conn, error) {
			return unixDialer.DialContext(ctx, "unix", addr)
		},
		TLSHandshakeTimeout: handshakeTimeout,
		TLSClientConfig:     tlsCfg,
		// Cost of reopening connection on sockets is low, and they are mostly used in testing.
		// Long living unix-transport connections were leading to 'leak' test flakes.
		// Alternatively the returned Transport (mainTr) should override CloseIdleConnections to
		// forward it to 'unixTr' as well.
		IdleConnTimeout: time.Microsecond,
	}

	wrapped := &unixTransport{unixTr}
	mainTr.RegisterProtocol("unix", wrapped)
	mainTr.RegisterProtocol("unixs", wrapped)
	return mainTr, nil
}