Cyclops 4 HPC is a purpose-built stack that provides large HPC centers with resource accounting and billing for both cluster and cloud resources.

package main

import (
	"context"
	"crypto/tls"
	"reflect"
	"strconv"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/segmentio/encoding/json"
	kafka "github.com/segmentio/kafka-go"

	"github.com/Cyclops-Labs/cyclops-4-hpc.git/extensions/lexis/server/dbManager"
	"github.com/Cyclops-Labs/cyclops-4-hpc.git/extensions/lexis/server/statusManager"
	l "gitlab.com/cyclops-utilities/logging"
)
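
// Note: cfg is assumed here to be the package-level service configuration
// (Kafka brokers, TLS flags, offsets, ...) defined elsewhere in this package;
// it is referenced throughout this file but not declared in it.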
type kafkaFunction func(*dbManager.DbParameter, interface{}) error

type kafkaHandlerConf struct {
	in  []kafkaPackage
	out []kafkaPackage
}

type kafkaPackage struct {
	topic     string
	partition int
	model     interface{}
	function  kafkaFunction
	channel   chan interface{}
	saveDB    bool
}
// kafkaHandler's job is to check the configuration it receives and to start
// the goroutines necessary to satisfy it.
// Parameters:
// - db: DbParameter for direct interaction with the database.
// - monit: statusManager parameter to interact with the statusManager subsystem.
// - kH: kafkaHandlerConf struct with the specific configuration used by the
// service.
func kafkaHandler(db *dbManager.DbParameter, monit *statusManager.StatusManager, kH kafkaHandlerConf) {
	l.Trace.Printf("[KAFKA] Initializing the receivers/senders according to the provided configuration.\n")

	if kH.in != nil {
		monit.InitEndpoint("kafka-receiver")

		for _, p := range kH.in {
			go kafkaReceiver(db, monit, p.topic, p.partition, p.model, p.function, p.saveDB)
		}
	}

	if kH.out != nil {
		monit.InitEndpoint("kafka-sender")

		for _, p := range kH.out {
			go kafkaSender(db, monit, p.topic, p.partition, p.channel)
		}
	}
}
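
// Example (illustrative sketch, not part of the service): wiring the handler.
// UsageRecord, processUsage, and reportChan are hypothetical names used only
// to show the shape of the configuration; real services define their own
// models and callbacks.
//
//	reportChan := make(chan interface{}, 64)
//
//	kH := kafkaHandlerConf{
//		in: []kafkaPackage{
//			{
//				topic:     "usage-records",
//				partition: 0,
//				model:     UsageRecord{},
//				function:  processUsage, // optional; nil means "just store"
//				saveDB:    true,
//			},
//		},
//		out: []kafkaPackage{
//			{topic: "usage-reports", partition: 0, channel: reportChan},
//		},
//	}
//
//	kafkaHandler(db, monit, kH)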
// kafkaReceiver is the abstracted interface used to receive JSON data from kafka.
// The function assumes that, by default, anything that comes from the kafka topic
// and validates against a specific data model has to be added to the db.
// Besides that, it has the option to call an external function to interact with
// the data before putting it in the db.
// Parameters:
// - db: DbParameter for direct interaction with the database.
// - monit: statusManager parameter to interact with the statusManager subsystem.
// - t: string containing the kafka topic in use.
// - p: int containing the kafka topic partition.
// - m: model to validate data against.
// - f: optional external function for extra functionality.
// - saveDB: bool controlling whether the received data needs to be saved in the db.
func kafkaReceiver(db *dbManager.DbParameter, monit *statusManager.StatusManager, t string, p int, m interface{}, f kafkaFunction, saveDB bool) {
	l.Trace.Printf("[KAFKA] Initializing kafka receiver for topic: %v.\n", t)

	conf := kafka.ReaderConfig{
		Brokers:   cfg.Kafka.Brokers,
		Topic:     t,
		Partition: p,
		MinBytes:  cfg.Kafka.MinBytes,
		MaxBytes:  cfg.Kafka.MaxBytes,
	}
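
	// When TLS is enabled, the reader dials the brokers over a connection
	// pinned to TLS 1.2 or newer. Note that crypto/tls ignores
	// PreferServerCipherSuites on recent Go releases (1.17+); it is kept here
	// for older toolchains. InsecureSkipVerify is configuration-driven and
	// should only be enabled against brokers with self-signed certificates.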
	if cfg.Kafka.TLSEnabled {
		dialer := &kafka.Dialer{
			Timeout:   10 * time.Second,
			DualStack: true,
			TLS: &tls.Config{
				MinVersion:               tls.VersionTLS12,
				CurvePreferences:         []tls.CurveID{tls.CurveP521, tls.CurveP384, tls.CurveP256},
				PreferServerCipherSuites: true,
				InsecureSkipVerify:       cfg.General.InsecureSkipVerify,
			},
		}

		conf.Dialer = dialer
	}

	r := kafka.NewReader(conf)
	defer r.Close()

	if e := r.SetOffset(cfg.Kafka.Offset); e != nil {
		l.Warning.Printf("[KAFKA] There was a problem when setting the offset, stopping the kafka handler NOW. Error: %v\n", e)
		db.Metrics["kafka"].With(prometheus.Labels{"mode": "RECEIVER", "topic": t, "state": "FAIL: stream offset"}).Inc()

		return
	}
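
	// Main receive loop: each iteration blocks on ReadMessage; stream errors
	// are counted in the kafka metric and the loop moves on to the next message.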
	for {
		callTime := time.Now()
		monit.APIHit("kafka-receiver", callTime)

		rm, e := r.ReadMessage(context.Background())
		if e != nil {
			l.Warning.Printf("[KAFKA] Error detected in the kafka stream. Error: %v\n", e)
			db.Metrics["kafka"].With(prometheus.Labels{"mode": "RECEIVER", "topic": t, "state": "FAIL: stream"}).Inc()
			monit.APIHitDone("kafka-receiver", callTime)

			continue
		}
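
		// reflect.New(reflect.TypeOf(m)) allocates a fresh value of the model's
		// dynamic type, so every message is decoded into its own pointer instead
		// of reusing the template value m.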
		o := reflect.New(reflect.TypeOf(m)).Interface()

		if e := json.Unmarshal(rm.Value, &o); e == nil {
			l.Info.Printf("[KAFKA] Relevant information detected in the stream: Topic: %v, partition: %v.\n", t, p)
			db.Metrics["kafka"].With(prometheus.Labels{"mode": "RECEIVER", "topic": t, "state": "OK: object received"}).Inc()

			if f != nil {
				l.Info.Printf("[KAFKA] Function for specialized work detected. Starting its processing.\n")

				if err := f(db, o); err != nil {
					l.Warning.Printf("[KAFKA] There was a problem processing the model's specific function. Error: %v\n", err)
					db.Metrics["kafka"].With(prometheus.Labels{"mode": "RECEIVER", "topic": t, "state": "FAIL: linked function"}).Inc()
				} else {
					db.Metrics["kafka"].With(prometheus.Labels{"mode": "RECEIVER", "topic": t, "state": "OK: linked function"}).Inc()
				}
			}

			if saveDB {
				l.Info.Printf("[KAFKA] Saving processed record in the database.\n")

				if err := db.Db.Create(o).Error; err != nil {
					l.Warning.Printf("[KAFKA] There was a problem adding the record into the database. Error: %v\n", err)
					db.Metrics["kafka"].With(prometheus.Labels{"mode": "RECEIVER", "topic": t, "state": "FAIL: db saving"}).Inc()
				} else {
					db.Metrics["kafka"].With(prometheus.Labels{"mode": "RECEIVER", "topic": t, "state": "OK: object db saved"}).Inc()
				}
			}
		} else {
			l.Warning.Printf("[KAFKA] The information in the stream does not fit the expected model, please check with the administrator. Error: %v\n", e)
			db.Metrics["kafka"].With(prometheus.Labels{"mode": "RECEIVER", "topic": t, "state": "FAIL: stream-rubish"}).Inc()
		}

		monit.APIHitDone("kafka-receiver", callTime)
	}
}
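
// Example (illustrative sketch): a kafkaFunction hooked into a receiver. The
// receiver passes a pointer to a freshly decoded model, so the callback type
// asserts to the pointer type. UsageRecord and its ReceivedAt field are
// hypothetical.
//
//	func processUsage(db *dbManager.DbParameter, model interface{}) error {
//		record, ok := model.(*UsageRecord)
//		if !ok {
//			return fmt.Errorf("unexpected model type %T", model)
//		}
//		// Enrich or validate here; kafkaReceiver saves the record afterwards
//		// when saveDB is set.
//		record.ReceivedAt = time.Now()
//		return nil
//	}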
// kafkaSender is the abstracted interface handling the sending of data through
// kafka topics.
// Parameters:
// - db: DbParameter for direct interaction with the database.
// - monit: statusManager parameter to interact with the statusManager subsystem.
// - t: string containing the kafka topic in use.
// - p: int containing the kafka topic partition.
// - c: interface{} channel to receive the data that will be marshalled into
// JSON and then transmitted via kafka.
func kafkaSender(db *dbManager.DbParameter, monit *statusManager.StatusManager, t string, p int, c chan interface{}) {
	l.Trace.Printf("[KAFKA] Initializing kafka sender for topic: %v.\n", t)

	conf := kafka.WriterConfig{
		Brokers:  cfg.Kafka.Brokers,
		Topic:    t,
		Balancer: &kafka.LeastBytes{},
	}

	if cfg.Kafka.TLSEnabled {
		dialer := &kafka.Dialer{
			Timeout:   10 * time.Second,
			DualStack: true,
			TLS: &tls.Config{
				MinVersion:               tls.VersionTLS12,
				CurvePreferences:         []tls.CurveID{tls.CurveP521, tls.CurveP384, tls.CurveP256},
				PreferServerCipherSuites: true,
				InsecureSkipVerify:       cfg.General.InsecureSkipVerify,
			},
		}

		conf.Dialer = dialer
	}

	w := kafka.NewWriter(conf)
	defer w.Close()
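
	// Send loop: blocks on the channel; each object received is marshalled to
	// JSON and written to the topic with a "<topic>-<partition>" message key.
	// A closed channel terminates the goroutine.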
	for {
		v, ok := <-c
		if !ok {
			l.Info.Printf("[KAFKA] The channel has problems or has been closed.\n")
			db.Metrics["kafka"].With(prometheus.Labels{"mode": "SENDER", "topic": t, "state": "FAIL: channel"}).Inc()

			break
		}

		m, e := json.Marshal(&v)
		if e == nil {
			callTime := time.Now()
			monit.APIHit("kafka-sender", callTime)
			l.Info.Printf("[KAFKA] Object received through the channel. Starting its processing.\n")

			err := w.WriteMessages(context.Background(),
				kafka.Message{
					Key:   []byte(t + "-" + strconv.Itoa(p)),
					Value: m,
				},
			)
			if err != nil {
				l.Warning.Printf("[KAFKA] There was a problem when sending the record through the stream. Error: %v\n", err)
				db.Metrics["kafka"].With(prometheus.Labels{"mode": "SENDER", "topic": t, "state": "FAIL: stream"}).Inc()
			} else {
				l.Info.Printf("[KAFKA] Object added to the stream successfully. Topic: %v.\n", t)
				db.Metrics["kafka"].With(prometheus.Labels{"mode": "SENDER", "topic": t, "state": "OK: object sent"}).Inc()
			}

			monit.APIHitDone("kafka-sender", callTime)
		} else {
			l.Warning.Printf("[KAFKA] The information to be sent into the stream cannot be marshalled, please check with the administrator. Error: %v\n", e)
			db.Metrics["kafka"].With(prometheus.Labels{"mode": "SENDER", "topic": t, "state": "FAIL: JSON Marshalling"}).Inc()
		}
	}
}
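
// Example (illustrative sketch): feeding a sender started by kafkaHandler. The
// reportChan channel and UsageReport model are hypothetical; anything sent on
// the channel is marshalled to JSON and published on the configured topic.
//
//	reportChan <- UsageReport{Project: "proj-a", CPUHours: 12.5}
//	close(reportChan) // closing the channel stops the sender goroutine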