Member of The Internet Defense League Últimos cambios
Últimos Cambios
Blog personal: El hilo del laberinto Geocaching

Optimización en el sistema de persistencia, y gestión de errores de conexión

Última Actualización: 15 de marzo de 2008 - Sábado

El sistema de persistencia que he desplegado hace un mes funciona a la perfección, conforme a lo esperado. Pero he detectado algunas áreas mejorables en el uso concreto de la persistencia en este proyecto.

El principal de ellos es el rendimiento.

Tengo varias sesiones de BitTornado abiertas constantemente, algunas de las cuales con más de 500 torrents. Si recalculamos la pantalla una vez por segundo y tenemos 500 torrents, estamos haciendo unas 1000 transacciones por segundo. La contabilidad de tráfico para cada torrent supone dos transacciones: la primera para sincronizar la caché persistente local, y la segunda para actualizar los datos.

Debido a ello todos los procesos me estaban consumiendo el 27% de la CPU Una máquina antigua, eso sí: Pentium 4 a 1.4 Ghz, de 2001, con 640MB de RAM. la máquina no se dedica a nada más, así que no es un problema incluso en un caso extremo como éste (miles de torrents).

La cuestión es que el consumo de CPU es proporcional al número de torrents en el sistema, cuando debería ser proporcional al número de torrents activos en cada momento (o, más concretamente, proporcional al tráfico cursado). Esto es especialmente importante porque, en general, el número de torrents activos en un momento dado es muy bajo comparado con el número de torrents existentes.

La solución es bastante simple: refrescamos la caché local de persistencia una vez por segundo, no una vez por torrent existente en el sistema. Adicionalmente, hacemos "commit" solo cuando hay cambios.

Conforme a este nuevo diseño, si no hay actividad sólo se realiza una sincronización por segundo, independientemente del número de torrents en el sistema. Y si hay actividad, el tráfico de persistencia será proporcional al tráfico P2P (típicamente un "commit" cada 16 Kbytes). Con estos cambios, el consumo de CPU ha pasado del 27% a ser inapreciable.

Los cambios son:

--- BitTornado/CurrentRateMeasure.py    (revision 7)
+++ BitTornado/CurrentRateMeasure.py    (working copy)
@@ -3,7 +3,7 @@

 from clock import clock

-from monitor import monitor
+from monitor import monitor_opt

 from time import time

@@ -16,18 +16,18 @@
         self.infohash=infohash
         self.up=0 if up else 1

-        @monitor
+        @monitor_opt
         def x(conn,infohash,up) :
           speed=conn.get_root()["BT"]
           if infohash not in speed :
             speed[infohash]=[0,0,time()]
           grand_total=speed[infohash][up]
-          return grand_total
+          return (True,grand_total)

         self.total=x(self.infohash,self.up)

     def update_rate(self, amount):
-        @monitor
+        @monitor_opt
         def x(conn,infohash,up,amount) :
           speed=conn.get_root()["BT"]
           x=speed[infohash]
@@ -36,7 +36,8 @@
             x[up]=total
             x[2]=time()
             speed[infohash]=x
-          return total
+            return (True,total)
+          return (False,total)

         self.total=x(self.infohash,self.up,amount)

Index: BitTornado/monitor.py
===================================================================
--- BitTornado/monitor.py       (revision 7)
+++ BitTornado/monitor.py       (working copy)
@@ -3,6 +3,12 @@

 persistencia_mutex=threading.Lock()

+visualizar_mensaje=None
+
+def set_display(handler) :
+  global visualizar_mensaje
+  visualizar_mensaje=handler
+
 def conecta_storage() :
   from durus.client_storage import ClientStorage
   from durus.connection import Connection
@@ -12,39 +18,66 @@
   try :
     persistencia = conecta_storage()
     break
+  except KeyboardInterrupt :
+    raise
   except :
     time.sleep(0.1)

-def monitor(func) :
+last_durus_sync=0
+
+def monitor_opt(func) :
   def _monitor(*args, **kwargs) :
     global persistencia
     global persistencia_mutex
+    global last_durus_sync
     from durus.error import ConflictError
-    import socket
+    import socket,time
+    must_reconnect=False
     while True : # Reintenta si hay conflictos
       persistencia_mutex.acquire()
       try : # Nos aseguramos de liberar el lock
+        t=time.time()
+        if must_reconnect :
+          must_reconnect=False
+          if visualizar_mensaje!=None :
+            visualizar_mensaje("PROBLEMAS CON LA CONEXION DURUS... ABRIENDO UNA CONEXION NUEVA...")
+          last_durus_sync=t
+          while True :
+            try :
+              persistencia = conecta_storage()
+              break
+            except KeyboardInterrupt :
+              raise
+            except :
+              time.sleep(0.1)
+          if visualizar_mensaje!=None :
+            visualizar_mensaje("Conexion DURUS recuperada.")
         try : # Conflictos
-
+          t=time.time()
           try : # Nos aseguramos de estar conectados
-            persistencia.abort() # Hacemos limpieza de cache
+            if t-last_durus_sync>1.0 :
+              last_durus_sync=t
+              persistencia.abort() # Hacemos limpieza de cache
           except socket.error :
