@@ -30,14 +30,21 @@ class HyperdriveDaemon extends EventEmitter {
30
30
this . db = level ( `${ storage } /db` , { valueEncoding : 'json' } )
31
31
this . opts = opts
32
32
33
+ const dbs = {
34
+ megastore : sub ( this . db , 'megastore' ) ,
35
+ fuse : sub ( this . db , 'fuse' , { valueEncoding : 'json' } ) ,
36
+ drives : sub ( this . db , 'drives' , { valueEncoding : 'json' } )
37
+ }
38
+
33
39
const megastoreOpts = {
34
40
storage : path => raf ( `${ storage } /cores/${ path } ` ) ,
35
- db : sub ( this . db , ' megastore' ) ,
41
+ db : dbs . megastore ,
36
42
networker : new SwarmNetworker ( opts . network )
37
43
}
44
+
38
45
this . megastore = new Megastore ( megastoreOpts . storage , megastoreOpts . db , megastoreOpts . networker )
39
- this . drives = new DriveManager ( this . megastore , sub ( this . db , ' drives' ) , this . opts )
40
- this . fuse = hyperfuse ? new FuseManager ( this . megastore , this . drives , sub ( this . db , ' fuse' ) , this . opts ) : null
46
+ this . drives = new DriveManager ( this . megastore , dbs . drives , this . opts )
47
+ this . fuse = hyperfuse ? new FuseManager ( this . megastore , this . drives , dbs . fuse , this . opts ) : null
41
48
42
49
this . drives . on ( 'error' , err => this . emit ( 'error' , err ) )
43
50
this . fuse . on ( 'error' , err => this . emit ( 'error' , err ) )
@@ -90,34 +97,33 @@ async function start () {
90
97
const storageRoot = argv . storage
91
98
await ensureStorage ( )
92
99
93
- const hypermount = new HyperdriveDaemon ( storageRoot )
94
- await hypermount . ready ( )
100
+ const daemon = new HyperdriveDaemon ( storageRoot )
101
+ await daemon . ready ( )
95
102
96
103
const server = new grpc . Server ( ) ;
97
104
if ( hyperfuse ) {
98
105
server . addService ( rpc . fuse . services . FuseService , {
99
- ...authenticate ( metadata , catchErrors ( createFuseHandlers ( this . fuseManager ) ) )
106
+ ...wrap ( metadata , createFuseHandlers ( daemon . fuse ) , { authenticate : true } )
100
107
} )
101
108
}
102
109
server . addService ( rpc . drive . services . DriveService , {
103
- ...authenticate ( metadata , catchErrors ( createDriveHandlers ( this . driveManager ) ) )
110
+ ...wrap ( metadata , createDriveHandlers ( daemon . drives ) , { authenticate : true } )
104
111
} )
105
112
server . addService ( rpc . main . services . HyperdriveService , {
106
- ...authenticate ( metadata , catchErrors ( createMainHandlers ( this ) ) )
113
+ ...wrap ( metadata , createMainHandlers ( daemon ) , { authenticate : true } )
107
114
} )
108
115
109
- console . log ( 'binding server...' )
110
116
server . bind ( `0.0.0.0:${ argv . port } ` , grpc . ServerCredentials . createInsecure ( ) )
111
117
server . start ( )
112
- console . log ( 'server started. ' )
118
+ log . info ( { port : argv . port } , 'server listening ' )
113
119
114
120
process . once ( 'SIGINT' , cleanup )
115
121
process . once ( 'SIGTERM' , cleanup )
116
122
process . once ( 'unhandledRejection' , cleanup )
117
123
process . once ( 'uncaughtException' , cleanup )
118
124
119
125
async function cleanup ( ) {
120
- await hypermount . close ( )
126
+ await daemon . close ( )
121
127
server . tryDestroy ( )
122
128
}
123
129
@@ -131,26 +137,47 @@ async function start () {
131
137
}
132
138
}
133
139
134
- function authenticate ( metadata , methods ) {
135
- const authenticated = { }
140
+ function wrap ( metadata , methods ) {
141
+ const promisified = promisify ( methods )
142
+ let authenticated = authenticate ( metadata , methods )
143
+ }
144
+
145
+ function wrap ( metadata , methods , opts ) {
146
+ const wrapped = { }
147
+ const authenticate = opts && opts . authenticate
136
148
for ( const methodName of Object . keys ( methods ) ) {
137
149
const method = methods [ methodName ]
138
- authenticated [ methodName ] = function ( call , ...args ) {
139
- const cb = args [ args . length - 1 ]
140
- const token = call . metadata && call . metadata . token
141
- if ( ! token || ! token . equals ( metadata . token ) ) {
142
- const err = {
143
- code : grpc . status . UNAUTHENTICATED ,
144
- message : 'Invalid auth token.'
150
+ wrapped [ methodName ] = function ( call , ...args ) {
151
+ const tag = { method : methodName , received : Date . now ( ) }
152
+ const cb = args . length ? args [ args . length - 1 ] : null
153
+ if ( authenticate ) {
154
+ let token = call . metadata && call . metadata . get ( 'token' )
155
+ if ( token ) token = token [ 0 ]
156
+ log . trace ( { ...tag , token } , 'received token' )
157
+ if ( ! token || token !== metadata . token ) {
158
+ log . error ( tag , 'request authentication failed' )
159
+ const err = {
160
+ code : grpc . status . UNAUTHENTICATED ,
161
+ message : 'Invalid auth token.'
162
+ }
163
+ if ( cb ) return cb ( err )
164
+ return call . destroy ( err )
145
165
}
146
- if ( cb ) return cb ( err )
147
- return call . destroy ( err )
166
+ log . debug ( tag , 'request authentication succeeded' )
148
167
}
149
-
150
- return method ( call , ...args )
168
+ method ( call )
169
+ . then ( rsp => {
170
+ log . debug ( tag , 'request was successful' )
171
+ if ( cb ) return cb ( null , rsp )
172
+ } )
173
+ . catch ( err => {
174
+ log . error ( { ...tag , error : err . toString ( ) , stack : err . stack } , 'request failed' )
175
+ if ( cb ) return cb ( serverError ( err ) )
176
+ return call . destroy ( err )
177
+ } )
151
178
}
152
179
}
153
- return authenticated
180
+ return wrapped
154
181
}
155
182
156
183
function createMainHandlers ( daemon ) {
0 commit comments