diff --git a/samples/sample_kclpy_app.py b/samples/sample_kclpy_app.py index 9cf1ad2..5d40f5a 100755 --- a/samples/sample_kclpy_app.py +++ b/samples/sample_kclpy_app.py @@ -32,7 +32,7 @@ class RecordProcessor(processor.RecordProcessorBase): a scaling change. """ def __init__(self): - self._SLEEP_SECONDS = 5 + self._CHECKPOINT_SLEEP_SECONDS = 5 self._CHECKPOINT_RETRIES = 5 self._CHECKPOINT_FREQ_SECONDS = 60 self._largest_seq = (None, None) @@ -49,7 +49,49 @@ def initialize(self, initialize_input): self._largest_seq = (None, None) self._last_checkpoint_time = time.time() - def checkpoint(self, checkpointer, sequence_number=None, sub_sequence_number=None): + def process_records(self, process_records_input): + """ + Called by a KCLProcess with a list of records to be processed and a checkpointer which accepts sequence numbers + from the records to indicate where in the stream to checkpoint. + + :param amazon_kclpy.messages.ProcessRecordsInput process_records_input: the records, and metadata about the + records. + """ + try: + for record in process_records_input.records: + data = record.binary_data + seq = int(record.sequence_number) + sub_seq = record.sub_sequence_number + key = record.partition_key + self._process_record(data, key, seq, sub_seq) + if self._should_update_sequence(seq, sub_seq): + self._largest_seq = (seq, sub_seq) + + # + # Checkpoints every self._CHECKPOINT_FREQ_SECONDS seconds + # + if time.time() - self._last_checkpoint_time > self._CHECKPOINT_FREQ_SECONDS: + self._checkpoint(process_records_input.checkpointer, str(self._largest_seq[0]), self._largest_seq[1]) + self._last_checkpoint_time = time.time() + + except Exception as e: + sys.stderr.write("Encountered an exception while processing records. Exception was {e}\n".format(e=e)) + + def _process_record(self, data, partition_key, sequence_number, sub_sequence_number): + """ + Called for each record that is passed to process_records. + + :param str data: The blob of data that was contained in the record. + :param str partition_key: The key associated with this recod. + :param int sequence_number: The sequence number associated with this record. + :param int sub_sequence_number: the sub sequence number associated with this record. + """ + #################################### + # Insert your processing logic here + #################################### + return + + def _checkpoint(self, checkpointer, sequence_number=None, sub_sequence_number=None): """ Checkpoints with retries on retryable exceptions. @@ -80,28 +122,14 @@ def checkpoint(self, checkpointer, sequence_number=None, sub_sequence_number=Non return else: print('Was throttled while checkpointing, will attempt again in {s} seconds' - .format(s=self._SLEEP_SECONDS)) + .format(s=self._CHECKPOINT_SLEEP_SECONDS)) elif 'InvalidStateException' == e.value: sys.stderr.write('MultiLangDaemon reported an invalid state while checkpointing.\n') else: # Some other error sys.stderr.write('Encountered an error while checkpointing, error was {e}.\n'.format(e=e)) - time.sleep(self._SLEEP_SECONDS) + time.sleep(self._CHECKPOINT_SLEEP_SECONDS) - def process_record(self, data, partition_key, sequence_number, sub_sequence_number): - """ - Called for each record that is passed to process_records. - - :param str data: The blob of data that was contained in the record. - :param str partition_key: The key associated with this recod. - :param int sequence_number: The sequence number associated with this record. - :param int sub_sequence_number: the sub sequence number associated with this record. - """ - #################################### - # Insert your processing logic here - #################################### - return - - def should_update_sequence(self, sequence_number, sub_sequence_number): + def _should_update_sequence(self, sequence_number, sub_sequence_number): """ Determines whether a new larger sequence number is available @@ -110,35 +138,7 @@ def should_update_sequence(self, sequence_number, sub_sequence_number): :return boolean: true if the largest sequence should be updated, false otherwise """ return self._largest_seq == (None, None) or sequence_number > self._largest_seq[0] or \ - (sequence_number == self._largest_seq[0] and sub_sequence_number > self._largest_seq[1]) - - def process_records(self, process_records_input): - """ - Called by a KCLProcess with a list of records to be processed and a checkpointer which accepts sequence numbers - from the records to indicate where in the stream to checkpoint. - - :param amazon_kclpy.messages.ProcessRecordsInput process_records_input: the records, and metadata about the - records. - """ - try: - for record in process_records_input.records: - data = record.binary_data - seq = int(record.sequence_number) - sub_seq = record.sub_sequence_number - key = record.partition_key - self.process_record(data, key, seq, sub_seq) - if self.should_update_sequence(seq, sub_seq): - self._largest_seq = (seq, sub_seq) - - # - # Checkpoints every self._CHECKPOINT_FREQ_SECONDS seconds - # - if time.time() - self._last_checkpoint_time > self._CHECKPOINT_FREQ_SECONDS: - self.checkpoint(process_records_input.checkpointer, str(self._largest_seq[0]), self._largest_seq[1]) - self._last_checkpoint_time = time.time() - - except Exception as e: - sys.stderr.write("Encountered an exception while processing records. Exception was {e}\n".format(e=e)) + (sequence_number == self._largest_seq[0] and sub_sequence_number > self._largest_seq[1]) def shutdown(self, shutdown_input): """