28
28
"""Unit test fileio functions"""
29
29
30
30
import binascii
31
+ import codecs
31
32
import os
32
33
import sys
33
34
import unittest
34
35
35
36
# NOTE: wrap next imports in try except to prevent autopep8 shuffling up
36
37
try :
37
- from tests .support import MigTestCase , cleanpath , temppath , testmain
38
+ from tests .support import PY2 , MigTestCase , cleanpath , temppath , testmain
38
39
import mig .shared .fileio as fileio
39
40
except ImportError as ioe :
40
41
print ("Failed to import mig core modules: %s" % ioe )
43
44
DUMMY_BYTES = binascii .unhexlify ('DEADBEEF' ) # 4 bytes
44
45
DUMMY_BYTES_LENGTH = 4
45
46
DUMMY_UNICODE = u'UniCode123½¾µßðþđŋħĸþł@ª€£$¥©®'
46
- DUMMY_UNICODE_LENGTH = len (DUMMY_UNICODE )
47
+ DUMMY_UNICODE_BYTES = bytearray (DUMMY_UNICODE , 'utf8' )
48
+ DUMMY_UNICODE_BYTES_LENGTH = len (DUMMY_UNICODE_BYTES )
47
49
DUMMY_FILE_WRITECHUNK = 'fileio/write_chunk'
48
50
DUMMY_FILE_WRITEFILE = 'fileio/write_file'
49
51
50
52
assert isinstance (DUMMY_BYTES , bytes )
51
53
52
54
55
+ def _as_unicode_string (value ):
56
+ assert isinstance (value , bytearray )
57
+ return unicode (codecs .decode (value , 'utf8' )) if PY2 else str (value , 'utf8' )
58
+
59
+
60
+ class _ByteText :
61
+ """File-like object that allows interacting with a text file as a bytearray.
62
+
63
+ Supports reading and directly usable as a context manager."""
64
+
65
+ def __init__ (self , path , mode = 'r' ):
66
+ self ._file = None
67
+ self ._path = path
68
+ self ._mode = "%s%s" % (mode , 'b' ) if 'b' not in mode else mode
69
+
70
+ def read (self , size ):
71
+ content = self ._file .read (size )
72
+ # always read back the content as though it was raw bytes
73
+ return bytearray (content )
74
+
75
+ def __enter__ (self ):
76
+ self ._file = open (self ._path , mode = self ._mode )
77
+ return self
78
+
79
+ def __exit__ (self , * args ):
80
+ self ._file .close ()
81
+ if args [1 ] is not None :
82
+ raise args [1 ]
83
+
84
+
53
85
class MigSharedFileio__write_chunk (MigTestCase ):
54
- # TODO: Add docstrings to this class and its methods
86
+ """Coverage of the write_chunk() function."""
87
+
55
88
def setUp (self ):
56
89
super (MigSharedFileio__write_chunk , self ).setUp ()
57
90
self .tmp_path = temppath (DUMMY_FILE_WRITECHUNK , self , skip_clean = True )
@@ -60,9 +93,7 @@ def setUp(self):
60
93
def test_return_false_on_invalid_data (self ):
61
94
self .logger .forgive_errors ()
62
95
63
- # NOTE: we make sure to disable any forced stringification here
64
- did_succeed = fileio .write_chunk (self .tmp_path , 1234 , 0 , self .logger ,
65
- force_string = False )
96
+ did_succeed = fileio .write_chunk (self .tmp_path , 1234 , 0 , self .logger )
66
97
self .assertFalse (did_succeed )
67
98
68
99
def test_return_false_on_invalid_offset (self ):
@@ -106,7 +137,6 @@ def test_store_bytes_at_offset(self):
106
137
"expected a hole was left" )
107
138
self .assertEqual (content [3 :], DUMMY_BYTES )
108
139
109
- @unittest .skip ("TODO: enable again - requires the temporarily disabled auto mode select" )
110
140
def test_store_bytes_in_text_mode (self ):
111
141
fileio .write_chunk (self .tmp_path , DUMMY_BYTES , 0 , self .logger ,
112
142
mode = "r+" )
@@ -116,28 +146,29 @@ def test_store_bytes_in_text_mode(self):
116
146
self .assertEqual (len (content ), DUMMY_BYTES_LENGTH )
117
147
self .assertEqual (content [:], DUMMY_BYTES )
118
148
119
- @unittest .skip ("TODO: enable again - requires the temporarily disabled auto mode select" )
120
149
def test_store_unicode (self ):
121
- fileio .write_chunk (self .tmp_path , DUMMY_UNICODE , 0 , self .logger ,
122
- mode = 'r+' )
150
+ did_succeed = fileio .write_chunk (self .tmp_path , DUMMY_UNICODE , 0 , self .logger ,
151
+ mode = 'r+' )
152
+ self .assertTrue (did_succeed )
123
153
124
- with open (self .tmp_path , 'r' ) as file :
154
+ with _ByteText (self .tmp_path ) as file :
125
155
content = file .read (1024 )
126
- self .assertEqual (len (content ), DUMMY_UNICODE_LENGTH )
127
- self .assertEqual (content [:], DUMMY_UNICODE )
156
+ self .assertEqual (len (content ), DUMMY_UNICODE_BYTES_LENGTH )
157
+ self .assertEqual (content [:], DUMMY_UNICODE_BYTES )
128
158
129
- @unittest .skip ("TODO: enable again - requires the temporarily disabled auto mode select" )
130
159
def test_store_unicode_in_binary_mode (self ):
131
160
fileio .write_chunk (self .tmp_path , DUMMY_UNICODE , 0 , self .logger ,
132
161
mode = 'r+b' )
133
162
134
- with open (self .tmp_path , 'r' ) as file :
163
+ with _ByteText (self .tmp_path ) as file :
135
164
content = file .read (1024 )
136
- self .assertEqual (len (content ), DUMMY_UNICODE_LENGTH )
137
- self .assertEqual (content [:], DUMMY_UNICODE )
165
+ self .assertEqual (len (content ), DUMMY_UNICODE_BYTES_LENGTH )
166
+ self .assertEqual (_as_unicode_string ( content [:]) , DUMMY_UNICODE )
138
167
139
168
140
169
class MigSharedFileio__write_file (MigTestCase ):
170
+ """Coverage of the write_file() function."""
171
+
141
172
def setUp (self ):
142
173
super (MigSharedFileio__write_file , self ).setUp ()
143
174
self .tmp_path = temppath (DUMMY_FILE_WRITEFILE , self , skip_clean = True )
@@ -146,17 +177,16 @@ def setUp(self):
146
177
def test_return_false_on_invalid_data (self ):
147
178
self .logger .forgive_errors ()
148
179
149
- # NOTE: we make sure to disable any forced stringification here
150
- did_succeed = fileio .write_file (1234 , self .tmp_path , self .logger ,
151
- force_string = False )
180
+ did_succeed = fileio .write_file (1234 , self .tmp_path , self .logger )
152
181
self .assertFalse (did_succeed )
153
182
154
183
def test_return_false_on_invalid_dir (self ):
155
184
self .logger .forgive_errors ()
156
185
157
186
os .makedirs (self .tmp_path )
158
187
159
- did_succeed = fileio .write_file (DUMMY_BYTES , self .tmp_path , self .logger )
188
+ did_succeed = fileio .write_file (
189
+ DUMMY_BYTES , self .tmp_path , self .logger )
160
190
self .assertFalse (did_succeed )
161
191
162
192
def test_return_false_on_missing_dir (self ):
@@ -167,28 +197,23 @@ def test_return_false_on_missing_dir(self):
167
197
self .assertFalse (did_succeed )
168
198
169
199
def test_creates_directory (self ):
170
- # TODO: temporarily use empty string to avoid any byte/unicode issues
171
- # did_succeed = fileio.write_file(DUMMY_BYTES, self.tmp_path, self.logger)
172
- did_succeed = fileio .write_file ('' , self .tmp_path , self .logger )
200
+ did_succeed = fileio .write_file (
201
+ DUMMY_BYTES , self .tmp_path , self .logger )
173
202
self .assertTrue (did_succeed )
174
203
175
204
path_kind = self .assertPathExists (DUMMY_FILE_WRITEFILE )
176
205
self .assertEqual (path_kind , "file" )
177
206
178
207
def test_store_bytes (self ):
179
- mode = 'w'
180
- # TODO: remove next once we have auto adjust mode in write helper
181
- mode = fileio ._auto_adjust_mode (DUMMY_BYTES , mode )
182
- did_succeed = fileio .write_file (DUMMY_BYTES , self .tmp_path , self .logger ,
183
- mode = mode )
208
+ did_succeed = fileio .write_file (
209
+ DUMMY_BYTES , self .tmp_path , self .logger )
184
210
self .assertTrue (did_succeed )
185
211
186
212
with open (self .tmp_path , 'rb' ) as file :
187
213
content = file .read (1024 )
188
214
self .assertEqual (len (content ), DUMMY_BYTES_LENGTH )
189
215
self .assertEqual (content [:], DUMMY_BYTES )
190
216
191
- @unittest .skip ("TODO: enable again - requires the temporarily disabled auto mode select" )
192
217
def test_store_bytes_in_text_mode (self ):
193
218
did_succeed = fileio .write_file (DUMMY_BYTES , self .tmp_path , self .logger ,
194
219
mode = "w" )
@@ -199,27 +224,25 @@ def test_store_bytes_in_text_mode(self):
199
224
self .assertEqual (len (content ), DUMMY_BYTES_LENGTH )
200
225
self .assertEqual (content [:], DUMMY_BYTES )
201
226
202
- @unittest .skip ("TODO: enable again - requires the temporarily disabled auto mode select" )
203
227
def test_store_unicode (self ):
204
228
did_succeed = fileio .write_file (DUMMY_UNICODE , self .tmp_path ,
205
229
self .logger , mode = 'w' )
206
230
self .assertTrue (did_succeed )
207
231
208
- with open (self .tmp_path , 'r' ) as file :
232
+ with _ByteText (self .tmp_path , 'r' ) as file :
209
233
content = file .read (1024 )
210
- self .assertEqual (len (content ), DUMMY_UNICODE_LENGTH )
211
- self .assertEqual (content [:], DUMMY_UNICODE )
234
+ self .assertEqual (len (content ), DUMMY_UNICODE_BYTES_LENGTH )
235
+ self .assertEqual (content [:], DUMMY_UNICODE_BYTES )
212
236
213
- @unittest .skip ("TODO: enable again - requires the temporarily disabled auto mode select" )
214
237
def test_store_unicode_in_binary_mode (self ):
215
238
did_succeed = fileio .write_file (DUMMY_UNICODE , self .tmp_path ,
216
239
self .logger , mode = 'wb' )
217
240
self .assertTrue (did_succeed )
218
241
219
- with open (self .tmp_path , 'r' ) as file :
242
+ with _ByteText (self .tmp_path , 'r' ) as file :
220
243
content = file .read (1024 )
221
- self .assertEqual (len (content ), DUMMY_UNICODE_LENGTH )
222
- self .assertEqual (content [:], DUMMY_UNICODE )
244
+ self .assertEqual (len (content ), DUMMY_UNICODE_BYTES_LENGTH )
245
+ self .assertEqual (content [:], DUMMY_UNICODE_BYTES )
223
246
224
247
225
248
if __name__ == '__main__' :
0 commit comments