Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Timestamp support #565

Merged
merged 21 commits into from
May 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ Here is a list of sbt commands for daily development:
> ~test:compile # Compile both source and test codes
> ~test # Run tests upon source code change
> ~testOnly *MessagePackTest # Run tests in the specified class
> ~testOnly *MessagePackTest -- -n prim # Run the test tagged as "prim"
> ~testOnly *MessagePackTest -- (pattern) # Run tests matching the pattern
> project msgpack-core # Focus on a specific project
> package # Create a jar file in the target folder of each project
> findbugs # Produce findbugs report in target/findbugs
Expand Down
18 changes: 12 additions & 6 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ Global / concurrentRestrictions := Seq(
Tags.limit(Tags.Test, 1)
)

val AIRFRAME_VERSION = "20.4.1"

// Use dynamic snapshot version strings for non tagged versions
ThisBuild / dynverSonatypeSnapshots := true
// Use coursier friendly version separator
Expand Down Expand Up @@ -71,15 +73,19 @@ lazy val msgpackCore = Project(id = "msgpack-core", base = file("msgpack-core"))
"org.msgpack.value",
"org.msgpack.value.impl"
),
testFrameworks += new TestFramework("wvlet.airspec.Framework"),
libraryDependencies ++= Seq(
// msgpack-core should have no external dependencies
junitInterface,
"org.scalatest" %% "scalatest" % "3.2.8" % "test",
"org.scalacheck" %% "scalacheck" % "1.15.4" % "test",
"org.xerial" %% "xerial-core" % "3.6.0" % "test",
"org.msgpack" % "msgpack" % "0.6.12" % "test",
"commons-codec" % "commons-codec" % "1.12" % "test",
"com.typesafe.akka" %% "akka-actor" % "2.5.23" % "test"
"org.wvlet.airframe" %% "airframe-json" % AIRFRAME_VERSION % "test",
"org.wvlet.airframe" %% "airspec" % AIRFRAME_VERSION % "test",
// Add property testing support with forAll methods
"org.scalacheck" %% "scalacheck" % "1.15.4" % "test",
// For performance comparison with msgpack v6
"org.msgpack" % "msgpack" % "0.6.12" % "test",
// For integration test with Akka
"com.typesafe.akka" %% "akka-actor" % "2.5.23" % "test",
"org.scala-lang.modules" %% "scala-collection-compat" % "2.4.3" % "test"
)
)

Expand Down
2 changes: 2 additions & 0 deletions msgpack-core/src/main/java/org/msgpack/core/MessagePack.java
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ public static final boolean isFixedRaw(byte b)
public static final byte MAP32 = (byte) 0xdf;

public static final byte NEGFIXINT_PREFIX = (byte) 0xe0;

public static final byte EXT_TIMESTAMP = (byte) -1;
}

private MessagePack()
Expand Down
107 changes: 107 additions & 0 deletions msgpack-core/src/main/java/org/msgpack/core/MessagePacker.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.nio.charset.CharsetEncoder;
import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction;
import java.time.Instant;

import static org.msgpack.core.MessagePack.Code.ARRAY16;
import static org.msgpack.core.MessagePack.Code.ARRAY32;
Expand All @@ -41,6 +42,7 @@
import static org.msgpack.core.MessagePack.Code.EXT16;
import static org.msgpack.core.MessagePack.Code.EXT32;
import static org.msgpack.core.MessagePack.Code.EXT8;
import static org.msgpack.core.MessagePack.Code.EXT_TIMESTAMP;
import static org.msgpack.core.MessagePack.Code.FALSE;
import static org.msgpack.core.MessagePack.Code.FIXARRAY_PREFIX;
import static org.msgpack.core.MessagePack.Code.FIXEXT1;
Expand Down Expand Up @@ -798,6 +800,111 @@ else if (s.length() < (1 << 16)) {
return this;
}

/**
* Writes a Timestamp value.
*
* <p>
* This method writes a timestamp value using timestamp format family.
*
* @param instant the timestamp to be written
* @return this packer
* @throws IOException when underlying output throws IOException
*/
public MessagePacker packTimestamp(Instant instant)
throws IOException
{
return packTimestamp(instant.getEpochSecond(), instant.getNano());
}

