new nodes are now given keys to store based on section 2.5 of the paper

This commit is contained in:
Brian Muller 2014-01-19 22:26:17 -05:00
parent 21dc0cca39
commit 7f07662657
4 changed files with 47 additions and 9 deletions

View File

@ -1,9 +1,13 @@
import random import random
from twisted.internet import defer
from rpcudp.protocol import RPCProtocol from rpcudp.protocol import RPCProtocol
from kademlia.node import Node from kademlia.node import Node
from kademlia.routing import RoutingTable from kademlia.routing import RoutingTable
from kademlia.log import Logger from kademlia.log import Logger
from kademlia.utils import digest
class KademliaProtocol(RPCProtocol): class KademliaProtocol(RPCProtocol):
@ -11,7 +15,7 @@ class KademliaProtocol(RPCProtocol):
RPCProtocol.__init__(self) RPCProtocol.__init__(self)
self.router = RoutingTable(self, ksize, sourceNode) self.router = RoutingTable(self, ksize, sourceNode)
self.storage = storage self.storage = storage
self.sourceID = sourceNode.id self.sourceNode = sourceNode
self.log = Logger(system=self) self.log = Logger(system=self)
def getRefreshIDs(self): def getRefreshIDs(self):
@ -29,7 +33,7 @@ class KademliaProtocol(RPCProtocol):
def rpc_ping(self, sender, nodeid): def rpc_ping(self, sender, nodeid):
source = Node(nodeid, sender[0], sender[1]) source = Node(nodeid, sender[0], sender[1])
self.router.addContact(source) self.router.addContact(source)
return self.sourceID return self.sourceNode.id
def rpc_store(self, sender, nodeid, key, value): def rpc_store(self, sender, nodeid, key, value):
source = Node(nodeid, sender[0], sender[1]) source = Node(nodeid, sender[0], sender[1])
@ -55,24 +59,48 @@ class KademliaProtocol(RPCProtocol):
def callFindNode(self, nodeToAsk, nodeToFind): def callFindNode(self, nodeToAsk, nodeToFind):
address = (nodeToAsk.ip, nodeToAsk.port) address = (nodeToAsk.ip, nodeToAsk.port)
d = self.find_node(address, self.sourceID, nodeToFind.id) d = self.find_node(address, self.sourceNode.id, nodeToFind.id)
return d.addCallback(self.handleCallResponse, nodeToAsk) return d.addCallback(self.handleCallResponse, nodeToAsk)
def callFindValue(self, nodeToAsk, nodeToFind): def callFindValue(self, nodeToAsk, nodeToFind):
address = (nodeToAsk.ip, nodeToAsk.port) address = (nodeToAsk.ip, nodeToAsk.port)
d = self.find_value(address, self.sourceID, nodeToFind.id) d = self.find_value(address, self.sourceNode.id, nodeToFind.id)
return d.addCallback(self.handleCallResponse, nodeToAsk) return d.addCallback(self.handleCallResponse, nodeToAsk)
def callPing(self, nodeToAsk): def callPing(self, nodeToAsk):
address = (nodeToAsk.ip, nodeToAsk.port) address = (nodeToAsk.ip, nodeToAsk.port)
d = self.ping(address, self.sourceID) d = self.ping(address, self.sourceNode.id)
return d.addCallback(self.handleCallResponse, nodeToAsk) return d.addCallback(self.handleCallResponse, nodeToAsk)
def callStore(self, nodeToAsk, key, value): def callStore(self, nodeToAsk, key, value):
address = (nodeToAsk.ip, nodeToAsk.port) address = (nodeToAsk.ip, nodeToAsk.port)
d = self.store(address, self.sourceID, key, value) d = self.store(address, self.sourceNode.id, key, value)
return d.addCallback(self.handleCallResponse, nodeToAsk) return d.addCallback(self.handleCallResponse, nodeToAsk)
def transferKeyValues(self, node):
"""
Given a new node, send it all the keys/values it should be storing.
@param node: A new node that just joined (or that we just found out
about).
Process:
For each key in storage, get k closest nodes. If newnode is closer
than the furtherst in that list, and the node for this server
is closer than the closest in that list, then store the key/value
on the new node (per section 2.5 of the paper)
"""
ds = []
for key, value in self.storage.iteritems():
keynode = Node(digest(key))
neighbors = self.router.findNeighbors(keynode)
if len(neighbors) > 0:
newNodeClose = node.distanceTo(keynode) < neighbors[-1].distanceTo(keynode)
thisNodeClosest = self.sourceNode.distanceTo(keynode) < neighbors[0].distanceTo(keynode)
if len(neighbors) == 0 or (newNodeClose and thisNodeClosest):
ds.append(self.callStore(node, key, value))
return defer.gatherResults(ds)
def handleCallResponse(self, result, node): def handleCallResponse(self, result, node):
""" """
If we get a response, add the node to the routing table. If If we get a response, add the node to the routing table. If
@ -81,6 +109,8 @@ class KademliaProtocol(RPCProtocol):
if result[0]: if result[0]:
self.log.info("got response from %s, adding to router" % node) self.log.info("got response from %s, adding to router" % node)
self.router.addContact(node) self.router.addContact(node)
if self.router.isNewNode(node):
self.transferKeyValues(node)
else: else:
self.log.debug("no response from %s, removing from router" % node) self.log.debug("no response from %s, removing from router" % node)
self.router.removeContact(node) self.router.removeContact(node)

View File

@ -3,7 +3,8 @@ import time
import operator import operator
from collections import OrderedDict from collections import OrderedDict
from kademlia.utils import OrderedSet from kademlia.utils import OrderedSet, sharedPrefix
class KBucket(object): class KBucket(object):
def __init__(self, rangeLower, rangeUpper, ksize): def __init__(self, rangeLower, rangeUpper, ksize):
@ -41,6 +42,9 @@ class KBucket(object):
def hasInRange(self, node): def hasInRange(self, node):
return self.range[0] <= node.long_id <= self.range[1] return self.range[0] <= node.long_id <= self.range[1]
def isNewNode(self, node):
return node.id not in self.nodes
def addNode(self, node): def addNode(self, node):
""" """
Add a C{Node} to the C{KBucket}. Return True if successful, Add a C{Node} to the C{KBucket}. Return True if successful,
@ -136,6 +140,10 @@ class RoutingTable(object):
index = self.getBucketFor(node) index = self.getBucketFor(node)
self.buckets[index].removeNode(node) self.buckets[index].removeNode(node)
def isNewNode(self, node):
index = self.getBucketFor(node)
return self.buckets[index].isNewNode(node)
def addContact(self, node): def addContact(self, node):
index = self.getBucketFor(node) index = self.getBucketFor(node)
bucket = self.buckets[index] bucket = self.buckets[index]

View File

@ -3,7 +3,6 @@ import hashlib
from twisted.trial import unittest from twisted.trial import unittest
from kademlia.utils import digest
from kademlia.node import Node, NodeHeap from kademlia.node import Node, NodeHeap
from kademlia.tests.utils import mknode from kademlia.tests.utils import mknode

View File

@ -2,6 +2,8 @@
General catchall for functions that don't make sense as methods. General catchall for functions that don't make sense as methods.
""" """
import hashlib import hashlib
import operator
from twisted.internet import defer from twisted.internet import defer
@ -54,4 +56,3 @@ def sharedPrefix(args):
break break
i += 1 i += 1
return args[0][:i] return args[0][:i]