summary refs log tree commit diff stats
path: root/test/tc_signal.py
blob: 3b1bac40a73e709fadfd19ce69386bb618e7bb4e (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# Copyright (C) 2009, 2010  Roman Zimbelmann <romanz@lavabit.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import os.path
import sys
# Put the parent directory of this test folder on sys.path so the `ranger`
# imports below resolve (presumably against the source checkout -- confirm).
rangerpath = os.path.join(os.path.dirname(__file__), '..')
# Compare a slice rather than indexing: sys.path[1] raises IndexError when
# sys.path holds fewer than two entries, while the slice is simply empty.
if sys.path[1:2] != [rangerpath]:
	sys.path[1:1] = [rangerpath]

import unittest
import gc
from ranger.ext.signal_dispatcher import *

class TestSignal(unittest.TestCase):
	"""Tests for SignalDispatcher: binding, emitting, priorities, weak handlers."""

	def setUp(self):
		# A fresh dispatcher per test keeps handlers from leaking between tests.
		self.sd = SignalDispatcher()

	def test_signal_register_emit(self):
		"""A bound handler sees the emitted keywords; unbinding silences it."""
		sd = self.sd
		calls = []
		def poo(sig):
			calls.append(sig)
			self.assertTrue('works' in sig)
			self.assertEqual('yes', sig.works)
		handler = sd.signal_bind('x', poo)

		sd.signal_emit('x', works='yes')
		# Without this check the test would pass even if poo never ran.
		self.assertEqual(1, len(calls))

		sd.signal_unbind(handler)
		# This emit carries no 'works' keyword, so poo's own assertions
		# would fail if it were still bound; the count must stay at one.
		sd.signal_emit('x')
		self.assertEqual(1, len(calls))

	def test_signal_order(self):
		"""Handlers bound with a higher priority run before lower ones."""
		sd = self.sd
		lst = []
		def addn(n):
			# Build a handler that records n when it is invoked.
			return lambda _: lst.append(n)

		sd.signal_bind('x', addn(6))
		sd.signal_bind('x', addn(3), priority=1)
		sd.signal_bind('x', addn(2), priority=1)
		sd.signal_bind('x', addn(9), priority=0)
		sd.signal_bind('x', addn(1337), priority=0.7)
		sd.signal_emit('x')

		# lst.index raises ValueError for a handler that never ran, so these
		# comparisons also verify that every handler was invoked.
		self.assertTrue(lst.index(3) < lst.index(6))
		self.assertTrue(lst.index(2) < lst.index(6))
		self.assertTrue(lst.index(6) < lst.index(9))
		self.assertTrue(lst.index(1337) < lst.index(6))
		self.assertTrue(lst.index(1337) < lst.index(9))
		self.assertTrue(lst.index(1337) > lst.index(2))

	def test_modifying_arguments(self):
		"""Handlers can change signal attributes and stop further handling."""
		sd = self.sd
		lst = []
		def modify(s):
			s.number = 5
		def set_number(s):
			lst.append(s.number)
		def stopit(s):
			s.stop()

		# With only the recorder bound, the emitted value arrives unchanged.
		sd.signal_bind('setnumber', set_number)
		sd.signal_emit('setnumber', number=100)
		self.assertEqual(100, lst[-1])

		# A priority-1 handler rewrites the value before the recorder sees it.
		sd.signal_bind('setnumber', modify, priority=1)
		sd.signal_emit('setnumber', number=100)
		self.assertEqual(5, lst[-1])

		# The None sentinel must remain last: once stopit calls stop(), the
		# recorder is expected not to run and so appends nothing.
		lst.append(None)
		sd.signal_bind('setnumber', stopit, priority=1)
		sd.signal_emit('setnumber', number=100)
		self.assertEqual(None, lst[-1])

	def test_weak_refs(self):
		"""A weakly bound method does not keep its object alive."""
		sd = self.sd
		is_deleted = [False]

		class Foo(object):
			def __init__(self):
				self.alphabet = ['a']
			def calc(self, signal):
				# Append the letter following the last one recorded.
				self.alphabet.append(chr(ord(self.alphabet[-1]) + 1))
			def __del__(self):
				is_deleted[0] = True

		foo = Foo()
		alphabet = foo.alphabet
		calc = foo.calc

		# The bound method `calc` still references the object after `del foo`.
		del foo
		self.assertEqual('a', ''.join(alphabet))
		sd.signal_bind('mysignal', calc, weak=True)
		sd.signal_emit('mysignal')
		self.assertEqual('ab', ''.join(alphabet))
		self.assertFalse(is_deleted[0])

		del calc
		# Force a collection so the check does not depend on immediate,
		# reference-counted finalization.
		gc.collect()
		self.assertTrue(is_deleted[0])

	def test_weak_refs_dead_on_arrival(self):
		"""Emitting after a weakly bound object died does not call its method."""
		sd = self.sd
		is_deleted = [False]

		class Foo(object):
			def __init__(self):
				self.alphabet = ['a']
			def calc(self, signal):
				# Append the letter following the last one recorded.
				self.alphabet.append(chr(ord(self.alphabet[-1]) + 1))
			def __del__(self):
				is_deleted[0] = True

		foo = Foo()
		alphabet = foo.alphabet

		self.assertEqual('a', ''.join(alphabet))
		sd.signal_bind('mysignal', foo.calc, weak=True)

		sd.signal_emit('mysignal')
		self.assertEqual('ab', ''.join(alphabet))
		self.assertFalse(is_deleted[0])

		del foo
		# Force a collection so the object is finalized before the next emit
		# even where finalization is not immediate.
		gc.collect()

		# The list must be unchanged: the dead object's method is not called.
		sd.signal_emit('mysignal')
		self.assertEqual('ab', ''.join(alphabet))
		self.assertTrue(is_deleted[0])

# Run this module's tests when the file is executed directly as a script.
if __name__ == '__main__':
	unittest.main()
' href='#n29'>29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124














                                                                       
              
          



                                                          

                                           
                           

                                               


                                                
 

                                                      
                                              
 
               
                               
                                         
                                                          
                                        

                                                   

                                                     
                                                 

                                                                                
 
                                            
                         
                                                                                
                                        
                                  

                                           
                                                           
 


                                                                                       

                                                                                    

                                        

                                             




                                                                                 
                                      
                                
                                                 



                                                                                       
 
                                       
                                                
                                  
                
                                                   

                                                
                                                     

                                                                                
 
                                        

                           
                                                                                 
                                                                                         







                                                        
                                        

                          










                                                                             

                                                                                                      

                                                       
 

                          
 
# Copyright (C) 2009, 2010  Roman Zimbelmann <romanz@lavabit.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import os.path
import sys
# Put the parent directory of this test folder on sys.path so the `ranger`
# imports below resolve (presumably against the source checkout -- confirm).
rangerpath = os.path.join(os.path.dirname(__file__), '..')
# Compare a slice rather than indexing: sys.path[1] raises IndexError when
# sys.path holds fewer than two entries, while the slice is simply empty.
if sys.path[1:2] != [rangerpath]:
	sys.path[1:1] = [rangerpath]

from os.path import realpath, join, dirname

from ranger import fsobject
from ranger.fsobject.file import File
from ranger.fsobject.directory import Directory
from ranger.shared.settings import SettingsAware

# Project setup hook, invoked at import time before the test class is defined.
# NOTE(review): presumably initializes the shared settings that the fsobject
# classes read -- confirm in ranger.shared.settings.
SettingsAware._setup()

# Resolved real path of the 'testdir' fixture directory beside this file.
TESTDIR = realpath(join(dirname(__file__), 'testdir'))
# Scratch file that test_load_if_outdated creates and unlinks inside TESTDIR.
TESTFILE = join(TESTDIR, 'testfile5234148')
# Path inside TESTDIR that test_nonexistant_dir expects not to exist.
NONEXISTANT_DIR = join(TESTDIR, 'nonexistant')

import unittest
class Test1(unittest.TestCase):
	"""Tests for Directory loading against the on-disk fixture TESTDIR."""

	def test_initial_condition(self):
		"""A freshly constructed Directory has no content loaded yet."""
		directory = Directory(TESTDIR)

		self.assertEqual(directory.path, TESTDIR)
		self.assertFalse(directory.content_loaded)
		self.assertEqual(directory.filenames, None)
		self.assertEqual(directory.files, None)
		if not sys.flags.optimize:  # asserts are ignored with python -O
			self.assertRaises(AssertionError, len, directory)

	def test_after_content_loaded(self):
		"""After load_content() the directory lists every entry of TESTDIR."""
		import os
		directory = Directory(TESTDIR)
		directory.load_content()

		self.assertTrue(directory.exists)
		self.assertEqual(type(directory.filenames), list)

		# No particular order is expected right after loading, so compare
		# sorted lists. The loaded names are sorted into a copy so that the
		# object under test is not mutated by the test itself.
		assumed_filenames = sorted(
			os.path.join(TESTDIR, name) for name in os.listdir(TESTDIR))
		loaded_filenames = sorted(directory.filenames)

		self.assertTrue(len(directory) > 0)
		self.assertEqual(loaded_filenames, assumed_filenames)

		# Build a File object for each expected name and require at least
		# one entry of directory.files with the same path and stat.
		for name in assumed_filenames:
			f = File(name)
			f.load()
			for dirfile in directory.files:
				if f.path == dirfile.path and f.stat == dirfile.stat:
					break
			else:
				self.fail("couldn't find file {0}".format(name))

	def test_nonexistant_dir(self):
		"""Loading a missing directory marks it loaded but not existing."""
		directory = Directory(NONEXISTANT_DIR)
		directory.load_content()

		self.assertTrue(directory.content_loaded)
		self.assertFalse(directory.exists)
		self.assertFalse(directory.accessible)
		self.assertEqual(directory.filenames, None)
		if not sys.flags.optimize:  # asserts are ignored with python -O
			self.assertRaises(AssertionError, len, directory)

	def test_load_if_outdated(self):
		"""load_if_outdated() reports a reload once TESTDIR's mtime changed."""
		import os
		import time

		def modify_dir():
			# Creating and removing a file touches the directory's mtime.
			open(TESTFILE, 'w').close()
			os.unlink(TESTFILE)

		def mtime():
			return os.stat(TESTDIR).st_mtime

		directory = Directory(TESTDIR)
		directory.load()

		# A modification made within the filesystem's mtime resolution of
		# the previous one leaves mtime unchanged, so keep modifying the
		# directory until the observed mtime actually differs.
		old_mtime = mtime()
		for _ in range(50):
			modify_dir()
			if old_mtime != mtime():
				break
			time.sleep(0.1)
		else:
			# fail after 5 seconds of trying
			self.fail(
					"Cannot perform test: mtime of TESTDIR is not being updated.")

		self.assertTrue(directory.load_if_outdated())

# Run this module's tests when the file is executed directly as a script.
if __name__ == '__main__':
	unittest.main()