Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to handle context cancellations for TCP protocol #1389

Open
wants to merge 13 commits into
base: main
Choose a base branch
from

Conversation

tinybit
Copy link
Collaborator

@tinybit tinybit commented Aug 27, 2024

Summary

Issue:

Checklist

Delete items not relevant to your PR:

  • Unit and integration tests covering the common scenarios were added
  • A human-readable description of the changes was provided to include in CHANGELOG
  • For significant changes, documentation in https://github.com/ClickHouse/clickhouse-docs was updated with further explanations or tutorials

@tinybit tinybit added the bug label Aug 27, 2024
@tinybit tinybit self-assigned this Aug 27, 2024
@jkaflik jkaflik self-requested a review August 27, 2024 13:24
Copy link
Contributor

@jkaflik jkaflik left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. I will do a small testing on my side and then we can merge.

conn.go Outdated
Comment on lines 240 to 250
if c.closed {
err := errors.New("attempted sending on closed connection")
c.debugf("[send data] err: %v", err)
return err
}

if c.buffer == nil {
err := errors.New("attempted sending on nil buffer")
c.debugf("[send data] err: %v", err)
return err
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it something you found during debugging or presumably want to safe?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, this is outdated code, please check the latest changes
https://github.com/ClickHouse/clickhouse-go/pull/1389/files

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also, safety is not presumed, it's necessary here.
previously, in conn_process.go access to c.close and c.buffer was single-threaded.
now i'm reading data in background goroutine and in foreground thread i'm waiting on select:

// do reads in background
errCh := make(chan error, 1)
doneCh := make(chan bool, 1)

go func() {
	err := c.processImpl(ctx, on) // this accesses c.reader internally (reads data)
	if err != nil {
		errCh <- err
		return
	}

	doneCh <- true
}()

// select on context or read channel (errors)
select {
case <-ctx.Done():
	c.cancel() // calls c.close(), it accesses c.reader internally (sets c.reader to nil)
	return ctx.Err()

case err := <-errCh:
	return err

case <-doneCh:
	return nil
}
  1. Closing connections must be done very carefully. Currently conn.Close() is broken because multiple calls to conn.Close() have issues deadlocking the thread it was called from

  2. Also multiple calls to c.close() from multiple threads is not thread safe, doing so will result in data races.

So, this needed to be fixed from this:

func (c *connect) close() error {
    if c.closed {
        return nil
    }
    c.closed = true
<...>

To a thread-safe version of it

func (c *connect) close() error {
	c.closeMutex.Lock()
	if c.closed {
		c.closeMutex.Unlock()
		return nil
	}
	c.closed = true
	c.closeMutex.Unlock()
<...>

This guarantees that under no circumstances c.close() can be called twice no matter what.

  1. Besides this we need to guarantee threadsafe access to c.reader from c.processImpl(), c.cancel() and c.close(), hence I've added c.readerMutex()

  2. One more thing: the order of operations during closing a connection:
    Old version:

func (c *connect) close() error {
	c.closeMutex.Lock()
	if c.closed {
		c.closeMutex.Unlock()
		return nil
	}
	c.closed = true
	c.closeMutex.Unlock()

	c.buffer = nil

	c.readerMutex.Lock()
	c.reader = nil
	c.readerMutex.Unlock()

	if err := c.conn.Close(); err != nil {
		return err
	}
	
	return nil
}

As you can see here, we're changing c.buffer and c.reader that might be currently reading/sending data, before we close the network connection. Connection must be closed first, in order to stop any i/o operations on c.buffer and c.reader (and related contexts), then it's safe to deallocale c.buffer and c.reader.

func (c *connect) close() error {
	c.closeMutex.Lock()
	if c.closed {
		c.closeMutex.Unlock()
		return nil
	}
	c.closed = true
	c.closeMutex.Unlock()

	if err := c.conn.Close(); err != nil {
		return err
	}

	c.buffer = nil

	c.readerMutex.Lock()
	c.reader = nil
	c.readerMutex.Unlock()

	
	return nil
}

conn.go Outdated
@@ -131,6 +139,8 @@ type connect struct {
readTimeout time.Duration
blockBufferSize uint8
maxCompressionBuffer int
mutex sync.Mutex
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you think it makes sense to name it explicit?

Suggested change
mutex sync.Mutex
readerMutex sync.Mutex

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, good idea

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

conn.go Outdated
Comment on lines 258 to 268
if c.isClosed() {
err := errors.New("attempted sending on closed connection")
c.debugf("[send data] err: %v", err)
return err
}

if c.buffer == nil {
err := errors.New("attempted sending on nil buffer")
c.debugf("[send data] err: %v", err)
return err
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it something you found when while debugging or only added for safety?

Copy link
Collaborator Author

@tinybit tinybit Aug 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, this is outdated code, please check the latest changes
https://github.com/ClickHouse/clickhouse-go/pull/1389/files

it was all found during debugging, yes.

note: if c.buffer == nil { is already removed in latest commit, it was incorrect assumption on my side, trying to solve synchronisation issues.

if c.isClosed() { check is necessary. We can't perform read/send operations on closed connections

@jkaflik jkaflik changed the title Add ability to handle context cancellations Add ability to handle context cancellations for TCP protocol Aug 30, 2024
@jkaflik
Copy link
Contributor

jkaflik commented Aug 30, 2024

I conducted basic tests. This will be merged with next minor release in two weeks.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants