Cyclops 4 HPC is the purpose built stack to support large HPC centers with resource accounting and billing of cluster as well as cloud resources.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

282 lines
7.6 KiB

package cacheManager
import (
"fmt"
"strings"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
l "gitlab.com/cyclops-utilities/logging"
)
// cacheFetchFunction it will need the id to retrieve and as and option a Bearer
// Keycloak token can be provided when called.
// It shall return the object and errors in case of problems.
type cacheFetchFunction func(interface{}, string) (interface{}, error)
// cache struct is the main "object" which handles the cache and its items.
// It also contains a map with the conversions of interfaces to strings.
type cache struct {
////conversionDictionary map[string]string
data map[string]*cacheItem
fetchers map[string]cacheFetchFunction
mutex sync.RWMutex
}
// cacheItem struct referes to the data of each element saved in the cache.
type cacheItem struct {
fetcher cacheFetchFunction
lastUpdate time.Time
value interface{}
}
// CacheManager is the struct defined to group and contain all the methods
// that interact with the caching mechanism.
type CacheManager struct {
APIKey string
duration time.Duration
metrics *prometheus.GaugeVec
store cache
}
// New is the function to create the struct CacheManager.
// Parameters:
// - t: a time.Duration with the max duration alive of the cache elements.
// - k: string containing the APIKey/token in case of need.
// Returns:
// - CacheManager: struct to interact with CacheManager subsystem functionalities.
func New(metrics *prometheus.GaugeVec, t time.Duration, k string) *CacheManager {
l.Trace.Printf("[CACHE] Initializing the cache service.\n")
c := CacheManager{
APIKey: k,
duration: t,
metrics: metrics,
store: cache{
data: make(map[string]*cacheItem),
fetchers: make(map[string]cacheFetchFunction),
},
}
return &c
}
// Add function job is to insert a new model in the cache.
// What it does is link the model with a fetching function and, if wanted, with
// a plain text name, so later in order to retrieve things from the cache they
// can be refereced either by the struct model or the plain text name.
// Paramenters:
// - plainName: a case insensitive name/alias to retrieve the data.
// - fetcher: the cacheFetchFunction used to retrieve the data.
func (c *CacheManager) Add(plainName string, fetcher cacheFetchFunction) {
l.Trace.Printf("[CACHE] Adding a new object fetcher in the cache.\n")
key := strings.ToUpper(plainName)
c.store.mutex.Lock()
c.store.fetchers[key] = fetcher
c.store.mutex.Unlock()
c.metrics.With(prometheus.Labels{"state": "OK", "resource": "Models in Cache"}).Inc()
return
}
// fetch function job is to retrieve a new and updated copy of the remote object.
// Paramenters:
// - item: a string used as key-value in the cache storage to identify the item
// that is going to be updated.
// - token: Keycloak Bearer token, completely optional.
// Returns:
// - e: error in case of something went wrong while setting up the new association.
func (c *CacheManager) fetch(item string, token string) (e error) {
l.Trace.Printf("[CACHE] Fetching the item [ %v ] from the remote location.\n", item)
c.store.mutex.RLock()
object := c.store.data[item]
c.store.mutex.RUnlock()
id := strings.SplitN(item, "-", 2)[1]
uValue, e := object.fetcher(id, token)
if e == nil {
l.Trace.Printf("[CACHE] Item [ %v ] retrieved from the remote location and saved in the cache.\n", item)
object.value = uValue
object.lastUpdate = time.Now()
} else {
l.Warning.Printf("[CACHE] Something went wrong while retrieving the item. Error: %v\n", e)
}
return
}
// fetch function job is to create the new item in the cache and retrieve a new
// and updated initial copy of the remote object to be saved in the cache.
// Paramenters:
// - item: a string used as key-value in the cache storage to identify the item
// that is going to be updated.
// - token: Keycloak Bearer token, completely optional.
// Returns:
// - e: error in case of something went wrong while setting up the new association.
func (c *CacheManager) init(item string, token string) (e error) {
l.Trace.Printf("[CACHE] Fetching the item [ %v ] from the remote location.\n", item)
key := strings.Split(item, "-")[0]
id := strings.SplitN(item, "-", 2)[1]
uValue, e := c.store.fetchers[key](id, token)
if e == nil {
l.Trace.Printf("[CACHE] Item [ %v ] retrieved from the remote location and saved in the cache.\n", item)
i := cacheItem{
fetcher: c.store.fetchers[key],
lastUpdate: time.Now(),
value: uValue,
}
c.store.mutex.Lock()
c.store.data[item] = &i
c.store.mutex.Unlock()
} else {
l.Warning.Printf("[CACHE] Something went wrong while retrieving the item. Error: %v\n", e)
}
return
}
// key is a function to ensure that the creation of the the item key for the
// cache is consistent across all the functions.
// Paramenters:
// - id: the reference id of the object to be retrieved
// - model: the alias text used to identify the source of objects.
// Returns:
// - s: the key string
func (c *CacheManager) key(id interface{}, model string) (s string) {
s = fmt.Sprintf("%v-%v", strings.ToUpper(model), id)
return
}
// Get function job is to retrieve an object from the cache or fetch it from the
// source and upgrade the copy in the cache in case the expiration time has been
// exceeded.
// Paramenters:
// - id: the reference id of the object.
// - model: the text alias set to reference the model.
// - token: Keycloak Bearer token, completely optional.
// Returns:
// - The object associated with the request
// - An error raised in case something went wrong while retrieving the object.
func (c *CacheManager) Get(id interface{}, model string, token string) (interface{}, error) {
l.Trace.Printf("[CACHE] Retrieving object [ %v, %v ] from the cache.\n", id, model)
item := c.key(id, model)
c.store.mutex.RLock()
object, exists := c.store.data[item]
c.store.mutex.RUnlock()
if !exists {
l.Trace.Printf("[CACHE] Object [ %v ] first time requested, including in the cache.\n", item)
if e := c.init(item, token); e != nil {
l.Warning.Printf("[CACHE] Something went wrong while adding the new item [ %v ] to the cache. Error: %v\n", item, e)
c.metrics.With(prometheus.Labels{"state": "FAIL", "resource": "Total objects cached"}).Inc()
return nil, e
}
l.Trace.Printf("[CACHE] Object [ %v ] retrieved from the cache.\n", item)
c.store.mutex.RLock()
o := c.store.data[item].value
c.store.mutex.RUnlock()
c.metrics.With(prometheus.Labels{"state": "OK", "resource": "Total objects cached"}).Inc()
return o, nil
}
l.Trace.Printf("[CACHE] Object [ %v ] exists in the cache.\n", item)
c.store.mutex.RLock()
diff := (time.Now()).Sub(c.store.data[item].lastUpdate)
c.store.mutex.RUnlock()
if diff <= c.duration {
l.Trace.Printf("[CACHE] Object [ %v ] cache hasn't expired yet.\n", item)
c.metrics.With(prometheus.Labels{"state": "OK", "resource": "Total objects retrieved from cache"}).Inc()
return object.value, nil
}
l.Warning.Printf("[CACHE] Object [ %v ] cache has expired. Starting the upgrade.\n", item)
if e := c.fetch(item, token); e != nil {
l.Warning.Printf("[CACHE] Something went wrong while fetching the updated data for the object [ %v ] to the cache. Error: %v\n", item, e)
c.metrics.With(prometheus.Labels{"state": "FAIL", "resource": "Total objects refreshed"}).Inc()
return nil, e
}
l.Trace.Printf("[CACHE] Object [ %v ] updated retrieved from the cache.\n", item)
c.store.mutex.RLock()
o := c.store.data[item].value
c.store.mutex.RUnlock()
c.metrics.With(prometheus.Labels{"state": "OK", "resource": "Total objects refreshed"}).Inc()
c.metrics.With(prometheus.Labels{"state": "OK", "resource": "Total objects retrieved from cache"}).Inc()
return o, nil
}