From 7f07662657104bc01802435022dc8f7e05b23f4f Mon Sep 17 00:00:00 2001 From: Brian Muller Date: Sun, 19 Jan 2014 22:26:17 -0500 Subject: [PATCH] new nodes are now given keys to store based on section 2.5 of the paper --- kademlia/protocol.py | 42 +++++++++++++++++++++++++++++++------ kademlia/routing.py | 10 ++++++++- kademlia/tests/test_node.py | 1 - kademlia/utils.py | 3 ++- 4 files changed, 47 insertions(+), 9 deletions(-) diff --git a/kademlia/protocol.py b/kademlia/protocol.py index e64e9cc..7d963e4 100644 --- a/kademlia/protocol.py +++ b/kademlia/protocol.py @@ -1,9 +1,13 @@ import random +from twisted.internet import defer + from rpcudp.protocol import RPCProtocol + from kademlia.node import Node from kademlia.routing import RoutingTable from kademlia.log import Logger +from kademlia.utils import digest class KademliaProtocol(RPCProtocol): @@ -11,7 +15,7 @@ class KademliaProtocol(RPCProtocol): RPCProtocol.__init__(self) self.router = RoutingTable(self, ksize, sourceNode) self.storage = storage - self.sourceID = sourceNode.id + self.sourceNode = sourceNode self.log = Logger(system=self) def getRefreshIDs(self): @@ -29,7 +33,7 @@ class KademliaProtocol(RPCProtocol): def rpc_ping(self, sender, nodeid): source = Node(nodeid, sender[0], sender[1]) self.router.addContact(source) - return self.sourceID + return self.sourceNode.id def rpc_store(self, sender, nodeid, key, value): source = Node(nodeid, sender[0], sender[1]) @@ -55,24 +59,48 @@ class KademliaProtocol(RPCProtocol): def callFindNode(self, nodeToAsk, nodeToFind): address = (nodeToAsk.ip, nodeToAsk.port) - d = self.find_node(address, self.sourceID, nodeToFind.id) + d = self.find_node(address, self.sourceNode.id, nodeToFind.id) return d.addCallback(self.handleCallResponse, nodeToAsk) def callFindValue(self, nodeToAsk, nodeToFind): address = (nodeToAsk.ip, nodeToAsk.port) - d = self.find_value(address, self.sourceID, nodeToFind.id) + d = self.find_value(address, self.sourceNode.id, nodeToFind.id) return d.addCallback(self.handleCallResponse, nodeToAsk) def callPing(self, nodeToAsk): address = (nodeToAsk.ip, nodeToAsk.port) - d = self.ping(address, self.sourceID) + d = self.ping(address, self.sourceNode.id) return d.addCallback(self.handleCallResponse, nodeToAsk) def callStore(self, nodeToAsk, key, value): address = (nodeToAsk.ip, nodeToAsk.port) - d = self.store(address, self.sourceID, key, value) + d = self.store(address, self.sourceNode.id, key, value) return d.addCallback(self.handleCallResponse, nodeToAsk) + def transferKeyValues(self, node): + """ + Given a new node, send it all the keys/values it should be storing. + + @param node: A new node that just joined (or that we just found out + about). + + Process: + For each key in storage, get k closest nodes. If newnode is closer + than the furtherst in that list, and the node for this server + is closer than the closest in that list, then store the key/value + on the new node (per section 2.5 of the paper) + """ + ds = [] + for key, value in self.storage.iteritems(): + keynode = Node(digest(key)) + neighbors = self.router.findNeighbors(keynode) + if len(neighbors) > 0: + newNodeClose = node.distanceTo(keynode) < neighbors[-1].distanceTo(keynode) + thisNodeClosest = self.sourceNode.distanceTo(keynode) < neighbors[0].distanceTo(keynode) + if len(neighbors) == 0 or (newNodeClose and thisNodeClosest): + ds.append(self.callStore(node, key, value)) + return defer.gatherResults(ds) + def handleCallResponse(self, result, node): """ If we get a response, add the node to the routing table. If @@ -81,6 +109,8 @@ class KademliaProtocol(RPCProtocol): if result[0]: self.log.info("got response from %s, adding to router" % node) self.router.addContact(node) + if self.router.isNewNode(node): + self.transferKeyValues(node) else: self.log.debug("no response from %s, removing from router" % node) self.router.removeContact(node) diff --git a/kademlia/routing.py b/kademlia/routing.py index 20b24f0..e445f1d 100644 --- a/kademlia/routing.py +++ b/kademlia/routing.py @@ -3,7 +3,8 @@ import time import operator from collections import OrderedDict -from kademlia.utils import OrderedSet +from kademlia.utils import OrderedSet, sharedPrefix + class KBucket(object): def __init__(self, rangeLower, rangeUpper, ksize): @@ -41,6 +42,9 @@ class KBucket(object): def hasInRange(self, node): return self.range[0] <= node.long_id <= self.range[1] + def isNewNode(self, node): + return node.id not in self.nodes + def addNode(self, node): """ Add a C{Node} to the C{KBucket}. Return True if successful, @@ -136,6 +140,10 @@ class RoutingTable(object): index = self.getBucketFor(node) self.buckets[index].removeNode(node) + def isNewNode(self, node): + index = self.getBucketFor(node) + return self.buckets[index].isNewNode(node) + def addContact(self, node): index = self.getBucketFor(node) bucket = self.buckets[index] diff --git a/kademlia/tests/test_node.py b/kademlia/tests/test_node.py index b42115a..60ca14b 100644 --- a/kademlia/tests/test_node.py +++ b/kademlia/tests/test_node.py @@ -3,7 +3,6 @@ import hashlib from twisted.trial import unittest -from kademlia.utils import digest from kademlia.node import Node, NodeHeap from kademlia.tests.utils import mknode diff --git a/kademlia/utils.py b/kademlia/utils.py index 4fd0d7b..43e342f 100644 --- a/kademlia/utils.py +++ b/kademlia/utils.py @@ -2,6 +2,8 @@ General catchall for functions that don't make sense as methods. """ import hashlib +import operator + from twisted.internet import defer @@ -54,4 +56,3 @@ def sharedPrefix(args): break i += 1 return args[0][:i] -