SERVER-78211: Add integral vm functions in SBE

This commit is contained in:
Projjal Chanda
2023-08-02 18:41:54 +00:00
committed by Evergreen Agent
parent 16c722fb79
commit b5e3b9b8aa
5 changed files with 897 additions and 0 deletions

View File

@@ -196,6 +196,7 @@ env.CppUnitTest(
'expressions/sbe_get_element_builtin_test.cpp',
'expressions/sbe_if_test.cpp',
'expressions/sbe_index_of_test.cpp',
'expressions/sbe_integral_test.cpp',
'expressions/sbe_is_array_empty_builtin_test.cpp',
'expressions/sbe_is_member_builtin_test.cpp',
'expressions/sbe_iso_date_to_parts_test.cpp',

View File

@@ -841,6 +841,12 @@ static stdx::unordered_map<std::string, BuiltinFn> kBuiltinFunctions = {
BuiltinFn{[](size_t n) { return n == 1; }, vm::Builtin::aggRemovableSumRemove, true}},
{"aggRemovableSumFinalize",
BuiltinFn{[](size_t n) { return n == 1; }, vm::Builtin::aggRemovableSumFinalize, false}},
{"aggIntegralAdd",
BuiltinFn{[](size_t n) { return n == 2; }, vm::Builtin::aggIntegralAdd, true}},
{"aggIntegralRemove",
BuiltinFn{[](size_t n) { return n == 2; }, vm::Builtin::aggIntegralRemove, true}},
{"aggIntegralFinalize",
BuiltinFn{[](size_t n) { return n == 1; }, vm::Builtin::aggIntegralFinalize, false}},
};
/**

View File

@@ -0,0 +1,460 @@
/**
* Copyright (C) 2020-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include <cstddef>
#include <cstdint>
#include <memory>
#include <tuple>
#include <vector>
#include "mongo/base/string_data.h"
#include "mongo/db/exec/sbe/accumulator_sum_value_enum.h"
#include "mongo/db/exec/sbe/expression_test_base.h"
#include "mongo/db/exec/sbe/expressions/expression.h"
#include "mongo/db/exec/sbe/values/slot.h"
#include "mongo/db/exec/sbe/values/value.h"
#include "mongo/db/query/collation/collator_interface_mock.h"
#include "mongo/unittest/assert.h"
#include "mongo/unittest/framework.h"
namespace mongo::sbe {
// Selects, for each step of a test scenario, whether the add or the remove integral builtin runs.
enum class IntegralOp { kAdd, kRemove };
class SBEIntegralTest : public EExpressionTestFixture {
public:
std::pair<value::TypeTags, value::Value> initQueue() {
auto [queueTag, queueVal] = value::makeNewArray();
auto queue = value::getArrayView(queueVal);
auto [queueInternalArrTag, queueInternalArrVal] = value::makeNewArray();
auto arr = value::getArrayView(queueInternalArrVal);
arr->push_back(value::TypeTags::Null, 0);
queue->push_back(queueInternalArrTag, queueInternalArrVal);
queue->push_back(value::TypeTags::NumberInt64, 0);
queue->push_back(value::TypeTags::NumberInt64, 0);
return {queueTag, queueVal};
}
std::pair<value::TypeTags, value::Value> initState(boost::optional<int64_t> unitMillis) {
auto [stateTag, stateVal] = value::makeNewArray();
auto state = value::getArrayView(stateVal);
// input queue
auto [inputQueueTag, inputQueueVal] = initQueue();
state->push_back(inputQueueTag, inputQueueVal);
// sortBy queue
auto [sortByQueueTag, sortByQueueVal] = initQueue();
state->push_back(sortByQueueTag, sortByQueueVal);
// sum acc state
auto [removableSumAccTag, removableSumAccVal] = value::makeNewArray();
auto removableSumAcc = value::getArrayView(removableSumAccVal);
auto [sumAccTag, sumAccVal] = value::makeNewArray();
auto sumAcc = value::getArrayView(sumAccVal);
// DoubleDoubleSum Acc
sumAcc->reserve(AggSumValueElems::kMaxSizeOfArray);
sumAcc->push_back(value::TypeTags::NumberInt32, value::bitcastFrom<int32_t>(0));
sumAcc->push_back(value::TypeTags::NumberDouble, value::bitcastFrom<double>(0.0));
sumAcc->push_back(value::TypeTags::NumberDouble, value::bitcastFrom<double>(0.0));
// RemovableSum Acc
removableSumAcc->push_back(sumAccTag, sumAccVal);
removableSumAcc->push_back(value::TypeTags::NumberInt64, 0);
removableSumAcc->push_back(value::TypeTags::NumberInt64, 0);
removableSumAcc->push_back(value::TypeTags::NumberInt64, 0);
removableSumAcc->push_back(value::TypeTags::NumberInt64, 0);
removableSumAcc->push_back(value::TypeTags::NumberInt64, 0);
state->push_back(removableSumAccTag, removableSumAccVal);
// nanCount
state->push_back(value::TypeTags::NumberInt64, 0);
// unitMillis
if (unitMillis) {
state->push_back(value::TypeTags::NumberInt64,
value::bitcastFrom<int64_t>(*unitMillis));
} else {
state->push_back(value::TypeTags::Null, 0);
}
return {stateTag, stateVal};
}
void runAndAssertExpression(boost::optional<int64_t> unitMillis,
std::vector<std::pair<value::TypeTags, value::Value>>& inputValues,
std::vector<std::pair<value::TypeTags, value::Value>>& sortByValues,
std::vector<IntegralOp>& operations,
std::vector<std::pair<value::TypeTags, value::Value>>& expValues) {
value::ViewOfValueAccessor inputAccessor;
auto inputSlot = bindAccessor(&inputAccessor);
value::ViewOfValueAccessor sortByAccessor;
auto sortBySlot = bindAccessor(&sortByAccessor);
value::OwnedValueAccessor aggAccessor;
auto aggSlot = bindAccessor(&aggAccessor);
auto aggIntegralAddExpr = sbe::makeE<sbe::EFunction>(
"aggIntegralAdd",
sbe::makeEs(makeE<EVariable>(inputSlot), makeE<EVariable>(sortBySlot)));
auto compiledIntegralAdd = compileAggExpression(*aggIntegralAddExpr, &aggAccessor);
auto aggIntegralRemoveExpr = sbe::makeE<sbe::EFunction>(
"aggIntegralRemove",
sbe::makeEs(makeE<EVariable>(inputSlot), makeE<EVariable>(sortBySlot)));
auto compiledIntegralRemove = compileAggExpression(*aggIntegralRemoveExpr, &aggAccessor);
auto aggIntegralFinalize = sbe::makeE<sbe::EFunction>(
"aggIntegralFinalize", sbe::makeEs(makeE<EVariable>(aggSlot)));
auto compiledIntegralFinalize = compileExpression(*aggIntegralFinalize);
auto [stateTag, stateVal] = initState(unitMillis);
aggAccessor.reset(stateTag, stateVal);
// call IntegralOp (integralAdd/Remove) on the inputs and call finalize() method after each
// IntegralOp
size_t addIdx = 0, removeIdx = 0;
for (size_t i = 0; i < operations.size(); ++i) {
vm::CodeFragment* compiledExpr;
size_t idx;
if (operations[i] == IntegralOp::kAdd) {
compiledExpr = compiledIntegralAdd.get();
idx = addIdx++;
} else {
compiledExpr = compiledIntegralRemove.get();
idx = removeIdx++;
}
inputAccessor.reset(inputValues[idx].first, inputValues[idx].second);
sortByAccessor.reset(sortByValues[idx].first, sortByValues[idx].second);
auto [runTag, runVal] = runCompiledExpression(compiledExpr);
aggAccessor.reset(runTag, runVal);
auto [outTag, outVal] = runCompiledExpression(compiledIntegralFinalize.get());
ASSERT_EQ(expValues[i].first, outTag);
auto [compareTag, compareVal] =
value::compareValue(expValues[i].first, expValues[i].second, outTag, outVal);
ASSERT_EQ(compareTag, value::TypeTags::NumberInt32);
ASSERT_EQ(value::bitcastTo<int32_t>(compareVal), 0);
value::releaseValue(outTag, outVal);
value::releaseValue(expValues[i].first, expValues[i].second);
}
for (size_t i = 0; i < inputValues.size(); ++i) {
value::releaseValue(inputValues[i].first, inputValues[i].second);
value::releaseValue(sortByValues[i].first, sortByValues[i].second);
}
}
void runAndAssertErrorCode(boost::optional<int64_t> unitMillis,
std::vector<std::pair<value::TypeTags, value::Value>>& inputValues,
std::vector<std::pair<value::TypeTags, value::Value>>& sortByValues,
std::vector<IntegralOp>& operations,
int expErrCode) {
value::ViewOfValueAccessor inputAccessor;
auto inputSlot = bindAccessor(&inputAccessor);
value::ViewOfValueAccessor sortByAccessor;
auto sortBySlot = bindAccessor(&sortByAccessor);
value::OwnedValueAccessor aggAccessor;
auto aggSlot = bindAccessor(&aggAccessor);
auto aggIntegralAddExpr = sbe::makeE<sbe::EFunction>(
"aggIntegralAdd",
sbe::makeEs(makeE<EVariable>(inputSlot), makeE<EVariable>(sortBySlot)));
auto compiledIntegralAdd = compileAggExpression(*aggIntegralAddExpr, &aggAccessor);
auto aggIntegralRemoveExpr = sbe::makeE<sbe::EFunction>(
"aggIntegralRemove",
sbe::makeEs(makeE<EVariable>(inputSlot), makeE<EVariable>(sortBySlot)));
auto compiledIntegralRemove = compileAggExpression(*aggIntegralRemoveExpr, &aggAccessor);
auto aggIntegralFinalize = sbe::makeE<sbe::EFunction>(
"aggIntegralFinalize", sbe::makeEs(makeE<EVariable>(aggSlot)));
auto compiledIntegralFinalize = compileExpression(*aggIntegralFinalize);
auto [stateTag, stateVal] = initState(unitMillis);
aggAccessor.reset(stateTag, stateVal);
Status status = [&]() {
try {
size_t addIdx = 0, removeIdx = 0;
for (size_t i = 0; i < operations.size(); ++i) {
vm::CodeFragment* compiledExpr;
size_t idx;
if (operations[i] == IntegralOp::kAdd) {
compiledExpr = compiledIntegralAdd.get();
idx = addIdx++;
} else {
compiledExpr = compiledIntegralRemove.get();
idx = removeIdx++;
}
inputAccessor.reset(inputValues[idx].first, inputValues[idx].second);
sortByAccessor.reset(sortByValues[idx].first, sortByValues[idx].second);
auto [runTag, runVal] = runCompiledExpression(compiledExpr);
aggAccessor.reset(runTag, runVal);
}
return Status::OK();
} catch (AssertionException& ex) {
return ex.toStatus();
}
}();
ASSERT_FALSE(status.isOK());
ASSERT_EQ(status.code(), expErrCode);
for (size_t i = 0; i < inputValues.size(); ++i) {
value::releaseValue(inputValues[i].first, inputValues[i].second);
value::releaseValue(sortByValues[i].first, sortByValues[i].second);
}
}
};
// Adds four samples whose Date sortBy values are 30 seconds apart and then removes them in the
// same order, using an hour as the unit. Each expected value is the running sum of trapezoid
// areas (in millisecond units) divided by the number of milliseconds in an hour.
TEST_F(SBEIntegralTest, IntegralAddRemoveOverDate) {
    using TagValue = std::pair<value::TypeTags, value::Value>;
    const auto asDouble = [](double d) -> TagValue {
        return {value::TypeTags::NumberDouble, value::bitcastFrom<double>(d)};
    };

    std::vector<TagValue> inputValues = {
        asDouble(2.95), asDouble(2.7), asDouble(2.6), asDouble(2.98)};

    std::vector<TagValue> sortByValues = {{value::TypeTags::Date, 1589811030000LL},
                                          {value::TypeTags::Date, 1589811060000LL},
                                          {value::TypeTags::Date, 1589811090000LL},
                                          {value::TypeTags::Date, 1589811120000LL}};

    // Four adds followed by four removes.
    std::vector<IntegralOp> integralOps(4, IntegralOp::kAdd);
    integralOps.insert(integralOps.end(), 4, IntegralOp::kRemove);

    std::vector<TagValue> expValues = {
        asDouble(0),
        asDouble(0.023541666666666666),
        asDouble(0.045625),
        asDouble(0.068875),
        asDouble(0.045333333333333337),
        asDouble(0.02325),
        asDouble(0),
        // The window is empty after the last remove.
        {value::TypeTags::Null, 0}};

    const boost::optional<int64_t> hourInMillis = 60LL * 60LL * 1000LL;
    runAndAssertExpression(hourInMillis, inputValues, sortByValues, integralOps, expValues);
}
// Feeds a constant input of 10 at sortBy positions 1..7 while cycling through int32, int64, double
// and decimal representations, then removes everything in insertion order. The expected results
// check both the running integral and the numeric type it is reported in.
TEST_F(SBEIntegralTest, IntegralWithMixedTypes) {
    using TagValue = std::pair<value::TypeTags, value::Value>;
    const auto asInt32 = [](int32_t v) -> TagValue {
        return {value::TypeTags::NumberInt32, value::bitcastFrom<int32_t>(v)};
    };
    const auto asInt64 = [](int64_t v) -> TagValue {
        return {value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(v)};
    };
    const auto asDouble = [](double v) -> TagValue {
        return {value::TypeTags::NumberDouble, value::bitcastFrom<double>(v)};
    };
    const auto asDecimal = [](double v) -> TagValue {
        return {value::TypeTags::NumberDecimal, value::makeCopyDecimal(Decimal128{v}).second};
    };

    std::vector<TagValue> inputValues = {
        asInt32(10),
        asInt64(10),
        asDouble(10.0),
        asDecimal(10.0),
        asDouble(10.0),
        asInt64(10),
        asInt32(10),
    };

    std::vector<TagValue> sortByValues = {
        asInt32(1),
        asInt64(2),
        asDouble(3.0),
        asDecimal(4.0),
        asDouble(5.0),
        asInt64(6),
        asInt32(7),
    };

    // Seven adds followed by seven removes.
    std::vector<IntegralOp> integralOps(7, IntegralOp::kAdd);
    integralOps.insert(integralOps.end(), 7, IntegralOp::kRemove);

    std::vector<TagValue> expValues = {
        {value::TypeTags::NumberInt32, 0},
        asDouble(10.0),
        asDouble(20.0),
        asDecimal(30.0),
        asDecimal(40.0),
        asDecimal(50.0),
        asDecimal(60.0),
        asDecimal(50.0),
        asDecimal(40.0),
        asDecimal(30.0),
        asDouble(20.0),
        asDouble(10.0),
        {value::TypeTags::NumberInt32, 0},
        // The window is empty after the last remove.
        {value::TypeTags::Null, 0},
    };

    runAndAssertExpression(boost::none, inputValues, sortByValues, integralOps, expValues);
}
// Mixes finite inputs with NaN and infinite inputs (both double and decimal). While at least one
// NaN input is inside the window the finalized result is a double NaN; once the NaNs have been
// removed the result reflects the remaining infinities and, finally, the finite tail.
TEST_F(SBEIntegralTest, IntegralWithNaNAndInfinityValues) {
    using TagValue = std::pair<value::TypeTags, value::Value>;
    // NumberInt64 values are always produced with a 64-bit bitcast so that the tag and the width
    // of the stored value agree.
    const auto asInt64 = [](int64_t v) -> TagValue {
        return {value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(v)};
    };
    const auto asDouble = [](double v) -> TagValue {
        return {value::TypeTags::NumberDouble, value::bitcastFrom<double>(v)};
    };
    const auto asDecimal = [](const Decimal128& v) -> TagValue {
        return {value::TypeTags::NumberDecimal, value::makeCopyDecimal(v).second};
    };
    const TagValue nanDouble = asDouble(std::numeric_limits<double>::quiet_NaN());

    std::vector<TagValue> inputValues = {
        asInt64(10),
        nanDouble,
        asDecimal(Decimal128::kNegativeNaN),
        asInt64(20),
        asDouble(std::numeric_limits<double>::infinity()),
        asDecimal(Decimal128::kNegativeInfinity),
        asInt64(30),
        asInt64(40),
    };

    std::vector<TagValue> sortByValues = {
        asInt64(1),
        asInt64(2),
        asInt64(3),
        asInt64(4),
        asInt64(5),
        asInt64(6),
        asInt64(7),
        asInt64(8),
    };

    // Eight adds followed by eight removes.
    std::vector<IntegralOp> integralOps(8, IntegralOp::kAdd);
    integralOps.insert(integralOps.end(), 8, IntegralOp::kRemove);

    std::vector<TagValue> expValues = {
        // Only one point in the window.
        {value::TypeTags::NumberInt32, 0},
        // A NaN input is in the window from the second add until the second NaN has been removed
        // (seven adds plus two removes).
        nanDouble,
        nanDouble,
        nanDouble,
        nanDouble,
        nanDouble,
        nanDouble,
        nanDouble,
        nanDouble,
        nanDouble,
        // No NaN inputs left; the window still holds +infinity and -infinity.
        asDecimal(Decimal128::kPositiveNaN),
        asDecimal(Decimal128::kPositiveNaN),
        // Only the negative decimal infinity remains.
        asDecimal(Decimal128::kNegativeInfinity),
        // Remaining points are (7, 30) and (8, 40).
        asDouble(35.0),
        {value::TypeTags::NumberInt32, value::bitcastFrom<int32_t>(0)},
        // The window is empty after the last remove.
        {value::TypeTags::Null, 0},
    };

    runAndAssertExpression(boost::none, inputValues, sortByValues, integralOps, expValues);
}
// Date sortBy values without a unit must be rejected with error code 7821111.
TEST_F(SBEIntegralTest, IntegralWithDatesAndNoUnit) {
    using TagValue = std::pair<value::TypeTags, value::Value>;

    std::vector<TagValue> inputValues = {
        {value::TypeTags::NumberDouble, value::bitcastFrom<double>(2.95)},
        {value::TypeTags::NumberDouble, value::bitcastFrom<double>(2.98)}};

    std::vector<TagValue> sortByValues = {{value::TypeTags::Date, 1589811030000LL},
                                          {value::TypeTags::Date, 1589811060000LL}};

    // Two adds followed by two removes.
    std::vector<IntegralOp> integralOps(2, IntegralOp::kAdd);
    integralOps.insert(integralOps.end(), 2, IntegralOp::kRemove);

    const int expectedCode = 7821111;
    runAndAssertErrorCode(boost::none, inputValues, sortByValues, integralOps, expectedCode);
}
// Numeric sortBy values combined with a unit must be rejected with error code 7821110.
TEST_F(SBEIntegralTest, IntegralWithNumbersAndUnit) {
    using TagValue = std::pair<value::TypeTags, value::Value>;

    std::vector<TagValue> inputValues = {
        {value::TypeTags::NumberInt32, value::bitcastFrom<int32_t>(10)},
        {value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(10ll)},
    };

    std::vector<TagValue> sortByValues = {
        {value::TypeTags::NumberInt32, value::bitcastFrom<int32_t>(1)},
        {value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(2l)},
    };

    // Two adds followed by two removes.
    std::vector<IntegralOp> integralOps(2, IntegralOp::kAdd);
    integralOps.insert(integralOps.end(), 2, IntegralOp::kRemove);

    const boost::optional<int64_t> hourInMillis = 60LL * 60LL * 1000LL;
    const int expectedCode = 7821110;
    runAndAssertErrorCode(hourInMillis, inputValues, sortByValues, integralOps, expectedCode);
}
// A non-numeric input value must be rejected with error code 7821109.
TEST_F(SBEIntegralTest, IntegralWithIncorrectTypes1) {
    using TagValue = std::pair<value::TypeTags, value::Value>;

    std::vector<TagValue> inputValues = {
        {value::TypeTags::StringSmall, value::makeSmallString("a").second},
    };
    std::vector<TagValue> sortByValues = {
        {value::TypeTags::NumberInt32, value::bitcastFrom<int32_t>(1)},
    };
    std::vector<IntegralOp> integralOps(1, IntegralOp::kAdd);

    const int expectedCode = 7821109;
    runAndAssertErrorCode(boost::none, inputValues, sortByValues, integralOps, expectedCode);
}
// A non-numeric sortBy value (with no unit) must be rejected with error code 7821111.
TEST_F(SBEIntegralTest, IntegralWithIncorrectTypes2) {
    using TagValue = std::pair<value::TypeTags, value::Value>;

    std::vector<TagValue> inputValues = {
        {value::TypeTags::NumberInt32, value::bitcastFrom<int32_t>(1)},
    };
    std::vector<TagValue> sortByValues = {
        {value::TypeTags::StringSmall, value::makeSmallString("a").second},
    };
    std::vector<IntegralOp> integralOps(1, IntegralOp::kAdd);

    const int expectedCode = 7821111;
    runAndAssertErrorCode(boost::none, inputValues, sortByValues, integralOps, expectedCode);
}
} // namespace mongo::sbe

View File

@@ -7072,6 +7072,384 @@ FastTuple<bool, value::TypeTags, value::Value> ByteCode::builtinAggRemovableSumF
return aggRemovableSumFinalizeImpl(state);
}
/**
* Functions that operate on `ArrayQueue`
*/
// Decodes an `ArrayQueue` into its backing array, the index of the first queued element and the
// number of queued elements. Every component is validated; a malformed queue raises a uassert.
std::tuple<value::Array*, size_t, size_t> getArrayQueueState(value::Array* arrayQueue) {
    // Backing array: must be a non-empty Array.
    const auto backing = arrayQueue->getAt(static_cast<size_t>(ArrayQueueElems::kArray));
    uassert(7821100, "Expected an array", backing.first == value::TypeTags::Array);
    auto* buffer = value::getArrayView(backing.second);
    const auto size = buffer->size();
    uassert(7821116, "Expected non-empty array", size > 0);

    // Start index: must address a slot inside the backing array.
    const auto start = arrayQueue->getAt(static_cast<size_t>(ArrayQueueElems::kStartIdx));
    uassert(7821101, "Expected NumberInt64 type", start.first == value::TypeTags::NumberInt64);
    const auto startIdx = value::bitcastTo<size_t>(start.second);
    uassert(7821114,
            str::stream() << "Invalid startIdx " << startIdx << " with array size " << size,
            startIdx < size);

    // Queue size: cannot exceed the number of slots in the backing array.
    const auto count = arrayQueue->getAt(static_cast<size_t>(ArrayQueueElems::kQueueSize));
    uassert(7821102, "Expected NumberInt64 type", count.first == value::TypeTags::NumberInt64);
    const auto queueSize = value::bitcastTo<size_t>(count.second);
    uassert(7821115,
            str::stream() << "Invalid queueSize " << queueSize << " with array size " << size,
            queueSize <= size);

    return {buffer, startIdx, queueSize};
}
// Stores `startIdx` and `queueSize` back into the bookkeeping slots of the `ArrayQueue`.
void updateArrayQueueState(value::Array* arrayQueue, size_t startIdx, size_t queueSize) {
    const auto store = [arrayQueue](ArrayQueueElems slot, size_t number) {
        arrayQueue->setAt(static_cast<size_t>(slot),
                          value::TypeTags::NumberInt64,
                          value::bitcastFrom<size_t>(number));
    };
    store(ArrayQueueElems::kStartIdx, startIdx);
    store(ArrayQueueElems::kQueueSize, queueSize);
}
// Number of elements currently held by the queue (the queue is validated as a side effect).
size_t arrayQueueSize(value::Array* arrayQueue) {
    return std::get<2>(getArrayQueueState(arrayQueue));
}
// Appends {tag, val} to the back of the queue. The value is guarded until it has been stored in
// the backing array, so it is released if anything throws before that point.
void arrayQueuePush(value::Array* arrayQueue, value::TypeTags tag, value::Value val) {
    /* The backing array is used as a circular buffer: `startIdx` and `queueSize` delimit the
     * occupied region and every other slot holds Null. A push writes to slot
     * (startIdx + queueSize) % arraySize. When the buffer is full it is first doubled by
     * appending Nulls; if the occupied region wrapped around, the elements that sat in
     * [startIdx, oldSize - 1] are shifted to the end of the enlarged buffer so the region stays
     * contiguous modulo the new size.
     *
     * Example, pushing {v} (x = filled; _ = empty):
     *  => Full buffer, startIdx = 2, queueSize = 4, arraySize = 4:
     *       [x x x x]
     *            |
     *         startIdx
     *
     *  => After doubling, startIdx = 2, queueSize = 4, arraySize = 8:
     *       [x x x x _ _ _ _]
     *            |
     *         startIdx
     *
     *  => After shifting the tail, startIdx = 6, queueSize = 4, arraySize = 8:
     *       [x x _ _ _ _ x x]
     *                    |
     *                 startIdx
     *
     *  => After storing the element, startIdx = 6, queueSize = 5, arraySize = 8:
     *       [x x v _ _ _ x x]
     *                    |
     *                 startIdx
     */
    value::ValueGuard pendingGuard{tag, val};
    auto [buffer, head, count] = getArrayQueueState(arrayQueue);
    auto capacity = buffer->size();

    if (count == capacity) {
        // Buffer is full: grow it to twice its size by appending Null slots.
        const auto grownCapacity = capacity * 2;
        const auto added = grownCapacity - capacity;
        buffer->reserve(grownCapacity);
        for (size_t remaining = added; remaining > 0; --remaining) {
            buffer->push_back(value::TypeTags::Null, 0);
        }

        if (head > 0) {
            // The occupied region wraps around: move the slots [head, capacity - 1], last one
            // first, to the end of the enlarged buffer.
            for (size_t src = capacity; src-- > head;) {
                auto [movedTag, movedVal] = buffer->swapAt(src, value::TypeTags::Null, 0);
                buffer->setAt(src + added, movedTag, movedVal);
            }
            head += added;
        }
        capacity = grownCapacity;
    }

    const auto tail = (head + count) % capacity;
    // Ownership passes to the backing array from here on.
    pendingGuard.reset();
    buffer->setAt(tail, tag, val);
    updateArrayQueueState(arrayQueue, head, count + 1);
}
/* Removes the front element of the queue and returns it; the vacated slot is reset to Null.
 * Returns Nothing when the queue is empty. */
