Introduce unordered timestamps to a dataflow #532

Open
rafaelcgs10 opened this issue Sep 13, 2023 · 4 comments

Comments

@rafaelcgs10

How can one introduce data with unordered timestamps?

The example in examples/unordered_input.rs sends data with ordered timestamps.

I cannot use delayed to send data with timestamp 9 after sending 10, for example.

I am asking this because I would like to validate that an algorithm still works even if data is not in order.

@frankmcsherry
Member

You should be able to send data with timestamp 9 after sending 10, because delayed does not modify the capability on which it is called. What actually downgrades cap in the example is the cap = ... assignment. To use an unordered input you do need to retain a capability with some time, but you can delay it to any time greater than or equal to the capability's time. You could, for example, do

input.session(cap.delayed(&(round + rand() % 5))).give(round);

which would use some random time within 5 of the current round, almost certainly producing out-of-order data. You still want the next line, cap = cap.delayed(...);, so that the system can move forward (at that point you are committing to send only times at least as large as the delayed-to time).
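
To make the difference between calling delayed and reassigning cap concrete, here is a minimal std-only sketch. It is not Timely's implementation: ToyCap, Held, frontier, and demo are hypothetical names invented for illustration, and the only behavior modeled is the one described above (delayed borrows the capability and mints a new one; the old capability goes away only when it is dropped, for example by cap = ...).

```rust
use std::cell::RefCell;
use std::collections::BTreeMap;
use std::rc::Rc;

/// Shared multiset of times for which some capability is still held.
/// Hypothetical stand-in for progress tracking; not a Timely type.
type Held = Rc<RefCell<BTreeMap<u64, usize>>>;

/// Toy capability: permission to send data at `time` or later.
struct ToyCap {
    time: u64,
    held: Held,
}

impl ToyCap {
    fn new(time: u64, held: &Held) -> ToyCap {
        *held.borrow_mut().entry(time).or_insert(0) += 1;
        ToyCap { time, held: Rc::clone(held) }
    }

    /// Takes `&self`: mints a new capability and leaves `self` unchanged.
    fn delayed(&self, new_time: u64) -> ToyCap {
        assert!(new_time >= self.time, "cannot delay to an earlier time");
        ToyCap::new(new_time, &self.held)
    }
}

impl Drop for ToyCap {
    fn drop(&mut self) {
        // Dropping a capability decrements the count for its time.
        let mut held = self.held.borrow_mut();
        let count = held.get_mut(&self.time).expect("time must be tracked");
        *count -= 1;
        if *count == 0 {
            held.remove(&self.time);
        }
    }
}

/// Smallest time that may still appear, or None once no capability is held.
fn frontier(held: &Held) -> Option<u64> {
    held.borrow().keys().next().copied()
}

/// Records the frontier after each phase of the scenario.
fn demo() -> Vec<Option<u64>> {
    let held: Held = Rc::new(RefCell::new(BTreeMap::new()));
    let mut cap = ToyCap::new(0, &held);
    let mut seen = Vec::new();

    // Temporary capabilities, as in `input.session(cap.delayed(..))`:
    // each one is dropped right away, and `cap` itself stays at time 0.
    drop(cap.delayed(3));
    drop(cap.delayed(1));
    seen.push(frontier(&held));

    // Reassignment: the new capability is created, then the old one is dropped.
    cap = cap.delayed(2);
    seen.push(frontier(&held));

    // Dropping the last capability closes the input.
    drop(cap);
    seen.push(frontier(&held));
    seen
}
```

In this model, demo() yields a frontier of 0 after the out-of-order sends, 2 after the reassignment, and no frontier once the last capability is dropped, which mirrors why only the cap = ... line lets the system move forward.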

@rafaelcgs10
Author

rafaelcgs10 commented Sep 14, 2023

Thanks for the answer, Frank.

I was not aware that delayed does not downgrade cap, and that the downgrade only happens when cap is replaced with cap = ....

I find this implicit downgrade confusing.

For example, my initial capability is 5:

cap = cap.delayed(&RootTimestamp::new(5));

Data is sent with timestamps 8 and 6 from capability 5:

input
    .session(cap.delayed(&RootTimestamp::new(8)))
    .give("data2");
input
    .session(cap.delayed(&RootTimestamp::new(6)))
    .give("data1");

Next, we replace capability 5 with capability 7:

cap = cap.delayed(&RootTimestamp::new(7));
worker.step();

This causes drop to be called for the cap with timestamp 5, correct?
I suppose this only changes the multiplicity of timestamp 5. I did not understand whether it also informs the system that timestamp 6 will no longer be sent.

Will this capability replacement also allow data1 with timestamp 6 to be processed by a downstream operator that waits on the frontier, after worker.step()?

Or do I also need to downgrade capability 7 for data1 with timestamp 6 to be processed?

cap = cap.delayed(&RootTimestamp::new(8));

@frankmcsherry
Member

> I didn't get if this will also inform the system about timestamps 6 also not being sent anymore.

It will. Strictly speaking, it only informs the system that this capability is no longer able to produce timestamp 6, but if this is the only input capability that you have, when you advance from 5 to 7, you should see the output for 6. Let me know if that does not happen!
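
The "only input capability" caveat can be sketched with a small std-only function. This is a simplified model under stated assumptions, not Timely's progress protocol: completed is a hypothetical helper, times are plain u64 values, and a time counts as complete when every held capability is strictly later than it.

```rust
/// Returns the buffered times that are complete, meaning that no held
/// capability could still produce them. Hypothetical helper, not a Timely API.
fn completed(held_caps: &[u64], buffered_times: &[u64]) -> Vec<u64> {
    buffered_times
        .iter()
        .copied()
        // A capability at time `cap` can produce any time >= `cap`,
        // so `t` is complete only if every held capability is > `t`.
        .filter(|t| held_caps.iter().all(|cap| cap > t))
        .collect()
}

/// Walks through the 5 -> 7 scenario with data buffered at times 8 and 6.
fn scenario() -> Vec<Vec<u64>> {
    let buffered = [8, 6];
    vec![
        completed(&[5], &buffered),    // single capability at 5
        completed(&[7], &buffered),    // after cap = cap.delayed(7)
        completed(&[7, 5], &buffered), // a second capability still held at 5
    ]
}
```

With a single capability, moving from 5 to 7 is enough to complete time 6, and no further downgrade to 8 is needed for data1. If another capability were still held at 5, nothing would complete, which is the "strictly speaking" part of the answer.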

@rafaelcgs10
Author

It happens!

I managed to create an example of sending data out of order and advancing the frontier gradually:

extern crate timely;
extern crate timely_communication;

use std::collections::HashMap;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::*;
use timely::progress::timestamp::RootTimestamp;
use timely_communication::Configuration;

use timely::dataflow::operators::{Operator, Probe};
use timely::dataflow::ProbeHandle;

fn main() {
    timely::execute(Configuration::Thread, |worker| {
        let mut probe = ProbeHandle::new();

        let (mut input, mut cap) = worker.dataflow(|scope| {
            let (input, stream) = scope.new_unordered_input();
            stream
                .inspect_batch(move |t, xs| {
                    for x in xs.iter() {
                        println!("streamed {} @ {:?}", x, t)
                    }
                })
                .unary_frontier(Pipeline, "batcher", |_capability, _info| {
                    let mut buffer = HashMap::new();

                    move |input, output| {
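                        // Buffer incoming batches by timestamp, retaining a
                        // capability for each time so it can be emitted later.
                        while let Some((time, data)) = input.next() {
                            buffer
                                .entry(time.retain())
                                .or_insert(Vec::new())
                                .push(data.take());
                        }

                        // Emit every buffered time that the input frontier has passed,
                        // i.e. times for which no further data can arrive.
                        for (key, val) in buffer.iter_mut() {
                            if !input.frontier().less_equal(key.time()) {
                                let mut session = output.session(key);
                                for mut batch in val.drain(..) {
                                    for value in batch.drain(..) {
                                        session.give(value);
                                    }
                                }
                            }
                        }

                        // Drop emptied entries, which also releases their capabilities.
                        buffer.retain(|_key, val| !val.is_empty());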
                    }
                })
                .inspect_batch(move |t, xs| {
                    for x in xs.iter() {
                        println!("batched {} @ {:?}", x, t)
                    }
                })
                .probe_with(&mut probe);

            input
        });

        cap = cap.delayed(&RootTimestamp::new(0));

        input.session(cap.delayed(&RootTimestamp::new(2))).give(3);
        input.session(cap.delayed(&RootTimestamp::new(0))).give(0);
        input.session(cap.delayed(&RootTimestamp::new(5))).give(1);
        input.session(cap.delayed(&RootTimestamp::new(5))).give(2);

        worker.step();

        println!("Replaces initial cap by 4");
        cap = cap.delayed(&RootTimestamp::new(4));
        while probe.less_than(&RootTimestamp::new(4)) {
            worker.step();
        }

        println!("Replaces cap 4 by 5");
        cap = cap.delayed(&RootTimestamp::new(5));
        while probe.less_than(&RootTimestamp::new(5)) {
            worker.step();
        }

        println!("Replaces cap 5 by 7");
        cap = cap.delayed(&RootTimestamp::new(7));
        while probe.less_than(&RootTimestamp::new(7)) {
            worker.step();
        }

        println!("Finish");
    })
    .unwrap();
}

Output:

streamed 3 @ (Root, 2)
streamed 0 @ (Root, 0)
streamed 1 @ (Root, 5)
streamed 2 @ (Root, 5)
Replaces initial cap by 4
batched 0 @ (Root, 0)
batched 3 @ (Root, 2)
Replaces cap 4 by 5
Replaces cap 5 by 7
batched 1 @ (Root, 5)
batched 2 @ (Root, 5)
Finish

I should dive deeper into Timely's implementation to see how this works.
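
The release schedule in the output above (times 0 and 2 at frontier 4, nothing at 5, time 5 at 7) can be reproduced with a std-only replay. This is a toy model of the batcher logic, not the Timely operator: ToyBatcher, advance_to, and replay are hypothetical names, the frontier is a single u64, and a BTreeMap replaces the HashMap so that the release order is deterministic.

```rust
use std::collections::BTreeMap;

/// Toy batcher: holds values until the frontier has passed their timestamp.
/// Hypothetical type for illustration; not part of Timely.
struct ToyBatcher {
    buffer: BTreeMap<u64, Vec<u64>>,
}

impl ToyBatcher {
    fn new() -> Self {
        ToyBatcher { buffer: BTreeMap::new() }
    }

    fn push(&mut self, time: u64, value: u64) {
        self.buffer.entry(time).or_default().push(value);
    }

    /// Releases every (time, value) pair with time < frontier, in time order.
    fn advance_to(&mut self, frontier: u64) -> Vec<(u64, u64)> {
        // `split_off` leaves times < frontier in `self.buffer`
        // and returns the entries with times >= frontier.
        let pending = self.buffer.split_off(&frontier);
        let ready = std::mem::replace(&mut self.buffer, pending);
        ready
            .into_iter()
            .flat_map(|(t, vs)| vs.into_iter().map(move |v| (t, v)))
            .collect()
    }
}

/// Replays the thread's inputs and frontier steps (4, then 5, then 7).
fn replay() -> Vec<(u64, Vec<(u64, u64)>)> {
    let mut batcher = ToyBatcher::new();
    // Same (time, value) pairs as the example, sent out of order.
    for &(time, value) in &[(2u64, 3u64), (0, 0), (5, 1), (5, 2)] {
        batcher.push(time, value);
    }
    [4u64, 5, 7]
        .iter()
        .map(|&f| (f, batcher.advance_to(f)))
        .collect()
}
```

The step to 5 releases nothing because a capability at 5 can still send data at time 5. Only once the capability moves past 5 do the two values at that time become complete.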
