2014-01-03 23:50:49 +01:00
|
|
|
import random
|
2016-07-12 14:32:02 +02:00
|
|
|
from logging import getLogger
|
2014-01-20 04:26:17 +01:00
|
|
|
|
2014-01-03 03:39:06 +01:00
|
|
|
from rpcudp.protocol import RPCProtocol
|
2014-01-20 04:26:17 +01:00
|
|
|
|
2014-01-03 03:39:06 +01:00
|
|
|
from kademlia.node import Node
|
|
|
|
from kademlia.routing import RoutingTable
|
2014-01-20 04:26:17 +01:00
|
|
|
from kademlia.utils import digest
|
2014-01-03 03:39:06 +01:00
|
|
|
|
|
|
|
|
|
|
|
class KademliaProtocol(RPCProtocol):
|
2014-01-19 19:20:02 +01:00
|
|
|
def __init__(self, sourceNode, storage, ksize):
|
2014-01-04 20:27:45 +01:00
|
|
|
RPCProtocol.__init__(self)
|
2014-01-19 20:12:04 +01:00
|
|
|
self.router = RoutingTable(self, ksize, sourceNode)
|
2014-01-03 03:39:06 +01:00
|
|
|
self.storage = storage
|
2014-01-20 04:26:17 +01:00
|
|
|
self.sourceNode = sourceNode
|
2016-07-12 14:32:02 +02:00
|
|
|
self.log = getLogger("kademlia-protocol")
|
2014-01-03 23:50:49 +01:00
|
|
|
|
|
|
|
def getRefreshIDs(self):
|
|
|
|
"""
|
|
|
|
Get ids to search for to keep old buckets up to date.
|
|
|
|
"""
|
|
|
|
ids = []
|
|
|
|
for bucket in self.router.getLonelyBuckets():
|
2016-08-02 15:55:49 +02:00
|
|
|
ids.append(random.randint(*bucket.range).to_bytes(20, byteorder='big'))
|
2014-01-03 23:50:49 +01:00
|
|
|
return ids
|
2014-01-03 03:39:06 +01:00
|
|
|
|
2014-01-14 04:25:51 +01:00
|
|
|
def rpc_stun(self, sender):
|
|
|
|
return sender
|
|
|
|
|
2014-01-03 03:39:06 +01:00
|
|
|
def rpc_ping(self, sender, nodeid):
|
2014-01-14 01:49:16 +01:00
|
|
|
source = Node(nodeid, sender[0], sender[1])
|
2015-07-18 03:29:36 +02:00
|
|
|
self.welcomeIfNewNode(source)
|
2014-01-20 04:26:17 +01:00
|
|
|
return self.sourceNode.id
|
2014-01-03 03:39:06 +01:00
|
|
|
|
|
|
|
def rpc_store(self, sender, nodeid, key, value):
|
2014-01-14 01:49:16 +01:00
|
|
|
source = Node(nodeid, sender[0], sender[1])
|
2015-07-18 03:29:36 +02:00
|
|
|
self.welcomeIfNewNode(source)
|
2014-01-03 23:50:49 +01:00
|
|
|
self.log.debug("got a store request from %s, storing value" % str(sender))
|
2014-01-03 03:39:06 +01:00
|
|
|
self.storage[key] = value
|
2014-01-18 18:55:34 +01:00
|
|
|
return True
|
2014-01-03 03:39:06 +01:00
|
|
|
|
|
|
|
def rpc_find_node(self, sender, nodeid, key):
|
2016-07-12 14:32:02 +02:00
|
|
|
self.log.info("finding neighbors of %i in local table" % int(nodeid.hex(), 16))
|
2014-01-14 01:49:16 +01:00
|
|
|
source = Node(nodeid, sender[0], sender[1])
|
2015-07-18 03:29:36 +02:00
|
|
|
self.welcomeIfNewNode(source)
|
2014-01-14 01:49:16 +01:00
|
|
|
node = Node(key)
|
2016-07-12 14:32:02 +02:00
|
|
|
return list(map(tuple, self.router.findNeighbors(node, exclude=source)))
|
2014-01-03 03:39:06 +01:00
|
|
|
|
|
|
|
def rpc_find_value(self, sender, nodeid, key):
|
2014-01-14 01:49:16 +01:00
|
|
|
source = Node(nodeid, sender[0], sender[1])
|
2015-07-18 03:29:36 +02:00
|
|
|
self.welcomeIfNewNode(source)
|
2014-01-03 03:39:06 +01:00
|
|
|
value = self.storage.get(key, None)
|
|
|
|
if value is None:
|
|
|
|
return self.rpc_find_node(sender, nodeid, key)
|
|
|
|
return { 'value': value }
|
|
|
|
|
2016-07-12 14:32:02 +02:00
|
|
|
async def callFindNode(self, nodeToAsk, nodeToFind):
|
2014-01-03 03:39:06 +01:00
|
|
|
address = (nodeToAsk.ip, nodeToAsk.port)
|
2016-07-12 14:32:02 +02:00
|
|
|
result = await self.find_node(address, self.sourceNode.id, nodeToFind.id)
|
|
|
|
return self.handleCallResponse(result, nodeToAsk)
|
2014-01-03 03:39:06 +01:00
|
|
|
|
2016-07-12 14:32:02 +02:00
|
|
|
async def callFindValue(self, nodeToAsk, nodeToFind):
|
2014-01-03 03:39:06 +01:00
|
|
|
address = (nodeToAsk.ip, nodeToAsk.port)
|
2016-07-12 14:32:02 +02:00
|
|
|
result = await self.find_value(address, self.sourceNode.id, nodeToFind.id)
|
|
|
|
return self.handleCallResponse(result, nodeToAsk)
|
2014-01-03 03:39:06 +01:00
|
|
|
|
2016-07-12 14:32:02 +02:00
|
|
|
async def callPing(self, nodeToAsk):
|
2014-01-03 03:39:06 +01:00
|
|
|
address = (nodeToAsk.ip, nodeToAsk.port)
|
2016-07-12 14:32:02 +02:00
|
|
|
result = await self.ping(address, self.sourceNode.id)
|
|
|
|
return self.handleCallResponse(result, nodeToAsk)
|
2014-01-03 03:39:06 +01:00
|
|
|
|
2016-07-12 14:32:02 +02:00
|
|
|
async def callStore(self, nodeToAsk, key, value):
|
2014-01-03 03:39:06 +01:00
|
|
|
address = (nodeToAsk.ip, nodeToAsk.port)
|
2016-07-12 14:32:02 +02:00
|
|
|
result = await self.store(address, self.sourceNode.id, key, value)
|
|
|
|
return self.handleCallResponse(result, nodeToAsk)
|
2014-01-03 03:39:06 +01:00
|
|
|
|
2015-07-18 03:29:36 +02:00
|
|
|
def welcomeIfNewNode(self, node):
|
2014-01-20 04:26:17 +01:00
|
|
|
"""
|
2015-07-18 03:29:36 +02:00
|
|
|
Given a new node, send it all the keys/values it should be storing,
|
|
|
|
then add it to the routing table.
|
2014-01-20 04:26:17 +01:00
|
|
|
|
|
|
|
@param node: A new node that just joined (or that we just found out
|
|
|
|
about).
|
|
|
|
|
|
|
|
Process:
|
|
|
|
For each key in storage, get k closest nodes. If newnode is closer
|
|
|
|
than the furtherst in that list, and the node for this server
|
|
|
|
is closer than the closest in that list, then store the key/value
|
|
|
|
on the new node (per section 2.5 of the paper)
|
|
|
|
"""
|
2016-07-12 14:32:02 +02:00
|
|
|
if not self.router.isNewNode(node):
|
|
|
|
return
|
|
|
|
|
|
|
|
self.log.info("never seen %s before, adding to router and setting nearby " % node)
|
|
|
|
for key, value in self.storage.items():
|
|
|
|
keynode = Node(digest(key))
|
|
|
|
neighbors = self.router.findNeighbors(keynode)
|
|
|
|
if len(neighbors) > 0:
|
|
|
|
newNodeClose = node.distanceTo(keynode) < neighbors[-1].distanceTo(keynode)
|
|
|
|
thisNodeClosest = self.sourceNode.distanceTo(keynode) < neighbors[0].distanceTo(keynode)
|
|
|
|
if len(neighbors) == 0 or (newNodeClose and thisNodeClosest):
|
|
|
|
asyncio.ensure_future(self.callStore(node, key, value))
|
|
|
|
self.router.addContact(node)
|
2015-07-15 03:29:48 +02:00
|
|
|
|
2014-01-03 05:06:12 +01:00
|
|
|
def handleCallResponse(self, result, node):
|
2014-01-03 03:39:06 +01:00
|
|
|
"""
|
|
|
|
If we get a response, add the node to the routing table. If
|
|
|
|
we get no response, make sure it's removed from the routing table.
|
|
|
|
"""
|
2016-07-12 14:32:02 +02:00
|
|
|
if not result[0]:
|
|
|
|
self.log.warning("no response from %s, removing from router" % node)
|
2014-01-03 03:39:06 +01:00
|
|
|
self.router.removeContact(node)
|
2016-07-12 14:32:02 +02:00
|
|
|
return result
|
|
|
|
|
|
|
|
self.log.info("got successful response from %s")
|
|
|
|
self.welcomeIfNewNode(node)
|
2014-01-03 03:39:06 +01:00
|
|
|
return result
|