-            import sys
-            import time
-            print >>sys.stderr,"PROBLEMAS CON LA CONEXION DURUS... ABRIENDO UNA CONEXION NUEVA"
-            while True :
-              try :
-                persistencia = conecta_storage()
-                break
-              except :
-                time.sleep(0.1)
+            must_reconnect=True
+            continue  # Vuelve a intentarlo

           ret=func(persistencia,*args, **kwargs)
-          persistencia.commit()
-          return ret
+          if ret[0] :
+            last_durus_sync=time.time()
+            try :
+              persistencia.commit()
+            except socket.error :
+              must_reconnect=True
+              continue  # Vuelve a intentarlo
+
+          return ret[1]
+
         except ConflictError :
+          last_durus_sync=0
           pass # El abort ya se hace en el bucle
         except :
+          last_durus_sync=t
           persistencia.abort()
           import sys
           import time
@@ -55,13 +88,14 @@

   return _monitor

-@monitor
+@monitor_opt
 def inicializa(conn) :
   from durus.btree import BTree
   from durus.persistent_dict import PersistentDict
   root=conn.get_root()
   if "BT" not in root :
     root["BT"]=BTree()
+  return (True,None)

 inicializa()

Index: btlaunchmanycurses.py
===================================================================
--- btlaunchmanycurses.py       (revision 6)
+++ btlaunchmanycurses.py       (working copy)
@@ -273,7 +273,10 @@


 def LaunchManyWrapper(scrwin, config):
-    LaunchMany(config, CursesDisplayer(scrwin))
+    ventana=CursesDisplayer(scrwin)
+    from BitTornado import monitor
+    monitor.set_display(ventana.message)
+    LaunchMany(config, ventana)


 if __name__ == '__main__':

Detalles:

  • Sincronizamos la caché persistente local una vez por segundo, aunque no haya tráfico, para poder acceder a las actualizaciones de tráfico de otros BitTornado. Por ejemplo, si tenemos un torrent presente en varios procesos, y uno de ellos está activo, debe verse el tráfico también en los demás. Estas sincronizaciones periódicas también permiten que el servidor DURUS "vacíe" su lista de objetos modificados, de vez en cuando.

  • Dado que las sincronizaciones son una vez por segundo, es posible que cuando intentemos hacer un "commit" estemos anticuados. DURUS detecta este hecho sin problemas, y generará una excepción indicando que ha existido un conflicto. El código de "monitor" se encargará de reintentar la operación de nuevo, una vez actualizada la caché. Es decir, se producirán más conflictos que antes, pero se resuelven automáticamente.

  • Como ahora solo se hace "connection.commit()" cuando hay cambios, podemos tener conflictos espurios. La casuística es como sigue: realizamos una actualización de tráfico para un torrent con tráfico 0, así que nos ahorramos el "connection.commit()". Seguidamente actualizamos otro torrent, que sí ha tenido tráfico. Por ello, esta vez sí hacemos el "connection.commit()". Si mientras tanto el primer torrent ha sido modificado por otro proceso BitTornado, tendremos un conflicto que, en realidad, no es tal.

    No pasa nada, porque el sistema (a través del módulo "monitor") está preparado para detectar este hecho y reintentar la operación por segunda vez.

  • Dado que ahora ya no estamos protegidos por el "connection.abort()" inicial, que comprobaba que la conexión siga viva, es necesario proteger el "connection.commit()" final. En realidad éste era un bug en mi código anterior, ya que ese problema ya existía antes, solo que con una ventana mucho menor. Sigue existiendo la posibilidad de que, por ejemplo, se intente acceder a un objeto que no tenemos en caché y detectar en ese momento que hemos perdido la conexión con DURUS. Podríamos proteger las rutinas decoradas con "monitor" mediante un "try...except socket.error:", ya que no contienen ningún código de sockets y, por tanto, cualquier error de ese tipo será un problema con DURUS. Pero en este contexto concreto es previsible que tengamos todos los objetos ya en caché, porque son pocos, pequeños y nuestro "working set" es básicamente constante.

    En cualquier caso son mejoras a considerar para una versión futura.

  • Con DURUS es posible comprobar si una conexión tiene objetos modificados y, por tanto, necesita hacer un "connection.commit()" mediante una función no documentada. Debido a este hecho, y a que el código gestionado por el decorador "monitor" es corto y sencillo, he preferido ser explícito y que sea el código en cuestión quien indique si necesita hacer un "connection.commit()" o no. En otras circunstancias sería preferible que dicha necesidad la evaluase el decorador "monitor" directamente, aunque tenga que utilizar un método no oficial de DURUS.

  • Antes cuando se perdía la conexión con el servidor DURUS, se machacaba la pantalla con un mensaje de error. Ahora imprimimos el error limpiamente, e imprimimos también cuándo se recupera la conexión.

  • Antes, cuando estábamos intentando recuperar una conexión DURUS, no podíamos cortar el proceso con CONTROL+C porque estamos interceptando todas las excepciones de manera temporal. Con el parche propuesto, permitimos hacer CONTROL+C.


Historia

  • 15/mar/08: Primera versión de esta página.



Python Zope ©2008 jcea@jcea.es

Más información sobre los OpenBadges

Donación BitCoin: 19niBN42ac2pqDQFx6GJZxry2JQSFvwAfS