Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 42 additions & 39 deletions pkg/sources/filesystem/filesystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,14 @@ func (s *Source) Init(aCtx trContext.Context, name string, jobId sources.JobID,
if err != nil {
return fmt.Errorf("unable to create filter: %w", err)
}

s.filter = filter

err = s.setMaxSymlinkDepth(&conn)
if err != nil {
return err
}

return nil
}

Expand All @@ -100,7 +103,9 @@ func (s *Source) setMaxSymlinkDepth(conn *sourcespb.Filesystem) error {
defaultMaxSymlinkDepth,
)
}

s.maxSymlinkDepth = depth

return nil
}

Expand Down Expand Up @@ -133,28 +138,31 @@ func (s *Source) Chunks(ctx trContext.Context, chunksChan chan *sources.Chunk, _
continue
}

initialDepth := 0

if fileInfo.Mode()&os.ModeSymlink != 0 {
// if the root path is a symlink we scan the symlink
lctx.Logger().V(5).Info("Root path is a symlink")
initialDepth := 0
err = s.scanSymlink(ctx, chunksChan, rootPath, initialDepth, cleanRootPath)
if err := s.scanSymlink(ctx, chunksChan, rootPath, initialDepth, cleanRootPath); err != nil {
lctx.Logger().Error(err, "error scanning root path symlink")
}

s.ClearEncodedResumeInfoFor(rootPath)
} else if fileInfo.IsDir() {
lctx.Logger().V(5).Info("Root path is a dir")
initialDepth := 0
err = s.scanDir(ctx, chunksChan, rootPath, initialDepth, cleanRootPath)
if err := s.scanDir(ctx, chunksChan, rootPath, initialDepth, cleanRootPath); err != nil {
lctx.Logger().Error(err, "error scanning root path directory")
}

s.ClearEncodedResumeInfoFor(rootPath)
} else if !fileInfo.Mode().IsRegular() {
lctx.Logger().V(2).Info("Root path is a non-regular file; skipping")
} else {
if !fileInfo.Mode().IsRegular() {
lctx.Logger().V(2).Info("Root path is a non-regular file; skipping")
continue
}
lctx.Logger().V(5).Info("Root path is a file")
err = s.scanFile(ctx, chunksChan, cleanRootPath)
}

if err != nil && !errors.Is(err, io.EOF) {
lctx.Logger().Error(err, "error scanning filesystem")
if err := s.scanFile(ctx, chunksChan, cleanRootPath); err != nil {
if !errors.Is(err, io.EOF) {
lctx.Logger().Error(err, "error scanning filesystem")
}
}
}
}

Expand All @@ -176,7 +184,6 @@ func (s *Source) scanSymlink(
lctx.Logger().V(5).Info("scanSymlink")

if !s.canFollowSymlinks() {
// If the file or directory is a symlink but the followSymlinks is disable ignore the path
lctx.Logger().V(2).Info("Path is a symlink, but following symlinks is not allowed; skipping")
return nil
}
Expand Down Expand Up @@ -259,8 +266,6 @@ func (s *Source) scanDir(

lctx.Logger().V(5).Info("scanDir")

// check if the full path is not matching any pattern in include
// FilterRuleSet and matching any exclude FilterRuleSet.
if s.filter != nil && s.filter.ShouldExclude(path) {
lctx.Logger().V(2).Info("Path was filtered out by filter.ShouldExclude; skipping")
return nil
Expand Down Expand Up @@ -379,7 +384,6 @@ func (s *Source) scanDir(
func (s *Source) scanFile(ctx trContext.Context, chunksChan chan *sources.Chunk, path string) error {
ctx.Logger().V(5).Info("scanFile")

// Check if file is binary and should be skipped
if (s.skipBinaries || feature.ForceSkipBinaries.Load()) && common.IsBinary(path) {
ctx.Logger().V(5).Info("File is binary; skipping")
return nil
Expand Down Expand Up @@ -454,31 +458,35 @@ func (s *Source) ChunkUnit(ctx trContext.Context, unit sources.SourceUnit, repor
}

ch := make(chan *sources.Chunk)
var scanErr error
var reportErr error
initialDepth := 0

go func() {
defer close(ch)
if fileInfo.Mode()&os.ModeSymlink != 0 {
// if the root path is a symlink we scan the symlink
lctx.Logger().V(5).Info("Root unit is a symlink")
initialDepth := 0
scanErr = s.scanSymlink(ctx, ch, rootPath, initialDepth, cleanPath)
if err := s.scanSymlink(ctx, ch, rootPath, initialDepth, cleanPath); err != nil {
lctx.Logger().Error(err, "error scanning root unit symlink")
reportErr = reporter.ChunkErr(ctx, err)
}

s.ClearEncodedResumeInfoFor(rootPath)
} else if fileInfo.IsDir() {
lctx.Logger().V(5).Info("Root unit is a directory")
initialDepth := 0
// TODO: Finer grain error tracking of individual chunks.
scanErr = s.scanDir(ctx, ch, rootPath, initialDepth, cleanPath)
s.ClearEncodedResumeInfoFor(rootPath)
} else {
// TODO: Finer grain error tracking of individual
// chunks (in the case of archives).
if !fileInfo.Mode().IsRegular() {
lctx.Logger().V(2).Info("Root unit is a non-regular file; skipping")
return
if err := s.scanDir(ctx, ch, rootPath, initialDepth, cleanPath); err != nil {
lctx.Logger().Error(err, "error scanning root unit directory")
reportErr = reporter.ChunkErr(ctx, err)
}

s.ClearEncodedResumeInfoFor(rootPath)
} else if !fileInfo.Mode().IsRegular() {
lctx.Logger().V(2).Info("Root unit is a non-regular file; skipping")
} else {
lctx.Logger().V(5).Info("Root unit is a file")
scanErr = s.scanFile(ctx, ch, cleanPath)
if err := s.scanFile(ctx, ch, cleanPath); err != nil && !errors.Is(err, io.EOF) {
lctx.Logger().Error(err, "error scanning root unit file")
reportErr = reporter.ChunkErr(ctx, err)
}
}
}()

Expand All @@ -492,10 +500,5 @@ func (s *Source) ChunkUnit(ctx trContext.Context, unit sources.SourceUnit, repor
}
}

if scanErr != nil && !errors.Is(scanErr, io.EOF) {
lctx.Logger().Error(scanErr, "error scanning filesystem")
return reporter.ChunkErr(ctx, scanErr)
}

return nil
return reportErr
}
Loading