std::pair<value::TypeTags, value::Value> arrayQueuePop(value::Array* arrayQueue) {
    auto [buffer, head, count] = getArrayQueueState(arrayQueue);
    if (count == 0) {
        return {value::TypeTags::Nothing, 0};
    }

    const auto capacity = buffer->size();
    auto popped = buffer->swapAt(head, value::TypeTags::Null, 0);
    // Advance the start index, wrapping around at the end of the backing array.
    updateArrayQueueState(arrayQueue, (head + 1) % capacity, count - 1);
    return popped;
}
// Returns the front element of the queue as read by `getAt`, leaving the queue untouched.
// Returns Nothing when the queue is empty.
std::pair<value::TypeTags, value::Value> arrayQueueFront(value::Array* arrayQueue) {
    auto [buffer, head, count] = getArrayQueueState(arrayQueue);
    if (count > 0) {
        return buffer->getAt(head);
    }
    return {value::TypeTags::Nothing, 0};
}
// Returns the most recently pushed element of the queue as read by `getAt`, leaving the queue
// untouched. Returns Nothing when the queue is empty.
std::pair<value::TypeTags, value::Value> arrayQueueBack(value::Array* arrayQueue) {
    auto [buffer, head, count] = getArrayQueueState(arrayQueue);
    if (count > 0) {
        // Last occupied slot of the circular buffer.
        const auto last = (head + count - 1) % buffer->size();
        return buffer->getAt(last);
    }
    return {value::TypeTags::Nothing, 0};
}
/**
* Helper functions for integralAdd/Remove/Finalize
*/
// Validates the integral accumulator state and unpacks it into: the state array itself, the input
// queue, the sortBy queue, the removable-sum state holding the integral, the NaN counter and the
// optional unit in milliseconds (boost::none when the stored value is Null).
std::tuple<value::Array*,
           value::Array*,
           value::Array*,
           value::Array*,
           int64_t,
           boost::optional<int64_t>>
