@@ -87,13 +87,45 @@ def put(self, source_data, target_path, replication=1):
87
87
fileUploadClient .close ()
88
88
return response .status
89
89
90
-
90
+
91
+ def get (self , source_path ):
92
+ if os .path .isabs (source_path )== False :
93
+ raise Exception ("Only absolute paths supported: %s" % (source_path ))
94
+ url_path = WEBHDFS_CONTEXT_ROOT + source_path + '?op=OPEN&overwrite=true&user.name=' + self .username
95
+ logger .debug ("GET URL: %s" % url_path )
96
+ httpClient = self .__getNameNodeHTTPClient ()
97
+ httpClient .request ('GET' , url_path , headers = {})
98
+ response = httpClient .getresponse ()
99
+ data = None
100
+ if response .length != None :
101
+ msg = response .msg
102
+ redirect_location = msg ["location" ]
103
+ logger .debug ("HTTP Response: %d, %s" % (response .status , response .reason ))
104
+ logger .debug ("HTTP Location: %s" % (redirect_location ))
105
+ result = urlparse .urlparse (redirect_location )
106
+ redirect_host = result .netloc [:result .netloc .index (":" )]
107
+ redirect_port = result .netloc [(result .netloc .index (":" )+ 1 ):]
108
+
109
+ redirect_path = result .path + "?" + result .query
110
+
111
+ logger .debug ("Send redirect to: host: %s, port: %s, path: %s " % (redirect_host , redirect_port , redirect_path ))
112
+ fileDownloadClient = httplib .HTTPConnection (redirect_host ,
113
+ redirect_port , timeout = 600 )
114
+
115
+ fileDownloadClient .request ('GET' , redirect_path , headers = {})
116
+ response = fileDownloadClient .getresponse ()
117
+ logger .debug ("HTTP Response: %d, %s" % (response .status , response .reason ))
118
+ data = response .read ()
119
+ httpClient .close ()
120
+ return data
121
+
122
+
91
123
def copyFromLocal (self , source_path , target_path , replication = 1 ):
92
124
f = open (source_path , "r" )
93
125
source_data = f .read ()
94
126
f .close ()
95
127
return self .put (source_data , target_path , replication )
96
-
128
+
97
129
98
130
def copyToLocal (self , source_path , target_path ):
99
131
if os .path .isabs (source_path )== False :
0 commit comments