Index: BitTornado/BT1/DownloaderFeedback.py =================================================================== --- BitTornado/BT1/DownloaderFeedback.py (revision 5) +++ BitTornado/BT1/DownloaderFeedback.py (working copy) @@ -32,6 +32,8 @@ self.doneprocessing.set() if statusfunc: self.autodisplay(statusfunc, interval) + + self.up_quota=0 def _rotate(self): @@ -98,6 +100,9 @@ return l + def set_upquota(self,quota) : + self.up_quota=quota + def gather(self, displayfunc = None): s = {'stats': self.statistics.update()} if self.sp.isSet(): @@ -105,6 +110,10 @@ else: s['spew'] = None s['up'] = self.upfunc() + + if self.up_quota and (not self.choker.paused) and (s["stats"].upTotal>=self.up_quota) : + self.choker.pause(True) + if self.finflag.isSet(): s['done'] = self.file_length return s Index: BitTornado/download_bt1.py =================================================================== --- BitTornado/download_bt1.py (revision 5) +++ BitTornado/download_bt1.py (working copy) @@ -18,7 +18,7 @@ from BT1.Rerequester import Rerequester from BT1.DownloaderFeedback import DownloaderFeedback from RateMeasure import RateMeasure -from CurrentRateMeasure import Measure +from CurrentRateMeasure import MeasureDurus from BT1.PiecePicker import PiecePicker from BT1.Statistics import Statistics from ConfigDir import ConfigDir @@ -324,8 +324,16 @@ self.myid = id self.rawserver = rawserver self.port = port - + self.info = self.response['info'] + + if "length" in self.info : + self.length=self.info["length"] + else : + self.length=0 + for i in self.info["files"] : + self.length+=i["length"] + self.pieces = [self.info['pieces'][x:x+20] for x in xrange(0, len(self.info['pieces']), 20)] self.len_pieces = len(self.pieces) @@ -592,9 +600,11 @@ for i in xrange(self.len_pieces): if self.storagewrapper.do_I_have(i): self.picker.complete(i) - self.upmeasure = Measure(self.config['max_rate_period'], - self.config['upload_rate_fudge']) - self.downmeasure = Measure(self.config['max_rate_period']) + self.upmeasure = MeasureDurus(self.config['max_rate_period'], + infohash=self.infohash,up=True, + fudge=self.config['upload_rate_fudge']) + self.downmeasure = MeasureDurus(self.config['max_rate_period'], + infohash=self.infohash,up=False) if ratelimiter: self.ratelimiter = ratelimiter @@ -641,6 +651,16 @@ if self.config['super_seeder']: self.set_super_seed() + + self.upquota=0 + if self.response["announce"].lower().startswith("http://***AQUI PONEMOS LA URL DEL TRACKER***") : + if self.length>=300*1000*1000 : # Por 1000 y no por 1024, por si sus megas no son mis megas + self.upquota=10*self.length-10*1024*1024 # Aqui mejor dejar algo de margen, para curarse en salud + + if self.upquota and (self.upmeasure.get_total()>=self.upquota) : + self.choker.pause(True) + + self.started = True return True @@ -696,18 +716,26 @@ displayfunc = self.statusfunc self._init_stats() - DownloaderFeedback(self.choker, self.httpdownloader, self.rawserver.add_task, + d=DownloaderFeedback(self.choker, self.httpdownloader, self.rawserver.add_task, self.upmeasure.get_rate, self.downmeasure.get_rate, self.ratemeasure, self.storagewrapper.get_stats, self.datalength, self.finflag, self.spewflag, self.statistics, displayfunc, self.config['display_interval']) + if self.upquota : + d.set_upquota(self.upquota) + del d + def startStats(self): self._init_stats() d = DownloaderFeedback(self.choker, self.httpdownloader, self.rawserver.add_task, self.upmeasure.get_rate, self.downmeasure.get_rate, self.ratemeasure, self.storagewrapper.get_stats, self.datalength, self.finflag, self.spewflag, self.statistics) + + if self.upquota : + d.set_upquota(self.upquota) + return d.gather @@ -880,3 +908,4 @@ def get_transfer_stats(self): return self.upmeasure.get_total(), self.downmeasure.get_total() + Index: BitTornado/CurrentRateMeasure.py =================================================================== --- BitTornado/CurrentRateMeasure.py (revision 5) +++ BitTornado/CurrentRateMeasure.py (working copy) @@ -3,6 +3,66 @@ from clock import clock +from monitor import monitor + +from time import time + +class MeasureDurus: + def __init__(self, max_rate_period, infohash,up,fudge = 1): + self.max_rate_period = max_rate_period + self.ratesince = clock() - fudge + self.last = self.ratesince + self.rate = 0.0 + self.infohash=infohash + self.up=0 if up else 1 + + @monitor + def x(conn,infohash,up) : + speed=conn.get_root()["BT"] + if infohash not in speed : + speed[infohash]=[0,0,time()] + grand_total=speed[infohash][up] + return grand_total + + self.total=x(self.infohash,self.up) + + def update_rate(self, amount): + @monitor + def x(conn,infohash,up,amount) : + speed=conn.get_root()["BT"] + x=speed[infohash] + total=x[up]+amount + if amount : # Solo graba si hay algo que grabar + x[up]=total + x[2]=time() + speed[infohash]=x + return total + + self.total=x(self.infohash,self.up,amount) + + t = clock() + self.rate = (self.rate * (self.last - self.ratesince) + + amount) / (t - self.ratesince + 0.0001) + self.last = t + if self.ratesince < t - self.max_rate_period: + self.ratesince = t - self.max_rate_period + + def get_rate(self): + self.update_rate(0) + return self.rate + + def get_rate_noupdate(self): + return self.rate + + def time_until_rate(self, newrate): + if self.rate <= newrate: + return 0 + t = clock() - self.ratesince + return ((self.rate * t) / newrate) - t + + def get_total(self): + return self.total + class Measure: def __init__(self, max_rate_period, fudge = 1): self.max_rate_period = max_rate_period @@ -13,8 +73,9 @@ def update_rate(self, amount): self.total += amount + t = clock() - self.rate = (self.rate * (self.last - self.ratesince) + + self.rate = (self.rate * (self.last - self.ratesince) + amount) / (t - self.ratesince + 0.0001) self.last = t if self.ratesince < t - self.max_rate_period: @@ -34,4 +95,5 @@ return ((self.rate * t) / newrate) - t def get_total(self): - return self.total \ No newline at end of file + return self.total + Index: BitTornado/monitor.py =================================================================== --- BitTornado/monitor.py (revision 0) +++ BitTornado/monitor.py (revision 0) @@ -0,0 +1,67 @@ +import threading +import time + +persistencia_mutex=threading.Lock() + +def conecta_storage() : + from durus.client_storage import ClientStorage + from durus.connection import Connection + return Connection(ClientStorage(address="/video/0/durus-berkeleydbstorage/db/unix_socket")) + +while True : + try : + persistencia = conecta_storage() + break + except : + time.sleep(0.1) + +def monitor(func) : + def _monitor(*args, **kwargs) : + global persistencia + global persistencia_mutex + from durus.error import ConflictError + import socket + while True : # Reintenta si hay conflictos + persistencia_mutex.acquire() + try : # Nos aseguramos de liberar el lock + try : # Conflictos + + try : # Nos aseguramos de estar conectados + persistencia.abort() # Hacemos limpieza de cache + except socket.error : + import sys + import time + print >>sys.stderr,"PROBLEMAS CON LA CONEXION DURUS... ABRIENDO UNA CONEXION NUEVA" + while True : + try : + persistencia = conecta_storage() + break + except : + time.sleep(0.1) + + ret=func(persistencia,*args, **kwargs) + persistencia.commit() + return ret + except ConflictError : + pass # El abort ya se hace en el bucle + except : + persistencia.abort() + import sys + import time + print >>sys.stderr,time.ctime() + raise + finally : + persistencia_mutex.release() + + return _monitor + +@monitor +def inicializa(conn) : + from durus.btree import BTree + from durus.persistent_dict import PersistentDict + root=conn.get_root() + if "BT" not in root : + root["BT"]=BTree() + +inicializa() +