/**
* Writes a Timesamp value using a millisecond value (e.g., System.currentTimeMillis())
* @param millis the millisecond value
* @return this packer
* @throws IOException when underlying output throws IOException
*/
public MessagePacker packTimestamp(long millis)
throws IOException
{
return packTimestamp(Instant.ofEpochMilli(millis));
}

private static final long NANOS_PER_SECOND = 1000000000L;

/**
* Writes a Timestamp value.
*
* <p>
* This method writes a timestamp value using timestamp format family.
*
* @param epochSecond the number of seconds from 1970-01-01T00:00:00Z
* @param nanoAdjustment the nanosecond adjustment to the number of seconds, positive or negative
* @return this
* @throws IOException when underlying output throws IOException
* @throws ArithmeticException when epochSecond plus nanoAdjustment in seconds exceeds the range of long
*/
public MessagePacker packTimestamp(long epochSecond, int nanoAdjustment)
throws IOException, ArithmeticException
{
long sec = Math.addExact(epochSecond, Math.floorDiv(nanoAdjustment, NANOS_PER_SECOND));
long nsec = Math.floorMod((long) nanoAdjustment, NANOS_PER_SECOND);

if (sec >>> 34 == 0) {
// sec can be serialized in 34 bits.
long data64 = (nsec << 34) | sec;
if ((data64 & 0xffffffff00000000L) == 0L) {
// sec can be serialized in 32 bits and nsec is 0.
// use timestamp 32
writeTimestamp32((int) sec);
}
else {
// sec exceeded 32 bits or nsec is not 0.
// use timestamp 64
writeTimestamp64(data64);
}
}
else {
// use timestamp 96 format
writeTimestamp96(sec, (int) nsec);
}
return this;
}

private void writeTimestamp32(int sec)
throws IOException
{
// timestamp 32 in fixext 4
ensureCapacity(6);
buffer.putByte(position++, FIXEXT4);
buffer.putByte(position++, EXT_TIMESTAMP);
buffer.putInt(position, sec);
position += 4;
}

private void writeTimestamp64(long data64)
throws IOException
{
// timestamp 64 in fixext 8
ensureCapacity(10);
buffer.putByte(position++, FIXEXT8);
buffer.putByte(position++, EXT_TIMESTAMP);
buffer.putLong(position, data64);
position += 8;
}

private void writeTimestamp96(long sec, int nsec)
throws IOException
{
// timestamp 96 in ext 8
ensureCapacity(15);
buffer.putByte(position++, EXT8);
buffer.putByte(position++, (byte) 12); // length of nsec and sec
buffer.putByte(position++, EXT_TIMESTAMP);
buffer.putInt(position, nsec);
position += 4;
buffer.putLong(position, sec);
position += 8;
}