getIntegralState(value::TypeTags stateTag, value::Value stateVal) {
    uassert(
        7821103, "The accumulator state should be an array", stateTag == value::TypeTags::Array);
    auto state = value::getArrayView(stateVal);

    uassert(7821104,
            "The accumulator state should have correct number of elements",
            state->size() == static_cast<size_t>(AggIntegralElems::kMaxSizeOfArray));

    // Reads one slot of the state array.
    const auto slot = [state](AggIntegralElems elem) {
        return state->getAt(static_cast<size_t>(elem));
    };

    const auto inputQueueSlot = slot(AggIntegralElems::kInputQueue);
    uassert(7821105,
            "InputQueue should be of array type",
            inputQueueSlot.first == value::TypeTags::Array);
    auto inputQueue = value::getArrayView(inputQueueSlot.second);

    const auto sortByQueueSlot = slot(AggIntegralElems::kSortByQueue);
    uassert(7821121,
            "SortByQueue should be of array type",
            sortByQueueSlot.first == value::TypeTags::Array);
    auto sortByQueue = value::getArrayView(sortByQueueSlot.second);

    const auto integralSlot = slot(AggIntegralElems::kIntegral);
    uassert(7821106,
            "Integral should be of array type",
            integralSlot.first == value::TypeTags::Array);
    auto integral = value::getArrayView(integralSlot.second);

    const auto nanCountSlot = slot(AggIntegralElems::kNanCount);
    uassert(7821107,
            "nanCount should be of NumberInt64 type",
            nanCountSlot.first == value::TypeTags::NumberInt64);
    auto nanCount = value::bitcastTo<int64_t>(nanCountSlot.second);

    // A Null unit means "no unit"; anything else has to be a NumberInt64.
    boost::optional<int64_t> unitMillis;
    const auto unitMillisSlot = slot(AggIntegralElems::kUnitMillis);
    if (unitMillisSlot.first != value::TypeTags::Null) {
        uassert(7821108,
                "unitMillis should be of type NumberInt64",
                unitMillisSlot.first == value::TypeTags::NumberInt64);
        unitMillis = value::bitcastTo<int64_t>(unitMillisSlot.second);
    }

    return {state, inputQueue, sortByQueue, integral, nanCount, unitMillis};
}
// Overwrites the NaN counter slot of the integral accumulator state with `nanCount`.
void updateNaNCount(value::Array* state, int64_t nanCount) {
    const auto nanCountSlot = static_cast<size_t>(AggIntegralElems::kNanCount);
    const auto encoded = value::bitcastFrom<int64_t>(nanCount);
    state->setAt(nanCountSlot, value::TypeTags::NumberInt64, encoded);
}
// Checks the types of an (input, sortBy) pair handed to the integral builtins:
//  - the input must be numeric (7821109);
//  - with a unit, the sortBy value must be a Date (7821110);
//  - without a unit, the sortBy value must be numeric (7821111).
void assertTypesForIntegeral(value::TypeTags inputTag,
                             value::TypeTags sortByTag,
                             boost::optional<int64_t> unitMillis) {
    uassert(7821109, "input value should be of numeric type", value::isNumber(inputTag));

    if (unitMillis) {
        uassert(7821110,
                "Sort-by value should be of date type when unitMillis is provided",
                sortByTag == value::TypeTags::Date);
    } else {
        uassert(7821111, "Sort-by value should be of numeric type", value::isNumber(sortByTag));
    }
}
// Computes the area under the segment joining (prevSortByVal, prevInput) and
// (newSortByVal, newInput) with the trapezoidal rule:
//     (newInput + prevInput) * (newSortByVal - prevSortByVal) / 2
// Returns a NumberInt64 zero when any of the four values is NaN, or when the two sortBy values
// are neither both dates nor both numbers.
FastTuple<bool, value::TypeTags, value::Value> ByteCode::integralOfTwoPointsByTrapezoidalRule(
    std::pair<value::TypeTags, value::Value> prevInput,
    std::pair<value::TypeTags, value::Value> prevSortByVal,
    std::pair<value::TypeTags, value::Value> newInput,
    std::pair<value::TypeTags, value::Value> newSortByVal) {
    const bool anyNaN = value::isNaN(prevInput.first, prevInput.second) ||
        value::isNaN(prevSortByVal.first, prevSortByVal.second) ||
        value::isNaN(newInput.first, newInput.second) ||
        value::isNaN(newSortByVal.first, newSortByVal.second);
    if (anyNaN) {
        return {false, value::TypeTags::NumberInt64, 0};
    }

    const bool bothDates = prevSortByVal.first == value::TypeTags::Date &&
        newSortByVal.first == value::TypeTags::Date;
    const bool bothNumbers =
        value::isNumber(prevSortByVal.first) && value::isNumber(newSortByVal.first);
    if (!bothDates && !bothNumbers) {
        return {false, value::TypeTags::NumberInt64, 0};
    }

    // Width of the trapezoid.
    auto [deltaOwned, deltaTag, deltaVal] = genericSub(
        newSortByVal.first, newSortByVal.second, prevSortByVal.first, prevSortByVal.second);
    value::ValueGuard deltaGuard{deltaOwned, deltaTag, deltaVal};

    // Sum of the two parallel sides.
    auto [sumYOwned, sumYTag, sumYVal] =
        genericAdd(newInput.first, newInput.second, prevInput.first, prevInput.second);
    value::ValueGuard sumYGuard{sumYOwned, sumYTag, sumYVal};

    auto [integralOwned, integralTag, integralVal] =
        genericMul(sumYTag, sumYVal, deltaTag, deltaVal);
    value::ValueGuard integralGuard{integralOwned, integralTag, integralVal};

    // The divisor is tagged NumberInt64, so it is encoded with a 64-bit bitcast to match.
    return genericDiv(
        integralTag, integralVal, value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(2));
}
// Adds a new (input, sortBy) point to the integral accumulator: bumps the NaN counter when either
// value is NaN, adds the trapezoid between the previously pushed point and the new one to the
// running integral, and pushes the new point onto both queues. Returns the owned, updated state.
FastTuple<bool, value::TypeTags, value::Value> ByteCode::builtinAggIntegralAdd(ArityType arity) {
    auto [stateTag, stateVal] = moveOwnedFromStack(0);
    auto [inputTag, inputVal] = moveOwnedFromStack(1);
    auto [sortByTag, sortByVal] = moveOwnedFromStack(2);

    value::ValueGuard stateGuard{stateTag, stateVal};
    value::ValueGuard inputGuard{inputTag, inputVal};
    value::ValueGuard sortByGuard{sortByTag, sortByVal};

    auto [state, inputQueue, sortByQueue, integral, nanCount, unitMillis] =
        getIntegralState(stateTag, stateVal);

    assertTypesForIntegeral(inputTag, sortByTag, unitMillis);

    const bool pointHasNaN =
        value::isNaN(inputTag, inputVal) || value::isNaN(sortByTag, sortByVal);
    if (pointHasNaN) {
        nanCount++;
        updateNaNCount(state, nanCount);
    }

    const auto queuedPoints = arrayQueueSize(inputQueue);
    uassert(7821119, "Queue sizes should match", queuedPoints == arrayQueueSize(sortByQueue));

    if (queuedPoints > 0) {
        // Extend the integral by the area between the last queued point and the new point.
        auto lastInput = arrayQueueBack(inputQueue);
        auto lastSortBy = arrayQueueBack(sortByQueue);

        auto [areaOwned, areaTag, areaVal] = integralOfTwoPointsByTrapezoidalRule(
            lastInput, lastSortBy, {inputTag, inputVal}, {sortByTag, sortByVal});
        value::ValueGuard areaGuard{areaOwned, areaTag, areaVal};
        aggRemovableSumImpl<1>(integral, areaTag, areaVal);
    }

    // Hand each value over to its queue; arrayQueuePush guards the value itself.
    inputGuard.reset();
    arrayQueuePush(inputQueue, inputTag, inputVal);

    sortByGuard.reset();
    arrayQueuePush(sortByQueue, sortByTag, sortByVal);

    stateGuard.reset();
    return {true, stateTag, stateVal};
}
// Removes the oldest (input, sortBy) point from the integral accumulator. The values on the stack
// must compare equal to the front of the respective queues. The NaN counter is decremented when
// either value is NaN, and the trapezoid between the removed point and the new front point is
// subtracted from the running integral. Returns the owned, updated state.
FastTuple<bool, value::TypeTags, value::Value> ByteCode::builtinAggIntegralRemove(ArityType arity) {
    auto [stateTag, stateVal] = moveOwnedFromStack(0);
    auto [inputOwned, inputTag, inputVal] = getFromStack(1);
    auto [sortByOwned, sortByTag, sortByVal] = getFromStack(2);

    value::ValueGuard stateGuard{stateTag, stateVal};

    auto [state, inputQueue, sortByQueue, integral, nanCount, unitMillis] =
        getIntegralState(stateTag, stateVal);

    assertTypesForIntegeral(inputTag, sortByTag, unitMillis);

    // True when the two values compare equal.
    const auto sameValue = [](value::TypeTags lhsTag,
                              value::Value lhsVal,
                              value::TypeTags rhsTag,
                              value::Value rhsVal) {
        auto [cmpTag, cmpVal] = value::compareValue(lhsTag, lhsVal, rhsTag, rhsVal);
        return cmpTag == value::TypeTags::NumberInt32 && value::bitcastTo<int32_t>(cmpVal) == 0;
    };

    // The value being removed has to be the oldest entry of the input queue ...
    auto [poppedInputTag, poppedInputVal] = arrayQueuePop(inputQueue);
    value::ValueGuard poppedInputGuard{poppedInputTag, poppedInputVal};
    uassert(7821113,
            "Attempted to remove unexpected input value",
            sameValue(poppedInputTag, poppedInputVal, inputTag, inputVal));

    // ... and likewise for the sortBy queue.
    auto [poppedSortByTag, poppedSortByVal] = arrayQueuePop(sortByQueue);
    value::ValueGuard poppedSortByGuard{poppedSortByTag, poppedSortByVal};
    uassert(7821117,
            "Attempted to remove unexpected sortby value",
            sameValue(poppedSortByTag, poppedSortByVal, sortByTag, sortByVal));

    const bool pointHasNaN =
        value::isNaN(inputTag, inputVal) || value::isNaN(sortByTag, sortByVal);
    if (pointHasNaN) {
        nanCount--;
        updateNaNCount(state, nanCount);
    }

    const auto remainingPoints = arrayQueueSize(inputQueue);
    uassert(7821120, "Queue sizes should match", remainingPoints == arrayQueueSize(sortByQueue));

    if (remainingPoints > 0) {
        // Shrink the integral by the area between the removed point and the new front point.
        auto frontInput = arrayQueueFront(inputQueue);
        auto frontSortBy = arrayQueueFront(sortByQueue);

        auto [areaOwned, areaTag, areaVal] = integralOfTwoPointsByTrapezoidalRule(
            {inputTag, inputVal}, {sortByTag, sortByVal}, frontInput, frontSortBy);
        value::ValueGuard areaGuard{areaOwned, areaTag, areaVal};
        aggRemovableSumImpl<-1>(integral, areaTag, areaVal);
    }

    stateGuard.reset();
    return {true, stateTag, stateVal};
}
/**
 * VM builtin 'aggIntegralFinalize': produces the $integral result from the state in stack slot 0.
 *
 * Result, in order of precedence:
 *  - Null when the window holds no points;
 *  - a double NaN when the state's NaN counter is positive;
 *  - otherwise the finalized running sum, divided by 'unitMillis' when the state carries one.
 *
 * Uasserts (7821118) if the input and sortBy queues differ in size.
 */
