Si bien es cierto que la programación reactiva es bastante popular en varios lenguajes de programación (véase Java), la verdad es que ésta no está muy candente en el ecosistema Go, ya sea por la falta de madurez del propio lenguaje, o por el estilo de aplicaciones que se desarrollan con el mismo. Sin embargo, no por ello estamos limitados a usar dicho patrón de programación con nuestro lenguaje favorito. Pero antes, véamos de dónde viene este concepto.
Bien, pues una forma de definir la programación reactiva es como “un paradigma enfocado en el trabajo con flujos de datos (finitos o infinitos) de manera asíncrona”. Cuyas bases se asentan en el Reactive Manifesto (el manifiesto de los sistemas reactivos). En dicho manifiesto se establecen las bases de los sistemas reactivos, los cuales deben ser:
-
Responsivos: responden a tiempo en la medida de lo posible. Son capaces de detectar los problemas rápidamente y de tratarlos efectivamente. Se enfocan en proveer tiempos de respuesta rápidos y consistentes, estableciendo límites superiores confiables para así proporcionar una calidad de servicio consistente. Este comportamiento consistente, a su vez, simplifica el tratamiento de errores, aporta seguridad al usuario final y fomenta una mayor interacción.
-
Resilientes: son capaces de permanecer responsivos frente a fallos. Los fallos son manejados dentro de cada componente, aislando cada componente de los demás, y asegurando así que cualquier parte del sistema pueda fallar y recuperarse sin comprometer el sistema como un todo.
-
Elásticos: se mantienen responsivos bajo variaciones en la carga de trabajo. Pueden reaccionar a cambios en la frecuencia de peticiones incrementando o reduciendo los recursos asignados para servir dichas peticiones. Esto implica diseños que no tengan puntos de contención o cuellos de botella centralizados, resultando en la capacidad de dividir o replicar componentes y distribuir las peticiones entre ellos. Soportan algoritmos de escalado predictivos, así como reactivos, al proporcionar relevantes medidas de rendimiento en tiempo real.
-
Orientados a mensajes: confían en el intercambio de mensajes asíncrono para establecer fronteras entre componentes, lo que asegura bajo acoplamiento, aislamiento y transparencia de ubicación. Estas fronteras también proporcionan los medios para delegar fallos como mensajes. El uso del intercambio de mensajes explícito posibilita la gestión de la carga, la elasticidad, y el control de flujo, gracias al modelado y monitorización de las colas de mensajes en el sistema, y la aplicación de back-pressure cuando sea necesario.
Y fue, precisamente con el objetivo de lograr desarrollar sistemas con esas características, lo que impulsó la definición de ReactiveX, una librería para componer programas asíncronos y basados en eventos mediante el uso de secuencias observables, que se define como una combinación de las mejores ideas del patrón de observador, el patrón iterador y la programación funcional. De hecho, esta librería extiende el patrón observador para admitir secuencias de datos y/o eventos y agrega operadores que le permiten componer secuencias de manera declarativa, al tiempo que abstrae preocupaciones sobre cosas como subprocesos de bajo nivel, sincronización, seguridad de subprocesos, estructuras de datos concurrentes y operaciones de entrada y salida no bloqueantes.
Dicha librería tiene implementaciones para un gran abanico de lenguajes de programación, cuyo nombre suele ser “Rx” seguido del nombre del lenguage de programación (RxJava, RxJS, Rx.NET, etc). Así que, como no podía ser menos: hoy veremos como funciona la librería RxGo.
Usando RxGo
Conceptualmente, y como nos indican en su propia documentación, la implementación reactiva para Go (RxGo) está basada en el patrón de concurrencia pipeline, el cual consiste en conectar una serie de etapas mediante canales, donde cada etapa es un grupo de gorutinas que ejecutan la misma función.
En la imagen anterior (obtenida de la documentación oficial), por ejemplo, podemos identificar:
- Un “observable” estático basado en una lista fija de elementos usando el operador Just.
- Una función de transformación (convertir un círculo en un cuadrado) usando el operador Map.
- Una función de filtrado (filtramos cada cuadrado amarillo) usando el operador Filter.
Ejemplo de código
Lo primero de todo, antes de empezar a escribir algo de código, será descargar la dependencia mediante el comando habitual
go get github.com/reactivex/rxgo/v2
. Y ahora sí, ya podríamos traducir la imagen anterior en código Go:
package main
import (
"context"
"errors"
"fmt"
"github.com/reactivex/rxgo/v2"
)
type Circle struct {
Radius int
Color string
}
type Square struct {
Size int
Color string
}
func mapToSquare(ctx context.Context, c interface{}) (interface{}, error) {
circle, ok := c.(Circle)
if !ok {
return nil, errors.New("unexpected input type")
}
return Square{Size: circle.Radius, Color: circle.Color}, nil
}
func filterYellowSquares(s interface{}) bool {
return s.(Square).Color == "yellow"
}
func main() {
observable := rxgo.Just(
Circle{Radius: 10, Color: "red"},
Circle{Radius: 10, Color: "yellow"},
Circle{Radius: 10, Color: "green"},
)().Map(mapToSquare).Filter(filterYellowSquares)
for item := range observable.Observe() {
if item.Error() {
fmt.Println("an error occurred: ", item.E)
}
fmt.Println("observed items are:", item.V)
}
}
Como podéis ver, lo único que hemos hecho es traducir cada uno de los puntos que indentificamos anteriormente en una pieza de código escrita en Go. Además, también podemos observar algunas consideraciones que debemos tener en cuenta:
-
Todos los operadores utilizados en el flujo del observador (transformación, filtrado, etc) no dejan de ser argumentos, lo que nos permite poder testear las funciones individuales de una forma muy sencilla.
-
El “observable” es lazy por defecto, lo que implica que solo emite eventos una vez que se realiza una suscripción.
-
El método
Observe()
devuelve un canal de elementos<-chan rxgo.Item
, que puede usarse con normalidad. -
Podemos hacer uso de
item.V
oitem.E
para acceder al valor o al error del elemento emitido por el “observable”. -
Por defecto, el
item.Error()
detiene el “observable” en el primer error, a menos que le pasemosrxgo.WithErrorStrategy(rxgo.ContinueOnError)
como opción en el métodoObserve()
.
Adicionalmente, como podemos ver, las firmas de los métodos de esta librería hacen un uso extensivo de la interfaz vacía
(interface{}
), lo cual puede ser una razón más por la que este patrón no está muy extendido en el ecosistema Go, razón
añadida a las razones que ya mencionamos al principio de este mismo artículo. Sin embargo, este debate ya lo dejamos para
otro día y para el eventual soporte para genéricos en Go.
Otras funciones y conceptos
Además de lo visto hasta ahora, la librería nos ofrece muchas más utilidades y funciones para llevar a cabo una programación
reactiva tan compleja y eficaz como queramos. Por ejemplo, así como anteriormente definimos un listado de círculos “a
mano” mediante la función Just
, también disponemos de:
-
Otras funciones de creación: para crear “observables” a partir de un canal, de un slice, de un rango o una secuencia de elementos, o incluso hasta de un Timer.
-
Otras funciones de transformación: para hacer marshal/unmarshal de datos, para añadir buffers, agrupaciones de elementos, etc.
-
Otras funciones de filtrado: para eliminar duplicados, para aplicar muestreos o incluso para aplicar estrategias más complejas como el debouncing.
-
Así como otras utilidades para la gestión de errores, operadores matemáticos y de agregación, etc.
Finalmente, es importante también conocer algunos de los conceptos que se definen dentro del contexto de la programación reactiva, para así, poder explotar al máximo el potencial que nos dan herramientas como RxGo. Veamos algunos de estos conceptos.
Observables hot vs cold
Cuándo los datos son producidos por el propio “observable”, entonces vamos a decir que se trata de un “observable frío”. Por contra, cuándo los datos se producen fuera del “observable”, entonces lo llamamos “observable caliente”. Lo importante en este caso es que una stream observable en caliente es reproducible y puede usarse varias veces.
Backpressure
Por backpressure entendemos la técnica que se utiliza en las situaciones en las que el observador no puede hacer frente a una elevada tasa de producción de eventos. Algunos ejemplos de backpressure son:
- Descartar los mensajes.
- Estrategias de amortiguación sensatas (tiempo vs recuento).
- Bloquear la ejecución y procesar el conjunto actual de eventos.
- Estrategias de throttling y debouncing.
Procesamiento secuencial vs paralelo
Este concepto hace referencia a la forma en la que actuan los operadores que procesan las pipelines de datos. Por defecto, todas las operaciones de mapeado (map) y filtrado son secuenciales. Sin embargo, y como podéis imaginar, es recomendable ejecutar las operaciones en paralelo para exprimir el uso de la CPU. Las operación paralelas mantienen el orden en que se recibieron los datos.
Observables conectables
Un observable conectable se parece a un observable ordinario, excepto que no comienza a emitir elementos cuando está suscrito,
sino solo cuando se llama a su método connect()
. De esta manera, puede esperar a que todos los suscriptores previstos
se suscriban al observable antes de que el observable comience a emitir elementos.
Y hasta aquí por hoy. Así que, si os interesa el apasionante mundo de la programación reactiva, os recomendamos que le echéis un vistazo a la documentación oficial que es muy extensa y además nos proporciona ejemplos mucho más extensos y del “mundo real”.
Y vosotros, ¿ya sabíais qué es la programación reactiva?¿ya la estábais aplicando en alguna aplicación desarrollada en Go? ¿Tenéis alguna consideración o recomendación que queráis compartir con nosotros? Como siempre, estaremos encantados de recibir vuestro feedback en los comentarios del blog o vía Twitter.