Cyclops 4 HPC is the purpose-built stack to support large HPC centers with resource accounting and billing of cluster as well as cloud resources.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 

770 lines
21 KiB

package dbManager
import (
"errors"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/remeh/sizedwaitgroup"
"gitlab.com/cyclops-utilities/datamodels"
"github.com/Cyclops-Labs/cyclops-4-hpc.git/services/eventsengine/models"
l "gitlab.com/cyclops-utilities/logging"
"gorm.io/driver/postgres"
"gorm.io/gorm"
)
// Package-level constants of the dbManager.
// NOTE: the integer codes below are assigned with iota, so their values depend
// on declaration order; do not reorder them or insert new codes between them.
const (
// Result codes (0-3). NOTE(review): not referenced in the code visible in this
// file section — confirm where they are used before changing them.
statusDuplicated = iota
statusFail
statusMissing
statusOK
// Overlap classifications (4-8) returned by DatesWithin when comparing a
// state/event interval with a query time-window:
// - innerIncluded: the time-window lies inside the interval.
// - lowerIncluded: the interval starts inside the window and ends after it.
// - notIncluded: the interval and the window do not overlap.
// - outterIncluded: the interval lies inside the window.
// - upperIncluded: the interval starts before the window and ends inside it.
innerIncluded
lowerIncluded
notIncluded
outterIncluded
upperIncluded
// Resource state names. stateTerminated is the value stored in
// State.LastEvent for closed states; terminated time is excluded from usage.
stateActive = "active"
stateError = "error"
stateInactive = "inactive"
stateSuspended = "suspended"
stateTerminated = "terminated"
// Time bounds. endOfTime is used as the TimeTo of open (current) states; read
// as Unix seconds it is 3000-01-01T00:00:00Z. bigBang is the Unix epoch (0).
bigBang = int64(0)
endOfTime = int64(32503680000)
)
// DbParameter is the struct defined to group and contain all the methods
// that interact with the database.
// It has the following fields:
// - connStr: string with the connection information to the database
// - Db: a gorm.DB pointer to the db to invoke all the db methods
// - Metrics: Prometheus gauge vectors indexed by name. The methods of this
// type use the "count" and "time" entries, labelled by "type". New does not
// set this map, so the caller must populate it before those methods run
// (a missing entry yields a nil *GaugeVec).
type DbParameter struct {
connStr string
Db *gorm.DB
Metrics map[string]*prometheus.GaugeVec
}
// Running totals used by AddEvent to publish the average event registering
// time: eeTime accumulates the nanoseconds spent per AddEvent call and eeTotal
// counts those calls.
// NOTE(review): both are package-level and updated without synchronization;
// concurrent AddEvent calls would race on them — confirm AddEvent is never
// invoked concurrently, or guard these with a mutex.
var (
eeTime float64
eeTotal float64
)
// New is the function to create the struct DbParameter.
// It opens a Postgres connection through gorm and, when that succeeds,
// auto-migrates the provided models. Failures are logged, not returned.
// Parameters:
// - dbConn: string with the connection information to the database
// - tables: models to migrate to the database on initialization
// Returns:
// - DbParameter: struct to interact with dbManager functionalities. When the
// connection cannot be opened, the migration is skipped and Db holds whatever
// gorm.Open returned. Metrics is not initialized here and must be set by the
// caller.
func New(dbConn string, tables ...interface{}) *DbParameter {
	l.Trace.Printf("[DB] Gerenating new DBParameter.\n")

	dp := DbParameter{connStr: dbConn}

	db, err := gorm.Open(postgres.Open(dbConn), &gorm.Config{})
	dp.Db = db
	if err != nil {
		l.Error.Printf("[DB] Error opening connection. Error: %v\n", err)
		// Migrating through a handle that failed to open cannot succeed and may
		// dereference a nil pointer, so stop here.
		return &dp
	}

	l.Trace.Printf("[DB] Migrating tables.\n")
	// Database migration, it handles everything
	if err = dp.Db.AutoMigrate(tables...); err != nil {
		l.Error.Printf("[DB] Error migrating tables. Error: %v\n", err)
	}

	l.Trace.Printf("[DB] Generating hypertables.\n")
	// Hypertables creation for timescaledb in case of needed
	//dp.Db.Exec("SELECT create_hypertable('" + dp.Db.NewScope(&models.TABLE).TableName() + "', 'TIMESCALE-ROW-INDEX');")

	return &dp
}
// AddEvent job is to register the new event on its corresponding resource state.
// A state is identified by Account, ResourceID, Region and MetaData.
// First, every non-terminated state of the same Account/ResourceID/Region but
// with different MetaData is archived (via UpdateStates) and marked terminated.
// Then, for the state matching the event:
// - if none exists, one is created, starting at the event time and open until
// endOfTime;
// - if it has the same LastEvent, only its EventTime is refreshed;
// - otherwise it is archived in the history (closed at the event time) and
// switched to the new LastEvent, starting at the event time.
// Parameters:
// - event: the new event of the resource. EventTime and LastEvent must be set.
// Returns:
// - e: error raised in case of problems
func (d *DbParameter) AddEvent(event models.Event) (e error) {
	l.Trace.Printf("[DB] Attempting to register a new in the resource [ %v ] from the account [ %v ].\n", event.ResourceID, event.Account)

	// Both pointers are dereferenced further down; reject the event up front
	// instead of panicking half-way through the database updates.
	if event.EventTime == nil || event.LastEvent == nil {
		e = errors.New("event must have both EventTime and LastEvent set")
		l.Warning.Printf("[DB] Rejecting the event for the resource [ %v ] from the account [ %v ]. Error: %v\n", event.ResourceID, event.Account, e)
		return
	}

	now := time.Now().UnixNano()

	var states []*models.State
	var s, state models.State

	patternSearch := models.State{
		Account:    event.Account,
		ResourceID: event.ResourceID,
		Region:     event.Region,
	}

	patternUpdate := models.State{
		TimeFrom:  event.TimeFrom,
		EventTime: event.EventTime,
		LastEvent: func(s string) *string { return &s }(stateTerminated),
	}

	s = patternSearch
	s.MetaData = event.MetaData

	// First check if there's anything from the same resource id with different
	// metadata that is still not terminated; if so, terminate them all.
	if e = d.Db.Where(patternSearch).Not(models.State{
		MetaData: event.MetaData,
	}).Not(models.State{
		LastEvent: func(s string) *string { return &s }(stateTerminated),
	}).Find(&states).Error; e != nil {
		l.Warning.Printf("[DB] Something went wrong while checking if the resource already has registered states in the system. Error: %v\n", e)
	}

	if len(states) != 0 {
		l.Debug.Printf("[DB] We found [ %v ] states linked to the resource [ %v ] proceeding to mark them as terminated...\n", len(states), event.ResourceID)

		if e = d.UpdateStates(states, patternUpdate); e != nil {
			l.Warning.Printf("[DB] Something went wrong while terminating the other states linked to the resource [ %v ]. Error: %v\n", event.ResourceID, e)
		} else {
			d.Metrics["count"].With(prometheus.Labels{"type": "States updated"}).Add(float64(len(states)))
		}
	}

	// Look up the state for this exact resource/metadata combination:
	// - not found: create it;
	// - same LastEvent: only refresh EventTime;
	// - different LastEvent: archive the current state, then switch it to the
	// new event.
	r := d.Db.Where(&s).First(&state).Error

	switch {
	case errors.Is(r, gorm.ErrRecordNotFound):
		l.Debug.Printf("[DB] The resource [ %v ] doesn't have a linked state in the system, proceeding to generate one...\n", event.ResourceID)

		s.ResourceName = event.ResourceName
		s.TimeTo = endOfTime
		s.TimeFrom = *event.EventTime
		s.ResourceType = event.ResourceType
		s.LastEvent = event.LastEvent
		s.EventTime = event.EventTime

		if e = d.Db.Create(&s).Error; e != nil {
			l.Warning.Printf("[DB] Something went wrong while trying to create the state. Error: %v\n", e)
		} else {
			d.Metrics["count"].With(prometheus.Labels{"type": "States created"}).Inc()
		}

	case r != nil:
		// After a failed lookup, state cannot be trusted (its LastEvent may be
		// nil), so it must not be handled as an existing state.
		l.Warning.Printf("[DB] Something went wrong while retrieving the state of the resource [ %v ]. Error: %v\n", event.ResourceID, r)
		e = r

	default:
		l.Debug.Printf("[DB] The resource [ %v ] seems to be already in the system with state [ %v ], proceeding to update its state...\n", event.ResourceID, *state.LastEvent)

		if *state.LastEvent == *event.LastEvent {
			if e = d.Db.Model(&state).Updates(models.State{
				EventTime: event.EventTime,
			}).Error; e != nil {
				l.Warning.Printf("[DB] Something went wrong while updating the state. Error: %v\n", e)
			} else {
				d.Metrics["count"].With(prometheus.Labels{"type": "States updated"}).Inc()
			}
		} else {
			l.Debug.Printf("[DB] The resource [ %v ] seems to have a previous different state: [ %v ] -> [ %v ], proceeding to update it...\n", event.ResourceID, *state.LastEvent, *event.LastEvent)

			// Archive the outgoing state closed at the time of the new event.
			h := state
			h.TimeTo = *event.EventTime

			if e = d.AddToHistory(h); e != nil {
				l.Warning.Printf("[DB] Something went wrong while saving in the history the previous state. Error: %v\n", e)
			}

			if e = d.Db.Model(&state).Updates(models.State{
				EventTime: event.EventTime,
				LastEvent: event.LastEvent,
				TimeFrom:  *event.EventTime,
			}).Error; e != nil {
				l.Warning.Printf("[DB] Something went wrong while updating the state. Error: %v\n", e)
			} else {
				d.Metrics["count"].With(prometheus.Labels{"type": "States updated"}).Inc()
			}
		}
	}

	eeTotal++
	eeTime += float64(time.Now().UnixNano() - now)

	d.Metrics["time"].With(prometheus.Labels{"type": "Events average registering time"}).Set(eeTime / eeTotal / float64(time.Millisecond))
	d.Metrics["count"].With(prometheus.Labels{"type": "Total events processed"}).Inc()
	d.Metrics["count"].With(prometheus.Labels{"type": "Total " + event.ResourceType + " events processed"}).Inc()

	return
}
// AddToHistory job is to archive the provided state as a new event in the
// history of changes linked to that state. All identifying fields, the event
// data and the TimeFrom/TimeTo interval are copied from the state.
// Parameters:
// - state: State to be archived as a new event in its history of changes.
// Returns:
// - e: error raised in case of problems
func (d *DbParameter) AddToHistory(state models.State) (e error) {
	// LastEvent is a pointer and may be nil; do not crash just to log it.
	lastEvent := "<nil>"
	if state.LastEvent != nil {
		lastEvent = *state.LastEvent
	}

	l.Trace.Printf("[DB] Attempting to add the event [ %v ] to the history of changes of the state. Account [ %v ], Resource [ %v ].\n", lastEvent, state.Account, state.ResourceID)

	h := models.Event{
		Account:      state.Account,
		EventTime:    state.EventTime,
		LastEvent:    state.LastEvent,
		MetaData:     state.MetaData,
		Region:       state.Region,
		ResourceID:   state.ResourceID,
		ResourceName: state.ResourceName,
		ResourceType: state.ResourceType,
		TimeFrom:     state.TimeFrom,
		TimeTo:       state.TimeTo,
	}

	if e = d.Db.Create(&h).Error; e != nil {
		l.Warning.Printf("[DB] Something went wrong while adding the event to the history of the state. Error: %v\n", e)
	} else {
		d.Metrics["count"].With(prometheus.Labels{"type": "Histories updated"}).Inc()
	}

	return
}
// DatesWithin classifies how the state/event interval [EventFrom, EventTo]
// relates to the query time-window [TimeFrom, TimeTo] and measures their overlap.
// The checks are evaluated in order, so touching endpoints count as no overlap.
// Parameters:
// - EventFrom: start of the state/event interval.
// - EventTo: end of the state/event interval.
// - TimeFrom: start of the time-window.
// - TimeTo: end of the time-window.
// Returns:
// - inc: notIncluded, innerIncluded, upperIncluded, lowerIncluded or
// outterIncluded, describing the relation between both intervals.
// - diff: length of the overlapping region (same unit as the inputs); 0 when
// inc is notIncluded.
func (d *DbParameter) DatesWithin(EventFrom, EventTo, TimeFrom, TimeTo int64) (inc int, diff int64) {
	l.Trace.Printf("[DB] Attempting to match the time-window with the state/event time interval.\n")

	switch {
	case EventTo <= TimeFrom:
		l.Debug.Printf("[DB] Time-window is before the time interval of the state/interval.\n")
		return notIncluded, 0

	case EventFrom >= TimeTo:
		l.Debug.Printf("[DB] Time-window is after the time interval of the state/interval.\n")
		return notIncluded, 0

	case EventFrom <= TimeFrom && TimeTo <= EventTo:
		// INNER: the whole window sits inside the interval.
		l.Debug.Printf("[DB] Time-window is completely within the time interval of the state/interval.\n")
		return innerIncluded, TimeTo - TimeFrom

	case EventFrom <= TimeFrom && EventTo <= TimeTo:
		// UPPER: the interval opened before the window and closes inside it.
		l.Debug.Printf("[DB] Time-window starts within the time interval of the state/interval and goes forward in the timeline.\n")
		return upperIncluded, EventTo - TimeFrom

	case TimeFrom <= EventFrom && TimeTo <= EventTo:
		// LOWER: the interval opens inside the window and closes after it.
		l.Debug.Printf("[DB] Time-window finished within the time interval of the state/interval and starts before it in the timeline.\n")
		return lowerIncluded, TimeTo - EventFrom

	case TimeFrom <= EventFrom && EventTo <= TimeTo:
		// OUTTER: the whole interval sits inside the window.
		l.Debug.Printf("[DB] The time interval of the state/interval is within the time-windows (AKA starts and finish outside the interval).\n")
		return outterIncluded, EventTo - EventFrom

	default:
		l.Warning.Printf("[DB] Time-window is from the 4th dimension, reach the administrator, the CERN, and the whole scientific community ASAP!\n")
		return notIncluded, 0
	}
}
// GetAllStates job is to retrieve a snapshot of all the states present in the
// system at the time of invocation.
// Returns:
// - slice of States containing all the resources state in the system
// - error raised in case of problems while querying the database
func (d *DbParameter) GetAllStates() ([]*models.State, error) {
	l.Trace.Printf("[DB] Attempting to retrieve all the states present in the system.\n")

	var s []*models.State

	// Keep the query error in the variable that is returned; declaring it in
	// an if-initializer would shadow it and always report success.
	e := d.Db.Find(&s).Error
	if e != nil {
		l.Warning.Printf("[DB] Something went wrong while retrieving the states in the system. Error: %v\n", e)
	}

	return s, e
}
// GetState job is to retrieve the actual state of every resource linked to the
// provided account.
// Parameters:
// - ac: string representing the ID of the account to be processed.
// Returns:
// - slice of State containing the states kept in the system of the resources
// linked to the provided account
// - error raised in case of problems while querying the database
func (d *DbParameter) GetState(ac string) ([]*models.State, error) {
	l.Trace.Printf("[DB] Attempting to get the actual state of the account [ %v ].\n", ac)

	var s []*models.State

	// Keep the query error in the returned variable (no shadowing).
	e := d.Db.Where(&models.State{Account: ac}).Find(&s).Error
	if e != nil {
		l.Warning.Printf("[DB] Something went wrong while retrieving the state of the account [ %v ]. Error: %v\n", ac, e)
	}

	return s, e
}
// ListState job is to retrieve the list of not terminated states in the system
// for the requested region and resource type, ordered by account.
// Parameters:
// - metric: resource type to filter by (ignored when empty)
// - region: region to filter by (ignored when empty)
// Returns:
// - slice of MinimalState containing the matching states kept in the system.
// - error raised in case of problems while querying the database
func (d *DbParameter) ListState(metric, region string) ([]*models.MinimalState, error) {
	l.Trace.Printf("[DB] Attempting to get the list of not terminated [ %v ] states for region [ %v ].\n", metric, region)

	var s []*models.MinimalState

	filter := stateTerminated

	// Keep the query error in the returned variable (no shadowing).
	e := d.Db.Table("states").
		Where(&models.State{Region: region, ResourceType: metric}).
		Not(&models.State{LastEvent: &filter}).
		Order("account").
		Scan(&s).Error
	if e != nil {
		l.Warning.Printf("[DB] Something went wrong while retrieving the list of states from the system. Error: %v\n", e)
	}

	return s, e
}
// GetAllHistory job is to retrieve the complete history of changes in the state
// of the provided resource linked to the given account.
// Events are matched on the account plus the resource ID, name, type, region
// and metadata taken from u. Because the condition is a gorm struct, empty
// fields of u are not used as filters.
// Parameters:
// - ac: string representing the ID of the account to be processed.
// - u: Use reference used as the pattern to match the specific state needed.
// Returns:
// - slice of Event containing the changes in the history of the resource.
// - error raised in case of problems while querying the database
func (d *DbParameter) GetAllHistory(ac string, u models.Use) ([]*models.Event, error) {
	l.Trace.Printf("[DB] Attempting to retrieve the full history of the resource [ %v ] from account [ %v ].\n", u.ResourceID, ac)

	var ev []*models.Event

	// Region is part of a state's identity, so it is matched as well;
	// otherwise events of the same resource ID in another region would be
	// attributed to this state.
	e := d.Db.Where(&models.Event{
		Account:      ac,
		Region:       u.Region,
		ResourceID:   u.ResourceID,
		ResourceName: u.ResourceName,
		ResourceType: u.ResourceType,
		MetaData:     u.MetaData,
	}).Find(&ev).Error
	if e != nil {
		l.Warning.Printf("[DB] Something went wrong while retrieving the history of the resource [ %v ]. Error: %v\n", u.ResourceID, e)
	}

	return ev, e
}
// GetHistory job is to retrieve the history of changes in the state of the
// provided account during the given time-window, with the possibility of
// filtering the result by resource type and region.
// Only events whose whole interval lies inside [from, to] are returned.
// Parameters:
// - ac: string representing the ID of the account to be processed.
// - ty: string representing the resource type to use as filter
// - rg: string representing the resource region to use as filter
// - from: int representing the start of the time-window.
// - to: int representing the end of the time-window.
// Returns:
// - slice of Event containing the changes in the history of the provided account.
// - error raised in case of problems while querying the database
func (d *DbParameter) GetHistory(ac, ty, rg string, from, to int64) ([]*models.Event, error) {
	l.Trace.Printf("[DB] Attempting to retrieve the history of the account [ %v ].\n", ac)

	var ev []*models.Event

	whereString := d.Db.NamingStrategy.ColumnName("", "TimeFrom") + " >= ? AND " + d.Db.NamingStrategy.ColumnName("", "TimeTo") + " <= ?"

	// Keep the query error in the returned variable (no shadowing).
	e := d.Db.Where(whereString, from, to).Where(&models.Event{
		Account:      ac,
		ResourceType: ty,
		Region:       rg,
	}).Find(&ev).Error
	if e != nil {
		l.Warning.Printf("[DB] Something went wrong while retrieving the history of the account [ %v ]. Error: %v\n", ac, e)
	}

	return ev, e
}
// GetUsage job is to compute the usage accumulated by the provided account
// considering the time in the actual state and the history of changes that
// the account resources have had during the provided time-window, with the
// possibility of filtering by resource type and region.
// For every matching state, the overlap between the window and each state or
// history interval is summed per LastEvent value. Terminated time and zero
// contributions are dropped, and resources left without usage are skipped.
// Parameters:
// - ac: string representing the ID of the account to be processed.
// - ty: string representing the resource type to use as filter
// - rg: string representing the resource region to use as filter
// - from: int representing the start of the time-window.
// - to: int representing the end of the time-window.
// Returns:
// - Usage reference containing the usage of the account in the system in the
// provided time-window. Each entry reports Unit "seconds"; NOTE(review): this
// assumes the stored timestamps are Unix seconds — confirm.
// - error raised while retrieving the states of the account; history lookup
// failures are only logged.
func (d *DbParameter) GetUsage(ac, ty, rg string, from, to int64) (*models.Usage, error) {
	l.Trace.Printf("[DB] Attempting to compute the usage of the account [ %v ].\n", ac)

	var u models.Usage

	// First we check if the actual State is in the time-window
	states, e := d.GetState(ac)

	u.AccountID = ac
	u.TimeFrom = from
	u.TimeTo = to

	for _, st := range states {
		if ty != "" && st.ResourceType != ty {
			l.Trace.Printf("[DB] Resource filtering active. Resource [ %v ] not matching criteria [ %v ].\n", st.ResourceType, ty)
			continue
		}

		if rg != "" && st.Region != rg {
			// Report the region, which is the field that failed to match.
			l.Trace.Printf("[DB] Resource filtering active. Resource [ %v ] not matching criteria [ %v ].\n", st.Region, rg)
			continue
		}

		// LastEvent is the usage key below; a state without it cannot be
		// reported, and dereferencing it would panic.
		if st.LastEvent == nil {
			l.Warning.Printf("[DB] The resource [ %v ] has no event recorded in its state, skipping its usage report.\n", st.ResourceID)
			continue
		}

		var use models.Use

		use.Region = st.Region
		use.ResourceID = st.ResourceID
		use.ResourceName = st.ResourceName
		use.ResourceType = st.ResourceType
		use.MetaData = st.MetaData
		use.Unit = "seconds"

		usage := make(datamodels.JSONdb)
		usage[*st.LastEvent] = int64(0)

		inc, contrib := d.DatesWithin(st.TimeFrom, st.TimeTo, from, to)
		if inc == innerIncluded || inc == upperIncluded {
			// The current state starts at or before the window start, so the
			// history is not consulted.
			l.Trace.Printf("[DB] Resource [ %v ] state contained within the time-window, skipping history check.\n", st.ResourceID)
			usage[*st.LastEvent] = usage[*st.LastEvent].(int64) + contrib
		} else {
			if inc != notIncluded {
				l.Trace.Printf("[DB] Resource [ %v ] state not (fully) contained within the time-window, checking history of the resource.\n", st.ResourceID)
				usage[*st.LastEvent] = usage[*st.LastEvent].(int64) + contrib
			}

			// Add the contribution of every history entry overlapping the window.
			history, herr := d.GetAllHistory(ac, use)
			if herr != nil {
				l.Warning.Printf("[DB] Something went wrong while retrieving the resource [ %v ] history. Error: %v\n", st.ResourceID, herr)
			} else {
				for _, ev := range history {
					if ev.LastEvent == nil {
						continue
					}

					included, contribution := d.DatesWithin(ev.TimeFrom, ev.TimeTo, from, to)
					if included == notIncluded {
						continue
					}

					// A missing key reads as 0; every stored value is an int64.
					prev, _ := usage[*ev.LastEvent].(int64)

					l.Debug.Printf("[DB] Resource [ %v ] history register contained within the time-window.\n", st.ResourceID)
					usage[*ev.LastEvent] = prev + contribution
				}
			}
		}

		delete(usage, stateTerminated)

		for key, value := range usage {
			if value.(int64) == int64(0) {
				delete(usage, key)
			}
		}

		if len(usage) == 0 {
			l.Warning.Printf("[DB] The resource [ %v ] has no data in the specified time-window, skipping it's usage report.\n", st.ResourceID)
			continue
		}

		use.UsageBreakup = usage
		u.Usage = append(u.Usage, &use)
	}

	return &u, e
}
// GetSystemUsage job is to compute the usage accumulated in the whole system
// by account considering the time in the actual state and the history of changes
// that the account resources have had during the provided time-window, with the
// possibility of filtering by resource type and region.
// Every account owning at least one state is processed exactly once, with at
// most 8 accounts computed concurrently. Accounts whose usage cannot be computed,
// or is empty, are left out.
// Parameters:
// - from: int representing the start of the time-window.
// - to: int representing the end of the time-window.
// - ty: string representing the resource type to use as filter
// - rg: string representing the resource region to use as filter
// Returns:
// - slice of Usages containing the usage per account in the system in the
// provided time-window (in no particular order).
// - error raised when the states of the system cannot be retrieved
func (d *DbParameter) GetSystemUsage(from, to int64, ty, rg string) ([]*models.Usage, error) {
	l.Trace.Printf("[DB] Attempting to compute the system usage.\n")

	var u []*models.Usage

	states, e := d.GetAllStates()
	if e != nil {
		l.Warning.Printf("[DB] Something went wrong while retrieving the states recorded in the system. Error: %v\n", e)
		return u, e
	}

	// Deduplicate accounts before fanning out. Checking and marking an account
	// inside the workers under separate lock sections let two workers process
	// the same account and report its usage twice.
	seen := make(map[string]struct{}, len(states))
	accounts := make([]string, 0, len(states))

	for _, st := range states {
		if _, dup := seen[st.Account]; dup {
			l.Trace.Printf("[DB] Account [ %v ] already processed, skipping...\n", st.Account)
			continue
		}

		seen[st.Account] = struct{}{}
		accounts = append(accounts, st.Account)
	}

	var mu sync.Mutex
	swg := sizedwaitgroup.New(8)

	for _, account := range accounts {
		swg.Add()

		go func(account string) {
			defer swg.Done()

			su, err := d.GetUsage(account, ty, rg, from, to)
			if err != nil || su.Usage == nil {
				l.Warning.Printf("[DB] Something went wrong while retrieving the usage of the account [ %v ]. Error: %v\n", account, err)
				return
			}

			mu.Lock()
			u = append(u, su)
			mu.Unlock()
		}(account)
	}

	swg.Wait()

	return u, nil
}
// UpdateStates job is to process the provided states given a pattern. For each
// state, it records in the history a copy closed at pattern.TimeFrom, then
// updates the stored state with the non-zero fields of the pattern.
// A failure on one state does not stop the processing of the others.
// Parameters:
// - states: slice of State to be updated in the system.
// - pattern: a State reference containing the changes to be applied.
// Returns:
// - the last error encountered (history or update), or nil if every state
// was archived and updated successfully
func (d *DbParameter) UpdateStates(states []*models.State, pattern models.State) error {
	l.Trace.Printf("[DB] Attempting to update [ %v ] states.\n", len(states))

	var counth, countdb int
	var lastErr error

	for _, st := range states {
		l.Debug.Printf("[DB] Processing state for resource [ %v ] from account [ %v ].\n", st.ResourceID, st.Account)

		// Archive a copy so the caller's state is not modified just to build
		// the history record.
		h := *st
		h.TimeTo = pattern.TimeFrom

		if e := d.AddToHistory(h); e != nil {
			l.Warning.Printf("[DB] Something went wrong while saving the history of the state. Error: %v\n", e)
			counth++
			lastErr = e
		}

		// Track failures separately so a later success does not hide an
		// earlier error.
		if e := d.Db.Model(st).Updates(pattern).Error; e != nil {
			l.Warning.Printf("[DB] Something went wrong while updating the state in the system. Error: %v\n", e)
			countdb++
			lastErr = e
		}
	}

	l.Trace.Printf("[DB] Update of states finished with [ %v ] errors while saving the history and [ %v ] while updating the states\n", counth, countdb)

	return lastErr
}