Skip to content

Commit

Permalink
Mark NodeNotReady for docker version below 1.6.2
Browse files Browse the repository at this point in the history
  • Loading branch information
dchen1107 committed Oct 30, 2015
1 parent b691fd2 commit a39e1e9
Show file tree
Hide file tree
Showing 10 changed files with 197 additions and 10 deletions.
4 changes: 2 additions & 2 deletions cmd/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string
testRootDir := integration.MakeTempDirOrDie("kubelet_integ_1.", "")
configFilePath := integration.MakeTempDirOrDie("config", testRootDir)
glog.Infof("Using %s as root dir for kubelet #1", testRootDir)
fakeDocker1.VersionInfo = docker.Env{"ApiVersion=1.15"}
fakeDocker1.VersionInfo = docker.Env{"ApiVersion=1.20"}

kcfg := kubeletapp.SimpleKubelet(
cl,
Expand Down Expand Up @@ -246,7 +246,7 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string
// have a place they can schedule.
testRootDir = integration.MakeTempDirOrDie("kubelet_integ_2.", "")
glog.Infof("Using %s as root dir for kubelet #2", testRootDir)
fakeDocker2.VersionInfo = docker.Env{"ApiVersion=1.15"}
fakeDocker2.VersionInfo = docker.Env{"ApiVersion=1.20"}

kcfg = kubeletapp.SimpleKubelet(
cl,
Expand Down
6 changes: 6 additions & 0 deletions pkg/kubelet/container/fake_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type FakeRuntime struct {
StartedContainers []string
KilledContainers []string
VersionInfo string
RuntimeType string
Err error
InspectErr error
}
Expand Down Expand Up @@ -94,6 +95,7 @@ func (f *FakeRuntime) ClearCalls() {
f.StartedContainers = []string{}
f.KilledContainers = []string{}
f.VersionInfo = ""
f.RuntimeType = ""
f.Err = nil
f.InspectErr = nil
}
Expand Down Expand Up @@ -136,6 +138,10 @@ func (f *FakeRuntime) AssertKilledContainers(containers []string) error {
return f.assertList(containers, f.KilledContainers)
}

func (f *FakeRuntime) Type() string {
return f.RuntimeType
}

func (f *FakeRuntime) Version() (Version, error) {
f.Lock()
defer f.Unlock()
Expand Down
3 changes: 3 additions & 0 deletions pkg/kubelet/container/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ type ImageSpec struct {
// by a container runtime.
// Thread safety is required from implementations of this interface.
type Runtime interface {
// Type returns the type of the container runtime.
Type() string

// Version returns the version information of the container runtime.
Version() (Version, error)
// GetPods returns a list containers group by pods. The boolean parameter
Expand Down
8 changes: 8 additions & 0 deletions pkg/kubelet/dockertools/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ import (
)

const (
DockerType = "docker"

MinimumDockerAPIVersion = "1.18"

maxReasonCacheEntries = 200

// ndots specifies the minimum number of dots that a domain name must contain for the resolver to consider it as FQDN (fully-qualified)
Expand Down Expand Up @@ -1043,6 +1047,10 @@ func (dv dockerVersion) Compare(other string) (int, error) {
return 0, nil
}

func (dm *DockerManager) Type() string {
return DockerType
}

func (dm *DockerManager) Version() (kubecontainer.Version, error) {
env, err := dm.client.Version()
if err != nil {
Expand Down
30 changes: 29 additions & 1 deletion pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -2562,11 +2562,13 @@ func (kl *Kubelet) setNodeStatus(node *api.Node) error {
containerRuntimeUp := kl.containerRuntimeUp()
// Check whether network is configured properly
networkConfigured := kl.doneNetworkConfigure()
// Check whether runtime version meets the minimal requirements
containerRuntimeVersionRequirementMet := kl.containerRuntimeVersionRequirementMet()

currentTime := unversioned.Now()
var newNodeReadyCondition api.NodeCondition
var oldNodeReadyConditionStatus api.ConditionStatus
if containerRuntimeUp && networkConfigured {
if containerRuntimeUp && networkConfigured && containerRuntimeVersionRequirementMet {
newNodeReadyCondition = api.NodeCondition{
Type: api.NodeReady,
Status: api.ConditionTrue,
Expand All @@ -2582,6 +2584,9 @@ func (kl *Kubelet) setNodeStatus(node *api.Node) error {
if !networkConfigured {
messages = append(messages, "network not configured correctly")
}
if !containerRuntimeVersionRequirementMet {
messages = append(messages, fmt.Sprintf("container runtime version is older than %s", dockertools.MinimumDockerAPIVersion))
}
newNodeReadyCondition = api.NodeCondition{
Type: api.NodeReady,
Status: api.ConditionFalse,
Expand Down Expand Up @@ -2693,6 +2698,29 @@ func (kl *Kubelet) doneNetworkConfigure() bool {
return kl.networkConfigured
}

func (kl *Kubelet) containerRuntimeVersionRequirementMet() bool {
switch kl.GetRuntime().Type() {
case "docker":
version, err := kl.GetContainerRuntimeVersion()
if err != nil {
return true
}
// Verify the docker version.
result, err := version.Compare(dockertools.MinimumDockerAPIVersion)
if err != nil {
glog.Errorf("Cannot compare current docker version %v with minimum support Docker version %q", version, dockertools.MinimumDockerAPIVersion)
return false
}
return (result >= 0)
case "rkt":
// TODO(dawnchen): Rkt support here
return true
default:
glog.Errorf("unsupported container runtime %s specified", kl.GetRuntime().Type())
return true
}
}

// tryUpdateNodeStatus tries to update node status to master. If ReconcileCBR0
// is set, this function will also confirm that cbr0 is configured correctly.
func (kl *Kubelet) tryUpdateNodeStatus() error {
Expand Down
137 changes: 137 additions & 0 deletions pkg/kubelet/kubelet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2623,6 +2623,143 @@ func TestUpdateNewNodeStatus(t *testing.T) {
}
}

func TestDockerRuntimeVersion(t *testing.T) {
testKubelet := newTestKubelet(t)
kubelet := testKubelet.kubelet
fakeRuntime := testKubelet.fakeRuntime
fakeRuntime.RuntimeType = "docker"
fakeRuntime.VersionInfo = "1.18"
kubeClient := testKubelet.fakeKubeClient
kubeClient.ReactionChain = testclient.NewSimpleFake(&api.NodeList{Items: []api.Node{
{ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}},
}}).ReactionChain
machineInfo := &cadvisorapi.MachineInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
NumCores: 2,
MemoryCapacity: 1024,
}
mockCadvisor := testKubelet.fakeCadvisor
mockCadvisor.On("MachineInfo").Return(machineInfo, nil)
versionInfo := &cadvisorapi.VersionInfo{
KernelVersion: "3.16.0-0.bpo.4-amd64",
ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)",
DockerVersion: "1.5.0",
}
mockCadvisor.On("VersionInfo").Return(versionInfo, nil)
// Create a new DiskSpaceManager with a new policy. This new manager along with the mock
// FsInfo values added to Cadvisor should make the kubelet report that it has sufficient
// disk space.
dockerimagesFsInfo := cadvisorapiv2.FsInfo{Capacity: 500 * mb, Available: 200 * mb}
rootFsInfo := cadvisorapiv2.FsInfo{Capacity: 500 * mb, Available: 200 * mb}
mockCadvisor.On("DockerImagesFsInfo").Return(dockerimagesFsInfo, nil)
mockCadvisor.On("RootFsInfo").Return(rootFsInfo, nil)
dsp := DiskSpacePolicy{DockerFreeDiskMB: 100, RootFreeDiskMB: 100}
diskSpaceManager, err := newDiskSpaceManager(mockCadvisor, dsp)
if err != nil {
t.Fatalf("can't update disk space manager: %v", err)
}
diskSpaceManager.Unfreeze()
kubelet.diskSpaceManager = diskSpaceManager

expectedNode := &api.Node{
ObjectMeta: api.ObjectMeta{Name: testKubeletHostname},
Spec: api.NodeSpec{},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionTrue,
Reason: "KubeletReady",
Message: fmt.Sprintf("kubelet is posting ready status"),
LastHeartbeatTime: unversioned.Time{},
LastTransitionTime: unversioned.Time{},
},
{
Type: api.NodeOutOfDisk,
Status: api.ConditionFalse,
Reason: "KubeletHasSufficientDisk",
Message: fmt.Sprintf("kubelet has sufficient disk space available"),
LastHeartbeatTime: unversioned.Time{},
LastTransitionTime: unversioned.Time{},
},
},
NodeInfo: api.NodeSystemInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
KernelVersion: "3.16.0-0.bpo.4-amd64",
OsImage: "Debian GNU/Linux 7 (wheezy)",
ContainerRuntimeVersion: "docker://1.5.0",
KubeletVersion: version.Get().String(),
KubeProxyVersion: version.Get().String(),
},
Capacity: api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI),
api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
},
Addresses: []api.NodeAddress{
{Type: api.NodeLegacyHostIP, Address: "127.0.0.1"},
{Type: api.NodeInternalIP, Address: "127.0.0.1"},
},
},
}

kubelet.updateRuntimeUp()
if err := kubelet.updateNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
}
actions := kubeClient.Actions()
if len(actions) != 2 {
t.Fatalf("unexpected actions: %v", actions)
}
if !actions[1].Matches("update", "nodes") || actions[1].GetSubresource() != "status" {
t.Fatalf("unexpected actions: %v", actions)
}
updatedNode, ok := actions[1].(testclient.UpdateAction).GetObject().(*api.Node)
if !ok {
t.Errorf("unexpected object type")
}
for i, cond := range updatedNode.Status.Conditions {
if cond.LastHeartbeatTime.IsZero() {
t.Errorf("unexpected zero last probe timestamp")
}
if cond.LastTransitionTime.IsZero() {
t.Errorf("unexpected zero last transition timestamp")
}
updatedNode.Status.Conditions[i].LastHeartbeatTime = unversioned.Time{}
updatedNode.Status.Conditions[i].LastTransitionTime = unversioned.Time{}
}
if !reflect.DeepEqual(expectedNode, updatedNode) {
t.Errorf("unexpected objects: %s", util.ObjectDiff(expectedNode, updatedNode))
}

// Downgrade docker version, node should be NotReady
fakeRuntime.RuntimeType = "docker"
fakeRuntime.VersionInfo = "1.17"
kubelet.updateRuntimeUp()
if err := kubelet.updateNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
}
actions = kubeClient.Actions()
if len(actions) != 4 {
t.Fatalf("unexpected actions: %v", actions)
}
if !actions[1].Matches("update", "nodes") || actions[1].GetSubresource() != "status" {
t.Fatalf("unexpected actions: %v", actions)
}
updatedNode, ok = actions[3].(testclient.UpdateAction).GetObject().(*api.Node)
if !ok {
t.Errorf("unexpected object type")
}
if updatedNode.Status.Conditions[0].Reason != "KubeletNotReady" &&
!strings.Contains(updatedNode.Status.Conditions[0].Message, "container runtime version is older than") {
t.Errorf("unexpect NodeStatus due to container runtime version")
}
}

func TestUpdateExistingNodeStatus(t *testing.T) {
testKubelet := newTestKubelet(t)
kubelet := testKubelet.kubelet
Expand Down
2 changes: 0 additions & 2 deletions pkg/kubelet/rkt/container_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ type containerID struct {
appName string // Name of the app in that pod.
}

const RktType = "rkt"

// buildContainerID constructs the containers's ID using containerID,
// which consists of the pod uuid and the container name.
// The result can be used to uniquely identify a container.
Expand Down
6 changes: 6 additions & 0 deletions pkg/kubelet/rkt/rkt.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ import (
)

const (
RktType = "rkt"

acVersion = "0.7.1"
minimumRktVersion = "0.9.0"
recommendRktVersion = "0.9.0"
Expand Down Expand Up @@ -864,6 +866,10 @@ func (r *Runtime) GetPodStatus(pod *api.Pod) (*api.PodStatus, error) {
return r.getPodStatus(serviceName)
}

func (r *Runtime) Type() string {
return RktType
}

// Version invokes 'rkt version' to get the version information of the rkt
// runtime on the machine.
// The return values are an int array containers the version number.
Expand Down
3 changes: 2 additions & 1 deletion pkg/kubelet/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"k8s.io/kubernetes/pkg/healthz"
"k8s.io/kubernetes/pkg/httplog"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
"k8s.io/kubernetes/pkg/kubelet/portforward"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
Expand Down Expand Up @@ -369,7 +370,7 @@ func (s *Server) dockerHealthCheck(req *http.Request) error {
return errors.New("unknown Docker version")
}
// Verify the docker version.
result, err := version.Compare("1.18")
result, err := version.Compare(dockertools.MinimumDockerAPIVersion)
if err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/kubelet/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func newServerTest() *serverTestFramework {
fw := &serverTestFramework{}
fw.fakeKubelet = &fakeKubelet{
containerVersionFunc: func() (kubecontainer.Version, error) {
return dockertools.NewVersion("1.15")
return dockertools.NewVersion("1.18")
},
hostnameFunc: func() string {
return "127.0.0.1"
Expand Down Expand Up @@ -506,7 +506,7 @@ func TestServeRunInContainerWithUID(t *testing.T) {
func TestHealthCheck(t *testing.T) {
fw := newServerTest()
fw.fakeKubelet.containerVersionFunc = func() (kubecontainer.Version, error) {
return dockertools.NewVersion("1.15")
return dockertools.NewVersion("1.18")
}
fw.fakeKubelet.hostnameFunc = func() string {
return "127.0.0.1"
Expand All @@ -523,7 +523,7 @@ func TestHealthCheck(t *testing.T) {

//Test with old container runtime version
fw.fakeKubelet.containerVersionFunc = func() (kubecontainer.Version, error) {
return dockertools.NewVersion("1.1")
return dockertools.NewVersion("1.16")
}

assertHealthFails(t, fw.testHTTPServer.URL+"/healthz", http.StatusInternalServerError)
Expand Down Expand Up @@ -715,7 +715,7 @@ func TestAuthorizationSuccess(t *testing.T) {
func TestSyncLoopCheck(t *testing.T) {
fw := newServerTest()
fw.fakeKubelet.containerVersionFunc = func() (kubecontainer.Version, error) {
return dockertools.NewVersion("1.15")
return dockertools.NewVersion("1.18")
}
fw.fakeKubelet.hostnameFunc = func() string {
return "127.0.0.1"
Expand Down

0 comments on commit a39e1e9

Please sign in to comment.