Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify the resource cache #288

Merged
merged 7 commits into from
Feb 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions cmd/eno-reconciler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/Azure/eno/internal/k8s"
"github.com/Azure/eno/internal/manager"
"github.com/Azure/eno/internal/reconstitution"
"github.com/Azure/eno/internal/resource"
)

func main() {
Expand Down Expand Up @@ -108,15 +109,15 @@ func run() error {
// This provides quick feedback in cases where only a few resources have changed.
writeBuffer := flowcontrol.NewResourceSliceWriteBufferForManager(mgr)

rCache := reconstitution.NewCache(mgr.GetClient())
recOpts.Manager = mgr
recOpts.Cache = rCache
recOpts.WriteBuffer = writeBuffer
recOpts.Downstream = remoteConfig
recOpts.Queue = workqueue.NewTypedRateLimitingQueue(
workqueue.DefaultTypedItemBasedRateLimiter[reconstitution.Request]())
workqueue.DefaultTypedItemBasedRateLimiter[resource.Request]())
recOpts.Cache = &resource.Cache{}
recOpts.Cache.SetQueue(recOpts.Queue)

err = reconstitution.New(mgr, rCache, recOpts.Queue)
err = reconstitution.New(mgr, recOpts.Cache, recOpts.Queue)
if err != nil {
return fmt.Errorf("constructing reconstitution manager: %w", err)
}
Expand Down
93 changes: 20 additions & 73 deletions internal/controllers/reconciliation/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,18 @@ import (
"github.com/Azure/eno/internal/discovery"
"github.com/Azure/eno/internal/flowcontrol"
"github.com/Azure/eno/internal/manager"
"github.com/Azure/eno/internal/reconstitution"
"github.com/Azure/eno/internal/resource"
"github.com/go-logr/logr"
)

var insecureLogPatch = os.Getenv("INSECURE_LOG_PATCH") == "true"

type Options struct {
Manager ctrl.Manager
Cache *reconstitution.Cache
Cache *resource.Cache
WriteBuffer *flowcontrol.ResourceSliceWriteBuffer
Downstream *rest.Config
Queue workqueue.TypedRateLimitingInterface[reconstitution.Request]
Queue workqueue.TypedRateLimitingInterface[resource.Request]

DiscoveryRPS float32

Expand All @@ -50,7 +50,7 @@ type Options struct {
type Controller struct {
client client.Client
writeBuffer *flowcontrol.ResourceSliceWriteBuffer
resourceClient reconstitution.Client
resourceClient *resource.Cache
timeout time.Duration
readinessPollInterval time.Duration
upstreamClient client.Client
Expand Down Expand Up @@ -80,22 +80,22 @@ func New(mgr ctrl.Manager, opts Options) error {
discovery: disc,
}

return builder.TypedControllerManagedBy[reconstitution.Request](mgr).
return builder.TypedControllerManagedBy[resource.Request](mgr).
Named("reconciliationController").
WithLogConstructor(manager.NewTypedLogConstructor[*reconstitution.Request](mgr, "reconciliationController")).
WithLogConstructor(manager.NewTypedLogConstructor[*resource.Request](mgr, "reconciliationController")).

// Eventually the reconstitution cache will implement source.Source but for now we need to inject our own workqueue.
// Since controllers require at least one source, we also start a fake/no-op source+handler.
WatchesRawSource(source.TypedChannel[reconstitution.Request, reconstitution.Request](make(<-chan event.TypedGenericEvent[reconstitution.Request]), &handler.TypedFuncs[reconstitution.Request, reconstitution.Request]{})).
WithOptions(controller.TypedOptions[reconstitution.Request]{
NewQueue: func(name string, q workqueue.TypedRateLimiter[reconstitution.Request]) workqueue.TypedRateLimitingInterface[reconstitution.Request] {
WatchesRawSource(source.TypedChannel[resource.Request, resource.Request](make(<-chan event.TypedGenericEvent[resource.Request]), &handler.TypedFuncs[resource.Request, resource.Request]{})).
WithOptions(controller.TypedOptions[resource.Request]{
NewQueue: func(name string, q workqueue.TypedRateLimiter[resource.Request]) workqueue.TypedRateLimitingInterface[resource.Request] {
return opts.Queue
},
}).
Complete(c)
}

func (c *Controller) Reconcile(ctx context.Context, req reconstitution.Request) (ctrl.Result, error) {
func (c *Controller) Reconcile(ctx context.Context, req resource.Request) (ctrl.Result, error) {
ctx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()

Expand All @@ -115,58 +115,22 @@ func (c *Controller) Reconcile(ctx context.Context, req reconstitution.Request)
ctx = logr.NewContext(ctx, logger)

// Find the current and (optionally) previous desired states in the cache
synRef := reconstitution.NewSynthesisRef(comp)
resource, exists := c.resourceClient.Get(ctx, synRef, &req.Resource)
var prev *resource.Resource
resource, visible, exists := c.resourceClient.Get(ctx, comp.Status.GetCurrentSynthesisUUID(), req.Resource)
if !exists {
// It's possible for the cache to be empty because a manifest for this resource no longer exists at the requested composition generation.
// Dropping the work item is safe since filling the new version will generate a new queue message.
// Returning is safe because filling the cache will enqueue a new work item
logger.V(1).Info("dropping work item because the corresponding synthesis no longer exists in the cache")
return ctrl.Result{}, nil
}

var prev *reconstitution.Resource
if comp.Status.PreviousSynthesis != nil {
prevSynRef := reconstitution.NewSynthesisRef(comp)
prevSynRef.UUID = comp.Status.PreviousSynthesis.UUID
prev, _ = c.resourceClient.Get(ctx, prevSynRef, &req.Resource)
if !visible {
logger.V(1).Info("skipping because the resource isn't visible yet")
return ctrl.Result{}, nil
}
logger = logger.WithValues("resourceKind", resource.Ref.Kind, "resourceName", resource.Ref.Name, "resourceNamespace", resource.Ref.Namespace)
ctx = logr.NewContext(ctx, logger)

// Keep track of the last reconciliation time and report on it relative to the resource's reconcile interval
// This is useful for identifying cases where the loop can't keep up
if resource.ReconcileInterval != nil {
observation := resource.ObserveReconciliation()
if observation > 0 {
delta := observation - resource.ReconcileInterval.Duration
reconciliationScheduleDelta.Observe(delta.Seconds())
}
}

// CRDs must be reconciled before any CRs that use the type they define.
// For initial creation just failing and retrying will eventually converge but updates are tricky
// since unknown properties sent from clients are ignored by apiserver.
// i.e. ordering is necessary to handle adding a new property and populating it in the same synthesis.
crdResource, ok := c.resourceClient.GetDefiningCRD(ctx, synRef, resource.GVK.GroupKind())
if ok {
slice := &apiv1.ResourceSlice{}
err = c.client.Get(ctx, crdResource.ManifestRef.Slice, slice)
if err != nil {
return ctrl.Result{}, fmt.Errorf("getting resource slice: %w", err)
}
status := crdResource.FindStatus(slice)
if status == nil || status.Ready == nil {
logger.V(1).Info("skipping because the CRD that defines this resource type isn't ready")
return ctrl.Result{}, nil
}

// apiserver doesn't "close the loop" on CRD loading, so there is no way to know
// when CRDs are actually ready. This normally only takes a couple of milliseconds
// but we round up to a full second here to be safe.
if delta := time.Second - time.Since(status.Ready.Time); delta > 0 {
logger.V(1).Info("deferring until the defining CRD has been ready for 1 second")
return ctrl.Result{RequeueAfter: delta}, nil
}
if syn := comp.Status.PreviousSynthesis; syn != nil {
prev, _, _ = c.resourceClient.Get(ctx, syn.UUID, req.Resource)
}

// Fetch the current resource
Expand Down Expand Up @@ -195,23 +159,6 @@ func (c *Controller) Reconcile(ctx context.Context, req reconstitution.Request)
ready = status.Ready
}

// Evaluate the readiness of resources in the previous readiness group
if (status == nil || !status.Reconciled) && !resource.Deleted(comp) {
dependencies := c.resourceClient.RangeByReadinessGroup(ctx, synRef, resource.ReadinessGroup, reconstitution.RangeDesc)
for _, dep := range dependencies {
slice := &apiv1.ResourceSlice{}
err = c.client.Get(ctx, dep.ManifestRef.Slice, slice)
if err != nil {
return ctrl.Result{}, fmt.Errorf("getting resource slice: %w", err)
}
status := dep.FindStatus(slice)
if status == nil || status.Ready == nil {
logger.V(1).Info("skipping because at least one resource in an earlier readiness group isn't ready yet")
return ctrl.Result{}, nil
}
}
}

modified, err := c.reconcileResource(ctx, comp, prev, resource, current)
if err != nil {
return ctrl.Result{}, err
Expand All @@ -235,7 +182,7 @@ func (c *Controller) Reconcile(ctx context.Context, req reconstitution.Request)
return ctrl.Result{}, nil
}

func (c *Controller) reconcileResource(ctx context.Context, comp *apiv1.Composition, prev, resource *reconstitution.Resource, current *unstructured.Unstructured) (bool, error) {
func (c *Controller) reconcileResource(ctx context.Context, comp *apiv1.Composition, prev, resource *resource.Resource, current *unstructured.Unstructured) (bool, error) {
logger := logr.FromContextOrDiscard(ctx)
start := time.Now()
defer func() {
Expand Down Expand Up @@ -324,7 +271,7 @@ func (c *Controller) reconcileResource(ctx context.Context, comp *apiv1.Composit
return true, nil
}

func (c *Controller) getCurrent(ctx context.Context, resource *reconstitution.Resource) (*unstructured.Unstructured, error) {
func (c *Controller) getCurrent(ctx context.Context, resource *resource.Resource) (*unstructured.Unstructured, error) {
current := &unstructured.Unstructured{}
current.SetName(resource.Ref.Name)
current.SetNamespace(resource.Ref.Namespace)
Expand Down
15 changes: 9 additions & 6 deletions internal/controllers/reconciliation/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,19 @@ import (
apiv1 "github.com/Azure/eno/api/v1"
"github.com/Azure/eno/internal/flowcontrol"
"github.com/Azure/eno/internal/reconstitution"
"github.com/Azure/eno/internal/resource"
"github.com/Azure/eno/internal/testutil"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/util/workqueue"
)

func mapToResource(t *testing.T, res map[string]any) (*unstructured.Unstructured, *reconstitution.Resource) {
func mapToResource(t *testing.T, res map[string]any) (*unstructured.Unstructured, *resource.Resource) {
obj := &unstructured.Unstructured{Object: res}
js, err := obj.MarshalJSON()
require.NoError(t, err)

rr := &reconstitution.Resource{
rr := &resource.Resource{
Manifest: &apiv1.Manifest{Manifest: string(js)},
GVK: obj.GroupVersionKind(),
}
Expand All @@ -27,13 +28,15 @@ func mapToResource(t *testing.T, res map[string]any) (*unstructured.Unstructured

func setupTestSubject(t *testing.T, mgr *testutil.Manager) {
rswb := flowcontrol.NewResourceSliceWriteBufferForManager(mgr.Manager)
cache := reconstitution.NewCache(mgr.GetClient())
rateLimiter := workqueue.DefaultTypedItemBasedRateLimiter[reconstitution.Request]()

var cache resource.Cache
rateLimiter := workqueue.DefaultTypedItemBasedRateLimiter[resource.Request]()
queue := workqueue.NewTypedRateLimitingQueue(rateLimiter)
cache.SetQueue(queue)

err := New(mgr.Manager, Options{
Manager: mgr.Manager,
Cache: cache,
Cache: &cache,
WriteBuffer: rswb,
Downstream: mgr.DownstreamRestConfig,
Queue: queue,
Expand All @@ -43,6 +46,6 @@ func setupTestSubject(t *testing.T, mgr *testutil.Manager) {
})
require.NoError(t, err)

err = reconstitution.New(mgr.Manager, cache, queue)
err = reconstitution.New(mgr.Manager, &cache, queue)
require.NoError(t, err)
}
22 changes: 12 additions & 10 deletions internal/readiness/readiness.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package readiness

import (
"context"
"fmt"
"sort"
"time"

Expand All @@ -13,17 +14,18 @@ import (
"github.com/google/cel-go/cel"
)

// Env encapsulates a CEL environment for use in readiness checks.
type Env struct {
cel *cel.Env
var defaultEnv *cel.Env

// init eagerly constructs the package-level default CEL environment at
// program startup, so ParseCheck can compile expressions without any
// explicit setup by callers. initDefaultEnv panics on failure, which is
// acceptable here: a broken CEL environment is a programmer error and
// the process cannot do useful work without it.
func init() {
	initDefaultEnv()
}

func NewEnv() (*Env, error) {
ce, err := cel.NewEnv(cel.Variable("self", cel.DynType))
func initDefaultEnv() {
var err error
defaultEnv, err = cel.NewEnv(cel.Variable("self", cel.DynType))
if err != nil {
return nil, err
panic(fmt.Sprintf("failed to create default CEL environment: %v", err))
}
return &Env{cel: ce}, nil
}

// Check represents a parsed readiness check CEL expression.
Expand All @@ -34,12 +36,12 @@ type Check struct {

// ParseCheck parses the given CEL expression in the context of an environment,
// and returns a reusable execution handle.
func ParseCheck(env *Env, expr string) (*Check, error) {
ast, iss := env.cel.Compile(expr)
func ParseCheck(expr string) (*Check, error) {
ast, iss := defaultEnv.Compile(expr)
if iss != nil && iss.Err() != nil {
return nil, iss.Err()
}
prgm, err := env.cel.Program(ast, cel.InterruptCheckFrequency(10))
prgm, err := defaultEnv.Program(ast, cel.InterruptCheckFrequency(10))
if err != nil {
return nil, err
}
Expand Down
12 changes: 3 additions & 9 deletions internal/readiness/readiness_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,9 @@ var evalCheckTests = []struct {
}

func TestEvalCheck(t *testing.T) {
env, err := NewEnv()
require.NoError(t, err)

for _, tc := range evalCheckTests {
t.Run(tc.Name, func(t *testing.T) {
r, err := ParseCheck(env, tc.Expr)
r, err := ParseCheck(tc.Expr)
require.NoError(t, err)

time, ok := r.Eval(context.Background(), tc.Resource)
Expand Down Expand Up @@ -286,11 +283,8 @@ func TestTimeouts(t *testing.T) {
}

func mustParse(expr string) *Check {
e, err := NewEnv()
if err != nil {
panic(err)
}
check, err := ParseCheck(e, expr)
initDefaultEnv() // it's possible that the tests variables are evaluated before the init function is called
check, err := ParseCheck(expr)
if err != nil {
panic(err)
}
Expand Down
Loading
Loading