From 88da50674fbffb5cb339d61503d2b89aecfc1823 Mon Sep 17 00:00:00 2001 From: zeripath Date: Mon, 2 May 2022 13:44:45 +0100 Subject: [PATCH] Add finalizers to ensure that repos are closed and blobreaders are closed (#19495) (#19496) It may be prudent to add runtime finalizers to the git.Repository and git.blobReader objects to absolutely ensure that these are both properly cancelled, cleaned and closed out. This commit is a backport of an extract from #19448 Signed-off-by: Andrew Thornton --- modules/git/blob_nogogit.go | 61 +++++++++++++++++++++++++++++++------- modules/git/repo_base_gogit.go | 51 ++++++++++++++++++++++++++++---- modules/git/repo_base_nogogit.go | 63 ++++++++++++++++++++++++++++++++++++++-- 3 files changed, 157 insertions(+), 18 deletions(-) diff --git a/modules/git/blob_nogogit.go b/modules/git/blob_nogogit.go index aabf1b34ad..512dea1eb0 100644 --- a/modules/git/blob_nogogit.go +++ b/modules/git/blob_nogogit.go @@ -12,8 +12,11 @@ import ( "bytes" "io" "math" + "runtime" + "sync" "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/process" ) // Blob represents a Git object. @@ -54,11 +57,15 @@ func (b *Blob) DataAsync() (io.ReadCloser, error) { return io.NopCloser(bytes.NewReader(bs)), err } - return &blobReader{ + br := &blobReader{ + repo: b.repo, rd: rd, n: size, cancel: cancel, - }, nil + } + runtime.SetFinalizer(br, (*blobReader).finalizer) + + return br, nil } // Size returns the uncompressed size of the blob @@ -86,6 +93,10 @@ func (b *Blob) Size() int64 { } type blobReader struct { + lock sync.Mutex + closed bool + + repo *Repository rd *bufio.Reader n int64 cancel func() @@ -104,27 +115,57 @@ func (b *blobReader) Read(p []byte) (n int, err error) { } // Close implements io.Closer -func (b *blobReader) Close() error { +func (b *blobReader) Close() (err error) { + b.lock.Lock() + defer b.lock.Unlock() + if b.closed { + return + } + return b.close() +} + +func (b *blobReader) close() (err error) { defer b.cancel() + b.closed = true if b.n > 0 { + var n int for b.n > math.MaxInt32 { - n, err := b.rd.Discard(math.MaxInt32) + n, err = b.rd.Discard(math.MaxInt32) b.n -= int64(n) if err != nil { - return err + return } b.n -= math.MaxInt32 } - n, err := b.rd.Discard(int(b.n)) + n, err = b.rd.Discard(int(b.n)) b.n -= int64(n) if err != nil { - return err + return } } if b.n == 0 { - _, err := b.rd.Discard(1) + _, err = b.rd.Discard(1) b.n-- - return err + return + } + return +} + +func (b *blobReader) finalizer() error { + if b == nil { + return nil + } + b.lock.Lock() + defer b.lock.Unlock() + if b.closed { + return nil } - return nil + + pid := "" + if b.repo.Ctx != nil { + pid = " from PID: " + string(process.GetPID(b.repo.Ctx)) + } + log.Error("Finalizer running on unclosed blobReader%s: %s%s", pid, b.repo.Path) + + return b.close() } diff --git a/modules/git/repo_base_gogit.go b/modules/git/repo_base_gogit.go index 7299526562..183c7750ba 100644 --- a/modules/git/repo_base_gogit.go +++ b/modules/git/repo_base_gogit.go @@ -12,8 +12,12 @@ import ( "context" "errors" "path/filepath" + "runtime" + "sync" + "code.gitea.io/gitea/modules/log" gitealog "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/process" "code.gitea.io/gitea/modules/setting" "github.com/go-git/go-billy/v5/osfs" @@ -28,6 +32,9 @@ type Repository struct { tagCache *ObjectCache + lock sync.Mutex + closed bool + gogitRepo *gogit.Repository gogitStorage *filesystem.Storage gpgSettings *GPGSettings @@ -63,23 +70,57 @@ func OpenRepositoryCtx(ctx context.Context, repoPath string) (*Repository, error return nil, err } - return &Repository{ + repo := &Repository{ Path: repoPath, gogitRepo: gogitRepo, gogitStorage: storage, tagCache: newObjectCache(), Ctx: ctx, - }, nil + } + + runtime.SetFinalizer(repo, (*Repository).finalizer) + + return repo, nil } // Close this repository, in particular close the underlying gogitStorage if this is not nil -func (repo *Repository) Close() { - if repo == nil || repo.gogitStorage == nil { +func (repo *Repository) Close() (err error) { + if repo == nil { + return + } + repo.lock.Lock() + defer repo.lock.Unlock() + return repo.close() +} + +func (repo *Repository) close() (err error) { + repo.closed = true + if repo.gogitStorage == nil { return } - if err := repo.gogitStorage.Close(); err != nil { + err = repo.gogitStorage.Close() + if err != nil { gitealog.Error("Error closing storage: %v", err) } + return +} + +func (repo *Repository) finalizer() error { + if repo == nil { + return nil + } + repo.lock.Lock() + defer repo.lock.Unlock() + if !repo.closed { + pid := "" + if repo.Ctx != nil { + pid = " from PID: " + string(process.GetPID(repo.Ctx)) + } + log.Error("Finalizer running on unclosed repository%s: %s%s", pid, repo.Path) + } + + // We still need to run the close fn as it may be possible to reopen the gogitrepo after close + return repo.close() } // GoGitRepo gets the go-git repo representation diff --git a/modules/git/repo_base_nogogit.go b/modules/git/repo_base_nogogit.go index e264fd4a14..5499edb08b 100644 --- a/modules/git/repo_base_nogogit.go +++ b/modules/git/repo_base_nogogit.go @@ -13,8 +13,11 @@ import ( "context" "errors" "path/filepath" + "runtime" + "sync" "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/process" ) // Repository represents a Git repository. @@ -25,6 +28,10 @@ type Repository struct { gpgSettings *GPGSettings + lock sync.Mutex + + closed bool + batchCancel context.CancelFunc batchReader *bufio.Reader batchWriter WriteCloserError @@ -64,29 +71,57 @@ func OpenRepositoryCtx(ctx context.Context, repoPath string) (*Repository, error repo.batchWriter, repo.batchReader, repo.batchCancel = CatFileBatch(ctx, repoPath) repo.checkWriter, repo.checkReader, repo.checkCancel = CatFileBatchCheck(ctx, repo.Path) + runtime.SetFinalizer(repo, (*Repository).finalizer) + return repo, nil } // CatFileBatch obtains a CatFileBatch for this repository func (repo *Repository) CatFileBatch(ctx context.Context) (WriteCloserError, *bufio.Reader, func()) { - if repo.batchCancel == nil || repo.batchReader.Buffered() > 0 { + repo.lock.Lock() + defer repo.lock.Unlock() + + if repo.closed || repo.batchReader.Buffered() > 0 { log.Debug("Opening temporary cat file batch for: %s", repo.Path) return CatFileBatch(ctx, repo.Path) } + + if repo.batchCancel == nil { + repo.batchWriter, repo.batchReader, repo.batchCancel = CatFileBatch(ctx, repo.Path) + } + return repo.batchWriter, repo.batchReader, func() {} } // CatFileBatchCheck obtains a CatFileBatchCheck for this repository func (repo *Repository) CatFileBatchCheck(ctx context.Context) (WriteCloserError, *bufio.Reader, func()) { - if repo.checkCancel == nil || repo.checkReader.Buffered() > 0 { + repo.lock.Lock() + defer repo.lock.Unlock() + + if repo.closed || repo.checkReader.Buffered() > 0 { log.Debug("Opening temporary cat file batch-check: %s", repo.Path) return CatFileBatchCheck(ctx, repo.Path) } + + if repo.checkCancel == nil { + repo.checkWriter, repo.checkReader, repo.checkCancel = CatFileBatchCheck(ctx, repo.Path) + } + return repo.checkWriter, repo.checkReader, func() {} } // Close this repository, in particular close the underlying gogitStorage if this is not nil -func (repo *Repository) Close() { +func (repo *Repository) Close() (err error) { + if repo == nil { + return + } + repo.lock.Lock() + defer repo.lock.Unlock() + + return repo.close() +} + +func (repo *Repository) close() (err error) { if repo == nil { return } @@ -102,4 +137,26 @@ func (repo *Repository) Close() { repo.checkReader = nil repo.checkWriter = nil } + repo.closed = true + return +} + +func (repo *Repository) finalizer() (err error) { + if repo == nil { + return + } + repo.lock.Lock() + defer repo.lock.Unlock() + if repo.closed { + return nil + } + + if repo.batchCancel != nil || repo.checkCancel != nil { + pid := "" + if repo.Ctx != nil { + pid = " from PID: " + string(process.GetPID(repo.Ctx)) + } + log.Error("Finalizer running on unclosed repository%s: %s%s", pid, repo.Path) + } + return repo.close() }