now storing replacement cache in buckets in the routing table

This commit is contained in:
Brian Muller 2014-01-19 13:20:02 -05:00
parent 7962e43a6c
commit c370c8b95b
5 changed files with 45 additions and 14 deletions

View File

@ -35,7 +35,7 @@ class Server(object):
self.log = Logger(system=self)
storage = ForgetfulStorage()
self.node = Node(id or digest(random.getrandbits(255)))
self.protocol = KademliaProtocol(self.node.id, storage, ksize)
self.protocol = KademliaProtocol(self.node, storage, ksize)
self.refreshLoop = LoopingCall(self.refreshTable).start(3600)
def listen(self, port):
@ -123,6 +123,9 @@ class Server(object):
"""
node = Node(digest(key))
nearest = self.protocol.router.findNeighbors(node)
if len(nearest) == 0:
self.log.warning("There are no known neighbors to get key %s" % key)
return defer.succeed(None)
spider = ValueSpiderCrawl(self.protocol, node, nearest, self.ksize, self.alpha)
return spider.find()
@ -140,6 +143,9 @@ class Server(object):
node = Node(dkey)
nearest = self.protocol.router.findNeighbors(node)
if len(nearest) == 0:
self.log.warning("There are no known neighbors to set key %s" % key)
return defer.succeed(False)
spider = NodeSpiderCrawl(self.protocol, node, nearest, self.ksize, self.alpha)
return spider.find().addCallback(store)
@ -154,7 +160,7 @@ class Server(object):
return True
return False
def save(self, fname):
def saveState(self, fname):
data = { 'ksize': self.ksize,
'alpha': self.alpha,
'id': self.node.id,
@ -163,7 +169,7 @@ class Server(object):
pickle.dump(data, f)
@classmethod
def load(self, fname):
def loadState(self, fname):
with open(fname, 'r') as f:
data = pickle.load(f)
s = Server(data['ksize'], data['alpha'], data['id'])
@ -171,12 +177,12 @@ class Server(object):
s.bootstrap(data['neighbors'])
return s
def saveRegularly(self, fname, frequency=600):
def saveStateRegularly(self, fname, frequency=600):
"""
@param fname: File to save retularly to
@param frequencey: Frequency in seconds that the state
should be saved. By default, 10 minutes.
"""
loop = LoopingCall(self.save, fname)
loop = LoopingCall(self.saveState, fname)
loop.start(frequency)
return loop

View File

@ -7,11 +7,11 @@ from kademlia.log import Logger
class KademliaProtocol(RPCProtocol):
def __init__(self, sourceID, storage, ksize):
def __init__(self, sourceNode, storage, ksize):
RPCProtocol.__init__(self)
self.router = RoutingTable(self, ksize)
self.router = RoutingTable(self, sourceNode, ksize)
self.storage = storage
self.sourceID = sourceID
self.sourceID = sourceNode.id
self.log = Logger(system=self)
def getRefreshIDs(self):

View File

@ -3,11 +3,13 @@ import time
import operator
from collections import OrderedDict
from kademlia.utils import OrderedSet
class KBucket(object):
def __init__(self, rangeLower, rangeUpper, ksize):
self.range = (rangeLower, rangeUpper)
self.nodes = OrderedDict()
self.replacementNodes = OrderedSet()
self.touchLastUpdated()
self.ksize = ksize
@ -27,8 +29,14 @@ class KBucket(object):
return (one, two)
def removeNode(self, node):
if node.id in self.nodes:
if not node.id in self.nodes:
return
# delete node, and see if we can add a replacement
del self.nodes[node.id]
if len(self.replacementNodes) > 0:
newnode = self.replacementNodes.pop()
self.nodes[newnode.id] = newnode
def hasInRange(self, node):
return self.range[0] <= node.long_id <= self.range[1]
@ -37,6 +45,9 @@ class KBucket(object):
"""
Add a C{Node} to the C{KBucket}. Return True if successful,
False if the bucket is full.
If the bucket is full, keep track of node in a replacement list,
per section 4.1 of the paper.
"""
if node.id in self.nodes:
del self.nodes[node.id]
@ -44,6 +55,7 @@ class KBucket(object):
elif len(self) < self.ksize:
self.nodes[node.id] = node
else:
self.replacementNodes.push(node)
return False
return True
@ -90,7 +102,13 @@ class TableTraverser(object):
class RoutingTable(object):
def __init__(self, protocol, ksize):
def __init__(self, protocol, ksize, node):
"""
@param node: The node that represents this server. It won't
be added to the routing table, but will be needed later to
determine which buckets to split or not.
"""
self.node = node
self.protocol = protocol
self.ksize = ksize
self.flush()
@ -123,7 +141,7 @@ class RoutingTable(object):
if bucket.addNode(node):
return
if bucket.hasInRange(node):
if bucket.hasInRange(self.node):
self.splitBucket(index)
self.addContact(node)
else:

View File

@ -32,3 +32,10 @@ def deferredDict(d):
dl = defer.DeferredList(d.values())
return dl.addCallback(handle, d.keys())
class OrderedSet(list):
def push(self, thing):
if thing in self:
self.remove(thing)
self.append(thing)

View File

@ -11,11 +11,11 @@ application = service.Application("kademlia")
application.setComponent(ILogObserver, log.FileLogObserver(sys.stdout, log.INFO).emit)
if os.path.isfile('cache.pickle'):
kserver = Server.load('cache.pickle')
kserver = Server.loadState('cache.pickle')
else:
kserver = Server()
kserver.bootstrap([("1.2.3.4", 8468)])
kserver.saveRegularly('cache.pickle', 10)
kserver.saveStateRegularly('cache.pickle', 10)
server = internet.UDPServer(8468, kserver.protocol)
server.setServiceParent(application)