Skip to content

Commit 1bc6ae8

Browse files
Apply various Clippy recommendations (#603)
1 parent 0062c29 commit 1bc6ae8

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+124
-140
lines changed

bytes/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ pub mod arc {
9696
sequestered: self.sequestered.clone(),
9797
};
9898

99-
unsafe { self.ptr = self.ptr.offset(index as isize); }
99+
unsafe { self.ptr = self.ptr.add(index); }
100100
self.len -= index;
101101

102102
result
@@ -161,7 +161,7 @@ pub mod arc {
161161
/// shared4.try_merge(shared2).ok().expect("Failed to merge 4 and 231");
162162
/// ```
163163
pub fn try_merge(&mut self, other: Bytes) -> Result<(), Bytes> {
164-
if Arc::ptr_eq(&self.sequestered, &other.sequestered) && ::std::ptr::eq(unsafe { self.ptr.offset(self.len as isize) }, other.ptr) {
164+
if Arc::ptr_eq(&self.sequestered, &other.sequestered) && ::std::ptr::eq(unsafe { self.ptr.add(self.len) }, other.ptr) {
165165
self.len += other.len;
166166
Ok(())
167167
}

communication/src/allocator/process.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ impl AllocateBuilder for ProcessBuilder {
3535

3636
// Initialize buzzers; send first, then recv.
3737
for worker in self.buzzers_send.iter() {
38-
let buzzer = Buzzer::new();
38+
let buzzer = Buzzer::default();
3939
worker.send(buzzer).expect("Failed to send buzzer");
4040
}
4141
let mut buzzers = Vec::with_capacity(self.buzzers_recv.len());
@@ -88,8 +88,8 @@ impl Process {
8888

8989
counters_recv
9090
.into_iter()
91-
.zip(buzzers_send.into_iter())
92-
.zip(buzzers_recv.into_iter())
91+
.zip(buzzers_send)
92+
.zip(buzzers_recv)
9393
.enumerate()
9494
.map(|(index, ((recv, bsend), brecv))| {
9595
ProcessBuilder {

communication/src/allocator/thread.rs

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,12 @@ pub struct ThreadBuilder;
1515

1616
impl AllocateBuilder for ThreadBuilder {
1717
type Allocator = Thread;
18-
fn build(self) -> Self::Allocator { Thread::new() }
18+
fn build(self) -> Self::Allocator { Thread::default() }
1919
}
2020

2121

2222
/// An allocator for intra-thread communication.
23+
#[derive(Default)]
2324
pub struct Thread {
2425
/// Shared counts of messages in channels.
2526
events: Rc<RefCell<Vec<usize>>>,
@@ -53,13 +54,6 @@ pub type ThreadPusher<T> = CountPusher<T, Pusher<T>>;
5354
pub type ThreadPuller<T> = CountPuller<T, Puller<T>>;
5455

5556
impl Thread {
56-
/// Allocates a new thread-local channel allocator.
57-
pub fn new() -> Self {
58-
Thread {
59-
events: Rc::new(RefCell::new(Default::default())),
60-
}
61-
}
62-
6357
/// Creates a new thread-local channel from an identifier and shared counts.
6458
pub fn new_from<T: 'static>(identifier: usize, events: Rc<RefCell<Vec<usize>>>)
6559
-> (ThreadPusher<T>, ThreadPuller<T>)

communication/src/allocator/zero_copy/allocator.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ impl<A: AllocateBuilder> TcpBuilder<A> {
8282
// Fulfill puller obligations.
8383
let mut recvs = Vec::with_capacity(self.peers);
8484
for promise in self.promises.into_iter() {
85-
let buzzer = crate::buzzer::Buzzer::new();
85+
let buzzer = crate::buzzer::Buzzer::default();
8686
let queue = MergeQueue::new(buzzer);
8787
promise.send(queue.clone()).expect("Failed to send MergeQueue");
8888
recvs.push(queue.clone());

communication/src/allocator/zero_copy/allocator_process.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ impl ProcessBuilder {
6060
// Fulfill puller obligations.
6161
let mut recvs = Vec::with_capacity(self.peers);
6262
for puller in self.pullers.into_iter() {
63-
let buzzer = crate::buzzer::Buzzer::new();
63+
let buzzer = crate::buzzer::Buzzer::default();
6464
let queue = MergeQueue::new(buzzer);
6565
puller.send(queue.clone()).expect("Failed to send MergeQueue");
6666
recvs.push(queue.clone());

communication/src/allocator/zero_copy/initialize.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,10 +63,8 @@ pub fn initialize_networking_from_sockets<S: Stream + 'static>(
6363
-> ::std::io::Result<(Vec<TcpBuilder<ProcessBuilder>>, CommsGuard)>
6464
{
6565
// Sockets are expected to be blocking,
66-
for socket in sockets.iter_mut() {
67-
if let Some(socket) = socket {
68-
socket.set_nonblocking(false).expect("failed to set socket to blocking");
69-
}
66+
for socket in sockets.iter_mut().flatten() {
67+
socket.set_nonblocking(false).expect("failed to set socket to blocking");
7068
}
7169

7270
let processes = sockets.len();

communication/src/allocator/zero_copy/tcp.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
//!
1+
//! Methods related to reading from and writing to TCP connections
22
33
use std::io::{self, Write};
44
use crossbeam_channel::{Sender, Receiver};
@@ -67,9 +67,9 @@ where
6767
assert!(!buffer.empty().is_empty());
6868

6969
// Attempt to read some more bytes into self.buffer.
70-
let read = match reader.read(&mut buffer.empty()) {
70+
let read = match reader.read(buffer.empty()) {
7171
Err(x) => tcp_panic("reading data", x),
72-
Ok(n) if n == 0 => {
72+
Ok(0) => {
7373
tcp_panic(
7474
"reading data",
7575
std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "socket closed"),
@@ -102,7 +102,7 @@ where
102102
panic!("Clean shutdown followed by data.");
103103
}
104104
buffer.ensure_capacity(1);
105-
if reader.read(&mut buffer.empty()).unwrap_or_else(|e| tcp_panic("reading EOF", e)) > 0 {
105+
if reader.read(buffer.empty()).unwrap_or_else(|e| tcp_panic("reading EOF", e)) > 0 {
106106
panic!("Clean shutdown followed by data.");
107107
}
108108
}
@@ -141,7 +141,7 @@ pub fn send_loop<S: Stream>(
141141
logger.as_mut().map(|l| l.log(StateEvent { send: true, process, remote, start: true, }));
142142

143143
let mut sources: Vec<MergeQueue> = sources.into_iter().map(|x| {
144-
let buzzer = crate::buzzer::Buzzer::new();
144+
let buzzer = crate::buzzer::Buzzer::default();
145145
let queue = MergeQueue::new(buzzer);
146146
x.send(queue.clone()).expect("failed to send MergeQueue");
147147
queue

communication/src/buzzer.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,11 @@ pub struct Buzzer {
88
thread: Thread,
99
}
1010

11+
impl Default for Buzzer {
12+
fn default() -> Self { Self { thread: std::thread::current() } }
13+
}
14+
1115
impl Buzzer {
12-
/// Creates a new buzzer for the current thread.
13-
pub fn new() -> Self {
14-
Self {
15-
thread: std::thread::current()
16-
}
17-
}
1816
/// Unparks the target thread.
1917
pub fn buzz(&self) {
2018
self.thread.unpark()

communication/src/initialize.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -155,15 +155,15 @@ impl Config {
155155
Ok((vec![GenericBuilder::Thread(ThreadBuilder)], Box::new(())))
156156
},
157157
Config::Process(threads) => {
158-
Ok((Process::new_vector(threads).into_iter().map(|x| GenericBuilder::Process(x)).collect(), Box::new(())))
158+
Ok((Process::new_vector(threads).into_iter().map(GenericBuilder::Process).collect(), Box::new(())))
159159
},
160160
Config::ProcessBinary(threads) => {
161-
Ok((ProcessBuilder::new_vector(threads).into_iter().map(|x| GenericBuilder::ProcessBinary(x)).collect(), Box::new(())))
161+
Ok((ProcessBuilder::new_vector(threads).into_iter().map(GenericBuilder::ProcessBinary).collect(), Box::new(())))
162162
},
163163
Config::Cluster { threads, process, addresses, report, log_fn } => {
164164
match initialize_networking(addresses, process, threads, report, log_fn) {
165165
Ok((stuff, guard)) => {
166-
Ok((stuff.into_iter().map(|x| GenericBuilder::ZeroCopy(x)).collect(), Box::new(guard)))
166+
Ok((stuff.into_iter().map(GenericBuilder::ZeroCopy).collect(), Box::new(guard)))
167167
},
168168
Err(err) => Err(format!("failed to initialize networking: {}", err))
169169
}

communication/src/lib.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -181,11 +181,11 @@ fn promise_futures<T>(sends: usize, recvs: usize) -> (Vec<Vec<Sender<T>>>, Vec<V
181181
let mut senders: Vec<_> = (0 .. sends).map(|_| Vec::with_capacity(recvs)).collect();
182182
let mut recvers: Vec<_> = (0 .. recvs).map(|_| Vec::with_capacity(sends)).collect();
183183

184-
for sender in 0 .. sends {
185-
for recver in 0 .. recvs {
184+
for sender in senders.iter_mut() {
185+
for recver in recvers.iter_mut() {
186186
let (send, recv) = crossbeam_channel::unbounded();
187-
senders[sender].push(send);
188-
recvers[recver].push(recv);
187+
sender.push(send);
188+
recver.push(recv);
189189
}
190190
}
191191

communication/src/networking.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ pub fn create_sockets(addresses: Vec<String>, my_index: usize, noisy: bool) -> R
100100
let mut results = start_task.join().unwrap()?;
101101
results.push(None);
102102
let to_extend = await_task.join().unwrap()?;
103-
results.extend(to_extend.into_iter());
103+
results.extend(to_extend);
104104

105105
if noisy { println!("worker {}:\tinitialization complete", my_index) }
106106

container/src/columnation.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -200,9 +200,9 @@ impl<T: Columnation> Default for TimelyStack<T> {
200200

201201
impl<'a, A: 'a + Columnation> FromIterator<&'a A> for TimelyStack<A> {
202202
fn from_iter<T: IntoIterator<Item = &'a A>>(iter: T) -> Self {
203-
let mut iter = iter.into_iter();
203+
let iter = iter.into_iter();
204204
let mut c = TimelyStack::<A>::with_capacity(iter.size_hint().0);
205-
while let Some(element) = iter.next() {
205+
for element in iter {
206206
c.copy(element);
207207
}
208208

container/src/flatcontainer.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,13 @@ impl<R: Region + Clone + 'static> Container for FlatStack<R> {
1717

1818
type Iter<'a> = <&'a Self as IntoIterator>::IntoIter;
1919

20-
fn iter<'a>(&'a self) -> Self::Iter<'a> {
20+
fn iter(&self) -> Self::Iter<'_> {
2121
IntoIterator::into_iter(self)
2222
}
2323

2424
type DrainIter<'a> = Self::Iter<'a>;
2525

26-
fn drain<'a>(&'a mut self) -> Self::DrainIter<'a> {
26+
fn drain(&mut self) -> Self::DrainIter<'_> {
2727
IntoIterator::into_iter(&*self)
2828
}
2929
}

mdbook/src/chapter_2/chapter_2_4.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ fn main() {
184184

185185
in1.binary_frontier(&in2, Pipeline, Pipeline, "concat_buffer", |capability, info| {
186186

187-
let mut notificator = FrontierNotificator::new();
187+
let mut notificator = FrontierNotificator::default();
188188
let mut stash = HashMap::new();
189189

190190
move |input1, input2, output| {

timely/examples/threadless.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use timely::WorkerConfig;
55
fn main() {
66

77
// create a naked single-threaded worker.
8-
let allocator = timely::communication::allocator::Thread::new();
8+
let allocator = timely::communication::allocator::Thread::default();
99
let mut worker = timely::worker::Worker::new(WorkerConfig::default(), allocator);
1010

1111
// create input and probe handles.

timely/src/dataflow/channels/pullers/counter.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ pub struct ConsumedGuard<T: Ord + Clone + 'static> {
2424

2525
impl<T:Ord+Clone+'static> ConsumedGuard<T> {
2626
pub(crate) fn time(&self) -> &T {
27-
&self.time.as_ref().unwrap()
27+
self.time.as_ref().unwrap()
2828
}
2929
}
3030

timely/src/dataflow/channels/pushers/exchange.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ where
7171
let hash_func = &mut self.hash_func;
7272

7373
// if the number of pushers is a power of two, use a mask
74-
if (self.pushers.len() & (self.pushers.len() - 1)) == 0 {
74+
if self.pushers.len().is_power_of_two() {
7575
let mask = (self.pushers.len() - 1) as u64;
7676
let pushers = &mut self.pushers;
7777
data.push_partitioned(

timely/src/dataflow/operators/aggregation/aggregate.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ impl<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> Aggregate<S, K, V> for
7676
hash: H) -> Stream<S, R> where S::Timestamp: Eq {
7777

7878
let mut aggregates = HashMap::new();
79-
self.unary_notify(Exchange::new(move |&(ref k, _)| hash(k)), "Aggregate", vec![], move |input, output, notificator| {
79+
self.unary_notify(Exchange::new(move |(k, _)| hash(k)), "Aggregate", vec![], move |input, output, notificator| {
8080

8181
// read each input, fold into aggregates
8282
input.for_each(|time, data| {

timely/src/dataflow/operators/aggregation/state_machine.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ use crate::dataflow::{Stream, Scope};
77
use crate::dataflow::operators::generic::operator::Operator;
88
use crate::dataflow::channels::pact::Exchange;
99

10+
/// Provides the `state_machine` method.
11+
///
1012
/// Generic state-transition machinery: each key has a state, and receives a sequence of events.
1113
/// Events are applied in time-order, but no other promises are made. Each state transition can
1214
/// produce output, which is sent.
@@ -15,8 +17,6 @@ use crate::dataflow::channels::pact::Exchange;
1517
/// updates for the current time reflected in the notificator, though. In the case of partially
1618
/// ordered times, the only guarantee is that updates are not applied out of order, not that there
1719
/// is some total order on times respecting the total order (updates may be interleaved).
18-
19-
/// Provides the `state_machine` method.
2020
pub trait StateMachine<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> {
2121
/// Tracks a state for each presented key, using user-supplied state transition logic.
2222
///
@@ -66,7 +66,7 @@ impl<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> StateMachine<S, K, V> f
6666
let mut pending: HashMap<_, Vec<(K, V)>> = HashMap::new(); // times -> (keys -> state)
6767
let mut states = HashMap::new(); // keys -> state
6868

69-
self.unary_notify(Exchange::new(move |&(ref k, _)| hash(k)), "StateMachine", vec![], move |input, output, notificator| {
69+
self.unary_notify(Exchange::new(move |(k, _)| hash(k)), "StateMachine", vec![], move |input, output, notificator| {
7070

7171
// go through each time with data, process each (key, val) pair.
7272
notificator.for_each(|time,_,_| {
@@ -88,7 +88,7 @@ impl<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> StateMachine<S, K, V> f
8888

8989
// stash if not time yet
9090
if notificator.frontier(0).less_than(time.time()) {
91-
pending.entry(time.time().clone()).or_insert_with(Vec::new).extend(data.drain(..));
91+
pending.entry(time.time().clone()).or_insert_with(Vec::new).append(data);
9292
notificator.notify_at(time.retain());
9393
}
9494
else {

timely/src/dataflow/operators/branch.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ impl<S: Scope, D: Data> Branch<S, D> for Stream<S, D> {
5454
let mut out1 = output1_handle.session(&time);
5555
let mut out2 = output2_handle.session(&time);
5656
for datum in data.drain(..) {
57-
if condition(&time.time(), &datum) {
57+
if condition(time.time(), &datum) {
5858
out2.give(datum);
5959
} else {
6060
out1.give(datum);
@@ -107,7 +107,7 @@ impl<S: Scope, C: Container> BranchWhen<S::Timestamp> for StreamCore<S, C> {
107107
let mut output2_handle = output2.activate();
108108

109109
input.for_each(|time, data| {
110-
let mut out = if condition(&time.time()) {
110+
let mut out = if condition(time.time()) {
111111
output2_handle.session(&time)
112112
} else {
113113
output1_handle.session(&time)

timely/src/dataflow/operators/capability.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,13 @@ pub trait CapabilityTrait<T: Timestamp> {
4040
fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>) -> bool;
4141
}
4242

43-
impl<'a, T: Timestamp, C: CapabilityTrait<T>> CapabilityTrait<T> for &'a C {
43+
impl<T: Timestamp, C: CapabilityTrait<T>> CapabilityTrait<T> for &C {
4444
fn time(&self) -> &T { (**self).time() }
4545
fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>) -> bool {
4646
(**self).valid_for_output(query_buffer)
4747
}
4848
}
49-
impl<'a, T: Timestamp, C: CapabilityTrait<T>> CapabilityTrait<T> for &'a mut C {
49+
impl<T: Timestamp, C: CapabilityTrait<T>> CapabilityTrait<T> for &mut C {
5050
fn time(&self) -> &T { (**self).time() }
5151
fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>) -> bool {
5252
(**self).valid_for_output(query_buffer)
@@ -227,9 +227,10 @@ impl Error for DowngradeError {}
227227
/// A shared list of shared output capability buffers.
228228
type CapabilityUpdates<T> = Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>;
229229

230-
/// An capability of an input port. Holding onto this capability will implicitly holds onto a
231-
/// capability for all the outputs ports this input is connected to, after the connection summaries
232-
/// have been applied.
230+
/// An capability of an input port.
231+
///
232+
/// Holding onto this capability will implicitly holds onto a capability for all the outputs
233+
/// ports this input is connected to, after the connection summaries have been applied.
233234
///
234235
/// This input capability supplies a `retain_for_output(self)` method which consumes the input
235236
/// capability and turns it into a [Capability] for a specific output port.

timely/src/dataflow/operators/core/enterleave.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ pub trait Leave<G: Scope, C: Container> {
103103
fn leave(&self) -> StreamCore<G, C>;
104104
}
105105

106-
impl<'a, G: Scope, C: Clone+Container, T: Timestamp+Refines<G::Timestamp>> Leave<G, C> for StreamCore<Child<'a, G, T>, C> {
106+
impl<G: Scope, C: Clone+Container, T: Timestamp+Refines<G::Timestamp>> Leave<G, C> for StreamCore<Child<'_, G, T>, C> {
107107
fn leave(&self) -> StreamCore<G, C> {
108108

109109
let scope = self.scope();

timely/src/dataflow/operators/core/exchange.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,18 @@ pub trait Exchange<C: PushPartitioned> {
2323
/// .inspect(|x| println!("seen: {:?}", x));
2424
/// });
2525
/// ```
26-
fn exchange<F: 'static>(&self, route: F) -> Self
26+
fn exchange<F>(&self, route: F) -> Self
2727
where
28-
for<'a> F: FnMut(&C::Item<'a>) -> u64;
28+
for<'a> F: FnMut(&C::Item<'a>) -> u64 + 'static;
2929
}
3030

3131
impl<G: Scope, C> Exchange<C> for StreamCore<G, C>
3232
where
3333
C: PushPartitioned + ExchangeData,
3434
{
35-
fn exchange<F: 'static>(&self, route: F) -> StreamCore<G, C>
35+
fn exchange<F>(&self, route: F) -> StreamCore<G, C>
3636
where
37-
for<'a> F: FnMut(&C::Item<'a>) -> u64,
37+
for<'a> F: FnMut(&C::Item<'a>) -> u64 + 'static,
3838
{
3939
self.unary(ExchangeCore::new(route), "Exchange", |_, _| {
4040
move |input, output| {

0 commit comments

Comments
 (0)