FastTuple<bool, value::TypeTags, value::Value> ByteCode::builtinAggIntegralFinalize(
    ArityType arity) {
    auto [accOwned, accTag, accVal] = getFromStack(0);
    auto [state, inputQueue, sortByQueue, integral, nanCount, unitMillis] =
        getIntegralState(accTag, accVal);

    const auto numPoints = arrayQueueSize(inputQueue);
    uassert(7821118, "Queue sizes should match", numPoints == arrayQueueSize(sortByQueue));

    if (numPoints == 0) {
        return {false, value::TypeTags::Null, 0};
    }
    if (nanCount > 0) {
        return {false,
                value::TypeTags::NumberDouble,
                value::bitcastFrom<double>(std::numeric_limits<double>::quiet_NaN())};
    }

    auto [sumOwned, sumTag, sumVal] = aggRemovableSumFinalizeImpl(integral);
    if (!unitMillis) {
        // No unit configured: pass the finalized sum (and its ownership flag) straight through.
        return {sumOwned, sumTag, sumVal};
    }

    // The sum is only an intermediate here; the guard releases it once the quotient is computed.
    value::ValueGuard sumGuard{sumOwned, sumTag, sumVal};
    auto [quotOwned, quotTag, quotVal] = genericDiv(sumTag,
                                                    sumVal,
                                                    value::TypeTags::NumberInt64,
                                                    value::bitcastFrom<int64_t>(*unitMillis));
    return {quotOwned, quotTag, quotVal};
}
FastTuple<bool, value::TypeTags, value::Value> ByteCode::dispatchBuiltin(Builtin f,
ArityType arity) {
switch (f) {
@@ -7404,6 +7782,12 @@ FastTuple<bool, value::TypeTags, value::Value> ByteCode::dispatchBuiltin(Builtin
return builtinAggRemovableSum<-1 /*sign*/>(arity);
case Builtin::aggRemovableSumFinalize:
return builtinAggRemovableSumFinalize(arity);
case Builtin::aggIntegralAdd:
return builtinAggIntegralAdd(arity);
case Builtin::aggIntegralRemove:
return builtinAggIntegralRemove(arity);
case Builtin::aggIntegralFinalize:
return builtinAggIntegralFinalize(arity);
}
MONGO_UNREACHABLE;
@@ -7742,6 +8126,12 @@ std::string builtinToString(Builtin b) {
return "aggRemovableSumRemove";
case Builtin::aggRemovableSumFinalize:
return "aggRemovableSumFinalize";
case Builtin::aggIntegralAdd:
return "aggIntegralAdd";
case Builtin::aggIntegralRemove:
return "aggIntegralRemove";
case Builtin::aggIntegralFinalize:
return "aggIntegralFinalize";
default:
MONGO_UNREACHABLE;
}

View File

@@ -813,6 +813,9 @@ enum class Builtin : uint8_t {
aggRemovableSumAdd,
aggRemovableSumRemove,
aggRemovableSumFinalize,
aggIntegralAdd,
aggIntegralRemove,
aggIntegralFinalize,
};
std::string builtinToString(Builtin b);
@@ -975,6 +978,34 @@ enum class AggRemovableSumElems {
kSizeOfArray
};
/**
 * Indices into the 'Array' that holds the state of the $integral accumulator.
 */
enum class AggIntegralElems {
    // Queue of the input values.
    kInputQueue,
    // Queue of the sortBy values.
    kSortByQueue,
    // Integral over the current window.
    kIntegral,
    // Count of NaN values encountered.
    kNanCount,
    // Date unit (Null if not valid).
    kUnitMillis,
    // Trailing sentinel; its value equals the number of slots listed above.
    kMaxSizeOfArray
};
/**
 * Indices into the 'Array' that holds the state of a queue backed by a circular array.
 * Unused positions of the underlying array are filled with Null.
 */
enum class ArrayQueueElems {
    // Underlying array that holds the elements; it should be initialized to a non-zero size.
    kArray,
    // Start position of the queue.
    kStartIdx,
    // Size of the queue.
    kQueueSize
};
using SmallArityType = uint8_t;
using ArityType = uint32_t;
@@ -1797,6 +1828,15 @@ private:
void updateRemovableSumAccForIntegerType(value::Array* sumAcc,
value::TypeTags rhsTag,
value::Value rhsVal);
/**
 * VM builtins for the $integral accumulator. dispatchBuiltin() routes Builtin::aggIntegralAdd,
 * Builtin::aggIntegralRemove and Builtin::aggIntegralFinalize to these, in that order.
 */
FastTuple<bool, value::TypeTags, value::Value> builtinAggIntegralAdd(ArityType arity);
FastTuple<bool, value::TypeTags, value::Value> builtinAggIntegralRemove(ArityType arity);
FastTuple<bool, value::TypeTags, value::Value> builtinAggIntegralFinalize(ArityType arity);
/**
 * Helper shared by the $integral add/remove builtins. Each argument is a (tag, value) pair.
 * The call sites pass the older point as 'prevInput'/'prevSortByVal' and the newer point as
 * 'newInput'/'newSortByVal', and treat the result as an (owned, tag, value) triple that is
 * added to or subtracted from the running integral.
 * NOTE(review): the name indicates the trapezoidal rule over the two points; the definition is
 * not visible from here, so confirm the exact formula and type handling there.
 */
FastTuple<bool, value::TypeTags, value::Value> integralOfTwoPointsByTrapezoidalRule(
std::pair<value::TypeTags, value::Value> prevInput,
std::pair<value::TypeTags, value::Value> prevSortByVal,
std::pair<value::TypeTags, value::Value> newInput,
std::pair<value::TypeTags, value::Value> newSortByVal);
FastTuple<bool, value::TypeTags, value::Value> dispatchBuiltin(Builtin f, ArityType arity);