Con la excusa de la StayAtHomeConf y la charla sobre el paquete 'context' impartida en la misma, el artículo de la semana pasada estuvo centrado en el paquete '/x/sync/errgroup' cuya principal finalidad es la de proporcionarnos una herramienta de sincronización, propagación de errores y cancelación de contexto para grupos de gorrutinas.
Sin embargo, bajo el paraguas de /x/sync/ existen otros paquetes
interesantes. Todos ellos centrados en proporcionarnos herramientas de sincronización entre gorrutinas. Y vivir bajo
el contexto /x/
significa “experimental”, pero no por ello menos útil. De hecho, son ya varios los paquetes que
históricamente formaron parte de ese contexto pero que hoy en día ya forman parte de la librería estándar. O, ¿es que nadie
se acuerda ya de cuándo los errors.Is
y errors.As
estaban en fase experimental?
/x/sync/singleflight
El primero de estos paquetes es el paquete '/x/sync/singleflight', que como su nombre indica, lo que nos proporciona es un mecanismo de supresión de llamadas a funciones duplicadas.
Supongamos que tenemos una API para recuperar datos que, por debajo, va a buscar dichos datos a una BBDD o a otra API / servicio. En ese contexto, si nuestra API tiene un volumen de tráfico muy elevado es probable que en el mismo intervalo de tiempo varios usuarios nos pidan los mismos datos.
En ese caso, ¿tendría sentido realizar una petición a la BBDD o a la API externa por cada usuario solicitante? En realidad, eso dependerá de los requisitos no funcionales de nuestro proyecto, pero es probable qué, cuándo estemos hablando de datos que no se actualizan con mucha frecuencia, no tenga mucho sentido cargar el servicio colindante con todas esas peticiones.
Será precisamente en estos casos dónde este paquete nos será útil. Veamos un ejemplo:
const opKey = "fetch"
var requestGroup singleflight.Group
func fetchHandler(w http.ResponseWriter, r *http.Request) {
data, err, shared := requestGroup.Do(opKey, func() (interface{}, error) {
// do fetch data (db query or api request)
})
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(data)
}
Como podéis ver, lo que vamos a hacer es usar un singleflight.Group
para encapsular dentro del mismo (mediante el método Do
)
la operación de recuperar datos, identificada con una clave, de forma que si nos llegan múltiples peticiones concurrentes que
vayan a parar a este handler, la operación de recuperar los datos será ejecutada una única vez mientras que los datos
se retornarán a todas las llamadas a dicha función con la misma clave.
Adicionalmente, esta función también nos va devolver un booleano (shared
) que nos indicará si el resultado fue devuelto
(o no) a más una llamada al método Do
para la clave dada. Y, por si esto fuera poco, este paquete también nos proporciona
un par de métodos más que también son interesantes:
-
En primer lugar, además del método
Do
que vimos anteriormente, también tenemos un métodoDoChan
que nos permite recibir los resultados de la función ejecutada a través de un canal deResult
, dónde esta última estructura se corresponde con:type Result struct { Val interface{} Err error Shared bool }
-
Y, en segundo lugar, también tenemos un método
Forget(key string)
que nos permite decir: “Hey, olvídate de todas las peticiones que tienes agrupadas con esta llave y trata las siguientes como un nuevo grupo”, que nos puede resultar útil en determinadas circustancias, por ejemplo, cuándo los datos hayan sido actualizados.
/x/sync/semaphore
Otro de estos paquetes es el paquete '/x/sync/semaphore', que nos proporciona una implementación de un semáforo ponderado. Como ya vimos en el artículo del patrón de concurrencia pooling, el uso de buffered channels nos es muy útil cuándo queremos limitar el máximo de gorrutinas que acceden a un grupo limitado de recursos. Sin embargo, este patrón será “injusto” o insuficiente cuándo las tareas a ejecutar no sean proporcionalmente comparables.
Es decir, vamos a suponer un caso en el que queremos repartir el 100% de la CPU disponible de nuestra máquina a diez procesos diferentes. En ese caso, si hay procesos que usarán un porcentaje elevado de CPU y otros que usarán muy poca, entonces no tendrá sentido hacer una repartición equitativa (por ejemplo, el 100% de CPU repartido entre 10 rutinas -10% de CPU para cada una-). Veamos un ejemplo:
var (
maxWorkers = runtime.GOMAXPROCS(0)
sem = semaphore.NewWeighted(int64(maxWorkers))
)
var tasks = []int{1,2,2,1,4,2,1}
for _, task := range tasks {
taskWeight := task
ctx, cancel := context.WithTimeout(5 * time.Second)
if err := sem.Acquire(ctx, taskWeight); err != nil {
log.Printf("Failed to acquire semaphore: %v", err)
break
}
go func() {
defer sem.Release(taskWeight)
// do whatever
}()
}
Cómo podéis ver, lo primero que hacemos es recuperar el número actual de GOMAXPROCS
(cuándo le pasamos 0
como parámetro
no modifica el valor actual y nos devuelve dicho valor) y instanciamos un nuevo semáforo ponderado con la misma capacidad.
Posteriormente, simulamos (por simplicidad) que tenemos un listado de tareas con diferentes requisitos de recursos, entonces
para cada una de ellas lo que vamos a hacer es adquirir ese número de recursos del semáforo con el método Acquire
y posteriormente
lanzaremos nuestra rutina, liberando los recursos al final de su ejecución.
¡De este modo ya tendríamos nuestro semáforo ponderado funcionando! También es importante destacar de este ejemplo el
timeout del contexto que pasamos como primer parámetro al método Acquire
, pues de otro modo esa operación podría
resultar indefinidamente bloqueante si ninguna de las tareas que previamente ha adquirido los recursos los libera.
/x/sync/syncmap
Finalmente, el último y quizás el menos popular de estos paquetes es el paquete '/x/sync/syncmap', cuya funcionalidad ya hemos comentado brevemente en algún otro artículo y que básicamente nos proporciona una implementación de un map con soporte para la concurrencia, y cuya promoción a la librería estándar es ya cosa del pasado.
Así que, ya por cerrar el artículo y repasar brevemente el uso de este recurso, imaginemos que tenemos el siguiente handler HTTP de una API de contadores dónde usamos un map a modo de almacenamiento:
var counters = make(map[string]int)
type IncrementRequest struct {
ID string `json:"id"`
}
func incrementHandler(w http.ResponseWriter, r *http.Request) {
var req IncrementRequest
err := json.NewDecoder(r.Body).Decode(&req)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
counters[req.ID]++
w.WriteHeader(http.StatusOK)
}
Cómo deberíais saber, en el ejemplo anterior, si nos llegan varias peticiones concurrentes vamos a terminar sufriendo
un error del estilo de fatal error: concurrent map read and map write
. A lo que una posible solución (aunque no
siempre la más recomendada), podría ser el uso de dicha estructura de datos:
var counters sync.Map
type IncrementRequest struct {
ID string `json:"id"`
}
func incrementHandler(w http.ResponseWriter, r *http.Request) {
var req IncrementRequest
err := json.NewDecoder(r.Body).Decode(&req)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
result, ok := counters.Load(req.ID)
if !ok {
counters.Store(req.ID, 1)
} else {
counters.Store(req.ID, result.(int)+1)
}
w.WriteHeader(http.StatusOK)
}
Sin embargo, tal y cómo decíamos antes, esta no es siempre la solución más recomendada, pues como podéis ver en el ejemplo,
los elementos de la estructura están tipados como interface{}
, con lo cuál tenemos que tirar de reflection y, además,
esta estructura no nos deja libres de potenciales data races, pues las operaciones de lectura + escritura no son atómicas.
Es por eso, por lo que es poco frecuente el uso del sync.Map
como solución a las problemas de concurrencia con maps,
sin embargo, siempre es recomendable conocer todos los recursos que nos proporciona el lenguaje para ser capaces de escoger
el más adecuado en cada caso.