From c370c8b95b7a94ac689e6fe6d3071e3eb1a955cc Mon Sep 17 00:00:00 2001 From: Brian Muller Date: Sun, 19 Jan 2014 13:20:02 -0500 Subject: [PATCH] now storing replacement cache in buckets in the routing table --- kademlia/network.py | 16 +++++++++++----- kademlia/protocol.py | 6 +++--- kademlia/routing.py | 26 ++++++++++++++++++++++---- kademlia/utils.py | 7 +++++++ server.tac | 4 ++-- 5 files changed, 45 insertions(+), 14 deletions(-) diff --git a/kademlia/network.py b/kademlia/network.py index fd69c32..717d2d9 100644 --- a/kademlia/network.py +++ b/kademlia/network.py @@ -35,7 +35,7 @@ class Server(object): self.log = Logger(system=self) storage = ForgetfulStorage() self.node = Node(id or digest(random.getrandbits(255))) - self.protocol = KademliaProtocol(self.node.id, storage, ksize) + self.protocol = KademliaProtocol(self.node, storage, ksize) self.refreshLoop = LoopingCall(self.refreshTable).start(3600) def listen(self, port): @@ -123,6 +123,9 @@ class Server(object): """ node = Node(digest(key)) nearest = self.protocol.router.findNeighbors(node) + if len(nearest) == 0: + self.log.warning("There are no known neighbors to get key %s" % key) + return defer.succeed(None) spider = ValueSpiderCrawl(self.protocol, node, nearest, self.ksize, self.alpha) return spider.find() @@ -140,6 +143,9 @@ class Server(object): node = Node(dkey) nearest = self.protocol.router.findNeighbors(node) + if len(nearest) == 0: + self.log.warning("There are no known neighbors to set key %s" % key) + return defer.succeed(False) spider = NodeSpiderCrawl(self.protocol, node, nearest, self.ksize, self.alpha) return spider.find().addCallback(store) @@ -154,7 +160,7 @@ class Server(object): return True return False - def save(self, fname): + def saveState(self, fname): data = { 'ksize': self.ksize, 'alpha': self.alpha, 'id': self.node.id, @@ -163,7 +169,7 @@ class Server(object): pickle.dump(data, f) @classmethod - def load(self, fname): + def loadState(self, fname): with open(fname, 'r') as f: data = pickle.load(f) s = Server(data['ksize'], data['alpha'], data['id']) @@ -171,12 +177,12 @@ class Server(object): s.bootstrap(data['neighbors']) return s - def saveRegularly(self, fname, frequency=600): + def saveStateRegularly(self, fname, frequency=600): """ @param fname: File to save retularly to @param frequencey: Frequency in seconds that the state should be saved. By default, 10 minutes. """ - loop = LoopingCall(self.save, fname) + loop = LoopingCall(self.saveState, fname) loop.start(frequency) return loop diff --git a/kademlia/protocol.py b/kademlia/protocol.py index b4b8bfa..a9c06e4 100644 --- a/kademlia/protocol.py +++ b/kademlia/protocol.py @@ -7,11 +7,11 @@ from kademlia.log import Logger class KademliaProtocol(RPCProtocol): - def __init__(self, sourceID, storage, ksize): + def __init__(self, sourceNode, storage, ksize): RPCProtocol.__init__(self) - self.router = RoutingTable(self, ksize) + self.router = RoutingTable(self, sourceNode, ksize) self.storage = storage - self.sourceID = sourceID + self.sourceID = sourceNode.id self.log = Logger(system=self) def getRefreshIDs(self): diff --git a/kademlia/routing.py b/kademlia/routing.py index 3bbe3ed..df79a0f 100644 --- a/kademlia/routing.py +++ b/kademlia/routing.py @@ -3,11 +3,13 @@ import time import operator from collections import OrderedDict +from kademlia.utils import OrderedSet class KBucket(object): def __init__(self, rangeLower, rangeUpper, ksize): self.range = (rangeLower, rangeUpper) self.nodes = OrderedDict() + self.replacementNodes = OrderedSet() self.touchLastUpdated() self.ksize = ksize @@ -27,8 +29,14 @@ class KBucket(object): return (one, two) def removeNode(self, node): - if node.id in self.nodes: - del self.nodes[node.id] + if not node.id in self.nodes: + return + + # delete node, and see if we can add a replacement + del self.nodes[node.id] + if len(self.replacementNodes) > 0: + newnode = self.replacementNodes.pop() + self.nodes[newnode.id] = newnode def hasInRange(self, node): return self.range[0] <= node.long_id <= self.range[1] @@ -37,6 +45,9 @@ class KBucket(object): """ Add a C{Node} to the C{KBucket}. Return True if successful, False if the bucket is full. + + If the bucket is full, keep track of node in a replacement list, + per section 4.1 of the paper. """ if node.id in self.nodes: del self.nodes[node.id] @@ -44,6 +55,7 @@ class KBucket(object): elif len(self) < self.ksize: self.nodes[node.id] = node else: + self.replacementNodes.push(node) return False return True @@ -90,7 +102,13 @@ class TableTraverser(object): class RoutingTable(object): - def __init__(self, protocol, ksize): + def __init__(self, protocol, ksize, node): + """ + @param node: The node that represents this server. It won't + be added to the routing table, but will be needed later to + determine which buckets to split or not. + """ + self.node = node self.protocol = protocol self.ksize = ksize self.flush() @@ -123,7 +141,7 @@ class RoutingTable(object): if bucket.addNode(node): return - if bucket.hasInRange(node): + if bucket.hasInRange(self.node): self.splitBucket(index) self.addContact(node) else: diff --git a/kademlia/utils.py b/kademlia/utils.py index 8b55bff..6047640 100644 --- a/kademlia/utils.py +++ b/kademlia/utils.py @@ -32,3 +32,10 @@ def deferredDict(d): dl = defer.DeferredList(d.values()) return dl.addCallback(handle, d.keys()) + + +class OrderedSet(list): + def push(self, thing): + if thing in self: + self.remove(thing) + self.append(thing) diff --git a/server.tac b/server.tac index 56ea272..8973fbc 100644 --- a/server.tac +++ b/server.tac @@ -11,11 +11,11 @@ application = service.Application("kademlia") application.setComponent(ILogObserver, log.FileLogObserver(sys.stdout, log.INFO).emit) if os.path.isfile('cache.pickle'): - kserver = Server.load('cache.pickle') + kserver = Server.loadState('cache.pickle') else: kserver = Server() kserver.bootstrap([("1.2.3.4", 8468)]) -kserver.saveRegularly('cache.pickle', 10) +kserver.saveStateRegularly('cache.pickle', 10) server = internet.UDPServer(8468, kserver.protocol) server.setServiceParent(application)