update tests without unittest only pytest (#62)

This commit is contained in:
joriscarrier 2019-06-15 16:44:53 +02:00 committed by Brian Muller
parent 4a8d445c9e
commit 3631505ae2
11 changed files with 133 additions and 130 deletions

View File

@ -6,4 +6,4 @@ python:
dist: dist:
- xenial - xenial
install: pip install . && pip install -r dev-requirements.txt install: pip install . && pip install -r dev-requirements.txt
script: py.test script: pytest

View File

@ -5,3 +5,4 @@ sphinxcontrib-napoleon>=0.6.1
sphinxcontrib-zopeext>=0.2.1 sphinxcontrib-zopeext>=0.2.1
pytest>=4.1.0 pytest>=4.1.0
pytest-asyncio>=0.10.0 pytest-asyncio>=0.10.0
pytest-cov>=2.6.1

View File

@ -1,6 +1,12 @@
import random
import hashlib
from struct import pack
import pytest import pytest
from kademlia.network import Server from kademlia.network import Server
from kademlia.node import Node
from kademlia.routing import RoutingTable
@pytest.yield_fixture @pytest.yield_fixture
@ -12,3 +18,40 @@ def bootstrap_node(event_loop):
yield ('127.0.0.1', 8468) yield ('127.0.0.1', 8468)
finally: finally:
server.stop() server.stop()
# pylint: disable=redefined-outer-name
@pytest.fixture()
def mknode():
def _mknode(node_id=None, ip_addy=None, port=None, intid=None):
"""
Make a node. Created a random id if not specified.
"""
if intid is not None:
node_id = pack('>l', intid)
if not node_id:
randbits = str(random.getrandbits(255))
node_id = hashlib.sha1(randbits.encode()).digest()
return Node(node_id, ip_addy, port)
return _mknode
# pylint: disable=too-few-public-methods
class FakeProtocol: # pylint: disable=too-few-public-methods
def __init__(self, source_id, ksize=20):
self.router = RoutingTable(self, ksize, Node(source_id))
self.storage = {}
self.source_id = source_id
# pylint: disable=too-few-public-methods
class FakeServer:
def __init__(self, node_id):
self.id = node_id # pylint: disable=invalid-name
self.protocol = FakeProtocol(self.id)
self.router = self.protocol.router
@pytest.fixture
def fake_server(mknode):
return FakeServer(mknode().id)

View File

@ -1,4 +1,3 @@
import unittest
from glob import glob from glob import glob
import pycodestyle import pycodestyle
@ -10,7 +9,7 @@ class LintError(Exception):
pass pass
class TestCodeLinting(unittest.TestCase): class TestCodeLinting:
# pylint: disable=no-self-use # pylint: disable=no-self-use
def test_pylint(self): def test_pylint(self):
(stdout, _) = lint.py_run('kademlia', return_std=True) (stdout, _) = lint.py_run('kademlia', return_std=True)

View File

@ -1,56 +1,54 @@
import unittest
import random import random
import hashlib import hashlib
from kademlia.node import Node, NodeHeap from kademlia.node import Node, NodeHeap
from kademlia.tests.utils import mknode
class NodeTest(unittest.TestCase): class TestNode:
def test_long_id(self): def test_long_id(self): # pylint: disable=no-self-use
rid = hashlib.sha1(str(random.getrandbits(255)).encode()).digest() rid = hashlib.sha1(str(random.getrandbits(255)).encode()).digest()
node = Node(rid) node = Node(rid)
self.assertEqual(node.long_id, int(rid.hex(), 16)) assert node.long_id == int(rid.hex(), 16)
def test_distance_calculation(self): def test_distance_calculation(self): # pylint: disable=no-self-use
ridone = hashlib.sha1(str(random.getrandbits(255)).encode()) ridone = hashlib.sha1(str(random.getrandbits(255)).encode())
ridtwo = hashlib.sha1(str(random.getrandbits(255)).encode()) ridtwo = hashlib.sha1(str(random.getrandbits(255)).encode())
shouldbe = int(ridone.hexdigest(), 16) ^ int(ridtwo.hexdigest(), 16) shouldbe = int(ridone.hexdigest(), 16) ^ int(ridtwo.hexdigest(), 16)
none = Node(ridone.digest()) none = Node(ridone.digest())
ntwo = Node(ridtwo.digest()) ntwo = Node(ridtwo.digest())
self.assertEqual(none.distance_to(ntwo), shouldbe) assert none.distance_to(ntwo) == shouldbe
class NodeHeapTest(unittest.TestCase): class TestNodeHeap:
def test_max_size(self): def test_max_size(self, mknode): # pylint: disable=no-self-use
node = NodeHeap(mknode(intid=0), 3) node = NodeHeap(mknode(intid=0), 3)
self.assertEqual(0, len(node)) assert not node
for digit in range(10): for digit in range(10):
node.push(mknode(intid=digit)) node.push(mknode(intid=digit))
self.assertEqual(3, len(node))
self.assertEqual(3, len(list(node))) assert len(node) == 3
assert len(list(node)) == 3
def test_iteration(self): def test_iteration(self, mknode): # pylint: disable=no-self-use
heap = NodeHeap(mknode(intid=0), 5) heap = NodeHeap(mknode(intid=0), 5)
nodes = [mknode(intid=x) for x in range(10)] nodes = [mknode(intid=x) for x in range(10)]
for index, node in enumerate(nodes): for index, node in enumerate(nodes):
heap.push(node) heap.push(node)
for index, node in enumerate(heap): for index, node in enumerate(heap):
self.assertEqual(index, node.long_id) assert index == node.long_id
self.assertTrue(index < 5) assert index < 5
def test_remove(self): def test_remove(self, mknode): # pylint: disable=no-self-use
heap = NodeHeap(mknode(intid=0), 5) heap = NodeHeap(mknode(intid=0), 5)
nodes = [mknode(intid=x) for x in range(10)] nodes = [mknode(intid=x) for x in range(10)]
for node in nodes: for node in nodes:
heap.push(node) heap.push(node)
heap.remove([nodes[0].id, nodes[1].id]) heap.remove([nodes[0].id, nodes[1].id])
self.assertEqual(len(list(heap)), 5) assert len(list(heap)) == 5
for index, node in enumerate(heap): for index, node in enumerate(heap):
self.assertEqual(index + 2, node.long_id) assert index + 2 == node.long_id
self.assertTrue(index < 5) assert index < 5

View File

@ -1,28 +1,25 @@
import unittest
from random import shuffle from random import shuffle
from kademlia.routing import KBucket, TableTraverser from kademlia.routing import KBucket, TableTraverser
from kademlia.tests.utils import mknode, FakeProtocol
class KBucketTest(unittest.TestCase): class TestKBucket:
def test_split(self): def test_split(self, mknode): # pylint: disable=no-self-use
bucket = KBucket(0, 10, 5) bucket = KBucket(0, 10, 5)
bucket.add_node(mknode(intid=5)) bucket.add_node(mknode(intid=5))
bucket.add_node(mknode(intid=6)) bucket.add_node(mknode(intid=6))
one, two = bucket.split() one, two = bucket.split()
self.assertEqual(len(one), 1) assert len(one) == 1
self.assertEqual(one.range, (0, 5)) assert one.range == (0, 5)
self.assertEqual(len(two), 1) assert len(two) == 1
self.assertEqual(two.range, (6, 10)) assert two.range == (6, 10)
def test_add_node(self): def test_add_node(self, mknode): # pylint: disable=no-self-use
# when full, return false # when full, return false
bucket = KBucket(0, 10, 2) bucket = KBucket(0, 10, 2)
self.assertTrue(bucket.add_node(mknode())) assert bucket.add_node(mknode()) is True
self.assertTrue(bucket.add_node(mknode())) assert bucket.add_node(mknode()) is True
self.assertFalse(bucket.add_node(mknode())) assert bucket.add_node(mknode()) is False
self.assertEqual(len(bucket), 2) assert len(bucket) == 2
# make sure when a node is double added it's put at the end # make sure when a node is double added it's put at the end
bucket = KBucket(0, 10, 3) bucket = KBucket(0, 10, 3)
@ -30,9 +27,9 @@ class KBucketTest(unittest.TestCase):
for node in nodes: for node in nodes:
bucket.add_node(node) bucket.add_node(node)
for index, node in enumerate(bucket.get_nodes()): for index, node in enumerate(bucket.get_nodes()):
self.assertEqual(node, nodes[index]) assert node == nodes[index]
def test_remove_node(self): def test_remove_node(self, mknode): # pylint: disable=no-self-use
k = 3 k = 3
bucket = KBucket(0, 10, k) bucket = KBucket(0, 10, k)
nodes = [mknode() for _ in range(10)] nodes = [mknode() for _ in range(10)]
@ -40,50 +37,44 @@ class KBucketTest(unittest.TestCase):
bucket.add_node(node) bucket.add_node(node)
replacement_nodes = bucket.replacement_nodes replacement_nodes = bucket.replacement_nodes
self.assertEqual(list(bucket.nodes.values()), nodes[:k]) assert list(bucket.nodes.values()) == nodes[:k]
self.assertEqual(list(replacement_nodes.values()), nodes[k:]) assert list(replacement_nodes.values()) == nodes[k:]
bucket.remove_node(nodes.pop()) bucket.remove_node(nodes.pop())
self.assertEqual(list(bucket.nodes.values()), nodes[:k]) assert list(bucket.nodes.values()) == nodes[:k]
self.assertEqual(list(replacement_nodes.values()), nodes[k:]) assert list(replacement_nodes.values()) == nodes[k:]
bucket.remove_node(nodes.pop(0)) bucket.remove_node(nodes.pop(0))
self.assertEqual(list(bucket.nodes.values()), nodes[:k-1] + nodes[-1:]) assert list(bucket.nodes.values()) == nodes[:k-1] + nodes[-1:]
self.assertEqual(list(replacement_nodes.values()), nodes[k-1:-1]) assert list(replacement_nodes.values()) == nodes[k-1:-1]
shuffle(nodes) shuffle(nodes)
for node in nodes: for node in nodes:
bucket.remove_node(node) bucket.remove_node(node)
self.assertEqual(len(bucket), 0) assert not bucket
self.assertEqual(len(replacement_nodes), 0) assert not replacement_nodes
def test_in_range(self): def test_in_range(self, mknode): # pylint: disable=no-self-use
bucket = KBucket(0, 10, 10) bucket = KBucket(0, 10, 10)
self.assertTrue(bucket.has_in_range(mknode(intid=5))) assert bucket.has_in_range(mknode(intid=5)) is True
self.assertFalse(bucket.has_in_range(mknode(intid=11))) assert bucket.has_in_range(mknode(intid=11)) is False
self.assertTrue(bucket.has_in_range(mknode(intid=10))) assert bucket.has_in_range(mknode(intid=10)) is True
self.assertTrue(bucket.has_in_range(mknode(intid=0))) assert bucket.has_in_range(mknode(intid=0)) is True
class RoutingTableTest(unittest.TestCase): # pylint: disable=too-few-public-methods
def setUp(self): class TestRoutingTable:
self.id = mknode().id # pylint: disable=invalid-name # pylint: disable=no-self-use
self.protocol = FakeProtocol(self.id) def test_add_contact(self, fake_server, mknode):
self.router = self.protocol.router fake_server.router.add_contact(mknode())
assert len(fake_server.router.buckets) == 1
def test_add_contact(self): assert len(fake_server.router.buckets[0].nodes) == 1
self.router.add_contact(mknode())
self.assertTrue(len(self.router.buckets), 1)
self.assertTrue(len(self.router.buckets[0].nodes), 1)
class TableTraverserTest(unittest.TestCase): # pylint: disable=too-few-public-methods
def setUp(self): class TestTableTraverser:
self.id = mknode().id # pylint: disable=invalid-name # pylint: disable=no-self-use
self.protocol = FakeProtocol(self.id) def test_iteration(self, fake_server, mknode):
self.router = self.protocol.router
def test_iteration(self):
""" """
Make 10 nodes, 5 buckets, two nodes add to one bucket in order, Make 10 nodes, 5 buckets, two nodes add to one bucket in order,
All buckets: [node0, node1], [node2, node3], [node4, node5], All buckets: [node0, node1], [node2, node3], [node4, node5],
@ -101,12 +92,13 @@ class TableTraverserTest(unittest.TestCase):
buckets.append(bucket) buckets.append(bucket)
# replace router's bucket with our test buckets # replace router's bucket with our test buckets
self.router.buckets = buckets fake_server.router.buckets = buckets
# expected nodes order # expected nodes order
expected_nodes = [nodes[5], nodes[4], nodes[3], nodes[2], nodes[7], expected_nodes = [nodes[5], nodes[4], nodes[3], nodes[2], nodes[7],
nodes[6], nodes[1], nodes[0], nodes[9], nodes[8]] nodes[6], nodes[1], nodes[0], nodes[9], nodes[8]]
start_node = nodes[4] start_node = nodes[4]
for index, node in enumerate(TableTraverser(self.router, start_node)): table_traverser = TableTraverser(fake_server.router, start_node)
self.assertEqual(node, expected_nodes[index]) for index, node in enumerate(table_traverser):
assert node == expected_nodes[index]

View File

@ -1,4 +1,3 @@
import unittest
import asyncio import asyncio
import pytest import pytest
@ -20,9 +19,9 @@ async def test_storing(bootstrap_node):
server.stop() server.stop()
class SwappableProtocolTests(unittest.TestCase): class TestSwappableProtocol:
def test_default_protocol(self): def test_default_protocol(self): # pylint: disable=no-self-use
""" """
An ordinary Server object will initially not have a protocol, but will An ordinary Server object will initially not have a protocol, but will
have a KademliaProtocol object as its protocol after its listen() have a KademliaProtocol object as its protocol after its listen()
@ -30,12 +29,12 @@ class SwappableProtocolTests(unittest.TestCase):
""" """
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
server = Server() server = Server()
self.assertIsNone(server.protocol) assert server.protocol is None
loop.run_until_complete(server.listen(8469)) loop.run_until_complete(server.listen(8469))
self.assertIsInstance(server.protocol, KademliaProtocol) assert isinstance(server.protocol, KademliaProtocol)
server.stop() server.stop()
def test_custom_protocol(self): def test_custom_protocol(self): # pylint: disable=no-self-use
""" """
A subclass of Server which overrides the protocol_class attribute will A subclass of Server which overrides the protocol_class attribute will
have an instance of that class as its protocol after its listen() have an instance of that class as its protocol after its listen()
@ -53,11 +52,11 @@ class SwappableProtocolTests(unittest.TestCase):
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
server = Server() server = Server()
loop.run_until_complete(server.listen(8469)) loop.run_until_complete(server.listen(8469))
self.assertNotIsInstance(server.protocol, CoconutProtocol) assert not isinstance(server.protocol, CoconutProtocol)
server.stop() server.stop()
# ...but our custom server does. # ...but our custom server does.
husk_server = HuskServer() husk_server = HuskServer()
loop.run_until_complete(husk_server.listen(8469)) loop.run_until_complete(husk_server.listen(8469))
self.assertIsInstance(husk_server.protocol, CoconutProtocol) assert isinstance(husk_server.protocol, CoconutProtocol)
husk_server.stop() husk_server.stop()

View File

@ -1,29 +1,27 @@
import unittest
from kademlia.storage import ForgetfulStorage from kademlia.storage import ForgetfulStorage
class ForgetfulStorageTest(unittest.TestCase): class ForgetfulStorageTest:
def test_storing(self): def test_storing(self): # pylint: disable=no-self-use
storage = ForgetfulStorage(10) storage = ForgetfulStorage(10)
storage['one'] = 'two' storage['one'] = 'two'
self.assertEqual(storage['one'], 'two') assert storage['one'] == 'two'
def test_forgetting(self): def test_forgetting(self): # pylint: disable=no-self-use
storage = ForgetfulStorage(0) storage = ForgetfulStorage(0)
storage['one'] = 'two' storage['one'] = 'two'
self.assertEqual(storage.get('one'), None) assert storage.get('one') is None
def test_iter(self): def test_iter(self): # pylint: disable=no-self-use
storage = ForgetfulStorage(10) storage = ForgetfulStorage(10)
storage['one'] = 'two' storage['one'] = 'two'
for key, value in storage: for key, value in storage:
self.assertEqual(key, 'one') assert key == 'one'
self.assertEqual(value, 'two') assert value == 'two'
def test_iter_old(self): def test_iter_old(self): # pylint: disable=no-self-use
storage = ForgetfulStorage(10) storage = ForgetfulStorage(10)
storage['one'] = 'two' storage['one'] = 'two'
for key, value in storage.iter_older_than(0): for key, value in storage.iter_older_than(0):
self.assertEqual(key, 'one') assert key == 'one'
self.assertEqual(value, 'two') assert value == 'two'

View File

@ -1,26 +1,25 @@
import hashlib import hashlib
import unittest
from kademlia.utils import digest, shared_prefix from kademlia.utils import digest, shared_prefix
class UtilsTest(unittest.TestCase): class TestUtils:
def test_digest(self): def test_digest(self): # pylint: disable=no-self-use
dig = hashlib.sha1(b'1').digest() dig = hashlib.sha1(b'1').digest()
self.assertEqual(dig, digest(1)) assert dig == digest(1)
dig = hashlib.sha1(b'another').digest() dig = hashlib.sha1(b'another').digest()
self.assertEqual(dig, digest('another')) assert dig == digest('another')
def test_shared_prefix(self): def test_shared_prefix(self): # pylint: disable=no-self-use
args = ['prefix', 'prefixasdf', 'prefix', 'prefixxxx'] args = ['prefix', 'prefixasdf', 'prefix', 'prefixxxx']
self.assertEqual(shared_prefix(args), 'prefix') assert shared_prefix(args) == 'prefix'
args = ['p', 'prefixasdf', 'prefix', 'prefixxxx'] args = ['p', 'prefixasdf', 'prefix', 'prefixxxx']
self.assertEqual(shared_prefix(args), 'p') assert shared_prefix(args) == 'p'
args = ['one', 'two'] args = ['one', 'two']
self.assertEqual(shared_prefix(args), '') assert shared_prefix(args) == ''
args = ['hi'] args = ['hi']
self.assertEqual(shared_prefix(args), 'hi') assert shared_prefix(args) == 'hi'

View File

@ -1,28 +0,0 @@
"""
Utility functions for tests.
"""
import random
import hashlib
from struct import pack
from kademlia.node import Node
from kademlia.routing import RoutingTable
def mknode(node_id=None, ip_addy=None, port=None, intid=None):
"""
Make a node. Created a random id if not specified.
"""
if intid is not None:
node_id = pack('>l', intid)
if not node_id:
randbits = str(random.getrandbits(255))
node_id = hashlib.sha1(randbits.encode()).digest()
return Node(node_id, ip_addy, port)
class FakeProtocol: # pylint: disable=too-few-public-methods
def __init__(self, source_id, ksize=20):
self.router = RoutingTable(self, ksize, Node(source_id))
self.storage = {}
self.source_id = source_id

2
pytest.ini Normal file
View File

@ -0,0 +1,2 @@
[pytest]
addopts = -vv --cov-report term-missing --cov kademlia