Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions control-operator/api/v1alpha1/task_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,24 @@ type TaskSpec struct {
NodeName string `json:"nodeName,omitempty"`
}

const (
ConditionPodReady = "PodReady"
ConditionGRPCConnected = "GRPCConnected"
ConditionStateInitialized = "StateInitialized"
ConditionStateTransitioned = "StateTransitioned"
)

// TaskStatus defines the observed state of Task
type TaskStatus struct {
// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
// Important: Run "make" to regenerate code after modifying this file
Pod v1.PodStatus `json:"pod,omitempty"`
State string `json:"state,omitempty"`
Error string `json:"error,omitempty"`
// +listType=map
// +listMapKey=type
// +optional
Conditions []metav1.Condition `json:"conditions,omitempty"`
}

// +kubebuilder:object:root=true
Expand Down
7 changes: 7 additions & 0 deletions control-operator/api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

39 changes: 39 additions & 0 deletions control-operator/config/crd/bases/aliecs.alice.cern_tasks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3931,6 +3931,45 @@ spec:
type: object
status:
properties:
conditions:
items:
properties:
lastTransitionTime:
format: date-time
type: string
message:
maxLength: 32768
type: string
observedGeneration:
format: int64
minimum: 0
type: integer
reason:
maxLength: 1024
minLength: 1
pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$
type: string
status:
enum:
- "True"
- "False"
- Unknown
type: string
type:
maxLength: 316
pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$
type: string
required:
- lastTransitionTime
- message
- reason
- status
- type
type: object
type: array
x-kubernetes-list-map-keys:
- type
x-kubernetes-list-type: map
error:
type: string
pod:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ spec:
- --leader-elect
- --health-probe-bind-address=:9080
- --metrics-bind-address=:9081
- --zap-encoder=json
image: environment-manager:latest
name: manager
securityContext:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ spec:
args:
- --health-probe-bind-address=:9082
- --metrics-bind-address=:9083
- --zap-encoder=json
env:
- name: NODE_NAME
valueFrom:
Expand Down
37 changes: 37 additions & 0 deletions control-operator/internal/controller/task_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -127,6 +128,9 @@ func (r *TaskReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
if err := r.Status().Update(ctx, t); err != nil {
return ctrl.Result{}, err
}
if err := r.recordCondition(ctx, t, aliecsv1alpha1.ConditionPodReady, metav1.ConditionFalse, "PodFailed", reason); err != nil {
return ctrl.Result{}, err
}
}
// Always stop reconciliation if the Pod is in a failed state
return ctrl.Result{}, nil
Expand All @@ -137,6 +141,9 @@ func (r *TaskReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
log.Info("pod doesn't have IP yet, we wait for different event")
return ctrl.Result{}, nil
}
if err := r.recordCondition(ctx, t, aliecsv1alpha1.ConditionPodReady, metav1.ConditionTrue, "PodRunning", fmt.Sprintf("Pod %s has IP %s", existingPod.Name, existingPod.Status.PodIP)); err != nil {
return ctrl.Result{}, err
}
res, err := r.createGRPCConsumer(ctx, t, existingPod, req.NamespacedName, log)
if err != nil || !res.IsZero() {
return res, err
Expand Down Expand Up @@ -169,6 +176,9 @@ func (r *TaskReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
if err := r.Status().Update(ctx, t); err != nil {
return ctrl.Result{}, err
}
if err := r.recordCondition(ctx, t, aliecsv1alpha1.ConditionStateInitialized, metav1.ConditionTrue, "StateQueried", fmt.Sprintf("Initial state: %s", t.Status.State)); err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}

Expand Down Expand Up @@ -214,6 +224,9 @@ func (r *TaskReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
if err := r.Status().Update(ctx, t); err != nil {
return ctrl.Result{}, err
}
if err := r.recordCondition(ctx, t, aliecsv1alpha1.ConditionStateTransitioned, metav1.ConditionFalse, "TransitionFailed", fmt.Sprintf("Transition from %s to %s failed: %s", stateReply.GetState(), t.Spec.State, transErr.Error())); err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
if newState != "" {
Expand All @@ -232,6 +245,11 @@ func (r *TaskReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
if err := r.Status().Update(ctx, t); err != nil {
return ctrl.Result{}, err
}
if t.Status.State != oldStatus.State {
if err := r.recordCondition(ctx, t, aliecsv1alpha1.ConditionStateTransitioned, metav1.ConditionTrue, "TransitionComplete", fmt.Sprintf("Transitioned from %s to %s", oldStatus.State, t.Status.State)); err != nil {
return ctrl.Result{}, err
}
}
}

return ctrl.Result{}, nil
Expand All @@ -252,6 +270,9 @@ func (r *TaskReconciler) createGRPCConsumer(ctx context.Context, t *aliecsv1alph

clientsForContainers[t.Name] = client

if err := r.recordCondition(ctx, t, aliecsv1alpha1.ConditionGRPCConnected, metav1.ConditionTrue, "Connected", fmt.Sprintf("gRPC connection established to %s", addr)); err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}

Expand Down Expand Up @@ -385,6 +406,22 @@ func (r *TaskReconciler) SetupWithManager(mgr ctrl.Manager) error {
Complete(r)
}

func (r *TaskReconciler) recordCondition(ctx context.Context, t *aliecsv1alpha1.Task, condType string, condStatus metav1.ConditionStatus, reason, message string) error {
patch := client.MergeFrom(t.DeepCopy())
eventType := v1.EventTypeNormal
if condStatus == metav1.ConditionFalse {
eventType = v1.EventTypeWarning
}
meta.SetStatusCondition(&t.Status.Conditions, metav1.Condition{
Type: condType,
Status: condStatus,
Reason: reason,
Message: message,
})
r.Recorder.Event(t, eventType, reason, message)
return r.Status().Patch(ctx, t, patch)
}

func prettyPrint(i any) string {
s, err := json.MarshalIndent(i, "", " ")
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: fluent-bit-audit-config
namespace: kube-system
data:
fluent-bit-audit.yml: |
service:
flush: 5
log_level: info

parsers:
- name: audit-json
format: json
time_key: requestReceivedTimestamp
time_format: "%Y-%m-%dT%H:%M:%S.%LZ"

pipeline:
inputs:
- name: tail
path: /var/log/k3s-audit.log
tag: k8s.audit
parser: audit-json
db: /db/fluent-bit-audit.db
buffer_max_size: 2MB
skip_long_lines: off

filters:
- name: nest
match: k8s.audit
operation: lift
nested_under: objectRef
add_prefix: objectRef_

- name: nest
match: k8s.audit
operation: lift
nested_under: user
add_prefix: user_

outputs:
- name: forward
match: k8s.audit
host: ${OPENSEARCH_HOST}
port: ${OPENSEARCH_PORT}
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: fluent-bit-audit
namespace: kube-system
spec:
selector:
matchLabels:
app: fluent-bit-audit
template:
metadata:
labels:
app: fluent-bit-audit
annotations:
reloader.stakater.com/auto: "true"
spec:
nodeSelector:
node-role.kubernetes.io/control-plane: "true"
tolerations:
- key: node-role.kubernetes.io/control-plane
operator: Exists
effect: NoSchedule
containers:
- name: fluent-bit
image: fluent/fluent-bit:5.0
args:
- -c
- /etc/fluent-bit-audit/fluent-bit-audit.yml
envFrom:
- configMapRef:
name: opensearch-config
volumeMounts:
- name: config
mountPath: /etc/fluent-bit-audit/fluent-bit-audit.yml
subPath: fluent-bit-audit.yml
- name: auditlog
mountPath: /var/log/k3s-audit.log
readOnly: true
- name: db
mountPath: /db
volumes:
- name: config
configMap:
name: fluent-bit-audit-config
- name: auditlog
hostPath:
path: /var/log/k3s-audit.log
type: File
- name: db
hostPath:
path: /var/log/fluent-bit-audit
type: DirectoryOrCreate
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: fluent-bit-events-config
namespace: kube-system
data:
fluent-bit.yml: |
service:
flush: 5
log_level: info

pipeline:
inputs:
- name: kubernetes_events
tag: kube.events
kube_url: https://kubernetes.default.svc:443
kube_ca_file: /var/lib/rancher/k3s/server/tls/server-ca.crt
kube_token_file: /var/run/secrets/kubernetes.io/serviceaccount/token

outputs:
- name: forward
match: kube.events
host: ${OPENSEARCH_HOST}
port: ${OPENSEARCH_PORT}
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: fluent-bit-events
namespace: kube-system
spec:
replicas: 1
selector:
matchLabels:
app: fluent-bit-events
template:
metadata:
labels:
app: fluent-bit-events
annotations:
reloader.stakater.com/auto: "true"
spec:
serviceAccountName: fluent-bit-events
containers:
- name: fluent-bit
image: fluent/fluent-bit:5.0
args:
- -c
- /etc/fluent-bit/fluent-bit.yml
envFrom:
- configMapRef:
name: opensearch-config
volumeMounts:
- name: config
mountPath: /etc/fluent-bit/fluent-bit.yml
subPath: fluent-bit.yml
- name: k3s-tls
mountPath: /var/lib/rancher/k3s/server/tls
readOnly: true
volumes:
- name: config
configMap:
name: fluent-bit-events-config
- name: k3s-tls
hostPath:
path: /var/lib/rancher/k3s/server/tls
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: fluent-bit-events
namespace: kube-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: fluent-bit-events
rules:
- apiGroups: [""]
resources: [events]
verbs: [get, list, watch]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: fluent-bit-events
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: fluent-bit-events
subjects:
- kind: ServiceAccount
name: fluent-bit-events
namespace: kube-system
Loading
Loading