/**
* Writes header of an Array value.
* <p>
Expand Down
62 changes: 60 additions & 2 deletions msgpack-core/src/main/java/org/msgpack/core/MessageUnpacker.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction;
import java.time.Instant;

import static org.msgpack.core.MessagePack.Code.EXT_TIMESTAMP;
import static org.msgpack.core.Preconditions.checkNotNull;

/**
Expand Down Expand Up @@ -595,6 +597,12 @@ private static MessagePackException unexpected(String expected, byte b)
}
}

private static MessagePackException unexpectedExtension(String expected, int expectedType, int actualType)
{
return new MessageTypeException(String.format("Expected extension type %s (%d), but got extension type %d",
expected, expectedType, actualType));
}

public ImmutableValue unpackValue()
throws IOException
{
Expand Down Expand Up @@ -643,7 +651,12 @@ public ImmutableValue unpackValue()
}
case EXTENSION: {
ExtensionTypeHeader extHeader = unpackExtensionTypeHeader();
return ValueFactory.newExtension(extHeader.getType(), readPayload(extHeader.getLength()));
switch (extHeader.getType()) {
case EXT_TIMESTAMP:
return ValueFactory.newTimestamp(unpackTimestamp(extHeader));
default:
return ValueFactory.newExtension(extHeader.getType(), readPayload(extHeader.getLength()));
}
}
default:
throw new MessageNeverUsedFormatException("Unknown value type");
Expand Down Expand Up @@ -707,7 +720,13 @@ public Variable unpackValue(Variable var)
}
case EXTENSION: {
ExtensionTypeHeader extHeader = unpackExtensionTypeHeader();
var.setExtensionValue(extHeader.getType(), readPayload(extHeader.getLength()));
switch (extHeader.getType()) {
case EXT_TIMESTAMP:
var.setTimestampValue(unpackTimestamp(extHeader));
break;
default:
var.setExtensionValue(extHeader.getType(), readPayload(extHeader.getLength()));
}
return var;
}
default:
Expand Down Expand Up @@ -1257,6 +1276,45 @@ private String decodeStringFastPath(int length)
}
}

public Instant unpackTimestamp()
throws IOException
{
ExtensionTypeHeader ext = unpackExtensionTypeHeader();
return unpackTimestamp(ext);
}

/**
* Internal method that can be used only when the extension type header is already read.
*/
private Instant unpackTimestamp(ExtensionTypeHeader ext) throws IOException
{
if (ext.getType() != EXT_TIMESTAMP) {
throw unexpectedExtension("Timestamp", EXT_TIMESTAMP, ext.getType());
}
switch (ext.getLength()) {
case 4: {
// Need to convert Java's int (int32) to uint32
long u32 = readInt() & 0xffffffffL;
return Instant.ofEpochSecond(u32);
}
case 8: {
long data64 = readLong();
int nsec = (int) (data64 >>> 34);
long sec = data64 & 0x00000003ffffffffL;
return Instant.ofEpochSecond(sec, nsec);
}
case 12: {
// Need to convert Java's int (int32) to uint32
long nsecU32 = readInt() & 0xffffffffL;
long sec = readLong();
return Instant.ofEpochSecond(sec, nsecU32);
}
default:
throw new MessageFormatException(String.format("Timestamp extension type (%d) expects 4, 8, or 12 bytes of payload but got %d bytes",
EXT_TIMESTAMP, ext.getLength()));
}
}

/**
* Reads header of an array.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
//
// MessagePack for Java
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package org.msgpack.value;

/**
* Immutable representation of MessagePack's Timestamp type.
*
* @see org.msgpack.value.TimestampValue
*/
public interface ImmutableTimestampValue
extends TimestampValue, ImmutableValue
{
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,7 @@ public interface ImmutableValue

@Override
public ImmutableStringValue asStringValue();

@Override
public ImmutableTimestampValue asTimestampValue();
}
33 changes: 33 additions & 0 deletions msgpack-core/src/main/java/org/msgpack/value/TimestampValue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
//
// MessagePack for Java
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package org.msgpack.value;

import java.time.Instant;

/**
* Value representation of MessagePack's Timestamp type.
*/
public interface TimestampValue
extends ExtensionValue
{
long getEpochSecond();

int getNano();

long toEpochMillis();

Instant toInstant();
}
18 changes: 18 additions & 0 deletions msgpack-core/src/main/java/org/msgpack/value/Value.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package org.msgpack.value;

import org.msgpack.core.MessagePacker;
import org.msgpack.core.MessageTypeCastException;

import java.io.IOException;

Expand Down Expand Up @@ -180,6 +181,14 @@ public interface Value
*/
boolean isExtensionValue();

/**
* Returns true if the type of this value is Timestamp.
*
* If this method returns true, {@code asTimestamp} never throws exceptions.
* Note that you can't use <code>instanceof</code> or cast <code>((MapValue) thisValue)</code> to check type of a value because type of a mutable value is variable.
*/
boolean isTimestampValue();

/**
* Returns the value as {@code NilValue}. Otherwise throws {@code MessageTypeCastException}.
*
Expand Down Expand Up @@ -280,6 +289,15 @@ public interface Value
*/
ExtensionValue asExtensionValue();

/**
* Returns the value as {@code TimestampValue}. Otherwise throws {@code MessageTypeCastException}.
*
* Note that you can't use <code>instanceof</code> or cast <code>((TimestampValue) thisValue)</code> to check type of a value because type of a mutable value is variable.
*
* @throws MessageTypeCastException If type of this value is not Map.
*/
TimestampValue asTimestampValue();

/**
* Serializes the value using the specified {@code MessagePacker}
*
Expand Down
Loading