Skip to content

Commit

Permalink
libvuln SQL: POC to introduce bulk inserts
Browse files Browse the repository at this point in the history
POC to demonstrate the usage and benefits of executing bulk queries.

Signed-off-by: vishnuchalla <[email protected]>
  • Loading branch information
vishnuchalla committed Jul 5, 2023
1 parent 61bccf2 commit 4c86a75
Showing 1 changed file with 74 additions and 53 deletions.
127 changes: 74 additions & 53 deletions datastore/postgres/updatevulnerabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ var (
)
)

type HashData struct {
HashKind string
Hash interface{}
}

// UpdateVulnerabilities implements vulnstore.Updater.
//
// It creates a new UpdateOperation for this update call, inserts the
Expand All @@ -54,26 +59,6 @@ func (s *MatcherStore) UpdateVulnerabilities(ctx context.Context, updater string
const (
// Create makes a new update operation and returns the reference and ID.
create = `INSERT INTO update_operation (updater, fingerprint, kind) VALUES ($1, $2, 'vulnerability') RETURNING id, ref;`
// Insert attempts to create a new vulnerability. It fails silently.
insert = `
INSERT INTO vuln (
hash_kind, hash,
name, updater, description, issued, links, severity, normalized_severity,
package_name, package_version, package_module, package_arch, package_kind,
dist_id, dist_name, dist_version, dist_version_code_name, dist_version_id, dist_arch, dist_cpe, dist_pretty_name,
repo_name, repo_key, repo_uri,
fixed_in_version, arch_operation, version_kind, vulnerable_range
) VALUES (
$1, $2,
$3, $4, $5, $6, $7, $8, $9,
$10, $11, $12, $13, $14,
$15, $16, $17, $18, $19, $20, $21, $22,
$23, $24, $25,
$26, $27, $28, VersionRange($29, $30)
)
ON CONFLICT (hash_kind, hash) DO NOTHING;`
// Assoc associates an update operation and a vulnerability. It fails
// silently.
assoc = `
INSERT INTO uo_vuln (uo, vuln) VALUES (
$3,
Expand Down Expand Up @@ -118,42 +103,78 @@ func (s *MatcherStore) UpdateVulnerabilities(ctx context.Context, updater string

// batch insert vulnerabilities
skipCt := 0

start = time.Now()

mBatcher := microbatch.NewInsert(tx, 2000, time.Minute)
for _, vuln := range vulns {
if vuln.Package == nil || vuln.Package.Name == "" {
skipCt++
continue
}

pkg := vuln.Package
dist := vuln.Dist
repo := vuln.Repo
if dist == nil {
dist = &zeroDist
batchSize := 1000
totalVulns := len(vulns)
numBatches := (totalVulns + batchSize - 1) / batchSize
mBatcher := microbatch.NewInsert(tx, 1000, time.Minute)
for batchIndex := 0; batchIndex < numBatches; batchIndex++ {
// Insert attempts to create a new vulnerabilities. It fails silently.
insert_query := `
INSERT INTO vuln (
hash_kind, hash,
name, updater, description, issued, links, severity, normalized_severity,
package_name, package_version, package_module, package_arch, package_kind,
dist_id, dist_name, dist_version, dist_version_code_name, dist_version_id, dist_arch, dist_cpe, dist_pretty_name,
repo_name, repo_key, repo_uri,
fixed_in_version, arch_operation, version_kind, vulnerable_range
) VALUES %s
ON CONFLICT (hash_kind, hash) DO NOTHING;`
insert_values := []interface{}{}
assoc_values := []HashData{}
placeholders := []string{}
startIndex := batchIndex * batchSize
endIndex := (batchIndex + 1) * batchSize
if endIndex > totalVulns {
endIndex = totalVulns
}
if repo == nil {
repo = &zeroRepo
j := 0
for i := startIndex; i < endIndex; i++ {
vuln := vulns[i]
if vuln.Package == nil || vuln.Package.Name == "" {
skipCt++
continue
}
pkg := vuln.Package
dist := vuln.Dist
repo := vuln.Repo
if dist == nil {
dist = &zeroDist
}
if repo == nil {
repo = &zeroRepo
}
hashKind, hash := md5Vuln(vuln)
vKind, vrLower, vrUpper := rangefmt(vuln.Range)
rowValues := fmt.Sprintf("($%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, VersionRange($%d, $%d))", j*30+1, j*30+2, j*30+3, j*30+4, j*30+5, j*30+6, j*30+7, j*30+8, j*30+9, j*30+10, j*30+11, j*30+12, j*30+13, j*30+14, j*30+15, j*30+16, j*30+17, j*30+18, j*30+19, j*30+20, j*30+21, j*30+22, j*30+23, j*30+24, j*30+25, j*30+26, j*30+27, j*30+28, j*30+29, j*30+30)
placeholders = append(placeholders, rowValues)
insert_values = append(insert_values, hashKind, hash,
vuln.Name, vuln.Updater, vuln.Description, vuln.Issued, vuln.Links, vuln.Severity, vuln.NormalizedSeverity,
pkg.Name, pkg.Version, pkg.Module, pkg.Arch, pkg.Kind,
dist.DID, dist.Name, dist.Version, dist.VersionCodeName, dist.VersionID, dist.Arch, dist.CPE, dist.PrettyName,
repo.Name, repo.Key, repo.URI,
vuln.FixedInVersion, vuln.ArchOperation, vKind, vrLower, vrUpper)
hashData := HashData{
HashKind: hashKind,
Hash: hash,
}
assoc_values = append(assoc_values, hashData)
j += 1
}
hashKind, hash := md5Vuln(vuln)
vKind, vrLower, vrUpper := rangefmt(vuln.Range)

err := mBatcher.Queue(ctx, insert,
hashKind, hash,
vuln.Name, vuln.Updater, vuln.Description, vuln.Issued, vuln.Links, vuln.Severity, vuln.NormalizedSeverity,
pkg.Name, pkg.Version, pkg.Module, pkg.Arch, pkg.Kind,
dist.DID, dist.Name, dist.Version, dist.VersionCodeName, dist.VersionID, dist.Arch, dist.CPE, dist.PrettyName,
repo.Name, repo.Key, repo.URI,
vuln.FixedInVersion, vuln.ArchOperation, vKind, vrLower, vrUpper,
)
if err != nil {
return uuid.Nil, fmt.Errorf("failed to queue vulnerability: %w", err)
bulkValues := strings.Join(placeholders, ", ")
insert_query = fmt.Sprintf(insert_query, bulkValues)
if len(insert_values) == 0 {
zlog.Debug(ctx).Msg("Bulk operations omitted because of no data")
} else {
_, err = s.pool.Exec(context.Background(), insert_query, insert_values...)
if err != nil {
return uuid.Nil, fmt.Errorf("failed to perform bulk insert vulnerabilities: %w", err)
}
}

if err := mBatcher.Queue(ctx, assoc, hashKind, hash, id); err != nil {
return uuid.Nil, fmt.Errorf("failed to queue association: %w", err)
for _, hashData := range assoc_values {
if err := mBatcher.Queue(ctx, assoc, hashData.HashKind, hashData.Hash, id); err != nil {
return uuid.Nil, fmt.Errorf("failed to queue association: %w", err)
}
}
}
if err := mBatcher.Done(ctx); err != nil {
Expand Down Expand Up @@ -251,4 +272,4 @@ func rangefmt(r *claircore.Range) (kind *string, lower, upper string) {
upper = buf.String()

return kind, lower, upper
}
}

0 comments on commit 4c86a75

Please sign in to comment.