Hace tres semanas, y después de meses recibiendo feedback reclamando más artículos sobre la concurrencia en Go, decidimos empezar una nueva serie de artículos sobre patrones de concurrencia que ha dado mucho de que hablar.
Dicha serie empezó con uno de los patrones más comunes entre las aplicaciones concurrentes en Go: el patrón context. Después seguimos repasando otro patrón, también muy habitual a la hora de manejar grades volúmenes de datos de diferentes fuentes: el patrón pipeline. Y hoy, seguimos con el patrón pooling, cuya finalidad principal es la de agrupar un conjunto de recursos (pool) que pueden ser compartidos y utilizados individualmente por un número cualquiera de rutinas.
La situación más común dónde aplicaremos este patrón será en aquellos casos en los que tengamos un conjunto de recursos que queramos compartir (varias conexiones a bases de datos, por ejemplo). Cuando una rutina necesita uno de los recursos del conjunto, puede adquirir dicho recurso, usarlo, y finalmente devolverlo a la piscina (pool).
Entrando en materia
Lo primero de todo será ver qué pinta tiene ese pool de recursos y cómo lo implementamos:
package main
import (
"errors"
"io"
"sync"
)
type Pool struct {
m sync.Mutex
resources chan io.Closer
factory func() (io.Closer, error)
closed bool
}
func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
if size <= 0 {
return nil, errors.New("size value is too small")
}
return &Pool{
factory: fn,
resources: make(chan io.Closer, size),
}, nil
}
Como podemos ver, con este pequeño pedazo de código ya tenemos forma de inicializar un nuevo conjunto de recursos, que,
si os fijáis, hemos definido del tipo io.Closer
. Luego veremos el porqué de esa decisión.
Sin embargo, con lo que tenemos hasta ahora, no tenemos forma de adquirir esos recursos, ni de, posteriormente, liberarlos. Así que, veamos cómo implementamos esa parte.
Adquiriendo y liberando recursos
Si no los adquirimos, evidentemente no los vamos a poder liberar, así que primero veamos cómo conseguimos un recurso:
var ErrPoolClosed = errors.New("the pool has been closed")
func (p *Pool) Acquire() (io.Closer, error) {
select {
case r, ok := <-p.resources:
if !ok {
return nil, ErrPoolClosed
}
return r, nil
default:
return p.factory()
}
}
Si habéis prestado atención hasta el momento, os habréis fijado que uno de los atributos de nuestro pool es el atributo closed, el cuál nos permitirá gestionar el estado en el que éste esté cerrado. Por lo tanto, lo primero será gestionar esa situación a la hora de intentar adquirir un nuevo recurso.
Para esta ocasión de ejemplo vamos a usar un sentinel error, sin embargo, os recomendamos que os volváis a leer el artículo de gestión de errores de nuestro compañero David López Carrascal, y que decidáis a vuestro criterio cuál sería la mejor forma de gestionarlo en ésta ocasión.
Una vez gestionado el error, lo siguiente será adquirir el recurso en sí. Eso lo podemos hacer de dos formas:
- obteniendo un recurso disponible en el pool.
- inicializando un nuevo recurso.
La primera de las formas será la más habitual en un uso común de éste patrón, sin embargo, si hemos inicializado nuestro pool recientemente, también deberemos inicializar los recursos.
Vale, ya tenemos nuestro recurso adquirido, ahora ya podemos trabajar con él y hasta ahí genial. Pero, una vez adquirido, necesitamos una forma de liberar ese recurso y de devolverlo a la piscina (pool):
func (p *Pool) Release(r io.Closer) {
p.m.Lock()
defer p.m.Unlock()
if p.closed {
r.Close()
return
}
select {
case p.resources <- r:
default:
r.Close()
}
}
De nuevo, tenemos que gestionar la situación en la que el pool haya sido cerrado. En esta ocasión es sencillo: simplemente cerraremos el recurso. Lo mismo ocurrirá si nuestro pool de recursos ya está lleno. En ambas ocasiones, además, el recurso sujeto será ignorado una vez ya haya sido cerrado.
Y aquí es dónde vemos porqué anteriormente decidimos que nuestros recursos fueran del tipo io.Closer
. Pues tal y como
introducíamos al principio, el tipo de recursos que vamos a querer gestionar con éste patrón, serán recursos como
conexiones a base de datos. Por lo tanto, vamos a querer cerrar (a modo de graceful shutdown y para liberar
recursos en el sistema) esas conexiones una vez ya no vayan a ser utilizadas de nuevo.
Finalmente, vamos a proporcionar una forma de cerrar todo el pool:
func (p *Pool) Close() {
p.m.Lock()
defer p.m.Unlock()
if p.closed {
return
}
close(p.resources)
for r := range p.resources {
r.Close()
}
}
Es decir, una forma de dejar el pool en un estado closed (flag & channel cerrado) y de cerrar todos los recursos contenidos en el mismo.
¡Et voilà! ¡Ya lo tenemos!
Aquí tenéis un Playground con la solución final.
Cómo habéis podido ver, con un sencillo struct de cuatro elementos:
- Mutex, para la gestión de operaciones concurrentes de forma segura.
- Buffered channel, como contenedor de recursos compartidos.
- Factory function, para inicializar los recursos compartidos.
- Flag (boolean) , para representar el estado (abierto-cerrado) del pool.
somos capaces de implementar un patrón tan útil y tan común como el patrón pooling.
Por qué, ¿quién no había utilizado antes una librería de BBDD que gestionaba un pool de conexiones?
Y ahora, ¿os creéis capaces de partir de nuestro ejemplo y compartir con nosotros una implementación real de dicho patrón?
Nosotros os invitamos a hacerlo, y, como siempre, estaremos encantados de recibir vuestro feedback en los comentarios del blog o vía Twitter.