Hoy no vengo a ponerme filosófico, todos conocemos la famosa obra de Franz Kafka, La metamorfosis, aunque seguro que no tantos la hemos leído. Pero no, esto es un blog sobre Go y por ello venimos a hablar de como utilizar Apache Kafka en Go.
Llevaba tiempo queriendo adentrarme en probar este software, y por distintas razones no había tenido la oportunidad, así que decidí en mis vacaciones crear un pequeño chatroom para poder probar como funcionaba con Go y divertirme en el proceso, además poder compartir con vosotros todo lo aprendido.
Pero vayamos por partes, imagino que no todos conocéis qué es Apache Kafka, algunos imagino que no habréis ni oido hablar sobre él, así que antes de meternos a picar como si no hubiera un mañana, pasemos a entender qué es y como funciona Apache Kafka.
Qué es Apache Kafka
Apache Kafka es una plataforma distribuida de transmisión de datos que permite publicar, almacenar y procesar flujos de registros, y suscribirse a ellos, en tiempo real. […]
Apache Kafka ha sido diseñado para manejar flujos ingentes de datos y distribuirlos a la vez entre diferentes usuarios. Apache Kafka es un proyecto open source, creado por LinkedIn y cedido a la Apache Software Foundation, como alternativa a otros sistemas de mensajería, esta escrito en Scala y Java.
Podríamos decir que Kafka es un sistema de mensajería, pero realmente nos estaríamos quedando cortos con esa descripción, seguramente ahora os habrá venido a la mente un famoso sistema de mensajería como es RabbbitMQ, muy útil para manejar mensajes de una manera sencilla, pero si habéis trabajado bastante con él veréis que en ciertos aspectos es algo limitado.
Lo que nos encontramos con RabbitMQ es un sistema de colas de mensajes, que permite comunicarse con multitud de actores, de forma rápida segura y asíncrona, es capaz de gestionar unos 20 mil eventos por segundo, y no soporta baja latencia. Por el contrario, como hemos dicho Kafka está diseñado para ser un bus de mensajes, donde soporta grandes entradas de datos, permite tener grupos de consumidores, que consumirán mensajes a diferente ritmo, es capaz de procesar 100 mil eventos por segundo y soporta baja latencia.
¿Eso quiere decir que debo usar Kafka ahora para todos mis proyectos que impliquen manejo de mensajes? No, ni mucho menos, siempre dependerá de cada proyecto, pero este no es el caso de nuestro artículo, hay ríos de tinta sobre softwares de mensajerías con los cuales podrás llegar a discernir cual se adapta mejor a tus necesidades.
Kafka resuelve muy bien los siguientes escenarios:
- Mensajería
- Almacenar la actividad de usuarios de una web
- Metricas
- Sistema de log
- Stream processing
- Event sourcing
- Commit-Log
Cómo funciona Apache Kafka
Kafka esta diseñado para soportar un alto volumen de mesajes pub-sub y streams. Kafka provee de un sistema de almacenamiento de mensajes de larga duración, similar a los logs, puedes ejecutarlo en una sola instancia, pero ha sido diseñado para ser distribuido con lo cual suele correr en un cluster, y almacena los registros en categorías llamadas topic
.
Kafka almacena los registros dentro de diferentes particiones dentro de un mismo topic
. Un topic
, es una categoría sobre el cual los registros serán publicados, con lo cual **Kafka es capaz de escribir en diferentes colas que tengan el mismo topic
.
Además Kafka nos permite decirle en cual de sus particiones queremos escribir dentro de un mismo topic
. Imaginémonos una cajonera, donde la cajonera será nuestro topic
y los cajones son nuestras particiones, nosotros podemos decidir en que parte vamos a guardar nuestra ropa ¿no?, pues igual pasa con los datos en Kafka.
Al igual que hemos visto para escribir, para leer también podemos acceder a una determinada partición dentro del topic
, si volvemos al ejemplo de nuestra cajonera, podremos abrir el cajón que queramos y coger la ropa que hay en él. Así pues podemos crear diferentes workers
que se suscriban a una partición determinada utilizando un consumer group
.
Nuevo término, consumer group
, ¿qué es?, es como una etiqueta dentro de un grupo de consumers
. Cada consumer
vinculado a la misma etiqueta consumirá diferentes mensajes. Vale, volvamos a la cajonera, hay termitas, y se están cargando el mueble y tenemos que sacar toda la ropa, pero es que hay mucha ropa y nosotros solos tardaremos una eternidad, así que pedimos ayuda a nuestros amigos, y hacemos grupos de por ejemplo 3 personas, y nombramos nuestros grupos, “blanco”, “negro”, “color”, con lo cual ahora en vez de ser 3 personas individuales seremos una sola entidad.
Además igual que sucede en los grupos mencionados, que podemos repartirnos las tareas entre los miembros, Kafka actúa de la misma manera, es decir puede repartir el consumo de mensajes entre los consumers
de cada consumer group
.
Y hasta aquí la teoría, la cual en mi opinión es la básica necesaria para poder adentrarnos en este maravilloso software, llamado Kafka, así que ahora empieza la diversión vamos a picar un poco de Go.
Preparando Kafka
No vamos a entrar en gran detalle en la parte de docker
que vamos a utilizar para nuestro artículo, ya que no viene al caso. Decir que hemos partido de un docker-compose.yml
de ejemplo creado por Matt Howlett, aunque hemos actualizado las versiones ya que no eran compatibles a día de hoy con las librerías que se usan en Go.
Podéis encontrar nuestro docker-compose.yml
en el repositorio que os indicaremos al final del artículo.
Para levantarlo necesitaremos indicar la IP de nuestra máquina:
$ MY_IP=your-ip docker-compose up
Con esto ya tendremos funcionando Kafka en nuestra máquina.
Lo siguiente que haremos es crear un topic
que será con el que trabajemos, si ejecutáis el siguiente comando creará el topic fogo-chat
con 4 particiones
y un factor de replicación de 2.
$ docker run --net=host --rm confluentinc/cp-kafka:5.0.0 kafka-topics --create --topic fogo-chat --partitions 4 --replication-factor 2 --if-not-exists --zookeeper localhost:32181
Ahora sí, ya tenemos todo listo para empezar a picar nuestro chatroom
ChatRoom
Para ver el poder y rapidez de Kafka no quería limitarme a un ejemplo simple, aunque al final no deja de ser un ejemplo sencillo, pero se compone de distintas partes y es algo más realista que un simple, publicar-consumir, además con el docker-compose.yml
que os hemos proporcionado tendréis Kafka corriendo en cluster, cierto es que para desarrollo con un solo nodo nos es más que suficiente, pero si queremos probarlo todo bien, hagámoslo lo más real posible.
La aplicación la he dividido en dos partes, un server, escrito en gin-gonic, que es el encargado de publicar los mensajes en Kafka (publisher
):
Y un cliente que serían los usuarios dentro del chatroom, que podemos levantar tantos como usuarios queramos:
Bueno dejemos de fantasear con una posible felicitación de Rob Pike hacia nuestro blog, y vayamos a ver como luce nuestro código.
Creando el Publisher
Si no publicamos mensajes en Kafka pues de poco nos va a server, así que lo primero que vamos a hacer es crear una API simple. Nosotros hemos utilizado como decíamos, la librería de gin-gonic, pero podéis usar perfectamente Gorilla Mux y la propia librería estándar de Go, tal como vimos en nuestro artículo, ¿Cómo crear una API Rest en Golang?.
Veamos como luce el fichero main.go
de nuestro server.
# cmd/server/main.go
package main
import (
"github.com/gin-gonic/gin"
)
func main() {
r := gin.Default()
r.POST("/join", joinHandler(publisher))
r.POST("/publish", publishHandler(publisher))
_ = r.Run()
}
Como vemos tendremos dos endpoints
que serán consumidos vía POST
, uno para publicar un mensaje cuando se une un nuevo usuario a una sala de chat, y otro para publicar los mensajes de los usuarios en sí, podríamos tener un solo handler
y por el tipo de mensaje que nos llegué actuar en consecuencia, pero por simplicidad y visibilidad lo hemos dejado separado en dos endpoints
distintos.
Obviamente, si intentamos ejecutar ese código no funcionará, ya que esos handler
que hemos declarado no existen, y por si fuera poco esperan una variable que tampoco existe. Vamos a darle forma a nuestro publisher
.
Cómo sabéis y si no lo sabéis somos fans de utilizar clean architecture siempre que podemos, y aquí no iba a ser menos:
# pkg/publisher.go
package kafkaexample
import (
"context"
)
// Publisher an instance that publish messages
type Publisher interface {
// Publish publish a message into a stream
Publish(ctx context.Context, payload interface{}) error
}
Sí, lo primero que haremos es crearnos una interfaz Publisher
con su método Publish
, de la cual sacaremos la implementación de Kafka, antes no lo hemos mencionado pero para hacer todo lo relacionado con Kafka utilizaremos la librería github.com/segmentio/kafka-go.
Vamos a ver como es está implementación, dentro del fichero pkg/kafka/publisher.go
:
Primeramente crearemos nuestro struct privado, publisher
, que tendrá una sola propiedad writer
, que será la que luego nos permita escribir en Kafka:
type publisher struct {
writer *kafka.Writer
}
Ahora inicializaremos dicho writer
:
// NewPublisher create a kafka publisher
func NewPublisher(brokers []string, topic string) kafkaexample.Publisher {
dialer := &kafka.Dialer{
Timeout: 10 * time.Second,
ClientID: clientID,
}
c := kafka.WriterConfig{
Brokers: brokers,
Topic: topic,
Balancer: &kafka.LeastBytes{},
Dialer: dialer,
WriteTimeout: 10 * time.Second,
ReadTimeout: 10 * time.Second,
CompressionCodec: snappy.NewCompressionCodec(),
}
return &publisher{kafka.NewWriter(c)}
}
Aquí tenemos dos partes, primeramente deberemos crear un dialer
que será lo que se use internamente para abrir conexiones de red contra Kafka, además indicaremos un ClientID
, que será un identificado único para nuestra conexión.
Acto seguido crearemos la configuración de nuestro writer
, el cual es importante que le llegue en que topic
tiene que escribir, y finalmente devolveremos nuestro publisher
. Pero si vemos del método, NewPublisher(brokers []string, topic string) kafkaexample.Publisher
vemos que devolvemos un Publisher
, pero nuestro struct publisher
no implementa el método Publish
, vamos a solucionarlo.
func (p *publisher) Publish(ctx context.Context, payload interface{}) error {
message, err := p.encodeMessage(payload)
if err != nil {
return err
}
return p.writer.WriteMessages(ctx, message)
}
Primero veremos que llamamos a un método propio, encodeMessage
que es utilizado para transformar nuestro payload
en un kafka.Message
, que es lo que espera el método, WriteMessages
del writer
.
Podéis ver la implementación de este método en el repositorio oficial del ejemplo, pero no es necesaria para continuar.
Una vez tenemos nuestra implementación de Publisher
funcionando para Kafka, deberemos ya podemos pasarle a nuestros handlers, el Publisher
que esperan.
# cmd/server/main.go
var (
brokers = os.Getenv("KAFKA_BROKERS")
topic = os.Getenv("KAFKA_TOPIC")
)
publisher := kafka.NewPublisher(strings.Split(brokers, ","), topic)
r := gin.Default()
r.POST("/join", joinHandler(publisher))
r.POST("/publish", publishHandler(publisher))
_ = r.Run()
Nosotros hemos tirado de variables de entorno para nuestros brokers
que son las urls de los nodos de kafka, y para el topic
ya que lo queríamos fijo.
# .env
KAFKA_BROKERS=localhost:19092,localhost:29092,localhost:39092
KAFKA_TOPIC=fogo-chat
No viene al caso meternos en como se implementan los handlers, ya que como supondréis acaban llamando al método Publish
del Publisher
, con esto ya tenemos nuestros dos endpoints
y servidor funcionando para recibir nuestros mensajes y publicarlos. ¿Vamos a por el consumer
?
¡A por nuestro consumer-worker!
Escribir el consumer, debería ser una tarea aún más sencillo si cabe, simplemente tendremos que obtener los datos de Kafka, pero al ser un chatroom, nuestro cliente deberá de recibir nuestro mensaje de texto y enviarlo al servidor, así como recoger los mensajes enviados por los demás clientes y mostrarlos. Por suerte estamos utilizando Go y podemos beneficiarnos de las gorrutinas para ello.
Esta vez vamos a ir al revés, empezaremos a ver nuestra interfaz, Consumer
:
# pkg/consumer.go
package kafkaexample
import (
"context"
)
// Consumer an instance that consumes messages
type Consumer interface {
// Read read into the stream
Read(ctx context.Context, chMsg chan Message, chErr chan error)
}
¡Madre mía, channels
! ¿y yo con estos pelos?, pero vamos no os preocupéis, últimamente se ha hablado mucho de concurrencia en el blog, deberíais tener los conceptos muy por la mano, aunque realmente nos importa poco para el tema de Kafka pero si quieres entender el ejemplo al completo, es recomendable que le eches un vistazo a nuestros artículos.
Bueno volviendo al tema, nuestra interfaz Consumer
, esperará un context
y dos canales, uno donde irá publicando los mensajes que va obteniendo de Kafka y otro donde irá escupiendo los posibles errores que se produzcan al leer esos mensajes.
Veamos como es su implementación, la cual será realizada en el fichero, pkg/kafka/consumer.go
Al igual que con el Publisher
crearemos nuestra struct privado consumer
type consumer struct {
reader *kafka.Reader
}
Nuestro consumer
, simplemente tendrá un kafka.Reader
que es el que utilizaremos luego para leer.
func NewConsumer(brokers []string, topic string) kafkaexample.Consumer {
c := kafka.ReaderConfig{
Brokers: brokers,
Topic: topic,
MinBytes: 10e3, // 10KB
MaxBytes: 10e6, // 10MB
MaxWait: 1 * time.Second, // Maximum amount of time to wait for new data to come when fetching batches of messages from kafka.
ReadLagInterval: -1,
GroupID: kafkaexample.Ulid(),
StartOffset: kafka.LastOffset,
}
return &consumer{kafka.NewReader(c)}
}
Esta vez es más sencillo aún, simplemente tendremos que declarar nuestra configuración para el reader
e inicializarlo. Dentro de esta configuración podremos indicar si queremos leer de una partición determinada, Partition
. Además vemos que tenemos la propiedad GroupID
, que como comentamos sirve para etiquetar y agrupar nuestros consumers, en este caso no queremos dicha agrupación sino que todo los consumers lean los mismos mensajes del topic
, así que asignamos un valor aleatorio a cada uno de ellos.
Por último hay que decir, que sino le decimos lo contrario cuando publicamos (nosotros no lo hemos hecho en nuestro ejemplo, pero es recomendable en entornos real), kafka almacenará todos los registros hasta el fin de los tiempos, o hasta que se llene el disco, lo que llegue antes, además cuando leamos de él sino declaramos desde donde empezamos a leer, cogerá desde el primer registro, como entenderéis esto puede ser muy lento y no lo que queremos en muchos casos, así que basta decirle que queremos empezar desde el final, StartOffset: kafka.LastOffset
.
Ahora veamos el método Read
que nos hará cumplir la interfaz, Consumer
.
func (c *consumer) Read(ctx context.Context, chMsg chan kafkaexample.Message, chErr chan error) {
defer c.reader.Close()
for {
m, err := c.reader.ReadMessage(ctx)
if err != nil {
chErr <- errors.New(fmt.Sprintf("error while reading a message: %v", err))
continue
}
var message kafkaexample.Message
err = json.Unmarshal(m.Value, &message)
if err != nil {
chErr <- err
}
chMsg <- message
}
}
Lo primero que deberemos hacer es cerrar nuestro reader
si llegamos a salir de nuestro método Read
, defer c.reader.Close()
.
Bien, el método Read
se compone de un bucle infinito, el cual estará leyendo en Kafka todo el rato, en el topic
que hemos especificado anteriormente, m, err := c.reader.ReadMessage(ctx)
, esto nos devolverá un mensaje que ya sabemos que vendrá serializado en json, y cumplirá con nuestro struct Message
.
# pkg/chatroom.go
type Message struct {
Username string `json:"username"`
Message string `json:"message"`
}
Así que una vez decodificado, lo meteremos en nuestro canal y luego lo trataremos.
Vemos entonces como funciona nuestro cliente a nivel de consola:
# cmd/client/main.go
func main() {
var (
host = os.Getenv("HOST")
brokers = os.Getenv("KAFKA_BROKERS")
topic = os.Getenv("KAFKA_TOPIC")
)
fmt.Println("Choose your nickname:")
_, _ = fmt.Scanf("%s\n", &user)
if err := joinTheRoom(host); err != nil {
log.Fatal(err)
}
go func() {
for {
reader := bufio.NewReader(os.Stdin)
msg, _ := reader.ReadString('\n')
if err := publishMessage(host, msg); err != nil {
log.Fatal(err)
}
}
}()
chMsg := make(chan kafkaexample.Message)
chErr := make(chan error)
consumer := kafka.NewConsumer(strings.Split(brokers, ","), topic)
go func() {
consumer.Read(context.Background(), chMsg, chErr)
}()
quit := make(chan os.Signal)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
for {
select {
case <-quit:
goto end
case m := <-chMsg:
printMessage(m)
case err := <-chErr:
log.Println(err)
}
}
end:
fmt.Println("\nyou have abandoned the room")
}
Tendremos dos métodos que son los que se conectarán contra nuestro servidor para publicar los mensajes, joinTheRoom
y publishMessage
, no entraremos en detalles en ellos ya que son llamadas http
contra un servidor.
Ahora vamos por parte, lo primero que hacemos según levantamos nuestro cliente es:
fmt.Println("Choose your nickname:")
_, _ = fmt.Scanf("%s\n", &user)
if err := joinTheRoom(host); err != nil {
log.Fatal(err)
}
Aquí lo primero que haremos es asignarnos un nombre, para empezar a chatear, luego esto mandará un mensaje a nuestro servidor y publicará en Kafka. Sí, estamos usando user
como variable global, no hagáis esto, es feo :), pero es un ejemplo.
A continuación publicaremos nuestra primera rutina, que será la encargada de mantenerse pendiente de leer todos los mensajes que introducimos en la consola y mandarlos al servidor, para que posteriormente sean publicados.
go func() {
for {
reader := bufio.NewReader(os.Stdin)
msg, _ := reader.ReadString('\n')
if err := publishMessage(host, msg); err != nil {
log.Fatal(err)
}
}
}()
Además queremos ver por pantalla lo que escriben nuestros amigos, así que deberemos publicar una nueva rutina que sea la que lea esos mensajes de kafka.
chMsg := make(chan kafkaexample.Message)
chErr := make(chan error)
consumer := kafka.NewConsumer(strings.Split(brokers, ","), topic)
go func() {
consumer.Read(context.Background(), chMsg, chErr)
}()
quit := make(chan os.Signal)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
for {
select {
case <-quit:
goto end
case m := <-chMsg:
printMessage(m)
case err := <-chErr:
log.Println(err)
}
}
end:
Como veis una vez declaramos nuestro consumer
, leer es tan simple como llamar a su método Read
. Además tenemos un bucle infinito que se encargará de recoger los mensajes que se publiquen en el canal para ir mostrándonos por pantalla. Para salir hemos utilizado una señal de quit
, con hacer ctrl+c
terminará nuestro proceso de manera controlada, podríamos hacer uso del contexto para por ejemplo cerrar nuestro reader
.
Conclusión
Hasta aquí ha llegado nuestro extenso artículo sobre Apache Kafka, donde hemos intentado cubrir todo lo básico para que entendáis su funcionamiento y podáis hacerlo servir, utilizando un ejemplo algo más real con varios servicios implicados.
Todo el código utilizado lo podéis encontrar en nuestro repositorio de Github os invitamos a pegarle un buen vistazo, e incluso os retamos a realizar una Pull Request, con lo siguiente.
Cread un nuevo endpoint en el servidor donde publicaréis un mensaje para informar que el usuario en cuestión ha abandonado el chat, en el cliente añadid la lógica para mandar ese mensaje de “leaveTheRoom” y mostrar en los demás clientes activos que el usuario ha abandonado la sala y además si os ponéis puristas haced uso del contexto, para cerrar el reader
del consumer antes de acabar la ejecución.
Ya sabéis que si tenéis cualquier duda o comentario podéis dejarlo en los comentarios o en nuestro Twitter @FriendsofGoTech.