Skip to content

Timeout when apply Service #327

@sleipnir

Description

@sleipnir

Hi!
I received this when apply service descendant:

2025-09-02 19:50:55.615 [operator@127.0.0.1]:[pid=<0.564.0> ]:[error]:Task #PID<0.564.0> started from #PID<0.563.0> terminating
** (stop) exited in: Task.await(%Task{mfa: {:erlang, :apply, 2}, owner: #PID<0.564.0>, pid: #PID<0.571.0>, ref: #Reference<0.0.72195.1510785559.1662320644.257604>}, 5000)
    ** (EXIT) time out
    (elixir 1.18.0) lib/task.ex:888: Task.await_receive/3
    (elixir 1.18.0) lib/enum.ex:1714: Enum."-map/2-lists^map/1-1-"/2
    (elixir 1.18.0) lib/enum.ex:1714: Enum."-map/2-lists^map/1-1-"/2
    (bonny 1.4.0) lib/bonny/resource.ex:203: Bonny.Resource.apply_async/3
    (bonny 1.4.0) lib/bonny/axn.ex:430: Bonny.Axn.apply_descendant_group/3
    (elixir 1.18.0) lib/enum.ex:2546: Enum."-reduce/3-lists^foldl/2-0-"/3
    (bonny 1.4.0) lib/bonny/axn.ex:419: Bonny.Axn.apply_descendants/2
    (v3_paas_controller 0.1.0) lib/v3_paas_controller/operator.ex:1: V3PaaSController.Operator.pluggable_builder_call/2
Function: &Bonny.Server.AsyncStreamRunner.run/2
    Args: [#Stream<[enum: #Function<61.7259525/2 in Stream.transform/3>, funs: [#Function<49.7259525/1 in Stream.map/2>, #Function<49.7259525/1 in Stream.map/2>]]>, nil]

Bellow the controller:

defmodule V3PaasController.Controllers.WorkloadController do
  use Bonny.ControllerV2

  require Logger
  require Bonny.API.CRD

  import Bonny.Config, only: [conn: 0]

  step(Bonny.Pluggable.SkipObservedGenerations)
  step(:handle_event)

  @impl Bonny.ControllerV2
  def rbac_rules() do
    [
      to_rbac_rule({"*", "*", "*"})
      # to_rbac_rule({"cert-manager.io", "certificate", "*"}),
      # to_rbac_rule({"", ["secrets"], ["*"]}),
      # to_rbac_rule({"batch", ["cronjob", "cronjobs", "job", "jobs"], ["*"]}),
      # to_rbac_rule({"", ["pods"], ["*"]}),
      # to_rbac_rule({"apps", ["deployments", "daemonsets"], ["*"]}),
      # to_rbac_rule({"", ["services", "configmaps"], ["*"]}),
      # to_rbac_rule({"autoscaling", ["horizontalpodautoscalers"], ["*"]}),
      # to_rbac_rule({"extensions", ["ingresses", "ingressclasses"], ["*"]}),
      # to_rbac_rule({"networking.k8s.io", ["ingresses", "ingressclasses"], ["*"]}),
      # to_rbac_rule({"keda.sh", ["scaledobjects", "scaledjobs"], ["*"]}),
      # to_rbac_rule({"external.metrics.k8s.io", ["*"], ["*"]}),
      # to_rbac_rule({"gateway.networking.k8s.io", ["httproutes"], ["*"]})
    ]
  end

  def handle_event(%Bonny.Axn{action: action, resource: resource} = axn, _opts)
      when action in [:add, :modify] do
    Logger.info("Reconciling Workload #{get_in(resource, ["metadata", "name"])}")

    with {:namespace, {:ok, _}} <- {:namespace, ensure_namespace!(resource["spec"]["tenant"])},
         {:sa, axn, sa} <- {:sa, axn, build_service_account(resource)},
         {:workload, axn, workload} <- {:workload, axn, build_workload(resource)},
         {:svc, axn, svc} <- {:svc, axn, build_service(resource)},
         {:ext_secret, axn, ext_secret} <-
           {:ext_secret, axn, build_external_secret(resource)},
         {:scaled_obj, axn, autoscaling} <- {:scaled_obj, axn, build_autoscaler(resource)} do
      axn
      |> maybe_register(sa, group: -4, create_events: true)
      |> maybe_register(ext_secret, group: -3, create_events: true)
      |> maybe_register(workload, group: -2, create_events: true)
      |> maybe_register(autoscaling, group: -1, create_events: true)
      |> maybe_register(svc, group: 0, create_events: true)
      # |> success_event()
      |> set_condition("WorkloadReady", true, "All resources reconciled successfully")
      |> IO.inspect()
    else
      {:namespace, {:error, error}} ->
        axn |> failure_event(message: error) |> set_condition("NamespaceReady", false, error)

      {:sa, _axn, nil} ->
        axn
        |> failure_event(message: "ServiceAccount not created")
        |> set_condition("ServiceAccount", false, "Missing ServiceAccount")

      {:workload, _axn, nil} ->
        axn
        |> failure_event(message: "Workload not created")
        |> set_condition("Workload", false, "Missing Workload")

      {:svc, _axn, nil} ->
        axn
        |> failure_event(message: "Service not created")
        |> set_condition("Service", false, "Missing Service")

      {:ext_secret, _axn, nil} ->
        axn
        |> failure_event(message: "ExternalSecret not created")
        |> set_condition("ExternalSecret", false, "Missing ExternalSecret")

      {:scaled_obj, _axn, nil} ->
        axn
        |> failure_event(message: "Autoscaling not created")
        |> set_condition("Autoscaling", false, "Missing Autoscaling")
    end
  end

  def handle_event(%Bonny.Axn{action: :reconcile, resource: resource} = axn, _opts) do
    Logger.info("Reconciling Workload #{get_in(resource, ["metadata", "name"])}")
    Bonny.Axn.success_event(axn)
  end

  def handle_event(%Bonny.Axn{action: :delete, resource: resource} = axn, _opts) do
    Logger.info("Deleting Workload #{get_in(resource, ["metadata", "name"])}")
    Bonny.Axn.success_event(axn)
  end

  defp maybe_register(axn, res, opts \\ [])
  defp maybe_register(axn, nil, _opts), do: axn
  defp maybe_register(axn, res, opts), do: Bonny.Axn.register_descendant(axn, res, opts)

  defp build_service_account(
         %{"spec" => %{"serviceAccount" => %{"enabled" => true} = sa, "tenant" => tenant}} =
           _resource
       ) do
    name = if sa["name"], do: sa["name"], else: "#{sa["name"]}-sa"

    %{
      "apiVersion" => "v1",
      "kind" => "ServiceAccount",
      "metadata" => %{
        "name" => name,
        "namespace" => tenant,
        "annotations" =>
          Map.merge(
            %{
              "argocd.argoproj.io/compare-options" => "IgnoreExtraneous",
              "argocd.argoproj.io/tracking-id" => "#{tenant}:v3.com.br:Workload/#{name}"
            },
            Map.get(sa, "annotations", %{})
          )
      }
    }
  end

  defp build_service_account(_), do: nil

  defp build_workload(%{"spec" => %{"type" => "cronjob"}} = res), do: build_cronjob(res)
  defp build_workload(%{"spec" => %{"type" => _}} = res), do: build_deployment(res)
  defp build_workload(_), do: nil

  defp build_deployment(%{"spec" => spec, "metadata" => _meta}) do
    name = spec["name"]
    match_labels = %{"app" => name}
    labels = %{"app" => name, "tenant" => spec["tenant"]}

    sa =
      if get_in(spec, ["serviceAccount", "enabled"]) do
        n = "#{spec["serviceAccount"]["name"]}"
        sa_name = if n, do: n, else: "#{n}-sa"
        sa_name
      else
        nil
      end

    base_annotations = %{
      "argocd.argoproj.io/compare-options" => "IgnoreExtraneous",
      "argocd.argoproj.io/tracking-id" => "#{spec["tenant"]}:v3.com.br:Workload/#{name}"
    }

    annotations =
      if Map.get(spec, "meshed", false) do
        Map.put(base_annotations, "linkerd.io/inject", "enabled")
      else
        base_annotations
      end

    annotations =
      if get_in(spec, ["monitoring", "enabled"]) do
        prometheus_ann = %{
          "prometheus.io/scrape" => "true",
          "prometheus.io/port" => "#{spec["network"]["containerPort"]}",
          "prometheus.io/path" => get_in(spec, ["monitoring", "path"]) || "/metrics"
        }

        Map.merge(annotations, prometheus_ann)
      else
        annotations
      end

    pod_template_metadata = %{"labels" => labels}

    pod_template_metadata =
      if Map.get(spec, "meshed", false) do
        Map.put(pod_template_metadata, "annotations", %{"linkerd.io/inject" => "enabled"})
      else
        pod_template_metadata
      end

    default_resources = %{
      "requests" => %{"cpu" => "50m", "memory" => "64Mi"},
      "limits" => %{"cpu" => "250m", "memory" => "128Mi"}
    }

    resources =
      case get_in(spec, ["compute", "resources"]) do
        nil -> default_resources
        %{} = r -> Map.merge(default_resources, r, fn _k, v1, v2 -> Map.merge(v1, v2) end)
      end

    pull_policy = if spec["imagePullPolicy"], do: spec["imagePullPolicy"], else: "Always"

    node_selector =
      if Map.get(spec, "exclusiveTenant", false) do
        %{"tenant" => "#{spec["tenant"]}-pool"}
      else
        nil
      end

    health_probes =
      if get_in(spec, ["monitoring", "enabled"]) do
        health = spec["monitoring"]["health"]
        %{
          "livenessProbe" => %{
            "httpGet" => %{
              "path" => Map.get(health, "livenessPath", "/healthz"),
              "port" => spec["network"]["containerPort"]
            },
            "initialDelaySeconds" => 10,
            "periodSeconds" => 10,
            "failureThreshold" => 5
          },
          "readinessProbe" => %{
            "httpGet" => %{
              "path" => Map.get(health, "readinessPath", "/healthz/ready"),
              "port" => spec["network"]["containerPort"]
            },
            "initialDelaySeconds" => 5,
            "periodSeconds" => 10,
            "failureThreshold" => 3
          }
        }
      else
        %{}
      end

    container =
      %{
        "name" => name,
        "image" => spec["image"],
        "imagePullPolicy" => pull_policy,
        "ports" => [
          %{"containerPort" => spec["network"]["containerPort"]}
        ],
        "resources" => resources,
        "env" => build_env(spec),
        "envFrom" => [
          %{"secretRef" => %{"name" => spec["config"]["secretRef"]}}
        ]
      }
      |> Map.merge(health_probes)

    pod_spec =
      %{
        "serviceAccountName" => sa,
        "containers" => [container]
      }
      |> maybe_put("nodeSelector", node_selector)

    %{
      "apiVersion" => "apps/v1",
      "kind" => "Deployment",
      "metadata" => %{
        "name" => name,
        "namespace" => spec["tenant"],
        "labels" => labels,
        "annotations" => annotations
      },
      "spec" => %{
        "replicas" => max(spec["compute"]["scale"]["min"], 1),
        "selector" => %{"matchLabels" => match_labels},
        "template" => %{
          "metadata" => pod_template_metadata,
          "spec" => pod_spec
        }
      }
    }
  end

  defp build_cronjob(%{"spec" => spec}) do
    name = spec["name"]
    tenant = spec["tenant"]

    sa =
      if get_in(spec, ["serviceAccount", "enabled"]) do
        spec["serviceAccount"]["name"]
      else
        nil
      end

    pull_policy = Map.get(spec, "imagePullPolicy", "Always")

    node_selector =
      if Map.get(spec, "exclusiveTenant", false) do
        %{"tenant" => "#{tenant}-pool"}
      else
        nil
      end

    pod_spec =
      %{
        "serviceAccountName" => sa,
        "containers" => [
          %{
            "name" => name,
            "image" => spec["image"],
            "imagePullPolicy" => pull_policy,
            "env" => build_env(spec),
            "envFrom" => [%{"secretRef" => %{"name" => spec["config"]["secretRef"]}}],
            "resources" => spec["compute"]["resources"]
          }
        ],
        "restartPolicy" => "OnFailure"
      }
      |> maybe_put("nodeSelector", node_selector)

    %{
      "apiVersion" => "batch/v1",
      "kind" => "CronJob",
      "metadata" => %{
        "name" => name,
        "namespace" => tenant,
        "annotations" => %{
          "argocd.argoproj.io/compare-options" => "IgnoreExtraneous",
          "argocd.argoproj.io/tracking-id" => "#{tenant}:v3.com.br:Workload/#{name}"
        },
        "labels" => %{"app.kubernetes.io/instance" => tenant}
      },
      "spec" => %{
        "schedule" => spec["schedule"],
        "concurrencyPolicy" => "Forbid",
        "jobTemplate" => %{
          "spec" => %{
            "template" => %{"spec" => pod_spec}
          }
        }
      }
    }
  end

  defp build_service(%{"spec" => spec, "metadata" => _meta}) do
    name = spec["name"]
    svc_name = "#{spec["name"]}-svc"
    match_labels = %{"app" => name}

    %{
      "apiVersion" => "v1",
      "kind" => "Service",
      "metadata" => %{
        "name" => svc_name,
        "namespace" => spec["tenant"],
        "annotations" => %{
          "argocd.argoproj.io/compare-options" => "IgnoreExtraneous",
          "argocd.argoproj.io/tracking-id" => "#{spec["tenant"]}:v3.com.br:Workload/#{svc_name}"
        }
      },
      "spec" => %{
        "selector" => match_labels,
        "ports" => [
          %{
            "protocol" => "TCP",
            "port" => spec["network"]["containerPort"],
            "targetPort" => spec["network"]["containerPort"]
          }
        ]
      }
    }
  end

  defp build_autoscaler(%{"spec" => %{"type" => "worker"} = _spec} = res),
    do: build_scaledobject(res)

  defp build_autoscaler(%{"spec" => %{"type" => "api"} = _spec} = res), do: build_hpa(res)
  defp build_autoscaler(_), do: nil

  defp build_hpa(%{"spec" => spec, "metadata" => _meta}) do
    min = get_in(spec, ["compute", "scale", "min"])
    max = get_in(spec, ["compute", "scale", "max"])
    name = spec["name"]

    if min && max do
      %{
        "apiVersion" => "autoscaling/v2",
        "kind" => "HorizontalPodAutoscaler",
        "metadata" => %{
          "name" => name,
          "namespace" => spec["tenant"],
          "annotations" => %{
            "argocd.argoproj.io/compare-options" => "IgnoreExtraneous",
            "argocd.argoproj.io/tracking-id" => "#{spec["tenant"]}:v3.com.br:Workload/#{name}"
          }
        },
        "spec" => %{
          "scaleTargetRef" => %{
            "apiVersion" => "apps/v1",
            "kind" => "Deployment",
            "name" => name
          },
          "minReplicas" => 1,
          "maxReplicas" => max,
          "metrics" => [
            %{
              "type" => "Resource",
              "resource" => %{
                "name" => "cpu",
                "target" => %{"type" => "Utilization", "averageUtilization" => 80}
              }
            }
          ]
        }
      }
    end
  end

  defp build_scaledobject(%{"spec" => %{"type" => "worker"} = spec, "metadata" => _meta}) do
    case get_in(spec, ["compute", "scale", "keda", "enabled"]) do
      true ->
        %{
          "apiVersion" => "keda.sh/v1alpha1",
          "kind" => "ScaledObject",
          "metadata" => %{
            "name" => "#{spec["name"]}-keda",
            "namespace" => spec["tenant"],
            "annotations" => %{
              "argocd.argoproj.io/compare-options" => "IgnoreExtraneous",
              "argocd.argoproj.io/tracking-id" =>
                "#{spec["tenant"]}:v3.com.br:Workload/#{spec["name"]}"
            }
          },
          "spec" => %{
            "idleReplicaCount" => 0,
            "pollingInterval" => 10,
            "cooldownPeriod" => 120,
            "minReplicaCount" => spec["compute"]["scale"]["min"],
            "maxReplicaCount" => spec["compute"]["scale"]["max"],
            "fallback" => %{
              "failureThreshold" => 5,
              "replicas" => 1
            },
            "advanced" => %{
              "horizontalPodAutoscalerConfig" => %{
                "behavior" => %{
                  "scaleUp" => %{
                    "stabilizationWindowSeconds" => 60,
                    "policies" => [
                      %{"type" => "Percent", "value" => 100, "periodSeconds" => 60}
                    ]
                  },
                  "scaleDown" => %{
                    "stabilizationWindowSeconds" => 300,
                    "policies" => [
                      %{"type" => "Percent", "value" => 50, "periodSeconds" => 60}
                    ]
                  }
                }
              }
            },
            "scaleTargetRef" => %{
              "name" => "#{spec["name"]}"
            },
            "triggers" => [
              %{
                "type" => "nats-jetstream",
                "metadata" => %{
                  "account" => "$G",
                  "natsServer" => "nats://nats.shared.svc.cluster.local:4222",
                  "natsServerMonitoringEndpoint" => "nats-headless.shared.svc.cluster.local:8222",
                  "stream" => spec["compute"]["scale"]["keda"]["stream"],
                  "consumer" => "#{spec["name"]}-consumer",
                  "topic" =>
                    "#{spec["compute"]["scale"]["keda"]["stream"]}.#{spec["compute"]["scale"]["keda"]["topic"]}",
                  "lagThreshold" => "#{spec["compute"]["scale"]["keda"]["queueLength"]}",
                  "useHttps" => "false"
                }
              }
            ]
          }
        }

      _ ->
        nil
    end
  end

  defp build_scaledobject(_), do: nil

  defp build_external_secret(%{"spec" => spec, "metadata" => _meta}) do
    case spec["config"] do
      %{"secretRef" => ref, "secretPath" => path} ->
        %{
          "apiVersion" => "external-secrets.io/v1beta1",
          "kind" => "ExternalSecret",
          "metadata" => %{
            "name" => ref,
            "namespace" => spec["tenant"],
            "annotations" => %{
              "argocd.argoproj.io/compare-options" => "IgnoreExtraneous",
              "argocd.argoproj.io/tracking-id" => "#{spec["tenant"]}:v3.com.br:Workload/#{ref}"
            }
          },
          "spec" => %{
            "refreshInterval" => "30s",
            "secretStoreRef" => %{
              "kind" => "ClusterSecretStore",
              "name" => "vault-secret-store"
            },
            "target" => %{
              "name" => ref,
              "creationPolicy" => "Owner"
            },
            "dataFrom" => [
              %{
                "extract" => %{"key" => path}
              }
            ]
          }
        }

      _ ->
        nil
    end
  end

  defp ensure_namespace!(tenant) do
    conn = conn()

    result =
      K8s.Client.get("v1", :namespace, name: tenant)
      |> then(&K8s.Client.run(conn, &1))

    case result do
      {:ok, _ns} ->
        {:ok, :already_exists}

      {:error, %K8s.Client.APIError{reason: "NotFound"}} ->
        manifest = %{
          "apiVersion" => "v1",
          "kind" => "Namespace",
          "metadata" => %{"name" => tenant}
        }

        operation = K8s.Client.create(manifest)
        {:ok, _namespace} = K8s.Client.run(conn, operation)

      {:error, other} ->
        {:error, other}
    end
  end

  defp build_env(%{"dependencies" => dependencies} = spec) when is_map(dependencies) do
    default_envs = [
      %{"name" => "TENANT_ID", "value" => spec["tenant"]},
      %{"name" => "ACCOUNT_ID", "value" => spec["account"]},
      %{"name" => "CLIENT_SECRET", "value" => spec["clientSecret"]},
      %{"name" => "ENVIRONMENT", "value" => spec["environment"]}
    ]

    db_envs =
      case Map.get(dependencies, "database") do
        %{"enabled" => true, "credentialsSecretRef" => secret} = db ->
          [
            %{
              "name" => "DATABASE_NAME",
              "value" => Map.get(db, "serverName", "unknown")
            },
            %{
              "name" => "DATABASE_HOST",
              "value" => Map.get(db, "serverHost", "unknown")
            },
            %{
              "name" => "DATABASE_PORT",
              "value" => Map.get(db, "serverPort", "3306")
            },
            %{
              "name" => "DATABASE_USER",
              "valueFrom" => %{
                "secretKeyRef" => %{"name" => secret, "key" => "username"}
              }
            },
            %{
              "name" => "DATABASE_PASSWORD",
              "valueFrom" => %{
                "secretKeyRef" => %{"name" => secret, "key" => "password"}
              }
            }
          ]

        _ ->
          []
      end

    nats_envs =
      case Map.get(dependencies, "nats") do
        %{"serverHost" => host, "stream" => stream} ->
          [
            %{"name" => "MESSAGING_NATS_URI", "value" => host},
            %{"name" => "STREAM_NAME", "value" => stream || spec["tenant"]}
          ]

        _ ->
          []
      end

    default_envs ++ db_envs ++ nats_envs
  end

  defp build_env(spec) do
    [
      %{"name" => "TENANT_ID", "value" => spec["tenant"]},
      %{"name" => "ACCOUNT_ID", "value" => spec["account"]},
      %{"name" => "CLIENT_SECRET", "value" => spec["clientSecret"]},
      %{"name" => "ENVIRONMENT", "value" => spec["environment"]}
    ]
  end

  defp maybe_put(map, _key, nil), do: map
  defp maybe_put(map, key, value), do: Map.put(map, key, value)
end

I couldn't find the error, I believe that when applying the service some admission controller is delaying the execution and this leads to the timeout of the http client that bonny uses, but this is just speculation on my part.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions