Lo dijimos, y lo prometido es deuda. Volvemos a la carga para seguir nuestra serie de patrones de concurrencia con un nuevo patrón. De nuevo, intentando dar solución a problematicas para las que aún no habíamos dado respuesta. Hoy es el turno de las pipelines.
De hecho, una pipeline no es ninguna definición formal en el contexto de la terminología de Go. Pero, sí que podemos definir dicho concepto de manera informal:
Una pipeline es una serie de diferentes etapas conectadas por canales y formadas por grupos de rutinas que reciben los datos de salida de la etapa anterior, aplican una función sobre los mismos, y mediante dicha aplicación generan nuevos datos que serán enviados a la siguiente etapa. Y así sucesivamente.
Veamos un ejemplo
Vista la definición textual, creemos que lo mejor es ver un ejemplo para comprender mejor qué pinta tiene una pipeline.
Además, aprovechamos dicho ejemplo para añadir un par de conceptos más:
- Llamaremos source o producer a la primera de todas las etapas.
- Y llamaremos sink o consumer a la última de todas las etapas.
Por lo tanto, el ejemplo más pequeño y sencillo de comprender será aquél formado estrictamente por un productor (o source) y un consumidor (o sink). Sin etapas intermedias entre ellos. Algo así:
package main
import (
"fmt"
"math/rand"
"time"
)
const N = 10
func main() {
out := random(N)
print(out)
}
func random(n int) <-chan int {
out := make(chan int)
go func() {
for i := 0; i < n; i++ {
x := rand.New(rand.NewSource(time.Now().UTC().UnixNano())).Int()
out <- x
}
close(out)
}()
return out
}
func print(in <-chan int) {
for n := range in {
fmt.Println(n)
}
}
En esta pequeña representación del patrón pipeline podemos ver los dos elementos anteriormente introducidos. En primer lugar, un productor (o grupo de productores) encargado de generar números aleatorios, y, en segundo lugar, un consumidor (o grupo de consumidores) responsable de escribir dichos números en la salida estándar del sistema.
Ya está, ya tenemos nuestra primera pipeline. Sencilla, pero no por ello menos funcional.
Añadiendo más complejidad a nuestras pipelines
Efectivamente, ya tenemos nuestra pipeline funcionando. Sin embargo, las situaciones reales dónde la aplicación de dicho patrón sea una posible solución a nuestros problemas, serán situaciones con una complejidad más elevada que la vista hasta ahora.
Entonces, veamos algunos de los recursos que tenemos a la hora de tener que hacer que nuestras pipelines respondan a lógicas más complejas.
Distribuir el cómputo de nuestras etapas (fan-out)
Una situación muy común en las pipelines es precisamente la de tener que hacer una tarea repetitiva múltiples veces. Pongamos por ejemplo que tenemos que hacer un conjunto de peticiones HTTP o que tengamos que leer un conjunto de ficheros. En este tipo de situaciones, lo habitual será tener una (o pocas) rutinas con los datos relativos a las tareas que debemos realizar (el listado de endpoints a peticionar o el listado de ficheros a leer) y que nuestro procedimiento consista en instanciar tantas rutinas como sean necesarias para llevar a cabo dichas tareas de la forma más rápida (paralelizable) posible.
Este tipo de recurso se llama fan-out y se correspondería con algo así:
package main
import (
"fmt"
"io/ioutil"
"os"
)
func main() {
filePaths := []string{"lorem1.txt", "lorem2.txt", "lorem3.txt", "lorem4.txt"}
channels := readFiles(filePaths)
for _, c := range channels {
b := <-c
fmt.Println(string(b))
}
}
func readFiles(paths []string) []<-chan []byte {
var channels []<-chan []byte
for _, p := range paths {
contentStream := make(chan []byte)
go readFile(p, contentStream)
channels = append(channels, contentStream)
}
return channels
}
func readFile(path string, stream chan<- []byte) {
file, err := os.Open(path)
if err != nil {
fmt.Println(err)
}
defer file.Close()
b, _ := ioutil.ReadAll(file)
fmt.Println(string(b))
stream <- b
}
Como podemos ver, lo que hacemos es instanciar una nueva rutina para cada uno de los path que nos pasan. Además, dotamos a cada una de estas rutinas con un canal a través del cuál nos publicará los datos leídos del fichero.
Recopilar los resultados del cómputo de nuestras etapas (fan-in)
La situación opuesta a la anterior se daría cuándo queramos hacer la operación inversa. Es decir, juntar todos los resultados de los cómputos anteriores. Por ejemplo, para juntar todos los datos recibidos por HTTP o para juntar el contenido de todos los ficheros leídos.
En esta ocasión estaríamos hablando de un fan-in y sería algo como:
package main
import (
"fmt"
"io/ioutil"
"os"
)
func main() {
filePaths := []string{"lorem1.txt", "lorem2.txt", "lorem3.txt", "lorem4.txt"}
channels := readFiles(filePaths)
b := mergeContent(channels...)
fmt.Println(string(b))
}
func mergeContent(channels ...chan []byte) (bytes []byte) {
agg := make(chan []byte)
for _, ch := range channels {
go func(c chan []byte) {
agg <- <-c
}(ch)
}
for i := 0; i < len(channels); i++ {
select {
case b := <-agg:
bytes = append(bytes, b...)
}
}
return
}
En este caso, la función mergeContent
es la responsable de hacer ese fan-in de todos los canales que instanciamos
en el ejemplo de código anterior. De esta forma habríamos cerrado el ciclo que consistiría en crear múltiples rutinas
para paralelizar la lectura de varios ficheros (o de varias peticiones HTTP) y en juntar los resultados de todas ellas.
A partir de aquí nuestra pipeline podría seguir de forma lineal o con nuevas apariciones de ambos recursos (fan-out, fan-in).
Además, nos debemos fijar en un pequeño detalle presente en el último ejemplo de código. Pues, si buscáis ejemplos de fan-in / fan-out os daréis cuenta que, habitualmente, los ejemplos no tienen un número variable (dinámico) de canales a gestionar. Sin embargo, en esta ocasión, el código es compatible con cualquier número de canales. Pese a que podríamos saber de antemano que el slice que recibimos como argumento siempre tendrá un tamaño fijo de cuatro elementos.
Eso es gracias a las siguientes líneas de código:
agg := make(chan []byte)
for _, ch := range channels {
go func(c chan []byte) {
agg <- <-c
}(ch)
}
Cuyo objetivo es permitirnos hacer un select
sobre los múltiples canales recibidos (que recordemos no sabemos el número).
Por lo tanto, de este modo lo que haremos es hacer una redirección (forward) de los diferentes canales a un único canal agregado
(que será sobre el que haremos realmente el select
.
Cancelando las tareas de nuestra pipeline
De modo similar a como lo veíamos en el artículo sobre el patrón context, habrá situaciones en las que necesitaremos notificar a las rutinas de nuestras etapas de que deben finalizar su ejecución. Eso, evidentemente, lo podríamos gestionar con contextos. Aunque hoy vamos a presentar otro medio de llevarlo a cabo. A éste le llamaremos done channel. Y basicamente consiste en hacer la siguiente gestión:
package main
import (
"fmt"
"io/ioutil"
"os"
"os/signal"
"time"
)
func main() {
done := make(chan struct{})
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
go func() {
select {
case <-c:
done <- struct{}{}
}
}()
filePaths := []string{"lorem1.txt", "lorem2.txt", "lorem3.txt", "lorem4.txt"}
channels := readFiles(done, filePaths)
b := mergeContent(done, channels)
fmt.Println(string(b))
}
func mergeContent(done chan struct{}, channels []chan []byte) (bytes []byte) {
agg := make(chan []byte)
for _, ch := range channels {
go func(c chan []byte) {
agg <- <-c
}(ch)
}
for i := 0; i < len(channels); i++ {
select {
case b := <-agg:
bytes = append(bytes, b...)
case <-done:
return
}
}
return
}
func readFiles(done chan struct{}, paths []string) []chan []byte {
var channels []chan []byte
for _, p := range paths {
contentStream := make(chan []byte)
go readFile(done, p, contentStream)
channels = append(channels, contentStream)
}
return channels
}
func readFile(done chan struct{}, path string, stream chan []byte) {
time.Sleep(2 * time.Second)
file, err := os.Open(path)
if err != nil {
fmt.Println(err)
}
defer file.Close()
b, _ := ioutil.ReadAll(file)
fmt.Println(string(b))
select {
case stream <- b:
case <-done:
}
}
Qué como podemos ver, básicamente consiste en propagar un done channel (chan struct{}
) por todas las rutinas y hacer
que éstas tengan la señal de dicho canal como una condición de salida.
Y, ¡eso es todo!
En esta ocasión, además, hemos vinculado la finalización de la tarea a la señal de fin de ejecución del programa (CTRL + C
),
de un modo similar a como haríamos un cleanup de nuestras aplicaciones. Pero, como podemos ver, la señal de done la
podríamos gestionar de cualquier otro modo (publicando en dicho canal cuándo quisiéramos pausar la ejecución de todas las rutinas).
¿Aún no conocías éste patrón? Cuéntamos qué tipo de problema crees que te ayudará a resolver.
¿Ya lo conocías y ya lo habías puesto en práctica? Cuéntamos qué tipo de problema te ha ayudado a resolver.
Como siempre, estaremos encantados de recibir vuestro feedback en los comentarios del blog o vía Twitter.